跳转至主要内容

Python编程

解决Python多线程冲突:内存共享与队列应用详解

Sprite
发表于 2024年10月22日

我们在 Python 中使用线程时,可能会遇到一个常见的问题:不同线程之间怎么安全地共享数据。虽然 Python 线程本身是共享同一个内存空间的,但在处理多个线程同时访问同一个数据时,很容易出现冲突和数据不一致的情况。为了避免这些问题,Python 提供了一些工具和技巧,比如队列、锁和事件,帮助我们在保持并发的同时,也能确保数据共享的安全性和稳定性。

这篇文章我们聊一下在多线程环境下,怎么处理这些数据共享的问题。

共享内存

第一种最简单的方法是在不同的线程中使用相同的变量。我们在之前的教程中已经使用过这个特性,但没有明确讨论过。让我们通过一个非常简单的例子来看看如何使用共享内存:

from threading import Thread, Event
from time import sleep

# 创建一个事件对象,用于控制线程的终止
event = Event()

# 定义一个函数,在线程中不断修改传入的列表
def modify_variable(var):
    while True:
        # 循环修改列表中的每个元素,每次将其加1
        for i in range(len(var)):
            var[i] += 1
        # 检查事件是否已被触发,如果是则跳出循环
        if event.is_set():
            break
        # 暂停0.5秒,避免修改速度过快
        sleep(0.5)
    print('停止打印')  # 当事件触发时,线程会打印这句话

# 定义一个列表变量
my_var = [123]
# 创建并启动一个线程,目标函数是 modify_variable,传入 my_var 列表作为参数
t = Thread(target=modify_variable, args=(my_var, ))
t.start()

# 主线程中不断打印列表的当前状态
while True:
    try:
        print(my_var)
        # 暂停1秒
        sleep(1)
    except KeyboardInterrupt:
        # 捕捉键盘中断 (Ctrl+C),触发事件来通知子线程停止
        event.set()
        break

# 等待子线程结束
t.join()
# 最后打印列表的最终状态
print(my_var)

上面的示例我们通过传递一个参数 my_var (一个数字列表)来启动一个新线程。线程将以一定的延迟增加数字的值,同时我们使用事件来结束线程。

在这个示例中,重要的代码部分是第 print(my_var) 行。该打印语句位于主线程中,但是它可以访问在子线程中生成的信息。这种行为是可能的,由于不同线程之间的内存共享。虽然能够访问相同的内存空间是一件很方便的事情,但也可能带来一些风险。

让我们稍微修改在线程中运行的代码。让我们移除 sleep :

def modify_variable(var):
    while True:
        for i in range(len(var)):
            var[i] += 1
        if event.is_set():
            break
        # sleep(.5)
    print('停止打印')

现在,当我们运行代码时,会在一次迭代和下一次迭代之间没有休眠。让我们运行它一小段时间,比如说 5 秒,我们可以这样做:

from time import time
[...]

my_var = [123]
t = Thread(target=modify_variable, args=(my_var, ))
t.start()
t0 = time()
while time()-t0 < 5:
    print(my_var)
    sleep(1)
event.set()
t.join()
print(my_var)

上面省略了重复的代码部分。如果你运行这段代码,将会得到非常大的输出结果。在我的情况下,我得到了:

[656346165634626563463]

然而,有一个非常重要的特性需要注意。这三个数字是连续的。这是预期的,因为起始变量是 [1, 2, 3] ,我们对每个变量都加了一。让我们再次启动第二个线程,看看输出是什么:

my_var = [123]
t = Thread(target=modify_variable, args=(my_var, ))
t2 = Thread(target=modify_variable, args=(my_var, ))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
    try:
        print(my_var)
        sleep(1)
    except KeyboardInterrupt:
        event.set()
        break
event.set()
t.join()
t2.join()
print(my_var)

我已经得到如下数值作为输出:

[573844756869715684220]

你可以首先注意到这两个点:

第一点是运行两个线程得出的结果比运行一个线程小。

第二点是输出的结果值并不是连续的。

