0%

Rabbitmq 学习

概念

RabbitMQ作为一个消息队列提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

交换机

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

  • Direct:direct 类型的行为是“先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置 header attribute 参数类型的交换机
  • Fanout:转发消息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements…b.,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。
  • *表示一个词.
  • #表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

代码示例(TopicExchange)

pom.xm加入依赖

1
2
3
4
      <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml添加配置

1
2
3
4
5
6
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

TopicRabbitMqConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.java.study.rabbitmq.topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitMqConfig {
final static String TOPIC_ONE = "topic.one";
final static String TOPIC_TWO = "topic.two";
final static String TOPIC_EXCHANGE = "topicExchange";
/**
* Topic 交换机模式 可以用通配符
*/

//创建两个 Queue
@Bean
public Queue queue_one(){
return new Queue(TOPIC_ONE);
}

@Bean
public Queue queue_two(){
return new Queue(TOPIC_TWO);
}

//配置 TopicExchange,指定名称为 topicExchange
@Bean
public TopicExchange exchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}

//给队列绑定 exchange 和 routing_key

@Bean
public Binding bindingExchangeMessage(Queue queue_one, TopicExchange exchange){
return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");
}

@Bean
public Binding bingingExchangeMessages(Queue queue_two,TopicExchange exchange){
return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");
}
}

TopicSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.java.study.rabbitmq.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicSender {

@Autowired
AmqpTemplate amqpTemplate;

public void send_one(){
String context = "Hi,I am message one";
this.amqpTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE,"topic.one",context);
}
public void send_two(){
String context = "Hi,I am message two";
this.amqpTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE,"topic.two",context);
}

}

TopicReceiver1.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.java.study.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.one")
public class TopicReceiver1 {

@RabbitHandler
public void process(String message){
System.out.println("Receiver1 topic.one :"+ message);
}
}

TopicReceiver2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.java.study.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.two")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver2 topic.two :"+ message);
}
}

RabbitmqdemoApplicationTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitmqdemoApplicationTest {

@Autowired
TopicSender topicSender;

@Test
public void topic(){
topicSender.send_one();
topicSender.send_two();
}

}

topicSender.send_one()打印日志如下:

topicSender.send_two()打印如下:

由此可说明发送send_one会匹配到topic.#和topic.one,所以两个Receiver都可以收到消息,发送send_two只有topic.#可以匹配所有,只有Receiver2监听到消息。

欣赏此文?求鼓励,求支持!