spark stream上创建对象实例

阅读 29

2023-04-11


kafkaStream.foreachRDD { rdd =>

            rdd.foreachPartition { partition =>
                //每次在这里创建一个kafka producer实例是不可取的
            }
       }

上边的方式会造通过kafka producer发送的数据部分丢失.
应该用如下的方式, 下边的这种方式就不会造成数据的丢失:

kafkaStream.foreachRDD { rdd =>

            rdd.foreachPartition { partition =>

                var database = new File(geodataPath)

                val producer = getKafka(kafkaMetaDataBrokerList)
            }
       }

    var kafkaProducer:Producer = null

    def getKafka(kafkaMetaDataBrokerList:String) = {

        if(kafkaProducer == null){
            kafkaProducer = new Producer(kafkaMetaDataBrokerList);
        }
        kafkaProducer
    }


精彩评论(0)

0 0 举报