各位好,我在csdn找到一段代码,用于faers数据库的数据清理和整理,如下:
代码的发布时间较早,我运行后发现我按照要求将数据库源文件(18年-22年)放入data文件后,只能运行20年-22年的数据,请问如何修改可以运行18-22年的全部数据呢?
```python
from os import listdir
from os.path import join, isfile
from sys import argv
import zipfile
import re
import importlib

# The database backend is pluggable: the module named on the command line
# (default "sqlite", i.e. sqlite.py next to this script) must provide
# setupDB(), writeEntry(), preClose(), closeDB() and postClose().
db_selection = 'sqlite'
if len(argv) > 1:
    db_selection = argv[1]
db = importlib.import_module(db_selection)
db.setupDB()
class DBfields:
    """Map a FAERS file prefix (e.g. 'DEMO') to its table name and the
    column list that was in effect for a given year/quarter.

    FAERS changed its file layout at 2012Q4 (LAERS -> FAERS: ISR replaced
    by PRIMARYID/CASEID) and again at 2014Q3 (additional columns such as
    PROD_AI, DRUG_REC_ACT, SEX).
    """

    def __init__(self, year, quarter):
        # Kept for backward compatibility with any external reader.
        self.yearq = year + 0.1 * quarter
        # Integer (year, quarter) key: 2012Q4 -> 20124.  Comparing integers
        # avoids the float-precision pitfalls of `year + 0.1 * quarter`
        # landing a hair above/below a literal like 2012.4.
        self._yq = year * 10 + quarter
        # prefix -> [table name, field-list provider]
        self.trans = {
            'THER': ['therapy', self.therapy_fields],
            'RPSR': ['source', self.source_fields],
            'REAC': ['react', self.react_fields],
            'OUTC': ['outcome', self.outcome_fields],
            'INDI': ['indication', self.indication_fields],
            'DRUG': ['drug', self.drug_fields],
            'DEMO': ['demo', self.demo_fields],
        }

    def translate(self, first_four):
        """Return {'table_name': ..., 'table_fields': [...]} for a 4-char
        file prefix such as 'DEMO'.  Raises KeyError on unknown prefixes."""
        table_name = self.trans[first_four][0]
        table_fields = self.trans[first_four][1]()
        return {'table_name': table_name, 'table_fields': table_fields}

    def therapy_fields(self):
        if self._yq < 20124:
            return ['ISR', 'DRUG_SEQ', 'START_DT', 'END_DT', 'DUR', 'DUR_COD']
        else:
            return ['PRIMARYID', 'CASEID', 'DRUG_SEQ', 'START_DT', 'END_DT', 'DUR', 'DUR_COD']

    def source_fields(self):
        if self._yq < 20124:
            return ['ISR', 'RPSR_COD']
        else:
            return ['PRIMARYID', 'CASEID', 'RPSR_COD']

    def react_fields(self):
        if self._yq < 20124:
            return ['ISR', 'PT']
        elif self._yq < 20143:
            return ['PRIMARYID', 'CASEID', 'PT']
        else:
            return ['PRIMARYID', 'CASEID', 'PT', 'DRUG_REC_ACT']

    def outcome_fields(self):
        if self._yq < 20124:
            return ['ISR', 'OUTC_COD']
        else:
            return ['PRIMARYID', 'CASEID', 'OUTC_COD']

    def indication_fields(self):
        if self._yq < 20124:
            return ['ISR', 'DRUG_SEQ', 'INDI_PT']
        else:
            return ['PRIMARYID', 'CASEID', 'INDI_DRUG_SEQ', 'INDI_PT']

    def drug_fields(self):
        if self._yq < 20124:
            return ['ISR', 'DRUG_SEQ', 'ROLE_COD', 'DRUGNAME', 'VAL_VBM', 'ROUTE', 'DOSE_VBM', 'DECHAL', 'RECHAL',
                    'LOT_NUM', 'EXP_DT', 'NDA_NUM']
        elif self._yq < 20143:
            return ['PRIMARYID', 'CASEID', 'DRUG_SEQ', 'ROLE_COD', 'DRUGNAME', 'VAL_VBM', 'ROUTE', 'DOSE_VBM',
                    'CUM_DOSE_CHR', 'CUM_DOSE_UNIT',
                    'DECHAL', 'RECHAL', 'LOT_NUM', 'EXP_DT', 'NDA_NUM', 'DOSE_AMT', 'DOSE_UNIT', 'DOSE_FORM',
                    'DOSE_FREQ']
        else:
            return ['PRIMARYID', 'CASEID', 'DRUG_SEQ', 'ROLE_COD', 'DRUGNAME', 'PROD_AI', 'VAL_VBM', 'ROUTE',
                    'DOSE_VBM', 'CUM_DOSE_CHR', 'CUM_DOSE_UNIT',
                    'DECHAL', 'RECHAL', 'LOT_NUM', 'EXP_DT', 'NDA_NUM', 'DOSE_AMT', 'DOSE_UNIT', 'DOSE_FORM',
                    'DOSE_FREQ']

    def demo_fields(self):
        if self._yq < 20124:
            return ['ISR', 'CASE_NUM', 'I_F_COD', 'FOLL_SEQ', 'IMAGE', 'EVENT_DT', 'MFR_DT', 'FDA_DT', 'REPT_COD',
                    'MFR_NUM', 'MFR_SNDR', 'AGE', 'AGE_COD',
                    'GNDR_COD', 'E_SUB', 'WT', 'WT_COD', 'REPT_DT', 'OCCP_COD', 'DEATH_DT', 'TO_MFR', 'CONFID',
                    'REPORTER_COUNTRY']
        elif self._yq < 20143:
            return ['PRIMARYID', 'CASEID', 'CASEVERSION', 'I_F_COD', 'EVENT_DT', 'MFR_DT', 'INIT_FDA_DT', 'FDA_DT',
                    'REPT_COD', 'MFR_NUM', 'MFR_SNDR',
                    'AGE', 'AGE_COD', 'GNDR_COD', 'E_SUB', 'WT', 'WT_COD', 'REPT_DT', 'TO_MFR', 'OCCP_COD',
                    'REPORTER_COUNTRY', 'OCCR_COUNTRY']
        else:
            return ['PRIMARYID', 'CASEID', 'CASEVERSION', 'I_F_COD', 'EVENT_DT', 'MFR_DT', 'INIT_FDA_DT', 'FDA_DT',
                    'REPT_COD', 'AUTH_NUM', 'MFR_NUM',
                    'MFR_SNDR', 'LIT_REF', 'AGE', 'AGE_COD', 'AGE_GRP', 'SEX', 'E_SUB', 'WT', 'WT_COD', 'REPT_DT',
                    'TO_MFR', 'OCCP_COD', 'REPORTER_COUNTRY', 'OCCR_COUNTRY']
# Per-table record/file counters, reported at the end of the run.
tbl_count = {}

# Every .zip in ./data is a candidate quarterly archive.
files = ['data/' + f for f in listdir('data') if isfile(join('data', f)) and f[-4:].lower() == '.zip']
for filename in files:
    if not zipfile.is_zipfile(filename):
        raise Exception(filename + ' is not a zip file')

valid_files = []
# Data files live under an "ascii"-style folder inside each archive.
# NOTE(review): some FAERS archives store member names with a backslash
# separator, which makes them invisible to a '/'-only pattern (a likely
# reason some years' quarters were silently skipped) — accept both.
ascii_file_re = re.compile(r'as(?:c*)i(?:i*)[/\\](.+?)\.txt', re.I)
ascii_year_re = re.compile(r'\D(\d{4})\D')       # 4-digit year in the zip name
ascii_quarter_re = re.compile(r'q(\d)\.', re.I)  # quarter digit before the extension
for filename in files:
    # Year/quarter come from the archive's file name, e.g. faers_ascii_2018q1.zip.
    try:
        year = int(ascii_year_re.search(filename).group(1))
        quarter = int(ascii_quarter_re.search(filename).group(1))
    except (AttributeError, ValueError):
        # AttributeError: regex found no match (search() returned None).
        raise Exception('Unable to ascertain date information for file ' + filename)
    try:
        archive = zipfile.ZipFile(filename, 'r')
        archive.infolist()
    except (OSError, zipfile.BadZipFile):
        raise Exception('Unable to read ' + filename)
    # Close the handle once the member list is collected (the original
    # leaked one open ZipFile per archive).
    with archive:
        for name in archive.namelist():
            if ascii_file_re.match(name):
                valid_files.append([filename, name, year, quarter])
def sort_files(a, b):
    """Comparator ordering file-info records by year (index 2), then
    quarter (index 3).  Kept for backward compatibility."""
    if a[2] != b[2]:
        return a[2] - b[2]
    return a[3] - b[3]

# Quarters must be processed chronologically; a (year, quarter) sort key
# does the comparator's job without functools.cmp_to_key.
valid_files.sort(key=lambda info: (info[2], info[3]))
def pop_newlines(fields, req_fields):
    """Trim trailing newline/empty tokens from *fields* in place, but never
    shrink the list below *req_fields* entries."""
    terminators = ("\r\n", "", "\n")
    while len(fields) > req_fields and fields[-1] in terminators:
        del fields[-1]
for zip_files in valid_files:
    # Unpack [zip path, member name, year, quarter].
    zip_filename = zip_files[0]
    filename = zip_files[1]
    year = zip_files[2]
    quarter = zip_files[3]
    zip_name = ascii_file_re.search(zip_files[1]).group(1)
    if zip_name[:4] == 'STAT':
        continue  # STAT is not $ delimited and likely wrong after scrubbing
    trans = DBfields(year, quarter).translate(zip_name[:4].upper())
    table_name = trans['table_name']
    print(zip_filename + ' ' + filename + ' (' + trans['table_name'] + ')...', )
    # Read the whole member up front, then release both handles (the
    # original leaked one ZipFile + member handle per file).
    with zipfile.ZipFile(zip_filename, 'r') as zf:
        with zf.open(filename, 'r') as h:
            lines = h.readlines()
    total_lines = len(lines)
    req_fields = len(trans['table_fields'])
    i = 0
    while i < total_lines:
        if i == 0:
            # Line 0 is the header row — skip it.
            i += 1
            continue
        # NOTE(review): a few archives contain non-UTF-8 bytes; 'replace'
        # keeps the row (with U+FFFD markers) instead of aborting the run.
        # The original also concatenated raw bytes onto this str below,
        # which raised TypeError as soon as a record spanned lines.
        l = lines[i].decode('utf-8', errors='replace')
        fields = l.split('$')
        extra_lines = 0
        # Records may span physical lines: append following lines until the
        # field count reaches the table's expected width.
        while len(fields) < req_fields and i + 1 + extra_lines < total_lines:
            extra_lines += 1
            l += lines[i + extra_lines].decode('utf-8', errors='replace')
            fields = l.split('$')
            # Check if we went over the field count and give up: fall back
            # to the single original line (it will then be reported/skipped).
            pop_newlines(fields, req_fields)
            if len(fields) > req_fields:
                print("\t", zip_files[1], i + 1, len(fields), req_fields)
                fields = lines[i].decode('utf-8', errors='replace').split('$')
                extra_lines = 0
                break
        pop_newlines(fields, req_fields)
        field_count = len(fields)
        if field_count == req_fields:
            try:
                if table_name not in tbl_count:
                    tbl_count[table_name] = {'records': 0, 'files': 0}
                tbl_count[table_name]['records'] += 1
                # Strip surrounding whitespace from every field.  (The
                # original `for f in fields: f = f.strip()` only rebound the
                # loop variable — a no-op that also shadowed the ZipFile.)
                fields = [fld.strip() for fld in fields]
                db.writeEntry(table_name, trans['table_fields'], fields)
            except Exception as e:
                print(l)
                print(fields)
                raise Exception(e)
        else:
            print("\t", trans['table_name'], ' - skipping line ', i + 1, year, quarter, len(fields), req_fields)
            print("\t\t", fields)
        i += 1 + extra_lines
    print('done')
    tbl_count[table_name]['files'] += 1
# Finalize the backend: compact, commit/close, then archive the database.
db.preClose()
db.closeDB()
db.postClose()
# Summary of how many records/files went into each table.
print(tbl_count)
**sqlite.py 文件如下:**
"""SQLite module used to assist the FAERS parse.py module.

These functions are separated from parse.py to keep the DB modular. Feel free
to write your own for your own db.
"""
import sqlite3
from os.path import isfile
from os import remove
import zipfile
import zlib  # Needed for zipfile compression type (not all systems may have this)

# Remove old database if it exists, then open the connection.
# isolation_level=None puts sqlite3 in autocommit mode.
# Maybe this should be an option passed to setupDB()
if isfile('faers-data.sqlite'):
    remove('faers-data.sqlite')
con = sqlite3.connect('faers-data.sqlite', isolation_level=None)
def setupDB():
    """Create the FAERS tables on the module-global connection.

    Each table carries both legacy (ISR-based, pre-2012Q4) and current
    (PRIMARYID/CASEID-based) columns so any quarter's format loads into
    the same schema; unused columns simply stay NULL.
    """
    con.execute("""
    create table demo (ISR integer, PRIMARYID integer, CASEID integer, CASEVERSION integer, CASE_NUM integer, I_F_COD text,
    FOLL_SEQ text, IMAGE text, EVENT_DT date, MFR_DT date, INIT_FDA_DT date, FDA_DT date,
    REPT_COD text, AUTH_NUM integer, MFR_NUM text, MFR_SNDR text, LIT_REF text, AGE numeric, AGE_COD text, AGE_GRP text, SEX text,
    GNDR_COD text, E_SUB text, WT numeric, WT_COD text, REPT_DT date,
    OCCP_COD text, DEATH_DT date, TO_MFR text, CONFID text, REPORTER_COUNTRY text, OCCR_COUNTRY text)
    """)
    con.execute("""
    create table drug (ISR integer, PRIMARYID integer, CASEID integer, DRUG_SEQ integer, ROLE_COD text,
    DRUGNAME text, PROD_AI text, VAL_VBM integer, ROUTE text, DOSE_VBM text, CUM_DOSE_CHR real, CUM_DOSE_UNIT text, DECHAL text,
    RECHAL text, LOT_NUM text, EXP_DT date, NDA_NUM text, DOSE_AMT real, DOSE_UNIT text, DOSE_FORM text, DOSE_FREQ text)
    """)
    con.execute("""
    create table react (ISR integer, PRIMARYID integer, CASEID integer, PT text not null, DRUG_REC_ACT text)
    """)
    con.execute("""
    create table outcome (ISR integer, PRIMARYID integer, CASEID integer, OUTC_COD text not null)
    """)
    con.execute("""
    create table source (ISR integer, PRIMARYID integer, CASEID integer, RPSR_COD text not null)
    """)
    con.execute("""
    create table therapy (ISR integer, PRIMARYID integer, CASEID integer, DRUG_SEQ integer, START_DT date,
    END_DT date, DUR integer, DUR_COD text)
    """)
    con.execute("""
    create table indication (ISR integer, PRIMARYID integer, CASEID integer, DRUG_SEQ integer, INDI_DRUG_SEQ integer, INDI_PT text)
    """)
    con.commit()
def writeEntry(table_name, field_names, fields):
    """Insert one record into *table_name*, mapping *field_names* to *fields*.

    Values are always bound via '?' placeholders, never interpolated.
    """
    column_list = ' (' + ','.join(field_names) + ')'
    placeholders = ', '.join('?' * len(fields))
    statement = 'INSERT INTO ' + table_name + column_list + ' VALUES(' + placeholders + ')'
    con.execute(statement, fields)
def preClose():
    # VACUUM rewrites the database file, reclaiming free pages so the final
    # file (and its zip in postClose) is as small as possible.
    con.execute('VACUUM')
def closeDB():
    """Commit and close the module-global DB connection."""
    # (Reconstructed: the pasted source had the docstring quotes scrambled
    # onto the def lines, which was a syntax error.)
    con.commit()
    con.close()


def postClose():
    """Compress the finished database into faers-data-sqlite.zip,
    replacing any archive left over from a previous run."""
    if isfile('faers-data-sqlite.zip'):
        remove('faers-data-sqlite.zip')
    zf = zipfile.ZipFile('faers-data-sqlite.zip', 'w', allowZip64=True)
    zf.write('faers-data.sqlite', compress_type=zipfile.ZIP_DEFLATED)
    zf.close()