RabbitMQ Start

为了解决程序间通讯的问题

Java 客户端

依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

连接模板

        factory = new ConnectionFactory();
        factory.setHost("himcs.io");
        factory.setUsername("mcs");
        factory.setPassword("mcs");
        factory.setVirtualHost("/test");
                try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        }

核心对象

  • Connection TCP物理连接

  • Channel 通信通道,TCP虚拟连接

声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

参数

  1. 队列名称
  2. 是否持久化
  3. 是否私有化,fase代表所有消费者都可访问,true代表只有第一次拥有他的消费者才能一直使用
  4. 是否自动删除,连接停掉后是否删除这个队列
  5. 额外参数

发布消息

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

参数

  1. 交换机
  2. 队列名
  3. 额外设置属性
  4. 消息字节数组

消费消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});

参数

  • 队列名称
  • 是否自动ack
  • 消费回调
  • 取消回调

手动 ack

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

RabbitMQ 六种工作模式

  1. hello-world img

  2. 工作队列 img

  3. 发布订阅 img

  4. 路由(精准匹配) img

  5. 主题模式(模糊匹配) img

  6. RPC img

工作队列模式

img

  • 一个队列有多个消费者
  • 有多个消息的情况下,Work Queue 将消息分派个不同的消费者,消费者都会收到不同的消息,消费者可以控制消费速率
// 只处理一个任务
channel.basicQos(1);

发布/订阅(fanout)

img

  • 生产者将数据发送到Exchange
  • 交换机根据某些规则将消息送入相关队列
  • 发布/订阅模式,交换机无差别的发送消息到队列,所有消费者拿到的消息完全相同,这种类型成为 fanout

发送到交换机

channel.basicPublish(EXCHANGE_NAME, "", null, "good day".getBytes());

声明队列,绑定交换机,

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

路由

img

通过 Routing key 绑定交换机与队列

交换机与队列的关系可以是多对多,通过消息 通过 路由键交换机 分发到接受相应路由键队列

发布

  channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String routingKey = "warning";
            String message = "eroor";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

接收

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

Topic

常用

img

在路由的基础上,增加了模糊匹配功能

  • * 匹配单个关键字

  • # 匹配所有关键字

声明队列,发送消息

        String routingKey = "china.beijing";
        try (Connection connection = ConnectionUtil.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, "good day".getBytes());
        }

接受消息,模糊匹配 china.*

        String routingKey = "china.*";
        Connection connection = ConnectionUtil.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info(QUEUE_NAME + " 收到:" + new String(body, StandardCharsets.UTF_8));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

消息确认机制

RabbitMQ 充当了 Broker .

生产者 (Producer) 确认 Broker 是否正确接受消息。

RabbitMQ 提供了Listener来 代表 消息投递的状态。

只与 ProducerBroker 有关。

有两种状态:

  • Confirm
  • Return

Confirm 代表 消息送到Broker时产生的状态,有两者情况

  • ack 正常接收
  • nack 拒收消息,原因有多种,队列已满,限流,IO异常。。。

Return 代表消息被Broker ack后,Broker没有相应队列投递,消息被退回给Producer


        //开启 confirm 监听模式
        channel.confirmSelect();
        //设置回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

            }
        });
   // 设置mandatory=true 可以将消息退回给生产者 第三个参数
        channel.basicPublish(EXCHANGE_NAME, routingKey, true, null, "good day".getBytes());

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
            }
        });

return 数据处理措施:

  1. 创建相关队列
  2. 丢弃

Spring Boot 整合

依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

基本配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

# 设置发送确认
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=simple

发送

 @Autowired
 private RabbitTemplate rabbitTemplate;
 
rabbitTemplate.convertAndSend("exchange", "routing", "tset");

发送确认

             rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("correlationData : {}", correlationData);
                log.info("ack : {}", ack);
                if (!ack) {
                    log.error("cause: {}", cause);
                }
            });
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("replyCode: {}", replyCode);
                log.info("replyText: {}", replyText);
                log.info("exchange: {}", exchange);
                log.info("routingKey: {}", routingKey);
                log.info("message: {}", message);
            });
            CorrelationData correlationData = new CorrelationData(String.valueOf(System.currentTimeMillis()));
            rabbitTemplate.convertAndSend("test", "test", "hello", correlationData);

消费者

配置

# 手动ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 最小消费者
spring.rabbitmq.listener.simple.concurrency=1
# 最大消费者数量 自动扩容
spring.rabbitmq.listener.simple.max-concurrency=10

声明队列,交换机

  @Bean
  Queue queue() {
    return new Queue(queueName, false);
  }

  @Bean
  TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
  }
Last Updated:
Contributors: himcs, himcs, mcs