Schema ID Required on Produce (Gateway)
Every record produced to a non-internal topic must carry a Schema Registry schema ID and that ID must resolve in the registry — enforced at the Gateway.
Rationale
A ResourcePolicy that requires a `schema-subject` label is policy theater: nothing forces the producer to actually prepend a schema ID to the payload, and the broker accepts plain JSON or arbitrary bytes regardless of label. Conduktor Gateway's `SchemaPayloadValidationPolicyPlugin` inspects every produce request, requires the Confluent-style magic byte + schema ID on the record value, and resolves that ID against the configured Schema Registry. Enforcement happens at the wire, cannot be bypassed by clients, and matches what auditors expect.
Pattern
Every produced record value starts with [magic byte 0x00 | 4-byte schema ID] and that ID resolves in Schema Registry
Examples
Producer uses KafkaAvroSerializer → bytes [0x00 | schemaId | avro-payload] → accepted
Internal topic __consumer_offsets → matches "topic" regex carve-out → accepted
Producer sends plain JSON to prod.orders.placed.v1 → no magic byte → INVALID_RECORD
Producer sends bytes with schema ID 999 (not in registry) → unresolved → INVALID_RECORD
Parameters
| Name | Default | Description |
|---|---|---|
topic |
"^(?!__|_).+$" |
Regex of topics the interceptor enforces. Default exempts internal topics prefixed by `_` or `__`. |
schemaIdRequired |
true |
true rejects records with no schema ID prepended. |
validateSchema |
false |
When true, also deserializes the record and validates fields against the schema. Heavier — start with false. |
action |
"BLOCK" |
BLOCK rejects; INFO only audits; THROTTLE delays the client. |
schemaRegistry |
"${SR_URL}" |
Schema Registry endpoint. Use ${env:...} or ${secret:...} templating. |
Implementation
Drop this YAML into Conduktor Console as a ResourcePolicy, then link it from an ApplicationInstance, Application, or KafkaCluster.
# Conduktor Gateway Interceptor — wire-layer enforcement. # Plugin: io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin # Docs: https://docs.conduktor.io/guide/conduktor-in-production/admin/gateway-policies # Scope: apply at vCluster, group, or username level for finer rollout. # Notes: # - Requires Gateway audit log to be configured (the policy is a no-op otherwise). # - Two related plugins exist: # SchemaPayloadValidationPolicyPlugin ← does registry lookup (this one) # TopicRequiredSchemaIdPolicyPlugin ← byte-level only, no registry call # We pick the validation plugin because we want the schema ID to actually # resolve in SR, not just be a syntactically valid 4-byte prefix. --- apiVersion: gateway/v2 kind: Interceptor metadata: name: schema-id-required-on-produce scope: vCluster: passthrough spec: pluginClass: io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin priority: 100 config: topic: "^(?!__|_).+$" schemaIdRequired: true validateSchema: false action: BLOCK schemaRegistryConfig: host: "${SR_URL}" type: CONFLUENT
Related policies
Try Conduktor Console
Enforce policies like this across your team — central audit history, pre-commit guardrails, ApplicationInstance bindings. 5-min Docker install.
Get Started →