
Kafka ShareFetch Response Wire Format v2 — Binary Protocol Layout

What is ShareFetch?

ShareFetch is the main data path for share group consumers (KIP-932): a single request fetches new records and acknowledges previously fetched ones in one round trip. Fetched records are locked for the consumer until an acquisition lock timeout expires; if the consumer does not acknowledge them within that window, they become available for redelivery.
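
The locking rule above can be sketched on the client side as a simple deadline check. This is a minimal illustration, not the Kafka client's implementation: the `AcquiredBatch` class and its field names are hypothetical, and it assumes the client starts its clock when the response arrives, which can only approximate the broker's own timer.

```python
from dataclasses import dataclass


@dataclass
class AcquiredBatch:
    """Hypothetical client-side record of one acquired offset range."""
    first_offset: int
    last_offset: int
    acquired_at: float    # seconds on a monotonic clock when the response arrived
    lock_timeout_ms: int  # AcquisitionLockTimeoutMs taken from the response

    def deadline(self) -> float:
        # Point in time after which the broker may hand the records to someone else.
        return self.acquired_at + self.lock_timeout_ms / 1000.0

    def lock_expired(self, now: float) -> bool:
        # Assumption: treat the lock as lost exactly at the deadline.
        return now >= self.deadline()
```

A consumer could call `lock_expired(time.monotonic())` before acknowledging and skip the acknowledgement once it returns `True`, since the records may already have been redelivered.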

Related Errors

FENCED_STATE_EPOCH · GROUP_AUTHORIZATION_FAILED · INVALID_REQUEST · INVALID_SHARE_SESSION_EPOCH · SHARE_SESSION_LIMIT_REACHED · SHARE_SESSION_NOT_FOUND · UNSUPPORTED_VERSION

Wire Diagram

Response Header · flexible
  message_size int32 · 4B
  correlation_id int32 · 4B
  tagged var
ShareFetchResponse v2
  ThrottleTimeMs int32 · 4B
  ErrorCode int16 · 2B
  ErrorMessage? string (compact)
  AcquisitionLockTimeoutMs int32 · 4B
  Responses array
    TopicId uuid · 16B
    Partitions array
      PartitionIndex int32 · 4B
      ErrorCode int16 · 2B
      ErrorMessage? string (compact)
      AcknowledgeErrorCode int16 · 2B
      AcknowledgeErrorMessage? string (compact)
      CurrentLeader struct
        LeaderId int32 · 4B
        LeaderEpoch int32 · 4B
        tagged var
      Records records (compact)
      AcquiredRecords array
        FirstOffset int64 · 8B
        LastOffset int64 · 8B
        DeliveryCount int16 · 2B
        tagged var
      tagged var
    tagged var
  NodeEndpoints array
    NodeId int32 · 4B
    Host string (compact)
    Port int32 · 4B
    Rack? string (compact)
    tagged var
  tagged var
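
To make the layout concrete, here is a rough Python sketch that decodes the response header and the first four body fields. The function names are illustrative, not part of any Kafka library. It assumes the usual flexible-version encodings: big-endian fixed-width integers, unsigned varints, compact strings stored as length + 1 (0 meaning null), and a tagged-field section made of a count followed by (tag, size, data) entries.

```python
import struct


def read_uvarint(buf, pos):
    """Decode an unsigned varint; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return result, pos
        shift += 7


def read_compact_string(buf, pos):
    """Compact string: uvarint of (length + 1), then UTF-8 bytes; 0 means null."""
    n, pos = read_uvarint(buf, pos)
    if n == 0:
        return None, pos
    end = pos + n - 1
    return buf[pos:end].decode("utf-8"), end


def skip_tagged_fields(buf, pos):
    """Skip a tagged-field section without interpreting its entries."""
    count, pos = read_uvarint(buf, pos)
    for _ in range(count):
        _tag, pos = read_uvarint(buf, pos)
        size, pos = read_uvarint(buf, pos)
        pos += size
    return pos


def decode_prefix(buf):
    """Decode the header plus ThrottleTimeMs .. AcquisitionLockTimeoutMs."""
    message_size, correlation_id = struct.unpack_from(">ii", buf, 0)
    pos = skip_tagged_fields(buf, 8)  # header tagged fields
    throttle_ms, error_code = struct.unpack_from(">ih", buf, pos)
    pos += 6
    error_message, pos = read_compact_string(buf, pos)
    (lock_timeout_ms,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    fields = {
        "message_size": message_size,
        "correlation_id": correlation_id,
        "ThrottleTimeMs": throttle_ms,
        "ErrorCode": error_code,
        "ErrorMessage": error_message,
        "AcquisitionLockTimeoutMs": lock_timeout_ms,
    }
    return fields, pos  # pos points at the Responses array
```

The returned position is where the Responses array begins; decoding the nested arrays would follow the same pattern, reading each struct's fields in diagram order and skipping its trailing tagged fields.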

Schema & Example

Schema
{
  "ThrottleTimeMs": int32,
  "ErrorCode": int16,
  "ErrorMessage": string?,
  "AcquisitionLockTimeoutMs": int32,
  "Responses": [{
      "TopicId": uuid,
      "Partitions": [{
          "PartitionIndex": int32,
          "ErrorCode": int16,
          "ErrorMessage": string?,
          "AcknowledgeErrorCode": int16,
          "AcknowledgeErrorMessage": string?,
          "CurrentLeader": {
            "LeaderId": int32,
            "LeaderEpoch": int32
          },
          "Records": records,
          "AcquiredRecords": [{
              "FirstOffset": int64,
              "LastOffset": int64,
              "DeliveryCount": int16
          }]
      }]
  }],
  "NodeEndpoints": [{
      "NodeId": int32,
      "Host": string,
      "Port": int32,
      "Rack": string?
  }]
}
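
For readers who prefer typed structures, the schema can be mirrored as Python dataclasses. This is only one possible mapping: the class names and snake_case field names are chosen here for illustration and do not come from a Kafka client library. Nullable fields (marked `?` in the schema) become `Optional`.

```python
from dataclasses import dataclass, field
from typing import List, Optional
from uuid import UUID


@dataclass
class AcquiredRecords:
    first_offset: int    # int64
    last_offset: int     # int64
    delivery_count: int  # int16


@dataclass
class CurrentLeader:
    leader_id: int     # int32
    leader_epoch: int  # int32


@dataclass
class PartitionData:
    partition_index: int
    error_code: int
    error_message: Optional[str]
    acknowledge_error_code: int
    acknowledge_error_message: Optional[str]
    current_leader: CurrentLeader
    records: bytes  # opaque record batch bytes
    acquired_records: List[AcquiredRecords] = field(default_factory=list)


@dataclass
class TopicResponse:
    topic_id: UUID
    partitions: List[PartitionData] = field(default_factory=list)


@dataclass
class NodeEndpoint:
    node_id: int
    host: str
    port: int
    rack: Optional[str]


@dataclass
class ShareFetchResponse:
    throttle_time_ms: int
    error_code: int
    error_message: Optional[str]
    acquisition_lock_timeout_ms: int
    responses: List[TopicResponse] = field(default_factory=list)
    node_endpoints: List[NodeEndpoint] = field(default_factory=list)
```
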
Example
{
  "ThrottleTimeMs": 0,
  "ErrorCode": 0,
  "ErrorMessage": null,
  "AcquisitionLockTimeoutMs": 5000,
  "Responses": [{
      "TopicId": "550e8400-e29b-41d4-a716-446655440000",
      "Partitions": [{
          "PartitionIndex": 3,
          "ErrorCode": 0,
          "ErrorMessage": null,
          "AcknowledgeErrorCode": 0,
          "AcknowledgeErrorMessage": null,
          "CurrentLeader": {
            "LeaderId": 1,
            "LeaderEpoch": 17
          },
          "Records": "<binary>",
          "AcquiredRecords": [{
              "FirstOffset": 150382,
              "LastOffset": 150382,
              "DeliveryCount": 10
          }]
      }]
  }],
  "NodeEndpoints": [{
      "NodeId": 1,
      "Host": "broker-1.kafka.local",
      "Port": 9092,
      "Rack": null
  }]
}
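
As a small illustration of how AcquiredRecords might be consumed, the sketch below expands each range into individual offsets paired with their delivery count. It assumes FirstOffset and LastOffset are both inclusive, which matches the example above where the two are equal and describe a single record. The helper name `acquired_offsets` is hypothetical, and it works on the JSON-style dictionary shown here rather than on wire bytes.

```python
def acquired_offsets(partition):
    """Yield (offset, delivery_count) for every acquired record in a partition."""
    for rng in partition["AcquiredRecords"]:
        # Assumption: both ends of the range are inclusive.
        for offset in range(rng["FirstOffset"], rng["LastOffset"] + 1):
            yield offset, rng["DeliveryCount"]


partition = {
    "AcquiredRecords": [
        {"FirstOffset": 150382, "LastOffset": 150382, "DeliveryCount": 10},
    ]
}
print(list(acquired_offsets(partition)))  # [(150382, 10)]
```

A consumer could use the delivery count to spot records that have been redelivered many times and route them to separate handling.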