суббота, 24 февраля 2018 г.

Взаимодействие SpringBoot с RabbitMq

RabbitMQ - одно из популярных решений для отправки сообщений, предоставляется клиентские библиотеки для таких языков как Java, Scala, .NET, Go, Python, PHP  и другие. В данном турториале мы рассмотрим отправку и обработку сообщений, а так же как отправлять сообщение с JSON содержимым и работать с ошибками при помощи Dead Letter Queues (DLQ)

Для начала установим сервер RabbitMQ на вашей локальной машине, как указано здесь или запустим с помощью docker-compose:

version: '3'
services:
  rabbitmq:
    container_name: rabbitmq
    image: 'rabbitmq:management'
    ports:
      - "5672:5672"
      - "15672:15672"
 
Теперь можно запустить RabbitMQ используя docker-compose up и открыть UI администратора по адресу http://localhost:15672/.

Если вы знакомы с другими брокерскими службами обмена сообщениями, такими как ActiveMQ,  в них обычно используются очереди(Queues) и топики (Topics) для отправки индивидуальной и общедоступной модели связи. В RabbitMQ используется Exchange и в зависимости от ключа маршрутизации, сообщение будет отправлено в очередь (очереди) (Queues). Вы можете больше узнать о концепциях RabbitMQ здесь.

Приложение SpringBoot c RabbitMQ

Создадим SpringBoot приложение, описав следующие стартеры:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sivalabs</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RC1</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
      
</project>
Начнем с конфигурации RabbitMQ, создадим RabbitConfig и опишем следующие бины: Queue, Exchange и Binding:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
  
@Configuration
public class RabbitConfig 
{
    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";

    @Bean
    Queue ordersQueue() {
        return QueueBuilder.durable(QUEUE_ORDERS).build();
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }
    @Bean
    Exchange ordersExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
    }

    @Bean
    Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
    }
}
Мы объявляем Queue имен orders-queue и Exchange с именем orders-exchange.
Также определили привязку между orders-queue и orders-exchange, чтобы любое сообщение, отправленное на orders-exchange с ключом направления как orders-queue, отправлялось в orders-queue.

Конфигурация RabbitMQ представлена в application.properties:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Создадим бин OrderMessageSender для отправки сообщений в orders-exchange. Spring Boot автоматически конфигурирует инфраструктурные бины, необходимые для отправки/обработки сообщений в/от брокера сообщений RabbitMQ. Нам необходимо просто проинжектить RabbitTemplate и отправить сообщение вызвав метод  rabbitTemplate.convertAndSend(“routingKey”, Object).

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Order implements Serializable{
    private String orderNumber;
    private String productId;
    private double amount;
}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderMessageSender {
    private final RabbitTemplate rabbitTemplate;
    @Autowired
    public OrderMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendOrder(Order order) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);

    }
}
По умолчанию в SpringBoot используется org.springframework.amqp.support.converter. SimpleMessageConverterand, который сериализует объект в byte[].
После отправки сообщения вы можете просмотреть сообщение из интерфейса администратора, выполнив вход с учетными данными guest/guest.

Теперь создадим слушателя, используя аннотацию RabbitListener.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import ru.config.Config;
import ru.model.Order;

@Component
public class OrderMessageListener {
    static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);

    @RabbitListener(queues = Config.QUEUE_ORDERS)
    public void process(Order order){
        logger.info("Received order");
        logger.info(order.toString());
    }

}

Вот и все! Просто добавив @RabbitListener и определив, какую очередь слушать, мы можем создать слушателя.

Теперь, если вы отправляете сообщение в очередь, то с помощью метода OrderMessageListener.process(), вы обработаете данное сообщение и в логах приложения будет написано Received order и само содержимое объекта.

Отправка и прием сообщений c JSON содержимым.

Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [] с помощью SimpleMessageConverter и на принимающей стороне, он будет десериализован из byte [] в тип объекта (в нашем случае Order) с помощью GenericMessageConverter.

Чтобы изменить это поведение, нам нужно настроить автоконфигурацию бинов Spring Boot RabbitMQ.

Отправить сообщение как JSON

Одним быстрым способом отправки сообщения с JSON является использование ObjectMapper, мы можем конвертировать Orderobject в JSON и отправлять его.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.config.Config;
import ru.model.Order;

@Service
public class OrderMessageService {
    private final RabbitTemplate rabbitTemplate;
    private ObjectMapper objectMapper;

    @Autowired
    public OrderMessageService(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
        this.rabbitTemplate = rabbitTemplate;
        this.objectMapper = objectMapper;
    }

    public void sendOrder(Order order) throws Exception{
        String orderJson = objectMapper.writeValueAsString(order);
        Message message = MessageBuilder
                .withBody(orderJson.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();

        rabbitTemplate.convertAndSend(Config.QUEUE_ORDERS, order);
    }
}
Но преобразование объектов в JSON, как это, является своего рода избыточностью. Вместо этого мы можем сделать изменить конфигурацию, настроив компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter, который будет использоваться в RabbitTemplate, чтобы сообщение было сериализовано как JSON вместо byte[].
@Configuration
public class RabbitConfig{
    ...
    ...

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
Теперь сообщения отправляются в виде JSON. Так же необходимо настроить прием JSON сообщения. Для этого необходимо имплиментировать для нашего класса конфигурций интерфейс RabbitListenerConfigurer.
 @Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
    ...
    ...
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }
  
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
}

Обработка ошибок и невалидных сообщений, используя DeadLetterQueues(DLQ)

Мы можем отправить невалидные сообщения в отдельную очередь, чтобы мы могли их проверять и обрабатывать позже. Для этого можно использовать концепцию DLQ для автоматизации, вместо написания кода и обработки такого события.
Мы можем объявить dead-letter-exchange, dead-letter-routing-key для очереди, тогда объявление бина будет следующим:

@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {

    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";
    public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue";

    @Bean
    Queue ordersQueue() {
        return QueueBuilder.durable(QUEUE_ORDERS)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS)
                .withArgument("x-message-ttl", 15000) //Если сообщение не бработано за 15 сек - оно отправляется в DLQ
                .build();

    }
 
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }

    ...
    ...
}

Теперь можно отправить сообщение с JSON отличным от объекта Order и оно попадет в очередь dead-orders-queue. Необходимо заметить, что если очередь orders-queue была создана - ее необходимо удалить, чтобы приложение запустилось.

PS это мой перевод данной статьи