在这里我们先来梳理一下consumeGroup的相关知识
1、首先,我们会给每个consume设置groupId,对于相同groupId且订阅相同topic的consume,会组成consumeGroup,如图一所示
 
2、对于Server端的topic来说,会有partition这个概念,如图二所示
 
3、现在我们有多个consume及多个partition,到底由哪个consume来消费哪个partition呢?就由consume启动时的分区分配策略来决定。
-  如果consume数量小于partition的数量,则一个consume有可能消费多个分区,如图三所示 
  
-  如果consume数量大于partition的数量,则会有consume线程空跑,如图四所示 
  
4、kafka的内置topic:consumer_offsets专门记录消费位点信息,既然是内置topic,那自然也会有partition及partition leader的概念,对于同一个groupId的消费位点都会记录在同一个partition中,在这篇文章中findCoordinator即是找到该groupId对应的partition的leader节点,我们知道这个节点才能将位移信息提交到这里保存,如果该partition还有其他副本,则该节点还会与其他副本同步位移信息。与该节点交互都是由GroupCoordinator完成的。
 
findCoordinator流程展示

客户端源码分析
这里还是放一下findCoordinator的代码,看其他consume的代码就发现客户端跟kafkaServer通信的格式大多是这样的,如果通信一次发现该GroupCoordinator的信息还未获取到则继续重试,直到超时,这里的超时时间即为poll时传入的超时时间,这个时间设置贯穿了整个consume的运行代码。
    protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
        //如果还未加入group则与group通信
        if (!coordinatorUnknown())
            return true;
        do {
            if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {
                final RuntimeException fatalException = findCoordinatorException;
                findCoordinatorException = null;
                throw fatalException;
            }
            final RequestFuture<Void> future = lookupCoordinator();
            client.poll(future, timer);
          //如果还没回调完成则说明是超时的
            if (!future.isDone()) {
                // ran out of time
                break;
            }
            if (future.failed()) {
                if (future.isRetriable()) {
                    log.debug("Coordinator discovery failed, refreshing metadata");
                    client.awaitMetadataUpdate(timer);
                } else
                    throw future.exception();
                //获取group的信息之后client会与group对应的节点建立连接,如果不可用则还会重试
            } else if (coordinator != null && client.isUnavailable(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                markCoordinatorUnknown();
                timer.sleep(rebalanceConfig.retryBackoffMs);
            }
            //如果与group通信成功则会跳出循环
        } while (coordinatorUnknown() && timer.notExpired());
        return !coordinatorUnknown();
    }
这里还有一点,跟踪代码可以看到以下代码在每次check以及与Server端通信完成之后都会有一样的逻辑,可以仔细思考一下,coordinator即获取到的group节点对象,client.isUnavailable(coordinator)是在与group建立连接,每次判断coordinator不为空且client与group连接失败,则将coordinator置空,为什么会这样呢?很有可能是请求到group的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将coordinator清空,待服务端选举完成后再次通信。
  protected synchronized Node checkAndGetCoordinator() {
        if (coordinator != null && client.isUnavailable(coordinator)) {
            markCoordinatorUnknown(true);
            return null;
        }
        return this.coordinator;
    }
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
这段代码有个亮点就是先寻找的负载最小节点,然后与该节点通信获取group节点的信息。
   protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // find a node to ask about the coordinator
            //与最小负载的node通信
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                log.debug("No broker available to send FindCoordinator request");
                return RequestFuture.noBrokersAvailable();
            } else {
                findCoordinatorFuture = sendFindCoordinatorRequest(node);
                // remember the exception even after the future is cleared so that
                // it can still be thrown by the ensureCoordinatorReady caller
                findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
                    @Override
                    public void onSuccess(Void value) {} // do nothing
                    @Override
                    public void onFailure(RuntimeException e) {
                        findCoordinatorException = e;
                    }
                });
            }
        }
        return findCoordinatorFuture;
    }
org.apache.kafka.clients.NetworkClient#leastLoadedNode
我们先来看看是如何寻找负载最小节点的,这里代码还是挺讲究的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的request则直接返回该节点,否则取在途request最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为null的节点则返回该节点,否则返回null。
public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        if (nodes.isEmpty())
            throw new IllegalStateException("There are no nodes in the Kafka cluster");
        int inflight = Integer.MAX_VALUE;
        Node foundConnecting = null;
        Node foundCanConnect = null;
        Node foundReady = null;
        //随机取一个节点
        int offset = this.randOffset.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); i++) {
            int idx = (offset + i) % nodes.size();
            Node node = nodes.get(idx);
            //如果该节点是可连接的,且selector空闲,且发送队列空闲则可以发送请求
            if (canSendRequest(node.idString(), now)) {
                //inFlightRequests记录了已发送请求但还未收到response的request,这里判定如果该节点没有这种数据则直接作为最小负载节点返回
                int currInflight = this.inFlightRequests.count(node.idString());
                if (currInflight == 0) {
                    // if we find an established connection with no in-flight requests we can stop right away
                    log.trace("Found least loaded node {} connected with no in-flight requests", node);
                    return node;
                    //否则取inFlightRequests中最小count的节点作为最小负载节点
                } else if (currInflight < inflight) {
                    // otherwise if this is the best we have found so far, record that
                    inflight = currInflight;
                    foundReady = node;
                }
            } else if (connectionStates.isPreparingConnection(node.idString())) {
                foundConnecting = node;
            } else if (canConnect(node, now)) {
                //如果该节点未被记录或者断连之后超过重试时间,则允许设置该节点
                foundCanConnect = node;
            } else {
                log.trace("Removing node {} from least loaded node selection since it is neither ready " +
                        "for sending or connecting", node);
            }
        }
        // We prefer established connections if possible. Otherwise, we will wait for connections
        // which are being established before connecting to new nodes.
        //优先取状态良好的节点
        if (foundReady != null) {
            log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
            return foundReady;
        } else if (foundConnecting != null) {
            log.trace("Found least loaded connecting node {}", foundConnecting);
            return foundConnecting;
        } else if (foundCanConnect != null) {
            log.trace("Found least loaded node {} with no active connection", foundCanConnect);
            return foundCanConnect;
        } else {
            log.trace("Least loaded node selection failed to find an available node");
            return null;
        }
    }
