2023/9/22 🌦️
这里只是简单记录开发流程,感觉还挺有用担心以后忘了。由于是本地测试项目,没有使用 Docker 进行部署。
流程分析:
Java 端发送消息:
- Java 端作为消息的生产者,使用 RabbitMQ 的客户端库连接到 RabbitMQ 服务器。
- Java 端将要发送的消息通过 RabbitMQ 发送到一个指定的队列(或交换机,然后路由到队列)。
- 在发送消息时,Java 端可以将消息序列化为 JSON 格式等。
RabbitMQ 服务器:
- RabbitMQ 服务器接收到 Java 端发送的消息。
- 服务器根据消息的路由键和交换机配置,将消息路由到指定的队列。
Python 端接收消息:
- Python 端作为消息的消费者,同样使用 RabbitMQ 的客户端库连接到 RabbitMQ 服务器。
- Python 端监听指定的队列,等待从队列中接收消息。
消息传递:
- 一旦 Java 端将消息发送到队列并被 RabbitMQ 服务器路由到队列中,Python 端就可以开始接收消息。
- Python 端的消息监听器会检测到队列中有新消息,并触发一个回调函数来处理接收到的消息。
- 在回调函数中,Python 端可以对消息进行解析(例如,从 JSON 格式解析为 Python 对象)并执行相关操作,比如打印消息内容。
消息处理和确认:
- Python 端处理完消息后,可以向 RabbitMQ 服务器发送确认(ACK)消息,告诉服务器已成功处理该消息。
- 如果 Python 端处理消息时发生错误或者需要重试,可以选择不发送确认消息,这将导致消息保留在队列中供后续处理。
Java 部分(生产者Produce)
由于只是一个测试案例,所以是经典 SpringBoot 整合
引入 RabbitMQ 依赖
<!-- RabbitMQ 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
源码部分:
(Controller 层)
@RestController
@RequestMapping("/api")
public class RabbitMQController {
@Autowired
private RabbitMQProducerService rabbitMQProducerService;
@PostMapping("/send")
public String sendJsonMessage(@RequestBody Object jsonData) {
rabbitMQProducerService.sendJsonMessage(jsonData);
System.out.println(jsonData);
return "JSON message sent to RabbitMQ!";
}
}
(Service 层)
public interface RabbitMQProducerService {
void sendJsonMessage(Object jsonData);
}
(ServiceImpl 实现类)
@Service
public class RabbitMQProducerServiceImpl implements RabbitMQProducerService {
@Autowired(required = false)
private AmqpTemplate rabbitTemplate;
@Override
public void sendJsonMessage(Object jsonData) {
try {
String jsonStr = new ObjectMapper().writeValueAsString(jsonData);
rabbitTemplate.convertAndSend("stock-exchange", "stock-key", jsonStr);
} catch (Exception e) {
// 处理异常
}
}
}
(application.yml 配置文件)
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
Python 部分(消费者)
源码部分:
(Consumer)
import pika
# RabbitMQ服务器连接配置
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
print("建立连接")
# 声明要监听的队列,确保与Java端的队列名称相匹配
queue_name = 'stock-queue'
channel.queue_declare(queue=queue_name, durable=True)
# 建立回调函数,获取队列消息内容
def callback(ch, method, properties, body):
try:
print("连接成功")
print("回调函数执行成功")
print("Received JSON data:", body)
except Exception as e:
print("Error:", str(e))
# 设置回调函数,当有消息到达时将被调用
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit, press Ctrl+C')
channel.start_consuming()
启动流程
- 首先启动 RabbitMQ
- 第一次启动时,先启动生产者(Java端)后启动消费者(Python端)
- 最后通过 API 发送 JSON 数据进行测试。
测试结果
这里使用 ApiPost 携带一个json请求数据发送给消费者端
可以看到如下,生产者已经成功将消息传输给 MQ,消费者成功拿到数据
细节阐述
关于手动新建 RabbitMQ 交换机和队列并将其进行绑定。
新建一个交换机:名为 stock-exchange
新建一个队列:名为 stock-queue
将二者进行绑定
注意:生产端和消费者端的队列名字必须一致。
ok 解散!👍