Spring Cloud Stream With Kafka

Introduction

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
@Bean
public Supplier<Message> producer() {
return () -> new Message(" jack from Streams");
}
@Bean
public Consumer<Message> consumer() {
return message -> System.out.println("received " + message);
}
spring:
cloud:
function:
definition: consumer;producer
stream:
bindings:
producer-out-0:
destination : first-topic
consumer-in-0:
destination : first-topic
consumer : <functionName> + -in- + <index>
producer : <functionName> + -out- + <index>
@Component
public class KafkaProducer {

@Autowired
private StreamBridge streamBridge;

@Scheduled(cron = "*/2 * * * * *")
public void sendMessage(){
streamBridge.send("producer-out-0",new Message(" jack from Stream bridge"));
}
}
spring:
cloud:
function:
definition: consumer;producer
stream:
kafka:
binder:
brokers: localhost:9092

bindings:
producer-out-0:
destination : first-topic
consumer-in-0:
destination : first-topic
public class MessageSerializer implements Serializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Message data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}

}
}
--------------------------------------------------------------------
public class MessageDeSerializer implements Deserializer<Message> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public Message deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data), Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}
spring:
cloud:
function:
definition: consumer;producer
stream:
kafka:
bindings:
producer-out-0:
producer:
configuration:
value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer
consumer-in-0:
consumer:
configuration:
value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer
binder:
brokers: localhost:9092

bindings:
producer-out-0:
destination : first-topic
producer:
useNativeEncoding: true # Enables using the custom serializer
consumer-in-0:
destination : first-topic
consumer:
use-native-decoding: true # Enables using the custom deserializer

The properties may look garbled here, but you can refer to them in a cleaner way on my site https://refactorfirst.com

java -jar \
target/spring-cloud-stream-kafka-communication-0.0.1-SNAPSHOT.jar

Conclusion

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store