您可以使用 Python 的 multiprocessing 模块的 Process 类,并利用每个子进程运行的独立 CPU 时间来计算函数执行时间。
方法如下:
首先,定义一个新的 run() 函数,该函数包含要测试的函数及其参数。
然后,在 run() 函数中启动一个新的子进程,并记录当前时间。在子进程中运行所需测试的功能函数,并在子进程完成后记录结束时间。
计算一下子进程的 CPU 时间,并将其与实际时间作为子进程的执行时间。
最终,您可以将每个子进程的执行时间汇总到一个列表中,并输出结果。
示例如下:
import time
from multiprocessing import Process, cpu_count
def timed_run(func, args):
"""Function to execute a function and record its execution time."""
start = time.process_time()
func(*args)
end = time.process_time()
elapsed_time = end - start
return round(elapsed_time, 2)
def func(x):
"""The function to test."""
time.sleep(1) # simulate some work
print(f"this is process {x}")
def process_pool(func, args_list):
"""Function to execute a function with arguments using a pool of processes."""
results = []
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
for args in args_list:
results.append(executor.submit(timed_run, func, args))
return [r.result() for r in results]
if __name__ == '__main__':
args_list = [(x,) for x in range(10)]
times = process_pool(func, args_list)
print(f"Execution times for each process: {times}")
在上面的代码中,我们定义了一个新的 timed_run() 函数来运行测试函数并计算其执行时间。然后,在 process_pool() 函数中,我们使用 ProcessPoolExecutor 类创建一个进程池,并提交任务。每个任务由一个子进程执行,这样您就可以在 CPU 时间到达后获取子进程的执行时间。
最终,我们将每个子进程的执行时间记录在一个列表中,并输出结果。