1. 当前位置:网站首页 > Python

cython、python、numpy速度对比


提高python效率的方法

cython、numpy、python及未测试的pyo3和新出的mojo语言

  • 因为pyo3时用rust语言编写的第三方库,暂时还未涉猎该语言故就不加入测试中
  • mojo当前本地只支持ubuntu,当前没得环境等等看看
  • 基于python运行效率过慢的情况

测试结论:当numpy里没有原生方法计算时,通过cython可以大大加快处理速度,特别时for循环较多时,故通过cython混合开发,可以有效的提高处理效率及性能

# test.py打印结果

Prue Python: 2215 ms
Cython: 32 ms
True
Numpy: 17 ms
True
Cython Faster: 14 ms
True
原生Python: 965 ms
内存视图优化: 197 ms, 加速 4.9 倍
优化拉满: 93 ms, 加速 10.38 倍

测试代码 test.py

# coding: utf-8
import random
import time

import numpy as np
from cython_test import elementwise_multiply, elementwise_multiply_faster
from cython_test import knapsack_2d_dp_cy, knapsack_2d_dp_cy_full_power
from util.stopwatch import StopWatch

def elementwise_multiply_python(A, B):

    rows, clos = A.shape
    result = np.zeros((rows, clos), dtype=np.float64)

    for i in range(rows):
        for j in range(clos):
            result[i, j] = A[i, j] * B[i, j]

    return result

def knapsack_2d_dp(values, weights, capacity):
    n = len(values)

    # 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
    dp = np.zeros((n + 1, capacity + 1), dtype=np.int32)

    # 填充dp数组
    for i in range(1, n + 1):
        for j in range(1, capacity + 1):
            # 如果当前物品的重量大于背包容量j,则无法放入
            if weights[i - 1] > j:
                dp[i, j] = dp[i - 1, j]
            else:
                # 否则,可以选择放入或不放入当前物品,选择最大价值
                dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])

    # 从dp数组中找到最优解的最大价值
    max_value = dp[n, capacity]
    return max_value

if __name__ == '__main__':
    A = np.random.rand(1000, 1000)
    B = np.random.rand(1000, 1000)

    repeat = 10

    sw = StopWatch()
    for _ in range(repeat):
        r1 = elementwise_multiply_python(A, B)
    sw.stop()
    print(f'Prue Python: {sw.getElapsedTime()} ms')

    sw.start()
    for _ in range(repeat):
        r2 = elementwise_multiply(A, B)
    sw.stop()
    print(f'Cython: {sw.getElapsedTime()} ms')

    print((r1 == r2).all())

    sw.start()
    for _ in range(repeat):
        r3 = A * B
    sw.stop()
    print(f'Numpy: {sw.getElapsedTime()} ms')
    print((r1 == r3).all())

    sw.start()
    for _ in range(repeat):
        r4 = elementwise_multiply_faster(A, B)
    sw.stop()
    print(f'Cython Faster: {sw.getElapsedTime()} ms')

    print((r1 == r4).all())

    # 测试
    random.seed(42)  # 设置随机种子以确保结果可复现
    num_items = 10
    values = [random.randint(1, 100) for _ in range(num_items)]
    weights = [random.randint(1, 20) for _ in range(num_items)]
    capacity = 50

    repeat = 5000

    sw = StopWatch()
    for _ in range(repeat):
        max_value = knapsack_2d_dp(values, weights, capacity)
    sw.stop()
    t1 = sw.getElapsedTime()
    print(f"原生Python: {t1} ms")
    # print(f"max value: {max_value}")

    sw.start()
    for _ in range(repeat):
        max_value = knapsack_2d_dp_cy(values, weights, capacity)
    sw.stop()
    t2 = sw.getElapsedTime()
    print(f"内存视图优化: {t2} ms, 加速 {t1 / t2:.1f} 倍")
    # print(f"max value: {max_value}")

    sw.start()
    for _ in range(repeat):
        max_value = knapsack_2d_dp_cy_full_power(values, weights, capacity)
    sw.stop()
    t3 = sw.getElapsedTime()
    print(f"优化拉满: {t3} ms, 加速 {t1 / t3:.2f} 倍")

    time.sleep(111111)

测试代码 cython_test.pyx 需要通过setup.py编译后给python调用

