rabbitmq-rpc-远程调用-返回

在使用 RabbitMQ 时，有时发出调用之后还需要拿到对方的返回值，这就需要用到 RPC（远程过程调用）模式。

说明

RabbitMQ 服务器安装在 192.168.125.231 上。（注意：下文服务端示例代码中的 server 写的是 192.168.0.158，运行前请统一改为你实际的 RabbitMQ 地址。）

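依赖安装（pika 是 Python 的 RabbitMQ 客户端库；下文示例使用的是 pika 1.0 之前的接口写法，例如 basic_consume(callback, queue=...) 和 no_ack 参数）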

1
pip install pika

服务端

用于等待客户端消息：收到请求后进行处理，再把结果发布到请求中 reply_to 指定的回调队列
rpc_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# -*- coding: utf-8 -*-
import json
import os
import pika
import subprocess
import threading
import time
# Connection settings for the RabbitMQ broker used by this RPC server.
server = '192.168.0.158'
port = 5672
# Name of the queue that is passed to basic_consume further down.
queue_name = 'test_3'
# NOTE(review): username/password are blank in this listing; presumably
# placeholders to be filled in before running -- confirm.
credentials = pika.PlainCredentials(username='', password='')
parameters = pika.ConnectionParameters(host=server, port=port, credentials=credentials)
# The connection is opened as soon as this module-level statement runs.
connection = pika.BlockingConnection(parameters=parameters)
# Create the channel
channel = connection.channel()
#channel.exchange_declare(exchange=exchange, exchange_type='direct')
#channel.queue_declare(queue=queue_name, durable=True) # durable: queue persistence; messages are not lost if the server goes down
#channel.queue_declare(queue=queue_name)
# NOTE(review): both queue_declare variants are commented out, so the queue
# presumably has to exist on the broker already -- confirm.
def create_dir(dir):
    """Create directory ``dir`` (with any missing parents) unless it exists.

    The directory is created first and an error is tolerated only when the
    path turns out to exist afterwards.  This removes the window between a
    separate existence check and the creation, so two processes creating
    the same path at the same time cannot make the call fail.

    Args:
        dir: Path of the directory to create.  The name shadows the builtin
            but is kept so keyword callers keep working.

    Raises:
        OSError: If creation fails and the path still does not exist.
    """
    try:
        os.makedirs(dir)
    except OSError:
        # Same outcome as the former ``os.path.exists`` guard: a path that
        # is already present is not an error; anything else is re-raised.
        if not os.path.exists(dir):
            raise
def start_minio(data):
    """Launch a backgrounded ``minio server`` process for one user.

    Creates ``/root/.minio_user/<name>``, then runs a shell command that
    exports the MinIO access/secret keys, changes into that directory and
    starts ``minio server`` under ``setsid`` in the background, with its
    output redirected to ``<name>.log``.

    Every value taken from ``data`` is shell-quoted before it is placed in
    the command line, because ``data`` comes from a queue message and the
    command runs with ``shell=True``.  For values made only of ordinary
    characters the quoting is a no-op, so the command text is unchanged.

    Args:
        data: Mapping providing the keys ``name``, ``access_key``,
            ``secret_key`` and ``port``.

    Raises:
        KeyError: If one of the required keys is missing from ``data``.
        OSError: If the storage directory cannot be created.
    """
    # Local import so the block works on both interpreters: ``shlex.quote``
    # on Python 3, ``pipes.quote`` on Python 2.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    # NOTE(review): ``name`` is used as a path component without
    # validation; a value such as "../x" would escape the base directory.
    stor_path = '/root/.minio_user/%s' % data['name']
    create_dir(stor_path)

    cmd = (
        "export MINIO_ACCESS_KEY=%s && export MINIO_SECRET_KEY=%s && cd %s ; "
        "setsid minio server --address %s %s > %s 2>&1 & "
    ) % (
        quote('%s' % data['access_key']),
        quote('%s' % data['secret_key']),
        quote(stor_path),
        quote(':%s' % data['port']),
        quote('/mnt/%s/' % data['name']),
        quote('%s.log' % data['name']),
    )

    proc = subprocess.Popen(cmd, shell=True)
    # No pipes were requested, so both values are None; communicate() only
    # waits for the launching shell, and the server itself was started
    # with "&".
    out, err = proc.communicate()
    # Single-argument print(...) produces the same output whether print is
    # a statement (Python 2) or a function (Python 3).
    print('%s %s' % (out, err))
    # NOTE(review): presumably a short grace period for minio to come up.
    time.sleep(0.5)
def callback(ch, method, props, body):
    """Handle one RPC request: start MinIO, reply to the caller, then ack.

    A payload that cannot be decoded, lacks a required field, or whose
    storage directory cannot be created no longer raises out of the
    consumer.  Previously that would have left the message unacknowledged;
    now the caller receives an error reply and the message is acknowledged.

    Args:
        ch: Channel the message arrived on; used to publish the reply and
            to acknowledge the delivery.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        props: Message properties; ``props.reply_to`` names the queue the
            reply is published to and ``props.correlation_id`` is echoed
            back so the caller can match the reply to its request.
        body: Request payload, expected to be a JSON object.
    """
    try:
        data = json.loads(body)
        print('Received json data: %s' % (data,))
        start_minio(data)
        resp = {'code': 200, 'message': 'Created success!'}
    except (ValueError, KeyError, TypeError) as exc:
        # Invalid JSON, a missing field, or a payload that is not a mapping.
        resp = {'code': 400, 'message': 'Bad request: %s' % (exc,)}
    except OSError as exc:
        # Storage directory (or process launch) failed on this host.
        resp = {'code': 500, 'message': 'Create failed: %s' % (exc,)}

    print('Reply to: %s' % (props.reply_to,))
    # Only reply when the sender asked for one; a request without reply_to
    # has no queue to answer on.
    if props.reply_to:
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id),
            body=json.dumps(resp))
    # Acknowledge in every case so the request is marked as handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Several server instances may be running; prefetch_count=1 spreads the load
# evenly across them. With a single server this setting can be left out.
channel.basic_qos(prefetch_count=1)
# Tell RabbitMQ to use callback to receive messages
#channel.basic_consume(callback, queue=queue_name, no_ack=False)# no_ack: messages are not lost if the consumer connection drops
# NOTE(review): the callback-first argument order looks like the pika API
# from before release 1.0 -- confirm against the installed version.
channel.basic_consume(callback, queue=queue_name)
# Start receiving and block; callback is invoked only when the queue has a
# message. Press Ctrl+C to exit.
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

客户端

rpc_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import pika
import time
import uuid
class RabbitMQRpcClient(object):
    """Blocking RPC client over RabbitMQ.

    Each ``call`` publishes a JSON request to the default exchange and then
    waits on a private, exclusive reply queue until a message carrying the
    matching correlation id arrives.

    NOTE(review): the ``basic_consume``/``queue_declare`` argument forms
    used here look like the pika API from before release 1.0 -- confirm
    against the installed version.
    """

    def __init__(self, host='', port='', username='', password='',
                 routing_key=''):
        """Connect to the broker and register the reply consumer.

        All parameters default to the blank values that were previously
        hard-coded, so ``RabbitMQRpcClient()`` behaves as before.

        Args:
            host: Broker host name or address.
            port: Broker port.
            username: Broker account name.
            password: Broker account password.
            routing_key: Routing key requests are published with.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self._get_conn_params()
        self.queue_name = ''
        self.routing_key = routing_key
        # Initialised before the consumer is registered so on_response can
        # never see a half-built instance.
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self._get_callback_queue()

    def _get_callback_queue(self):
        """Open the connection and start consuming from a reply queue."""
        self.connection = pika.BlockingConnection(parameters=self.conn_params)
        self.channel = self.connection.channel()
        # Exclusive queue; its name is read back from the declare reply.
        result_queue = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result_queue.method.queue
        self.channel.basic_consume(self.on_response, no_ack=False,
                                   queue=self.callback_queue)

    def _get_conn_params(self):
        """Build credentials and connection parameters from the settings."""
        self.credentials = pika.PlainCredentials(username=self.username,
                                                 password=self.password)
        self.conn_params = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            credentials=self.credentials)

    def on_response(self, channel, method, props, body):
        """Store the reply if it answers the pending request, then ack it.

        Args:
            channel: Channel the reply was delivered on.
            method: Delivery metadata; supplies the delivery tag to ack.
            props: Message properties; ``correlation_id`` is compared with
                the id of the pending request.
            body: Raw reply payload.
        """
        if self.corr_id == props.correlation_id:
            self.response = body
        # The consumer is registered with no_ack=False, so every delivery
        # (matching or not) has to be acknowledged explicitly.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def call(self, **kwargs):
        """Send ``kwargs`` as a JSON request and return the decoded reply.

        Args:
            **kwargs: Request fields; serialised with ``json.dumps``.

        Returns:
            The reply body decoded with ``json.loads``.
        """
        # Fresh state per call: without this a second call would return
        # the previous reply at once, and a late reply to an earlier
        # request could be mistaken for the current one.
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key=self.routing_key,
            properties=pika.BasicProperties(reply_to=self.callback_queue,
                                            correlation_id=self.corr_id),
            body=json.dumps(kwargs))
        while True:
            self.connection.process_data_events()
            if self.response is not None:
                break
            # Throttle polling only while still waiting, so a reply that
            # has arrived is returned without an extra half-second delay.
            time.sleep(0.5)
        return json.loads(self.response)