Kafka StreamsGroupHeartbeat Request Wire Format v0 — Binary Protocol Layout

What is StreamsGroupHeartbeat?

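StreamsGroupHeartbeat is the request that members of a Kafka Streams group send to the group coordinator under the Streams rebalance protocol (KIP-1071). Each heartbeat can carry topology metadata, the member's current task assignments, and changelog offsets, so the broker has the information it needs to decide task placement for the whole group. This moves assignment logic from the client-side assignor into the broker, which is intended to make scaling and standby task management smoother.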

Related Errors

COORDINATOR_NOT_AVAILABLE · FENCED_MEMBER_EPOCH · GROUP_AUTHORIZATION_FAILED · STREAMS_INVALID_TOPOLOGY · TOPIC_AUTHORIZATION_FAILED · UNSUPPORTED_VERSION

Wire Diagram

Request Header · flexible
message_size int32 · 4B
api_key int16 · 2B
api_version int16 · 2B
correlation_id int32 · 4B
client_id nullable string (int16 length)
tagged var
StreamsGroupHeartbeatRequest v0
GroupId string (compact)
MemberId string (compact)
MemberEpoch int32 · 4B
EndpointInformationEpoch int32 · 4B
InstanceId? string (compact)
RackId? string (compact)
RebalanceTimeoutMs int32 · 4B
Topology? struct
ActiveTasks? array
StandbyTasks? array
WarmupTasks? array
ProcessId? string (compact)
UserEndpoint? struct
ClientTags? array
TaskOffsets? array
TaskEndOffsets? array
ShutdownApplication bool · 1B
tagged var
Topology
Epoch int32 · 4B
Subtopologies array
tagged var
Topology.Subtopologies[]
SubtopologyId string (compact)
SourceTopics []string
SourceTopicRegex []string
StateChangelogTopics array
RepartitionSinkTopics []string
RepartitionSourceTopics array
CopartitionGroups array
tagged var
Topology.Subtopologies[].StateChangelogTopics[]
Name string (compact)
Partitions int32 · 4B
ReplicationFactor int16 · 2B
TopicConfigs array
tagged var
Topology.Subtopologies[].StateChangelogTopics[].TopicConfigs[]
Key string (compact)
Value string (compact)
tagged var
Topology.Subtopologies[].RepartitionSourceTopics[]
Name string (compact)
Partitions int32 · 4B
ReplicationFactor int16 · 2B
TopicConfigs array
tagged var
Topology.Subtopologies[].RepartitionSourceTopics[].TopicConfigs[]
Key string (compact)
Value string (compact)
tagged var
Topology.Subtopologies[].CopartitionGroups[]
SourceTopics []int16
SourceTopicRegex []int16
RepartitionSourceTopics []int16
tagged var
ActiveTasks[]
SubtopologyId string (compact)
Partitions []int32
tagged var
StandbyTasks[]
SubtopologyId string (compact)
Partitions []int32
tagged var
WarmupTasks[]
SubtopologyId string (compact)
Partitions []int32
tagged var
UserEndpoint
Host string (compact)
Port uint16 · 2B
tagged var
ClientTags[]
Key string (compact)
Value string (compact)
tagged var
TaskOffsets[]
SubtopologyId string (compact)
Partition int32 · 4B
Offset int64 · 8B
tagged var
TaskEndOffsets[]
SubtopologyId string (compact)
Partition int32 · 4B
Offset int64 · 8B
tagged var
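
The header and string rows in the diagram can be sketched in code. This is a minimal Python sketch, not Kafka's client implementation. It assumes the standard Kafka protocol primitives: unsigned varints, compact strings prefixed with length + 1, and an empty tagged-field section written as a single 0 byte. Note that in flexible request headers client_id is still written as a nullable string with an int16 length, not as a compact string. All function names are illustrative, and api_key is left as a parameter to be looked up in the Kafka protocol guide.

```python
import struct


def uvarint(n: int) -> bytes:
    """Unsigned varint: 7 bits per byte, high bit set on every byte but the last."""
    if n < 0:
        raise ValueError("uvarint cannot encode negative numbers")
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def compact_string(s):
    """Compact string: uvarint(len + 1) then UTF-8 bytes; None is encoded as 0."""
    if s is None:
        return uvarint(0)
    raw = s.encode("utf-8")
    return uvarint(len(raw) + 1) + raw


def int16_nullable_string(s):
    """Non-compact nullable string: int16 length (-1 for None) then UTF-8 bytes."""
    if s is None:
        return struct.pack(">h", -1)
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


EMPTY_TAGGED_FIELDS = uvarint(0)  # "tagged var" with no tagged fields present


def request_header(api_key, api_version, correlation_id, client_id):
    """Flexible request header: fixed-width fields, client_id, tagged fields."""
    return (
        struct.pack(">hhi", api_key, api_version, correlation_id)
        + int16_nullable_string(client_id)
        + EMPTY_TAGGED_FIELDS
    )


def frame(header: bytes, body: bytes) -> bytes:
    """Prefix header + body with message_size (int32, excludes its own 4 bytes)."""
    payload = header + body
    return struct.pack(">i", len(payload)) + payload
```

For instance, `compact_string("order-processors")` yields one length byte (17, i.e. 16 + 1) followed by the 16 UTF-8 bytes of the group id.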

Schema & Example

Schema
{
  "GroupId": string,
  "MemberId": string,
  "MemberEpoch": int32,
  "EndpointInformationEpoch": int32,
  "InstanceId": string?,
  "RackId": string?,
  "RebalanceTimeoutMs": int32,
  "Topology": {
    "Epoch": int32,
    "Subtopologies": [{
        "SubtopologyId": string,
        "SourceTopics": [string],
        "SourceTopicRegex": [string],
        "StateChangelogTopics": [{
            "Name": string,
            "Partitions": int32,
            "ReplicationFactor": int16,
            "TopicConfigs": [{
                "Key": string,
                "Value": string
            }]
        }],
        "RepartitionSinkTopics": [string],
        "RepartitionSourceTopics": [{
            "Name": string,
            "Partitions": int32,
            "ReplicationFactor": int16,
            "TopicConfigs": [{
                "Key": string,
                "Value": string
            }]
        }],
        "CopartitionGroups": [{
            "SourceTopics": [int16],
            "SourceTopicRegex": [int16],
            "RepartitionSourceTopics": [int16]
        }]
    }]
  }?,
  "ActiveTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "StandbyTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "WarmupTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "ProcessId": string?,
  "UserEndpoint": {
    "Host": string,
    "Port": uint16
  }?,
  "ClientTags": [{
      "Key": string,
      "Value": string
  }]?,
  "TaskOffsets": [{
      "SubtopologyId": string,
      "Partition": int32,
      "Offset": int64
  }]?,
  "TaskEndOffsets": [{
      "SubtopologyId": string,
      "Partition": int32,
      "Offset": int64
  }]?,
  "ShutdownApplication": bool
}
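
To illustrate how the nullable arrays in the schema (marked `?`) map to bytes, here is a hedged Python sketch for the ActiveTasks, StandbyTasks, and WarmupTasks entries. It assumes the usual flexible-version rules: a compact array is prefixed with uvarint(count + 1), a null array is the single byte 0, and each struct ends with a tagged-field section (empty here). The helper names are hypothetical and the dict layout simply mirrors the schema.

```python
import struct


def uvarint(n: int) -> bytes:
    """Unsigned varint: 7 bits per byte, high bit set on every byte but the last."""
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def compact_string(s: str) -> bytes:
    """Non-null compact string: uvarint(len + 1) then UTF-8 bytes."""
    raw = s.encode("utf-8")
    return uvarint(len(raw) + 1) + raw


def compact_array(items, encode_item) -> bytes:
    """Compact array: uvarint(count + 1) then each element; None is encoded as 0."""
    if items is None:
        return uvarint(0)
    return uvarint(len(items) + 1) + b"".join(encode_item(i) for i in items)


def encode_task_ids(task: dict) -> bytes:
    """One task entry: SubtopologyId, Partitions ([]int32), empty tagged fields."""
    return (
        compact_string(task["SubtopologyId"])
        + compact_array(task["Partitions"], lambda p: struct.pack(">i", p))
        + uvarint(0)
    )


def encode_task_list(tasks) -> bytes:
    """A nullable list of task entries, e.g. the ActiveTasks field."""
    return compact_array(tasks, encode_task_ids)
```

With this sketch, a null task list costs one byte, while an empty list `[]` is also one byte but with the value 1, so the two remain distinguishable on the wire.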
Example
{
  "GroupId": "order-processors",
  "MemberId": "consumer-1-abc123",
  "MemberEpoch": 12,
  "EndpointInformationEpoch": 12,
  "InstanceId": null,
  "RackId": "abc-123",
  "RebalanceTimeoutMs": 300000,
  "Topology": {
    "Epoch": 17,
    "Subtopologies": [{
        "SubtopologyId": "abc-123",
        "SourceTopics": ["value-1", "value-2"],
        "SourceTopicRegex": ["value-1", "value-2"],
        "StateChangelogTopics": [{
            "Name": "orders",
            "Partitions": 0,
            "ReplicationFactor": 1,
            "TopicConfigs": [{
                "Key": "retention.ms",
                "Value": "604800000"
            }]
        }],
        "RepartitionSinkTopics": ["value-1", "value-2"],
        "RepartitionSourceTopics": [{
            "Name": "orders",
            "Partitions": 0,
            "ReplicationFactor": 1,
            "TopicConfigs": [{
                "Key": "retention.ms",
                "Value": "604800000"
            }]
        }],
        "CopartitionGroups": [{
            "SourceTopics": [1, 2],
            "SourceTopicRegex": [1, 2],
            "RepartitionSourceTopics": [1, 2]
        }]
    }]
  },
  "ActiveTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "StandbyTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "WarmupTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "ProcessId": "abc-123",
  "UserEndpoint": {
    "Host": "broker-1.kafka.local",
    "Port": 9092
  },
  "ClientTags": [{
      "Key": "retention.ms",
      "Value": "604800000"
  }],
  "TaskOffsets": [{
      "SubtopologyId": "abc-123",
      "Partition": 0,
      "Offset": 284729
  }],
  "TaskEndOffsets": [{
      "SubtopologyId": "abc-123",
      "Partition": 0,
      "Offset": 284729
  }],
  "ShutdownApplication": true
}
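
As a rough illustration of the fixed-width rows (`int16 · 2B`, `int32 · 4B`, `int64 · 8B`, `bool · 1B`), the sketch below packs a few values from the example in big-endian (network) byte order using Python's `struct` module. The `FIXED_FORMATS` table and `pack_fixed` helper are hypothetical names introduced for this example only. `struct.pack` raises `struct.error` when a value does not fit the declared width, which doubles as a cheap range check.

```python
import struct

# Big-endian struct format for each fixed-width type used in the schema.
FIXED_FORMATS = {
    "bool": ">?",
    "int16": ">h",
    "uint16": ">H",
    "int32": ">i",
    "int64": ">q",
}


def pack_fixed(kind: str, value) -> bytes:
    """Pack one fixed-width field; raises struct.error if value is out of range."""
    return struct.pack(FIXED_FORMATS[kind], value)


# Values taken from the example request.
rebalance_timeout = pack_fixed("int32", 300000)   # RebalanceTimeoutMs
endpoint_port = pack_fixed("uint16", 9092)        # UserEndpoint.Port
task_offset = pack_fixed("int64", 284729)         # TaskOffsets[].Offset
shutdown = pack_fixed("bool", True)               # ShutdownApplication
```

The byte counts match the diagram's annotations: 4, 2, 8, and 1 bytes respectively.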