"""ingest.py: chunk SEC filings, embed them with OpenAI, and upsert the vectors to Pinecone."""
import glob
import os
import sys
from dataclasses import dataclass
from typing import List, Optional, Tuple

import openai
import tiktoken
from dotenv import load_dotenv
from pinecone import Pinecone
from tqdm import tqdm

# Make the project root importable, then load credentials from a .env file.
# PINECONE_API_KEY is read explicitly below; OPENAI_API_KEY is picked up
# implicitly by the openai module.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
load_dotenv(override=True)

@dataclass
class Filing:
    """One SEC filing on disk, plus its optional pre-generated summary."""
    ticker: str
    company_name: str
    report_type: str
    filing_date: str
    file_path: str
    summary_path: str
    content: str
    summary: str

class SECFilingProcessor:
    def __init__(self, pinecone_index_name: str = 'sec-embeddings'):
        self.pc = Pinecone(api_key=os.getenv('PINECONE_API_KEY'))
        self.index = self.pc.Index(pinecone_index_name)
        # Tokenizer matching the embedding model, used for token-level chunking.
        self.encoding = tiktoken.encoding_for_model('text-embedding-3-small')
        self.company_names = {
            'AAPL': 'Apple Inc.',
            'AMZN': 'Amazon.com Inc.',
            'FL': 'Foot Locker Inc.',
            'KO': 'The Coca-Cola Company',
            'META': 'Meta Platforms Inc.',
            'MSFT': 'Microsoft Corporation',
            'NVDA': 'NVIDIA Corporation',
            'TSLA': 'Tesla Inc.'
        }

    def parse_filename(self, filename: str) -> Tuple[str, str, str]:
        # Expects names of the form TICKER_REPORTCODE_FILINGDATE.txt.
        parts = filename.replace('.txt', '').split('_')
        ticker = parts[0]
        report_code = parts[1]
        filing_date = parts[2]
        report_type = 'yearly' if report_code == '10K' else 'quarterly'
        return ticker, report_type, filing_date
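
    # A hypothetical trace, assuming an ISO date in the filename (the actual
    # date format depends on how the source files were named):
    #   parse_filename('NVDA_10Q_2024-02-21.txt')
    #   -> ('NVDA', 'quarterly', '2024-02-21')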

    def create_chunks(self,
                      text: str,
                      chunk_size: int = 1024,
                      overlap: int = 128) -> List[str]:
        """Split text into chunk_size-token windows, each sharing overlap tokens with its predecessor."""
        tokens = self.encoding.encode(text)
        chunks = []
        start = 0
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            chunk_tokens = tokens[start:end]
            chunk_text = self.encoding.decode(chunk_tokens)
            chunks.append(chunk_text)
            if end >= len(tokens):
                break
            # Step back by `overlap` so adjacent chunks share context.
            start = end - overlap
        return chunks
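
    # Worked example of the windowing above: a 3,000-token document with the
    # default chunk_size=1024 and overlap=128 advances 1024 - 128 = 896 tokens
    # per step, producing four chunks covering token ranges
    # [0, 1024), [896, 1920), [1792, 2816), and [2688, 3000).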

    def load_filing(self, file_path: str) -> Optional[Filing]:
        filename = os.path.basename(file_path)
        # Summary files are companions to filings, not filings themselves.
        if '_summary' in filename:
            return None
        ticker, report_type, filing_date = self.parse_filename(filename)
        company_name = self.company_names.get(ticker, ticker)
        summary_filename = filename.replace('.txt', '_summary.txt')
        summary_path = os.path.join(os.path.dirname(file_path), summary_filename)
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        summary = ''
        if os.path.exists(summary_path):
            with open(summary_path, 'r', encoding='utf-8') as f:
                summary = f.read()
        return Filing(
            ticker=ticker,
            company_name=company_name,
            report_type=report_type,
            filing_date=filing_date,
            file_path=file_path,
            summary_path=summary_path,
            content=content,
            summary=summary
        )

    def embed(self, docs: List[str]) -> List[List[float]]:
        res = openai.embeddings.create(
            input=docs,
            model='text-embedding-3-small',
            dimensions=512
        )
        return [r.embedding for r in res.data]
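
    # The target index must already exist with dimension 512 to match the
    # embeddings above. A minimal one-time bootstrap might look like the
    # following sketch (cloud and region here are placeholder assumptions):
    #
    #   from pinecone import ServerlessSpec
    #   pc = Pinecone(api_key=os.getenv('PINECONE_API_KEY'))
    #   pc.create_index(name='sec-embeddings', dimension=512, metric='cosine',
    #                   spec=ServerlessSpec(cloud='aws', region='us-east-1'))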

    def process_ticker(self, ticker: str, data_dir: str, chunk_size: int = 1024, namespace: str = '__default__'):
        ticker_dir = os.path.join(data_dir, ticker)
        files = glob.glob(f'{ticker_dir}/*.txt')
        files = [f for f in files if '_summary' not in f]
        print(f'\nProcessing {ticker}: {len(files)} files')
        chunk_data = []
        for file_path in tqdm(files, desc=f'Loading {ticker} files'):
            filing = self.load_filing(file_path)
            if filing is None:
                continue
            chunks = self.create_chunks(filing.content, chunk_size=chunk_size)
            for i, chunk_text in enumerate(chunks):
                # Prepend the filing-level summary so every chunk carries
                # document-wide context into its embedding.
                enhanced_text = (
                    f'Summary of {filing.company_name} {filing.report_type} '
                    f'report ({filing.filing_date}):\n'
                    f'{filing.summary}\n'
                    f'Content (Part {i + 1}):\n'
                    f'{chunk_text}'
                )
                chunk_id = f'{ticker}_{filing.filing_date}_{i}'
                chunk_data.append({
                    'id': chunk_id,
                    'text': enhanced_text,
                    'metadata': {
                        'ticker': filing.ticker,
                        'company_name': filing.company_name,
                        'report_type': filing.report_type,
                        'filing_date': filing.filing_date,
                        'chunk_index': i,
                        # Store only a preview of the raw chunk to keep
                        # metadata small.
                        'original_text': chunk_text[:1000]
                    }
                })
        print(f'{ticker}: Created {len(chunk_data)} chunks from {len(files)} files')
        # Embed and upsert in batches to stay within API request limits.
        batch_size = 50
        total_uploaded = 0
        for i in range(0, len(chunk_data), batch_size):
            batch = chunk_data[i:i + batch_size]
            texts = [d['text'] for d in batch]
            embeddings = self.embed(texts)
            vectors = []
            for d, e in zip(batch, embeddings):
                vectors.append({
                    'id': d['id'],
                    'values': e,
                    'metadata': d['metadata']
                })
            self.index.upsert(
                vectors=vectors,
                namespace=namespace
            )
            total_uploaded += len(vectors)
            print(f'  Uploaded batch {i // batch_size + 1}: {len(vectors)} vectors')
        print(f'{ticker}: Total uploaded {total_uploaded} vectors')
        return total_uploaded

    def process_all_tickers(self, data_dir: str, chunk_size: int = 1024, namespace: str = '__default__'):
        tickers = ['AAPL', 'AMZN', 'FL', 'KO', 'META', 'MSFT', 'NVDA', 'TSLA']
        total_vectors = 0
        for ticker in tickers:
            count = self.process_ticker(ticker, data_dir, chunk_size, namespace)
            total_vectors += count
        print(f'\n✅ Complete: Uploaded {total_vectors} total vectors to Pinecone')
        return total_vectors

if __name__ == '__main__':
    DATA_DIR = '../data'
    processor = SECFilingProcessor()
    processor.process_all_tickers(
        data_dir=DATA_DIR,
        chunk_size=1024,
        namespace='__default__'
    )
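
# Hypothetical single-ticker usage, reusing the defaults above:
#
#   processor = SECFilingProcessor()
#   processor.process_ticker('NVDA', data_dir='../data')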