// Key (de)serializers: record keys are handled as plain strings on both ends.
StringDeserializer stringDeserializer = new StringDeserializer();
StringSerializer stringSerializer = new StringSerializer();

// Value (de)serializers: one per domain type read from or written to a topic.
JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();
JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();

// Processor topology wired below (node names in caps, topic names in lowercase):
//
//   src-topic -> SOURCE -> PROCESS -+-> PROCESS2 -> SINK  -> patterns
//                                   +-> PROCESS3 -> SINK2 -> rewards
//                                   +-> SINK3 -> purchases
TopologyBuilder topologyBuilder = new TopologyBuilder();

// Source node: consumes "src-topic" with String keys and Purchase values.
topologyBuilder.addSource("SOURCE", stringDeserializer, purchaseJsonDeserializer, "src-topic");

// Processor nodes: PROCESS hangs off the source; PROCESS2 and PROCESS3 both
// take PROCESS as their parent, so its output fans out to each of them.
topologyBuilder.addProcessor("PROCESS", CreditCardAnonymizer::new, "SOURCE");
topologyBuilder.addProcessor("PROCESS2", PurchasePatterns::new, "PROCESS");
topologyBuilder.addProcessor("PROCESS3", CustomerRewards::new, "PROCESS");

// Sink nodes: one output topic per branch, plus a third sink fed directly by PROCESS.
topologyBuilder.addSink("SINK", "patterns", stringSerializer, purchasePatternJsonSerializer, "PROCESS2");
topologyBuilder.addSink("SINK2", "rewards", stringSerializer, rewardAccumulatorJsonSerializer, "PROCESS3");
topologyBuilder.addSink("SINK3", "purchases", stringSerializer, purchaseJsonSerializer, "PROCESS");

// Build the streams client from the finished topology and the externally
// supplied streamingConfig, then start processing.
KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
streaming.start();