Spring Boot With Kafka Communication

Introduction

Over the years, Kafka has gained a lot of popularity for its high throughput and real-time asynchronous messaging. It is considered a de facto standard for event streaming and provides fault-tolerant storage that is stable, reliable, and scalable.

Creating a Producer

Let’s go to https://start.spring.io and create an application, adding the spring-kafka dependency as shown below.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
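
Next, let's create a producer that sends a message to the topic every two seconds. Note that the @Scheduled method only runs when scheduling is enabled, for example with @EnableScheduling on the application class.

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    // Topic name, read from the topic.name property
    @Value("${topic.name}")
    private String topicName;

    private final KafkaTemplate<String, Message> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Six-field Spring cron expression: fires every 2 seconds
    @Scheduled(cron = "*/2 * * * * *")
    public void sendMessage() {
        UUID key = UUID.randomUUID();
        Message payload = new Message("jack");
        System.out.println("Sending Data " + payload);

        // Record with a random String key and the Message as its value
        ProducerRecord<String, Message> record =
                new ProducerRecord<>(topicName, key.toString(), payload);

        kafkaTemplate.send(record);
    }
}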
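
The cron expression on sendMessage() has six fields, and the first one is the seconds field. The value */2 there means "every second that is a multiple of 2". As a rough illustration only, the sketch below lists which seconds of a minute such a step field matches. It is not how Spring parses cron expressions, and the names CronStepDemo, matchesStep, and matchingSeconds are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; Spring does its own cron parsing.
class CronStepDemo {

    // A "*/step" field starts at 0, so a value matches when it is a multiple of step.
    static boolean matchesStep(int value, int step) {
        return value % step == 0;
    }

    // Collects every second of a minute (0..59) that a "*/step" seconds field matches.
    static List<Integer> matchingSeconds(int step) {
        List<Integer> seconds = new ArrayList<>();
        for (int second = 0; second < 60; second++) {
            if (matchesStep(second, step)) {
                seconds.add(second);
            }
        }
        return seconds;
    }
}
```

Calling CronStepDemo.matchingSeconds(2) yields the 30 even seconds from 0 to 58, so the producer sends about 30 messages per minute.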

Now, let's add the Kafka properties for the producer and the consumer, along with the topic name, to the application's YAML properties file (for example, application.yml).

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      client-id: my-client-consumer
      group-id: spring-application-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageDeSerializer
    producer:
      client-id: my-client-application
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.amrut.prabhu.kafkacommunicationservice.dto.converters.MessageSerializer

topic:
  name: "first-topic"
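
Since the record value is a custom Message object, the properties above point to our own serializer and deserializer. Both convert the Message to and from JSON using Jackson's ObjectMapper.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message data) {
        if (data == null) {
            return null; // keep null values as null instead of the JSON text "null"
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}
--------------------------------------------------------------------
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MessageDeSerializer implements Deserializer<Message> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Message deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // records without a value have nothing to parse
        }
        try {
            return objectMapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}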
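
Both converters, as well as the producer, rely on a Message class that is not listed in this article. A minimal sketch of what it could look like is given below, assuming a single name field (suggested by new Message("jack")). The field name, the accessors, and the toString format are assumptions, not the original class.

```java
// Hypothetical payload class; the article does not show the real Message.
// Declared without "public" so the sketch fits in a single file. In a project
// it would normally be a public class in its own Message.java.
class Message {

    private String name; // assumed field, matching new Message("jack")

    // No-argument constructor, used by Jackson when deserializing
    public Message() {
    }

    public Message(String name) {
        this.name = name;
    }

    // Getter and setter let Jackson read and write the "name" property
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Readable output for the System.out.println calls in producer and consumer
    @Override
    public String toString() {
        return "Message{name='" + name + "'}";
    }
}
```

With Jackson's default settings, a class of this shape should be written as JSON along the lines of {"name":"jack"} and read back through the no-argument constructor and the setter.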

Creating a Consumer

Along with the producer properties, we have also set some consumer properties. Now, let's create a consumer for the topic.

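import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Unless groupId is set or idIsGroup is false, the listener id is also used
    // as the consumer group id, overriding spring.kafka.consumer.group-id
    @KafkaListener(id = "my-client-application", topics = "${topic.name}")
    public void consumer(ConsumerRecord<String, Message> consumerRecord) {
        System.out.println("Consumed Record Details: " + consumerRecord);
        Message message = consumerRecord.value();
        System.out.println("Consumed Message: " + message);
    }
}

Finally, once the application is built and a Kafka broker is running at localhost:9092, we can start the application with the following command.

java -jar \
    target/spring-kafka-communication-service-0.0.1-SNAPSHOT.jar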

Conclusion

In this article, we saw how to send and read messages on a Kafka topic using Spring Kafka.

Amrut Prabhu

Software Craftsman, Tech Enthusiast. I run https://refactorfirst.com to post all my articles