拆解FindCoordinatorRequest
通过下图我们来看看发送了哪些数据,key_type有两种枚举,一种是GROUP,另一种是TRANSACTION,如果type为GROUP的话那key就是groupId
 
服务端源码分析
kafka.server.KafkaApis#handleFindCoordinatorRequest
服务端还是通过KafkaApi来处理请求,代码也比较简单。
 def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
    val findCoordinatorRequest = request.body[FindCoordinatorRequest]
    //校验数据
    //……省略部分代码
      // get metadata (and create the topic if necessary)
      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
        case CoordinatorType.GROUP =>
            //4.1 找到对应发分区
          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
            //4.2 获取对应的元数据
          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
          (partition, metadata)
        case CoordinatorType.TRANSACTION =>
          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
          (partition, metadata)
        case _ =>
          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
      }
      //组装返回参数
     //……省略部分代码
    }
  }
kafka.coordinator.group.GroupMetadataManager#partitionFor
我们知道consume消费后对应的位点是保存在kafka的内部名为"__consumer_offsets"的内置topic中,内置topic初始化时由"offsets.topic.num.partitions “参数来决定分区数,默认值是50,相同consumeGroup的offset最终会保存在其中一个分区中,而保存在哪个分区就由下面这段代码来决定,可以看到逻辑很简单,就是取groupId的hashCode,然后对总的分区数取模。比如groupId为"consume_group”,最终就会在34号分区保存位点。
  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
kafka.server.KafkaApis#getOrCreateInternalTopic
这里是先从当前node的元数据缓存中拿到对应topic的数据,如果没有,则创建。从这段代码也可以猜想kafka内置topic的创建原理,是一种懒加载的思想,当第一个consume接入之后才会创建对应topicPartition文件。
  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {
    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
  }
这里的topicMetadata就是对应入参topic返回类似列表的对象,因为入参只有一个topic,所以直接取第一个数据,数据结构见下图,可以更直观的理解返回参数。
 
 
kafka.server.KafkaApis#createTopic
这里特地再放一下创建内置topic的代码,就是在zk中写入以下数据
 1、/config/topics/{topic}节点写入以下动态配置:
 {“version”:1,“config”:{“segment.bytes”:“104857600”,“compression.type”:“producer”,“cleanup.policy”:“compact”}。
 其中segment.bytes是根据服务端配置offsets.topic.segment.bytes 来设置的,默认为104857600,这个配置是针对topic的压缩策略,这块涉及到的是日志模块,后面再细讲。
 2、/brokers/topics/{topic}节点写入各个分区副本信息,这块涉及到topic的创建分区分配策略了,后面在细讲。
 这种做法也挺有意思的,说明如果我们创建topic不通过脚本,直接在zk中写入数据,也是可以创建成功。在这里也可以猜想一下实现逻辑,无非就是触发了服务端controller对zk节点的监听,根据写入的ar数据创建对应的topicPartition。
 这块还有另一个点需要注意,我们知道kafka创建topic是需要时间的,而这里的实现方式是往zk中写入数据触发创建topic流程,是一种异步方式,往zk中写入数据之后会返回一个error,LEADER_NOT_AVAILABLE,待创建topic的流程走完,并同步各个节点metaData之后,最后从metaData中取到该节点信息findCoordinatorRequest才会成功返回。
  private def createTopic(topic: String,
                          numPartitions: Int,
                          replicationFactor: Int,
                          properties: util.Properties = new util.Properties()): MetadataResponse.TopicMetadata = {
    try {
      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
        .format(topic, numPartitions, replicationFactor))
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
        util.Collections.emptyList())
    } catch {
      case _: TopicExistsException => // let it go, possibly another broker created this topic
        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
          util.Collections.emptyList())
      case ex: Throwable  => // Catch all to prevent unhandled errors
        new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
          util.Collections.emptyList())
    }
  }
拆解FindCoordinatorResponse
通过下图我们来看看返回了哪些数据,可以看到前面取了很多数据,最终拼到返回参数里面的只有leader所在的节点信息
 
总结
这块代码本身不是很复杂,主要是有一些细节需要考虑,通过仔细思量这些细节对我们今后分析consume异常会大有好处。流程总结如下
 1、寻找最小负载节点信息
 2、向最小负载节点发送FindCoordinatorRequest
 3、最小负载节点处理该请求。
- 首先找到该groupId对应的分区
- 通过内存中缓存的metaData获取该分区的信息,如果不存在则创建topic
- 返回查找到的分区leader信息









