TsFile-Hadoop-Connector User Guide
About TsFile-Hadoop-Connector
TsFile-Hadoop-Connector implements Hadoop support for external data sources of the TsFile type. This enables users to read, write, and query TsFile files with Hadoop.
With this connector, you can:
- Load a single TsFile, from either the local file system or HDFS, into Hadoop (see the input-path sketch after this list)
- Load all files in a specific directory, from either the local file system or HDFS, into Hadoop
- Write data from Hadoop into a TsFile
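The three loading modes above come down to which input paths you hand to the MapReduce job. Below is a minimal sketch, assuming that `TSFInputFormat` accepts the standard `FileInputFormat` path configuration; the class name `TsFileInputPaths` and all paths are placeholders, not values from this guide.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TsFileInputPaths {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "tsfile-input-paths");

    // a single TsFile on the local file system
    FileInputFormat.setInputPaths(job, new Path("file:///data/example.tsfile"));
    // or: a single TsFile on HDFS
    // FileInputFormat.setInputPaths(job, new Path("hdfs://namenode:9000/data/example.tsfile"));
    // or: all TsFiles under a directory on HDFS
    // FileInputFormat.setInputPaths(job, new Path("hdfs://namenode:9000/data/tsfiles/"));
  }
}
```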
System Requirements
| Hadoop Version | Java Version | TsFile Version |
| -------------- | ------------ | -------------- |
|                |              |                |
Data Type Correspondence
| TsFile data type | Hadoop writable |
| ---------------- | --------------- |
| BOOLEAN          | BooleanWritable |
| INT32            | IntWritable     |
| INT64            | LongWritable    |
| FLOAT            | FloatWritable   |
| DOUBLE           | DoubleWritable  |
| TEXT             | Text            |
TSFInputFormat Explanation
TSFInputFormat extracts data from a TsFile and formats it into records of `MapWritable`.
Suppose that we want to extract data of the device named `d1`, which has three sensors named `s1`, `s2`, and `s3`.
`s1` is of type `BOOLEAN`, `s2` is of type `DOUBLE`, and `s3` is of type `TEXT`.
The `MapWritable` record will look like this:
```
{
  "time_stamp": 10000000,
  "device_id":  d1,
  "s1":         true,
  "s2":         3.14,
  "s3":         "middle"
}
```
In the Map job of Hadoop, you can get any value you want by its key, as follows:
mapwritable.get(new Text("s1"))
Note: All keys in `MapWritable` are of type `Text`.
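Combining the key lookup with the data type table above, a mapper handling the `d1` record sketched earlier could unwrap each value as in the sketch below. This is only an illustration: the class name `UnwrapMapper` is hypothetical, and the key names simply follow the record layout shown above.

```java
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UnwrapMapper extends Mapper<NullWritable, MapWritable, Text, Text> {
  @Override
  protected void map(NullWritable key, MapWritable value, Context context)
      throws IOException, InterruptedException {
    // every key in the MapWritable is a Text; cast targets follow the type table above
    long time = ((LongWritable) value.get(new Text("time_stamp"))).get();
    String device = value.get(new Text("device_id")).toString();
    boolean s1 = ((BooleanWritable) value.get(new Text("s1"))).get(); // BOOLEAN -> BooleanWritable
    double s2 = ((DoubleWritable) value.get(new Text("s2"))).get();   // DOUBLE  -> DoubleWritable
    String s3 = value.get(new Text("s3")).toString();                 // TEXT    -> Text
    context.write(new Text(device), new Text(time + "," + s1 + "," + s2 + "," + s3));
  }
}
```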
Read Example: calculate the sum
First of all, we should tell the InputFormat what kind of data we want from the TsFile.
```java
// configure reading time enable
TSFInputFormat.setReadTime(job, true);
// configure reading deviceId enable
TSFInputFormat.setReadDeviceId(job, true);
// configure which deviceIds to read
String[] deviceIds = {"device_1"};
TSFInputFormat.setReadDeviceIds(job, deviceIds);
// configure which measurementIds to read
String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
TSFInputFormat.setReadMeasurementIds(job, measurementIds);
```
Then, the output key and value types of the mapper and reducer should be specified:
```java
// set inputformat and outputformat
job.setInputFormatClass(TSFInputFormat.class);
// set mapper output key and value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
// set reducer output key and value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
```
Then, the `mapper` and `reducer` classes are where you handle the `MapWritable` records produced by the `TSFInputFormat` class.
```java
public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

  @Override
  protected void map(NullWritable key, MapWritable value,
      Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
      throws IOException, InterruptedException {

    Text deltaObjectId = (Text) value.get(new Text("device_id"));
    context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
  }
}

public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

  @Override
  protected void reduce(Text key, Iterable<DoubleWritable> values,
      Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
      throws IOException, InterruptedException {

    double sum = 0;
    for (DoubleWritable value : values) {
      sum = sum + value.get();
    }
    context.write(key, new DoubleWritable(sum));
  }
}
```
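The snippets above do not show how the job itself is created and submitted. The sketch below is a minimal driver assembled from the configuration shown in this example; the driver class name and the paths are placeholders, and it assumes `TSMapper`/`TSReducer` from this example are available (for instance as nested classes of the driver). The linked example in the repository remains the authoritative version.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.iotdb.hadoop.tsfile.TSFInputFormat;

public class TSFMRReadDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "tsfile-sum");
    job.setJarByClass(TSFMRReadDriver.class);

    // what to read from the TsFile (see the configuration snippet above)
    TSFInputFormat.setReadTime(job, true);
    TSFInputFormat.setReadDeviceId(job, true);
    TSFInputFormat.setReadDeviceIds(job, new String[]{"device_1"});
    TSFInputFormat.setReadMeasurementIds(job, new String[]{"sensor_1", "sensor_2", "sensor_3"});

    // mapper/reducer and key/value classes from this example
    job.setInputFormatClass(TSFInputFormat.class);
    job.setMapperClass(TSMapper.class);
    job.setReducerClass(TSReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    // placeholder paths
    FileInputFormat.setInputPaths(job, new Path("hdfs://namenode:9000/data/example.tsfile"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://namenode:9000/out/tsfile-sum"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```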
Note: For the complete code, please refer to: https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSFMRReadExample.java
Write Example: write the average value to a TsFile
Except for the `OutputFormatClass`, the rest of the configuration code of the Hadoop MapReduce job is almost the same as above.
```java
job.setOutputFormatClass(TSFOutputFormat.class);
// set reducer output key and value
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(HDFSTSRecord.class);
```
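Two details worth noting for the write job: the map-output classes must match the `TSMapper` below (which emits `Text`/`MapWritable`), and the job needs an output location for the resulting TsFile. The fragment below is a minimal sketch that reuses the `job` object from above and assumes `TSFOutputFormat` accepts the standard `FileOutputFormat` output-path configuration; the path is a placeholder.

```java
// map output classes matching the TSMapper/TSReducer shown below
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);

// output location for the resulting TsFile (placeholder path, assuming
// TSFOutputFormat follows the usual FileOutputFormat convention)
FileOutputFormat.setOutputPath(job, new Path("hdfs://namenode:9000/out/average.tsfile"));
```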
Again, the `mapper` and `reducer` classes are where you handle the `MapWritable` records produced by the `TSFInputFormat` class.
```java
public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {

  @Override
  protected void map(NullWritable key, MapWritable value,
      Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
      throws IOException, InterruptedException {

    Text deltaObjectId = (Text) value.get(new Text("device_id"));
    long timestamp = ((LongWritable) value.get(new Text("timestamp"))).get();
    if (timestamp % 100000 == 0) {
      context.write(deltaObjectId, new MapWritable(value));
    }
  }
}

/**
 * This reducer calculates the average value.
 */
public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

  @Override
  protected void reduce(Text key, Iterable<MapWritable> values,
      Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context)
      throws IOException, InterruptedException {

    long sensor1_value_sum = 0;
    long sensor2_value_sum = 0;
    double sensor3_value_sum = 0;
    long num = 0;
    for (MapWritable value : values) {
      num++;
      sensor1_value_sum += ((LongWritable) value.get(new Text("sensor_1"))).get();
      sensor2_value_sum += ((LongWritable) value.get(new Text("sensor_2"))).get();
      sensor3_value_sum += ((DoubleWritable) value.get(new Text("sensor_3"))).get();
    }
    HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
    DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
    DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
    DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
    tsRecord.addTuple(dPoint1);
    tsRecord.addTuple(dPoint2);
    tsRecord.addTuple(dPoint3);
    context.write(NullWritable.get(), tsRecord);
  }
}
```
Note: For the complete code, please refer to: https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSMRWriteExample.java










