Kafka getting-started pitfall: connection timeout problem

紫荆峰 · 2022-05-04
kafka

Getting-started example

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> configs = new HashMap<>();
        // Broker address used for the initial connection
        configs.put("bootstrap.servers", "192.168.0.103:9092");
        // Serializer class for the message key
        configs.put("key.serializer", IntegerSerializer.class);
        // Serializer class for the message value
        configs.put("value.serializer", StringSerializer.class);

//        configs.put("acks", "all");
//        configs.put("retries", "3");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        // Custom message header fields
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        // Record for partition 0 of topic_1, with key 0 and a string value
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "topic_1",
                0,
                0,
                "hello hqk 0",
                headers
        );

        // Synchronous confirmation: block until the broker acknowledges the record
        final Future<RecordMetadata> future = producer.send(record);
        final RecordMetadata metadata = future.get();
        System.out.println("Topic: " + metadata.topic());
        System.out.println("Partition: " + metadata.partition());
        System.out.println("Offset: " + metadata.offset());

        // Close the producer
        producer.close();
    }
}
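The example confirms delivery synchronously by blocking on the returned Future. As a side note, the same send() API also accepts a callback for asynchronous confirmation. A minimal sketch, reusing the producer and record from the example above (it additionally needs an import of org.apache.kafka.clients.producer.Callback):

        // Asynchronous confirmation: send() returns immediately and the callback
        // fires once the broker acknowledges (or rejects) the record.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Topic: " + metadata.topic()
                            + ", partition: " + metadata.partition()
                            + ", offset: " + metadata.offset());
                }
            }
        });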
Creating the topic on the server (via the kafka-topics command), however, timed out:

WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
	at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
	at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
	at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
	at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)

After checking some references, the possible causes were:
1. The ZooKeeper address configured in server.properties under Kafka's config directory may be wrong.
2. Port 2181 or 9092 may already be in use.
3. The firewall may need to be turned off.
4. The ZooKeeper connection port should be 2181.
5. On newer Kafka versions, the topic-creation option is --bootstrap-server rather than --zookeeper (see the Java sketch after this list).
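On point 5: with the broker reachable, a topic can also be created from Java through the AdminClient API, which talks to a bootstrap server rather than ZooKeeper. This is only a minimal sketch (the class name is illustrative; it assumes the same kafka-clients dependency and broker address as the producer example):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Connect to the broker directly (bootstrap server), not to ZooKeeper
        props.put("bootstrap.servers", "192.168.0.103:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // topic_1 with 1 partition and replication factor 1, matching a single-broker setup
            NewTopic topic = new NewTopic("topic_1", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}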

In the end, none of these was the cause.

What actually worked was adding host.name=<your server's IP> as the last line of Kafka's config/server.properties; on this older Kafka version, host.name is the address the broker binds to and, unless overridden, the address it advertises to clients:

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

host.name=192.168.0.103

Then restart Kafka and run the producer again:

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [192.168.0.103:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
Topic: topic_1
Partition: 0
Offset: 8
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

The message was sent successfully.
