RabbitMQ Start
为了解决程序间通讯的问题
Java 客户端
依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
连接模板
        factory = new ConnectionFactory();
        factory.setHost("himcs.io");
        factory.setUsername("mcs");
        factory.setPassword("mcs");
        factory.setVirtualHost("/test");
                try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        }
核心对象
- Connection TCP物理连接 
- Channel 通信通道,TCP虚拟连接 
声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
参数
- 队列名称
- 是否持久化
- 是否私有化,fase代表所有消费者都可访问,true代表只有第一次拥有他的消费者才能一直使用
- 是否自动删除,连接停掉后是否删除这个队列
- 额外参数
发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
参数
- 交换机
- 队列名
- 额外设置属性
- 消息字节数组
消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
参数
- 队列名称
- 是否自动ack
- 消费回调
- 取消回调
手动 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
RabbitMQ 六种工作模式
- hello-world  
- 工作队列  
- 发布订阅  
- 路由(精准匹配)  
- 主题模式(模糊匹配)  
- RPC  
工作队列模式

- 一个队列有多个消费者
- 有多个消息的情况下,Work Queue 将消息分派个不同的消费者,消费者都会收到不同的消息,消费者可以控制消费速率
// 只处理一个任务
channel.basicQos(1);
发布/订阅(fanout)

- 生产者将数据发送到Exchange
- 交换机根据某些规则将消息送入相关队列
- 发布/订阅模式,交换机无差别的发送消息到队列,所有消费者拿到的消息完全相同,这种类型成为 fanout
发送到交换机
channel.basicPublish(EXCHANGE_NAME, "", null, "good day".getBytes());
声明队列,绑定交换机,
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
路由

通过 Routing key 绑定交换机与队列
交换机与队列的关系可以是多对多,通过消息 通过 路由键 由 交换机 分发到接受相应路由键的 队列
发布
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String routingKey = "warning";
            String message = "eroor";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
接收
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
Topic
常用

在路由的基础上,增加了模糊匹配功能
- *匹配单个关键字
- #匹配所有关键字
声明队列,发送消息
        String routingKey = "china.beijing";
        try (Connection connection = ConnectionUtil.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, "good day".getBytes());
        }
接受消息,模糊匹配 china.*
        String routingKey = "china.*";
        Connection connection = ConnectionUtil.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info(QUEUE_NAME + " 收到:" + new String(body, StandardCharsets.UTF_8));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
消息确认机制
RabbitMQ 充当了 Broker .
生产者 (Producer) 确认 Broker 是否正确接受消息。
RabbitMQ 提供了Listener来 代表 消息投递的状态。
只与 Producer 和 Broker 有关。
有两种状态:
- Confirm
- Return
Confirm 代表 消息送到Broker时产生的状态,有两者情况
- ack正常接收
- nack拒收消息,原因有多种,队列已满,限流,IO异常。。。
Return 代表消息被Broker ack后,Broker没有相应队列投递,消息被退回给Producer
        //开启 confirm 监听模式
        channel.confirmSelect();
        //设置回调
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                
            }
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            }
        });
   // 设置mandatory=true 可以将消息退回给生产者 第三个参数
        channel.basicPublish(EXCHANGE_NAME, routingKey, true, null, "good day".getBytes());
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
            }
        });
return 数据处理措施:
- 创建相关队列
- 丢弃
Spring Boot 整合
依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
基本配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 设置发送确认
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=simple
发送
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
rabbitTemplate.convertAndSend("exchange", "routing", "tset");
发送确认
             rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("correlationData : {}", correlationData);
                log.info("ack : {}", ack);
                if (!ack) {
                    log.error("cause: {}", cause);
                }
            });
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("replyCode: {}", replyCode);
                log.info("replyText: {}", replyText);
                log.info("exchange: {}", exchange);
                log.info("routingKey: {}", routingKey);
                log.info("message: {}", message);
            });
            CorrelationData correlationData = new CorrelationData(String.valueOf(System.currentTimeMillis()));
            rabbitTemplate.convertAndSend("test", "test", "hello", correlationData);
消费者
配置
# 手动ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 最小消费者
spring.rabbitmq.listener.simple.concurrency=1
# 最大消费者数量 自动扩容
spring.rabbitmq.listener.simple.max-concurrency=10
声明队列,交换机
  @Bean
  Queue queue() {
    return new Queue(queueName, false);
  }
  @Bean
  TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }
  @Bean
  Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
  }
