最近在写一个小程序,后端是用python写的API,fastapi部署在windows上的脚本。业务逻辑是,客户通过小程序选择需要的图片,选好后可以打包为zip文件(通常打包10-20张,每张10M左右,打包后50-200M)。业务上有个典型的情况是,可能会同时有几十或上百人在同时通过小程序选择图片,选择好,调用API接口 /zip_files打包zip。这种情况下大部分是可以的,但是会有少部分会出现不打包的情况,想请问是否是因为打包需要时间,或者同时并发的人数较多,两者在一起导致会有遗漏或者不执行的情况。请问是什么原因,怎么调整?
补充:在此api下的zip_files函数,执行两个功能:先打包图片为zip,后转换压缩包地址为短网址。但有时出现短网址转换成功,压缩包却没有。
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
app = FastAPI()
app.mount("/图片库", StaticFiles(directory="图片库"), name="图片库")
static_folder = r"E:/新建文件夹/图片库"
global phone,base64s
@app.post('/zip_files)
def zip_files(item: dict):
# res = request.get_data()
# res = json.loads(res)
res = item
print(res)
selected_photo = res.get('selected_photo')
single_or_group = res.get('single_or_group')
first_level_name = res.get('first_level_name')
second_level_name = res.get('second_level_name')
domain_name = res.get('domain_name')
product_select_transfer = res.get('product_select_transfer')
print(single_or_group,first_level_name,second_level_name,domain_name,'****',product_select_transfer)
last_folder = static_folder.split('/')[-1]
urls = []
for i in selected_photo.values():
print('i.....',i)
url = i.get('url')
url = static_folder + url.split(last_folder)[-1]
a = url.split('/')
a.insert(-1, '原片')
urlss = '/'.join(a)
urls.append(urlss)
print('urls is ....',urls)
if single_or_group == 'single':
zip_name = static_folder+'/single/'+str(res.get('phone')) + '/' +str(res.get('phone'))+'.zip'
zip_name_ruce = static_folder+'/single/'+str(res.get('phone')) + '/' +'入册.zip'
base_url = domain_name + '/' + last_folder + '/single/'+str(res.get('phone')) + '/' +str(res.get('phone'))+'.zip'
elif single_or_group == 'group':
zip_name = static_folder+'/group/'+'/'+first_level_name+'/'+second_level_name+'/'+str(res.get('phone')) + '/' +str(res.get('phone'))+'.zip'
zip_name_ruce = static_folder+'/group/'+'/'+first_level_name+'/'+second_level_name+'/'+str(res.get('phone')) + '/' +'入册.zip'
base_url = domain_name + '/' + last_folder + '/group/'+'/'+first_level_name+'/'+second_level_name+'/'+str(res.get('phone')) + '/' +str(res.get('phone'))+'.zip'
else:
zip_name = ''
base_url = ''
zip_name_ruce = ''
print('base_url is ....',base_url,'last_folder is ...',last_folder)
zip = zipfile.ZipFile( zip_name, 'w' )
for file in urls:
# print ('compressing', file)
zip.write( file,os.path.basename(file) )
zip.close()
print ('compressing finished')
zip_ruce = zipfile.ZipFile(zip_name_ruce, 'w')
for key in product_select_transfer:
if product_select_transfer[key]:
all_taocan = product_select_transfer[key]
for keys in all_taocan:
taocan_name = keys
taocan_products = all_taocan[keys]
for keyss in taocan_products:
product_name = keyss
photo_lst = taocan_products[keyss]
for i in photo_lst:
# pre_static_folder = static_folder.replace(f'/{last_folder}','')
# ruce_file = i.replace(domain_name,pre_static_folder)
url = static_folder + i.split(last_folder)[-1]
a = url.split('/')
a.insert(-1, '原片')
urlss = '/'.join(a)
zip_path = f'/{taocan_name}/{product_name}/{os.path.basename(urlss)}'
zip_ruce.write( urlss,zip_path )
zip_ruce.close()
print ('ruce compressing finished')
res = get_short_url(base_url)
return res
def get_short_url(url):
try:
client = dwz.Dwz("***************")
res = client.create_single(url, dwz.TOV.ONE_YEAR)
# print(res)
return res
except:
return '请联系我们'
if __name__ == '__main__':
uvicorn.run(app=app,host='0.0.0.0', port=8084)