这是在使用多个线程处理 Python 时可能出现的非常重要的行为。在上一个教程中,我们讨论了线程是由操作系统处理的,操作系统决定何时启动或关闭线程。我们无法控制操作系统的决定。在上面的例子中,由于循环中没有 sleep ,操作系统将不得不决定何时停止一个线程并启动另一个线程。然而,这并不能完全解释我们得到的输出。不管一个线程先运行还是停止,等等,我们总是对每个元素添加 +1 。

上述代码的问题在于第 var[i] += 1 行,实际上涉及两个操作。首先,它从 var[i] 复制值并添加 1 。然后将值存回 var[i] 。在这两个操作之间,操作系统可能决定从一个任务切换到另一个任务。在这种情况下,两个任务在列表中看到的值是相同的,因此我们只添加 +1 一次,而不是两次。如果你想让它更加明显,你可以启动两个线程,一个向列表添加,另一个从列表减去,这样可以快速提示哪个线程运行得更快。在我的案例中,我得到了以下输出:

[-8832-1686062567]

但如果我再次运行它,我会得到:

[97998133432186591]

注意

你可能会注意到两个线程的 start 之间存在延迟,这会给第一个启动的线程带来一定的优势。

如何同步数据访问

为了解决我们在上面示例中找到的问题,我们必须确保没有两个线程同时尝试写入相同的变量。为此,我们可以使用一个 Lock :

from threading import Lock
[...]
data_lock = Lock()
def modify_variable(var):
    while True:
        with data_lock:
            for i in range(len(var)):
                var[i] += 1
        if event.is_set():
            break
        # sleep(.5)
    print('停止打印')

请注意,我们在函数中添加了一行 with data_lock: 。如果再次运行代码,你将看到我们获得的值总是连续的。锁确保只有一个线程会同时访问变量。

下面我们看第二种数据共享的方式,队列。

队列

线程常用的情况之一是当你有一些无法优化的慢任务时。例如,假设你正在从网站下载数据。大部分时间处理器都会处于空闲状态。这意味着你可以利用这段时间做其他事情。如果你想要下载整个网站(也称为抓取),那么同时下载多个页面将是一个很好的解决方案。想象一下,你有一个要下载的页面列表,然后启动多个线程,每个线程下载一个页面。如果你在实现这个过程时不小心,可能会出现重复下载的问题,就像我们在上一节中看到的那样。

这是在使用线程时另一个对象非常有用的地方:队列。队列是一个按顺序接受数据的对象,即你一次向其中放入一个元素的数据。然后,数据可以按照相同的顺序被消费,称为先进先出(FIFO)。一个非常简单的例子是:

from queue import Queue

queue = Queue()
for i in range(20):
    queue.put(i)

while not queue.empty():
    data = queue.get()
    print(data)

在这个例子中,你会看到我们创建了一个 Queue ,然后我们将数字从 0 到 19 放入队列中。稍后,我们创建了一个 while 循环,它从队列中取出数据并将其打印出来。这是 Python 中队列的基本行为。你应该注意到打印的数字顺序与它们添加到队列中的顺序相同。

回到文章开头的例子,我们可以使用队列来在线程之间共享信息。我们可以修改函数,使其不再接受列表作为参数,而是接受一个队列,从中读取元素。然后,它将把结果输出到一个输出队列:

from threading import Thread, Event
from queue import Queue
from time import sleep, time

event = Event()

def modify_variable(queue_in, queue_out):
    while True:
        if not queue_in.empty():
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
        if event.is_set():
            break
    print('停止打印')

要使用上面的代码,我们需要创建两个队列。思路是我们也可以创建两个线程,其中输入和输出队列被颠倒。在这种情况下,一个线程将其输出放在第二个线程的队列上,反之亦然。这会看起来像这样:

my_var = [123]
queue1 = Queue()
queue2 = Queue()
queue1.put(my_var)
t = Thread(target=modify_variable, args=(queue1, queue2))
t2 = Thread(target=modify_variable, args=(queue2, queue1))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
    try:
        sleep(1)
    except KeyboardInterrupt:
        event.set()
        break
event.set()
t.join()
t2.join()
if not queue1.empty():
    print(queue1.get())
if not queue2.empty():
    print(queue2.get())

在我的情况下,我得到的输出是:

[871872873]

迄今为止,这比我们看到的任何东西都小得多,但至少我们成功地在两个不同的线程之间共享了数据,而没有任何冲突。这种慢速的来源是什么?让我们尝试以科学的方法来解决问题并且逐个研究每个部分。其中最有趣的事情之一是,在尝试运行代码的其余部分之前,我们要检查队列是否为空。我们可以监控程序的重要部分实际运行所花费的时间:

def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    while True:
        if not queue_in.empty():
            t0 = time()
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
            internal_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)
    print(f'运行时间: {internal_t} 秒n')

唯一的变化是在函数中增加了一个新变量,称为 internal_t 。然后,我们监视计算的时间并将其放入新线程。如果我们再次运行代码,你应该得到类似以下的输出:

运行时间: 0.0006377696990966797 秒
运行时间: 0.0003573894500732422 秒

这意味着在我们程序运行的 5 秒钟中,只有大约 0.9 毫秒我们实际在做某事。这仅占 0.01%的时间!让我们快速看看如果我们改变代码,只使用一个队列而不是两个会发生什么,即输入和输出队列将是相同的:

t = Thread(target=modify_variable, args=(queue1, queue1))
t2 = Thread(target=modify_variable, args=(queue1, queue1))

仅通过这个改变,我得到了如下输出:

运行时间: 4.290639877319336 秒
运行时间: 4.355865955352783 秒

这好多了!在程序运行的约 5 秒钟内,线程总共运行了 8 秒。这正是并行化的预期效果。此外,循环的输出要大得多。

[710779710780710781]

我们思考一下,如果我们使用两个队列,为什么程序运行得这么慢?但如果我们将输出和输入使用同一个队列,程序会运行得相当快呢?

当像我们在之前的例子中盲目地使用线程时,我们把一切都交给了操作系统。我们无法控制操作系统决定从一个任务切换到另一个任务。在上面的代码中,我们检查队列是否为空。很可能操作系统决定优先处理一个基本上什么都不做,只是等待队列中有元素的任务。如果这种情况发生未同步,大部分时间程序将一直等待队列中有元素(始终优先处理错误的任务)。但当我们将相同的任务用于输入和输出时,无论运行哪个任务都没关系,总会有东西可以进行。

如果你想要查看先前的猜测是否成立,我们可以对其进行测量。我们只有一个 if 语句来检查 queue.empty() ,我们可以添加一个 else 来累积程序实际上没有做任何事情的时间:

def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    sleeping_t = 0
    while True:
        if not queue_in.empty():
            t0 = time()
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
            internal_t += time()-t0
        else:
            t0 = time()
            sleep(0.001)
            sleeping_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)
    print(f'运行时间: {internal_t} 秒')
    print(f'睡眠时间: {sleeping_t} 秒')

在上面的代码中,如果队列是空的,程序将会休眠 1 毫秒。当然,这并不是最好的做法,但我们可以假设 1 毫秒对程序整体性能没有实质影响。当我运行上面的程序时,使用两个不同的队列,我得到以下输出:

运行时间: 0.0 秒
睡眠时间: 5.001126289367676 秒
运行时间: 0.00018215179443359375 秒
睡眠时间: 5.001835107803345 秒
[4126, 4127, 4128]

很明显,大部分时间程序只是在等待,直到队列中有更多数据可用。因为每次没有数据可用时我们都会休眠 1 毫秒,实际上会使程序变得更慢。但我认为这是一个很好的例子。我们可以将其与在输入和输出上使用相同队列进行比较。

运行时间: 3.1206254959106445 秒
睡眠时间: 1.3756272792816162 秒
运行时间: 3.253162145614624 秒
睡眠时间: 1.136244535446167 秒

