使用 Cython 编写并行代码

注意

此页面使用两种不同的语法变体

  • Cython 特定的 cdef 语法,旨在使类型声明简洁且易于从 C/C++ 的角度阅读。

  • 纯 Python 语法,允许在 纯 Python 代码 中进行静态 Cython 类型声明,遵循 PEP-484 类型提示和 PEP 526 变量注释。

    要在 Python 语法中使用 C 数据类型,您需要在要编译的 Python 模块中导入特殊的 cython 模块,例如

    import cython
    

    如果您使用纯 Python 语法,我们强烈建议您使用最新的 Cython 3 版本,因为与 0.29.x 版本相比,这里已经进行了重大改进。

加速 Cython 代码的一种方法是并行化:您编写可以在 CPU 的多个核心上同时运行的代码。对于适合并行化的代码,这可以产生相当大的加速效果,等于 CPU 的核心数量(例如,在 4 核 CPU 上加速 4 倍)。

本教程假设您已经熟悉 Cython 的 “类型化内存视图”(因为使用内存视图的代码通常是易于使用 Cython 并行化的代码),并且您也对编写并行代码的常见陷阱有所了解(它旨在成为 Cython 教程,而不是并行编程的完整介绍)。

在开始之前,请注意以下几点

  • 并非所有代码都可以并行化 - 对于某些代码,算法仅仅依赖于按顺序执行,您不应该尝试对其进行并行化。累加和就是一个很好的例子。

  • 并非所有代码都值得并行化。在启动并行部分时,会产生相当多的开销,因此您需要确保您正在处理足够的数据以使这种开销值得。此外,请确保您正在对数据进行实际操作!多个线程仅仅读取相同的数据往往不能很好地并行化。如有疑问,请计时。

  • Cython 要求并行块的内容为 nogil。如果您的算法需要访问 Python 对象,那么它可能不适合并行化。

  • Cython 的内置并行化使用 OpenMP 结构 omp parallel foromp parallel。这些非常适合并行化相对较小、自包含的代码块(尤其是循环)。但是,如果您想使用其他并行化模型,例如生成和等待任务,或者将一些“辅助工作”卸载到持续运行的辅助线程,那么您可能更适合使用其他方法(例如 Python 的 threading 模块)。

  • 实际实现您的并行 Cython 代码可能应该是您优化的最后一步。您应该首先从一些可用的串行代码开始。但是,值得尽早规划,因为它可能会影响您对算法的选择。

本教程不旨在探讨所有可用于自定义并行化的选项。有关详细信息,请参阅 主要并行化文档。您还应该知道,Cython 对如何并行化您的代码做出的许多选择是相当固定的,如果您想要 Cython 默认情况下不提供的特定 OpenMP 行为,那么您可能更适合用 C 语言自己编写。

编译

OpenMP 需要您的 C/C++ 编译器的支持。这种支持通常通过一个特殊的命令行参数启用:在 GCC 上是 -fopenmp,而在 MSVC 上是 /openmp。如果您的编译器不支持 OpenMP(或者您忘记传递参数),那么代码通常仍然可以编译,但不会并行运行。

以下 setup.py 文件可用于编译本教程中的示例

from setuptools import Extension, setup
from Cython.Build import cythonize
import sys

if sys.platform.startswith("win"):
    openmp_arg = '/openmp'
else:
    openmp_arg = '-fopenmp'


ext_modules = [
    Extension(
        "*",
        ["*.pyx"],
        extra_compile_args=[openmp_arg],
        extra_link_args=[openmp_arg],
    ),
    Extension(
        "*",
        ["*.pyx"],
        extra_compile_args=[openmp_arg],
        extra_link_args=[openmp_arg],
    )
]

setup(
    name='parallel-tutorial',
    ext_modules=cythonize(ext_modules),
)

逐元素并行操作

Cython 中最简单、最常见的并行操作是逐元素遍历数组,对每个数组元素执行相同的操作。在下面的简单示例中,我们计算数组中每个元素的 sin

from cython.parallel cimport prange
cimport cython
from libc.math cimport sin

import numpy as np

@cython.boundscheck(False)
@cython.wraparound(False)
def do_sine(double[:,:] input):
    cdef double[:,:] output = np.empty_like(input)
    cdef Py_ssize_t i, j

    for i in prange(input.shape[0], nogil=True):
        for j in range(input.shape[1]):
            output[i, j] = sin(input[i, j])
    return np.asarray(output)

我们对最外层循环进行并行化。这通常是一个好主意,因为进入和退出并行块会有一些开销。但是,您还应该考虑数组的可能大小。如果 input 通常的大小为 (2, 10000000),那么对长度为 2 的维度进行并行化可能是一个更糟糕的选择。

循环体本身是 nogil - 也就是说,您不能执行“Python”操作。这是一个相当强的限制,如果您发现需要使用 GIL,那么 Cython 的并行化功能可能不适合您。但是,可以在循环中抛出异常 - Cython 只需重新获得 GIL 并抛出异常,然后终止所有线程上的循环。

需要显式地将循环变量 i 键入为 C 整数。对于非并行循环,Cython 可以推断出这一点,但它目前不推断并行循环的循环变量,因此不键入 i 会导致编译错误,因为它将是一个 Python 对象,因此在没有 GIL 的情况下无法使用。

为了使有经验的 OpenMP 用户受益,下面显示了生成的 C 代码,为了可读性,这里对其进行了简化

#pragma omp parallel
{
    #pragma omp for firstprivate(i) lastprivate(i) lastprivate(j)
    for (__pyx_t_8 = 0; __pyx_t_8 < __pyx_t_9; __pyx_t_8++){
        i = __pyx_t_8;
        /* body goes here */
    }
}

私有变量

从上面生成的 C 代码中可以注意到一个有用的点 - 在循环中使用的变量,如 ij,被标记为 firstprivatelastprivate。在循环中,每个线程都有自己的数据副本,数据根据循环之前的其值进行初始化,并且在循环之后,“全局”副本被设置为等于最后一次迭代(即,就像循环是串行运行一样)。

Cython 应用的基本规则是

  • prange 块中的 C 标量变量被设置为 firstprivatelastprivate

  • 并行块 中分配的 C 标量变量是 private(这意味着它们不能用于在块中传递数据),

  • 数组变量(例如内存视图)不会被设置为私有。相反,Cython 假设您已经构建了循环,以便每次迭代都作用于不同的数据,

  • Python 对象也不会被设置为私有,尽管对它们的访问是通过 Python 的 GIL 控制的。

Cython 目前没有提供太多覆盖这些选择的机会。

归约

Cython 中第二常见的并行操作是“归约”操作。一个常见的例子是累积整个数组的总和,例如在下面计算向量范数时

from cython.parallel cimport prange
cimport cython
from libc.math cimport sqrt

@cython.boundscheck(False)
@cython.wraparound(False)
def l2norm(double[:] x):
    cdef double total = 0
    cdef Py_ssize_t i
    for i in prange(x.shape[0], nogil=True):
        total += x[i]*x[i]
    return sqrt(total)

Cython 能够推断 +=*=-=&=|=^= 的归约。这些只适用于 C 标量变量,因此您不能轻松地将 2D 内存视图归约为 1D 内存视图,例如。

生成的 C 代码大约是

#pragma omp parallel reduction(+:total)
{
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        i = __pyx_t_2;
        total = total + /* some indexing code */;

    }
}

parallel

Cython 的 parallel 运算符比 prange 使用频率低得多。 parallel 生成一个代码块,该代码块会同时在多个线程上运行。但是,与 prange 不同,工作不会自动分配到各个线程。

这里我们介绍了 parallel 块的三个常见用途

将 prange 块串联在一起

进入和离开并行部分会产生一些开销。因此,如果您有多个并行部分,并且它们之间有很小的串行部分,那么编写一个大型并行块可能会更有效。任何小的串行部分都会被复制,但开销会减少。

在下面的示例中,我们对向量进行就地归一化。第一个并行循环计算范数,第二个并行循环将范数应用于向量,我们避免了在两者之间跳进跳出串行代码。

from cython.parallel cimport parallel, prange
cimport cython
from libc.math cimport sqrt

@cython.boundscheck(False)
@cython.wraparound(False)
def normalize(double[:] x):
    cdef Py_ssize_t i
    cdef double total = 0
    cdef double norm
    with nogil, parallel():
        for i in prange(x.shape[0]):
            total += x[i]*x[i]
        norm = sqrt(total)
        for i in prange(x.shape[0]):
            x[i] /= norm

C 代码大约是

#pragma omp parallel private(norm) reduction(+:total)
{
    /* some calculations of array size... */
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        /* ... */
    }
    norm = sqrt(total);
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        /* ... */
    }
}

为每个线程分配“临时空间”

假设每个线程需要一小块临时空间来工作。它们不能共享临时空间,因为这会导致数据竞争。在这种情况下,分配和释放是在并行部分完成的(因此在每个线程的基础上发生),围绕一个循环,然后使用临时空间。

我们这里的示例使用 C++ 查找二维数组中每列的中位数(只是 numpy.median(x, axis=0) 的并行版本)。我们必须重新排序每一列才能找到它的中位数,但不想修改输入数组。因此,我们为每个线程分配一个 C++ 向量作为临时空间,并在其中工作。为了提高效率,向量是在 prange 循环之外分配的。

# distutils: language = c++

from cython.parallel cimport parallel, prange
from libcpp.vector cimport vector
from libcpp.algorithm cimport nth_element
cimport cython
from cython.operator cimport dereference

import numpy as np

@cython.boundscheck(False)
@cython.wraparound(False)
def median_along_axis0(const double[:,:] x):
    cdef double[::1] out = np.empty(x.shape[1])
    cdef Py_ssize_t i, j

    cdef vector[double] *scratch
    cdef vector[double].iterator median_it
    with nogil, parallel():
        # allocate scratch space per loop
        scratch = new vector[double](x.shape[0])
        try:
            for i in prange(x.shape[1]):
                # copy row into scratch space
                for j in range(x.shape[0]):
                    dereference(scratch)[j] = x[j, i]
                median_it = scratch.begin() + scratch.size()//2
                nth_element(scratch.begin(), median_it, scratch.end())
                # for the sake of a simple example, don't handle even lengths...
                out[i] = dereference(median_it)
        finally:
            del scratch
    return np.asarray(out)

注意

纯语法和经典语法示例并不完全相同,因为纯 Python 语法不支持 C++ 的“new”,因此我们以略微不同的方式分配临时空间

在生成的代码中,scratch 变量在外部并行块中被标记为 private。一个粗略的概述是

#pragma omp parallel private(scratch)
{
    scratch = new std::vector<double> ((x.shape[0]))
    #pragma omp for firstprivate(i) lastprivate(i) lastprivate(j) lastprivate(median_it)
    for (__pyx_t_9 = 0; __pyx_t_9 < __pyx_t_10; __pyx_t_9++){
        i = __pyx_t_9;
        /* implementation goes here */
    }
    /* some exception handling detail omitted */
    delete scratch;
}

在每个线程上执行不同的任务

最后,如果您手动指定线程数量,然后使用 omp.get_thread_num() 识别每个线程,您就可以手动将工作分配到各个线程。这在 Cython 中是一个相当罕见的用例,并且可能表明 threading 模块更适合您要执行的操作。但是,这是一个选项。

from cython.parallel cimport parallel
from openmp cimport omp_get_thread_num




cdef void long_running_task1() noexcept nogil:
    pass



cdef void long_running_task2() noexcept nogil:
    pass

def do_two_tasks():
    cdef int thread_num
    with nogil, parallel(num_threads=2):
        thread_num = omp_get_thread_num()
        if thread_num == 0:
            long_running_task1()
        elif thread_num == 1:
            long_running_task2()

这种代码块的实用性受到以下事实的限制:在代码块中分配的变量对每个线程都是 private,因此无法在后面的串行部分访问。

上面示例的生成的 C 代码非常简单

#pragma omp parallel private(thread_num)
{
    thread_num = omp_get_thread_num();
    switch (thread_num) {
        /* ... */
    }
}