Python中进程的同步和池化(代码实现和图解)

2021年3月26日17:39:24 发表评论 1,271 次浏览

先决条件– Python中的多处理|

本文讨论了与Python中的多处理相关的两个重要概念:

  • 进程之间的同步
  • 进程合并

进程之间的同步

进程同步被定义为一种机制, 该机制可确保两个或多个并发进程不会同时执行某些特定的程序段, 即关键部分.

关键部分是指程序中访问共享资源的部分。

例如, 在下图中, 3个进程尝试同时访问共享资源或关键部分。

Python中进程的同步和池化1

对共享资源的并发访问可能导致竞争条件。

当两个或多个进程可以访问共享数据并且它们试图同时更改它们时, 就会发生争用情况。结果, 变量的值可能是不可预测的, 并且取决于进程的上下文切换的时间而变化。

考虑下面的程序以了解竞争条件的概念:

# Python program to illustrate 
# the concept of race condition
# in multiprocessing
import multiprocessing
  
# function to withdraw from account
def withdraw(balance):    
     for _ in range ( 10000 ):
         balance.value = balance.value - 1
  
# function to deposit to account
def deposit(balance):    
     for _ in range ( 10000 ):
         balance.value = balance.value + 1
  
def perform_transactions():
  
     # initial balance (in shared memory)
     balance = multiprocessing.Value( 'i' , 100 )
  
     # creating new processes
     p1 = multiprocessing.Process(target = withdraw, args = (balance, ))
     p2 = multiprocessing.Process(target = deposit, args = (balance, ))
  
     # starting processes
     p1.start()
     p2.start()
  
     # wait until processes are finished
     p1.join()
     p2.join()
  
     # print final balance
     print ( "Final balance = {}" . format (balance.value))
  
if __name__ = = "__main__" :
     for _ in range ( 10 ):
  
         # perform same transaction process 10 times
         perform_transactions()

如果你在上述程序上运行, 你将获得一些非预期的值, 例如:

Final balance = 1311
Final balance = 199
Final balance = 558
Final balance = -2265
Final balance = 1371
Final balance = 1158
Final balance = -577
Final balance = -1300
Final balance = -341
Final balance = 157

在上述程序中, 以初始余额为100进行了10000次提款和10000次存款交易。预期的最终余额为100, 但我们在10次迭代中得到了perform_transactions函数是一些不同的值。

发生这种情况是由于进程同时访问共享数据平衡。余额的这种不可预测性不过是比赛条件.

让我们尝试使用下面给出的序列图更好地理解它。这些是在以上示例中针对单个撤回和存款操作可能产生的不同顺序。

这是一个可能的序列, 由于两个进程读取相同的值并相应地将其写回, 因此会给出错误的答案。

p1 p2 balance
read(balance)
current= 100
100
read(balance)
current= 100
100
balance=current-1 = 99
write(balance)
99
balance=current+ 1 = 101
write(balance)
101

这些是以上场景中需要的2个可能的序列。

p1 p2 balance
read(balance)
current= 100
100
balance=current-1 = 99
write(balance)
99
read(balance)
current= 99
99
balance=current+ 1 = 100
write(balance)
100
p1 p2 balance
read(balance)
current= 100
100
balance=current+ 1 = 101
write(balance)
101
read(balance)
current= 101
101
balance=current-1 = 100
write(balance)
100

使用锁

多处理模块提供了锁上课以应对比赛条件。锁是使用信号操作系统提供的对象。

信号量是一个同步对象, 它控制多个进程在并行编程环境中对公共资源的访问。它只是操作系统(或内核)存储中指定位置的值, 每个进程可以检查然后更改。根据找到的值, 该进程可以使用该资源, 或者会发现该资源已在使用中, 并且必须等待一段时间才能再次尝试。信号量可以是二进制的(0或1), 也可以具有其他值。通常, 使用信号量的进程会检查该值, 然后(如果使用资源)则更改该值以反映该值, 以便后续的信号量用户将知道等待。

考虑下面给出的示例:

# Python program to illustrate 
# the concept of locks
# in multiprocessing
import multiprocessing
  
# function to withdraw from account
def withdraw(balance, lock):    
     for _ in range ( 10000 ):
         lock.acquire()
         balance.value = balance.value - 1
         lock.release()
  
