深入大型数据集:并行与分布化Python代码
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

2.2 并行处理

很好,现在我们有了一种利用map函数获取所有互联网数据的方法。但是,每次使用map函数离线获取一页数据非常缓慢。如果抓取一个网页需要1秒的时间,而我们需要抓取3652个网页,那么下载所有的数据需要一个多小时(3652秒,约为61分钟)。这并不是一个令人无法接受的等待时间;但是这已经足够漫长,以致我们想要尽量避免如此长的下载时间。而且我们可以做到这一点。

怎样才能避免这种等待呢?如果我们不是一次只下载一个页面,而是一次下载多个页面呢?通过并行编程,我们可以做到这一点。

并行编程指的是这样一种编程方式:将问题分成多个块,然后同时分别处理这些块。通常,我们希望对这些块的处理是相同的。例如,在示例中,我们希望能处理每个URL(一个单独的数据,与任何其他URL无关),并且获取该URL的网站内容(一个通用处理)。

图2.6显示了使用标准线性处理和并行处理下载URL的区别。

图2.6 每次读取一个网页很慢,可以通过并行编程来加速这个过程。

使用线性处理,我们可以一次处理一个URL并将它们转换成Web页面。先处理一个URL,获取数据,然后再处理下一个URL。并行编程允许我们将这个任务切分开来,并且更快地处理它。当我们编写并行代码时,我们为任务分配了许多“工人”(worker)(通常是CPU)。随后,每一个工人都会获取其中一块数据并对它进行处理。

在图2.6中,数据是相同的,数据转换也是相同的。唯一的变化是一次执行的任务数量。以前我们一次只执行一个任务,现在我们一次执行4个任务。这将使我们的工作速度加快为原来的4倍。

2.2.1 处理器和处理过程

如果一次执行4个任务更好,那么为什么不同时执行8个任务呢?为什么不是10个任务,又为什么不是1000个任务呢?这个问题问得很好。大多数人在计算机上工作的时候都不会想到,甚至大多数程序员都不会想到的是,计算机硬件对计算机行为的影响。例如,大多数人都知道自己有一台Mac或PC;但是,除非计算机上有英特尔(Intel)的标签,否则大多数人可能都说不出他们计算机的处理器属于哪一类。

然而,在并行编程中,这些处理器是我们的英雄。处理器是能够执行指令的小型电路板;也就是说,它是实际工作的地方。通常,我们认为计算机的内存是限制自己能做的事情的主要因素(这当然也对),但是我们的CPU也同样重要。在一个弱处理器下拥有大量的内存,就像在自助餐台前排队时只有一个盘子一样:当然,餐台上有很多食物,但是一个盘子却装不下更多的美食。如果我们的CPU是多核的,这就像额外的盘子:每次我们去取自助餐时,就能带回更多的食物。

CPU就像盘子一样,越多越好。我们拥有的CPU越多,就能分配越多的任务,完成越多的工作。你可以通过在Python REPL中运行以下Python命令,检查你的机器上有多少个CPU:

或者,你也可以在终端中运行以下命令:

这两个命令会执行相同的操作。第一个命令的第一行从Python标准库中导入os模块,第二行检查有多少个CPU。即使你不熟悉如何使用Python的os模块,也可以通过它提供的许多工具来与操作系统进行交互。根据你使用的操作系统不同,其中一些函数的具体细节会略有不同。在大量使用这个模块之前,你有必要熟悉一下它的细节。

这些命令非常有用,因为它们告诉你从标准并行编程实现中可以获得多大程度的速度提升。在Python中并行实现代码时,默认情况下Python将使用所有的CPU。如果不希望这样,就必须注明,我们需要它使用更少的CPU。

