# rabbitmq-rpc-远程调用-返回

发表于 2019-02-28 | in rabbitmq | 访客

在使用 rabbitmq 的时候,有时候我们调用后,还需要有返回值,这就涉及到了 rpc 调用。

## 说明

RabbitMQ 的服务器安装在 192.168.125.231 上。

## 插件安装

```
pip install pika
```

## 服务端

用于等待客户端消息。

rpc_server.py

```python
# -*- coding: utf-8 -*-
import json
import os
import pika
import subprocess
import threading
import time

server = '192.168.0.158'
port = 5672
queue_name = 'test_3'

credentials = pika.PlainCredentials(username='', password='')
parameters = pika.ConnectionParameters(host=server, port=port, credentials=credentials)
connection = pika.BlockingConnection(parameters=parameters)

# 创建通道
channel = connection.channel()
#channel.exchange_declare(exchange=exchange, exchange_type='direct')
#channel.queue_declare(queue=queue_name, durable=True) # durable 队列持久化,服务端宕机 消息不丢失
#channel.queue_declare(queue=queue_name)

def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)

def start_minio(data):
    stor_path = '/root/.minio_user/%s' % data['name']
    create_dir(stor_path)
    cmd = "export MINIO_ACCESS_KEY=%s && export MINIO_SECRET_KEY=%s && cd %s ; setsid minio server --address :%s /mnt/%s/ > %s.log 2>&1 & " % (data['access_key'], data['secret_key'], stor_path, data['port'], data['name'], data['name'])
    proc= subprocess.Popen(cmd, shell=True)
    out, err = proc.communicate()
    print out, err
    time.sleep(0.5)

# ch 管道(channel)
# method 消息发送给哪个queue
# props 返回给消费者的返回参数
# body 返回给消费者的数据对象
def callback(ch, method, props, body):
    data = json.loads(body)
    print 'Received json data: %s' % data
    start_minio(data)
    resp = {'code': 200, 'message': 'Created success!'}
    print 'Reply to: %s' % props.reply_to
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=json.dumps(resp))
    # 确保任务完成
    ch.basic_ack(delivery_tag=method.delivery_tag)

#可能会执行多个服务端,为了在多个服务端上均匀的分布负荷,如果就一个,可以不设置该参数
channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq使用callback来接收信息
#channel.basic_consume(callback, queue=queue_name, no_ack=False)#no_ack,消费者连接断了,消息不丢失
channel.basic_consume(callback, queue=queue_name)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
```

## 客户端

rpc_client.py

```python
import json
import pika
import time
import uuid

class RabbitMQRpcClient(object):
    def __init__(self):
        self._get_conn_params()
        self.queue_name = ''
        self.routing_key = ''
        self._get_callback_queue()
        self.response = None
        self.corr_id = str(uuid.uuid4())

    def _get_callback_queue(self):
        self.connection = pika.BlockingConnection(parameters=self.conn_params)
        self.channel = self.connection.channel()
        result_queue = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result_queue.method.queue
        self.channel.basic_consume(self.on_response, no_ack=False, queue=self.callback_queue)

    def _get_conn_params(self):
        self.credentials = pika.PlainCredentials(username='', password='')
        self.conn_params = pika.ConnectionParameters(host='', port='', credentials=self.credentials)

    def on_response(self, channel, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, **kwargs):
        self.channel.basic_publish(exchange='',
                                   routing_key=self.routing_key,
                                   properties=pika.BasicProperties(reply_to=self.callback_queue,
                                                                   correlation_id=self.corr_id),
                                   body=json.dumps(kwargs))
        while self.response is None:
            self.connection.process_data_events()
            time.sleep(0.5)
        return json.loads(self.response)
```