#!/usr/bin/env python3 """ Convert TTPs CSV to Parquet format optimized for embedding frameworks. Features: - Explodes TTPs into separate rows (one TTP per row) - Parses TTP ID and TTP Name into separate columns - Cleans HTML entities and escapes - Handles malformed rows gracefully """ import pandas as pd import re from pathlib import Path # File paths INPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/TTPs - Full Dataset.csv" OUTPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/ttps_dataset.parquet" def clean_text(text: str) -> str: """Clean HTML entities and normalize text.""" if not isinstance(text, str): return text # Replace HTML entities replacements = { '&': '&', '>': '>', '<': '<', '"': '"', ''': "'", } for entity, char in replacements.items(): text = text.replace(entity, char) return text.strip() def parse_ttps(ttp_string: str) -> list: """ Parse TTP string like "['T1057 - Process Discovery', 'T1569.002 - System Services']" into list of tuples: [('T1057', 'Process Discovery'), ('T1569.002', 'System Services')] """ if not isinstance(ttp_string, str) or not ttp_string.strip(): return [] # Remove surrounding brackets if present ttp_string = ttp_string.strip() if ttp_string.startswith("[") and ttp_string.endswith("]"): ttp_string = ttp_string[1:-1] # Parse individual TTPs ttps = [] # Match pattern like 'T1057 - Process Discovery' or "T1057 - Process Discovery" pattern = r"['\"]?(T[\d\.]+)\s*-\s*([^'\"]+)['\"]?" matches = re.findall(pattern, ttp_string) for match in matches: ttp_id, ttp_name = match ttps.append({ 'ttp_id': ttp_id.strip(), 'ttp_name': ttp_name.strip() }) # If regex didn't find anything, try simpler split if not ttps and ',' in ttp_string: items = ttp_string.split(',') for item in items: item = item.strip().strip("'\"").strip() if ' - ' in item: parts = item.split(' - ', 1) ttps.append({ 'ttp_id': parts[0].strip(), 'ttp_name': parts[1].strip() }) return ttps def main(): print(f"Reading CSV from: {INPUT_FILE}") # Read CSV df = pd.read_csv(INPUT_FILE) print(f"Read {len(df)} rows with columns: {df.columns.tolist()}") # Clean text in Scenario column print("Cleaning HTML entities in Scenario column...") df['Scenario'] = df['Scenario'].apply(clean_text) # Parse TTPs and explode into separate rows print("Parsing TTPs and exploding to separate rows...") # Convert TTP column to list of dicts parsed_data = [] for idx, row in df.iterrows(): scenario = row['Scenario'] ttp_list = parse_ttps(str(row['TTP'])) if ttp_list: # Deduplicate by TTP_ID within each scenario seen_ids = set() for ttp in ttp_list: if ttp['ttp_id'] not in seen_ids: seen_ids.add(ttp['ttp_id']) parsed_data.append({ 'Scenario': scenario, 'TTP_ID': ttp['ttp_id'], 'TTP_Name': ttp['ttp_name'] }) else: # Keep rows with no valid TTPs but with empty TTP fields parsed_data.append({ 'Scenario': scenario, 'TTP_ID': None, 'TTP_Name': None }) # Create new dataframe new_df = pd.DataFrame(parsed_data) print(f"Exploded to {len(new_df)} rows") # Save to parquet print(f"Saving to Parquet: {OUTPUT_FILE}") new_df.to_parquet(OUTPUT_FILE, index=False, engine='pyarrow') # Print stats print("\n=== Conversion Complete ===") print(f"Original rows: {len(df)}") print(f"Parquet rows: {len(new_df)}") print(f"Columns: {new_df.columns.tolist()}") print(f"Rows with TTP_ID: {new_df['TTP_ID'].notna().sum()}") print(f"Sample rows:") print(new_df.head(10).to_string()) if __name__ == "__main__": main()