Spring Cloud Stream With Kafka

Introduction

In this article, we will look at how to use Spring Cloud Stream with Apache Kafka: we will create a producer and a consumer, send messages on demand with StreamBridge, and configure Kafka binder properties such as the broker address and custom serializers.

Creating a Producer and Consumer

To get started, add the Spring Cloud Stream Kafka binder dependency to the project:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Next, we define the producer and the consumer as beans using Java's functional interfaces. The Supplier emits messages and the Consumer receives them:

@Bean
public Supplier<Message> producer() {
    return () -> new Message(" jack from Streams");
}

@Bean
public Consumer<Message> consumer() {
    return message -> System.out.println("received " + message);
}
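Both functions exchange a simple Message payload. The DTO itself does not appear in the snippets above; a minimal sketch of what it could look like (the content field name is an assumption), with a no-args constructor and accessors so Jackson can serialize it later:

public class Message {

    private String content;

    // Jackson needs a no-args constructor for deserialization
    public Message() {
    }

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{content='" + content + "'}";
    }
}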
spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      bindings:
        producer-out-0:
          destination: first-topic
        consumer-in-0:
          destination: first-topic
The binding names follow a convention: <functionName>-in-<index> for consumers and <functionName>-out-<index> for producers, where the index is the position of the binding (0 when a function has a single input or output). With this configuration in place, the Supplier is polled at a fixed interval (one second by default), so messages start flowing to the first-topic topic as soon as the application is up.

Sending Messages on Demand

Instead of relying on a polled Supplier, we can send messages whenever we want by injecting StreamBridge. Here a scheduled method pushes a message to the producer-out-0 binding every two seconds:

@Component
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    // Send a message to the "producer-out-0" binding every 2 seconds
    @Scheduled(cron = "*/2 * * * * *")
    public void sendMessage() {
        streamBridge.send("producer-out-0", new Message(" jack from Stream bridge"));
    }
}
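The @Scheduled method only fires if scheduling is enabled in the application. A minimal sketch of the main class (the class name is an assumption):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // required for the @Scheduled method in KafkaProducer to run
public class SpringCloudStreamKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
    }
}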

Kafka Binder Properties

The binder section configures how the application connects to Kafka; here we point it at a broker running locally:

spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        producer-out-0:
          destination: first-topic
        consumer-in-0:
          destination: first-topic
To send the Message object as JSON, we can provide a custom serializer and deserializer based on Jackson's ObjectMapper:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message data) {
        try {
            // Convert the Message object to JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}
--------------------------------------------------------------------
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

public class MessageDeSerializer implements Deserializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Message deserialize(String topic, byte[] data) {
        try {
            // Read the JSON bytes back into a Message object
            return objectMapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}
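As a quick sanity check, the two classes can be exercised in a simple round trip (a sketch, assuming the Message DTO shown earlier):

public class SerializationCheck {

    public static void main(String[] args) {
        MessageSerializer serializer = new MessageSerializer();
        MessageDeSerializer deserializer = new MessageDeSerializer();

        // Serialize a sample payload to JSON bytes and read it back
        byte[] bytes = serializer.serialize("first-topic", new Message("jack from Streams"));
        Message roundTripped = deserializer.deserialize("first-topic", bytes);

        System.out.println("round-tripped: " + roundTripped);
    }
}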
Finally, we register the serializer and deserializer as Kafka client properties and enable native encoding and decoding on the bindings, so they are used instead of the default message converters:

spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      kafka:
        bindings:
          producer-out-0:
            producer:
              configuration:
                value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer
          consumer-in-0:
            consumer:
              configuration:
                value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer
        binder:
          brokers: localhost:9092
      bindings:
        producer-out-0:
          destination: first-topic
          producer:
            useNativeEncoding: true # Enables using the custom serializer
        consumer-in-0:
          destination: first-topic
          consumer:
            useNativeDecoding: true # Enables using the custom deserializer
After building the project, we can start the application with:

java -jar \
    target/spring-cloud-stream-kafka-communication-0.0.1-SNAPSHOT.jar

Conclusion

In this article, we created a producer and a consumer with Spring Cloud Stream and the Kafka binder, sent messages on demand with StreamBridge, and configured binder properties such as the broker address and custom serializers.
