Code
package com.*.spark
import java.text.SimpleDateFormat
import com.alibaba.fastjson.JSONObject
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import com.*.*.myutil.KafkaHelp
object WechatDataToKafka {
  private final val logger: Logger = Logger.getLogger(WechatDataToKafka.getClass)
  logger.setLevel(Level.INFO)

  val topic = "wechat_user_all_data"
  val hbaseTable = "crawl:wechat_user_regionalization"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val conf = new SparkConf().setAppName("WechatDataToKafka")
    val sc = new SparkContext(conf)
    val topicBC = sc.broadcast(topic)

    sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://nameservice1")
    sc.hadoopConfiguration.set("dfs.nameservices", "nameservice1")

    // HBase connection, timeout and retry settings
    val hBaseConf: Configuration = HBaseConfiguration.create()
    hBaseConf.addResource("hbase-site.xml")
    hBaseConf.set("hbase.zookeeper.quorum", "fwqml016.zh,fwqml018.zh,fwqml009.zh")
    hBaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTable)
    hBaseConf.setInt("hbase.rpc.timeout", 200000)
    hBaseConf.setInt("hbase.client.operation.timeout", 200000)
    hBaseConf.setInt("hbase.client.scanner.timeout.period", 3600000)
    hBaseConf.setInt("hbase.client.retries.number", 6)
    hBaseConf.setInt("zookeeper.recovery.retry", 3)
    hBaseConf.setInt("zookeeper.recovery.retry.intervalmill", 200)

    // Only the latest version of each cell is needed. The Scan must be serialized
    // into the configuration, otherwise TableInputFormat never sees it.
    val scan = new Scan()
    scan.setMaxVersions(1)
    hBaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

    // Read the whole table as (rowkey, Result) pairs
    val articleRDD = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    // Note: count() triggers an extra full scan before the actual job runs
    println("@@@count : " + articleRDD.count())

    // Turn every row into a JSON string of qualifier -> value
    val resultRDD = articleRDD.repartition(30).map { case (_, result) =>
      val dataJson = new JSONObject()
      result.rawCells().foreach(cell => {
        val columnBytes: Array[Byte] = CellUtil.cloneQualifier(cell)
        val valueBytes: Array[Byte] = CellUtil.cloneValue(cell)
        // Skip cells with an empty qualifier or an empty value
        if (columnBytes != null && columnBytes.nonEmpty && valueBytes != null && valueBytes.nonEmpty) {
          dataJson.put(Bytes.toString(columnBytes), Bytes.toString(valueBytes))
        }
      })
      dataJson.toJSONString
    }

    // One KafkaProducer per partition; close() flushes records still in flight
    resultRDD.foreachPartition(p => {
      val tp = topicBC.value
      val kafParams = KafkaHelp.getKafParams()
      var producer: KafkaProducer[String, String] = null
      try {
        producer = new KafkaProducer[String, String](kafParams)
        p.foreach(data => {
          val record = new ProducerRecord[String, String](tp, data)
          producer.send(record)
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (producer != null) producer.close()
      }
    })

    sc.stop()
  }

  // Convert a "yyyy-MM-dd HH:mm:ss" timestamp to epoch millis (not used in main)
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    dt.getTime
  }
}
Utility class code
package com.*.*.myutil
import org.apache.kafka.clients.producer.ProducerConfig
object KafkaHelp {

  // Build the producer config from kafkaConfig.properties on the classpath
  def getKafParams() = {
    val hashMap = new java.util.HashMap[String, Object]()
    hashMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("bt.default.brokers", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.ACKS_CONFIG, PropertiesUtil.getStringByKey("default.acks", "kafkaConfig.properties"))
    hashMap.put(ProducerConfig.RETRIES_CONFIG, PropertiesUtil.getStringByKey("retries", "kafkaConfig.properties"))
    hashMap
  }
}
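The helper above relies on a PropertiesUtil.getStringByKey(key, fileName) object that is not shown. A minimal sketch of what it could look like, assuming the properties file is loaded once from the classpath and cached per file name (the class body below is an assumption, not the original implementation):

package com.*.*.myutil

import java.util.Properties
import scala.collection.concurrent.TrieMap

// Hypothetical implementation: loads a .properties file from the classpath once
// and serves individual keys from an in-memory cache.
object PropertiesUtil {
  private val cache = new TrieMap[String, Properties]()

  def getStringByKey(key: String, propName: String): String = {
    val props = cache.getOrElseUpdate(propName, {
      val p = new Properties()
      val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(propName)
      require(in != null, s"$propName not found on classpath")
      try p.load(in) finally in.close()
      p
    })
    props.getProperty(key)
  }
}

With something like this in place, kafkaConfig.properties only needs to define the keys referenced in KafkaHelp: bt.default.brokers, default.key_serializer_class_config, default.value_serializer_class_config, default.batch_size_config, default.linger_ms_config, default.acks and retries.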
Dependencies
<properties>
    <spark.version>2.1.1</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>