Reading HBase data with Spark and sending it to Kafka

乱世小白 2022-04-27

Code

The job scans an HBase table via TableInputFormat, flattens each row's cells into a JSON object, and publishes the JSON strings to a Kafka topic.

package com.*.spark

import java.text.SimpleDateFormat

import com.alibaba.fastjson.JSONObject
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
// TODO: replace with the path to your own utility class
import com.*.*.myutil.KafkaHelp

object WechatDataToKafka{
  private final val logger: Logger = Logger.getLogger(WechatDataToKafka.getClass)
  logger.setLevel(Level.INFO)
  val topic = "wechat_user_all_data"
  val hbaseTable = "crawl:wechat_user_regionalization"
  def main(args: Array[String]): Unit = {
//    val topic = args(0)
    System.setProperty("HADOOP_USER_NAME", "hdfs") // run as the hdfs user
    val conf = new SparkConf().setAppName("WechatDataToKafka")
    val sc = new SparkContext(conf)
    val topicBC = sc.broadcast(topic)
    sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://nameservice1")
    sc.hadoopConfiguration.set("dfs.nameservices", "nameservice1")
    val hBaseConf: Configuration = HBaseConfiguration.create()
    hBaseConf.addResource("hbase-site.xml")
    hBaseConf.set("hbase.zookeeper.quorum", "fwqml016.zh,fwqml018.zh,fwqml009.zh")
    hBaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTable)
    // HBase timeouts
    hBaseConf.setInt("hbase.rpc.timeout", 200000)
    hBaseConf.setInt("hbase.client.operation.timeout", 200000)
    hBaseConf.setInt("hbase.client.scanner.timeout.period", 3600000)
    // retry counts
    hBaseConf.setInt("hbase.client.retries.number",6)
    hBaseConf.setInt("zookeeper.recovery.retry",3)
    hBaseConf.setInt("zookeeper.recovery.retry.intervalmill",200)

    val scan = new Scan()
    scan.setMaxVersions(1)
    // Serialize the Scan and hand it to TableInputFormat; without this the Scan above never takes effect
    hBaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
    val articleRDD = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    println("@@@count : "+articleRDD.count())
    val resultRDD = articleRDD.repartition(30).map { case (_, result) =>
      val dataJson = new JSONObject()
      result.rawCells().foreach(cell => {
        // cloneQualifier/cloneValue copy only this cell's bytes;
        // getQualifierArray would return the cell's whole backing array
        val columnBytes: Array[Byte] = CellUtil.cloneQualifier(cell)
        val valueBytes: Array[Byte] = CellUtil.cloneValue(cell)
        if (columnBytes != null && columnBytes.nonEmpty && valueBytes != null && valueBytes.nonEmpty) {
          dataJson.put(Bytes.toString(columnBytes), Bytes.toString(valueBytes))
        }
      })
      dataJson.toJSONString
    }
    resultRDD.foreachPartition(p => {
      // one producer per partition, closed in the finally block once the partition is drained
      val tp = topicBC.value
      val kafParams = KafkaHelp.getKafParams()
      var producer: KafkaProducer[String, String] = null
      try {
        producer = new KafkaProducer[String, String](kafParams)
        p.foreach(data => {
          val record = new ProducerRecord[String, String](tp, data)
          producer.send(record)
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (producer != null) producer.close()
      }
    })
    sc.stop()
  }
  // Helper: parse "yyyy-MM-dd HH:mm:ss" into epoch millis (not used by main above)
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    val tim: Long = dt.getTime()
    tim
  }
}
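Once the job has run, a quick sanity check is to tail the topic with the console consumer that ships with Kafka (the broker address below is a placeholder; substitute your own):

bin/kafka-console-consumer.sh --bootstrap-server broker1:9092 --topic wechat_user_all_data --from-beginning --max-messages 10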

Utility class

package com.*.*.myutil

import org.apache.kafka.clients.producer.ProducerConfig

/**
 * 2021/11/8 11:27
 * @author psn
 * deploy
 */
object KafkaHelp {
	def getKafParams() = {
		val hashMap = new java.util.HashMap[String, Object]()
		// bootstrap servers
		hashMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("bt.default.brokers", "kafkaConfig.properties"))
		// key/value serializers
		hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties"))
		hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties"))
		// maximum size of one batch, in bytes
		hashMap.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties"))
		// how long to wait before sending a partially filled batch
		hashMap.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties"))
		// acks and retries
		hashMap.put(ProducerConfig.ACKS_CONFIG, PropertiesUtil.getStringByKey("default.acks", "kafkaConfig.properties"))
		hashMap.put(ProducerConfig.RETRIES_CONFIG, PropertiesUtil.getStringByKey("retries", "kafkaConfig.properties"))
		hashMap
	}
}
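KafkaHelp relies on a PropertiesUtil that the post never shows. Below is a minimal sketch of what it might look like, assuming it simply loads a properties file from the classpath; the original implementation may differ.

import java.util.Properties

object PropertiesUtil {
	// Load propFile from the classpath and return the value stored under key
	def getStringByKey(key: String, propFile: String): String = {
		val in = Thread.currentThread().getContextClassLoader.getResourceAsStream(propFile)
		val prop = new Properties()
		try prop.load(in) finally if (in != null) in.close()
		prop.getProperty(key)
	}
}

And a sample kafkaConfig.properties with the keys the code reads; every value below is a placeholder to adapt to your cluster:

bt.default.brokers=broker1:9092,broker2:9092
default.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
default.value_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
default.batch_size_config=16384
default.linger_ms_config=10
default.acks=1
retries=3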

Dependencies

  <properties>
    <spark.version>2.1.1</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.3.1</version>
      <exclusions>
        <exclusion>
          <artifactId>jdk.tools</artifactId>
          <groupId>jdk.tools</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.3.1</version>
      <exclusions>
        <exclusion>
          <artifactId>jdk.tools</artifactId>
          <groupId>jdk.tools</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.14</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>
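A typical submit command for this job is sketched below; the resource sizes and jar name are placeholders, and --files ships hbase-site.xml and kafkaConfig.properties into each YARN container's working directory so they are found on the classpath:

spark-submit \
  --class com.*.spark.WechatDataToKafka \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 4g \
  --files hbase-site.xml,kafkaConfig.properties \
  wechat-data-to-kafka.jar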