本文共 5906 字,大约阅读时间需要 19 分钟。
#队列****************************************from multiprocessing import Process,Queueq = Queue(10) # 创建一个只能放10个值的队列try: q.get_nowait() # web qq 长轮询,这个不会阻塞,没有就会报错.所以会有下面的异常捕获,except: print('queue.Empty')# q.get() #这个会在这里阻塞等待for i in range(10): q.put(i)print(q.qsize())print(q.full())# q.put(1111) #这里会阻塞,因为满了,print('*'*10)print(q.empty())print(q.full())# 队列可以在创建的时候制定一个容量# 如果在程序运行的过程中,队列已经有了足够的数据,再put就会发生阻塞# 如果队列为空,在get就会发生阻塞# put# get# qsize 不准 #进队,出队过快,而这里获取的信息相对慢,所以"不准"# full 不准# empty 不准import timefrom multiprocessing import Process,Queuedef wahaha(q): print('hey',q.get()) q.put(2)if __name__ == '__main__': q = Queue() p = Process(target=wahaha,args=[q,]) p.start() q.put(1) time.sleep(1) #这里的时间设定0.1和1结果不同, #设定0.1,主进程put数据,然后就get了,导致子进程阻塞在了get,后面的put没机会运行, #设定1,主进程put数据,子进程get,然后子进程又put,子进程的get就有数据可以得到, print(q.get())# 在进程中使用队列可以完成双向通信import timeimport randomfrom multiprocessing import Process,Queue# # 生产者消费者模型# # 解决数据供需不平衡的情况# # 队列是进程安全的 内置了锁来保证队列中的每一个数据都不会被多个进程重复取def consumer(q,name): while True: food = q.get() if food == 'done':break #这个done,注意下!!! time.sleep(random.random()) print('%s吃了%s'%(name,food))def producer(q,name,food): for i in range(10): time.sleep(random.random()) print('%s生产了%s%s'%(name,food,i)) q.put('%s%s'%(food,i))if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=[q,'Egon','泔水']) p2 = Process(target=producer,args=[q,'Yuan','骨头鱼刺']) p1.start() p2.start() Process(target=consumer,args=[q,'alex']).start() Process(target=consumer,args=[q,'wusir']).start() p1.join() p2.join() q.put('done') #done要放在最后, q.put('done')import timeimport randomfrom multiprocessing import Process,JoinableQueuedef consumer(q,name): while True: food = q.get() time.sleep(random.random()) print('%s吃了%s'%(name,food)) q.task_done()def producer(q,name,food): for i in range(10): time.sleep(random.random()) print('%s生产了%s%s'%(name,food,i)) q.put('%s%s'%(food,i)) q.join() # 等到所有的数据都被taskdone才结束if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=[q,'Egon','泔水']) p2 = Process(target=producer,args=[q,'Yuan','骨头鱼刺']) p1.start() p2.start() c1 = Process(target=consumer,args=[q,'alex']) c2 = Process(target=consumer,args=[q,'wusir']) c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join()# producer # put # 生产完全部的数据就没有其他工作了 # 在生产数据方 : 允许执行q.join # join会发起一个阻塞,直到所有当前队列中的数据都被消费# consumer # get 获取到数据 # 处理数据 # q.task_done() 告诉q,刚刚从q获取的数据已经处理完了# consumer每完成一个任务就会给q发送一个taskdone# producer在所有的数据都生产完之后会执行q.join()# producer会等待consumer消费完数据才结束# 主进程中对producer进程进行join# 主进程中的代码会等待producer执行完才结束# producer结束就意味着主进程代码的结束# consumer作为守护进程结束# consumer中queue中的所有数据被消费# producer join结束# 主进程的代码结束# consumer结束# 主进程结束#**********管道***********************# 管道# from multiprocessing import Pipe# left,right = Pipe()# left.send('1234')# print(right.recv()) #这段代码,说明了管道双向的特点,# left.send('1234') #但是由于管道不安全,所以现在用的很少,# print(right.recv())# from multiprocessing import Process, Pipe## def f(parent_conn,child_conn):# parent_conn.close() #不写close将不会引发EOFError# while True:# try:# print(child_conn.recv())# except EOFError:# child_conn.close()# break## if __name__ == '__main__':# parent_conn, child_conn = Pipe()# p = Process(target=f, args=(parent_conn,child_conn,))# p.start()# child_conn.close() #关闭管道的一端,并不会影响管道的整体,# parent_conn.send('hello')# parent_conn.send('hello')# parent_conn.send('hello')# parent_conn.close()# p.join()#数据共享# from multiprocessing import Manager,Process,Lock# def func(dic,lock):# # lock.acquire()# # dic['count'] = dic['count']-1# # lock.release()# with lock: # 上下文管理 :必须有一个开始动作 和 一个结束动作的时候# dic['count'] = dic['count'] - 1 #类似上面的加锁,## if __name__ == '__main__':# m = Manager()# lock = Lock()# dic = m.dict({'count':100}) #产生一个共享的数据,# p_lst = []# for i in range(100):# p = Process(target=func,args=[dic,lock])# p_lst.append(p)# p.start()# for p in p_lst:p.join()# print(dic)# 同一台机器上 : Queue# 在不同台机器上 :消息中间件#进程池# import time# import random# from multiprocessing import Pool# def func(i):# print('func%s' % i)# time.sleep(random.randint(1,1))# return i**2# if __name__ == '__main__':# p = Pool(5)# ret_l = []# for i in range(15):# # ret = p.apply(func=func,args=(i,)) # 同步调用# ret = p.apply_async(func=func,args=(i,))# 异步调用# ret_l.append(ret)# for ret in ret_l : print(ret.get()) # 主进程和所有的子进程异步了''' def apply(self, func, args=(), kwds={}): Equivalent of `func(*args, **kwds)`.等同于func() assert self._state == RUN return self.apply_async(func, args, kwds).get()'''#回调函数# import os# from urllib.request import urlopen# from multiprocessing import Pool# def get_url(url):# print('-->',url,os.getpid())# ret = urlopen(url)# content = ret.read()# return url## def call(url):# # 分析# print(url,os.getpid())## if __name__ == '__main__':# print(os.getpid())# l = [# 'http://www.baidu.com', # 5# 'http://www.sina.com',# 'http://www.sohu.com',# 'http://www.sogou.com',# 'http://www.qq.com',# 'http://www.bilibili.com', #0.1# ]# p = Pool(5) # count(cpu)+1# ret_l = []# for url in l:# ret = p.apply_async(func = get_url,args=[url,],callback=call)# ret_l.append(ret)# for ret in ret_l : ret.get()# 回调函数# 在进程池中,起了一个任务,这个任务对应的函数在执行完毕之后# 的返回值会自动作为参数返回给回调函数# 回调函数就根据返回值再进行相应的处理# 回调函数 是在主进程执行的,通过pid可以知道,
转载地址:http://lebcn.baihongyu.com/