Spring Cloud Stream With Kafka

Introduction

Spring Cloud Stream, is a mechanism by which you can decouple the implementation of your producers and consumers from the type of messaging infrastructure you want to use. This allows us to keep our consumers and producers to be broker agnostic and we can easily switch to a different kind of broker by changing the binder implementation.

Creating a Producer and Consumer

Let’s go to https://start.spring.io and create an application with the spring cloud streams dependency.

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
@Bean
public Supplier<Message> producer() {
return () -> new Message(" jack from Streams");
}
@Bean
public Consumer<Message> consumer() {
return message -> System.out.println("received " + message);
}
spring:
cloud:
function:
definition: consumer;producer
stream:
bindings:
producer-out-0:
destination : first-topic
consumer-in-0:
destination : first-topic
consumer : <functionName> + -in- + <index>
producer : <functionName> + -out- + <index>

Sending Messages on Demand

To send a message on-demand is pretty simple. We need to use StreamBridge , to send a message as follows.

@Component
public class KafkaProducer {

@Autowired
private StreamBridge streamBridge;

@Scheduled(cron = "*/2 * * * * *")
public void sendMessage(){
streamBridge.send("producer-out-0",new Message(" jack from Stream bridge"));
}
}

Kafka Binder Properties

Now to configure the Kafka binder to send to the right broker, We have set the broker properties as follows.

spring:
cloud:
function:
definition: consumer;producer
stream:
kafka:
binder:
brokers: localhost:9092

bindings:
producer-out-0:
destination : first-topic
consumer-in-0:
destination : first-topic
public class MessageSerializer implements Serializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Message data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}

}
}
--------------------------------------------------------------------
public class MessageDeSerializer implements Deserializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public Message deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data), Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}
spring:
cloud:
function:
definition: consumer;producer
stream:
kafka:
bindings:
producer-out-0:
producer:
configuration:
value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer
consumer-in-0:
consumer:
configuration:
value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer
binder:
brokers: localhost:9092

bindings:
producer-out-0:
destination : first-topic
producer:
useNativeEncoding: true # Enables using the custom serializer
consumer-in-0:
destination : first-topic
consumer:
use-native-decoding: true # Enables using the custom deserializer
java -jar \
target/spring-cloud-stream-kafka-communication-0.0.1-SNAPSHOT.jar

Conclusion

In this article, we saw how we can use Spring Cloud Streams to send and receive messages from a Kafka topic. We saw how we define the binders and then used the Kafka binder dependency to send messages to the Kafka broker.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Amrut Prabhu

Amrut Prabhu

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles