博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python之上下文管理、redis的发布订阅、rabbitmq
阅读量:4982 次
发布时间:2019-06-12

本文共 6676 字,大约阅读时间需要 22 分钟。

使用with打开文件的方式,是调用了上下文管理的功能

1 #打开文件的两种方法: 2  3 f = open('a.txt','r') 4  5 with open('a.txt','r') as f  6  7 实现使用with关闭socket 8 import contextlib 9 import socket10 11 @contextlib.contextmanage12 def Sock(ip,port):13     socket = socket.socket()14     socket.bind((ip,port))15     socket.listen(5)16     try:17         yield socket18     finally:19         socket.close()20 21 #执行Sock函数传入参数,执行到yield socket返回值给s,执行with语句体,执行finally后面的语句22 with Sock('127.0.0.1',8000) as s:23     print(s)

redis的发布订阅

class RedisHelper:    def __init__(self):        #调用类时自动连接redis        self.__conn = redis.Redis(host='192.168.1.100')    def public(self, msg, chan):        self.__conn.publish(chan, msg)        return True    def subscribe(self, chan):        pub = self.__conn.pubsub()        pub.subscribe(chan)        pub.parse_response()        return pub#订阅者import s3obj = s3.RedisHelper()data = obj.subscribe('fm111.7')print(data.parse_response())#发布者import s3obj = s3.RedisHelper()obj.public('alex db', 'fm111.7')

 RabbitMQ

1 #消费者 2 import pika 3  4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1')) 5 channel = connection.channel()#创建对象 6  7 channel.queue_declare(queue = 'wocao') 8 def callback(ch,method,properties,body): 9     print("[x] Received %r"%body)10 11 channel.basic_consume(callback,queue = 'wocao',no_ack = True)12 print('[*] Waiting for messages. To exit press CTRL+C')13 channel.start_consuming()14 15 #生产者16 import pika17 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))18 channel = connection.channel()19 channel.queue_declare(queue = 'wocao')#指定一个队列,不存在此队列则创建20 channel.basic_publish(exchange = '',routing_key = 'wocao',body = 'hello world!')21 print("[x] Sent 'hello world!")22 connection.close()

 exchange type类型

#生产者import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='192.168.11.87'))channel = connection.channel()#fanout类型,对绑定该exchange的队列实行广播channel.exchange_declare(exchange='logs_fanout',                         type='fanout')# 随机创建队列result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 绑定exchangechannel.queue_bind(exchange='logs_fanout',                   queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r" % body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()#消费者import pika#发送方connection = pika.BlockingConnection(pika.ConnectionParameters(        host='192.168.11.87'))channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',                         type='fanout')message = "what's the fuck"#设置exchange的名channel.basic_publish(exchange='logs_fanout',                      routing_key='',                      body=message)print(" [x] Sent %r" % message)connection.close()
1 #根据关键字发送指定队列 2 #生产者(发布者) 3 import pika 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5                                      host = '127.0.0.1')) 6 channel = connection.channel() 7  8 channel.exchange_declare(exchange='direct_logs_1', 9                          type='direct')  # 关键字发送到队列10 #对error关键字队列发送指令11 severity = 'error'12 message = '123'13 channel.basic_publish(exchange = 'direct_logs_1',14                        routing_key = severity,15                        body = message)16 print('[x] Sent %r:%r'%(severity,message))17 connection.close()18 #消费者(订阅者)19 import pika20 #消费者21 connection = pika.BlockingConnection(pika.ConnectionParameters(22                                      host = '127.0.0.1'))23 channel = connection.channel()24 channel.exchange_declare(exchange='direct_logs_1',25                          type = 'direct')#关键字发送到队列26 27 result = channel.queue_declare(exclusive=True)28 queue_name = result.method.queue29 serverities = ['error','info','warning']30 for severity in serverities:31     channel.queue_bind(exchange='direct_logs_1',32                        queue = queue_name,33                        routing_key = severity)34 def callback(ch,method,properties,body):35     print('[x] %r:%r'%(method.routing_key,body))36 37 channel.basic_consume(callback,38                       queue = queue_name,39                       no_ack = True)40 channel.start_consuming()
1 #实现消息不丢失接收方 2 import pika 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4')) 4 channel = connection.channel() 5 channel.queue_declare(queue = 'hello') 6  7 def callback(ch,method,properties,body): 8     print('redeived %s'%body) 9     import time10     time.sleep(10)11     print('ok')12     ch.basic_ack(delivery_tag= method.delivery_tag)13 #no_ack = False接收方接受完请求后发送给对方一个接受成功的信号,如果没收到mq会重新将任务放到队列14 channel.basic_consume(callback,queue = 'hello',no_ack=False)15 print(' Waiting for messages.To exit press CTRL+C')16 channel.start_consuming()
1 #发送方 2 #实现消息不丢失 3 import pika 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4')) 5 channel = connection.channel() 6 channel.queue_declare(queue = 'hello',durable = True) 7 channel.basic_publish(exchange = '',routing_key = 'hello world', 8                       properties = pika.BasicProperties( 9                           delivery_mode=2,10                       ))#发送方不丢失,发送方保持持久化11 print(' Waiting for messages.To exit press CTRL+C')12 channel.start_consuming()
1 #接收方 2 import pika 3  4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100')) 5 channel = connection.channel() 6  7  8 channel.queue_declare(queue='hello', durable=True) 9 def callback(ch, method, properties, body):10     print(" [x] Received %r" % body)11     import time12     time.sleep(10)13     print 'ok'14     ch.basic_ack(delivery_tag = method.delivery_tag)15 channel.basic_consume(callback,16                       queue='hello',17                       no_ack=False)18 channel.start_consuming()

RabbitMQ队列中默认情况下,接收方从队列中获取消息是顺序的,例如:接收方1只从队列中获取奇数的任务,接收方2只从队列中获取偶数任务

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    import time    time.sleep(10)    print 'ok'    ch.basic_ack(delivery_tag = method.delivery_tag)#表示队列不分奇偶分配,谁来取任务就给谁channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,                      queue='hello',                      no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

 

 

 

 

 

 

 

 

RabbitMQ会重新将该任务添加到队列中

转载于:https://www.cnblogs.com/liguangxu/p/5704390.html

你可能感兴趣的文章
一个请求
查看>>
map和reduce方法理解
查看>>
php调接口
查看>>
Delphi 实现多窗口任务栏显示图标《转》
查看>>
sql异常与函数
查看>>
Jquery Table 的基本操作
查看>>
eclips新建Maven Web项目
查看>>
Log4net使用
查看>>
python 安装psutil
查看>>
[已解决] git 重命名文件夹
查看>>
OpenShare新功能@2014年10月
查看>>
<转>浅谈 Boost.Asio 的多线程模型
查看>>
移动端H5页面的设计稿尺寸大小规范
查看>>
《你们都是魔鬼吗》第八次团队作业:第四天Alpha冲刺
查看>>
AppSettings和ConnectionStrings的辨析
查看>>
Python脚本的编写过程(例子--备份文件)
查看>>
hello,world
查看>>
HDU 5688 Problem D
查看>>
深入浅出scanf、getcha、gets、cin函数
查看>>
jQuery选择器总结2
查看>>