Python的多进程与多线程
最近有个Python项目要搞,之前学习的都忘差不多了,利用点空余时间把Python再捡一捡。
一、python多进程
因为Python的多线程有全局锁,导致单进程下没法利用多核并行,多进程可以规避这个限制,多核跑起来。
fork
fork比较基础了,是unix类操作系统使用的生成子进程的方法。两个返回值,一个是父进程的返回子进程id,一个是子进程得到的0。 python是在os模块里使用。
import os
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pid = os.fork()
# f(x)平方
def f(x):
return x*x
# 子进程
if pid == 0:
print ("child process pid=", os.getpid(), 'ppid', os.getppid())
arr1 = arr[len(arr)//2:]
print(list(map(f, arr1)))
# 父进程
else:
print("curpid is %d" % pid)
arr2 = arr[:len(arr)//2]
print(list(map(f, arr2)))
multiprocessing
fork在类unix下使用,python有更好用的模块处理。multiprocessing跨平台的多进程模块。
多进程间通信,可以有提供使用的队列Queue,管道Pipe,锁lock 共享数据,可以Value,Array等。具体查看手册
from multiprocessing import Process, Value, Array, Queue
import os, time
# 子进程执行
def run_proc(name, n, arr, q):
time.sleep(1)
print('Run child process %s (%s)...' % (name, os.getpid()))
# n.value是数字
print(n.value)
print(list(x * x for x in arr))
# 放一个对象进queue
q.put({os.getpid(): int(time.time())})
if __name__ == '__main__':
# 可以使用 Value 或 Array 将数据存储在共享内存映射中
# d是double
num = Value('d', 1.0)
# i是integer
arr = Array('i', range(10))
# queue 提供的进程间通信一种方式,线程安全
queue = Queue()
p1 = Process(target=run_proc, args=("test1", num, arr, queue))
p2 = Process(target=run_proc, args=("test2", num, arr, queue))
p1.start()
p2.start()
p1.join()
p2.join()
# 主进程获取队列
totalProcess = 2
while totalProcess > 0:
print(queue.get())
totalProcess -= 1
Pool
Python也提供了进程池的工具,多核下可以利用多核进行并行操作,通过使用子进程而非线程有效地绕过了全局解释器锁。
import multiprocessing as mp
import os, random, time
def task(name):
print('Run task %s, pid (%s)' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 1)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
def f(x):
return x*x
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
poolsize = mp.cpu_count() * 2
p = mp.Pool(poolsize)
for i in range(poolsize):
# apply_async 下发任务
p.apply_async(task, args=(i,))
print('Waiting for all subprocesses done...')
# map() 会将可迭代对象分割为许多块,然后提交给进程池
aftermap = p.map(f, range(10))
print(aftermap)
# 关闭pool
p.close()
p.join()
二、python多线程
前边介绍了Python的多进程操作,这里记下多线程操作。
创建一个线程
使用threading模块,把函数传入并创建Thread实例,再调用start()开始执行
import time, threading
def f(x):
time.sleep(1)
# threading.current_thread().name 获取线程名字
print('thread %s is running... params %s' % (threading.current_thread().name, x))
if __name__ == '__main__':
threads = []
for i in range(10):
# target 函数名,name线程名字,args传参
t = threading.Thread(target=f, name='workerThreadName', args=(i,))
threads.append(t)
# 开始执行
t.start()
for t in threads:
t.join()
# 这里主线程结束,其他线程也会继续运行
print('thread %s ended.' % threading.current_thread().name)
线程池
直接创建线程的方式比较直接,但存在这样的问题,不好控制线程总数,如果线程启动过多反而会影响整体执行效率。
解决的办法可以使用池化,控制一个总数,使用过的线程再放回池里。
from concurrent.futures import ThreadPoolExecutor
import threading, time
def f(x):
time.sleep(1)
print("thread %s running.. params %s" % (threading.current_thread().name, x))
if __name__ == '__main__':
thread_pool = ThreadPoolExecutor(8)
for i in range(8):
# f执行函数,i传参
thread_pool.submit(f, i)
thread_pool.shutdown()
print("thread %s end" % threading.current_thread().name)
with as
创建一个池,再显示关闭它。这种写法可以使用Python提供的上下文管理器 with as 语法。
# with as 可以免去手动调用 shutdown()
with ThreadPoolExecutor(8) as executor:
for i in range(8):
thread_pool.submit(f, i)
# 这里就已经调用过shutdown()
print("thread %s end" % threading.current_thread().name)
获取任务执行结果
submit返回Future对象,future.result()获取函数返回值
from concurrent.futures import ThreadPoolExecutor
import threading, time
def f(x):
time.sleep(1)
print("thread %s running.. params %s" % (threading.current_thread().name, x))
return x*x
if __name__ == '__main__':
futures = []
with ThreadPoolExecutor(8) as executor:
for i in range(8):
# submit返回future对象
futures.append(executor.submit(f, i))
# future.result() 获取任务返回值
for future in futures:
# 因为此时这里已经shutdown(),所以可以直接取值
# 否则是个阻塞操作,可选超时时间
print(future.result())
print("thread %s end" % threading.current_thread().name)
线程互斥锁
线程之间操作共享变量,要加锁操作
lock = threading.Lock()
# 获取锁
lock.acquire()
# 释放锁
lock.release()
当然多线程处理还有其他细节,工程经验或者查阅手册等。