0
点赞
收藏
分享

微信扫一扫

SQL关于日期的计算合集

仲秋花似锦 2023-10-24 阅读 11

自动配置 Kafka

整理服务器内容时,发现一个测试 Kafka 的脚本,它可以自动配置 Kafka 部署信息 ,指定三个参数,完成 Kafka 的配置文件的修改。

basePath=$1
brokerId=$2
zookeeperConnect=$3
localIp=`ifconfig |grep inet| awk '{print $2}'| head -1`
echo $localIp
cd $basePath
tar -xvf kafka_2.12-2.3.0.tgz
mv kafka_2.12-2.3.0 kafka
cd kafka/config

sed -i -c "s/broker.id=.*/broker.id=${brokerId}/i" server.properties
sed -i -c "s/zookeeper.connect=.*/zookeeper.connect=${zookeeperConnect}/i" server.properties
echo "listeners=PLAINTEXT://$localIp:9092" >> server.properties

Kafka Java 测试代码

```typescript
public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put(DataShareConstant.ACKS, DataShareConstant.DEFAULT_ACKS);

        kafkaProps.put(DataShareConstant.KAFKA_PRODUCER_TYPE, DataShareConstant.SYNC);
        //Avro map
        kafkaProps.put(DataShareConstant.VALUE_SERIALIZER, DataShareConstant.AVRO_MAP_SERIALIZER);

        kafkaProps.put(DataShareConstant.KEY_SERIALIZER, DataShareConstant.DEFAULT_SERIALIZER);

        kafkaProps.put(DataShareConstant.BOOTSTRAP_SERVERS, "localhost:9092");

        //默认是30000ms
        kafkaProps.put(DataShareConstant.REQUEST_TIMEOUT, "5000");
        kafkaProps.put("transaction.timeout.ms", "5000");
        kafkaProps.put("max.block.ms", "6000"); // 该属性决定连接超时的
        kafkaProps.put(DataShareConstant.BATCH_SIZE, "1048576");
        kafkaProps.put(DataShareConstant.LINGER, "10");
        kafkaProps.put(DataShareConstant.BUFFER_MEMORY, "33554432");

        KafkaProducer kafkaProducer  = null;
        try{
            kafkaProducer = new KafkaProducer("mytestkafka", kafkaProps);
        } catch (Exception e) {
            logger.info("Construct producer error {}", e.getMessage());
        }

        Map<String, Object> testMapData = new HashMap<>(1);
        testMapData.put("DATA", "Kafka测试");
        testMapData.put("TIME", DateFormatUtils.format(System.currentTimeMillis(), CommonConstant.EsIndexDayFormat + " HH:mm:ss"));
        Future send = kafkaProducer2.send(testMapData);

        assert send != null;
        try {
            send.get();
            System.out.println("send ok.");
        } catch (Exception e) {
            String errorMsg = e.getMessage();
            System.out.println(errorMsg);
        } finally {
            kafkaProducer2.close();
        }
    }

启示录

整理草稿箱,发现有些都遗忘的代码片段,分享出来,以备有用之时。IT 技术知识,感觉浩瀚无边的啊,真正用到的又有多少呢?

JDK半年一个新版本,可是一个 JDK 8 就够了,20% 的技术知识就能解决 80% 的问题;剩下20%,是大概率也不会遇到的复杂问题。

举报

相关推荐

0 条评论