Question: How can I reliably extract the MD&A section from 10-K filings?
Task: I have already downloaded the 10-K .htm files of many companies locally and need to extract the Item 7 (Management's Discussion and Analysis) section from each SEC 10-K annual report for text analysis. Using the SEC API is not an option.
Difficulties:
- The HTML tag formats inside the filings are inconsistent; even the same company can use different markup in different years.
- I delimit the section by searching for the Item 7 and Item 7A/8 headings, but the table of contents in the HTML also lists Item 7 and Item 7A/8; I suspect this is why the code below sometimes grabs text from the second page of the 10-K's table of contents.
- The Item 7 heading itself is written in several ways, e.g. Item 7, ITEM 7, ITEM_7, Item 7 (see the sketch right after this list).
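One rough, untested idea for handling both the heading variants and the duplicated TOC headings is to normalize everything into a single case-insensitive pattern and, when several Item 7 / Item 7A spans are found, keep the longest one, since the TOC entry is followed almost immediately by its own Item 7A line. A minimal sketch of that heuristic (the pattern, the names ITEM7_RE, ITEM7A_RE, and extract_mdna are my own, not part of the scripts below):

import re
from bs4 import BeautifulSoup

# Hypothetical helper, not part of the two scripts below: one case-insensitive
# pattern covers "Item 7", "ITEM 7", "ITEM_7", and "Item&nbsp;7" (BeautifulSoup
# turns &nbsp; into a non-breaking space, which \s matches).
ITEM7_RE = re.compile(r'item[\s_]*7(?![A-Za-z0-9])', re.IGNORECASE)
ITEM7A_RE = re.compile(r'item[\s_]*7\s*a(?![a-z0-9])', re.IGNORECASE)

def extract_mdna(html):
    """Return text between the most plausible 'Item 7' / 'Item 7A' pair, else None."""
    text = BeautifulSoup(html, 'lxml').get_text(' ')
    starts = [m.start() for m in ITEM7_RE.finditer(text)]
    ends = [m.start() for m in ITEM7A_RE.finditer(text)]
    spans = []
    for s in starts:
        following = [e for e in ends if e > s]
        if following:
            # Pair each "Item 7" with the first "Item 7A" that follows it
            spans.append((s, min(following)))
    if not spans:
        return None
    # The TOC pair spans only a line or two; the real MD&A is the longest span
    start, end = max(spans, key=lambda p: p[1] - p[0])
    return re.sub(r'\s+', ' ', text[start:end]).strip()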
The code below was adapted from code found in another post:
Method 1: high matching precision, but only a small share of the 10-K files match successfully
import os
import re
from bs4 import BeautifulSoup
import csv
from pathlib import Path
class TenKScraper:
    def __init__(self, section, next_section):
        self.all_section = [str(i) for i in range(1, 16)] + ['1A', '1B', '7A', '9A', '9B']
        section_num = re.findall(r'\d.*\w*', section.upper())[0]
        next_section_num = re.findall(r'\d.*\w*', next_section.upper())[0]
        if section_num not in self.all_section:
            raise ValueError(f'Section: {section_num} is not available, available sections: {self.all_section}')
        if next_section_num not in self.all_section:
            raise ValueError(f'Section: {next_section_num} is not available, available sections: {self.all_section}')
        self.section = 'Item ' + section_num
        self.next_section = 'Item ' + next_section_num
        self.section_upper = 'ITEM ' + section_num
        self.next_section_upper = 'ITEM ' + next_section_num

    def scrape_folder_to_csv(self, folder_path, output_csv):
        # Prepare the CSV file
        with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(['Filename', 'Extracted Content'])  # CSV header
            # Iterate over all files in the folder
            for root, _, files in os.walk(folder_path):
                for file in files:
                    if file.endswith('.htm') or file.endswith('.html'):
                        input_path = os.path.join(root, file)
                        print(f"Processing file: {input_path}")
                        content = self.scrape(input_path)
                        if content:
                            writer.writerow([file, content])

    def scrape(self, input_path):
        try:
            with open(input_path, 'rb') as input_file:
                page = input_file.read()
            # Normalize line breaks and HTML space entities (&nbsp;, &#160;), then collapse repeated spaces
            page = page.strip().replace(b'\n', b' ').replace(b'\r', b'').replace(b'&nbsp;', b' ').replace(b'&#160;', b' ')
            while b'  ' in page:
                page = page.replace(b'  ', b' ')
            regexs = [
                # Regular expressions keyed on the Item headings
                bytes(r'(?i)<(?:span|b)[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<(?:span|b)[^>]*>\s*' + re.escape(self.next_section) + r'\.?', encoding='utf-8'),
                bytes(r'(?i)' + re.escape(self.section) + r'\.\s*(.*?)' + re.escape(self.next_section) + r'\.', encoding='utf-8'),
                bytes(r'bold;\">\s*' + self.section + r'\.(.+?)bold;\">\s*' + self.next_section + r'\.', encoding='utf-8'),
                bytes(r'b>\s*' + self.section + r'\.(.+?)b>\s*' + self.next_section + r'\.', encoding='utf-8'),
                bytes(r'' + self.section + r'\.\s*<\/b>(.+?)' + self.next_section + r'\.\s*<\/b>', encoding='utf-8'),
                bytes(r'' + self.section + r'\.\s*[^<>]+\.\s*<\/b(.+?)' + self.next_section + r'\.\s*[^<>]+\.\s*<\/b', encoding='utf-8'),
                bytes(r'b>\s*<font[^>]+>\s*' + self.section + r'\.(.+?)b>\s*<font[^>]+>\s*' + self.next_section + r'\.', encoding='utf-8'),
                bytes(r'' + self.section.upper() + r'\.\s*<\/b>(.+?)' + self.next_section.upper() + r'\.\s*<\/b>', encoding='utf-8'),
                bytes(r'' + self.section + r'\.\s+<\/b>(.+?)' + self.next_section + r'\.\s+<\/b>', encoding='utf-8'),
                bytes(r'' + self.section + r'\.\s*<[^>]+>(.+?)' + self.next_section + r'\.\s*<[^>]+>', encoding='utf-8'),
                bytes(r'' + self.section + r'\.\s*(.+?)' + self.next_section + r'\.\s*', encoding='utf-8'),
                bytes(r'(?i)<div[^>]*>\s*<span[^>]*>\s*' + re.escape(self.section) + r'\.?\s*</span>(.*?)<div[^>]*>\s*<span[^>]*>\s*' + re.escape(self.next_section) + r'\.?\s*</span>', encoding='utf-8'),
                bytes(r'(?i)<div[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<div[^>]*>\s*' + re.escape(self.next_section) + r'\.?\s*', encoding='utf-8'),
                bytes(r'(?i)<span[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<span[^>]*>\s*' + re.escape(self.next_section) + r'\.?\s*', encoding='utf-8'),
                bytes(r'(?i)' + re.escape(self.section) + r'\.\s*(.*?)' + re.escape(self.next_section) + r'\.', encoding='utf-8'),
                # Added: Item heading inside a <p> tag
                bytes(r'(?i)<p[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<\/p>', encoding='utf-8'),
                bytes(r'(?i)<p[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<p[^>]*>\s*' + re.escape(self.next_section) + r'\.?\s*', encoding='utf-8'),
                # Added: upper-case ITEM inside a <span> within a <div>
                bytes(r'(?i)<div[^>]*>\s*<span[^>]*>\s*' + re.escape(self.section_upper) + r'\.?\s*(.*?)<\/span><\/div>', encoding='utf-8'),
                bytes(r'(?i)<div[^>]*>\s*<span[^>]*>\s*' + re.escape(self.section_upper) + r'\.?\s*(.*?)<div[^>]*>\s*<span[^>]*>\s*' + re.escape(self.next_section_upper) + r'\.?\s*', encoding='utf-8'),
                bytes(r'(?i)<p[^>]*>\s*' + re.escape(self.section) + r'\.?\s*(.*?)<\/p>', encoding='utf-8')  # duplicate of the <p> pattern above
            ]
            match = None
            for regex in regexs:
                match = re.search(regex, page, flags=re.IGNORECASE | re.DOTALL)
                if match:
                    break
            if match:
                html_content = match.group(1).decode('utf-8')
                soup = BeautifulSoup(html_content, 'lxml')
                # Collapse redundant whitespace in the extracted text
                content = re.sub(r'\s+', ' ', soup.get_text()).strip()
                return content
            else:
                print(f"No content found between {self.section} and {self.next_section} in {input_path}.")
                return None
        except Exception as e:
            print(f"Error processing {input_path}: {e}")
            return None
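Method 1 has no driver block of its own; it can be run the same way as Method 2's __main__ block further below. A usage sketch (the folder and CSV names here are placeholders of my own):

if __name__ == '__main__':
    # Placeholder paths: point the first argument at the directory of downloaded 10-K .htm files
    scraper = TenKScraper('7', '7A')
    scraper.scrape_folder_to_csv('10k_filings', 'item7_method1.csv')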
Method 2: lower matching precision, but text can be extracted from many more 10-K files
import os
from bs4 import BeautifulSoup
import re
import csv
class TenKScraper:
    def __init__(self, section, next_section):
        self.all_section = [str(i) for i in range(1, 16)] + ['1A', '1B', '7A', '9A', '9B']
        section_num = re.findall(r'\d.*\w*', section.upper())[0]
        next_section_num = re.findall(r'\d.*\w*', next_section.upper())[0]
        if section_num not in self.all_section:
            raise ValueError(f'Section: {section_num} is not available, available sections: {self.all_section}')
        if next_section_num not in self.all_section:
            raise ValueError(f'Section: {next_section_num} is not available, available sections: {self.all_section}')
        self.section = section_num
        self.next_section = next_section_num

    def generate_patterns(self, section):
        # Regex patterns covering the different heading spellings
        patterns = [
            rf'(?i)Item\s*{section}',
            rf'(?i)ITEM\s*{section}',
            rf'(?i)Item_{section}',
            rf'(?i)ITEM_{section}',
            rf'(?i)Item{section}',
            rf'(?i)ITEM{section}'
        ]
        return patterns

    def find_start_tag(self, soup):
        # Find the tag where the target section starts
        start_patterns = self.generate_patterns(self.section)
        for pattern in start_patterns:
            start_tags = soup.find_all(lambda tag: tag.name in ['p', 'span', 'div', 'b', 'strong'] and re.search(pattern, tag.get_text()))
            if start_tags:
                return start_tags[0]
        return None

    def find_end_tag(self, next_tag):
        # Check whether this tag marks the start of the next section
        end_patterns = self.generate_patterns(self.next_section)
        for pattern in end_patterns:
            if next_tag.name in ['p', 'span', 'div', 'b', 'strong'] and re.search(pattern, next_tag.get_text()):
                return True
        return False

    def skip_table_of_contents(self, soup):
        # Skip the table-of-contents page by resuming after the last "Table of Contents" tag
        toc_pattern = r'(?i)Table of Contents'
        toc_tags = soup.find_all(lambda tag: tag.name in ['p', 'span', 'div', 'b', 'strong'] and re.search(toc_pattern, tag.get_text()))
        if toc_tags:
            last_toc_tag = toc_tags[-1]
            next_tag = last_toc_tag.find_next_sibling()
            if next_tag:
                return next_tag
        return soup.find()

    def scrape(self, input_path):
        try:
            with open(input_path, 'r', encoding='utf-8') as input_file:
                html_content = input_file.read()
            soup = BeautifulSoup(html_content, 'lxml')
            # Skip the table-of-contents page before searching for the section heading
            start_search_tag = self.skip_table_of_contents(soup)
            start_tag = self.find_start_tag(BeautifulSoup(str(start_search_tag), 'lxml'))
            if not start_tag:
                return None
            content = []
            next_tag = start_tag.find_next_sibling()
            while next_tag:
                if self.find_end_tag(next_tag):
                    break
                content.append(next_tag.get_text(strip=True))
                next_tag = next_tag.find_next_sibling()
            return ' '.join(content)
        except Exception as e:
            print(f"Error processing {input_path}: {e}")
            return None

    def scrape_folder_to_csv(self, folder_path, output_csv):
        results = []
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.endswith('.htm') or file.endswith('.html'):
                    file_path = os.path.join(root, file)
                    item_content = self.scrape(file_path)
                    if item_content is not None:
                        results.append((file, item_content))
                    else:
                        results.append((file, 'No matching content found'))
        with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(['Filename', f'Item {self.section} content'])
            for file, content in results:
                writer.writerow([file, content])

if __name__ == '__main__':
    folder_path = ''
    output_csv = 'test.csv'
    scraper = TenKScraper('7', '7A')
    scraper.scrape_folder_to_csv(folder_path, output_csv)
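Since Method 1 is precise but misses files while Method 2 matches more files less precisely, one option I am considering is to try Method 1 first and fall back to Method 2 per file. A sketch of that idea, assuming the two classes are saved in separate modules (the module names method1 and method2 are hypothetical, since both classes are called TenKScraper):

import csv
import os

from method1 import TenKScraper as PreciseScraper    # hypothetical module names
from method2 import TenKScraper as FallbackScraper

def scrape_with_fallback(folder_path, output_csv):
    precise = PreciseScraper('7', '7A')
    fallback = FallbackScraper('7', '7A')
    with open(output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Filename', 'Item 7 content', 'Method'])
        for root, _, files in os.walk(folder_path):
            for file in files:
                if not file.endswith(('.htm', '.html')):
                    continue
                path = os.path.join(root, file)
                # Prefer the precise regex-based extraction; fall back to tag walking
                content = precise.scrape(path)
                method = 'regex'
                if not content:
                    content = fallback.scrape(path)
                    method = 'tag-walk'
                writer.writerow([file, content or '', method])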