Spring Boot and Kafka

This is a simple tutorial to show how easy it is to get Spring Boot connected to Kafka. The source code can be found here.

Let’s start by getting Kafka running in a Docker container. I’ll assume you have Docker installed. Create a file named docker-compose.yml with this content.

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

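In the terminal, run these commands to start ZooKeeper and Kafka. The second lists the running containers so you can confirm both are up, and the last starts an interactive producer. Once it is running, you can type a message such as Hello World to add it to the test topic. Note: you will need a local Kafka installation to have access to kafka-console-producer. I suggest using Homebrew: brew install kafka. Newer Kafka releases deprecate --broker-list in favor of --bootstrap-server, so use --bootstrap-server localhost:9092 if your version rejects the older flag.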

docker-compose up -d
docker ps
kafka-console-producer --broker-list localhost:9092 --topic test

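To view the messages, open a new terminal and run this command. By default the console consumer shows only messages produced after it starts; add --from-beginning to replay earlier ones.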

kafka-console-consumer --bootstrap-server localhost:9092 --topic test
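
Before moving on to the application, it can help to confirm that something is actually listening on the port published by docker-compose. The sketch below is my own addition, not part of the original project: the class name BrokerPortCheck and the one second timeout are arbitrary choices. It only checks that a TCP connection to localhost:9092 succeeds, which does not prove the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9092 is the port published in docker-compose.yml above.
        System.out.println("Kafka port open: " + isReachable("localhost", 9092, 1000));
    }
}
```

If this prints false, docker ps is the first place to look: the kafka container may have exited or the port mapping may be missing.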

Spring Boot

Go to Spring Initializr and create a new Spring Boot application with the dependencies Web, Kafka, DevTools, and Lombok. Next, add a controller we can use to publish messages. Create KafkaController.java.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.codegreenllc.kafka.services.Producer;

@RestController
public class KafkaController {
    @Autowired
    private Producer producer;

    @PostMapping(value = "/kafka/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") final String message) {
        producer.sendMessage(message);
    }
}

Now create the Producer and Consumer services. Both live in the com.codegreenllc.kafka.services package that the controller imports from.

package com.codegreenllc.kafka.services;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class Producer {
    // Topic shared with the console tools and the Consumer below.
    private static final String TOPIC = "test";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(final String message) {
        log.info("$$ -> Producing message --> {}", message);
        // Sends without a key; the broker picks the partition.
        kafkaTemplate.send(TOPIC, message);
    }
}

package com.codegreenllc.kafka.services;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class Consumer {
    @KafkaListener(topics = "test", groupId = "group_id")
    public void consume(final String message) {
        log.info("$$ -> Consumed Message -> {}", message);
    }
}

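Finally, we need to set up application.properties like this. Note that a groupId set on @KafkaListener overrides spring.kafka.consumer.group-id for that listener, so the Consumer above joins the group named in its annotation.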

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
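
The entries above use a colon between key and value rather than the more common equals sign. The sketch below shows that the standard java.util.Properties format treats the two the same, and that the second colon in localhost:9092 stays part of the value. This is my own illustration: Spring Boot reads application.properties with its own loader, which I would expect to follow the same convention, and the class name PropertiesSeparatorDemo is made up for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo class: parses properties text with the JDK loader.
class PropertiesSeparatorDemo {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "spring.kafka.consumer.bootstrap-servers: localhost:9092\n"
              + "spring.kafka.producer.bootstrap-servers=localhost:9092\n");
        // Both lines yield the same value; only the first ':' or '=' ends the key.
        System.out.println(props.getProperty("spring.kafka.consumer.bootstrap-servers"));
        System.out.println(props.getProperty("spring.kafka.producer.bootstrap-servers"));
    }
}
```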

Go ahead and start the app so we can use the publish endpoint to send a message. The simplest way to do this is with curl (Postman is another great option).

curl -X POST \
  'http://localhost:8080/kafka/publish?message=Hello%20World' \
  -H 'cache-control: no-cache'
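
If you would rather send the request from Java, the same call can be sketched with the JDK's built-in HttpClient (Java 11 or later). This is a minimal sketch rather than part of the original project: the class name PublishClient is hypothetical, and it assumes the app is listening on localhost:8080 as in the curl command. The message is URL-encoded so that spaces become %20, matching the request above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /kafka/publish endpoint defined in KafkaController.
class PublishClient {

    /** Builds the publish URL with the message encoded as a query parameter. */
    static URI buildUri(String message) {
        // URLEncoder emits '+' for spaces; switch to %20 to match the curl example.
        // A literal '+' in the message is already encoded as %2B at this point.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create("http://localhost:8080/kafka/publish?message=" + encoded);
    }

    /** Sends an empty-bodied POST and returns the HTTP status code. */
    static int publish(String message) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(buildUri(message))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("HTTP status: " + publish("Hello World"));
    }
}
```

Running main requires the Spring Boot app to be up; buildUri can be exercised on its own.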

The application logs will confirm that the message was published and consumed.


Copyright © 2020 Code Green LLC