使用 Cython 编写并行代码¶
注意
此页面使用两种不同的语法变体
Cython 特定的
cdef
语法,旨在使类型声明简洁且易于从 C/C++ 的角度阅读。纯 Python 语法,允许在 纯 Python 代码 中进行静态 Cython 类型声明,遵循 PEP-484 类型提示和 PEP 526 变量注释。
要在 Python 语法中使用 C 数据类型,您需要在要编译的 Python 模块中导入特殊的
cython
模块,例如import cython
如果您使用纯 Python 语法,我们强烈建议您使用最新的 Cython 3 版本,因为与 0.29.x 版本相比,这里已经进行了重大改进。
加速 Cython 代码的一种方法是并行化:您编写可以在 CPU 的多个核心上同时运行的代码。对于适合并行化的代码,这可以产生相当大的加速效果,等于 CPU 的核心数量(例如,在 4 核 CPU 上加速 4 倍)。
本教程假设您已经熟悉 Cython 的 “类型化内存视图”(因为使用内存视图的代码通常是易于使用 Cython 并行化的代码),并且您也对编写并行代码的常见陷阱有所了解(它旨在成为 Cython 教程,而不是并行编程的完整介绍)。
在开始之前,请注意以下几点
并非所有代码都可以并行化 - 对于某些代码,算法仅仅依赖于按顺序执行,您不应该尝试对其进行并行化。累加和就是一个很好的例子。
并非所有代码都值得并行化。在启动并行部分时,会产生相当多的开销,因此您需要确保您正在处理足够的数据以使这种开销值得。此外,请确保您正在对数据进行实际操作!多个线程仅仅读取相同的数据往往不能很好地并行化。如有疑问,请计时。
Cython 要求并行块的内容为
nogil
。如果您的算法需要访问 Python 对象,那么它可能不适合并行化。Cython 的内置并行化使用 OpenMP 结构
omp parallel for
和omp parallel
。这些非常适合并行化相对较小、自包含的代码块(尤其是循环)。但是,如果您想使用其他并行化模型,例如生成和等待任务,或者将一些“辅助工作”卸载到持续运行的辅助线程,那么您可能更适合使用其他方法(例如 Python 的threading
模块)。实际实现您的并行 Cython 代码可能应该是您优化的最后一步。您应该首先从一些可用的串行代码开始。但是,值得尽早规划,因为它可能会影响您对算法的选择。
本教程不旨在探讨所有可用于自定义并行化的选项。有关详细信息,请参阅 主要并行化文档。您还应该知道,Cython 对如何并行化您的代码做出的许多选择是相当固定的,如果您想要 Cython 默认情况下不提供的特定 OpenMP 行为,那么您可能更适合用 C 语言自己编写。
编译¶
OpenMP 需要您的 C/C++ 编译器的支持。这种支持通常通过一个特殊的命令行参数启用:在 GCC 上是 -fopenmp
,而在 MSVC 上是 /openmp
。如果您的编译器不支持 OpenMP(或者您忘记传递参数),那么代码通常仍然可以编译,但不会并行运行。
以下 setup.py
文件可用于编译本教程中的示例
from setuptools import Extension, setup
from Cython.Build import cythonize
import sys
if sys.platform.startswith("win"):
openmp_arg = '/openmp'
else:
openmp_arg = '-fopenmp'
ext_modules = [
Extension(
"*",
["*.pyx"],
extra_compile_args=[openmp_arg],
extra_link_args=[openmp_arg],
),
Extension(
"*",
["*.pyx"],
extra_compile_args=[openmp_arg],
extra_link_args=[openmp_arg],
)
]
setup(
name='parallel-tutorial',
ext_modules=cythonize(ext_modules),
)
逐元素并行操作¶
Cython 中最简单、最常见的并行操作是逐元素遍历数组,对每个数组元素执行相同的操作。在下面的简单示例中,我们计算数组中每个元素的 sin
from cython.parallel cimport prange
cimport cython
from libc.math cimport sin
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def do_sine(double[:,:] input):
cdef double[:,:] output = np.empty_like(input)
cdef Py_ssize_t i, j
for i in prange(input.shape[0], nogil=True):
for j in range(input.shape[1]):
output[i, j] = sin(input[i, j])
return np.asarray(output)
from cython.parallel import prange
import cython
from cython.cimports.libc.math import sin
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def do_sine(input: cython.double[:,:]):
output : cython.double[:,:] = np.empty_like(input)
i : cython.Py_ssize_t
j : cython.Py_ssize_t
for i in prange(input.shape[0], nogil=True):
for j in range(input.shape[1]):
output[i, j] = sin(input[i, j])
return np.asarray(output)
我们对最外层循环进行并行化。这通常是一个好主意,因为进入和退出并行块会有一些开销。但是,您还应该考虑数组的可能大小。如果 input
通常的大小为 (2, 10000000)
,那么对长度为 2
的维度进行并行化可能是一个更糟糕的选择。
循环体本身是 nogil
- 也就是说,您不能执行“Python”操作。这是一个相当强的限制,如果您发现需要使用 GIL,那么 Cython 的并行化功能可能不适合您。但是,可以在循环中抛出异常 - Cython 只需重新获得 GIL 并抛出异常,然后终止所有线程上的循环。
需要显式地将循环变量 i
键入为 C 整数。对于非并行循环,Cython 可以推断出这一点,但它目前不推断并行循环的循环变量,因此不键入 i
会导致编译错误,因为它将是一个 Python 对象,因此在没有 GIL 的情况下无法使用。
为了使有经验的 OpenMP 用户受益,下面显示了生成的 C 代码,为了可读性,这里对其进行了简化
#pragma omp parallel
{
#pragma omp for firstprivate(i) lastprivate(i) lastprivate(j)
for (__pyx_t_8 = 0; __pyx_t_8 < __pyx_t_9; __pyx_t_8++){
i = __pyx_t_8;
/* body goes here */
}
}
私有变量¶
从上面生成的 C 代码中可以注意到一个有用的点 - 在循环中使用的变量,如 i
和 j
,被标记为 firstprivate
和 lastprivate
。在循环中,每个线程都有自己的数据副本,数据根据循环之前的其值进行初始化,并且在循环之后,“全局”副本被设置为等于最后一次迭代(即,就像循环是串行运行一样)。
Cython 应用的基本规则是
prange
块中的 C 标量变量被设置为firstprivate
和lastprivate
,在 并行块 中分配的 C 标量变量是
private
(这意味着它们不能用于在块中传递数据),数组变量(例如内存视图)不会被设置为私有。相反,Cython 假设您已经构建了循环,以便每次迭代都作用于不同的数据,
Python 对象也不会被设置为私有,尽管对它们的访问是通过 Python 的 GIL 控制的。
Cython 目前没有提供太多覆盖这些选择的机会。
归约¶
Cython 中第二常见的并行操作是“归约”操作。一个常见的例子是累积整个数组的总和,例如在下面计算向量范数时
from cython.parallel cimport prange
cimport cython
from libc.math cimport sqrt
@cython.boundscheck(False)
@cython.wraparound(False)
def l2norm(double[:] x):
cdef double total = 0
cdef Py_ssize_t i
for i in prange(x.shape[0], nogil=True):
total += x[i]*x[i]
return sqrt(total)
from cython.parallel import prange
import cython
from cython.cimports.libc.math import sqrt
@cython.boundscheck(False)
@cython.wraparound(False)
def l2norm(x: cython.double[:]):
total: cython.double = 0
i: cython.Py_ssize_t
for i in prange(x.shape[0], nogil=True):
total += x[i]*x[i]
return sqrt(total)
Cython 能够推断 +=
、*=
、-=
、&=
、|=
和 ^=
的归约。这些只适用于 C 标量变量,因此您不能轻松地将 2D 内存视图归约为 1D 内存视图,例如。
生成的 C 代码大约是
#pragma omp parallel reduction(+:total)
{
#pragma omp for firstprivate(i) lastprivate(i)
for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
i = __pyx_t_2;
total = total + /* some indexing code */;
}
}
parallel
块¶
Cython 的 parallel
运算符比 prange
使用频率低得多。 parallel
生成一个代码块,该代码块会同时在多个线程上运行。但是,与 prange
不同,工作不会自动分配到各个线程。
这里我们介绍了 parallel
块的三个常见用途
将 prange 块串联在一起¶
进入和离开并行部分会产生一些开销。因此,如果您有多个并行部分,并且它们之间有很小的串行部分,那么编写一个大型并行块可能会更有效。任何小的串行部分都会被复制,但开销会减少。
在下面的示例中,我们对向量进行就地归一化。第一个并行循环计算范数,第二个并行循环将范数应用于向量,我们避免了在两者之间跳进跳出串行代码。
from cython.parallel cimport parallel, prange
cimport cython
from libc.math cimport sqrt
@cython.boundscheck(False)
@cython.wraparound(False)
def normalize(double[:] x):
cdef Py_ssize_t i
cdef double total = 0
cdef double norm
with nogil, parallel():
for i in prange(x.shape[0]):
total += x[i]*x[i]
norm = sqrt(total)
for i in prange(x.shape[0]):
x[i] /= norm
from cython.parallel import parallel, prange
import cython
from cython.cimports.libc.math import sqrt
@cython.boundscheck(False)
@cython.wraparound(False)
def normalize(x: cython.double[:]):
i: cython.Py_ssize_t
total: cython.double = 0
norm: cython.double
with cython.nogil, parallel():
for i in prange(x.shape[0]):
total += x[i]*x[i]
norm = sqrt(total)
for i in prange(x.shape[0]):
x[i] /= norm
C 代码大约是
#pragma omp parallel private(norm) reduction(+:total)
{
/* some calculations of array size... */
#pragma omp for firstprivate(i) lastprivate(i)
for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
/* ... */
}
norm = sqrt(total);
#pragma omp for firstprivate(i) lastprivate(i)
for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
/* ... */
}
}
为每个线程分配“临时空间”¶
假设每个线程需要一小块临时空间来工作。它们不能共享临时空间,因为这会导致数据竞争。在这种情况下,分配和释放是在并行部分完成的(因此在每个线程的基础上发生),围绕一个循环,然后使用临时空间。
我们这里的示例使用 C++ 查找二维数组中每列的中位数(只是 numpy.median(x, axis=0)
的并行版本)。我们必须重新排序每一列才能找到它的中位数,但不想修改输入数组。因此,我们为每个线程分配一个 C++ 向量作为临时空间,并在其中工作。为了提高效率,向量是在 prange
循环之外分配的。
# distutils: language = c++
from cython.parallel cimport parallel, prange
from libcpp.vector cimport vector
from libcpp.algorithm cimport nth_element
cimport cython
from cython.operator cimport dereference
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def median_along_axis0(const double[:,:] x):
cdef double[::1] out = np.empty(x.shape[1])
cdef Py_ssize_t i, j
cdef vector[double] *scratch
cdef vector[double].iterator median_it
with nogil, parallel():
# allocate scratch space per loop
scratch = new vector[double](x.shape[0])
try:
for i in prange(x.shape[1]):
# copy row into scratch space
for j in range(x.shape[0]):
dereference(scratch)[j] = x[j, i]
median_it = scratch.begin() + scratch.size()//2
nth_element(scratch.begin(), median_it, scratch.end())
# for the sake of a simple example, don't handle even lengths...
out[i] = dereference(median_it)
finally:
del scratch
return np.asarray(out)
# distutils: language = c++
from cython.parallel import parallel, prange
from cython.cimports.libc.stdlib import malloc, free
from cython.cimports.libcpp.algorithm import nth_element
import cython
from cython.operator import dereference
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def median_along_axis0(x: cython.double[:,:]):
out: cython.double[::1] = np.empty(x.shape[1])
i: cython.Py_ssize_t
j: cython.Py_ssize_t
scratch: cython.pointer(cython.double)
median_it: cython.pointer(cython.double)
with cython.nogil, parallel():
# allocate scratch space per loop
scratch = cython.cast(
cython.pointer(cython.double),
malloc(cython.sizeof(cython.double)*x.shape[0]))
try:
for i in prange(x.shape[1]):
# copy row into scratch space
for j in range(x.shape[0]):
scratch[j] = x[j, i]
median_it = scratch + x.shape[0]//2
nth_element(scratch, median_it, scratch + x.shape[0])
# for the sake of a simple example, don't handle even lengths...
out[i] = dereference(median_it)
finally:
free(scratch)
return np.asarray(out)
注意
纯语法和经典语法示例并不完全相同,因为纯 Python 语法不支持 C++ 的“new”,因此我们以略微不同的方式分配临时空间
在生成的代码中,scratch
变量在外部并行块中被标记为 private
。一个粗略的概述是
#pragma omp parallel private(scratch)
{
scratch = new std::vector<double> ((x.shape[0]))
#pragma omp for firstprivate(i) lastprivate(i) lastprivate(j) lastprivate(median_it)
for (__pyx_t_9 = 0; __pyx_t_9 < __pyx_t_10; __pyx_t_9++){
i = __pyx_t_9;
/* implementation goes here */
}
/* some exception handling detail omitted */
delete scratch;
}
在每个线程上执行不同的任务¶
最后,如果您手动指定线程数量,然后使用 omp.get_thread_num()
识别每个线程,您就可以手动将工作分配到各个线程。这在 Cython 中是一个相当罕见的用例,并且可能表明 threading
模块更适合您要执行的操作。但是,这是一个选项。
from cython.parallel cimport parallel
from openmp cimport omp_get_thread_num
cdef void long_running_task1() noexcept nogil:
pass
cdef void long_running_task2() noexcept nogil:
pass
def do_two_tasks():
cdef int thread_num
with nogil, parallel(num_threads=2):
thread_num = omp_get_thread_num()
if thread_num == 0:
long_running_task1()
elif thread_num == 1:
long_running_task2()
from cython.parallel import parallel
from cython.cimports.openmp import omp_get_thread_num
import cython
@cython.cfunc
@cython.nogil
def long_running_task1() -> cython.void:
pass
@cython.cfunc
@cython.nogil
def long_running_task2() -> cython.void:
pass
def do_two_tasks():
thread_num: cython.int
with cython.nogil, parallel(num_threads=2):
thread_num = omp_get_thread_num()
if thread_num == 0:
long_running_task1()
elif thread_num == 1:
long_running_task2()
这种代码块的实用性受到以下事实的限制:在代码块中分配的变量对每个线程都是 private
,因此无法在后面的串行部分访问。
上面示例的生成的 C 代码非常简单
#pragma omp parallel private(thread_num)
{
thread_num = omp_get_thread_num();
switch (thread_num) {
/* ... */
}
}