RabbitMQ Start
为了解决程序间通讯的问题
Java 客户端
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
连接模板
factory = new ConnectionFactory();
factory.setHost("himcs.io");
factory.setUsername("mcs");
factory.setPassword("mcs");
factory.setVirtualHost("/test");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
}
核心对象
Connection TCP物理连接
Channel 通信通道,TCP虚拟连接
声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
参数
- 队列名称
- 是否持久化
- 是否私有化,fase代表所有消费者都可访问,true代表只有第一次拥有他的消费者才能一直使用
- 是否自动删除,连接停掉后是否删除这个队列
- 额外参数
发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
参数
- 交换机
- 队列名
- 额外设置属性
- 消息字节数组
消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
参数
- 队列名称
- 是否自动ack
- 消费回调
- 取消回调
手动 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
RabbitMQ 六种工作模式
hello-world
工作队列
发布订阅
路由(精准匹配)
主题模式(模糊匹配)
RPC
工作队列模式
- 一个队列有多个消费者
- 有多个消息的情况下,Work Queue 将消息分派个不同的消费者,消费者都会收到不同的消息,消费者可以控制消费速率
// 只处理一个任务
channel.basicQos(1);
发布/订阅(fanout)
- 生产者将数据发送到Exchange
- 交换机根据某些规则将消息送入相关队列
- 发布/订阅模式,交换机无差别的发送消息到队列,所有消费者拿到的消息完全相同,这种类型成为
fanout
发送到交换机
channel.basicPublish(EXCHANGE_NAME, "", null, "good day".getBytes());
声明队列,绑定交换机,
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
路由
通过 Routing key
绑定交换机与队列
交换机与队列的关系可以是多对多,通过消息 通过 路由键
由 交换机
分发到接受相应路由键
的 队列
发布
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "warning";
String message = "eroor";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
接收
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
Topic
常用
在路由的基础上,增加了模糊匹配功能
*
匹配单个关键字#
匹配所有关键字
声明队列,发送消息
String routingKey = "china.beijing";
try (Connection connection = ConnectionUtil.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, "good day".getBytes());
}
接受消息,模糊匹配 china.*
String routingKey = "china.*";
Connection connection = ConnectionUtil.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info(QUEUE_NAME + " 收到:" + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
消息确认机制
RabbitMQ
充当了 Broker
.
生产者 (Producer
) 确认 Broker
是否正确接受消息。
RabbitMQ
提供了Listener
来 代表 消息投递的状态。
只与 Producer
和 Broker
有关。
有两种状态:
- Confirm
- Return
Confirm
代表 消息送到Broker
时产生的状态,有两者情况
ack
正常接收nack
拒收消息,原因有多种,队列已满,限流,IO异常。。。
Return
代表消息被Broker
ack
后,Broker
没有相应队列投递,消息被退回给Producer
//开启 confirm 监听模式
channel.confirmSelect();
//设置回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
}
});
// 设置mandatory=true 可以将消息退回给生产者 第三个参数
channel.basicPublish(EXCHANGE_NAME, routingKey, true, null, "good day".getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
}
});
return 数据处理措施:
- 创建相关队列
- 丢弃
Spring Boot 整合
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
基本配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
# 设置发送确认
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=simple
发送
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend("exchange", "routing", "tset");
发送确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData : {}", correlationData);
log.info("ack : {}", ack);
if (!ack) {
log.error("cause: {}", cause);
}
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("replyCode: {}", replyCode);
log.info("replyText: {}", replyText);
log.info("exchange: {}", exchange);
log.info("routingKey: {}", routingKey);
log.info("message: {}", message);
});
CorrelationData correlationData = new CorrelationData(String.valueOf(System.currentTimeMillis()));
rabbitTemplate.convertAndSend("test", "test", "hello", correlationData);
消费者
配置
# 手动ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 最小消费者
spring.rabbitmq.listener.simple.concurrency=1
# 最大消费者数量 自动扩容
spring.rabbitmq.listener.simple.max-concurrency=10
声明队列,交换机
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}