Kafka Exception TopologyException
org.apache.kafka.streams.errors.TopologyException
Non-retriable
Streams
Indicates a pre-runtime error that occurred while parsing the logical topology (`Topology`) to construct the physical processor topology (`ProcessorTopology`).
Common Causes
- Building a topology with no source: calling `StreamsBuilder.build()` / starting `KafkaStreams` when no `.stream()`/`.table()` subscribes to any input topic or GlobalKTable, giving 'Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.'
- State store wiring/ordering errors: connecting a processor to a store before `addStateStore(...)` registered it ('StateStore X is not added yet'), reusing a node name, or adding a source for a topic already consumed, throwing at build time from InternalTopologyBuilder validation.
- Auto-generated repartition/internal node name collisions: e.g. KAFKA-10659 where cogroup over multiple repartitioned KGroupedStreams produces a duplicate internal processor name ('Processor ...-repartition-filter is already added'), or join repartition topics colliding when names aren't set explicitly.
- Invalid topic/store/processor names: empty names, duplicate processor/source/sink names, or a sink writing to a topic that is also a changelog — all rejected during topology construction (this is a pre-runtime, non-retriable error).
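One of the causes above, adding a second source for a topic that is already consumed, can be reproduced without a broker. This is a minimal sketch using the Processor API; the class name, node names, and the `orders` topic are hypothetical, and the exact message text may differ between Kafka versions.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;

public class DuplicateSourceDemo {

    /** Returns the exception message, or null if the topology was accepted. */
    static String registerSameTopicTwice() {
        Topology topology = new Topology();
        // First source node for the (hypothetical) "orders" topic: accepted.
        topology.addSource("orders-source", "orders");
        try {
            // Second source node for the same topic: rejected at construction time.
            topology.addSource("orders-source-2", "orders");
            return null;
        } catch (TopologyException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerSameTopicTwice());
    }
}
```

The exception surfaces while the topology is being assembled, before any stream thread starts, which is why retrying cannot help.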
Solutions
- Ensure the topology actually subscribes to input: add at least one `builder.stream("topic")`, `builder.table(...)`, or `builder.globalTable(...)`. In Spring, `@KafkaListener` does NOT create a Kafka Streams topology; define a `Function`/`Consumer` binding or a `KStream` bean instead.
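- Register state stores and the processors that use them in a valid order: with the Processor API, call `topology.addStateStore(storeBuilder, "processorName")` only after `addProcessor(...)` has created that processor; with the DSL, call `builder.addStateStore(storeBuilder)` and pass the store names to `process(...)`/`transform(...)`. Keep all node names unique.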
- Name repartition/join steps explicitly to dodge auto-name collisions: use `Grouped.as("name")` on `groupByKey()`/`groupBy()`, `Repartitioned.as(...)`, and `StreamJoined.withName(...)`/`Joined.as(...)` (the documented workaround for KAFKA-10659 and join repartition-name reuse).
- Catch TopologyException at startup (it is thrown synchronously from `build()`/`new KafkaStreams(...)`), log `topology.describe()`, and fix the DSL before deploying — it will never resolve by retrying.
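The naming and startup-check advice above could be combined roughly as follows. This is a sketch under assumptions: the class name, the `orders` topic, and the `orders-by-value` / `orders-count` names are hypothetical, and it only calls `build()`, so it does not cover errors thrown later from `new KafkaStreams(...)`.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;

public class NamedTopologyDemo {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // At least one source, so the topology subscribes to an input topic.
        KStream<String, String> orders = builder.stream("orders");
        orders
            // Changing the key forces a repartition before the aggregation.
            .selectKey((key, value) -> value)
            // Explicit name for the grouping step and its repartition topic.
            .groupByKey(Grouped.as("orders-by-value"))
            // Explicit name for the state store backing the count.
            .count(Materialized.as("orders-count"));
        return builder.build();
    }

    public static void main(String[] args) {
        try {
            Topology topology = buildTopology();
            // Print the graph to check sources, processors, and stores.
            System.out.println(topology.describe());
        } catch (TopologyException e) {
            // Not retriable: report it and fail fast so the DSL gets fixed.
            System.err.println("Invalid topology: " + e.getMessage());
            throw e;
        }
    }
}
```

Explicit names also keep internal topic and store names stable when the topology is later modified, so they do not shift with auto-generated numbering.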
Example Stack Trace
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildTopology(InternalTopologyBuilder.java:1158)
at org.apache.kafka.streams.processor.internals.TopologyMetadata.buildAndRewriteTopology(TopologyMetadata.java:289)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:1003)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
at com.example.MyStreamsApp.main(MyStreamsApp.java:42)
Diagnostic Commands
System.out.println(builder.build().describe()); // print the topology graph to verify sources/processors/stores and spot duplicate or missing nodes
Related
Related Streams exceptions: BrokerNotFoundException · InternalTopicsAlreadySetupException · InvalidStateStoreException · InvalidStateStorePartitionException · LockException · MisconfiguredInternalTopicException · MissingInternalTopicsException · MissingSourceTopicException