作者:杨冬 欢迎转载,也请保留这段声明。谢谢!
出处:https://andyyoung01.github.io/ 或 http://andyyoung01.16mb.com/
前篇了解了Python中线程和进程的基本概念,本篇看看实际的代码实例。Python中的多线程一般是通过其内置的threading模块提供的接口来实现的,多进程是通过multiprocessing模块实现的。本篇的代码实例是基于Python3的。
1. Python多进程编程
1.1 Process类
在Python中,通过multiprocessing模块内的Process类来创建新的进程。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) if hasattr(os, 'getppid'): print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join()
|
通过Process类中的start方法来启动新的进程。有三种不同的启动方式:spawn,fork和forkserver。这三种方式的详细区别请参考官方文档。
Process类中的join方法的作用是等待子进程结束后再继续往下执行。
1.2 进程间通信
multiprocessing模块内包括两种进程间通信的方式:Queues和Pipes;同时也提供了其它方式在进程间共享状态,不过官方文档也提到在并发程序设计中尽量避免这种方式。
先来看一下Queues的例子:
1 2 3 4 5 6 7 8 9 10 11
| from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) p.join()
|
multiprocessing模块中的Queue几乎是queue.Queue的克隆,并且Queues是线程和进程安全的,这意味着多个线程或进程访问同一个Queue实例时,不会出现数据混乱的状况。下面来看一个使用Pipe的例子:
1 2 3 4 5 6 7 8 9 10 11 12
| from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()
|
Pipe方法返回一对连接对象,代表管道的两端,每个连接对象都有send()和recv()方法。如果不同的进程(或线程)在同一时刻试图在管道的同一端进行读写,数据可能会出现混乱,在不同端是没有问题的。Python中的对象大部分都不是进程(或线程)安全的,所以在不同的进程(或线程)操作同一对象时,需要使用同步机制。
1.3 进程的同步机制
进程的同步是通过Lock实现的。实例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
|
1.4 使用进程池
Pool类提供了一个批量创建多个子进程的方法,可以给定子进程数的上限,避免无限地消耗系统的资源。看一下官方文档的实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: print(pool.map(f, range(10))) for i in pool.imap_unordered(f, range(10)): print(i) res = pool.apply_async(f, (20,)) print(res.get(timeout=1)) res = pool.apply_async(os.getpid, ()) print(res.get(timeout=1)) multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") print("For the moment, the pool remains available for more work") print("Now the pool is closed and no longer available")
|
2. Python多线程编程
Python中使用线程的方式有很多都与使用进程的方式类似,这里就不再列出实例代码。
2.1 Thread类
在Python中,通过threading模块内的Thread类来创建新的进程。通过Thread类中的start方法启动线程,join方法等待线程结束。
2.2 线程间通信
线程间最简单的通信方式是通过threading模块内的Event类。当然也可以通过共享对象来在线程间共享状态信息。由于queue.Queue是线程和进程安全的,所以它也是线程通信使用理想对象。其它对象大部分都不是线程(或进程)安全的,所以在不同的线程(或进程)操作同一对象时,需要使用同步机制。
2.3 线程的同步机制
threading模块中提供了Lock,RLock,Condition,Semaphore等类进行线程间的同步。
关于threading模块中更详细的使用信息,请参考官方文档。