提高python效率的方法
cython、numpy、python及未测试的pyo3和新出的mojo语言
- 因为pyo3时用rust语言编写的第三方库,暂时还未涉猎该语言故就不加入测试中
- mojo当前本地只支持ubuntu,当前没得环境等等看看
- 基于python运行效率过慢的情况
测试结论:当numpy里没有原生方法计算时,通过cython可以大大加快处理速度,特别时for循环较多时,故通过cython混合开发,可以有效的提高处理效率及性能
# test.py打印结果
Prue Python: 2215 ms
Cython: 32 ms
True
Numpy: 17 ms
True
Cython Faster: 14 ms
True
原生Python: 965 ms
内存视图优化: 197 ms, 加速 4.9 倍
优化拉满: 93 ms, 加速 10.38 倍
测试代码 test.py
# coding: utf-8
import random
import time
import numpy as np
from cython_test import elementwise_multiply, elementwise_multiply_faster
from cython_test import knapsack_2d_dp_cy, knapsack_2d_dp_cy_full_power
from util.stopwatch import StopWatch
def elementwise_multiply_python(A, B):
rows, clos = A.shape
result = np.zeros((rows, clos), dtype=np.float64)
for i in range(rows):
for j in range(clos):
result[i, j] = A[i, j] * B[i, j]
return result
def knapsack_2d_dp(values, weights, capacity):
n = len(values)
# 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
dp = np.zeros((n + 1, capacity + 1), dtype=np.int32)
# 填充dp数组
for i in range(1, n + 1):
for j in range(1, capacity + 1):
# 如果当前物品的重量大于背包容量j,则无法放入
if weights[i - 1] > j:
dp[i, j] = dp[i - 1, j]
else:
# 否则,可以选择放入或不放入当前物品,选择最大价值
dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])
# 从dp数组中找到最优解的最大价值
max_value = dp[n, capacity]
return max_value
if __name__ == '__main__':
A = np.random.rand(1000, 1000)
B = np.random.rand(1000, 1000)
repeat = 10
sw = StopWatch()
for _ in range(repeat):
r1 = elementwise_multiply_python(A, B)
sw.stop()
print(f'Prue Python: {sw.getElapsedTime()} ms')
sw.start()
for _ in range(repeat):
r2 = elementwise_multiply(A, B)
sw.stop()
print(f'Cython: {sw.getElapsedTime()} ms')
print((r1 == r2).all())
sw.start()
for _ in range(repeat):
r3 = A * B
sw.stop()
print(f'Numpy: {sw.getElapsedTime()} ms')
print((r1 == r3).all())
sw.start()
for _ in range(repeat):
r4 = elementwise_multiply_faster(A, B)
sw.stop()
print(f'Cython Faster: {sw.getElapsedTime()} ms')
print((r1 == r4).all())
# 测试
random.seed(42) # 设置随机种子以确保结果可复现
num_items = 10
values = [random.randint(1, 100) for _ in range(num_items)]
weights = [random.randint(1, 20) for _ in range(num_items)]
capacity = 50
repeat = 5000
sw = StopWatch()
for _ in range(repeat):
max_value = knapsack_2d_dp(values, weights, capacity)
sw.stop()
t1 = sw.getElapsedTime()
print(f"原生Python: {t1} ms")
# print(f"max value: {max_value}")
sw.start()
for _ in range(repeat):
max_value = knapsack_2d_dp_cy(values, weights, capacity)
sw.stop()
t2 = sw.getElapsedTime()
print(f"内存视图优化: {t2} ms, 加速 {t1 / t2:.1f} 倍")
# print(f"max value: {max_value}")
sw.start()
for _ in range(repeat):
max_value = knapsack_2d_dp_cy_full_power(values, weights, capacity)
sw.stop()
t3 = sw.getElapsedTime()
print(f"优化拉满: {t3} ms, 加速 {t1 / t3:.2f} 倍")
time.sleep(111111)
测试代码 cython_test.pyx 需要通过setup.py编译后给python调用
import numpy as np
cimport numpy as cnp
from cython import boundscheck, wraparound
# cpdef 代表在c语言里也可以支持 def代表只支持python
cpdef cnp.ndarray[double, ndim=2] elementwise_multiply(
cnp.ndarray[double, ndim=2] A,
cnp.ndarray[double, ndim=2] B):
cdef int nrows = A.shape[0]
cdef int ncols = A.shape[1]
cdef cnp.ndarray[double, ndim=2] result = np.zeros((nrows, ncols), dtype=np.float64)
cdef int i, j
for i in range(nrows):
for j in range(ncols):
result[i, j] = A[i, j] * B[i, j]
return result
# 通过检查代码,关闭了边界检查优化 优化解决for循环内的代码
@boundscheck(False)
@wraparound(False)
def elementwise_multiply_faster(
cnp.ndarray[double, ndim=2] A,
cnp.ndarray[double, ndim=2] B):
cdef:
int nrows = A.shape[0]
int ncols = A.shape[1]
cnp.ndarray[double, ndim=2] result = np.zeros((nrows, ncols), dtype=np.float64)
int i, j
for i in range(nrows):
for j in range(ncols):
result[i, j] = A[i, j] * B[i, j]
return result
# 视图优化
def knapsack_2d_dp_cy(values, weights, capacity):
n = len(values)
# 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
dp_raw = np.zeros((n + 1, capacity + 1), dtype=np.int32)
cdef int[:,:] dp = dp_raw # 这行代码及获得内存视图
# 填充dp数组
for i in range(1, n + 1):
for j in range(1, capacity + 1):
# 如果当前物品的重量大于背包容量j,则无法放入
if weights[i - 1] > j:
dp[i, j] = dp[i - 1, j]
else:
# 否则,可以选择放入或不放入当前物品,选择最大价值
dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])
# 从dp数组中找到最优解的最大价值
max_value = dp[n, capacity]
return max_value
@boundscheck(False)
@wraparound(False)
def knapsack_2d_dp_cy_full_power(list[int] values, list[int] weights, int capacity):
cdef:
int n = len(values)
# 创建一个二维数组dp,其中dp[i,j]表示前i个物品,在容量为j的情况下的最大价值
cnp.ndarray[int, ndim=2] dp_raw = np.zeros((n + 1, capacity + 1), dtype=np.int32)
int[:,:] dp = dp_raw
int i, j, max_value
# 填充dp数组
for i in range(1, n + 1):
for j in range(1, capacity + 1):
# 如果当前物品的重量大于背包容量j,则无法放入
if weights[i - 1] > j:
dp[i, j] = dp[i - 1, j]
else:
# 否则,可以选择放入或不放入当前物品,选择最大价值
dp[i, j] = max(dp[i - 1, j], dp[i - 1, j - weights[i - 1]] + values[i - 1])
# 从dp数组中找到最优解的最大价值
max_value = dp[n, capacity]
return max_value
测试代码setup.py
from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy as np
ext_modules =[
Extension("cython_test", # 包的名称 这里可以使用逗号放到指定路径 app.cpython_test
["cython_test.pyx"], # cpython文件 这里可以通过/选择不同的路径 app/cpython/cpython_test.pyx
include_dirs=[np.get_include()]) # 包含的文件库导入
]
setup(
name="cython_test",
ext_modules=cythonize(ext_modules),
script_args=["build_ext", "--build-lib", "."]
)
# python setup.py build_ext --inplace
# 实际使用了pyd文件,通过pyx文件生成
# cython -a cython_test.pyx
# 分析代码的运行效率,黄色越深代表越耗时
stopwatech.py
import time
class StopWatch:
def __init__(self):
self.__startTime = time.time()
self.__endTime = 0
def start(self):
self.__startTime = time.time()
def stop(self):
self.__endTime = time.time()
def getElapsedTime(self):
return int((self.__endTime - self.__startTime) * 1000)
版权说明
本文地址:http://www.liuyangdeboke.cn/?post=53
未标注转载均为本站远程,转载请注明文章出处:
发表评论