import numpy as np
cimport numpy as cnp
from cython import boundscheck, wraparound

# cpdef 代表在c语言里也可以支持 def代表只支持python
cpdef cnp.ndarray[double, ndim=2] elementwise_multiply(
        cnp.ndarray[double, ndim=2] A,
        cnp.ndarray[double, ndim=2] B):
    cdef int nrows = A.shape[0]
    cdef int ncols = A.shape[1]
    cdef cnp.ndarray[double, ndim=2] result = np.zeros((nrows, ncols), dtype=np.float64)

    cdef int i, j
    for i in range(nrows):
        for j in range(ncols):
            result[i, j] = A[i, j] * B[i, j]

    return result

# 通过检查代码,关闭了边界检查优化  优化解决for循环内的代码
@boundscheck(False)
@wraparound(False)
def elementwise_multiply_faster(
        cnp.ndarray[double, ndim=2] A,
        cnp.ndarray[double, ndim=2] B):
    cdef:
        int nrows = A.shape[0]
        int ncols = A.shape[1]
        cnp.ndarray[double, ndim=2] result = np.zeros((nrows, ncols), dtype=np.float64)
        int i, j

    for i in range(nrows):
        for j in range(ncols):
            result[i, j] = A[i, j] * B[i, j]

    return result

# 视图优化

def knapsack_2d_dp_cy(values, weights, capacity):
    n = len(values)

    # 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
    dp_raw = np.zeros((n + 1, capacity + 1), dtype=np.int32)
    cdef int[:,:] dp = dp_raw   # 这行代码及获得内存视图

    # 填充dp数组
    for i in range(1, n + 1):
        for j in range(1, capacity + 1):
            # 如果当前物品的重量大于背包容量j,则无法放入
            if weights[i - 1] > j:
                dp[i, j] = dp[i - 1, j]
            else:
                # 否则,可以选择放入或不放入当前物品,选择最大价值
                dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])

    # 从dp数组中找到最优解的最大价值
    max_value = dp[n, capacity]
    return max_value

@boundscheck(False)
@wraparound(False)
def knapsack_2d_dp_cy_full_power(list[int] values, list[int] weights, int capacity):
    cdef:
        int n = len(values)

    # 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
        cnp.ndarray[int, ndim=2] dp_raw = np.zeros((n + 1, capacity + 1), dtype=np.int32)
        int[:,:] dp = dp_raw
        int i, j, max_value

    # 填充dp数组
    for i in range(1, n + 1):
        for j in range(1, capacity + 1):
            # 如果当前物品的重量大于背包容量j,则无法放入
            if weights[i - 1] > j:
                dp[i, j] = dp[i - 1, j]
            else:
                # 否则,可以选择放入或不放入当前物品,选择最大价值
                dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])

    # 从dp数组中找到最优解的最大价值
    max_value = dp[n, capacity]
    return max_value

测试代码setup.py

from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy as np

ext_modules =[
    Extension("cython_test",                  # 包的名称  这里可以使用逗号放到指定路径 app.cpython_test
              ["cython_test.pyx"],          # cpython文件 这里可以通过/选择不同的路径 app/cpython/cpython_test.pyx
              include_dirs=[np.get_include()])      # 包含的文件库导入
]

setup(
    name="cython_test",
    ext_modules=cythonize(ext_modules),
    script_args=["build_ext", "--build-lib", "."]
)

# python setup.py build_ext --inplace
# 实际使用了pyd文件,通过pyx文件生成

# cython -a cython_test.pyx
# 分析代码的运行效率,黄色越深代表越耗时

stopwatech.py

import time

class StopWatch:
    def __init__(self):
        self.__startTime = time.time()
        self.__endTime = 0

    def start(self):
        self.__startTime = time.time()

    def stop(self):
        self.__endTime = time.time()

    def getElapsedTime(self):
        return int((self.__endTime - self.__startTime) * 1000)

本文最后更新于2023-10-7,已超过 3个月没有更新,如果文章内容或图片资源失效,请留言反馈,我们会及时处理,谢谢!
版权说明

本文地址:http://www.liuyangdeboke.cn/?post=53
未标注转载均为本站远程,转载请注明文章出处:

发表评论

联系我们

在线咨询:点击这里给我发消息

微信号:17721538135

工作日:9:00-23:00,节假日休息

扫码关注