使用with打开文件的方式,是调用了上下文管理的功能
import contextlib
import socket


@contextlib.contextmanager
def Sock(ip, port):
    """Yield a listening TCP socket bound to (ip, port) and close it on exit.

    Intended for use in a ``with`` statement: the code up to ``yield`` runs on
    entry, the yielded socket is bound to the ``as`` target, and the
    ``finally`` clause runs when the ``with`` body finishes or raises.

    Args:
        ip: Address to bind to.
        port: Port to bind to.

    Yields:
        The bound socket, listening with a backlog of 5.
    """
    # Use a local name that does not shadow the ``socket`` module; assigning
    # to ``socket`` here would make ``socket.socket()`` raise
    # UnboundLocalError.
    sock = socket.socket()
    try:
        # bind/listen are inside the try so the socket is also closed when
        # binding fails.
        sock.bind((ip, port))
        sock.listen(5)
        yield sock
    finally:
        sock.close()


if __name__ == "__main__":
    # Two ways to open a file.
    # 1) Plain open(): the caller has to close the file explicitly.
    f = open('a.txt', 'r')
    f.close()

    # 2) with-statement: the file is closed automatically on exit.
    with open('a.txt', 'r') as f:
        pass

    # Closing a socket via ``with``: Sock() runs up to ``yield``, the yielded
    # socket is bound to ``s``, the body executes, then the ``finally`` clause
    # of Sock() closes the socket.
    with Sock('127.0.0.1', 8000) as s:
        print(s)
redis的发布订阅
class RedisHelper:
    """Small helper around a Redis client for publish/subscribe."""

    # NOTE(review): ``redis`` must be imported in the module that defines this
    # class; the import is not visible in this snippet.

    def __init__(self):
        # The Redis client is created as soon as the helper is instantiated.
        self.__conn = redis.Redis(host='192.168.1.100')

    def public(self, msg, chan):
        """Publish ``msg`` on channel ``chan``; always returns True."""
        self.__conn.publish(chan, msg)
        return True

    def subscribe(self, chan):
        """Subscribe to ``chan`` and return the pubsub object."""
        listener = self.__conn.pubsub()
        listener.subscribe(chan)
        # One response is read before returning -- presumably the
        # subscription confirmation; verify against the redis client docs.
        listener.parse_response()
        return listener


# Subscriber
import s3

obj = s3.RedisHelper()
data = obj.subscribe('fm111.7')
print(data.parse_response())

# Publisher
import s3

obj = s3.RedisHelper()
obj.public('alex db', 'fm111.7')
RabbitMQ
# Consumer
import pika

params = pika.ConnectionParameters(host='127.0.0.1')
connection = pika.BlockingConnection(params)
channel = connection.channel()  # create the channel object

channel.queue_declare(queue='wocao')


def callback(ch, method, properties, body):
    """Print every message body delivered from the queue."""
    print("[x] Received %r" % body)


# no_ack=True: no acknowledgement is sent back for delivered messages.
channel.basic_consume(callback, queue='wocao', no_ack=True)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

# Producer
import pika

params = pika.ConnectionParameters(host='127.0.0.1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
# Name the queue; it is created if it does not exist yet.
channel.queue_declare(queue='wocao')
channel.basic_publish(exchange='', routing_key='wocao', body='hello world!')
print("[x] Sent 'hello world!")
connection.close()
exchange 的类型(exchange type)
# Consumer (subscriber).
# NOTE: the original notes labelled this section "producer" and the next one
# "consumer"; the labels were swapped -- this code declares a queue, binds it
# and consumes from it.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.87'))
channel = connection.channel()

# 'fanout' type: broadcast to every queue bound to this exchange.
channel.exchange_declare(exchange='logs_fanout', type='fanout')

# Let the broker create a randomly named queue.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the queue to the exchange.
channel.queue_bind(exchange='logs_fanout', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print every message body broadcast through the exchange."""
    print(" [x] %r" % body)


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

# Producer (publisher / sending side).
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='logs_fanout', type='fanout')

message = "what's the fuck"
# Publish to the named exchange; the routing key is left empty.
channel.basic_publish(exchange='logs_fanout', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
# Route messages to specific queues by keyword (routing key).

# Producer (publisher)
import pika

params = pika.ConnectionParameters(host='127.0.0.1')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# 'direct' type: messages are delivered by keyword.
channel.exchange_declare(exchange='direct_logs_1', type='direct')

# Send a message using the 'error' keyword.
severity = 'error'
message = '123'
channel.basic_publish(
    exchange='direct_logs_1',
    routing_key=severity,
    body=message,
)
print('[x] Sent %r:%r' % (severity, message))
connection.close()

# Consumer (subscriber)
import pika

params = pika.ConnectionParameters(host='127.0.0.1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
# 'direct' type: messages are delivered by keyword.
channel.exchange_declare(exchange='direct_logs_1', type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the same queue once per keyword this consumer wants to receive.
severities = ['error', 'info', 'warning']
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs_1',
        queue=queue_name,
        routing_key=severity,
    )


def callback(ch, method, properties, body):
    """Print the routing key and body of every delivered message."""
    print('[x] %r:%r' % (method.routing_key, body))


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
# Receiver side of "do not lose messages".
import time

import pika

params = pika.ConnectionParameters(host='10.211.55.4')
connection = pika.BlockingConnection(params)
channel = connection.channel()
# NOTE(review): the sender snippet in these notes declares 'hello' with
# durable=True; confirm both sides declare the queue the same way.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Print the message, wait 10 seconds, then acknowledge it."""
    print('redeived %s' % body)
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)


# no_ack=False: once the receiver has finished a message it sends back an
# acknowledgement; if none arrives, the broker puts the task back on the
# queue.
channel.basic_consume(callback, queue='hello', no_ack=False)
print(' Waiting for messages.To exit press CTRL+C')
channel.start_consuming()
# Sender side of "do not lose messages".
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# durable=True asks the broker to keep the queue persistent.
channel.queue_declare(queue='hello', durable=True)

# Fixes over the original snippet:
#   * routing_key must name the declared queue ('hello'), and the message
#     text belongs in the required ``body`` argument -- the original had
#     fused both into routing_key='hello world' and passed no body at all.
#   * a sender has nothing to consume, so instead of start_consuming() the
#     connection is closed once the message has been published.
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='hello world',
    properties=pika.BasicProperties(
        delivery_mode=2,  # keep the message persistent on the sender side
    ),
)
print(" [x] Sent 'hello world'")
connection.close()
# Receiver
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    """Print the message, wait 10 seconds, then acknowledge it."""
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    # print() call form instead of the Python 2 ``print 'ok'`` statement, to
    # match the rest of these notes and to be valid on Python 3.
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)


# no_ack=False: the callback acknowledges each message explicitly.
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
channel.start_consuming()
默认情况下,RabbitMQ 按轮询(round-robin)方式把队列中的消息依次分发给各个接收方,例如有两个接收方时:接收方1只会拿到第奇数个任务,接收方2只会拿到第偶数个任务
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Print the message, wait 10 seconds, then acknowledge it."""
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    # print() call form instead of the Python 2 ``print 'ok'`` statement, to
    # match the rest of these notes and to be valid on Python 3.
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Do not split tasks odd/even between receivers: whoever comes to fetch a
# task gets the next one.
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
如果接收方在处理任务的过程中断开连接、没有发回确认(ack),RabbitMQ会重新将该任务添加到队列中