我想尝试编写代码以将手机中的图片大量传输到电脑中。我尝试了使用adb shell su pull命令直接将大量图片拷贝到指定文件夹,但因获取不到root权限而失败。因此,我转而通过adb shell cat获取图片数据,然后通过with open as在指定文件夹生成对应文件名并写入到其中。尽管传输过程非常顺利,没有报任何错误,但传输结束后却发现拷贝的所有图片失真,有的甚至打不开。挑选了几张图片进行排查,发现所有拷贝后的图片大小比原图小,其中jpg小了大概17B左右,png格式的图片少了35B左右。下面是编写的用于复制手机图片的代码。我使用过该代码(当然某些地方要适当改改)复制过本机图片和MuMu模拟器的图片,都没有发现失真的问题,仅在复制手机图片时发生了图片失真,所以我想请问一下大能们问题出在哪里。
from subprocess import Popen, PIPE
import threading
class CopyFiles:
"""将目标文件夹下的所有文件拷贝到另一文件夹"""
"""拷贝手机数据时,请先使用adb devices获取手机序列号,确保手机已成功连接"""
def get_all_files_name(self):
"""获取目标文件夹下所有文件的文件名"""
p = Popen(f'{self.adb} -s {self.serial_num} shell ls sdcard/{self.target_dir}',
shell=True, stdout=PIPE, encoding='utf-8')
result = p.communicate()[0].strip()
times = result.count('\n')
for i in range(times + 1):
self.file_name_list.append(result.split('\n')[i])
def __init__(self, serial_num: str, target_dir: str, copy_addr: str):
self.adb = 'F:/android_sdk/platform-tools/adb.exe'
self.serial_num = serial_num
self.target_dir = target_dir
self.copy_addr = copy_addr
self.file_name_list = []
self.get_all_files_name()
self.files_dict = {}
self.loop = True
self.execution_time = 0
def read_files(self):
"""读取图片文件"""
# todo 实际传输过程中发现图片失真,对比copy图片和原图发现,copy图片大小要小于原图
for file_name in self.file_name_list:
p = Popen(f'{self.adb} -s {self.serial_num} shell cat sdcard/{self.target_dir}/{file_name}', shell=True, stdout=PIPE)
content = p.communicate()[0].strip()
with open('E:/picture/sss/test1.txt', 'wb') as f1:
f1.write(content) # 1431,505 - 1428,537: 比原图多2,968 B
content = content.replace(b'\r\n', b'\n').replace(b'\r\n', b'\n')
with open('E:/picture/sss/test2.txt', 'wb') as f2:
f2.write(content) # 1428,518 - 1428,537: 比原图少19 B
self.files_dict[file_name] = content
self.loop = False
def write_files(self):
"""写入图片文件"""
while True:
if len(self.files_dict) > 0:
for file_name in list(self.files_dict.keys()):
# 这里需将self.files_dict.keys()转换为列表,
# 否则会报RuntimeError: dictionary changed size during iteration错误
with open(self.copy_addr + '/' + file_name, 'wb') as f:
f.write(self.files_dict.get(file_name))
self.files_dict.pop(file_name)
self.execution_time += 1
if not self.loop and len(self.files_dict) == 0:
break
if __name__ == '__main__':
copy_file = CopyFiles('手机序列号', 'Pictures/test', 'E:/picture/sss')
read_thread = threading.Thread(target=copy_file.read_files)
write_thread = threading.Thread(target=copy_file.write_files)
read_thread.start()
write_thread.start()
while True:
if not read_thread.is_alive() and not write_thread.is_alive():
print(f'execution times: {copy_file.execution_time}')
break