如何把工作做到极致做到多进程的工作和游戏

查看: 25111|回复: 91
[官方教程]
Firefly 介绍文档!
阅读权限255
在线时间 小时
Firefly是免费、开源、稳定、快速扩展、能 “热更新”的分布式游戏服务器端框架,采用Python编写,基于Twisted框架开发。它包括了开发框架和数据库缓存服务等各种游戏服务器基础服务,节省大量游戏开发的工作时间,真正做到让使用者把精力放在游戏玩法逻辑上。用它可以搭建自定义的分布式架构,只需要修改相应的配置文件即可。
采用单线程多进程架构,支持自定义的分布式架构;方便的服务器扩展机制,可快速扩展服务器类型和数量;与客户端采用TCP长连接,无需考虑粘包等问题;封装数据缓存服务;可实现实时热更新数据以及游戏逻辑,客户端玩家无感觉;有几十个基础游戏玩法系统模块提供组装使用(v1.3.0提供);
123.jpg (37.79 KB, 下载次数: 1475)
15:30 上传
management, firefly 是个多进程、分布式的游戏服务器。因此各游戏server(进程)的管理和扩展是firefly很重要的部分,框架通过抽象使服务器的扩展非常容易。
Network,客户端连接通信、server进程间的通信等构成了整个游戏框架的脉络,所有游戏流程都构建在这个脉络上。与客户端的通信采用的是请求/回应式的,所以受到的客户端的请求,服务端都会给出相应的回应,服务端也能主动的推送,广播给客户端消息。这些请求是基于指令号的请求。(例如定义101为登陆指令)server进程之间的通信时采用的异步回调的方式,这样就减少了的进程间通过网络通信中的时间消耗。
Data, 数据处理是网游的重要部分。在网游有大量的数据需要存储,需要更新,这使得数据库的读写效率成为服务器的最大的性能瓶颈。firefly的db处理能够将数据库表中的数据缓存到memcache中并能以对象的形式进行调用相应的对象方法对数据进行操作。可以在不同的进程中通过实例化相同的名称的缓存实例,得到同步的数据。并能将缓存对象中的数据写回数据库中。
一个最基本的服务器就是一个在不停运行着的应用程序。在分布式游戏服务器中,我们需要的服务器具有的功能有,监听客户端的连接,监听其他服务进程的消息,连接其他的服务进程,有些需要有数据库连接和缓存服务。如下图
234.jpg (20.38 KB, 下载次数: 1482)
15:30 上传
net connect 做客户端连接,root监听其他服务进程消息,node连接其他服务进程,db数据库,cache缓存。是否需要监听客户端连接,是否监听其他服务进程消息等这是都是可以在config.json中进行配置。包括各个服务器的名称以及各个服务器之间的连接关系。这样就可以自定义出自己的分布式架构。
打赏列表共打赏了0次
还木有人打赏~
阅读权限120
在线时间 小时
感谢分享!请记住,果果是我的,鸡蛋,你是大家的。
阅读权限255
在线时间 小时
感谢分享!请记住,果果是我的,鸡蛋,你是大家的。
阅读权限120
在线时间 小时
先看看,虽然不是很多资料!看看再说吧,强烈期待api
阅读权限100
在线时间 小时
我去沙发没了
阅读权限120
在线时间 小时
牛B!好好研究研究
阅读权限120
在线时间 小时
感谢LZ,先回复
阅读权限20
在线时间 小时
此贴会火啊!!!此贴会火啊!!!
阅读权限130
在线时间 小时
阅读权限50
在线时间 小时
感谢分享,学习学习!
500元悬赏!H5游戏《愚公移山》游戏架设..
别说代码不全,有些代码,要会用git自己还原下,
特别声明(务必查看!)
1、HTML5游戏《小英
别嫌我啰嗦,打开帖子要不你就别看,要不你就看完,
微信支付10块,随机抢红包到账1-100块,这就是精
特别声明(务必查看!)
1、Unity3D暗黑风格《黑
| 联系方式
COPYRIGHT(C)2015 ZHONGQINGLONGTU NETWORK CO.LTD ALL RIGHTS RESERVED.ICP备号-4
北京中清龙图网络技术有限公司测试和Postgresql查询性能如何做到单线程和查询是用到两个数据库本身的过程还是用JDBC完成?具体需要做什么工作?--------------------------------------------------------------可以用多进程模拟。如果用批处理的话。看你怎么测。如果使用jdbc段,多线程确实可以模拟。一个线程一个连接。设计好标准的数据集。网上或许有下载的。记录好测试环境和测试各个阶段所花时间。------解决方案--------------------------------------------------------探讨数据库本身的查询 例如
select * from t1 order by rand() limit 10;肯定就是单线程的吗?它与用jdbc 的差别有多大------解决方案--------------------------------------------------------一般是利用程序来实现 多线程查询<专注网络游戏工作室赚钱项目攻略!
当前位置: &#62;&#62;
&#62;&#62; 正文
日 10:05:41&&分类: 游戏工作室建立
2014年九月初的某天,终于下定决定,要学习游戏辅助技术。原因有二,第一、我学过计算机,对编程很感兴趣;第二、除了搞游戏赚钱,也实在找不到什么赚钱的好路子,而经过这两个多月搞网络游戏工作室的经历,深感没有技术的苦恼,决心要自己写游戏辅助。于是,我破釜沉舟的开始学习辅助技术了。为什么叫破釜沉舟?因为这五个多月,我压根没正儿八经上过什么班,中间动摇过,到一个小公司干了一小段时间,后来坑爹老板告诉我还要押工资,果断走人。那时候还要瞒着父母、女朋友说我在上班,我父母早上很早都出去了,中午会回来,我睡到八九点起来,躲到小区里面看书,中午回来吃饭,下午就不出去了,在家编程学习。反正每天的学习时间从没少于过八个小时,有时候一天学习十多个小时,中间的艰辛不一一赘述了。再谈谈学习辅助的摸索过程,刚开始重学C语言,把C学完,开始学内存辅助,学了一个月,深感这条路线太漫长,不适合我这样破釜沉舟的人,于是转向模拟辅助。模拟辅助比内存辅助要容易不少, 经过无数次失败的煎熬后(此处省略一万字),我写的模拟脚本渐渐稳定起来,某些网页游戏用机械硬盘可以做到20开以上。后来我开始接游戏工作室的端游订单,有个订单客户开了很多机器,帮了他不少忙,我只赚了几千块,不过我没有感觉不平衡,一个多星期的艰辛劳动,能换来这样的回报,对我这样一个新手,已经很高兴了。下面谈谈我2015年的规划,也希望和更多像我这样的新手小作者交流思路。1、我希望找一两个信得过的朋友合作,最好是在一个城市可以经常见面的,组建一个小团队。因为我深感写辅助这个行业已经不适合单打独斗了,内存辅助不用说,现在连模拟辅助也最好搞个小团队,大家一起开发项目,按劳分配利润。这个团队至少要包括一个主搞按键的,一个主搞内存的,一个主要负责分析市场,找项目、外加测试辅助的。为什么这么说?第一,纯按键或者纯内存都不是最好的模式,很多游戏纯按键搞不好,必须用到一些内存技术,纯内存开发周期太长,而且很多地方用模拟方式也完全可以胜任,速度和稳定性都不比内存方式差,没有必要费时费力的去用内存方式,第二,找项目是开发项目的先决条件,有个懂辅助技术同时对市场非常熟悉、很擅长找项目的人,开发项目的成功率和速度会大大提高。目前我已经和一个朋友达成了初步的合作意向,在此欢迎新朋友来合作。2、2015年的学习计划,我的重心还是放在模拟方向,把模拟技术继续深造进修,因为我现在的水平还没有到完全转型学习内存技术的程度。内存方面如果有项目必须要用到的会去研究,同时会适当关注手游方面。3、最总要的一环,如何通过辅助赚钱?我觉得比较理想的方式还是直接和熟悉的游戏工作室合作,老实说,我对代理也没太多好感(这里肯定有人要骂人了),有些项目太冷门了的话,还是自己挂机最靠谱。说句很多小型游戏工作室听了不舒服的话,作者开发出来的辅助,如果利润可观,首选都是找熟悉的网游工作室合作甚至自己挂机赚钱,这样省心省力,还不容易暴露,被请去拨大蒜的可能性小了很多。项目搞得差不多了,才会考虑放到代理那里去卖。为什么这么说?很多项目一到市场上,很可能出钱方式很快就会被和谐,辅助自然也卖不了多久,本来能搞三个月的项目,半个月就歇菜了。作者又得找项目,费力不讨好。当然事情都不是绝对的,丢市场的不一定都是没利润的,自己分辨即可。游戏辅助技术通过多年和游戏公司的攻守,目前已经发展得很成熟了,早就深入到内核层,新手作者不要想着去追赶主流的技术水平,这样会导致自己很长时间没有收入,等学成了也许游戏公司又有新手段。如果没有好的团队和大牛带你,不如沉下心好好夯实基础,把简单事情做到极致,把脚本技术优化再优化。据我所知(未经严格考证),某些网页游戏用模拟技术E3 32G可以做到45开以上,我目前用固态硬盘e3 16G估计顶多做到26开左右能流畅运行,这中间的差距还是不小的。还有一点,好好吃透一个游戏是很重要的,比成天瞎找项目靠谱的多。网络游戏工作室是这样,作者亦然。我不喜欢和浮躁的人合作,玩了几天游戏,对游戏一知半解,没有完全吃透,就喊着找作者合作,这样的人搞一个项目会死一个。你自己没赚到钱,我收了你的钱也没意思,如果你钱多,要我写,只要你描述清楚功能需求,我觉得能写,就保证给你写出来,不会跑单,我以前是做市场的,深知信誉和朋友对一个人长期发展的重要性。见一叶落而知岁之将暮,人无远虑必有近忧,干什么事情,用长远的发展眼光来看问题都不会错的。最后祝大家羊年财源滚滚,身体倍棒!推荐阅读:
此文&&原创,转载请注明地址:
标签:&&&&&&
昵称(必填)
记住我,下次回复时不用重新输入个人信息
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。
Post: 21:32:00
楼主有项目合作,能不能留个联系方式。请联系我qq3401153
Post: 21:05:28
可不可以留个联系方式,有项目给你谈。请联系我qq3401153
Post: 15:38:58
希望你能帮帮我,我非常想学这技术
跟我联系好吗
Post: 15:37:54
要你写个辅助
请私聊我哦
价格慢慢谈!
Post: 0:35:52
LZ你好,我从14年开始偶然接触到一款按键小精灵,结果走上; 学习模拟辅助的道路,废寝忘食学习了半年,技术差不多入门了,按键认证也勉强过了6级,现在发现纯按键太鸡叻,正在学习内存辅助制作,希望能交个朋友互相交流,QQ
Post: 11:41:21
楼主你建个群得了
Post: 12:47:58
工作室寻求合作~QQ 要求不高- -!~有自己的方式
Post: 10:33:38
游戏工作室寻求合作,有机会合作,qq
Post: 9:32:09
今天来看才发现很多朋友的留言,抱歉,留联系方式的大部分已经加了。
Post: 17:17:02
游戏工作室寻求合作,有机会聊聊,qq
Post: 11:13:23
LZ,你好,我也是一个初出茅庐的人,希望你能帮帮我,我只往CF辅助方向发展,QQ
Post: 2:43:54
qq;,剑三 E3
32G内存,固态盘,能做到80开,有兴趣的可以联系下我
Post: 11:41:33
LZ,我想学习怎么去做一个辅助,却不知道,学那些开始入门,也不知道具体要怎么去走,想LZ给点意见,指导一下方向,扣扣,.谢谢诶
Post: 15:00:39
网上找教程,很多的,要选自己适合的路线。
Post: 22:22:49
楼主好,我是2000年入行计算机 参加工作8年,以前是连锁网吧技术总管,参加的工作都与计算机相关,08年婚后辞职在家两年发展创业,做的外单,数据录入 在网站这块有丰富的经验,曾经两年发展过IDC,都是中途放弃,现在说到技术也是落伍了,2010年入行游戏,目前停了快半年没收入,机器有几十台E3,近期缩减了规模,省下一部分机器继续,开发主要用VB, 制作脚本有几天的经验,多线程暂时不会,用的是多进程解决同步操作,其它基本可以边学边实现,现在苦恼,一边要学习`一边要赚钱,没办法往一个方面发展,希望有机会认识,有机会一起发展,扣扣
Post: 1:39:48
我去年也是破釜沉舟1年里把C语言和TC学完学完C后发现自己没办法写挂 为了马上能见成效就改学TC现在模拟类脚本做了几个成品 但还感觉到自己有很多不足希望能与你交流下经验Q
Post: 17:54:20
玩内存的人表示压力很大
Post: 16:34:09
楼主有单接吗
Post: 14:19:16
QQ4641092,按键,内存,驱动都懂,现在在研究脱机,想技术交流加我QQ
Post: 20:25:42
Post: 2:38:46
先顶下作者,我的学习路径和你几乎一样,对你在文中对辅助的一些看法也很有同感。希望能交个朋友,即使不在一个城市也可以多多交流学习
Post: 1:44:23
编写模拟类脚本有三年实战 看好楼主的想法 希望能交个朋友
Post: 22:29:19
楼主联系下我,看看能合作吗,不行也可交流。
Post: 22:25:43
联系下我谢谢
Post: 18:03:57
求加qq好友,希望能联系下,如果有合作的话能一起干
Post: 0:56:50
q 技术和作者联系我 工作室别加我 记得申请好友备注下
Post: 0:54:41
联系我 一起开发项目
Post: 20:10:14
2015注定又是悲剧的一年!
Post: 22:42:52
挺有想法的一个人,悔我当初没坚持住,要不然现在也是小牛了,哎,江山带有人才出。
Post: 22:28:58
于是,我破釜沉舟的开始学习辅助技术了。
Post: 19:39:30
这行最少得能用个按键精灵。不然没法干。内存比较难,工作量也大。一个人做不了
【相关文章】 18:52:31 17:19:55 17:41:20 5:11:1 5:15:2 22:11:14 17:57:48 18:52:41 20:16:28 17:37:25
站内搜索_游戏工作室项目
网游工作室_猜你喜欢
网游工作室_最热文章
备案号: 辽ICP备号-1
【NBE游戏工作室】我们一直被采集,但从未被超越!和NBE一起做吧。痛,并快乐着!!!您好,欢迎光临崔庆才的博客,祝大家新年快乐,鸡年大吉!
> Python爬虫进阶六之多进程的用法
在上一节中介绍了thread多线程库。python中的多线程其实并不是真正的多线程,并不能做到充分利用多核CPU资源。
如果想要充分利用,在python中大部分情况需要使用多进程,那么这个包就叫做 multiprocessing。
借助它,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
那么本节要介绍的内容有:
在multiprocessing中,每一个进程都用一个Process类来表示。首先看下它的API
Process([group [, target [, name [, args [, kwargs]]]]])
Process([group [, target [, name [, args [, kwargs]]]]])
target表示调用对象,你可以传入方法的名字
args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
kwargs表示调用对象的字典
name是别名,相当于给这个进程取一个名字
group分组,实际上不使用
我们先用一个实例来感受一下:
import multiprocessing
def process(num):
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
import multiprocessing&def process(num):&&&&print 'Process:', num&if __name__ == '__main__':&&&&for i in range(5):&&&&&&&&p = multiprocessing.Process(target=process, args=(i,))&&&&&&&&p.start()
最简单的创建Process的过程如上所示,target传入函数名,args是函数的参数,是元组的形式,如果只有一个参数,那就是长度为1的元组。
然后调用start()方法即可启动多个进程了。
另外你还可以通过 cpu_count() 方法还有 active_children() 方法获取当前机器的 CPU 核心数量以及得到目前所有的运行的进程。
通过一个实例来感受一下:
import multiprocessing
import time
def process(num):
time.sleep(num)
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
print('CPU number:' + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print('Child process name: ' + p.name + ' id: ' + str(p.pid))
print('Process Ended')
1234567891011121314151617
import multiprocessingimport time&def process(num):&&&&time.sleep(num)&&&&print 'Process:', num&if __name__ == '__main__':&&&&for i in range(5):&&&&&&&&p = multiprocessing.Process(target=process, args=(i,))&&&&&&&&p.start()&&&&&print('CPU number:' + str(multiprocessing.cpu_count()))&&&&for p in multiprocessing.active_children():&&&&&&&&print('Child process name: ' + p.name + ' id: ' + str(p.pid))&&&&&print('Process Ended')
运行结果:
Process: 0
CPU number:8
Child process name: Process-2 id: 9641
Child process name: Process-4 id: 9643
Child process name: Process-5 id: 9644
Child process name: Process-3 id: 9642
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4
1234567891011
Process: 0CPU number:8Child process name: Process-2 id: 9641Child process name: Process-4 id: 9643Child process name: Process-5 id: 9644Child process name: Process-3 id: 9642Process EndedProcess: 1Process: 2Process: 3Process: 4
另外你还可以继承Process类,自定义进程类,实现run方法即可。
用一个实例来感受一下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
12345678910111213141516171819
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.start()
在上面的例子中,我们继承了 Process 这个类,然后实现了run方法。打印出来了进程号和参数。
运行结果:
Pid: 28116 LoopCount: 0
Pid: 28117 LoopCount: 0
Pid: 28118 LoopCount: 0
Pid: 28116 LoopCount: 1
Pid: 28117 LoopCount: 1
Pid: 28118 LoopCount: 1
Pid: 28117 LoopCount: 2
Pid: 28118 LoopCount: 2
Pid: 28118 LoopCount: 3
Pid: 28116 LoopCount: 0Pid: 28117 LoopCount: 0Pid: 28118 LoopCount: 0Pid: 28116 LoopCount: 1Pid: 28117 LoopCount: 1Pid: 28118 LoopCount: 1Pid: 28117 LoopCount: 2Pid: 28118 LoopCount: 2Pid: 28118 LoopCount: 3
可以看到,三个进程分别打印出了2、3、4条结果。
我们可以把一些方法独立的写在每个类里封装好,等用的时候直接初始化一个类运行即可。
在这里介绍一个属性,叫做deamon。每个线程都可以单独设置它的属性,如果设置为True,当父进程结束后,子进程会自动被终止。
用一个实例来感受一下,还是原来的例子,增加了deamon属性:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
print 'Main process Ended!'
1234567891011121314151617181920212223
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.daemon = True&&&&&&&&p.start()&&&&&&print 'Main process Ended!'
在这里,调用的时候增加了设置deamon,最后的主进程(即父进程)打印输出了一句话。
运行结果:
Main process Ended!
Main process Ended!
结果很简单,因为主进程没有做任何事情,直接输出一句话结束,所以在这时也直接终止了子进程的运行。
这样可以有效防止无控制地生成子进程。如果这样写了,你在关闭这个主程序运行时,就无需额外担心子进程有没有被关闭了。
不过这样并不是我们想要达到的效果呀,能不能让所有子进程都执行完了然后再结束呢?那当然是可以的,只需要加入join()方法即可。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
print 'Main process Ended!'
123456789101112131415161718192021222324
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.daemon = True&&&&&&&&p.start()&&&&&&&&p.join()&&&&&&print 'Main process Ended!'
在这里,每个子进程都调用了join()方法,这样父进程(主进程)就会等待子进程执行完毕。
运行结果:
Pid: 29902 LoopCount: 0
Pid: 29902 LoopCount: 1
Pid: 29905 LoopCount: 0
Pid: 29905 LoopCount: 1
Pid: 29905 LoopCount: 2
Pid: 29912 LoopCount: 0
Pid: 29912 LoopCount: 1
Pid: 29912 LoopCount: 2
Pid: 29912 LoopCount: 3
Main process Ended!
12345678910
Pid: 29902 LoopCount: 0Pid: 29902 LoopCount: 1Pid: 29905 LoopCount: 0Pid: 29905 LoopCount: 1Pid: 29905 LoopCount: 2Pid: 29912 LoopCount: 0Pid: 29912 LoopCount: 1Pid: 29912 LoopCount: 2Pid: 29912 LoopCount: 3Main process Ended!
发现所有子进程都执行完毕之后,父进程最后打印出了结束的结果。
在上面的一些小实例中,你可能会遇到如下的运行结果:
什么问题?有的输出错位了。这是由于并行导致的,两个进程同时进行了输出,结果第一个进程的换行没有来得及输出,第二个进程就输出了结果。所以导致这种排版的问题。
那这归根结底是因为线程同时资源(输出操作)而导致的。
那怎么来避免这种问题?那自然是在某一时间,只能一个进程输出,其他进程等待。等刚才那个进程输出完毕之后,另一个进程再进行输出。这种现象就叫做“互斥”。
我们可以通过 Lock 来实现,在一个进程输出时,加锁,其他进程等待。等此进程执行结束后,释放锁,其他进程可以进行输出。
我们现用一个实例来感受一下:
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
#self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
#self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
12345678910111213141516171819202122
from multiprocessing import Process, Lockimport time&&class MyProcess(Process):&&&&def __init__(self, loop, lock):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&&&&self.lock = lock&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(0.1)&&&&&&&&&&&&#self.lock.acquire()&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&&&&&&&&&&&#self.lock.release()&if __name__ == '__main__':&&&&lock = Lock()&&&&for i in range(10, 15):&&&&&&&&p = MyProcess(i, lock)&&&&&&&&p.start()
首先看一下不加锁的输出结果:
Pid: 45755 LoopCount: 0
Pid: 45756 LoopCount: 0
Pid: 45757 LoopCount: 0
Pid: 45758 LoopCount: 0
Pid: 45759 LoopCount: 0
Pid: 45755 LoopCount: 1
Pid: 45756 LoopCount: 1
Pid: 45757 LoopCount: 1
Pid: 45758 LoopCount: 1
Pid: 45759 LoopCount: 1
Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2
Pid: 45757 LoopCount: 2
Pid: 45758 LoopCount: 2
Pid: 45759 LoopCount: 2
Pid: 45756 LoopCount: 3
Pid: 45755 LoopCount: 3
Pid: 45757 LoopCount: 3
Pid: 45758 LoopCount: 3
Pid: 45759 LoopCount: 3
Pid: 45755 LoopCount: 4
Pid: 45756 LoopCount: 4
Pid: 45757 LoopCount: 4
Pid: 45759 LoopCount: 4
Pid: 45758 LoopCount: 4
Pid: 45756 LoopCount: 5
Pid: 45755 LoopCount: 5
Pid: 45757 LoopCount: 5
Pid: 45759 LoopCount: 5
Pid: 45758 LoopCount: 5
Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6
Pid: 45757 LoopCount: 6
Pid: 45759 LoopCount: 6
Pid: 45758 LoopCount: 6
Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7
Pid: 45757 LoopCount: 7
Pid: 45758 LoopCount: 7
Pid: 45759 LoopCount: 7
Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8
Pid: 45757 LoopCount: 8
Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8
Pid: 45755 LoopCount: 9
Pid: 45756 LoopCount: 9
Pid: 45757 LoopCount: 9
Pid: 45758 LoopCount: 9
Pid: 45759 LoopCount: 9
Pid: 45756 LoopCount: 10
Pid: 45757 LoopCount: 10
Pid: 45758 LoopCount: 10
Pid: 45759 LoopCount: 10
Pid: 45757 LoopCount: 11
Pid: 45758 LoopCount: 11
Pid: 45759 LoopCount: 11
Pid: 45758 LoopCount: 12
Pid: 45759 LoopCount: 12
Pid: 45759 LoopCount: 13
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
Pid: 45755 LoopCount: 0Pid: 45756 LoopCount: 0Pid: 45757 LoopCount: 0Pid: 45758 LoopCount: 0Pid: 45759 LoopCount: 0Pid: 45755 LoopCount: 1Pid: 45756 LoopCount: 1Pid: 45757 LoopCount: 1Pid: 45758 LoopCount: 1Pid: 45759 LoopCount: 1Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2&Pid: 45757 LoopCount: 2Pid: 45758 LoopCount: 2Pid: 45759 LoopCount: 2Pid: 45756 LoopCount: 3Pid: 45755 LoopCount: 3Pid: 45757 LoopCount: 3Pid: 45758 LoopCount: 3Pid: 45759 LoopCount: 3Pid: 45755 LoopCount: 4Pid: 45756 LoopCount: 4Pid: 45757 LoopCount: 4Pid: 45759 LoopCount: 4Pid: 45758 LoopCount: 4Pid: 45756 LoopCount: 5Pid: 45755 LoopCount: 5Pid: 45757 LoopCount: 5Pid: 45759 LoopCount: 5Pid: 45758 LoopCount: 5Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6&Pid: 45757 LoopCount: 6Pid: 45759 LoopCount: 6Pid: 45758 LoopCount: 6Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7&Pid: 45757 LoopCount: 7Pid: 45758 LoopCount: 7Pid: 45759 LoopCount: 7Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8&Pid: 45757 LoopCount: 8Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8&Pid: 45755 LoopCount: 9Pid: 45756 LoopCount: 9Pid: 45757 LoopCount: 9Pid: 45758 LoopCount: 9Pid: 45759 LoopCount: 9Pid: 45756 LoopCount: 10Pid: 45757 LoopCount: 10Pid: 45758 LoopCount: 10Pid: 45759 LoopCount: 10Pid: 45757 LoopCount: 11Pid: 45758 LoopCount: 11Pid: 45759 LoopCount: 11Pid: 45758 LoopCount: 12Pid: 45759 LoopCount: 12Pid: 45759 LoopCount: 13
可以看到有些输出已经造成了影响。
然后我们对其加锁:
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
12345678910111213141516171819202122
from multiprocessing import Process, Lockimport time&&class MyProcess(Process):&&&&def __init__(self, loop, lock):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&&&&self.lock = lock&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(0.1)&&&&&&&&&&&&self.lock.acquire()&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&&&&&&&&&&&self.lock.release()&if __name__ == '__main__':&&&&lock = Lock()&&&&for i in range(10, 15):&&&&&&&&p = MyProcess(i, lock)&&&&&&&&p.start()
我们在print方法的前后分别添加了获得锁和释放锁的操作。这样就能保证在同一时间只有一个print操作。
看一下运行结果:
Pid: 45889 LoopCount: 0
Pid: 45890 LoopCount: 0
Pid: 45891 LoopCount: 0
Pid: 45892 LoopCount: 0
Pid: 45893 LoopCount: 0
Pid: 45889 LoopCount: 1
Pid: 45890 LoopCount: 1
Pid: 45891 LoopCount: 1
Pid: 45892 LoopCount: 1
Pid: 45893 LoopCount: 1
Pid: 45889 LoopCount: 2
Pid: 45890 LoopCount: 2
Pid: 45891 LoopCount: 2
Pid: 45892 LoopCount: 2
Pid: 45893 LoopCount: 2
Pid: 45889 LoopCount: 3
Pid: 45890 LoopCount: 3
Pid: 45891 LoopCount: 3
Pid: 45892 LoopCount: 3
Pid: 45893 LoopCount: 3
Pid: 45889 LoopCount: 4
Pid: 45890 LoopCount: 4
Pid: 45891 LoopCount: 4
Pid: 45892 LoopCount: 4
Pid: 45893 LoopCount: 4
Pid: 45889 LoopCount: 5
Pid: 45890 LoopCount: 5
Pid: 45891 LoopCount: 5
Pid: 45892 LoopCount: 5
Pid: 45893 LoopCount: 5
Pid: 45889 LoopCount: 6
Pid: 45890 LoopCount: 6
Pid: 45891 LoopCount: 6
Pid: 45893 LoopCount: 6
Pid: 45892 LoopCount: 6
Pid: 45889 LoopCount: 7
Pid: 45890 LoopCount: 7
Pid: 45891 LoopCount: 7
Pid: 45892 LoopCount: 7
Pid: 45893 LoopCount: 7
Pid: 45889 LoopCount: 8
Pid: 45890 LoopCount: 8
Pid: 45891 LoopCount: 8
Pid: 45892 LoopCount: 8
Pid: 45893 LoopCount: 8
Pid: 45889 LoopCount: 9
Pid: 45890 LoopCount: 9
Pid: 45891 LoopCount: 9
Pid: 45892 LoopCount: 9
Pid: 45893 LoopCount: 9
Pid: 45890 LoopCount: 10
Pid: 45891 LoopCount: 10
Pid: 45892 LoopCount: 10
Pid: 45893 LoopCount: 10
Pid: 45891 LoopCount: 11
Pid: 45892 LoopCount: 11
Pid: 45893 LoopCount: 11
Pid: 45893 LoopCount: 12
Pid: 45892 LoopCount: 12
Pid: 45893 LoopCount: 13
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
Pid: 45889 LoopCount: 0Pid: 45890 LoopCount: 0Pid: 45891 LoopCount: 0Pid: 45892 LoopCount: 0Pid: 45893 LoopCount: 0Pid: 45889 LoopCount: 1Pid: 45890 LoopCount: 1Pid: 45891 LoopCount: 1Pid: 45892 LoopCount: 1Pid: 45893 LoopCount: 1Pid: 45889 LoopCount: 2Pid: 45890 LoopCount: 2Pid: 45891 LoopCount: 2Pid: 45892 LoopCount: 2Pid: 45893 LoopCount: 2Pid: 45889 LoopCount: 3Pid: 45890 LoopCount: 3Pid: 45891 LoopCount: 3Pid: 45892 LoopCount: 3Pid: 45893 LoopCount: 3Pid: 45889 LoopCount: 4Pid: 45890 LoopCount: 4Pid: 45891 LoopCount: 4Pid: 45892 LoopCount: 4Pid: 45893 LoopCount: 4Pid: 45889 LoopCount: 5Pid: 45890 LoopCount: 5Pid: 45891 LoopCount: 5Pid: 45892 LoopCount: 5Pid: 45893 LoopCount: 5Pid: 45889 LoopCount: 6Pid: 45890 LoopCount: 6Pid: 45891 LoopCount: 6Pid: 45893 LoopCount: 6Pid: 45892 LoopCount: 6Pid: 45889 LoopCount: 7Pid: 45890 LoopCount: 7Pid: 45891 LoopCount: 7Pid: 45892 LoopCount: 7Pid: 45893 LoopCount: 7Pid: 45889 LoopCount: 8Pid: 45890 LoopCount: 8Pid: 45891 LoopCount: 8Pid: 45892 LoopCount: 8Pid: 45893 LoopCount: 8Pid: 45889 LoopCount: 9Pid: 45890 LoopCount: 9Pid: 45891 LoopCount: 9Pid: 45892 LoopCount: 9Pid: 45893 LoopCount: 9Pid: 45890 LoopCount: 10Pid: 45891 LoopCount: 10Pid: 45892 LoopCount: 10Pid: 45893 LoopCount: 10Pid: 45891 LoopCount: 11Pid: 45892 LoopCount: 11Pid: 45893 LoopCount: 11Pid: 45893 LoopCount: 12Pid: 45892 LoopCount: 12Pid: 45893 LoopCount: 13
嗯,一切都没问题了。
所以在访问临界资源时,使用Lock就可以避免进程同时占用资源而导致的一些问题。
信号量,是在进程同步过程中一个比较重要的角色。可以控制临界资源的数量,保证各个进程之间的互斥和同步。
如果你学过操作系统,那么一定对这方面非常了解,如果你还不了解信号量是什么,可以参考
来了解一下它是做什么的。
那么接下来我们就用一个实例来演示一下进程之间利用Semaphore做到同步和互斥,以及控制临界资源数量。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
print 'Ended!'
1234567891011121314151617181920212223242526272829303132333435363738394041424344
from multiprocessing import Process, Semaphore, Lock, Queueimport time&buffer = Queue(10)empty = Semaphore(2)full = Semaphore(0)lock = Lock()&class Consumer(Process):&&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&full.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&buffer.get()&&&&&&&&&&&&print('Consumer pop an element')&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&empty.release()&&class Producer(Process):&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&empty.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&buffer.put(1)&&&&&&&&&&&&print('Producer append an element')&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&full.release()&&if __name__ == '__main__':&&&&p = Producer()&&&&c = Consumer()&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
如上代码实现了注明的生产者和消费者问题,定义了两个进程类,一个是消费者,一个是生产者。
定义了一个共享队列,利用了Queue数据结构,然后定义了两个信号量,一个代表缓冲区空余数,一个表示缓冲区占用数。
生产者Producer使用empty.acquire()方法来占用一个缓冲区位置,然后缓冲区空闲区大小减小1,接下来进行加锁,对缓冲区进行操作。然后释放锁,然后让代表占用的缓冲区位置数量+1,消费者则相反。
运行结果如下:
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
1234567891011121314
Producer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an element
可以发现两个进程在交替运行,生产者先放入缓冲区物品,然后消费者取出,不停地进行循环。
通过上面的例子来体会一下信号量的用法。
在上面的例子中我们使用了Queue,可以作为进程通信的共享队列使用。
在上面的程序中,如果你把Queue换成普通的list,是完全起不到效果的。即使在一个进程中改变了这个list,在另一个进程也不能获取到它的状态。
因此进程间的通信,队列需要用Queue。当然这里的队列指的是 multiprocessing.Queue
依然是用上面那个例子,我们一个进程向队列中放入数据,然后另一个进程取出数据。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
print 'Consumer get', buffer.get()
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
num = random()
print 'Producer put ', num
buffer.put(num)
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
print 'Ended!'
123456789101112131415161718192021222324252627282930313233343536373839404142434445
from multiprocessing import Process, Semaphore, Lock, Queueimport timefrom random import random&buffer = Queue(10)empty = Semaphore(2)full = Semaphore(0)lock = Lock()&class Consumer(Process):&&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&full.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&print 'Consumer get', buffer.get()&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&empty.release()&&class Producer(Process):&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&empty.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&num = random()&&&&&&&&&&&&print 'Producer put ', num&&&&&&&&&&&&buffer.put(num)&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&full.release()&&if __name__ == '__main__':&&&&p = Producer()&&&&c = Consumer()&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
运行结果:
Producer put
Producer put
Consumer get 0.
Consumer get 0.
Producer put
Producer put
Consumer get 0.
Consumer get 0.
Producer put&&0.Producer put&&0.Consumer get 0.Consumer get 0.Producer put&&0.Producer put&&0.Consumer get 0.Consumer get 0.
可以看到生产者放入队列中数据,然后消费者将数据取出来。
get方法有两个参数,blocked和timeout,意思为阻塞和超时时间。默认blocked是true,即阻塞式。
当一个队列为空的时候如果再用get取则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用get_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有取到队列元素,那就抛出Queue.Empty异常。
当一个队列为满的时候如果再用put放则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用put_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有放进去元素,那就抛出Queue.Full异常。
另外队列中常用的方法
Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
return self._maxsize &#8211; self._sem._semlock._get_value()
Queue.empty() 如果队列为空,返回True, 反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 阻塞式写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
管道,顾名思义,一端发一端收。
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用一个实例来感受一下:
from multiprocessing import Process, Pipe
class Consumer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
self.pipe.send('Consumer Words')
print 'Consumer Received:', self.pipe.recv()
class Producer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
print 'Producer Received:', self.pipe.recv()
self.pipe.send('Producer Words')
if __name__ == '__main__':
pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
print 'Ended!'
123456789101112131415161718192021222324252627282930313233
from multiprocessing import Process, Pipe&&class Consumer(Process):&&&&def __init__(self, pipe):&&&&&&&&Process.__init__(self)&&&&&&&&self.pipe = pipe&&&&&def run(self):&&&&&&&&self.pipe.send('Consumer Words')&&&&&&&&print 'Consumer Received:', self.pipe.recv()&&class Producer(Process):&&&&def __init__(self, pipe):&&&&&&&&Process.__init__(self)&&&&&&&&self.pipe = pipe&&&&&def run(self):&&&&&&&&print 'Producer Received:', self.pipe.recv()&&&&&&&&self.pipe.send('Producer Words')&&if __name__ == '__main__':&&&&pipe = Pipe()&&&&p = Producer(pipe[0])&&&&c = Consumer(pipe[1])&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
在这里声明了一个默认为双向的管道,然后将管道的两端分别传给两个进程。两个进程互相收发。观察一下结果:
Producer Received: Consumer Words
Consumer Received: Producer Words
Producer Received: Consumer WordsConsumer Received: Producer WordsEnded!
以上是对pipe的简单介绍。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
在这里需要了解阻塞和非阻塞的概念。
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
阻塞即要等到回调结果出来,在有结果之前,当前进程会被挂起。
Pool的用法有阻塞和非阻塞两种方式。非阻塞即为添加进程后,不一定非要等到改进程执行完就添加其他进程运行,阻塞则相反。
现用一个实例感受一下非阻塞的用法:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply_async(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&pool.apply_async(function, (i,))&&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
在这里利用了apply_async方法,即非阻塞。
运行结果:
Started processes
Start process: Start process:
Start process:
End processEnd process 0
Start process:
End process 2
End process 3
Subprocess done.
12345678910
Started processesStart process: Start process:&&0 1Start process:&&2End processEnd process 0 1Start process:&&3End process 2End process 3Subprocess done.
可以发现在这里添加三个进程进去后,立马就开始执行,不用非要等到某个进程结束后再添加新的进程进去。
下面再看看阻塞的用法:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&pool.apply(function, (i,))&&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
在这里只需要把apply_async改成apply即可。
运行结果如下:
Start process:
End process 0
Start process:
End process 1
Start process:
End process 2
Start process:
End process 3
Started processes
Subprocess done.
12345678910
Start process:&&0End process 0Start process:&&1End process 1Start process:&&2End process 2Start process:&&3End process 3Started processesSubprocess done.
这样一来就好理解了吧?
下面对函数进行解释:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的。
close() 关闭pool,使其不在接受新的任务。
terminate() 结束工作进程,不在处理未完成的任务。
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
当然每个进程可以在各自的方法返回一个结果。apply或apply_async方法可以拿到这个结果并进一步进行处理。
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
return index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
result = pool.apply_async(function, (i,))
print result.get()
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&&&return index&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&result = pool.apply_async(function, (i,))&&&&&&&&print result.get()&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
运行结果:
Start process:
End process 0
Start process:
End process 1
Start process:
End process 2
Start process:
End process 3
Started processes
Subprocess done.
1234567891011121314
Start process:&&0End process 00Start process:&&1End process 11Start process:&&2End process 22Start process:&&3End process 33Started processesSubprocess done.
另外还有一个非常好用的map方法。
如果你现在有一堆数据要处理,每一项都需要经过一个方法来处理,那么map非常适合。
比如现在你有一个数组,包含了所有的URL,而现在已经有了一个方法用来抓取每个URL内容并解析,那么可以直接在map的第一个参数传入方法名,第二个参数传入URL数组。
现在我们用一个实例来感受一下:
from multiprocessing import Pool
import requests
from requests.exceptions import ConnectionError
def scrape(url):
print requests.get(url)
except ConnectionError:
print 'Error Occured ', url
print 'URL ', url, ' Scraped'
if __name__ == '__main__':
pool = Pool(processes=3)
'http://blog.csdn.net/',
'http://xxxyxxx.net'
pool.map(scrape, urls)
1234567891011121314151617181920212223
from multiprocessing import Poolimport requestsfrom requests.exceptions import ConnectionError&&def scrape(url):&&&&try:&&&&&&&&print requests.get(url)&&&&except ConnectionError:&&&&&&&&print 'Error Occured ', url&&&&finally:&&&&&&&&print 'URL ', url, ' Scraped'&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&urls = [&&&&&&&&'',&&&&&&&&'/',&&&&&&&&'http://blog.csdn.net/',&&&&&&&&'http://xxxyxxx.net'&&&&]&&&&pool.map(scrape, urls)
在这里初始化一个Pool,指定进程数为3,如果不指定,那么会自动根据CPU内核来分配进程数。
然后有一个链接列表,map函数可以遍历每个URL,然后对其分别执行scrape方法。
运行结果:
&Response [403]&
http://blog.csdn.net/
&Response [200]&
Error Occured
http://xxxyxxx.net
http://xxxyxxx.net
&Response [200]&
&Response [403]&URL&&http://blog.csdn.net/&&Scraped&Response [200]&URL&&https://&&ScrapedError Occured&&http://xxxyxxx.netURL&&http://xxxyxxx.net&&Scraped&Response [200]&URL&&http:///&&Scraped
可以看到遍历就这么轻松地实现了。
多进程multiprocessing相比多线程功能强大太多,而且使用范围更广,希望本文对大家有帮助!
转载请注明: &
or分享 (0)
您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请狠狠点击下面的

我要回帖

更多关于 如何在工作中做到诚信 的文章

 

随机推荐