问题遇到的现象和发生背景
需要加速的代码,是两个嵌套的for循环,单次循环之间可以并行处理,没有数据依赖,应该是可以用GPU进行加速的。
问题相关代码,请勿粘贴截图
import msgpack
import torch
from torch.nn.utils.rnn import pad_sequence
from pathlib import Path
import csv
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from numba import cuda, jit
from numba.typed import List
import numpy as np
with open('/home/suned/data/giscup_2021_1/msgpack/20200801.msgpack', 'rb') as f1:
dct1 = msgpack.unpackb(f1.read())
with open('/home/suned/data/giscup_2021_1/totalmsgpack/1_2.msgpack', 'rb') as f2:
dct2 = msgpack.unpackb(f2.read())
def batch2tensor(batch, name, log_trans=False, long_tensor=False): # 将数据处理成张量形式。
if long_tensor == True:
x = torch.LongTensor([int(item[name]) for item in batch])
else:
x = torch.FloatTensor([item[name] for item in batch])
if log_trans == True:
x = torch.log(x)
return x
eta_min, eta_max, eta_mean, eta_std = (2.3978952727983707, 9.371353167823885, 6.553886963677842, 0.5905307292899195)
simple_eat_min, simple_eat_max, simple_eat_mean, simple_eat_std = (
0.6931471805599453, 9.320180837655714, 6.453206241137908, 0.5758803681400783) # simple_eta是出发时刻平均通行时间求和。
eta1 = (batch2tensor(dct1, 'eta',
log_trans=True) - eta_mean) / eta_std # 计算标准化值,给出一组数据中各数值的相对位置。是一种标准化处理,平均数为0,标准差为1。这里的eta实际是ata,即训练的标签值。
eta2 = (batch2tensor(dct2, 'eta', log_trans=True) - eta_mean) / eta_std
simple_eta1 = (batch2tensor(dct1, 'simple_eta', log_trans=True) - simple_eat_mean) / simple_eat_std
# simple_eta2 = (batch2tensor(dct2, 'simple_eta', log_trans=True) - simple_eat_mean)/simple_eat_std
link_start = [torch.LongTensor(list([item['link_id'][0]])) for item in dct1]
link_start5 = [torch.LongTensor(list(item['link_id'][0:10])) for item in dct2]
link_start = pad_sequence(link_start, batch_first=True)
link_start5 = pad_sequence(link_start5, batch_first=True)
# print('link_start:',link_start)
# print('link_start5',link_start5)
link_end = [torch.LongTensor(list([item['link_id'][-1]])) for item in dct1]
link_end5 = [torch.LongTensor(list(item['link_id'][-10:])) for item in dct2]
link_end = pad_sequence(link_end, batch_first=True)
link_end5 = pad_sequence(link_end5, batch_first=True)
eta1 = eta1.numpy()
eta2 = eta2.numpy()
simple_eta1 = simple_eta1.numpy()
link_start = link_start.numpy()
link_start5 = link_start5.numpy()
link_end = link_end.numpy()
link_end5 = link_end5.numpy()
@jit(nopython=True)
def traj_judge(link_start, link_start5, link_end, link_end5, eta):
trajO = np.empty(shape=(0, 10))
trajD = np.empty(shape=(0, 10))
trajOD = np.empty(shape=(0, 20))
etaOD = np.empty(shape=(0, 1))
etaODavg = np.empty(shape=(0, 1))
for j in range(0, len(link_start5)):
if ((link_start == link_start5[j]).sum()) >= 1 and ((link_end == link_end5[j]).sum()) >= 1:
trajO = np.append(trajO, [link_start == link_start5[j]], axis=0)
trajD = np.append(trajD, [link_end == link_end5[j]], axis=0)
etaOD = np.append(etaOD, [[eta[j]]], axis=0)
'''elif((link_start==link_start5[j]).long().sum())==0 or ((link_end==link_end5[j]).long().sum())==0:
trajO1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
trajD1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
etaOD1=[torch.tensor([0])]'''
if len(trajO) == 0:
trajO = np.array([[False, False, False, False, False, False, False, False, False, False]])
if len(trajD) == 0:
trajD = np.array([[False, False, False, False, False, False, False, False, False, False]])
trajOD = np.concatenate((trajO, trajD), axis=1)
etaODavg = etaOD.mean()
return etaODavg,trajOD
'''@jit('float64()',nopython=True)
def var1():
trajlen1 = np.empty(shape=(0, 1))
return trajlen1
@jit('float64()',nopython=True)
def var2():
etaODAVG1 = np.empty(shape=(0, 1))
return etaODAVG1'''
@cuda.jit
def gpurun(N, etaODAVG, trajlen, trajOD, etaODavg,trajO,trajD,etaOD,etaODavg1,link_start, link_start5, link_end, link_end5, eta):
idxWithinGrid = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
gridStride = cuda.gridDim.x * cuda.blockDim.x
for i in range(idxWithinGrid, N, gridStride):
for j in range(0, len(link_start5)):
if ((link_start[i] == link_start5[j]).sum()) >= 1 and ((link_end[i] == link_end5[j]).sum()) >= 1:
trajO = np.append(trajO, [[link_start[i] == link_start5[j]]], axis=0)
trajD = np.append(trajD, [[link_end[i] == link_end5[j]]], axis=0)
etaOD = np.append(etaOD, [[eta[j]]], axis=0)
'''elif((link_start==link_start5[j]).long().sum())==0 or ((link_end==link_end5[j]).long().sum())==0:
trajO1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
trajD1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
etaOD1=[torch.tensor([0])]'''
if len(trajO) == 0:
trajO = np.array([[False, False, False, False, False, False, False, False, False, False]])
if len(trajD) == 0:
trajD = np.array([[False, False, False, False, False, False, False, False, False, False]])
trajOD = np.concatenate((trajO, trajD), axis=1)
etaODavg = etaOD.mean()
if len(trajOD) == 1:
etaODavg = simple_eta1[i]
else:
etaODavg = etaODavg
etaODAVG = np.append(etaODAVG, etaODavg, axis=0)
trajlen = np.append(trajlen, [[len(trajOD)]], axis=0)
print('write:', i, 'trajlen:', len(trajOD), 'etaODAVG:', etaOD.mean(), 'simpale_eta:', simple_eta1[i], 'eta1:',
eta1[i])
def main():
etaODAVG = cuda.device_array(shape=(0, 1))
trajlen = cuda.device_array(shape=(0, 1))
trajOD = cuda.device_array(shape=(0, 20))
etaODavg = cuda.device_array(shape=(0, 1))
trajO = cuda.device_array(shape=(0, 10))
trajD = cuda.device_array(shape=(0, 10))
etaOD = cuda.device_array(shape=(0, 1))
etaODavg1 = cuda.device_array(shape=(0, 1))
gpurun[12, 256](len(link_start) + 1, etaODAVG, trajlen, trajOD, etaODavg,trajO,trajD,etaOD,etaODavg1,link_start, link_start5, link_end, link_end5, eta2)
if __name__ == '__main__':
main()
msg_path1 = Path('/home/suned/data/giscup_2021_1/totalmsgpack/20200801avg.msgpack')
msg_path1.parent.mkdir(parents=True, exist_ok=True) # parents:如果父目录不存在,是否创建父目录。exist_ok:只有在目录不存在时创建目录,目录已存在时不会抛出异常。
msg_path2 = Path('/home/suned/data/giscup_2021_1/totalmsgpack/20200801trajlen.msgpack')
msg_path2.parent.mkdir(parents=True, exist_ok=True)
with open(msg_path1, 'wb') as f1:
packed = msgpack.packb(etaODAVG) # msgpack.packb是一种数据打包的方式,类似JSON,但是更简单且数据量更小
f1.write(packed)
with open(msg_path2, 'wb') as f2:
packed = msgpack.packb(trajlen) # msgpack.packb是一种数据打包的方式,类似JSON,但是更简单且数据量更小
f2.write(packed)
运行结果及报错内容
在没有用cuda.jit装饰之前,代码是可以正常运行的,只是比较慢,而且目前用到的只是测试数据,后期可能会使用更大的数据。
我想要达到的结果
目前调试遇到很多问题,貌似主要是对数据类型的不支持,不知道是否有擅长这方面的朋友可以帮忙解决一下。
数据在此:链接: https://pan.baidu.com/s/1J7ii31wCm3nhp9mNuZ4Akw 提取码: mi63