Files
LLM-Labs/Support Files/convert_csv_to_parquet.py
T
2026-03-22 16:17:20 -06:00

136 lines
4.3 KiB
Python

#!/usr/bin/env python3
"""
Convert TTPs CSV to Parquet format optimized for embedding frameworks.
Features:
- Explodes TTPs into separate rows (one TTP per row)
- Parses TTP ID and TTP Name into separate columns
- Cleans HTML entities and escapes
- Handles malformed rows gracefully
"""
import pandas as pd
import re
from pathlib import Path
# File paths
INPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/TTPs - Full Dataset.csv"
OUTPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/ttps_dataset.parquet"
def clean_text(text: str) -> str:
"""Clean HTML entities and normalize text."""
if not isinstance(text, str):
return text
# Replace HTML entities
replacements = {
'&': '&',
'>': '>',
'&lt;': '<',
'&quot;': '"',
'&#39;': "'",
}
for entity, char in replacements.items():
text = text.replace(entity, char)
return text.strip()
def parse_ttps(ttp_string: str) -> list:
"""
Parse TTP string like "['T1057 - Process Discovery', 'T1569.002 - System Services']"
into list of tuples: [('T1057', 'Process Discovery'), ('T1569.002', 'System Services')]
"""
if not isinstance(ttp_string, str) or not ttp_string.strip():
return []
# Remove surrounding brackets if present
ttp_string = ttp_string.strip()
if ttp_string.startswith("[") and ttp_string.endswith("]"):
ttp_string = ttp_string[1:-1]
# Parse individual TTPs
ttps = []
# Match pattern like 'T1057 - Process Discovery' or "T1057 - Process Discovery"
pattern = r"['\"]?(T[\d\.]+)\s*-\s*([^'\"]+)['\"]?"
matches = re.findall(pattern, ttp_string)
for match in matches:
ttp_id, ttp_name = match
ttps.append({
'ttp_id': ttp_id.strip(),
'ttp_name': ttp_name.strip()
})
# If regex didn't find anything, try simpler split
if not ttps and ',' in ttp_string:
items = ttp_string.split(',')
for item in items:
item = item.strip().strip("'\"").strip()
if ' - ' in item:
parts = item.split(' - ', 1)
ttps.append({
'ttp_id': parts[0].strip(),
'ttp_name': parts[1].strip()
})
return ttps
def main():
print(f"Reading CSV from: {INPUT_FILE}")
# Read CSV
df = pd.read_csv(INPUT_FILE)
print(f"Read {len(df)} rows with columns: {df.columns.tolist()}")
# Clean text in Scenario column
print("Cleaning HTML entities in Scenario column...")
df['Scenario'] = df['Scenario'].apply(clean_text)
# Parse TTPs and explode into separate rows
print("Parsing TTPs and exploding to separate rows...")
# Convert TTP column to list of dicts
parsed_data = []
for idx, row in df.iterrows():
scenario = row['Scenario']
ttp_list = parse_ttps(str(row['TTP']))
if ttp_list:
# Deduplicate by TTP_ID within each scenario
seen_ids = set()
for ttp in ttp_list:
if ttp['ttp_id'] not in seen_ids:
seen_ids.add(ttp['ttp_id'])
parsed_data.append({
'Scenario': scenario,
'TTP_ID': ttp['ttp_id'],
'TTP_Name': ttp['ttp_name']
})
else:
# Keep rows with no valid TTPs but with empty TTP fields
parsed_data.append({
'Scenario': scenario,
'TTP_ID': None,
'TTP_Name': None
})
# Create new dataframe
new_df = pd.DataFrame(parsed_data)
print(f"Exploded to {len(new_df)} rows")
# Save to parquet
print(f"Saving to Parquet: {OUTPUT_FILE}")
new_df.to_parquet(OUTPUT_FILE, index=False, engine='pyarrow')
# Print stats
print("\n=== Conversion Complete ===")
print(f"Original rows: {len(df)}")
print(f"Parquet rows: {len(new_df)}")
print(f"Columns: {new_df.columns.tolist()}")
print(f"Rows with TTP_ID: {new_df['TTP_ID'].notna().sum()}")
print(f"Sample rows:")
print(new_df.head(10).to_string())
if __name__ == "__main__":
main()