使用并行性¶
注意
此页面使用两种不同的语法变体
Cython 特定的
cdef
语法,旨在使类型声明简洁,并易于从 C/C++ 的角度阅读。纯 Python 语法,允许在 纯 Python 代码 中进行静态 Cython 类型声明,遵循 PEP-484 类型提示和 PEP 526 变量注释。
要在 Python 语法中使用 C 数据类型,您需要在要编译的 Python 模块中导入特殊的
cython
模块,例如import cython
如果您使用纯 Python 语法,我们强烈建议您使用最新的 Cython 3 版本,因为与 0.29.x 版本相比,这里已经进行了重大改进。
Cython 通过 cython.parallel
模块支持原生并行性。要使用这种并行性,必须释放 GIL(请参阅 释放 GIL)。它目前支持 OpenMP,但以后可能会支持更多后端。
注意
此模块中的功能只能从主线程或并行区域使用,因为存在 OpenMP 限制。
- cython.parallel.prange([start,] stop[, step][, nogil=False][, use_threads_if=CONDITION][, schedule=None[, chunksize=None]][, num_threads=None])¶
此函数可用于并行循环。OpenMP 会自动启动线程池,并根据使用的调度分配工作。
线程局部性和约简会自动推断变量。
如果您在 prange 块中为变量赋值,它将变为 lastprivate,这意味着该变量将包含最后一次迭代的值。如果您对变量使用就地运算符,它将变为约简,这意味着来自变量的线程局部副本的值将使用运算符进行约简,并在循环结束后分配给原始变量。索引变量始终为 lastprivate。在并行 with 块中分配的变量将是私有的,并且在块结束后不可用,因为没有顺序最后值的的概念。
- 参数:
start – 指示循环开始的索引(与 range 中的 start 参数相同)。
stop – 指示循环何时停止的索引(与 range 中的 stop 参数相同)。
step – 一个整数,表示序列的步长(与 range 中的 step 参数相同)。它不能为 0。
nogil – 此函数只能在释放 GIL 的情况下使用。如果
nogil
为 true,则循环将被包装在一个 nogil 部分中。use_threads_if – 仅当
CONDITION
评估为 true 时,循环才会在多个线程中运行。否则,代码将按顺序运行。按顺序运行循环在以下情况下非常有用:生成线程的成本大于并行运行循环的益处(例如,对于小型数据集)。schedule –
将
schedule
传递给 OpenMP,它可以是以下之一- 静态
如果提供了 chunksize,则迭代将提前以给定 chunksize 的块形式分配给所有线程。如果没有提供 chunksize,则迭代空间将被划分为大小大致相等的块,并且最多一个块将提前分配给每个线程。
当调度开销很重要,并且问题可以分解成大小相等且运行时间大致相同的块时,这最适合。
- 动态
迭代将按需分配给线程,默认块大小为 1。
当每个块的运行时间不同且事先未知时,这很适合,因此使用更多数量的较小块来保持所有线程繁忙。
- 引导
与动态调度一样,迭代将按需分配给线程,但块大小会逐渐减小。每个块的大小与未分配迭代次数除以参与线程数成正比,减小到 1(或提供的 chunksize)。
当事实证明最后几个块比预期花费更多时间或以其他方式被错误调度时,这比纯动态调度更具优势,因此大多数线程开始处于空闲状态,而最后几个块仅由较少数量的线程处理。
- 运行时
调度和块大小取自运行时调度变量,该变量可以通过
openmp.omp_set_schedule()
函数调用或OMP_SCHEDULE
环境变量设置。请注意,这实际上会禁用调度代码本身的任何静态编译时优化,因此与在编译时静态配置相同调度策略相比,性能可能略差。默认调度是实现定义的。有关更多信息,请参阅 OpenMP 规范 [1]。
num_threads –
num_threads
参数指示团队应包含多少个线程。如果没有给出,OpenMP 将决定使用多少个线程。通常,这是机器上可用内核的数量。但是,这可以通过omp_set_num_threads()
函数或OMP_NUM_THREADS
环境变量来控制。chunksize –
chunksize
参数指示用于将迭代分配给线程的块大小。这仅对static
、dynamic
和guided
调度有效,并且是可选的。不同的块大小可能会产生截然不同的性能结果,具体取决于调度、它提供的负载平衡、调度开销以及虚假共享量(如果有)。
带约简的示例
from cython.parallel import prange
i = cython.declare(cython.int)
n = cython.declare(cython.int, 30)
sum = cython.declare(cython.int, 0)
for i in prange(n, nogil=True):
sum += i
print(sum)
from cython.parallel import prange
cdef int i
cdef int n = 30
cdef int sum = 0
for i in prange(n, nogil=True):
sum += i
print(sum)
带 类型化内存视图(例如 NumPy 数组)的示例
from cython.parallel import prange
def func(x: cython.double[:], alpha: cython.double):
i: cython.Py_ssize_t
for i in prange(x.shape[0], nogil=True):
x[i] = alpha * x[i]
from cython.parallel import prange
def func(double[:] x, double alpha):
cdef Py_ssize_t i
for i in prange(x.shape[0], nogil=True):
x[i] = alpha * x[i]
带条件并行的示例
from cython.parallel import prange
def psum(n: cython.int):
i: cython.int
sum: cython.int = 0
for i in prange(n, nogil=True, use_threads_if=n>1000):
sum += i
return sum
psum(30) # Executed sequentially
psum(10000) # Executed in parallel
from cython.parallel import prange
def psum(int n):
cdef int i
cdef int sum = 0
for i in prange(n, nogil=True, use_threads_if=n>1000):
sum += i
return sum
psum(30) # Executed sequentially
psum(10000) # Executed in parallel
- cython.parallel.parallel(num_threads=None, use_threads_if=CONDITION)¶
此指令可以用作
with
语句的一部分,以并行执行代码序列。这目前对设置prange
使用的线程局部缓冲区很有用。包含的prange
将是一个非并行的共享工作循环,因此在并行部分中分配给的任何变量也对prange
私有。并行块中私有的变量在并行块之后不可用。带线程局部缓冲区的示例
from cython.parallel import parallel, prange from cython.cimports.libc.stdlib import abort, malloc, free @cython.nogil @cython.cfunc def func(buf: cython.p_int) -> cython.void: pass # ... idx = cython.declare(cython.Py_ssize_t) i = cython.declare(cython.Py_ssize_t) j = cython.declare(cython.Py_ssize_t) n = cython.declare(cython.Py_ssize_t, 100) local_buf = cython.declare(p_int) size = cython.declare(cython.size_t, 10) with cython.nogil, parallel(): local_buf: cython.p_int = cython.cast(cython.p_int, malloc(cython.sizeof(cython.int) * size)) if local_buf is cython.NULL: abort() # populate our local buffer in a sequential loop for i in range(size): local_buf[i] = i * 2 # share the work using the thread-local buffer(s) for j in prange(n, schedule='guided'): func(local_buf) free(local_buf)
from cython.parallel import parallel, prange from libc.stdlib cimport abort, malloc, free cdef void func(int *buf) noexcept nogil: pass # ... cdef Py_ssize_t idx, i, j, n = 100 cdef int * local_buf cdef size_t size = 10 with nogil, parallel(): local_buf = <int *> malloc(sizeof(int) * size) if local_buf is NULL: abort() # populate our local buffer in a sequential loop for i in range(size): local_buf[i] = i * 2 # share the work using the thread-local buffer(s) for j in prange(n, schedule='guided'): func(local_buf) free(local_buf)
稍后可能会在并行块中支持部分,以将工作代码部分分配给线程。
- cython.parallel.threadid()¶
返回线程的 ID。对于 n 个线程,ID 范围从 0 到 n-1。
编译¶
要实际使用 OpenMP 支持,您需要告诉 C 或 C++ 编译器启用 OpenMP。对于 gcc,可以在 setup.py
中执行以下操作
from setuptools import Extension, setup
from Cython.Build import cythonize
ext_modules = [
Extension(
"hello",
["hello.py"],
extra_compile_args=['-fopenmp'],
extra_link_args=['-fopenmp'],
)
]
setup(
name='hello-parallel-world',
ext_modules=cythonize(ext_modules),
)
from setuptools import Extension, setup
from Cython.Build import cythonize
ext_modules = [
Extension(
"hello",
["hello.pyx"],
extra_compile_args=['-fopenmp'],
extra_link_args=['-fopenmp'],
)
]
setup(
name='hello-parallel-world',
ext_modules=cythonize(ext_modules),
)
对于 Microsoft Visual C++ 编译器,请使用 '/openmp'
代替 '-fopenmp'
作为 'extra_compile_args'
选项。不要在 'extra_link_args'
选项中添加任何 OpenMP 标志。
跳出循环¶
parallel with 和 prange 块在 nogil 模式下支持 break、continue 和 return 语句。此外,在这些块内使用 with gil
块是有效的,并且可以从这些块中传播异常。但是,由于这些块使用 OpenMP,因此不能简单地退出,因此退出过程是尽力而为的。对于 prange()
,这意味着在任何线程的后续迭代中,第一次 break、return 或异常后,循环体将被跳过。如果多个不同的值可能被返回,则返回哪个值是未定义的,因为迭代没有特定的顺序。
from cython.parallel import prange
@cython.exceptval(-1)
@cython.cfunc
def func(n: cython.Py_ssize_t) -> cython.int:
i: cython.Py_ssize_t
for i in prange(n, nogil=True):
if i == 8:
with cython.gil:
raise Exception()
elif i == 4:
break
elif i == 2:
return i
from cython.parallel import prange
cdef int func(Py_ssize_t n) except -1:
cdef Py_ssize_t i
for i in prange(n, nogil=True):
if i == 8:
with gil:
raise Exception()
elif i == 4:
break
elif i == 2:
return i
在上面的示例中,是否会引发异常、是否会简单地 break 还是会返回 2 是未定义的。
使用 OpenMP 函数¶
可以通过 cimport openmp
来使用 OpenMP 函数。
from cython.parallel import parallel
from cython.cimports.openmp import omp_set_dynamic, omp_get_num_threads
num_threads = cython.declare(cython.int)
omp_set_dynamic(1)
with cython.nogil, parallel():
num_threads = omp_get_num_threads()
# ...
from cython.parallel cimport parallel
cimport openmp
cdef int num_threads
openmp.omp_set_dynamic(1)
with nogil, parallel():
num_threads = openmp.omp_get_num_threads()
# ...
参考资料