Architecture Journey - Extensions - Data - Messaging_RabbitMQ

Overview

This article uses RabbitMQ to provide message queue processing.

Source code

iarc-data

Environment

IDE: IntelliJ IDEA
JDK: 1.8.0_162
Framework: Spring Boot
Middleware: RabbitMQ

Development

Producer

application.properties

spring.rabbitmq.addresses=192.168.56.103:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=***
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=3000
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.prefetch=3
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.requested-heartbeat=60

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQConfig

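import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig implements InitializingBean {
    public static final String DATA_RECEIVER_EXCHANGE = "data-receiver-exchange";
    public static final String DATA_RECEIVER_QUEUE = "data-receiver-queue";
    public static final String DATA_RECEIVER_KEY = "data-receiver-key";

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    // Enable setAutomaticRecoveryEnabled so the connection recovers automatically after it drops
    @Override
    public void afterPropertiesSet() {
        com.rabbitmq.client.ConnectionFactory conn = cachingConnectionFactory.getRabbitConnectionFactory();
        conn.setAutomaticRecoveryEnabled(true);
        conn.setTopologyRecoveryEnabled(true);
    }

    // Durable, non-exclusive, non-auto-delete queue with no extra arguments
    @Bean
    public Queue dataReceiverQueue() {
        Map<String, Object> args = new HashMap<>();
        return new Queue(DATA_RECEIVER_QUEUE, true, false, false, args);
    }

    @Bean
    TopicExchange dataReceiverExchange() {
        return new TopicExchange(DATA_RECEIVER_EXCHANGE);
    }

    // Bind the queue to the exchange under the routing key
    @Bean
    Binding bindingDataReceive() {
        return BindingBuilder.bind(dataReceiverQueue()).to(dataReceiverExchange()).with(DATA_RECEIVER_KEY);
    }
}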
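
The binding above attaches the queue to a topic exchange with a literal routing key, so in this setup it behaves like an exact match. Topic exchanges also accept `*` (exactly one word) and `#` (zero or more words) in binding keys. The following is only a simplified, broker-independent sketch of that matching rule in plain Java; `TopicMatcher` is a hypothetical helper and not part of RabbitMQ or Spring AMQP.

```java
/** Hypothetical helper: a simplified model of topic-exchange key matching. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** '*' matches exactly one word, '#' matches zero or more words. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // Split a dot-separated key into words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.");
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern exhausted: all words must be consumed
        }
        if (p[i].equals("#")) {
            // '#' either consumes nothing, or consumes one word and stays active
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false; // pattern still expects a word but none are left
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("data-receiver-key", "data-receiver-key"));
        System.out.println(matches("data.*", "data.receiver"));
        System.out.println(matches("data.#", "data.receiver.key"));
    }
}
```

With the literal key used in this article, only messages published with exactly `data-receiver-key` reach the queue.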

iarc-service changes

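import java.util.UUID;

import javax.annotation.PostConstruct;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
// Spring AMQP 1.x location; newer versions moved it to org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// User is the project's own request model (not shown here)
@RestController
@RequestMapping(value = "/data_receiver")
@Api(tags = "data_receiver", description = "Data receiving API")
public class DataReceiverController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Register this controller as the confirm and return callback of the template
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @ApiOperation(value = "Receive message", notes = "Receive message")
    @CrossOrigin(origins = "*")
    @PostMapping("receive")
    public void receive(@RequestBody User user) {
        sendToMQ(user.getName());
    }

    // Called when the broker acknowledges (or rejects) a published message
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("Message sent successfully: " + correlationData);
        } else {
            System.out.println("Message send failed: " + cause);
        }
    }

    // Called when a message could not be routed to any queue
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message.getMessageProperties().getCorrelationIdString() + " send failed");
    }

    private void sendToMQ(String msg) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitMQConfig.DATA_RECEIVER_EXCHANGE,
                RabbitMQConfig.DATA_RECEIVER_KEY, msg, correlationId);
    }
}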
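
The confirm callback above only logs the outcome. One possible way to act on it is to remember each outgoing payload under its correlation id and, when the broker responds, either forget it (ack) or keep it for a retry (nack). The sketch below shows that bookkeeping in plain Java without any Spring types; `PendingConfirms` and its method names are hypothetical. In the controller, `track` would be called just before `convertAndSend`, and `onConfirm` from `confirm(...)` using `correlationData.getId()`. Note that the callbacks are only invoked when publisher confirms and returns are enabled on the connection factory (for example through the `spring.rabbitmq.publisher-confirms` and `spring.rabbitmq.publisher-returns` properties in the Spring Boot versions that provide them), which the properties listed earlier do not set.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks messages the broker has not confirmed yet. */
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final Map<String, String> failed = new ConcurrentHashMap<>();

    /** Remember the payload before handing it to the template. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Call from the confirm callback; nacked payloads are kept for a later retry. */
    void onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (payload != null && !ack) {
            failed.put(correlationId, payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int failedCount() {
        return failed.size();
    }

    /** Returns the nacked payload for the id, or null if there is none. */
    String failedPayload(String correlationId) {
        return failed.get(correlationId);
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "iarc");
        confirms.onConfirm("id-1", false); // simulate a nack from the broker
        System.out.println("pending=" + confirms.pendingCount() + ", failed=" + confirms.failedCount());
    }
}
```

The concurrent maps are used because confirms arrive on a different thread than the one that publishes.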

Consumer

application.properties

Same as the producer.

pom.xml

Same as the producer.

RabbitMQConfig

Same as the producer.

iarc-data changes

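import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DataReceiver {
    // Print each message together with the listener thread that handled it
    @RabbitListener(queues = RabbitMQConfig.DATA_RECEIVER_QUEUE)
    public void process(String msg) {
        System.out.println(Thread.currentThread().getName() + ": " + msg);
    }
}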

RabbitMQ cluster

Once the cluster is set up, only application.properties needs to change:

spring.rabbitmq.addresses=192.168.56.103:5672,192.168.56.101:5672
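
The value is a comma-separated list of `host:port` pairs, one per cluster node. As a rough illustration of that format, the sketch below splits such a string into individual addresses in plain Java. `AddressList` is a hypothetical helper written for this article, not how Spring parses the property internally, and falling back to port 5672 when no port is given is an assumption of this sketch.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a "host:port,host:port" string into addresses. */
final class AddressList {
    static final int DEFAULT_AMQP_PORT = 5672; // assumed fallback for this sketch

    private AddressList() {}

    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_AMQP_PORT
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // Unresolved: no DNS lookup is performed while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress node : parse("192.168.56.103:5672,192.168.56.101:5672")) {
            System.out.println(node.getHostString() + " -> " + node.getPort());
        }
    }
}
```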

Debugging

Testing message production and consumption

Use Postman to send a POST request to http://localhost:8001/data_receiver/receive
with the following request body:

{"name":"iarc"}

The console of the iarc-data application in the IDE should then show log output.
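
The same request can also be sent from code instead of Postman. The following is a minimal sketch using only the JDK's `HttpURLConnection`; `ReceiveClient` is a hypothetical class name, the JSON is built by hand with only quotes and backslashes escaped, and the endpoint is passed as the first program argument (for example the URL from this article). Without an argument it only prints the request body.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical client: posts {"name": ...} to the receive endpoint. */
final class ReceiveClient {
    private ReceiveClient() {}

    /** Builds the JSON body; only quotes and backslashes are escaped. */
    static String buildBody(String name) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\":\"" + escaped + "\"}";
    }

    /** Sends the JSON with POST and returns the HTTP status code. */
    static int post(String endpoint, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String body = buildBody("iarc");
        System.out.println(body);
        if (args.length > 0) {
            // e.g. http://localhost:8001/data_receiver/receive
            System.out.println("HTTP status: " + post(args[0], body));
        }
    }
}
```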

Using the RabbitMQ management UI

Log in to http://192.168.56.103:15672; the Connections, Exchanges, and Queues tabs should each show the corresponding entries.

Deployment

Build and package with Maven

mvn clean install -Dmaven.test.skip=true

Deploy data/service to CentOS

Copy the packaged iarc-service.war and iarc-data.war to the $CATALINA_HOME/webapps directory; Tomcat unpacks the WAR files automatically. To verify, use Postman to send a POST request to http://192.168.56.103:8080/iarc-service/data_receiver/receive
