RabbitMQ - одно из популярных решений для отправки сообщений, предоставляется клиентские библиотеки для таких языков как Java, Scala, .NET, Go, Python, PHP и другие. В данном турториале мы рассмотрим отправку и обработку сообщений, а так же как отправлять сообщение с JSON содержимым и работать с ошибками при помощи Dead Letter Queues (DLQ)
Для начала установим сервер RabbitMQ на вашей локальной машине, как указано здесь или запустим с помощью docker-compose:
version: '3'
services:
rabbitmq:
container_name: rabbitmq
image: 'rabbitmq:management'
ports:
- "5672:5672"
- "15672:15672"
Теперь можно запустить RabbitMQ используя docker-compose up и открыть UI администратора по адресу http://localhost:15672/.Если вы знакомы с другими брокерскими службами обмена сообщениями, такими как ActiveMQ, в них обычно используются очереди(Queues) и топики (Topics) для отправки индивидуальной и общедоступной модели связи. В RabbitMQ используется Exchange и в зависимости от ключа маршрутизации, сообщение будет отправлено в очередь (очереди) (Queues). Вы можете больше узнать о концепциях RabbitMQ здесь.
Приложение SpringBoot c RabbitMQ
Создадим SpringBoot приложение, описав следующие стартеры:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sivalabs</groupId>
<artifactId>springboot-rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RC1</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
Начнем с конфигурации RabbitMQ, создадим RabbitConfig и опишем следующие бины: Queue, Exchange и Binding:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig
{
public static final String QUEUE_ORDERS = "orders-queue";
public static final String EXCHANGE_ORDERS = "orders-exchange";
@Bean
Queue ordersQueue() {
return QueueBuilder.durable(QUEUE_ORDERS).build();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
}
@Bean
Exchange ordersExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
}
@Bean
Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
}
}
Мы объявляем Queue имен orders-queue и Exchange с именем orders-exchange.Также определили привязку между orders-queue и orders-exchange, чтобы любое сообщение, отправленное на orders-exchange с ключом направления как orders-queue, отправлялось в orders-queue.
Конфигурация RabbitMQ представлена в application.properties:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guestСоздадим бин OrderMessageSender для отправки сообщений в orders-exchange. Spring Boot автоматически конфигурирует инфраструктурные бины, необходимые для отправки/обработки сообщений в/от брокера сообщений RabbitMQ. Нам необходимо просто проинжектить RabbitTemplate и отправить сообщение вызвав метод rabbitTemplate.convertAndSend(“routingKey”, Object).
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Order implements Serializable{
private String orderNumber;
private String productId;
private double amount;
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderMessageSender {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendOrder(Order order) {
this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);
}
}
По умолчанию в SpringBoot используется org.springframework.amqp.support.converter. SimpleMessageConverterand, который сериализует объект в byte[].После отправки сообщения вы можете просмотреть сообщение из интерфейса администратора, выполнив вход с учетными данными guest/guest.
Теперь создадим слушателя, используя аннотацию RabbitListener.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import ru.config.Config;
import ru.model.Order;
@Component
public class OrderMessageListener {
static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);
@RabbitListener(queues = Config.QUEUE_ORDERS)
public void process(Order order){
logger.info("Received order");
logger.info(order.toString());
}
}
Теперь, если вы отправляете сообщение в очередь, то с помощью метода OrderMessageListener.process(), вы обработаете данное сообщение и в логах приложения будет написано Received order и само содержимое объекта.
Отправка и прием сообщений c JSON содержимым.
Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [] с помощью SimpleMessageConverter и на принимающей стороне, он будет десериализован из byte [] в тип объекта (в нашем случае Order) с помощью GenericMessageConverter.
Чтобы изменить это поведение, нам нужно настроить автоконфигурацию бинов Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним быстрым способом отправки сообщения с JSON является использование ObjectMapper, мы можем конвертировать Orderobject в JSON и отправлять его.
Чтобы изменить это поведение, нам нужно настроить автоконфигурацию бинов Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним быстрым способом отправки сообщения с JSON является использование ObjectMapper, мы можем конвертировать Orderobject в JSON и отправлять его.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.config.Config;
import ru.model.Order;
@Service
public class OrderMessageService {
private final RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper;
@Autowired
public OrderMessageService(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
this.rabbitTemplate = rabbitTemplate;
this.objectMapper = objectMapper;
}
public void sendOrder(Order order) throws Exception{
String orderJson = objectMapper.writeValueAsString(order);
Message message = MessageBuilder
.withBody(orderJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(Config.QUEUE_ORDERS, order);
}
}
Но преобразование объектов в JSON, как это, является своего рода избыточностью. Вместо этого мы можем сделать изменить конфигурацию, настроив компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter, который будет использоваться в RabbitTemplate, чтобы сообщение было сериализовано как JSON вместо byte[].@Configuration
public class RabbitConfig{
...
...
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Теперь сообщения отправляются в виде JSON. Так же необходимо настроить прием JSON сообщения. Для этого необходимо имплиментировать для нашего класса конфигурций интерфейс RabbitListenerConfigurer. @Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
...
...
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
}
Обработка ошибок и невалидных сообщений, используя DeadLetterQueues(DLQ)
Мы можем отправить невалидные сообщения в отдельную очередь, чтобы мы могли их проверять и обрабатывать позже. Для этого можно использовать концепцию DLQ для автоматизации, вместо написания кода и обработки такого события.
Мы можем объявить dead-letter-exchange, dead-letter-routing-key для очереди, тогда объявление бина будет следующим:
@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
public static final String QUEUE_ORDERS = "orders-queue";
public static final String EXCHANGE_ORDERS = "orders-exchange";
public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue";
@Bean
Queue ordersQueue() {
return QueueBuilder.durable(QUEUE_ORDERS)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS)
.withArgument("x-message-ttl", 15000) //Если сообщение не бработано за 15 сек - оно отправляется в DLQ
.build();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
}
...
...
}
Теперь можно отправить сообщение с JSON отличным от объекта Order и оно попадет в очередь dead-orders-queue. Необходимо заметить, что если очередь orders-queue была создана - ее необходимо удалить, чтобы приложение запустилось.PS это мой перевод данной статьи

