Spring Cloud Stream With Kafka

Introduction

Creating a Producer and Consumer

<!-- Spring Cloud Stream Kafka binder: pulls in spring-cloud-stream plus the Kafka client integration -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
/**
 * Message source bound to {@code producer-out-0}. The binder polls this
 * supplier periodically; each invocation yields one payload to publish.
 */
@Bean
public Supplier<Message> producer() {
    Supplier<Message> messageSource = () -> new Message(" jack from Streams");
    return messageSource;
}
/**
 * Message sink bound to {@code consumer-in-0}. The binder invokes this
 * consumer for every record arriving on the bound topic.
 */
@Bean
public Consumer<Message> consumer() {
    return incoming -> {
        System.out.println("received " + incoming);
    };
}
spring:
  cloud:
    function:
      # Register both functional beans; ';' separates multiple definitions.
      definition: consumer;producer
    stream:
      bindings:
        # Binding names follow <functionName>-<in|out>-<index>.
        producer-out-0:
          destination: first-topic
        consumer-in-0:
          destination: first-topic
Binding names follow a fixed convention: consumer bindings are named `<functionName>-in-<index>` and producer bindings are named `<functionName>-out-<index>` — hence `consumer-in-0` and `producer-out-0` above.

Sending Messages on Demand

/**
 * Sends a message on demand (every two seconds) via {@link StreamBridge},
 * as an alternative to the binder-polled {@code Supplier} bean.
 */
@Component
public class KafkaProducer {

    // Constructor injection instead of a field @Autowired: the dependency
    // becomes final and the class is constructible without a Spring context.
    private final StreamBridge streamBridge;

    public KafkaProducer(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /** Triggered every 2 seconds (6-field cron with second-level granularity). */
    @Scheduled(cron = "*/2 * * * * *")
    public void sendMessage() {
        // "producer-out-0" must match the output binding name in application.yml.
        streamBridge.send("producer-out-0", new Message(" jack from Stream bridge"));
    }
}

Kafka Binder Properties

spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      kafka:
        binder:
          # Comma-separated list of Kafka broker addresses.
          brokers: localhost:9092

      bindings:
        producer-out-0:
          destination: first-topic
        consumer-in-0:
          destination: first-topic
/**
 * Kafka {@link Serializer} that writes {@code Message} payloads as JSON bytes
 * using Jackson.
 */
public class MessageSerializer implements Serializer<Message> {

    // ObjectMapper is thread-safe after configuration; one instance suffices.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * @param topic topic the record is being sent to (unused)
     * @param data  payload to serialize; may be null for tombstone records
     * @return JSON bytes, or null when {@code data} is null, per the Kafka
     *         Serializer contract
     * @throws SerializationException if Jackson cannot serialize the payload
     */
    @Override
    public byte[] serialize(String topic, Message data) {
        if (data == null) {
            // Kafka contract: a null payload (tombstone) must serialize to null,
            // not to the JSON literal "null".
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}
--------------------------------------------------------------------
/**
 * Kafka {@link Deserializer} that reads JSON record values into
 * {@code Message} instances using Jackson.
 */
public class MessageDeSerializer implements Deserializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * @param topic topic the record was read from (unused)
     * @param data  raw record value; may be null for tombstone records
     * @return the deserialized message, or null when {@code data} is null
     * @throws SerializationException if the bytes are not valid JSON for Message
     */
    @Override
    public Message deserialize(String topic, byte[] data) {
        if (data == null) {
            // Kafka contract: null record value (tombstone) deserializes to null.
            return null;
        }
        try {
            // Hand the bytes to Jackson directly. The original round-tripped
            // through new String(data), which decodes with the platform default
            // charset and corrupts non-ASCII JSON on non-UTF-8 systems.
            return objectMapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}
spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      kafka:
        bindings:
          producer-out-0:
            producer:
              configuration:
                # Kafka client property: custom value serializer class.
                value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer
          consumer-in-0:
            consumer:
              configuration:
                # Kafka client property: custom value deserializer class.
                value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer
        binder:
          brokers: localhost:9092

      bindings:
        producer-out-0:
          destination: first-topic
          producer:
            useNativeEncoding: true # Enables using the custom serializer
        consumer-in-0:
          destination: first-topic
          consumer:
            use-native-decoding: true # Enables using the custom deserializer
# Run the packaged Spring Boot application from the project root.
# Requires the Kafka broker configured above (localhost:9092) to be reachable.
java -jar \
target/spring-cloud-stream-kafka-communication-0.0.1-SNAPSHOT.jar

Conclusion

--

--

--

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Azure Synapse Spark Pool: PySpark Upsert Function for Azure SQL

RAILGUN Weekly Update, April 25, 2022

Flutter — the Good, the Bad and the Ugly (Part 1)

The Pain of Infrequent Deployments, Release Trains and Lengthy Sprints — Codefresh

2.5D Platformer — Moving Platforms #5

The 10 Best Developer and Tech News Resources

Building a 10x Cloud Team

Terraform at LumApps: Part 1

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Amrut Prabhu

Amrut Prabhu

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

More from Medium

Audit Log Service-Evaluating Different Approaches

REST API Load performance testing with Apache JMeter

Spring Boot Microservices — Part7 — Event Driven Using RabbitMQ

[Spring Boot] Testing Apache Camel SEDA Endpoint