# function to deposit to account
def deposit(balance, lock):    
     for _ in range ( 10000 ):
         lock.acquire()
         balance.value = balance.value + 1
         lock.release()
  
def perform_transactions():
  
     # initial balance (in shared memory)
     balance = multiprocessing.Value( 'i' , 100 )
  
     # creating a lock object
     lock = multiprocessing.Lock()
  
     # creating new processes
     p1 = multiprocessing.Process(target = withdraw, args = (balance, lock))
     p2 = multiprocessing.Process(target = deposit, args = (balance, lock))
  
     # starting processes
     p1.start()
     p2.start()
  
     # wait until processes are finished
     p1.join()
     p2.join()
  
     # print final balance
     print ( "Final balance = {}" . format (balance.value))
  
if __name__ = = "__main__" :
     for _ in range ( 10 ):
  
         # perform same transaction process 10 times
         perform_transactions()

输出如下:

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100

让我们尝试逐步理解上面的代码:

首先, 锁对象使用以下对象创建:

lock = multiprocessing.Lock()

然后,将lock作为目标函数参数传递:

p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
 p2 = multiprocessing.Process(target=deposit, args=(balance, lock))

在目标函数的临界区,我们使用lock.acquire()方法应用锁。一旦获得了锁,其他进程就不能访问它的临界区,直到使用lock.release()方法释放锁。

lock.acquire()
balance.value = balance.value - 1
lock.release()

正如你在结果中看到的, 最终余额每次都是100(这是预期的最终结果)。

进程之间的池化

让我们考虑一个简单的程序来查找给定列表中的数字平方。

# Python program to find 
# squares of numbers in a given list
def square(n):
     return (n * n)
  
if __name__ = = "__main__" :
  
     # input list
     mylist = [ 1 , 2 , 3 , 4 , 5 ]
  
     # empty list to store result
     result = []
  
     for num in mylist:
         result.append(square(num))
  
     print (result)

输出如下:

[1, 4, 9, 16, 25]

这是一个用于计算给定列表元素平方的简单程序。在多核/多处理器系统中, 请考虑下图以了解上述程序的工作方式:

Python中进程的同步和池化2

只有一个内核用于程序执行, 其他内核很可能保持空闲状态。

为了利用所有核心, 多处理模块提供了泳池类。的泳池类表示工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。考虑下图:

Python中进程的同步和池化3

在此, 任务由以下服务器自动在核心/进程之间卸载/分配:泳池目的。用户无需担心明确创建进程。

考虑下面给出的程序:

# Python program to understand 
# the concept of pool
import multiprocessing
import os
  
def square(n):
     print ( "Worker process id for {0}: {1}" . format (n, os.getpid()))
     return (n * n)
  
if __name__ = = "__main__" :
     # input list
     mylist = [ 1 , 2 , 3 , 4 , 5 ]
  
     # creating a pool object
     p = multiprocessing.Pool()
  
     # map list to target function
     result = p. map (square, mylist)
  
     print (result)

输出如下:

Worker process id for 2: 4152
Worker process id for 1: 4151
Worker process id for 4: 4151
Worker process id for 3: 4153
Worker process id for 5: 4152
[1, 4, 9, 16, 25]

让我们尝试逐步理解以上代码:

我们使用以下方法创建一个Pool对象:

p = multiprocessing.Pool()

对于获得更多的任务卸载控制,有一些参数。这些都是:

  • processes:指定工作进程数。
  • maxtasksperchild:指定每个孩子要分配的最大任务数。

可以使用以下参数使池中的所有进程执行一些初始化:

  • initializer:指定工作进程的初始化函数。
  • initargs:要传递给初始化程序的参数。

现在,为了完成某个任务,我们必须将它映射到某个函数。在上面的例子中,我们将mylist映射到square函数。这样mylist的内容和square的定义就会分布在核之间。

result = p.map(square, mylist)

所有工作进程完成任务后, 将返回带有最终结果的列表。

如果发现任何不正确的地方, 或者想分享有关上述主题的更多信息, 请写评论。

首先, 你的面试准备可通过以下方式增强你的数据结构概念:Python DS课程。

木子山

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: