conduktor.io ↗

Schema ID Required on Produce (Gateway)

Every record produced to a non-internal topic must carry a Schema Registry schema ID and that ID must resolve in the registry — enforced at the Gateway.

Rationale

A ResourcePolicy that requires a `schema-subject` label is policy theater: nothing forces the producer to actually prepend a schema ID to the payload, and the broker accepts plain JSON or arbitrary bytes regardless of label. Conduktor Gateway's `SchemaPayloadValidationPolicyPlugin` inspects every produce request, requires the Confluent-style magic byte + schema ID on the record value, and resolves that ID against the configured Schema Registry. Enforcement happens at the wire, cannot be bypassed by clients, and matches what auditors expect.

Pattern

Every produced record value starts with [magic byte 0x00 | 4-byte schema ID] and that ID resolves in Schema Registry

Examples

Producer uses KafkaAvroSerializer → bytes [0x00 | schemaId | avro-payload] → accepted
Internal topic __consumer_offsets → matches "topic" regex carve-out → accepted
Producer sends plain JSON to prod.orders.placed.v1 → no magic byte → INVALID_RECORD
Producer sends bytes with schema ID 999 (not in registry) → unresolved → INVALID_RECORD

Parameters

NameDefaultDescription
topic "^(?!__|_).+$" Regex of topics the interceptor enforces. Default exempts internal topics prefixed by `_` or `__`.
schemaIdRequired true true rejects records with no schema ID prepended.
validateSchema false When true, also deserializes the record and validates fields against the schema. Heavier — start with false.
action "BLOCK" BLOCK rejects; INFO only audits; THROTTLE delays the client.
schemaRegistry "${SR_URL}" Schema Registry endpoint. Use ${env:...} or ${secret:...} templating.

Implementation

Drop this YAML into Conduktor Console as a ResourcePolicy, then link it from an ApplicationInstance, Application, or KafkaCluster.

Conduktor ResourcePolicy
# Conduktor Gateway Interceptor — wire-layer enforcement.
# Plugin: io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin
# Docs:   https://docs.conduktor.io/guide/conduktor-in-production/admin/gateway-policies
# Scope:  apply at vCluster, group, or username level for finer rollout.
# Notes:
#   - Requires Gateway audit log to be configured (the policy is a no-op otherwise).
#   - Two related plugins exist:
#       SchemaPayloadValidationPolicyPlugin  ← does registry lookup (this one)
#       TopicRequiredSchemaIdPolicyPlugin    ← byte-level only, no registry call
#     We pick the validation plugin because we want the schema ID to actually
#     resolve in SR, not just be a syntactically valid 4-byte prefix.
---
apiVersion: gateway/v2
kind: Interceptor
metadata:
  name: schema-id-required-on-produce
  scope:
    vCluster: passthrough
spec:
  pluginClass: io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin
  priority: 100
  config:
    topic: "^(?!__|_).+$"
    schemaIdRequired: true
    validateSchema: false
    action: BLOCK
    schemaRegistryConfig:
      host: "${SR_URL}"
      type: CONFLUENT

Related policies

Try Conduktor Console

Enforce policies like this across your team — central audit history, pre-commit guardrails, ApplicationInstance bindings. 5-min Docker install.

Get Started →