
解决Python多线程冲突:内存共享与队列应用详解
我们在 Python 中使用线程时,可能会遇到一个常见的问题:不同线程之间怎么安全地共享数据。虽然 Python 线程本身是共享同一个内存空间的,但在处理多个线程同时访问同一个数据时,很容易出现冲突和数据不一致的情况。为了避免这些问题,Python 提供了一些工具和技巧,比如队列、锁和事件,帮助我们在保持并发的同时,也能确保数据共享的安全性和稳定性。
这篇文章我们聊一下在多线程环境下,怎么处理这些数据共享的问题。
共享内存
第一种最简单的方法是在不同的线程中使用相同的变量。我们在之前的教程中已经使用过这个特性,但没有明确讨论过。让我们通过一个非常简单的例子来看看如何使用共享内存:
from threading import Thread, Event
from time import sleep
# 创建一个事件对象,用于控制线程的终止
event = Event()
# 定义一个函数,在线程中不断修改传入的列表
def modify_variable(var):
while True:
# 循环修改列表中的每个元素,每次将其加1
for i in range(len(var)):
var[i] += 1
# 检查事件是否已被触发,如果是则跳出循环
if event.is_set():
break
# 暂停0.5秒,避免修改速度过快
sleep(0.5)
print('停止打印') # 当事件触发时,线程会打印这句话
# 定义一个列表变量
my_var = [1, 2, 3]
# 创建并启动一个线程,目标函数是 modify_variable,传入 my_var 列表作为参数
t = Thread(target=modify_variable, args=(my_var, ))
t.start()
# 主线程中不断打印列表的当前状态
while True:
try:
print(my_var)
# 暂停1秒
sleep(1)
except KeyboardInterrupt:
# 捕捉键盘中断 (Ctrl+C),触发事件来通知子线程停止
event.set()
break
# 等待子线程结束
t.join()
# 最后打印列表的最终状态
print(my_var)
上面的示例我们通过传递一个参数 my_var
(一个数字列表)来启动一个新线程。线程将以一定的延迟增加数字的值,同时我们使用事件来结束线程。
在这个示例中,重要的代码部分是第 print(my_var)
行。该打印语句位于主线程中,但是它可以访问在子线程中生成的信息。这种行为是可能的,由于不同线程之间的内存共享。虽然能够访问相同的内存空间是一件很方便的事情,但也可能带来一些风险。
让我们稍微修改在线程中运行的代码。让我们移除 sleep
:
def modify_variable(var):
while True:
for i in range(len(var)):
var[i] += 1
if event.is_set():
break
# sleep(.5)
print('停止打印')
现在,当我们运行代码时,会在一次迭代和下一次迭代之间没有休眠。让我们运行它一小段时间,比如说 5 秒,我们可以这样做:
from time import time
[...]
my_var = [1, 2, 3]
t = Thread(target=modify_variable, args=(my_var, ))
t.start()
t0 = time()
while time()-t0 < 5:
print(my_var)
sleep(1)
event.set()
t.join()
print(my_var)
上面省略了重复的代码部分。如果你运行这段代码,将会得到非常大的输出结果。在我的情况下,我得到了:
[6563461, 6563462, 6563463]
然而,有一个非常重要的特性需要注意。这三个数字是连续的。这是预期的,因为起始变量是 [1, 2, 3]
,我们对每个变量都加了一。让我们再次启动第二个线程,看看输出是什么:
my_var = [1, 2, 3]
t = Thread(target=modify_variable, args=(my_var, ))
t2 = Thread(target=modify_variable, args=(my_var, ))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
try:
print(my_var)
sleep(1)
except KeyboardInterrupt:
event.set()
break
event.set()
t.join()
t2.join()
print(my_var)
我已经得到如下数值作为输出:
[5738447, 5686971, 5684220]
你可以首先注意到这两个点:
第一点是运行两个线程得出的结果比运行一个线程小。
第二点是输出的结果值并不是连续的。
这是在使用多个线程处理 Python 时可能出现的非常重要的行为。在上一个教程中,我们讨论了线程是由操作系统处理的,操作系统决定何时启动或关闭线程。我们无法控制操作系统的决定。在上面的例子中,由于循环中没有 sleep
,操作系统将不得不决定何时停止一个线程并启动另一个线程。然而,这并不能完全解释我们得到的输出。不管一个线程先运行还是停止,等等,我们总是对每个元素添加 +1
。
上述代码的问题在于第 var[i] += 1
行,实际上涉及两个操作。首先,它从 var[i]
复制值并添加 1
。然后将值存回 var[i]
。在这两个操作之间,操作系统可能决定从一个任务切换到另一个任务。在这种情况下,两个任务在列表中看到的值是相同的,因此我们只添加 +1
一次,而不是两次。如果你想让它更加明显,你可以启动两个线程,一个向列表添加,另一个从列表减去,这样可以快速提示哪个线程运行得更快。在我的案例中,我得到了以下输出:
[-8832, -168606, 2567]
但如果我再次运行它,我会得到:
[97998, 133432, 186591]
注意
你可能会注意到两个线程的 start
之间存在延迟,这会给第一个启动的线程带来一定的优势。
如何同步数据访问
为了解决我们在上面示例中找到的问题,我们必须确保没有两个线程同时尝试写入相同的变量。为此,我们可以使用一个 Lock
:
from threading import Lock
[...]
data_lock = Lock()
def modify_variable(var):
while True:
with data_lock:
for i in range(len(var)):
var[i] += 1
if event.is_set():
break
# sleep(.5)
print('停止打印')
请注意,我们在函数中添加了一行 with data_lock:
。如果再次运行代码,你将看到我们获得的值总是连续的。锁确保只有一个线程会同时访问变量。
下面我们看第二种数据共享的方式,队列。
队列
线程常用的情况之一是当你有一些无法优化的慢任务时。例如,假设你正在从网站下载数据。大部分时间处理器都会处于空闲状态。这意味着你可以利用这段时间做其他事情。如果你想要下载整个网站(也称为抓取),那么同时下载多个页面将是一个很好的解决方案。想象一下,你有一个要下载的页面列表,然后启动多个线程,每个线程下载一个页面。如果你在实现这个过程时不小心,可能会出现重复下载的问题,就像我们在上一节中看到的那样。
这是在使用线程时另一个对象非常有用的地方:队列。队列是一个按顺序接受数据的对象,即你一次向其中放入一个元素的数据。然后,数据可以按照相同的顺序被消费,称为先进先出(FIFO)。一个非常简单的例子是:
from queue import Queue
queue = Queue()
for i in range(20):
queue.put(i)
while not queue.empty():
data = queue.get()
print(data)
在这个例子中,你会看到我们创建了一个 Queue
,然后我们将数字从 0 到 19 放入队列中。稍后,我们创建了一个 while
循环,它从队列中取出数据并将其打印出来。这是 Python 中队列的基本行为。你应该注意到打印的数字顺序与它们添加到队列中的顺序相同。
回到文章开头的例子,我们可以使用队列来在线程之间共享信息。我们可以修改函数,使其不再接受列表作为参数,而是接受一个队列,从中读取元素。然后,它将把结果输出到一个输出队列:
from threading import Thread, Event
from queue import Queue
from time import sleep, time
event = Event()
def modify_variable(queue_in, queue_out):
while True:
if not queue_in.empty():
var = queue_in.get()
for i in range(len(var)):
var[i] += 1
queue_out.put(var)
if event.is_set():
break
print('停止打印')
要使用上面的代码,我们需要创建两个队列。思路是我们也可以创建两个线程,其中输入和输出队列被颠倒。在这种情况下,一个线程将其输出放在第二个线程的队列上,反之亦然。这会看起来像这样:
my_var = [1, 2, 3]
queue1 = Queue()
queue2 = Queue()
queue1.put(my_var)
t = Thread(target=modify_variable, args=(queue1, queue2))
t2 = Thread(target=modify_variable, args=(queue2, queue1))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
try:
sleep(1)
except KeyboardInterrupt:
event.set()
break
event.set()
t.join()
t2.join()
if not queue1.empty():
print(queue1.get())
if not queue2.empty():
print(queue2.get())
在我的情况下,我得到的输出是:
[871, 872, 873]
迄今为止,这比我们看到的任何东西都小得多,但至少我们成功地在两个不同的线程之间共享了数据,而没有任何冲突。这种慢速的来源是什么?让我们尝试以科学的方法来解决问题并且逐个研究每个部分。其中最有趣的事情之一是,在尝试运行代码的其余部分之前,我们要检查队列是否为空。我们可以监控程序的重要部分实际运行所花费的时间:
def modify_variable(queue_in: Queue, queue_out: Queue):
internal_t = 0
while True:
if not queue_in.empty():
t0 = time()
var = queue_in.get()
for i in range(len(var)):
var[i] += 1
queue_out.put(var)
internal_t += time()-t0
if event.is_set():
break
sleep(0.1)
print(f'运行时间: {internal_t} 秒n')
唯一的变化是在函数中增加了一个新变量,称为 internal_t
。然后,我们监视计算的时间并将其放入新线程。如果我们再次运行代码,你应该得到类似以下的输出:
运行时间: 0.0006377696990966797 秒
运行时间: 0.0003573894500732422 秒
这意味着在我们程序运行的 5 秒钟中,只有大约 0.9 毫秒我们实际在做某事。这仅占 0.01%的时间!让我们快速看看如果我们改变代码,只使用一个队列而不是两个会发生什么,即输入和输出队列将是相同的:
t = Thread(target=modify_variable, args=(queue1, queue1))
t2 = Thread(target=modify_variable, args=(queue1, queue1))
仅通过这个改变,我得到了如下输出:
运行时间: 4.290639877319336 秒
运行时间: 4.355865955352783 秒
这好多了!在程序运行的约 5 秒钟内,线程总共运行了 8 秒。这正是并行化的预期效果。此外,循环的输出要大得多。
[710779, 710780, 710781]
我们思考一下,如果我们使用两个队列,为什么程序运行得这么慢?但如果我们将输出和输入使用同一个队列,程序会运行得相当快呢?
当像我们在之前的例子中盲目地使用线程时,我们把一切都交给了操作系统。我们无法控制操作系统决定从一个任务切换到另一个任务。在上面的代码中,我们检查队列是否为空。很可能操作系统决定优先处理一个基本上什么都不做,只是等待队列中有元素的任务。如果这种情况发生未同步,大部分时间程序将一直等待队列中有元素(始终优先处理错误的任务)。但当我们将相同的任务用于输入和输出时,无论运行哪个任务都没关系,总会有东西可以进行。
如果你想要查看先前的猜测是否成立,我们可以对其进行测量。我们只有一个 if
语句来检查 queue.empty()
,我们可以添加一个 else
来累积程序实际上没有做任何事情的时间:
def modify_variable(queue_in: Queue, queue_out: Queue):
internal_t = 0
sleeping_t = 0
while True:
if not queue_in.empty():
t0 = time()
var = queue_in.get()
for i in range(len(var)):
var[i] += 1
queue_out.put(var)
internal_t += time()-t0
else:
t0 = time()
sleep(0.001)
sleeping_t += time()-t0
if event.is_set():
break
sleep(0.1)
print(f'运行时间: {internal_t} 秒')
print(f'睡眠时间: {sleeping_t} 秒')
在上面的代码中,如果队列是空的,程序将会休眠 1 毫秒。当然,这并不是最好的做法,但我们可以假设 1 毫秒对程序整体性能没有实质影响。当我运行上面的程序时,使用两个不同的队列,我得到以下输出:
运行时间: 0.0 秒
睡眠时间: 5.001126289367676 秒
运行时间: 0.00018215179443359375 秒
睡眠时间: 5.001835107803345 秒
[4126, 4127, 4128]
很明显,大部分时间程序只是在等待,直到队列中有更多数据可用。因为每次没有数据可用时我们都会休眠 1 毫秒,实际上会使程序变得更慢。但我认为这是一个很好的例子。我们可以将其与在输入和输出上使用相同队列进行比较。
运行时间: 3.1206254959106445 秒
睡眠时间: 1.3756272792816162 秒
运行时间: 3.253162145614624 秒
睡眠时间: 1.136244535446167 秒
现在你看到,即使因为睡眠而浪费了一些时间,我们大部分时间实际上都在执行计算。
当使用相同的队列进行输入和输出时,唯一需要注意的是,在检查队列是否为空和实际从中读取之间,可能会发生另一个线程抓取结果的情况。这在队列文档中有描述。除非我们自己包含一个 Lock
,队列可以被任何线程读取和写入。锁只对 get
或 put
命令生效。
队列的额外选项
队列有一些额外选项,比如它们可以容纳的元素的最大数量。你还可以定义后进先出(LIFO)类型的队列,你可以在文档中了解相关信息。我发现 Queues
更有用的是它们是用纯 Python 编写的。如果你访问它们的源代码,你可以学到很多关于线程同步、自定义异常和文档编制的知识。
重要的是要注意的是,当你使用多个线程时,有时你希望等待(即阻止执行),有时则不希望。在上面的示例中,我们一直在读取队列之前检查队列是否为空。但是如果我们不检查呢? get
方法有两个选项: block
和 timeout
。第一个用于确定我们是否希望程序等待直到有元素可用。第二个是指定我们希望等待的秒数。在那段时间后,会引发异常。如果我们将 block
设置为 false,并且队列为空,就会立即抛出异常。
我们可以修改函数 modify_variable
以利用这一点:
def modify_variable(queue_in: Queue, queue_out: Queue):
internal_t = 0
while True:
t0 = time()
var = queue_in.get()
for i in range(len(var)):
var[i] += 1
queue_out.put(var)
internal_t += time()-t0
if event.is_set():
break
sleep(0.1)
print(f'运行时间: {internal_t} 秒n')
使用此代码,为输入和输出使用不同队列,我得到以下结果:
运行时间: 4.914130210876465 秒
运行时间: 4.937211513519287 秒
[179992, 179993, 179994]
这比我们之前得到的要好得多。但这种计时方式并不是真正公平,很多时间都是在 get
函数中等待,但我们仍然把那些时间计算在内。如果我们把 t0 = time()
的代码移到 get
的下面一行,那么实际运行代码的时间就会有很大的不同:
运行时间: 0.7706246376037598 秒
运行时间: 0.763786792755127 秒
[177807, 177808, 177809]
所以现在你看到了,也许我们在之前的示例中应该以不同的方式计算时间,特别是当我们在使用相同的队列进行输入和输出时。
如果我们不想让程序在等待获取时被阻塞,可以这样做:
from queue import Empty
[...]
try:
var = queue_in.get(block=False)
except Empty:
continue
或者,我们可以像这样指定超时时间:
try:
var = queue_in.get(block=True, timeout=0.001)
except Empty:
continue
在这种情况下,我们要么不等待( block==False
),捕获异常,要么等待最多 1 毫秒( timeout=0.001
),然后捕获异常。你可以尝试调整这些选项,看看你的代码性能是否得到优化。
停止线程的队列
到目前为止,我们一直使用锁来停止线程,然而,还有另一种可能性,即通过向队列附加特殊信息来控制线程的流动。一个非常简单的例子是向队列添加一个元素 None
,当函数获取到它时,便停止执行。代码将如下所示:
[...]
var = queue_in.get()
if var is None:
break
然后,在脚本的主要部分,当我们想要停止线程时,我们执行以下操作:
queue1.put(None)
queue2.put(None)
如果你想知道为什么要选择其中一种选项,答案实际上非常简单。我们正在处理的示例,队列中始终最多只有一个元素。当我们停止程序时,我们知道队列中的所有内容都已被处理。然而,请想象一下,如果程序正在处理一组彼此之间没有关联的元素。例如,从网站下载数据或处理图像等情况下会出现这种情况。你希望确保在停止线程之前完成所有处理。在这种情况下,向队列添加一个特殊值可以保证所有元素都将被处理。
结论
访问共享内存可以使程序开发变得非常快速,但当不同线程对相同元素进行读写时,可能会出现问题,在这篇文章中,我们已经讨论了如何解决这种线程安全问题,即如何在线程之间安全地共享数据,一个是线程之间共享内存,另一个是使用队列。
写作不易,欢迎关注
Previous Post
深入理解Python中的深拷贝和浅拷贝评论已关闭。