嘘~ 正在从服务器偷取页面 . . .

使用 RabbitMQ Pyhon 模块和 Java 模块进行数据交互


2023/9/22 🌦️
这里只是简单记录开发流程,感觉还挺有用担心以后忘了。由于是本地测试项目,没有使用 Docker 进行部署。

流程分析:

  1. Java 端发送消息

    • Java 端作为消息的生产者,使用 RabbitMQ 的客户端库连接到 RabbitMQ 服务器。
    • Java 端将要发送的消息通过 RabbitMQ 发送到一个指定的队列(或交换机,然后路由到队列)。
    • 在发送消息时,Java 端可以将消息序列化为 JSON 格式等。
  2. RabbitMQ 服务器

    • RabbitMQ 服务器接收到 Java 端发送的消息。
    • 服务器根据消息的路由键和交换机配置,将消息路由到指定的队列。
  3. Python 端接收消息

    • Python 端作为消息的消费者,同样使用 RabbitMQ 的客户端库连接到 RabbitMQ 服务器。
    • Python 端监听指定的队列,等待从队列中接收消息。
  4. 消息传递

    • 一旦 Java 端将消息发送到队列并被 RabbitMQ 服务器路由到队列中,Python 端就可以开始接收消息。
    • Python 端的消息监听器会检测到队列中有新消息,并触发一个回调函数来处理接收到的消息。
    • 在回调函数中,Python 端可以对消息进行解析(例如,从 JSON 格式解析为 Python 对象)并执行相关操作,比如打印消息内容。
  5. 消息处理和确认

    • Python 端处理完消息后,可以向 RabbitMQ 服务器发送确认(ACK)消息,告诉服务器已成功处理该消息。
    • 如果 Python 端处理消息时发生错误或者需要重试,可以选择不发送确认消息,这将导致消息保留在队列中供后续处理。

Java 部分(生产者Produce)

由于只是一个测试案例,所以是经典 SpringBoot 整合

引入 RabbitMQ 依赖

<!-- RabbitMQ 依赖 -->  
<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

源码部分:
(Controller 层)

@RestController  
@RequestMapping("/api")  
public class RabbitMQController {  
  
    @Autowired  
    private RabbitMQProducerService rabbitMQProducerService;  
  
    @PostMapping("/send")  
    public String sendJsonMessage(@RequestBody Object jsonData) {  
        rabbitMQProducerService.sendJsonMessage(jsonData);  
        System.out.println(jsonData);  
        return "JSON message sent to RabbitMQ!";  
    }  
}

(Service 层)

public interface RabbitMQProducerService {  
    void sendJsonMessage(Object jsonData);  
}

(ServiceImpl 实现类)

@Service  
public class RabbitMQProducerServiceImpl implements RabbitMQProducerService {  
  
    @Autowired(required = false)  
    private AmqpTemplate rabbitTemplate;  
  
    @Override  
    public void sendJsonMessage(Object jsonData) {  
        try {  
            String jsonStr = new ObjectMapper().writeValueAsString(jsonData);  
            rabbitTemplate.convertAndSend("stock-exchange", "stock-key", jsonStr);  
        } catch (Exception e) {  
            // 处理异常  
        }  
    }  
}

(application.yml 配置文件)

rabbitmq:  
  host: localhost  
  port: 5672  
  username: guest  
  password: guest  
  virtual-host: /

Python 部分(消费者)

源码部分:
(Consumer)

import pika  
  
# RabbitMQ服务器连接配置  
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
channel = connection.channel()  
print("建立连接")  
  
# 声明要监听的队列,确保与Java端的队列名称相匹配  
queue_name = 'stock-queue'  
channel.queue_declare(queue=queue_name, durable=True)  
  
# 建立回调函数,获取队列消息内容  
def callback(ch, method, properties, body):  
    try:  
        print("连接成功")  
        print("回调函数执行成功")  
        print("Received JSON data:", body)  
    except Exception as e:  
        print("Error:", str(e))  
  
# 设置回调函数,当有消息到达时将被调用  
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)  
  
print('Waiting for messages. To exit, press Ctrl+C')  
channel.start_consuming()

启动流程

  1. 首先启动 RabbitMQ
  2. 第一次启动时,先启动生产者(Java端)后启动消费者(Python端)
  3. 最后通过 API 发送 JSON 数据进行测试。

测试结果

这里使用 ApiPost 携带一个json请求数据发送给消费者端

可以看到如下,生产者已经成功将消息传输给 MQ,消费者成功拿到数据

细节阐述

关于手动新建 RabbitMQ 交换机和队列并将其进行绑定。

新建一个交换机:名为 stock-exchange
新建一个队列:名为 stock-queue

将二者进行绑定

注意:生产端和消费者端的队列名字必须一致。
ok 解散!👍


文章作者: 清风摇翠
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风摇翠 !
评论
  目录