
Kafka StreamsGroupHeartbeat Response Wire Format v0 — Binary Protocol Layout

What is StreamsGroupHeartbeat?

Members of a streams group send topology metadata, task assignments, and changelog offsets in their heartbeats so that the broker can decide task placement for the whole group (KIP-1071). The heartbeat response carries the resulting assignment back to the member. This moves rebalancing logic from the client-side assignor into the broker, which is intended to make scaling and standby task management smoother.

Related Errors

COORDINATOR_NOT_AVAILABLE · GROUP_AUTHORIZATION_FAILED · STREAMS_INVALID_TOPOLOGY · STREAMS_INVALID_TOPOLOGY_EPOCH · STREAMS_TOPOLOGY_FENCED · TOPIC_AUTHORIZATION_FAILED · UNSUPPORTED_VERSION

Wire Diagram

Response Header · flexible
  message_size                int32 · 4B
  correlation_id              int32 · 4B
  tagged fields               var

StreamsGroupHeartbeatResponse v0
  ThrottleTimeMs              int32 · 4B
  ErrorCode                   int16 · 2B
  ErrorMessage?               string (compact)
  MemberId                    string (compact)
  MemberEpoch                 int32 · 4B
  HeartbeatIntervalMs         int32 · 4B
  AcceptableRecoveryLag       int32 · 4B
  TaskOffsetIntervalMs        int32 · 4B
  Status?                     array
  ActiveTasks?                array
  StandbyTasks?               array
  WarmupTasks?                array
  EndpointInformationEpoch    int32 · 4B
  PartitionsByUserEndpoint?   array
  tagged fields               var

Status element
  StatusCode                  int8 · 1B
  StatusDetail                string (compact)
  tagged fields               var

ActiveTasks element
  SubtopologyId               string (compact)
  Partitions                  []int32
  tagged fields               var

StandbyTasks element
  SubtopologyId               string (compact)
  Partitions                  []int32
  tagged fields               var

WarmupTasks element
  SubtopologyId               string (compact)
  Partitions                  []int32
  tagged fields               var

PartitionsByUserEndpoint element
  UserEndpoint                struct
  ActivePartitions            array
  StandbyPartitions           array
  tagged fields               var

UserEndpoint
  Host                        string (compact)
  Port                        uint16 · 2B
  tagged fields               var

ActivePartitions element
  Topic                       string (compact)
  Partitions                  []int32
  tagged fields               var

StandbyPartitions element
  Topic                       string (compact)
  Partitions                  []int32
  tagged fields               var
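
The leading part of this layout can be read with a few small helpers. The following is a minimal Python sketch, not code from any Kafka client. It assumes the usual Kafka encodings for flexible versions: big-endian fixed-width integers, unsigned varints, compact strings stored as length + 1 (with 0 meaning null), and tagged fields as a count followed by tag, size, and data for each field. The function names (`read_uvarint`, `decode_leading_fields`, and so on) are illustrative.

```python
import struct


def read_uvarint(buf: bytes, pos: int):
    """Decode an unsigned varint: 7 payload bits per byte, low bits first."""
    value = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            return value, pos
        shift += 7


def read_compact_string(buf: bytes, pos: int):
    """Compact string: uvarint (length + 1), then UTF-8 bytes; 0 means null."""
    n, pos = read_uvarint(buf, pos)
    if n == 0:
        return None, pos
    end = pos + n - 1
    return buf[pos:end].decode("utf-8"), end


def skip_tagged_fields(buf: bytes, pos: int) -> int:
    """Skip a tagged-field section: count, then (tag, size, data) per field."""
    count, pos = read_uvarint(buf, pos)
    for _ in range(count):
        _tag, pos = read_uvarint(buf, pos)
        size, pos = read_uvarint(buf, pos)
        pos += size
    return pos


def decode_leading_fields(frame: bytes) -> dict:
    """Decode the header and the fields up to TaskOffsetIntervalMs."""
    message_size, correlation_id = struct.unpack_from(">ii", frame, 0)
    pos = skip_tagged_fields(frame, 8)  # header tagged fields
    throttle_ms, error_code = struct.unpack_from(">ih", frame, pos)
    pos += 6
    error_message, pos = read_compact_string(frame, pos)
    member_id, pos = read_compact_string(frame, pos)
    epoch, heartbeat_ms, lag, offset_ms = struct.unpack_from(">iiii", frame, pos)
    pos += 16
    return {
        "message_size": message_size,
        "correlation_id": correlation_id,
        "ThrottleTimeMs": throttle_ms,
        "ErrorCode": error_code,
        "ErrorMessage": error_message,
        "MemberId": member_id,
        "MemberEpoch": epoch,
        "HeartbeatIntervalMs": heartbeat_ms,
        "AcceptableRecoveryLag": lag,
        "TaskOffsetIntervalMs": offset_ms,
        "next_offset": pos,  # where the Status array would begin
    }
```

The sketch stops before the nullable arrays. `next_offset` is returned so that a caller can continue decoding from the `Status` field.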

Schema & Example

Schema
{
  "ThrottleTimeMs": int32,
  "ErrorCode": int16,
  "ErrorMessage": string?,
  "MemberId": string,
  "MemberEpoch": int32,
  "HeartbeatIntervalMs": int32,
  "AcceptableRecoveryLag": int32,
  "TaskOffsetIntervalMs": int32,
  "Status": [{
      "StatusCode": int8,
      "StatusDetail": string
  }]?,
  "ActiveTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "StandbyTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "WarmupTasks": [{
      "SubtopologyId": string,
      "Partitions": [int32]
  }]?,
  "EndpointInformationEpoch": int32,
  "PartitionsByUserEndpoint": [{
      "UserEndpoint": {
        "Host": string,
        "Port": uint16
      },
      "ActivePartitions": [{
          "Topic": string,
          "Partitions": [int32]
      }],
      "StandbyPartitions": [{
          "Topic": string,
          "Partitions": [int32]
      }]
  }]?
}
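
The `?` suffix in the schema marks nullable fields. For the task arrays, a decoder has to tell a null array apart from an empty one. The sketch below shows one way to do that in Python, assuming the compact array encoding (an unsigned varint holding count + 1, with 0 meaning null) and that each element ends with its own tagged-field section. The helper names are hypothetical, and the small varint and string readers are repeated so the block runs on its own.

```python
import struct


def read_uvarint(buf: bytes, pos: int):
    """Decode an unsigned varint: 7 payload bits per byte, low bits first."""
    value = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def read_compact_string(buf: bytes, pos: int):
    """Compact string: uvarint (length + 1), then UTF-8 bytes; 0 means null."""
    n, pos = read_uvarint(buf, pos)
    if n == 0:
        return None, pos
    end = pos + n - 1
    return buf[pos:end].decode("utf-8"), end


def read_int32(buf: bytes, pos: int):
    """Big-endian signed 32-bit integer."""
    return struct.unpack_from(">i", buf, pos)[0], pos + 4


def read_compact_array(buf: bytes, pos: int, read_item):
    """Compact array: uvarint (count + 1), then the items; 0 means null."""
    n, pos = read_uvarint(buf, pos)
    if n == 0:
        return None, pos  # null array, distinct from an empty list
    items = []
    for _ in range(n - 1):
        item, pos = read_item(buf, pos)
        items.append(item)
    return items, pos


def read_task_ids(buf: bytes, pos: int):
    """One ActiveTasks / StandbyTasks / WarmupTasks element."""
    subtopology_id, pos = read_compact_string(buf, pos)
    partitions, pos = read_compact_array(buf, pos, read_int32)
    # This sketch assumes the element carries no tagged fields (count 0).
    count, pos = read_uvarint(buf, pos)
    if count != 0:
        raise ValueError("tagged fields are not handled in this sketch")
    return {"SubtopologyId": subtopology_id, "Partitions": partitions}, pos
```

A call such as `read_compact_array(data, offset, read_task_ids)` returns either `None` for a null array or a list of dictionaries shaped like the schema entries, together with the offset of the next field.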
Example
{
  "ThrottleTimeMs": 0,
  "ErrorCode": 0,
  "ErrorMessage": null,
  "MemberId": "consumer-1-abc123",
  "MemberEpoch": 12,
  "HeartbeatIntervalMs": 3000,
  "AcceptableRecoveryLag": 0,
  "TaskOffsetIntervalMs": 150382,
  "Status": [{
      "StatusCode": 1,
      "StatusDetail": ""
  }],
  "ActiveTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "StandbyTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "WarmupTasks": [{
      "SubtopologyId": "abc-123",
      "Partitions": [1, 2, 3]
  }],
  "EndpointInformationEpoch": 12,
  "PartitionsByUserEndpoint": [{
      "UserEndpoint": {
        "Host": "broker-1.kafka.local",
        "Port": 9092
      },
      "ActivePartitions": [{
          "Topic": "orders",
          "Partitions": [1, 2, 3]
      }],
      "StandbyPartitions": [{
          "Topic": "orders",
          "Partitions": [1, 2, 3]
      }]
  }]
}
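
To relate the example values to bytes on the wire, the sketch below encodes the `UserEndpoint` and one `ActivePartitions` element from the example. It is an illustration under the same assumptions as the schema (compact strings and arrays, `Port` as an unsigned 16-bit big-endian integer, an empty tagged-field section written as a single zero byte). The function names are hypothetical.

```python
import struct


def write_uvarint(value: int) -> bytes:
    """Encode an unsigned varint: 7 payload bits per byte, low bits first."""
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)  # set the continuation bit
        value >>= 7
    out.append(value)
    return bytes(out)


def write_compact_string(text: str) -> bytes:
    """Compact string: uvarint (length + 1) followed by the UTF-8 bytes."""
    raw = text.encode("utf-8")
    return write_uvarint(len(raw) + 1) + raw


def write_compact_int32_array(values) -> bytes:
    """Compact array of int32: uvarint (count + 1), then big-endian values."""
    body = b"".join(struct.pack(">i", v) for v in values)
    return write_uvarint(len(values) + 1) + body


def encode_user_endpoint(host: str, port: int) -> bytes:
    """Host, Port (uint16), then an empty tagged-field section."""
    return write_compact_string(host) + struct.pack(">H", port) + b"\x00"


def encode_topic_partition(topic: str, partitions) -> bytes:
    """Topic, Partitions, then an empty tagged-field section."""
    return (
        write_compact_string(topic)
        + write_compact_int32_array(partitions)
        + b"\x00"
    )


# Values taken from the example above.
endpoint_bytes = encode_user_endpoint("broker-1.kafka.local", 9092)
orders_bytes = encode_topic_partition("orders", [1, 2, 3])
print(endpoint_bytes.hex(), orders_bytes.hex())
```

Because `Port` is packed with `">H"`, values outside 0 to 65535 raise `struct.error` instead of being silently truncated.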