但是这些内容有点超前了。在Python中实现并行代码是什么样的呢?让我们回到从URL下载内容的例子。我们希望并行地抓取Web页面,那么需要修改多少代码呢?清单2.5可以为你提供一些思路(下面“*********.com”所代表的具体网址可通过http://www.broadview.com.cn/40368进行下载)。

清单2.5 并行地进行Web抓取

正如我们在清单2.5中看到的,代码根本无须做太多改动。因为我们一开始就使用map来组织代码,所以让代码变成并行处理,只需要两行代码,以及在第3行中添加两个字符。如果你的机器上有4个CPU,那么这个程序的运行速度应该是非并行版本的4倍。这将把我们假设的1小时的运行时间缩短到15分钟左右。

这很简单,而且也应该很简单。这种类型的任务经常被轻蔑地称为“尴尬的并行任务”。换句话说,使这些任务提速的方法非常简单。也就是说,在进行并行编程时可能会出现一些问题。

在Python中进行并行化编程时,我们可能会遇到以下一些问题:

■ 无法序列化数据或函数,导致程序无法运行。

■ 对顺序敏感的操作会返回不一致的结果。

■ 存在状态依赖的操作会返回不一致的结果。

2.2.2 并行化和序列化

当我们并行编写代码时——例如,当调用以前的并行map函数时,Python会在背后完成很多工作。当并行化不起作用时,通常是因为我们没有充分考虑Python隐藏在背后的这些工作。其中一件Python对我们隐藏的事情就是序列化(pickling)和反序列化(unpickling)。

pickling是对象序列化(也称为serialization或者marshalling)的Python版本,而对象序列化会将代码中的对象以有效的二进制格式存储在磁盘上,稍后程序可以将其读取回来。术语pickling来自Python的pickle模块,该模块提供用于序列化数据和读取已序列化数据的函数。

序列化和反序列化的过程如图2.7所示。

图2.7 序列化允许我们以机器可读的状态来保存数据和指令,以便Python程序以后可以重用它们。

在图2.7的左边,我们从最初的编程环境和最初的代码对象开始这个过程。这一点上没有什么特别的地方,我们只是像往常一样用Python编程。接下来,我们对代码对象进行序列化,于是代码对象被保存在磁盘上的二进制文件中。之后,我们从新的编程环境中读取序列化文件,在新的环境中还可以访问原来的代码对象。我们在第一个环境中序列化的所有内容现在都可以在新的环境中访问,就像之前一样。

注意 我们的代码可以在序列化格式中保存很长时间。在并行编程中,通常我们会快速地将文件读入Python环境,但是没有理由不能将序列化对象长期保存在磁盘上。不过,长期保存序列化的数据不是一个好主意;因为如果升级了Python版本,数据可能会变得不可读。

为什么我们在并行编程中要使用序列化?还记得我说过并行编程允许程序同时做许多件事吗?Python序列化对象(函数和数据)是为了将工作传递给处理任务的每个处理器。该过程如图2.8所示。

图2.8 序列化允许我们跨处理器甚至跨机器共享数据,保存指令和数据,然后在其他地方执行它们。

一开始,我们的代码只在一个处理器上运行;这是标准的编码方式。为了并行地处理代码,Python会将我们的问题分成几个部分,每个部分可以由一个单独的处理单元来处理。主进程工作流随后对这些部分进行序列化,而且这种序列化可以确保处理器知道如何执行所需的工作。当处理单元准备好执行工作时,会从磁盘中读取序列化文件并执行工作。最后,工人进程会序列化结果并将其返回给主进程。

大多数时候,这种方法都能够完美地工作;但是,你只能序列化某些类型的Python对象。如果我们试图在不能序列化的对象上使用并行方法,Python将抛出一个错误。幸运的是,大多数标准的Python对象都是可序列化的,因此可以在并行的Python代码中使用。Python可以自然地序列化以下类型:

■ None、True和False

■ 整数、浮点数和复数

■ 字符串、字节和字节数组

■ 只包含可序列化对象的元组、列表、集合和字典

■ 在一个模块的顶层定义的函数

■ 在一个模块的顶层定义的内置函数

■ 在一个模块的顶层定义的类

我们不能序列化以下类型的对象:

■ lambda函数

■ 嵌套函数

■ 嵌套类

避免类型不可序列化问题最简单的方法,是在使用Python内置的多进程模块时避免使用它们。对于必须使用它们的情况,可以使用社区提供的一个名为pathos的库的dill模块。这个模块采用了一种不同的序列化方法,它允许我们序列化任何自己想要的东西,包括我们之前不能序列化的3种对象类型。

使用pathos和dill与使用多进程模块没有太大的不同。我们要做的第一件事是安装这个库。你可以在命令行中运行

除了安装pathos库,Python还将安装pathos依赖的一些库,包括dill。安装了pathos之后,我们现在可以调用它,它会在背后调用dill。回忆一下我们的多进程例子,它看起来是这样的:

要把这段代码转换成pathos,我们只需要做一些改变。我们的新代码如下:

从多进程处理库转到pathos库只需要改动两个地方。首先,我们必须导入pathos而不是multiprocessing库。另外,在pathos中,需要使用的进程池被称为ProcessPool,而不是Pool。与Pool一样,ProcessPool是用来创建工人处理器单元的函数。我们需要用ProcessPool来代替Pool。与Pool一样,如果我们希望使用的节点数少于最大节点数,则只需指定节点的数量。我们在这里指定只是出于演示的目的。我们可以将ProcessPool重新命名为P,然后就可以像调用multiprocessing.Pool对象那样调用它:P.map。

2.2.3 顺序和并行化

当我们在进行并行处理时,另一个可能引起问题的因素是顺序敏感性。在并行工作时,我们不能保证任务将按照输入的顺序完成。这意味着对于某些需要按照某个线性顺序处理的工作,可能不应该使用并行处理。

如果想要自己测试一下,试试在Python中运行这个命令:

如果使用for循环来执行这个操作,我们希望将0到99之间的每个数字的有序列表打印到屏幕上。但是,在构造map函数时,我们无法做到这一点。使用map函数之后,我们在屏幕上打印出的是一些大致有序,但不是严格顺序排列的数字,以及一个全是“None”的列表。发生了什么事?

当Python并行化代码时,它会将问题分解成多个块,以便处理单元能够处理。对于处理单元来说,每当它们有处理能力时,它们就会抓取第一个可用的块。然后处理这个问题,直到它们完成该处理过程,之后再抓取下一个可用的块。当可用块是无序的时候,它们也会无序地完成该处理过程。我们可以把这个过程可视化,如图2.9所示。

在图2.9中,我们的问题开始于顶部。我们把这个问题分成10个部分,然后把它们排列成一个队列。当处理器空闲时,它们将从队列中拉取一个任务,运行它,并将结果发送到底部的已完成任务区域。之后处理器获取下一个可用的任务并处理它们,直到所有的任务都完成。但是完成这些任务所需的时间各不相同。例如,在已完成的任务区域中,我们可以看到任务1、任务2、任务3和任务5都已经完成,而任务4、任务6、任务7和任务8正在工作。任务9和任务10仍然在排队,没有被分配给处理器。如果任务1(或任务2、任务3)和任务5的执行时间很短,而任务4的执行时间很长,那么就很容易出现这种情况——在执行单个任务4的时间内可以完成另外两个任务。

图2.9 并行处理不一定按照顺序完成任务,因此在我们使用并行技术之前,必须知道这是否可被接受。

尽管Python可能没有按照顺序处理完这些问题,但是它仍然记得应该遵循的顺序。实际上,map函数会以我们期望的准确顺序返回(即使它没有按照这样的顺序处理)。为了演示这一点,我们可以运行以下代码:

打印输出没有顺序,但是返回的列表是有序的。打印输出显示了块的工作顺序,而列表输出显示了返回的数据结构。我们可以看到,即使Python以“错误”的顺序来处理问题,它仍然返回正确的排序结果。那么,什么时候这会给我们带来麻烦呢?答案是,存在状态依赖的时候。

2.2.4 状态和并行化

在面向对象编程中,我们会经常编写一些依赖于类的状态的方法。以fizz/buzz问题为例,这个问题经常被用来介绍编程语言的语法。它涉及对数字的循环。如果数字不能被3(或者5,抑或其他数字)整除,则返回fizz;如果数字能被3整除,则返回buzz。期望的输出是一个间隔正确的含有多个fizz和bazz的序列。

在Python中,我们可以用一个类来解决fizz/buzz问题,如清单2.6所示。

清单2.6 用for循环来解决典型的fizz/buzz问题

这个类会关心我们调用foo方法的次数,每到第3次时它都会打印并返回buzz而不是fizz。我们通过循环来演示它如何正常工作。如果你在本地机器上运行它,就将看到它的运行过程与预期结果一样:我们打印出几个fizz,每到第3个的时候插入一个buzz。然而,当我们试图使用一个并行的map函数来实现同样的效果时,奇怪的事情发生了:

这是怎么回事?为什么只有fizz而没有buzz?让我们回到之前讨论map函数时提到的内容。还记得当时是怎么说的吗?map函数实际上并不进行计算,它只是存储计算的指令。这就是为什么我们称它是惰性的。在这个例子中,计算FB.foo的指令包含了调用FB时的状态。因为FB.n在我们请求指令时的值为0,所以map函数会对所有操作使用FB.n=0(即使在使用时FB.n发生了变化也是如此)。因为FB.n=0总会返回fizz,所以在得到的结果中只有fizz。

我们可以通过将FB.n改为2来测试这一点,这样应该总会返回一个buzz,然后运行相同的命令。代码大概是这样的:

这里,正如我们所期望的那样,存储了当FB.n=2时FB.foo的指令,并且结果中只有buzz而没有fizz。

我们能做些什么呢?通常,这样的情况只需要我们重新思考这个问题。一种常见的解决方案是将内部状态设置为外部变量。例如,我们可以使用range函数生成的数字,而不是FB存储的内部值。然后,我们也可以用一个简单的函数来替换这个类,就像这样:

这个函数与foo方法的功能完全相同,但它依赖于外部变量n的值,而不是内部状态self.n。然后,我们可以通过一个并行map函数,将其应用到range函数生成的数字上,从而得到我们所期望的结果。

当你运行该命令时,会注意到打印的值并没有按照正确的顺序显示。这是因为,正如我们在前面所指出的,处理器正在从堆栈中获取第一个可用的任务,并且以最快的速度完成该任务。有时,fizz任务的执行速度会比buzz任务的执行速度慢,所以一行会连续打印两个buzz。而其他时候,打印buzz会需要更长的时间,于是我们会看到一行会出现3个或者更多的fizz。不过,结果数据的顺序是正确的:fizz,fizz,buzz……fizz,fizz,buzz……fizz,fizz,buzz。

我们有必要花点时间从可视化的角度来了解它。下面先看看在并行化依赖状态时会发生什么,然后再看看不依赖状态的并行化会发生什么。首先,让我们回顾一下图2.8(该图与图2.10是一样的)。

图2.10说明了在执行并行计算时所发生的情况:首先我们将任务分割成多个块(在本例中为4块),然后将这些块以序列化格式保存到磁盘中。之后我们的处理器会获取并运行它们,直到它们全部完成。关于状态,我们最需要注意的是第二步——序列化数据。

图2.10 当我们需要序列化对象,并通过一个并行map函数来分发它时,也会序列化状态信息。这允许我们并行地执行工作,但是可能会产生意想不到的结果。

在第一步中,map函数提供了问题每个部分的指令。这一步类似于我们的并行化步骤,将问题分成多块。请记住,map函数不会立即完成计算工作,它只是编写指令并在稍后执行(因为它是惰性的)。在第二步中,我们需要将指令保存到一个磁盘中。因为map函数已经知道了这些指令,所以这一点很容易做到。但是,请注意,我们正在保存处理这个问题的指令,这意味着我们会将所需的任何状态(比如存储FB.n变量时的状态)都存储下来。然后,在最后一步,我们的处理器会读取这些指令并执行它们。