现在你看到,即使因为睡眠而浪费了一些时间,我们大部分时间实际上都在执行计算。

当使用相同的队列进行输入和输出时,唯一需要注意的是,在检查队列是否为空和实际从中读取之间,可能会发生另一个线程抓取结果的情况。这在队列文档中有描述。除非我们自己包含一个 Lock ,队列可以被任何线程读取和写入。锁只对 get 或 put 命令生效。

队列的额外选项

队列有一些额外选项,比如它们可以容纳的元素的最大数量。你还可以定义后进先出(LIFO)类型的队列,你可以在文档中了解相关信息。我发现 Queues 更有用的是它们是用纯 Python 编写的。如果你访问它们的源代码,你可以学到很多关于线程同步、自定义异常和文档编制的知识。

重要的是要注意的是,当你使用多个线程时,有时你希望等待(即阻止执行),有时则不希望。在上面的示例中,我们一直在读取队列之前检查队列是否为空。但是如果我们不检查呢? get 方法有两个选项: block 和 timeout 。第一个用于确定我们是否希望程序等待直到有元素可用。第二个是指定我们希望等待的秒数。在那段时间后,会引发异常。如果我们将 block 设置为 false,并且队列为空,就会立即抛出异常。

我们可以修改函数 modify_variable 以利用这一点:

def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    while True:
        t0 = time()
        var = queue_in.get()
        for i in range(len(var)):
            var[i] += 1
        queue_out.put(var)
        internal_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)
    print(f'运行时间: {internal_t} 秒n')

使用此代码,为输入和输出使用不同队列,我得到以下结果:

运行时间: 4.914130210876465 秒
运行时间: 4.937211513519287 秒

[179992, 179993, 179994]

这比我们之前得到的要好得多。但这种计时方式并不是真正公平,很多时间都是在 get 函数中等待,但我们仍然把那些时间计算在内。如果我们把 t0 = time() 的代码移到 get 的下面一行,那么实际运行代码的时间就会有很大的不同:

运行时间: 0.7706246376037598 秒
运行时间: 0.763786792755127 秒

[177807, 177808, 177809]

所以现在你看到了,也许我们在之前的示例中应该以不同的方式计算时间,特别是当我们在使用相同的队列进行输入和输出时。

如果我们不想让程序在等待获取时被阻塞,可以这样做:

from queue import Empty
[...]

    try:
        var = queue_in.get(block=False)
    except Empty:
        continue

或者,我们可以像这样指定超时时间:

try:
    var = queue_in.get(block=True, timeout=0.001)
except Empty:
    continue

在这种情况下,我们要么不等待( block==False ),捕获异常,要么等待最多 1 毫秒( timeout=0.001 ),然后捕获异常。你可以尝试调整这些选项,看看你的代码性能是否得到优化。

停止线程的队列

到目前为止,我们一直使用锁来停止线程,然而,还有另一种可能性,即通过向队列附加特殊信息来控制线程的流动。一个非常简单的例子是向队列添加一个元素 None ,当函数获取到它时,便停止执行。代码将如下所示:

[...]

var = queue_in.get()
if var is None:
    break

然后,在脚本的主要部分,当我们想要停止线程时,我们执行以下操作:

queue1.put(None)
queue2.put(None)

如果你想知道为什么要选择其中一种选项,答案实际上非常简单。我们正在处理的示例,队列中始终最多只有一个元素。当我们停止程序时,我们知道队列中的所有内容都已被处理。然而,请想象一下,如果程序正在处理一组彼此之间没有关联的元素。例如,从网站下载数据或处理图像等情况下会出现这种情况。你希望确保在停止线程之前完成所有处理。在这种情况下,向队列添加一个特殊值可以保证所有元素都将被处理。

结论

访问共享内存可以使程序开发变得非常快速,但当不同线程对相同元素进行读写时,可能会出现问题,在这篇文章中,我们已经讨论了如何解决这种线程安全问题,即如何在线程之间安全地共享数据,一个是线程之间共享内存,另一个是使用队列。

写作不易,欢迎关注

评论已关闭。