Python分布式过程中遇到了哪些坑?我相信很多没有经验的人对此无能为力。因此,本文总结了问题产生的原因及解决方法。希望你能通过这篇文章解决这个问题。
小惊大怪
你是用Python3还是windows编程?最重要的是你对进程和线程不是很清楚?好了,恭喜你,在python分布式过程中,会有坑等着你去挖。(哈哈哈哈,在这里允许我吓唬你。)我只是开玩笑,但是如果你知道序列中不支持匿名函数,那么这个坑就要和你说再见了。好话不占多数,直接进入正题。
分布式进程
众所周知,Process比Thread更稳定,Process可以分发给多台机器,而Thread最多只能分发给同一台机器的多个CPU。Python的多处理模块不仅支持多处理,而且managers子模块支持将多处理分配给多台机器。服务进程可以充当调度器,将任务分配给其他进程,并依赖网络通信。因为managers模块封装良好,所以在不了解网络通信细节的情况下,很容易编写分布式多进程程序。
代码记录
举个例子
如果我们已经有了一个多进程程序,通过在同一台机器上运行的Queue进行通信,那么现在,因为处理任务的过程比较繁重,我们要把发送任务的过程和处理任务的过程分配到两台机器上。我们应该如何使用分布式流程来实现这一点?您已经知道原来的Queue可以继续使用,通过managers模块通过网络公开Queue,您可以让其他机器的进程访问Queue。好吧,那我们开始吧!
写个task_master.py
我们先来看看服务流程。服务进程负责启动队列,在网络上注册队列,然后将任务写入队列。
#!/user/bin/pyt thon #-*-coding : utf-8-*-# @ Times :2018/3/316:46 # @ author : lichexo # @ file : task _ master . pyimportrandom,time,quefromprocessing . managers import basemanager #发送任务的队列:task_queue=queue。Queue()#接收结果的队列: result _ queue=queue . queue()#。从BaseManager继承的queuemanager :类queuemanager(BaseManager): pass #在网络上注册了两个Queue。可调用参数与queue对象: queuemanager . register(' get _ task _ Queue ',Callable=lambda : task _ Queue)queuemanager . register(' get _ result _ Queue ',Callable=lambda : result _ Queue)#绑定端口5000,设置验证码' ABC' : manager=queue manager(地址=(' ',5000),Authkey=b ' ABC ')# start queue: manager . start()#获取通过访问的Queue对象将几个任务放入: for line change(10):n=random . randint(0,10000) print('放入任务% d.% n) task.put (n) #从结果队列3360print ('trygetresults.').
result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)
写个task_worker.py
#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。
运行结果
现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:
Traceback (most recent call last): File "F:/Python/untitled/xianchengjincheng/master.py", line 25, in <module> manager.start() File "F:Pythonpystalllibmultiprocessingmanagers.py", line 513, in start self._process.start() File "F:Pythonpystalllibmultiprocessingprocess.py", line 105, in start self._popen = self._Popen(self) File "F:Pythonpystalllibmultiprocessingcontext.py", line 322, in _Popen return Popen(process_obj) File "F:Pythonpystalllibmultiprocessingpopen_spawn_win32.py", line 65, in __init__ reduction.dump(process_obj, to_child) File "F:Pythonpystalllibmultiprocessing eduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <function <lambda> at 0x00000202D1921E18>: attribute lookup <lambda> on __main__ failed
task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:
Connect to server 127.0.0.1... Traceback (most recent call last): File "F:/Python/untitled/xianchengjincheng/work.py", line 24, in <module> m.connect() File "F:Pythonpystalllibmultiprocessingmanagers.py", line 489, in connect conn = Client(self._address, authkey=self._authkey) File "F:Pythonpystalllibmultiprocessingconnection.py", line 487, in Client c = SocketClient(address) File "F:Pythonpystalllibmultiprocessingconnection.py", line 614, in SocketClient s.connect(address) ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。
看到没,结果都出错了,我们好好分析一下到底哪出错了。。。
错误分析
在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。
# 把两个Queue都注册到网络上, callable参数关联了Queue对象 QueueManager.register('get_task_queue',callable=return_task_queue) QueueManager.register('get_result_queue',callable=return_result_queue)
其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。
因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。
值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。
# windows需要写ip地址 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
修改后的代码
在task_master.py中修改如下:
#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_master.py # task_master.py import random,time,queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support task_queue = queue.Queue() # 发送任务的队列: result_queue = queue.Queue() # 接收结果的队列: class QueueManager(BaseManager): # 从BaseManager继承的QueueManager: pass # windows下运行 def return_task_queue(): global task_queue return task_queue # 返回发送任务队列 def return_result_queue (): global result_queue return result_queue # 返回接收结果队列 def test(): # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象 #QueueManager.register('get_task_queue', callable=lambda: task_queue) #QueueManager.register('get_result_queue', callable=lambda: result_queue) QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) # 绑定端口5000, 设置验证码'abc': #manager = QueueManager(address=('', 5000), authkey=b'abc') # windows需要写ip地址 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') manager.start() # 启动Queue: # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() for i in range(10): # 放几个任务进去: n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): # 这里加了异常捕获 try: r = result.get(timeout=5) print('Result: %s' % r) except queue.Empty: print('result queue is empty.') # 关闭: manager.shutdown() print('master exit.') if __name__=='__main__': freeze_support() print('start!') test()
在task_worker.py中修改如下:
#!/user/bin/pytthon # -*- coding:utf-8 -*- # @Time: 2018/3/3 16:46 # @Author: lichexo # @File: task_worker.py # task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
先运行task_master.py,然后再运行task_worker.py
(1)task_master.py运行结果如下
start! Put task 7872... Put task 6931... Put task 1395... Put task 8477... Put task 8300... Put task 1597... Put task 8738... Put task 8627... Put task 1884... Put task 2561... Try get results... Result: 7872 * 7872 = 61968384 Result: 6931 * 6931 = 48038761 Result: 1395 * 1395 = 1946025 Result: 8477 * 8477 = 71859529 Result: 8300 * 8300 = 68890000 Result: 1597 * 1597 = 2550409 Result: 8738 * 8738 = 76352644 Result: 8627 * 8627 = 74425129 Result: 1884 * 1884 = 3549456 Result: 2561 * 2561 = 6558721 master exit.
(2)task_worker.py运行结果如下
Connect to server 127.0.0.1... run task 8640 * 8640... run task 7418 * 7418... run task 9303 * 9303... run task 568 * 568... run task 1633 * 1633... run task 3583 * 3583... run task 3293 * 3293... run task 8975 * 8975... run task 8189 * 8189... run task 731 * 731... worker exit.
知识补充
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。
authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。
看完上述内容,你们掌握Python分布式进程中会遇到的坑都有哪些呢的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/50249.html