Spring Boot With Kafka Communication

Introduction

Creating a Producer

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@Component
public class KafkaProducer {

@Value("${topic.name}")
private String topicName;

private KafkaTemplate kafkaTemplate;

public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@Scheduled(cron = "*/2 * * * * *")
public void sendMessage() {
UUID key = UUID.randomUUID();
Message payload = new Message("jack");
System.out.println("Sending Data " + payload);

ProducerRecord<String, Message> record = new ProducerRecord<String, Message>(topicName,
key.toString(),
payload);

kafkaTemplate.send(record);
}
}
spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
client-id: my-client-consumer
group-id: spring-application-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageDeSerializer
producer:
client-id: my-client-application
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageSerializer

topic:
name: "first-topic"
public class MessageSerializer implements Serializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Message data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}

}
}
--------------------------------------------------------------------
public class MessageDeSerializer implements Deserializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public Message deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}

Creating a Consumer

@Component
public class KafkaConsumer {

@KafkaListener(id = "my-client-application", topics = "${topic.name}")
public void consumer(ConsumerRecord<String, Message> consumerRecord) {
System.out.println("Consumed Record Details: " + consumerRecord);
Message message = consumerRecord.value();
System.out.println("Consumed Message" + message);
}
}
java -jar \ 
target/spring-kafka-communication-service-0.0.1-SNAPSHOT.jar

Conclusion

--

--

--

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

How does “learn submit” accomplish a pull request through a terminal window?

CODEWARS : TOP 25 SQL INTERVIEW QUESTIONS TO ACE THE DATA ANALYST INTERVIEW

Er diagram explained

Automating Decision Making With Low-Code Platforms

Automate a flow without any automation setup!!

SQL for Data Science is way more than SELECT *

A beautiful view of the sky in the night with brigth stars

The top four things to get right when implementing Snowflake

WordPress Gutenberg Editor: Nightmare or Dream Come True?

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Amrut Prabhu

Amrut Prabhu

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles

More from Medium

Spring Boot Scheduler

Apache Kafka with Spring Boot Application

Securing your Spring Boot application with JWT

REST API Load performance testing with Apache JMeter