kafka-coordinator-group-GroupCoordinator.adoc

GroupCoordinator manages the GroupMetadataManager. GroupCoordinator uses the isActive flag to record whether it has been started (and so whether the GroupMetadataManager was requested to start up). The flag is checked in handleListGroups and validateGroupStatus.
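The role of the isActive flag can be illustrated with a minimal sketch. ManagerStub and CoordinatorStub are hypothetical stand-ins, not the real Kafka classes, and the COORDINATOR_NOT_AVAILABLE error name as well as the startup and shutdown methods are assumptions made for illustration only.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object IsActiveSketch {
  // Hypothetical stand-in for GroupMetadataManager (not the real class)
  final class ManagerStub {
    @volatile var started = false
    def startup(): Unit = { started = true }
    def shutdown(): Unit = { started = false }
  }

  // Hypothetical stand-in for GroupCoordinator
  final class CoordinatorStub(manager: ManagerStub) {
    private val isActive = new AtomicBoolean(false)

    // Starting the coordinator also starts the manager, then flips the flag
    def startup(): Unit = {
      manager.startup()
      isActive.set(true)
    }

    def shutdown(): Unit = {
      isActive.set(false)
      manager.shutdown()
    }

    // Simplified validateGroupStatus: Some(error) means "reject the request".
    // The error name is an assumption of this sketch.
    def validateGroupStatus(groupId: String): Option[String] =
      if (!isActive.get) Some("COORDINATOR_NOT_AVAILABLE") else None
  }
}
```

With this shape, any request handler that calls validateGroupStatus first is rejected until startup has completed.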

partitionFor Method

partitionFor(group: String): Int

partitionFor simply requests the GroupMetadataManager to partitionFor the given group ID.
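The text does not say how the GroupMetadataManager maps a group ID to a partition. As a sketch only, assuming a hash-modulo scheme over a fixed number of partitions (partitionCount is a hypothetical parameter introduced here), the mapping could look like this:

```scala
object PartitionForSketch {
  // Assumption: the group ID is hashed onto a fixed number of partitions.
  // Masking the sign bit keeps the result non-negative even when
  // hashCode is Int.MinValue (where math.abs would stay negative).
  def partitionFor(groupId: String, partitionCount: Int): Int =
    (groupId.hashCode & 0x7fffffff) % partitionCount
}
```

The point of such a scheme is that the same group ID always lands on the same partition, so every broker can compute which partition (and hence which coordinator) owns a group without any lookup.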

Note

partitionFor is used when:

handleCommitOffsets Method

handleCommitOffsets(
  groupId: String,
  memberId: String,
  generationId: Int,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

handleCommitOffsets first validates the group status (validateGroupStatus) for the given groupId and the OFFSET_COMMIT API key.

handleCommitOffsets requests the GroupMetadataManager to get the metadata of the group (by the given groupId) and then doCommitOffsets.

If the GroupMetadataManager could not getGroup, handleCommitOffsets…​FIXME

In case of an error while validateGroupStatus, handleCommitOffsets…​FIXME
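The control flow described above can be sketched as follows. The types are simplified and hypothetical (Group, Outcome, and the function-valued parameters are not the real signatures). The two branches the text leaves open are modelled only as distinct outcomes, without guessing what the real method does in them.

```scala
object CommitOffsetsSketch {
  sealed trait Outcome
  case object Committed extends Outcome
  case object ValidationFailed extends Outcome // follow-up not described in the text
  case object GroupMissing extends Outcome     // follow-up not described in the text

  // Hypothetical simplified group metadata
  final case class Group(id: String)

  def handleCommitOffsets(
      groupId: String,
      validateGroupStatus: String => Option[String], // Some(error) on failure
      getGroup: String => Option[Group],
      doCommitOffsets: Group => Unit): Outcome =
    validateGroupStatus(groupId) match {
      case Some(_) => ValidationFailed
      case None =>
        getGroup(groupId) match {
          case Some(group) =>
            doCommitOffsets(group) // happy path: group found, commit
            Committed
          case None => GroupMissing
        }
    }
}
```

Returning an Outcome instead of invoking responseCallback is a simplification that keeps the sketch easy to test.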

Note
handleCommitOffsets is used exclusively when KafkaApis is requested to handle an OffsetCommitRequest.

handleGroupImmigration Method

handleGroupImmigration(offsetTopicPartitionId: Int): Unit

handleGroupImmigration simply requests the GroupMetadataManager to scheduleLoadGroupAndOffsets (for the given offsetTopicPartitionId and with the onGroupLoaded callback).

Note
handleGroupImmigration is used exclusively when KafkaApis is requested to handle a LeaderAndIsrRequest.

handleGroupEmigration Method

handleGroupEmigration(offsetTopicPartitionId: Int): Unit

handleGroupEmigration simply requests the GroupMetadataManager to removeGroupsForPartition (for the given offsetTopicPartitionId and with the onGroupUnloaded callback).

Note
handleGroupEmigration is used when KafkaApis is requested to handle a LeaderAndIsrRequest or a StopReplicaRequest.
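Both handleGroupImmigration and handleGroupEmigration are thin delegations that pass a partition ID and a callback to the GroupMetadataManager. A minimal sketch of that pattern follows. The Manager trait is a hypothetical stand-in, the callbacks take a plain group ID rather than real group metadata, and the events buffer exists only to make the behavior observable.

```scala
object MigrationSketch {
  // Hypothetical stand-in for GroupMetadataManager: only the two calls named in the text
  trait Manager {
    def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: String => Unit): Unit
    def removeGroupsForPartition(offsetsPartition: Int, onGroupUnloaded: String => Unit): Unit
  }

  final class Coordinator(manager: Manager) {
    // Not part of the described API: records callback invocations for inspection
    val events = scala.collection.mutable.ListBuffer.empty[String]

    private def onGroupLoaded(groupId: String): Unit = { events += s"loaded:$groupId"; () }
    private def onGroupUnloaded(groupId: String): Unit = { events += s"unloaded:$groupId"; () }

    // Partition became local: ask the manager to load its groups and offsets
    def handleGroupImmigration(offsetTopicPartitionId: Int): Unit =
      manager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)

    // Partition moved away: ask the manager to drop the groups it held
    def handleGroupEmigration(offsetTopicPartitionId: Int): Unit =
      manager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
  }
}
```

Passing the callbacks in lets the manager decide when loading or unloading has actually happened, while the coordinator keeps ownership of what to do per group.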

Handling Deleted Partitions — handleDeletedPartitions Method

handleDeletedPartitions(
  topicPartitions: Seq[TopicPartition]): Unit

handleDeletedPartitions simply requests the GroupMetadataManager to cleanupGroupMetadata and…​FIXME

Note
handleDeletedPartitions is used when…​FIXME

handleJoinGroup Method

handleJoinGroup(
  groupId: String,
  memberId: String,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinCallback): Unit

handleJoinGroup starts by validating the status of the group and the coordinator itself. In case of an error, handleJoinGroup uses the given JoinCallback to report it back and returns.

handleJoinGroup validates the group configuration, namely the given sessionTimeoutMs. In case of an error, handleJoinGroup uses the given JoinCallback to report an INVALID_SESSION_TIMEOUT error back and returns.

handleJoinGroup requests the GroupMetadataManager to getGroup by the given groupId.

If the group could not be found and the given memberId is defined (i.e. not empty), handleJoinGroup uses the given JoinCallback to report an UNKNOWN_MEMBER_ID error back and returns.

If the group could not be found and the given memberId is undefined (i.e. empty), handleJoinGroup requests the GroupMetadataManager to addGroup and then proceeds to doJoinGroup. If the group is available, handleJoinGroup proceeds to doJoinGroup directly.
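The order of checks in handleJoinGroup can be sketched as a single decision function. This is a simplified model, not the real method: it returns a Result instead of invoking the JoinCallback, statusError is a hypothetical hook standing in for the status validation, and the min/max session timeout bounds are assumed constructor parameters.

```scala
object JoinGroupSketch {
  sealed trait Result
  final case class Failed(error: String) extends Result
  final case class Joined(groupId: String, created: Boolean) extends Result

  // minSessionTimeoutMs / maxSessionTimeoutMs are assumed configuration bounds
  final class Coordinator(minSessionTimeoutMs: Int, maxSessionTimeoutMs: Int) {
    private val groups = scala.collection.mutable.Set.empty[String]

    // Hypothetical hook: Some(error) simulates a failed status validation
    var statusError: Option[String] = None

    def handleJoinGroup(groupId: String, memberId: String, sessionTimeoutMs: Int): Result =
      statusError match {
        // 1. Group or coordinator status is invalid
        case Some(error) => Failed(error)
        // 2. Session timeout outside the configured range
        case None if sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs =>
          Failed("INVALID_SESSION_TIMEOUT")
        // 3. Unknown group but the caller claims an existing member ID
        case None if !groups.contains(groupId) && memberId.nonEmpty =>
          Failed("UNKNOWN_MEMBER_ID")
        // 4. Add the group if it is new, then join (doJoinGroup in the text)
        case None =>
          val created = groups.add(groupId) // true only when the group was added
          Joined(groupId, created)
      }
  }
}
```

For example, with bounds of 6000 and 300000 ms, a first join with an empty memberId creates the group, and a later join with a non-empty memberId for the same group goes straight to the join step.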

Note
handleJoinGroup is used exclusively when KafkaApis is requested to handle a JoinGroupRequest.