Python 多进程使用
最近有个Python项目要搞,之前学习的都忘差不多了,利用点空余时间把Python再捡一捡。 多进程处理还是记录个笔记,以免后边忘了翻看。 因为Python的多线程有全局锁,导致单进程下没法利用多核并行,多进程可以规避这个限制,多核跑起来。
fork
fork比较基础了,是unix类操作系统使用的生成子进程的方法。两个返回值,一个是父进程的返回子进程id,一个是子进程得到的0。 python是在os模块里使用。
import os
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pid = os.fork()
# f(x)平方
def f(x):
return x*x
# 子进程
if pid == 0:
print ("child process pid=", os.getpid(), 'ppid', os.getppid())
arr1 = arr[len(arr)//2:]
print(list(map(f, arr1)))
# 父进程
else:
print("curpid is %d" % pid)
arr2 = arr[:len(arr)//2]
print(list(map(f, arr2)))
multiprocessing
fork在类unix下使用,python有更好用的模块处理。multiprocessing跨平台的多进程模块。
多进程间通信,可以有提供使用的队列Queue,管道Pipe,锁lock 共享数据,可以Value,Array等。具体查看手册
from multiprocessing import Process, Value, Array, Queue
import os, time
# 子进程执行
def run_proc(name, n, arr, q):
time.sleep(1)
print('Run child process %s (%s)...' % (name, os.getpid()))
# n.value是数字
print(n.value)
print(list(x * x for x in arr))
# 放一个对象进queue
q.put({os.getpid(): int(time.time())})
if __name__ == '__main__':
# 可以使用 Value 或 Array 将数据存储在共享内存映射中
# d是double
num = Value('d', 1.0)
# i是integer
arr = Array('i', range(10))
# queue 提供的进程间通信一种方式,线程安全
queue = Queue()
p1 = Process(target=run_proc, args=("test1", num, arr, queue))
p2 = Process(target=run_proc, args=("test2", num, arr, queue))
p1.start()
p2.start()
p1.join()
p2.join()
# 主进程获取队列
totalProcess = 2
while totalProcess > 0:
print(queue.get())
totalProcess -= 1
Pool
Python也提供了进程池的工具,多核下可以利用多核进行并行操作,通过使用子进程而非线程有效地绕过了全局解释器锁。
import multiprocessing as mp
import os, random, time
def task(name):
print('Run task %s, pid (%s)' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 1)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
def f(x):
return x*x
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
poolsize = mp.cpu_count() * 2
p = mp.Pool(poolsize)
for i in range(poolsize):
# apply_async 下发任务
p.apply_async(task, args=(i,))
print('Waiting for all subprocesses done...')
# map() 会将可迭代对象分割为许多块,然后提交给进程池
aftermap = p.map(f, range(10))
print(aftermap)
# 关闭pool
p.close()
p.join()