136 lines
4.3 KiB
Python
136 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Convert TTPs CSV to Parquet format optimized for embedding frameworks.
|
|
|
|
Features:
|
|
- Explodes TTPs into separate rows (one TTP per row)
|
|
- Parses TTP ID and TTP Name into separate columns
|
|
- Cleans HTML entities and escapes
|
|
- Handles malformed rows gracefully
|
|
"""
|
|
|
|
import pandas as pd
|
|
import re
|
|
from pathlib import Path
|
|
|
|
# File paths
|
|
INPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/TTPs - Full Dataset.csv"
|
|
OUTPUT_FILE = "/Users/bzuccaro/Library/Mobile Documents/com~apple~Keynote/Documents/Notebook Conversion - Refractor/Support Files/ttps_dataset.parquet"
|
|
|
|
def clean_text(text: str) -> str:
|
|
"""Clean HTML entities and normalize text."""
|
|
if not isinstance(text, str):
|
|
return text
|
|
|
|
# Replace HTML entities
|
|
replacements = {
|
|
'&': '&',
|
|
'>': '>',
|
|
'<': '<',
|
|
'"': '"',
|
|
''': "'",
|
|
}
|
|
for entity, char in replacements.items():
|
|
text = text.replace(entity, char)
|
|
|
|
return text.strip()
|
|
|
|
def parse_ttps(ttp_string: str) -> list:
|
|
"""
|
|
Parse TTP string like "['T1057 - Process Discovery', 'T1569.002 - System Services']"
|
|
into list of tuples: [('T1057', 'Process Discovery'), ('T1569.002', 'System Services')]
|
|
"""
|
|
if not isinstance(ttp_string, str) or not ttp_string.strip():
|
|
return []
|
|
|
|
# Remove surrounding brackets if present
|
|
ttp_string = ttp_string.strip()
|
|
if ttp_string.startswith("[") and ttp_string.endswith("]"):
|
|
ttp_string = ttp_string[1:-1]
|
|
|
|
# Parse individual TTPs
|
|
ttps = []
|
|
# Match pattern like 'T1057 - Process Discovery' or "T1057 - Process Discovery"
|
|
pattern = r"['\"]?(T[\d\.]+)\s*-\s*([^'\"]+)['\"]?"
|
|
matches = re.findall(pattern, ttp_string)
|
|
|
|
for match in matches:
|
|
ttp_id, ttp_name = match
|
|
ttps.append({
|
|
'ttp_id': ttp_id.strip(),
|
|
'ttp_name': ttp_name.strip()
|
|
})
|
|
|
|
# If regex didn't find anything, try simpler split
|
|
if not ttps and ',' in ttp_string:
|
|
items = ttp_string.split(',')
|
|
for item in items:
|
|
item = item.strip().strip("'\"").strip()
|
|
if ' - ' in item:
|
|
parts = item.split(' - ', 1)
|
|
ttps.append({
|
|
'ttp_id': parts[0].strip(),
|
|
'ttp_name': parts[1].strip()
|
|
})
|
|
|
|
return ttps
|
|
|
|
def main():
|
|
print(f"Reading CSV from: {INPUT_FILE}")
|
|
|
|
# Read CSV
|
|
df = pd.read_csv(INPUT_FILE)
|
|
print(f"Read {len(df)} rows with columns: {df.columns.tolist()}")
|
|
|
|
# Clean text in Scenario column
|
|
print("Cleaning HTML entities in Scenario column...")
|
|
df['Scenario'] = df['Scenario'].apply(clean_text)
|
|
|
|
# Parse TTPs and explode into separate rows
|
|
print("Parsing TTPs and exploding to separate rows...")
|
|
|
|
# Convert TTP column to list of dicts
|
|
parsed_data = []
|
|
for idx, row in df.iterrows():
|
|
scenario = row['Scenario']
|
|
ttp_list = parse_ttps(str(row['TTP']))
|
|
|
|
if ttp_list:
|
|
# Deduplicate by TTP_ID within each scenario
|
|
seen_ids = set()
|
|
for ttp in ttp_list:
|
|
if ttp['ttp_id'] not in seen_ids:
|
|
seen_ids.add(ttp['ttp_id'])
|
|
parsed_data.append({
|
|
'Scenario': scenario,
|
|
'TTP_ID': ttp['ttp_id'],
|
|
'TTP_Name': ttp['ttp_name']
|
|
})
|
|
else:
|
|
# Keep rows with no valid TTPs but with empty TTP fields
|
|
parsed_data.append({
|
|
'Scenario': scenario,
|
|
'TTP_ID': None,
|
|
'TTP_Name': None
|
|
})
|
|
|
|
# Create new dataframe
|
|
new_df = pd.DataFrame(parsed_data)
|
|
print(f"Exploded to {len(new_df)} rows")
|
|
|
|
# Save to parquet
|
|
print(f"Saving to Parquet: {OUTPUT_FILE}")
|
|
new_df.to_parquet(OUTPUT_FILE, index=False, engine='pyarrow')
|
|
|
|
# Print stats
|
|
print("\n=== Conversion Complete ===")
|
|
print(f"Original rows: {len(df)}")
|
|
print(f"Parquet rows: {len(new_df)}")
|
|
print(f"Columns: {new_df.columns.tolist()}")
|
|
print(f"Rows with TTP_ID: {new_df['TTP_ID'].notna().sum()}")
|
|
print(f"Sample rows:")
|
|
print(new_df.head(10).to_string())
|
|
|
|
if __name__ == "__main__":
|
|
main()
|