Hive HCatalog Streaming API Usage
Hive Streaming
Hive's traditional way of loading data is batch import, which makes it hard to satisfy real-time requirements. Hive Streaming provides an API for streaming writes, so external data can be written into Hive continuously.
Prerequisites
Hive Streaming must be used together with Hive transactional tables, and the table's storage format must be ORC.
Set the following parameters in hive-site.xml to enable Hive transactions (a sample fragment is sketched after this list): hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true (see the Hive documentation for more important details)
hive.compactor.worker.threads > 0
When creating the table, declare it transactional: tblproperties("transactional"="true")
The Hive table must be partitioned and bucketed.
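For reference, a minimal hive-site.xml fragment covering the three transaction-related settings above might look like the following; the single worker thread is only an illustrative value:

```
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```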
Example
Hadoop version: 2.6.5
Hive version: 1.2.2
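The demo code in step 2 uses the HCatalog streaming classes plus fastjson for building JSON records (an assumption based on the toJSONString() call). If you build it with Maven, dependencies roughly along these lines are needed; the versions are a sketch matching the Hive 1.2.2 setup above and should be adapted to your environment:

```
<dependency>
  <groupId>org.apache.hive.hcatalog</groupId>
  <artifactId>hive-hcatalog-streaming</artifactId>
  <version>1.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.2.2</version>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.47</version>
</dependency>
```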
1. Create a table test.t3 in Hive:
CREATE TABLE t3 (id INT, name STRING, address STRING) partitioned by (country string) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true');
2. Code:
```
import java.util.ArrayList;
import java.util.List;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

import com.alibaba.fastjson.JSONObject;

public class HiveStreamingDemo {

    /**
     * DelimitedInputWriter usage: streams comma-delimited records into the table.
     * @throws InterruptedException
     * @throws StreamingException
     * @throws ClassNotFoundException
     */
    public static void delimitedInputWriterDemo() throws InterruptedException, StreamingException, ClassNotFoundException {
        String dbName = "test";
        String tblName = "t3";
        // Values for the partition column "country"
        List<String> partitionVals = new ArrayList<String>(1);
        partitionVals.add("china");
        HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083", dbName, tblName, partitionVals);
        String[] fieldNames = new String[3];
        fieldNames[0] = "id";
        fieldNames[1] = "name";
        fieldNames[2] = "address";
        // true: create the partition if it does not exist yet
        StreamingConnection connection = hiveEP.newConnection(true);
        DelimitedInputWriter writer =
                new DelimitedInputWriter(fieldNames, ",", hiveEP);
        // Fetch a batch of 10 transactions and write all records within the first one
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        for (int i = 0; i < 100; ++i) {
            txnBatch.write((i + ",zhangsan,beijing").getBytes());
        }
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }

    /**
     * StrictJsonWriter usage: streams JSON records whose keys match the column names.
     * @throws StreamingException
     * @throws InterruptedException
     */
    public static void strictJsonWriterDemo() throws StreamingException, InterruptedException {
        String dbName = "test";
        String tblName = "t3";
        List<String> partitionVals = new ArrayList<String>(1);
        partitionVals.add("china");
        HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083", dbName, tblName, partitionVals);
        StreamingConnection connection = hiveEP.newConnection(true);
        StrictJsonWriter writer = new StrictJsonWriter(hiveEP);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        for (int i = 0; i < 10; ++i) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", i);
            jsonObject.put("name", "chenli" + i);
            jsonObject.put("address", "beijing");
            txnBatch.write(jsonObject.toJSONString().getBytes());
        }
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }

    public static void main(String[] args) throws InterruptedException, StreamingException, ClassNotFoundException {
        strictJsonWriterDemo();
    }
}
```
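After running either demo, the committed transactions land as ORC delta files under the partition and are visible to ordinary queries (compaction merges them later). A quick sanity check, assuming the table created above:

```
SELECT * FROM test.t3 WHERE country = 'china' LIMIT 10;
```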