RabbitMQ - одно из популярных решений для отправки сообщений, предоставляется клиентские библиотеки для таких языков как Java, Scala, .NET, Go, Python, PHP и другие. В данном турториале мы рассмотрим отправку и обработку сообщений, а так же как отправлять сообщение с JSON содержимым и работать с ошибками при помощи Dead Letter Queues (DLQ)
Для начала установим сервер RabbitMQ на вашей локальной машине, как указано здесь или запустим с помощью docker-compose:
version: '3' services: rabbitmq: container_name: rabbitmq image: 'rabbitmq:management' ports: - "5672:5672" - "15672:15672"Теперь можно запустить RabbitMQ используя docker-compose up и открыть UI администратора по адресу http://localhost:15672/.
Если вы знакомы с другими брокерскими службами обмена сообщениями, такими как ActiveMQ, в них обычно используются очереди(Queues) и топики (Topics) для отправки индивидуальной и общедоступной модели связи. В RabbitMQ используется Exchange и в зависимости от ключа маршрутизации, сообщение будет отправлено в очередь (очереди) (Queues). Вы можете больше узнать о концепциях RabbitMQ здесь.
Приложение SpringBoot c RabbitMQ
Создадим SpringBoot приложение, описав следующие стартеры:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sivalabs</groupId> <artifactId>springboot-rabbitmq-demo</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RC1</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>Начнем с конфигурации RabbitMQ, создадим RabbitConfig и опишем следующие бины: Queue, Exchange и Binding:
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String QUEUE_ORDERS = "orders-queue"; public static final String EXCHANGE_ORDERS = "orders-exchange"; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS).build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } @Bean Exchange ordersExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build(); } @Bean Binding binding(Queue ordersQueue, TopicExchange ordersExchange) { return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS); } }Мы объявляем Queue имен orders-queue и Exchange с именем orders-exchange.
Также определили привязку между orders-queue и orders-exchange, чтобы любое сообщение, отправленное на orders-exchange с ключом направления как orders-queue, отправлялось в orders-queue.
Конфигурация RabbitMQ представлена в application.properties:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guestСоздадим бин OrderMessageSender для отправки сообщений в orders-exchange. Spring Boot автоматически конфигурирует инфраструктурные бины, необходимые для отправки/обработки сообщений в/от брокера сообщений RabbitMQ. Нам необходимо просто проинжектить RabbitTemplate и отправить сообщение вызвав метод rabbitTemplate.convertAndSend(“routingKey”, Object).
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import java.io.Serializable; @Data @AllArgsConstructor @NoArgsConstructor @ToString public class Order implements Serializable{ private String orderNumber; private String productId; private double amount; } import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderMessageSender { private final RabbitTemplate rabbitTemplate; @Autowired public OrderMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendOrder(Order order) { this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order); } }По умолчанию в SpringBoot используется org.springframework.amqp.support.converter. SimpleMessageConverterand, который сериализует объект в byte[].
После отправки сообщения вы можете просмотреть сообщение из интерфейса администратора, выполнив вход с учетными данными guest/guest.
Теперь создадим слушателя, используя аннотацию RabbitListener.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import ru.config.Config; import ru.model.Order; @Component public class OrderMessageListener { static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class); @RabbitListener(queues = Config.QUEUE_ORDERS) public void process(Order order){ logger.info("Received order"); logger.info(order.toString()); } }
Теперь, если вы отправляете сообщение в очередь, то с помощью метода OrderMessageListener.process(), вы обработаете данное сообщение и в логах приложения будет написано Received order и само содержимое объекта.
Отправка и прием сообщений c JSON содержимым.
Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [] с помощью SimpleMessageConverter и на принимающей стороне, он будет десериализован из byte [] в тип объекта (в нашем случае Order) с помощью GenericMessageConverter.
Чтобы изменить это поведение, нам нужно настроить автоконфигурацию бинов Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним быстрым способом отправки сообщения с JSON является использование ObjectMapper, мы можем конвертировать Orderobject в JSON и отправлять его.
Чтобы изменить это поведение, нам нужно настроить автоконфигурацию бинов Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним быстрым способом отправки сообщения с JSON является использование ObjectMapper, мы можем конвертировать Orderobject в JSON и отправлять его.
import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import ru.config.Config; import ru.model.Order; @Service public class OrderMessageService { private final RabbitTemplate rabbitTemplate; private ObjectMapper objectMapper; @Autowired public OrderMessageService(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) { this.rabbitTemplate = rabbitTemplate; this.objectMapper = objectMapper; } public void sendOrder(Order order) throws Exception{ String orderJson = objectMapper.writeValueAsString(order); Message message = MessageBuilder .withBody(orderJson.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); rabbitTemplate.convertAndSend(Config.QUEUE_ORDERS, order); } }Но преобразование объектов в JSON, как это, является своего рода избыточностью. Вместо этого мы можем сделать изменить конфигурацию, настроив компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter, который будет использоваться в RabbitTemplate, чтобы сообщение было сериализовано как JSON вместо byte[].
@Configuration public class RabbitConfig{ ... ... @Bean public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(producerJackson2MessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } }Теперь сообщения отправляются в виде JSON. Так же необходимо настроить прием JSON сообщения. Для этого необходимо имплиментировать для нашего класса конфигурций интерфейс RabbitListenerConfigurer.
@Configuration public class RabbitConfig implements RabbitListenerConfigurer { ... ... @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } }
Обработка ошибок и невалидных сообщений, используя DeadLetterQueues(DLQ)
Мы можем отправить невалидные сообщения в отдельную очередь, чтобы мы могли их проверять и обрабатывать позже. Для этого можно использовать концепцию DLQ для автоматизации, вместо написания кода и обработки такого события.
Мы можем объявить dead-letter-exchange, dead-letter-routing-key для очереди, тогда объявление бина будет следующим:
@Configuration public class RabbitConfig implements RabbitListenerConfigurer { public static final String QUEUE_ORDERS = "orders-queue"; public static final String EXCHANGE_ORDERS = "orders-exchange"; public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue"; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS) .withArgument("x-message-ttl", 15000) //Если сообщение не бработано за 15 сек - оно отправляется в DLQ .build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } ... ... }Теперь можно отправить сообщение с JSON отличным от объекта Order и оно попадет в очередь dead-orders-queue. Необходимо заметить, что если очередь orders-queue была создана - ее необходимо удалить, чтобы приложение запустилось.
PS это мой перевод данной статьи