Introduction
Message-driven architectures are part of a larger concept known as reactive programming (asynchronous systems). Asynchronous programming is critical to building a truly scalable platform in which services can communicate with each other easily, scale up and down independently, and fail without being catastrophic, i.e. without causing system-wide downtime.
Messages vs Events
While the two are closely related and often use the same architecture, there are some core differences.
A message can be defined as an item of data that is sent to a specific destination. In a message-driven system, recipients await the arrival of messages and react to them accordingly.
An event, on the other hand, can be defined as an action or change in state that can be observed and can trigger different actions in different recipients.
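The distinction can be sketched in plain Java. The types below (Mailboxes, EventSource) are hypothetical names chosen for illustration and are not part of Redis or Spring: a message is handed to one named recipient, while an event is announced to whoever happens to be observing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Message: data addressed to one specific destination.
class Mailboxes {
    private final Map<String, List<String>> inboxes = new HashMap<>();

    // Deliver the payload to exactly one named recipient.
    void send(String destination, String payload) {
        inboxes.computeIfAbsent(destination, k -> new ArrayList<>()).add(payload);
    }

    List<String> inbox(String destination) {
        return inboxes.getOrDefault(destination, List.of());
    }
}

// Event: a state change that any number of observers may react to.
class EventSource {
    private final List<Consumer<String>> observers = new ArrayList<>();

    void observe(Consumer<String> observer) {
        observers.add(observer);
    }

    // The source does not know or care who is listening.
    void emit(String event) {
        observers.forEach(o -> o.accept(event));
    }
}

class MessageVsEventDemo {
    public static void main(String[] args) {
        Mailboxes mailboxes = new Mailboxes();
        mailboxes.send("notifications", "send welcome SMS");
        System.out.println(mailboxes.inbox("notifications"));

        EventSource source = new EventSource();
        source.observe(e -> System.out.println("audit saw: " + e));
        source.observe(e -> System.out.println("billing saw: " + e));
        source.emit("user-registered");
    }
}
```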
One way of implementing a message-driven architecture is to use a PUB/SUB event bus or protocol for communication between services.
Message-driven systems scale well because the sender does not have to wait for an immediate response to each request.
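To make the idea concrete before bringing in Redis, here is a minimal in-memory sketch of the PUB/SUB pattern. InMemoryBus is a hypothetical class, not a Redis or Spring API; it only illustrates that the publisher hands a message over and returns without waiting for the subscribers to finish their work.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class InMemoryBus {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();
    // Handlers run on this worker thread, not on the publisher's thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Hands the message to every current subscriber and returns immediately.
    // The return value is the number of subscribers the message was handed to.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(channel, List.of());
        for (Consumer<String> handler : handlers) {
            worker.submit(() -> handler.accept(message));
        }
        return handlers.size();
    }

    // Wait for queued deliveries to finish, then stop the worker.
    void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class InMemoryBusDemo {
    public static void main(String[] args) {
        InMemoryBus bus = new InMemoryBus();
        bus.subscribe("pubsub:queue", m -> System.out.println("received: " + m));
        bus.publish("pubsub:queue", "hello");
        bus.shutdown();
    }
}
```

A message published to a channel with no subscribers is simply dropped in this sketch. Redis PUB/SUB has the same fire-and-forget behaviour, which is worth keeping in mind when a subscriber may be offline.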
What You Will Learn
- Basic Implementation of PUB/SUB
- Redis Basic Configuration
- Spring Boot Redis Integration
Prerequisites
To follow this tutorial, you should have basic knowledge of working with:
- Spring Boot
- Docker
- Git
Key Tools and Components
- Redis
- Pub/Sub
The workflow
The figure above shows the components involved: Service A (which generates events), the Notification Service, Redis, and 3rd-party integrations. The event processor (the Notification Service) reads the events from the log and processes them, i.e. sends the messages over email or SMS.
Environment Set Up
MySQL
Connect to a MySQL instance of your choice.
Redis
We’ll set up Redis to run in a Docker container for testing purposes.
Folder Structure
├── Dockerfile
├── entrypoint.sh
└── redis.conf
Dockerfile
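FROM redis:alpine
WORKDIR /redis
COPY redis.conf /usr/local/etc/redis/redis.conf
COPY entrypoint.sh ./
RUN chmod +x entrypoint.sh
# Run the custom script so that redis.conf is actually loaded at startup.
ENTRYPOINT ["/redis/entrypoint.sh"]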
redis.conf
maxmemory-policy allkeys-lru
requirepass {your-redis-password}
entrypoint.sh
#!/bin/sh
# Redis must be restarted after THP is disabled.
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
sysctl -w net.core.somaxconn=512
sysctl -w vm.overcommit_memory=1
# start redis server
redis-server /usr/local/etc/redis/redis.conf --bind 0.0.0.0
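Once the container is running, one way to confirm that it accepts the password from redis.conf is a small probe that speaks the Redis wire protocol (RESP) directly. This is a sketch under assumptions: the host 127.0.0.1, the port 6379 and the REDIS_PASSWORD environment variable are placeholders, and RedisProbe is a hypothetical helper, not part of any client library.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class RedisProbe {
    // Encode a command as a RESP array of bulk strings.
    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder("*").append(parts.length).append("\r\n");
        for (String part : parts) {
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        // Assumed placeholders: adjust host, port and password to your setup.
        String password = System.getenv().getOrDefault("REDIS_PASSWORD", "");
        try (Socket socket = new Socket("127.0.0.1", 6379)) {
            OutputStream out = socket.getOutputStream();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            if (!password.isEmpty()) {
                out.write(encode("AUTH", password).getBytes(StandardCharsets.UTF_8));
                out.flush();
                System.out.println("AUTH reply: " + in.readLine());
            }
            out.write(encode("PING").getBytes(StandardCharsets.UTF_8));
            out.flush();
            System.out.println("PING reply: " + in.readLine());
        }
    }
}
```

A server that accepts the password is expected to answer with simple-string replies; an error reply (a line starting with "-") usually points to a wrong or missing password.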
Spring Boot
Ensure you have a JDK and Maven installed.
Head over to Spring Initializr and create your Spring Boot project with the following options.
Step 1 Gradle
Step 2 Properties
spring.datasource.url=jdbc:mysql://${DB_HOST:127.0.0.1}:${DB_PORT:3306}/${MYSQL_DATABASE:messaging.decoded.application}
spring.datasource.username=${MYSQL_USER:null}
spring.datasource.password=${MYSQL_USER_PASSWORD:null}
spring.redis.host=${REDIS_HOST:127.0.0.1}
spring.redis.port=6379
spring.redis.password=${REDIS_PASSWORD:null}
decoded.africastalking.username=${AT_USER_NAME:null}
decoded.africastalking.api-key=${AT_API_KEY:null}
decoded.africastalking.default-from=${AT_DEFAULT_SENDER:AFRICASTKNG}
server.port=${SERVER_PORT:5768}
springdoc.api-docs.enabled=true
springdoc.swagger-ui.path=/swagger-ui.html
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect
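Most of the properties above use the ${NAME:default} form: the value comes from the environment variable NAME when it is set and from the text after the colon otherwise. The sketch below imitates that lookup in plain Java to show how the datasource URL ends up being assembled. PlaceholderSketch is a hypothetical helper written for this illustration; it is not Spring's own resolver and does not handle nested placeholders.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches ${NAME} or ${NAME:default}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Replace each placeholder with the value from env, or with its default.
    // Simplification: a placeholder with no value and no default becomes an
    // empty string here, whereas Spring would refuse to start.
    static String resolve(String template, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String fallback = m.group(2) == null ? "" : m.group(2);
            String value = env.getOrDefault(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String url = "jdbc:mysql://${DB_HOST:127.0.0.1}:${DB_PORT:3306}/${MYSQL_DATABASE:messaging.decoded.application}";
        // Resolve against the real process environment.
        System.out.println(resolve(url, System.getenv()));
    }
}
```

One detail worth checking in your own setup: a default written as null is the literal four-character text, not an absent value, so an unset REDIS_PASSWORD would most likely be passed along as the password "null".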
Step 3 Configuration
Let’s start by adding the Redis configuration.
First, we’ll define a MessageListenerAdapter. It delegates the handling of messages to target listener methods through reflection, with flexible message type conversion.
This bean defines the subscriber in the pub-sub messaging model. The handler method is currently named “onMessage”; you can change this to a name of your own choosing, as long as the subscriber class has a matching method.
@Bean
MessageListenerAdapter messageListener(RedisMessageSubscriber subscriber) {
    return new MessageListenerAdapter(subscriber, "onMessage");
}
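The reflection-based delegation can be pictured with a few lines of plain Java. ReflectiveAdapter and RecordingSubscriber below are hypothetical stand-ins written for this sketch; the real adapter does considerably more (message conversion, several supported method signatures), so treat this only as an illustration of the idea of looking up a handler by name.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ReflectiveAdapter {
    private final Object delegate;
    private final Method handler;

    // Look up a public method with the given name that takes a single String.
    ReflectiveAdapter(Object delegate, String methodName) {
        this.delegate = delegate;
        try {
            this.handler = delegate.getClass().getMethod(methodName, String.class);
            this.handler.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No handler method named " + methodName, e);
        }
    }

    // Forward the payload to the delegate's handler method.
    void dispatch(String payload) {
        try {
            handler.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Handler invocation failed", e);
        }
    }
}

// A plain class: it implements no listener interface at all.
class RecordingSubscriber {
    final List<String> seen = new ArrayList<>();

    public void onMessage(String message) {
        seen.add(message);
    }
}

class ReflectiveAdapterDemo {
    public static void main(String[] args) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new ReflectiveAdapter(subscriber, "onMessage").dispatch("hello");
        System.out.println(subscriber.seen);
    }
}
```

Renaming the handler then only requires changing the string passed to the adapter and the method name on the subscriber together.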
Second, we’ll define the RedisMessageListenerContainer, which provides asynchronous behaviour for Redis message listeners. It handles the low-level details of listening, converting, and dispatching messages.
@Bean
RedisMessageListenerContainer redisContainer(MessageListenerAdapter listenerAdapter) {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(jedisConnectionFactory());
    container.addMessageListener(listenerAdapter, topic());
    return container;
}
Next, we’ll set up the topic to which the publisher sends messages and to which the subscriber listens.
@Bean
ChannelTopic topic() {
    return new ChannelTopic("pubsub:queue");
}
We’ll then create a bean using the custom MessagePublisher interface. This gives us a generic message-publishing API, with the Redis implementation receiving a redisTemplate and a topic as its constructor arguments.
@Bean
MessagePublisher redisPublisher() {
    return new RedisMessagePublisher(redisTemplate(), topic());
}
Next, we define a RedisTemplate for key/value data handling, which also lets us choose the serialization scheme. The configuration below serializes values to JSON before publishing; the same serializer can be used to deserialize them on the subscriber side.
@Bean
public RedisTemplate<String, Object> redisTemplate() {
    final RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(jedisConnectionFactory());
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
    return redisTemplate;
}
Finally, to close out the configuration, we’ll define the JedisConnectionFactory bean, which connects to the Redis instance using the credentials from the properties file.
@Value("${spring.redis.host}")
private String hostName;

@Value("${spring.redis.port}")
private int port;

@Value("${spring.redis.password}")
private String password;

@Bean
JedisConnectionFactory jedisConnectionFactory() {
    RedisStandaloneConfiguration redisStandaloneConfiguration =
            new RedisStandaloneConfiguration(this.hostName, this.port);
    redisStandaloneConfiguration.setPassword(RedisPassword.of(this.password));
    return new JedisConnectionFactory(redisStandaloneConfiguration);
}
Step 4 Entity and Redis Dto
Let us define our Message entity as well as its DAO (repository).
import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Entity
@Getter(value = AccessLevel.PUBLIC)
@Setter(value = AccessLevel.PUBLIC)
@ToString
public class Message {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;

    private String recepient;
    private MessageStatus status;
    private MessageType type;
    private Integer retries = 0;

    @Column(nullable = true)
    private String subject;

    @Column(columnDefinition = "text")
    private String content;

    @CreatedDate
    @Temporal(TemporalType.TIMESTAMP)
    protected Date creationDate;

    @LastModifiedDate
    @Temporal(TemporalType.TIMESTAMP)
    protected Date lastModifiedDate;
}

public enum MessageStatus {
    PENDING, SENT, FAILED
}

public enum MessageType {
    SMS, MAIL
}
The Redis Dto
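The DTO class itself is not listed here. Judging only from the getters and setters used later in logMessage, it might look roughly like the plain class below. This is an assumption rather than the original code: the field set is inferred, and the two enums are repeated only so that the sketch compiles on its own.

```java
// Repeated from the entity section so that this sketch is self-contained.
enum MessageStatus { PENDING, SENT, FAILED }

enum MessageType { SMS, MAIL }

// Assumed shape of the DTO that travels through Redis as JSON.
// A no-argument constructor plus getters and setters keeps it friendly
// to JSON serializers.
class MessageRedisDto {
    private String recepient; // spelling kept to match the entity field
    private String subject;
    private String content;
    private MessageStatus status;
    private MessageType type;

    public String getRecepient() { return recepient; }
    public void setRecepient(String recepient) { this.recepient = recepient; }

    public String getSubject() { return subject; }
    public void setSubject(String subject) { this.subject = subject; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    public MessageStatus getStatus() { return status; }
    public void setStatus(MessageStatus status) { this.status = status; }

    public MessageType getType() { return type; }
    public void setType(MessageType type) { this.type = type; }

    @Override
    public String toString() {
        return "MessageRedisDto{recepient=" + recepient + ", subject=" + subject
                + ", content=" + content + ", status=" + status + ", type=" + type + "}";
    }
}
```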
Since Spring Data REST is among the installed dependencies, we’ll expose a few methods directly from the repository.
Step 5 Message Handlers
Define the Event Processor
The Event Processor performs tasks triggered by events. In this case, the task is sending the notification through preconfigured 3rd-party integrations.
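The processor's job can be sketched without any real integration. Everything below is hypothetical: Sender stands in for a 3rd-party gateway, Notification is a trimmed-down message, and the retry budget is an assumption suggested only by the retries and status fields on the entity. It is not the article's SmsMessagingService.

```java
import java.util.EnumMap;
import java.util.Map;

enum MessageType { SMS, MAIL }

enum MessageStatus { PENDING, SENT, FAILED }

// Hypothetical stand-in for a 3rd-party integration (SMS gateway, mail server).
interface Sender {
    boolean send(String recipient, String content);
}

class Notification {
    final MessageType type;
    final String recipient;
    final String content;
    MessageStatus status = MessageStatus.PENDING;
    int retries = 0;

    Notification(MessageType type, String recipient, String content) {
        this.type = type;
        this.recipient = recipient;
        this.content = content;
    }
}

class EventProcessor {
    private final Map<MessageType, Sender> senders = new EnumMap<>(MessageType.class);
    private final int maxRetries;

    EventProcessor(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    void register(MessageType type, Sender sender) {
        senders.put(type, sender);
    }

    // Pick the sender for the message type and try it until it succeeds
    // or the retry budget is spent.
    void process(Notification n) {
        Sender sender = senders.get(n.type);
        if (sender == null) {
            n.status = MessageStatus.FAILED;
            return;
        }
        while (true) {
            if (sender.send(n.recipient, n.content)) {
                n.status = MessageStatus.SENT;
                return;
            }
            if (n.retries >= maxRetries) {
                n.status = MessageStatus.FAILED;
                return;
            }
            n.retries++;
        }
    }
}
```

In a real service the retry would normally be delayed or re-queued rather than attempted in a tight loop; the loop here only keeps the sketch short.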
Define the Subscriber
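RedisMessageSubscriber is the delegate that the MessageListenerAdapter configured in Step 3 calls for each message. Because the adapter invokes onMessage through reflection, the class does not have to implement the MessageListener interface itself, but it must be registered as a Spring bean so that it can be injected into the adapter.
import {your-packaging}.MessageRedisDto;
import {your-packaging}.SmsMessagingService;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

// Registered as a bean so it can be injected into the MessageListenerAdapter.
@Component
public class RedisMessageSubscriber {
    @Autowired
    private RedisTemplate<?, ?> redisTemplate;

    @Autowired
    private SmsMessagingService smsMessagingService;

    // Called by the adapter with the raw JSON payload of each message.
    public void onMessage(String message) throws JsonProcessingException {
        // Reuse the template's value serializer to turn the JSON back into a DTO.
        MessageRedisDto ms = (MessageRedisDto)
                this.redisTemplate.getValueSerializer().deserialize(message.getBytes());
        System.out.println(ms);
        smsMessagingService.sendMessages(ms);
    }
}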
Define the Publisher
We’ll define a custom MessagePublisher interface and a RedisMessagePublisher implementation for it.
public interface MessagePublisher {
    void publish(final Object message);
}

import {your-packaging}.MessagePublisher; // your custom MessagePublisher interface
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;

@Service
public class RedisMessagePublisher implements MessagePublisher {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(
            RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(Object message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}
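One practical benefit of hiding Redis behind the MessagePublisher interface is that callers can be exercised without a running Redis instance. The sketch below shows a recording implementation that could serve as a test double. RecordingMessagePublisher and SignupService are hypothetical names introduced for this example, and the interface is repeated only so that the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the interface above, repeated so the sketch is self-contained.
interface MessagePublisher {
    void publish(Object message);
}

// Hypothetical test double: remembers what was published instead of sending it.
class RecordingMessagePublisher implements MessagePublisher {
    private final List<Object> published = new ArrayList<>();

    @Override
    public void publish(Object message) {
        published.add(message);
    }

    // Snapshot of everything published so far.
    List<Object> published() {
        return List.copyOf(published);
    }
}

// Hypothetical caller that depends only on the interface.
class SignupService {
    private final MessagePublisher publisher;

    SignupService(MessagePublisher publisher) {
        this.publisher = publisher;
    }

    void register(String phone) {
        publisher.publish("Welcome, " + phone);
    }
}

class PublisherDemo {
    public static void main(String[] args) {
        RecordingMessagePublisher publisher = new RecordingMessagePublisher();
        new SignupService(publisher).register("+254700000000");
        System.out.println(publisher.published());
    }
}
```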
Define Sample Controller/Service for Testing
Define a sample service method to simulate the PUB side of the queue.
@Autowired
private RedisMessagePublisher redisMessagePublisher;

/**
 * Builds a pending message from the request and publishes it to the Redis topic.
 *
 * @param messageRequest the message details received from the caller
 * @return the message that was published
 */
public MessageRedisDto logMessage(MessageRedisDto messageRequest) {
    MessageRedisDto message = new MessageRedisDto();
    message.setRecepient(messageRequest.getRecepient());
    message.setContent(messageRequest.getContent());
    message.setStatus(MessageStatus.PENDING);
    message.setType(messageRequest.getType());
    message.setSubject(messageRequest.getSubject());
    redisMessagePublisher.publish(message);
    return message;
}
Define the sample controller.
import {your-packaging}.MessageRedisDto;
import {your-packaging}.SmsMessagingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
 * MessageController
 */
@RestController
@Tag(name = "Messaging")
@RequestMapping(value = "api/v1/messages")
public class MessageController {
    @Autowired
    private SmsMessagingService messageService;

    @PostMapping(value = "log/message")
    @Operation(summary = "Send Message", description = "Send Message")
    public ResponseEntity<MessageRedisDto> sendMessage(@RequestBody MessageRedisDto messageRequest) {
        MessageRedisDto message = messageService.logMessage(messageRequest);
        return new ResponseEntity<>(message, HttpStatus.OK);
    }
}
Testing things out
View Redis Logs
Run the command below in a terminal attached to the Redis container to view activity.
redis-cli -a {password} monitor
Then run the Spring Boot application and open Swagger UI to send a sample message.
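If you prefer to send the sample message from code rather than from Swagger UI, a small client along these lines may help. It is a sketch under assumptions: the base URL uses the default server.port from the properties file, the path is derived from the controller mappings, the JSON field names are guessed from the DTO setters, and the phone number is a placeholder. SampleMessageClient is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SampleMessageClient {
    // Build the JSON body by hand. Values are not escaped, which is
    // acceptable only for a fixed sample like this one.
    static String body(String recepient, String content, String type) {
        return String.format(
                "{\"recepient\":\"%s\",\"content\":\"%s\",\"type\":\"%s\"}",
                recepient, content, type);
    }

    // POST request against the controller's log/message endpoint.
    static HttpRequest request(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/messages/log/message"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest req = request("http://localhost:5768",
                body("+254700000000", "Hello from Redis PUB/SUB", "SMS"));
        HttpResponse<String> res = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(res.statusCode() + " " + res.body());
    }
}
```

With the monitor command running in the Redis terminal, a publish on the configured topic should show up shortly after the request is sent.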
Conclusion
Both enterprises and middleware vendors have begun embracing event-driven architecture, and recent years have seen significant growth in corporate interest in adopting message-driven and event-driven systems. In this article, we have gone through a simple messaging system implemented with Redis PUB/SUB.
Quickstart
To get started quickly, you can pull the repo that accompanies this tutorial and run it.