import pika
from pika.exchange_type import ExchangeType
class Producer(object):
    def __init__(self, queue_name,exchange_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }
        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 声明队列
        self.channel = self.con.channel()
        self.channel.queue_declare(queue=queue_name)
        self.channel.exchange_declare(exchange=exchange_name, exchange_type=ExchangeType.fanout)
    def send_message(self,queue_name,exchange_name,routing_key, body):
        """fanout 类型的交换机 routing_key 为空字符串,给所有绑定这个交换价的队列发送消息"""
        # 绑定交换机
        self.channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key="")
        # 发送消息
        self.channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)
        # 关闭通道
        self.channel.close()
        # 关闭连接
        self.con.close()
if __name__ == '__main__':
    p = Producer("test", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p.send_message("test", "logs","","have a good time")
    p1 = Producer("test01", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p1.send_message("test01","logs","","good luck !")
Subscribe
import pika
class Consumer(object):
    def __init__(self, queue_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }
        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 创建通道
        self.channel = self.con.channel()
        self.queue_name = queue_name
    def consume_message(self):
        def callback(ch, method, properties, body):
            print("ch===%r" % ch)
            print("method===%r" % method)
            print("properties===%r" % properties)
            print("[x] Received %r" % body)
        # 消费对象
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)
        # 开始消费
        self.channel.start_consuming()
        self.channel.close()
if __name__ == '__main__':
    try:
        c = Consumer("test", "tom", "tom@tom", "localhost", 5672, "/afei")
        c.consume_message()
    except KeyboardInterrupt:
        exit(0)










