Spring Boot With Kafka Communication

Introduction

Creating a Producer

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@Component
public class KafkaProducer {

@Value("${topic.name}")
private String topicName;

private KafkaTemplate kafkaTemplate;

public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@Scheduled(cron = "*/2 * * * * *")
public void sendMessage() {
UUID key = UUID.randomUUID();
Message payload = new Message("jack");
System.out.println("Sending Data " + payload);

ProducerRecord<String, Message> record = new ProducerRecord<String, Message>(topicName,
key.toString(),
payload);

kafkaTemplate.send(record);
}
}
spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
client-id: my-client-consumer
group-id: spring-application-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageDeSerializer
producer:
client-id: my-client-application
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageSerializer

topic:
name: "first-topic"
public class MessageSerializer implements Serializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Message data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}

}
}
--------------------------------------------------------------------
public class MessageDeSerializer implements Deserializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public Message deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}

Creating a Consumer

@Component
public class KafkaConsumer {

@KafkaListener(id = "my-client-application", topics = "${topic.name}")
public void consumer(ConsumerRecord<String, Message> consumerRecord) {
System.out.println("Consumed Record Details: " + consumerRecord);
Message message = consumerRecord.value();
System.out.println("Consumed Message" + message);
}
}
java -jar \ 
target/spring-kafka-communication-service-0.0.1-SNAPSHOT.jar

Conclusion

--

--

--

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Why Ruby on Rails is Perfect for eCommerce Web Development in 2020

Why Ruby on Rails is Widely used for Web Application Development

My Journey to DevOps Mastery

Static vs Shared Libraries

“How to make Data Science in a Box possible”-KubeCon+CloudNativeCon

Software Development Best Practices — Test Driven Development

Project: Kubernetes Reverse Proxy Deployment

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Amrut Prabhu

Amrut Prabhu

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

More from Medium

Swagger 2 with the Spring Boot

Apache Kafka with Spring Boot Application

Spring Boot Admin: How to use it

Microservices [Part 2] — With Maven, Spring Boot, And Docker