python多进程编程共享锁进程池队列通讯技术一览
多进程编程指在一个程序中同时运行多个进程,这些进程可以并行执行不同的任务,从而提高程序执行效率。相比单进程程序,多进程可以充分利用多核CPU的处理能力。
进程间的通信非常重要。进程间可以通过Queue、Pipe等方式进行数据传递和同步。比如生产者-消费者模式,生产者进程将数据放入Queue,消费者进程从Queue取出数据进行处理。在多进程环境下,由于多个进程可能同时操作共享资源,所以需要使用Lock等同步原语对共享资源进行互斥访问,避免竞争条件。比如更新全局变量时需要先获取Lock,确保同一时刻只有一个进程进入临界区。Python的multiprocessing模块提供了多进程编程的各种组件,包括Process类代表进程,Queue用于进程通信,Lock/Semaphore用于同步等。我们可以通过进程池的方式批量创建进程,通过map()分配任务。需要注意,多进程编程的开销也比较大,创建进程耗费时间和额外内存。不能无限制地创建进程,需要根据机器性能设置合理的进程数。此外进程间共享数据复杂,最好通过消息传递方式避免直接共享内存。
下面我们一个个举例说明:
进程间的通信和同步
在多进程环境下,进程间通信变得尤为重要。进程可以通过Queue、Pipe等方式进行数据传递。例如:#!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* import multiprocessing def producer(queue): for i in range(10): queue.put(i) queue.put(None) # 在队列中放入None来通知消费者进程结束 def consumer(queue): while True: data = queue.get() if data is None: break print(data) if __name__ == '__main__': queue = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()运行效果:
上面通过multiprocessing.Queue()实现了进程间通信。生产者进程将数据放入队列,消费者进程从队列取出数据。
共享资源需要加锁
另外,对共享资源需要加锁,避免多个进程同时访问导致数据混乱。例如:
#!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* import multiprocessing def increment(lock, counter): for _ in range(100): with lock: counter.value += 1 print(counter.value) if __name__ == '__main__': lock = multiprocessing.Lock() counter = multiprocessing.Value('i', 0) p1 = multiprocessing.Process(target=increment, args=(lock, counter)) p2 = multiprocessing.Process(target=increment, args=(lock, counter)) p1.start() p2.start() p1.join() p2.join() print(counter.value)运行结果
上面通过multiprocessing.Lock()对共享计数器counter进行保护,确保同一时刻只有一个进程进入临界区。
进程池(Pool)
我们还可以通过multiprocessing.Pool创建进程池,实现进程复用。#!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* from multiprocessing import Pool import os, time def worker(filename): print('Processing %s' % filename) # Pretend to do something time.sleep(1) return '%s processed' % filename if __name__ == '__main__': filenames = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt', 'file6.txt'] p = Pool(3) start = time.time() results = p.map(worker, filenames) end = time.time() print('Took %.2f seconds' % (end - start))运行结果
上面根据CPU核心数创建了一个进程池,然后使用pool.map()对多个文件并行处理。比顺序处理更快。需要注意进程数的选择,以及进程间同步、通信的机制。合理利用多进程可以提高计算密集型任务的执行效率。
网友评论0