Spring Boot Microservices Tutorial - Part 8
In Part 8 of this Spring Boot Microservices Tutorial series, we will integrate Kafka into our project and learn how to build Event-Driven Microservices with Spring Boot and Kafka.
What are Event Driven Microservices?
Event-driven microservices architecture is a way of building applications, where the systems communicate by publishing and consuming events, these events are available whenever other consumers need to read them at any time.
Apache Kafka is a distributed messaging and streaming platform used frequently in the industry to implement Event-Driven Architecture.
Installing Apache Kafka through Docker
We will use Docker to install Apache Kafka together with Zookeeper. We will also use a Kafka UI to see the topics and messages in our Kafka Cluster using the Kafka UI project. Here is how the Docker compose file looks like in the order-service docker-compose.yaml file:
version: '4'
services:
mysql:
image: mysql:8.3.0
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: mysql
ports:
- "3306:3306"
volumes:
- ./mysql:/var/lib/mysql
- ./docker/mysql/init.sql:/docker-entrypoint-initdb.d/init.sql
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.5.0
container_name: broker
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8085:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- "8086:8080"
depends_on:
- broker
- schema-registry
environment:
KAFKA_CLUSTERS_NAME: local
KAFKA_CLUSTERS_BOOTSTRAPSERVERS: broker:29092
KAFKA_CLUSTERS_SCHEMAREGISTRY: http://schema-registry:8081
DYNAMIC_CONFIG_ENABLED: 'true'
The main services we use are
-
cp-zookeeper which is a Zookeeper cluster that is used to orchestrate multiple Kafka clusters.
-
cp-kafka which is the Kafka server itself
-
cp-schema-registry is the service we used to define the schema of the messages that are sent between producers and consumers
-
Lastly, we have kafka-ui which provides a nice UI to allow us to view the Kafka topics that are created and also helps us to view the messages from and send messages to the Kafka topic.
After updating the docker-compose file, just run docker compose up -d to start all the services.
Spring Kafka
We will be using the Spring Kafka project to implement Kafka functionality in our Spring Boot projects, for that add the below dependencies in the order-service project.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
The above dependencies not only add the spring-kafka functionality but also bring in dependencies to work with schema-registry. We will define our schema in avro format, for that reason we need to also add the avro and kafka-avro-serializer dependencies.
After adding the above dependencies, now it's time to implement the logic to send an event to the kafka topic whenever there is an order placed in the order-service. We will first start by defining the avro-schema of the event we want to send. And we will define the schema in a .avsc file, avsc is the format to define the Avro schema, let's add the below file under src/main/resources/avro folder.
order-placed.avsc
{
"type": "record",
"name": "OrderPlacedEvent",
"namespace": "com.techie.microservices.order.event",
"fields": [
{ "name": "orderNumber", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
]
}
Here we have a few fields orderNumber, email, firstName, and lastName that are used to send notifications to the user whenever an order is placed successfully.
The idea is to generate the Java classes automatically using this schema, so if there is a change in the schema file, then those changes will be automatically applied during the build time.
To be able to generate the Java classes automatically, we are going to use the avro-maven-plugin:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
Add the above plugin under the <plugins>
section and run mvn clean compile command.
Now you should see a file called OrderPlacedEvent.java under the com.techie.microservices.order.event package.
Producing Messages from Order Service
Now it's time to configure Kafka in our Spring Boot application, for that we are going to add the following properties in the application.properties file.
#Kafka Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=order-placed
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.properties.schema.registry.url=http://localhost:8085
The above properties provide the necessary configuration to run Kafka Producer and to use the Kafka Schema registry in our Order service application.
Inside the OrderService.java class, let's add the logic to send the messages to the Kafka topic using the KafkaTemplate class.
Here is how the OrderService.java class looks like:
OrderService.java
package com.techie.microservices.order.service;
import com.techie.microservices.order.client.InventoryClient;
import com.techie.microservices.order.dto.OrderRequest;
import com.techie.microservices.order.event.OrderPlacedEvent;
import com.techie.microservices.order.event.handler.OrderPlacedEventHandler;
import com.techie.microservices.order.model.Order;
import com.techie.microservices.order.repository.OrderRepository;
import groovy.util.logging.Slf4j;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
private static final Logger log = LoggerFactory.getLogger(OrderService.class);
private final OrderRepository orderRepository;
private final InventoryClient inventoryClient;
private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
public void placeOrder(OrderRequest orderRequest) {
var isProductInStock = inventoryClient.isInStock(orderRequest.skuCode(), orderRequest.quantity());
if (isProductInStock) {
Order order = new Order();
order.setOrderNumber(UUID.randomUUID().toString());
order.setPrice(orderRequest.price().multiply(BigDecimal.valueOf(orderRequest.quantity())));
order.setSkuCode(orderRequest.skuCode());
order.setQuantity(orderRequest.quantity());
orderRepository.save(order);
var orderPlacedEvent = new OrderPlacedEvent(order.getOrderNumber(), orderRequest.userDetails()
.email(),
orderRequest.userDetails()
.firstName(),
orderRequest.userDetails()
.lastName());
log.info("Start- Sending OrderPlacedEvent {} to Kafka Topic", orderPlacedEvent);
kafkaTemplate.send("order-placed", orderPlacedEvent);
log.info("End- Sending OrderPlacedEvent {} to Kafka Topic", orderPlacedEvent);
} else {
throw new RuntimeException("Product with SkuCode " + orderRequest.skuCode() + " is not in stock");
}
}
}
This is all the logic we need to produce the events to order-placed kafka topic. Now let's see how to consume the messages in our consumer, that would be the notification-service.
Consuming Messages from Notification Service
Let's create a new Spring Boot application called Notification Service with the following dependencies.
-
Spring Kafka
-
Java Mail Sender
-
Lombok
-
Test Containers
After adding these dependencies, generate the project and open it in your IDE.
Now we need to add some more dependencies like Kafka Schema Registry, Avro Serializer, etc.
The complete pom.xml for notification-service looks like below:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.programming.techie</groupId>
<artifactId>notification-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>notification-service</name>
<description>notification-service</description>
<properties>
<java.version>21</java.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Place the order-placed.avsc file under the src/main/resources/avro folder
order-placed.avsc
{
"type": "record",
"name": "OrderPlacedEvent",
"namespace": "com.techie.microservices.order.event",
"fields": [
{ "name": "orderNumber", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
]
}
Now let's configure the properties for Kafka Consumer in our Spring Boot Application's application.properties file:
spring.application.name=notification-service
# Mail Properties
spring.mail.host=sandbox.smtp.mailtrap.io
spring.mail.port=2525
spring.mail.username=<username>
spring.mail.password=<password>
spring.mail.protocol=smtp
# Kafka Config
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=notificationService
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8085
spring.kafka.consumer.properties.specific.avro.reader=true
Create a class called NotificationService.java that listens for the messages on the topic - "order-placed" and sends email
NotificationService.java
package com.techie.microservices.notification.service;
import com.techie.microservices.order.event.OrderPlacedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.mail.MailException;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.mail.javamail.MimeMessagePreparator;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final JavaMailSender javaMailSender;
@KafkaListener(topics = "order-placed")
public void listen(OrderPlacedEvent orderPlacedEvent){
log.info("Got Message from order-placed topic {}", orderPlacedEvent);
MimeMessagePreparator messagePreparator = mimeMessage -> {
MimeMessageHelper messageHelper = new MimeMessageHelper(mimeMessage);
messageHelper.setFrom("springshop@email.com");
messageHelper.setTo(orderPlacedEvent.getEmail().toString());
messageHelper.setSubject(String.format("Your Order with OrderNumber %s is placed successfully", orderPlacedEvent.getOrderNumber()));
messageHelper.setText(String.format("""
Hi %s,%s
Your order with order number %s is now placed successfully.
Best Regards
Spring Shop
""",
orderPlacedEvent.getFirstName().toString(),
orderPlacedEvent.getLastName().toString(),
orderPlacedEvent.getOrderNumber()));
};
try {
javaMailSender.send(messagePreparator);
log.info("Order Notifcation email sent!!");
} catch (MailException e) {
log.error("Exception occurred when sending mail", e);
throw new RuntimeException("Exception occurred when sending mail to springshop@email.com", e);
}
}
}