From baecad18de68919f48badfffc84477724eca7d3d Mon Sep 17 00:00:00 2001 From: guzmud Date: Tue, 12 May 2026 15:22:16 +0200 Subject: [PATCH] [payloads] feat(scripts): updating manifest generation for JSON:API (#7) --- README.md | 9 +- scripts/generate_manifest.py | 220 ++++++++++++++++++++++++++--------- scripts/requirements.txt | 2 + 3 files changed, 169 insertions(+), 62 deletions(-) create mode 100644 scripts/requirements.txt diff --git a/README.md b/README.md index 8bf4717..51ccb44 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This repository hosts default payloads collected by the OpenAEV datasets collect ### 1. Create your payload within OpenAEV -The first step is to create the payload in an OpenAEV Platform. Be sure to fill the description, associate with relevant MITRE ATT&CK TTPs and put any relevant tagging. +The first step is to create the payload in an OpenAEV Platform. Be sure to fill the description, associate it with relevant MITRE ATT&CK TTPs and put any relevant tagging. ![Create Payload](./.github/img/create-payload.png "Create Payload") @@ -34,13 +34,14 @@ In the example above, just take `Activate Guest Account.zip` and extract it to c ### 5. Verify directory structure and generate manifest -In a payload directory, you've only the `payload.json` file and an optional `attachments.zip` containing a potential malicious file (encrypted archive). *Do not unzip this file, let it as it is*. +In a payload directory, you should only have the `payload.json` file and an optional `attachments.zip` containing a potential malicious file (encrypted archive). *Do not unzip this file, let it as it is*. ![File Structure](./.github/img/file-structure.png "File Structure") -Before opening your pull request, just run the Python script `scripts/generate_manifest.py`. +Before opening your pull request, the JSON file(s) must be passed through our convenience script. You'll first need to install the requirements `python -m pip install -r scripts/requirements.txt`, then run the Python script `scripts/generate_manifest.py` (minimum Python version: 3.12). ```bash +$ python3 -m pip install -r scripts/requirements.txt $ python3 scripts/generate_manifest.py ``` @@ -66,4 +67,4 @@ Then, test the payload and validate it works before marging the pull request. OpenAEV is a product designed and developed by the company [Filigran](https://filigran.io). - \ No newline at end of file + diff --git a/scripts/generate_manifest.py b/scripts/generate_manifest.py index 81ef1f7..9321b24 100644 --- a/scripts/generate_manifest.py +++ b/scripts/generate_manifest.py @@ -1,68 +1,172 @@ -import os -import json +from pathlib import Path +import json_api_doc +import orjson -def find_json_files(root_dir, ignore_path): - json_files = [] - for root, dirs, files in os.walk(root_dir): - for file in files: - if file.lower().endswith(".json"): - file_path = os.path.abspath(os.path.join(root, file)) - if os.path.abspath(ignore_path) == file_path: - continue - json_files.append(file_path) - return json_files +ORJSON_OPTION = orjson.OPT_INDENT_2 | orjson.OPT_NAIVE_UTC | orjson.OPT_OMIT_MICROSECONDS | orjson.OPT_SORT_KEYS +def is_valid_json_api(json_data): + """check if the JSON data is in the JSON:API format""" + return "data" in json_data.keys() -def fix_and_load_json(file_path, parent_dir): - try: - with open(file_path, "r", encoding="utf-8") as f: - data = json.load(f) - changed = False - - info = data.get("payload_information", None) - if info and isinstance(info, dict): - # Set required values - if info.get("payload_source") != "FILIGRAN": - info["payload_source"] = "FILIGRAN" - changed = True - if info.get("payload_status") != "VERIFIED": - info["payload_status"] = "VERIFIED" +def is_valid_json_flat(json_data): + """check if the JSON data is in the legacy flat JSON payload format""" + return "payload_information" in json_data.keys() + +def process_json_api(data, file_path, root_path): + flat_data = json_api_doc.deserialize(data) + + # extracting tags, cleaning them for future use and simplifying them in the flat_data + payload_tags = flat_data.get("payload_tags", []) + for idx in range(len(payload_tags)): + if "id" in payload_tags[idx]: + del payload_tags[idx]["id"] + if "type" in payload_tags[idx]: + del payload_tags[idx]["type"] + flat_data["payload_tags"] = [tag["tag_id"] for tag in payload_tags] + + # extracting domains and simplifying them in the flat_data + payload_domains = flat_data.get("payload_domains", []) + flat_data["payload_domains"] = [ + {"domain_name": domain["domain_name"], "domain_color": domain["domain_color"]} + for domain in payload_domains + ] + + # extracting attack_patterns, cleaning them for future use and rewriting them in flat_data + payload_attack_patterns = flat_data.get("payload_attack_patterns", []) + for idx in range(len(payload_attack_patterns)): + if "id" in payload_attack_patterns[idx]: + del payload_attack_patterns[idx]["id"] + if "type" in payload_attack_patterns[idx]: + del payload_attack_patterns[idx]["type"] + flat_data["payload_attack_patterns"] = payload_attack_patterns + + # looking for relevant document(s) and formatting them to the previous format + payload_document = {} + file_lookup = [ + key for key + in flat_data + if isinstance(flat_data[key], dict) and flat_data[key].get("type") == "documents" + ] + if len(file_lookup)>1: + print("Warning, more than one file detected as attachment, fallback to first found") + if file_lookup: + file_key = file_lookup[0] + payload_document = flat_data.pop(file_key) + flat_data[file_key] = payload_document.get("document_id") + if "id" in payload_document: + del payload_document["id"] + if "type" in payload_document: + del payload_document["type"] + payload_document["document_tags"] = [ + tag["tag_id"] for tag + in payload_document.get("document_tags", []) + ] + + attachment_path = file_path.parent / "attachments.zip" + if attachment_path.is_file(): + # Compute relative path from root_path and make URL-compatible + relative_path = attachment_path.relative_to(root_path) + relative_path = relative_path.as_posix() + if payload_document.get("document_path") != relative_path: + payload_document["document_path"] = relative_path + flat_data["payload_document"] = payload_document + + if "payload_external_id" not in flat_data or flat_data["payload_external_id"] is None: + flat_data["payload_external_id"] = flat_data["payload_id"] + if flat_data.get("payload_source") != "FILIGRAN": + flat_data["payload_source"] = "FILIGRAN" + if flat_data.get("payload_status") != "VERIFIED": + flat_data["payload_status"] = "VERIFIED" + + for key in ["id", "type", "payload_id", "payload_collector", "payload_collector_type"]: + if key in flat_data: + del flat_data[key] + + final_data = { + "payload_information": flat_data, + "payload_tags": payload_tags, + "payload_document": payload_document, + "payload_attack_patterns": payload_attack_patterns, + } + + bindata = orjson.dumps(final_data, default=str, option=ORJSON_OPTION) + file_path.write_bytes(bindata) + + return final_data + +def process_json_flat(data, file_path, root_path): + changed = False + + payload_information = data.get("payload_information", {}) + if payload_information and isinstance(payload_information, dict): + # Set required values + if payload_information.get("payload_source") != "FILIGRAN": + payload_information["payload_source"] = "FILIGRAN" + changed = True + if payload_information.get("payload_status") != "VERIFIED": + payload_information["payload_status"] = "VERIFIED" + changed = True + + # Handle payload_external_id and payload_id + if "payload_external_id" not in payload_information or payload_information["payload_external_id"] is None: + payload_information["payload_external_id"] = payload_information["payload_id"] + changed = True + + # Remove unwanted keys + for key in ["payload_collector_type", "payload_collector", "payload_id"]: + if key in payload_information: + del payload_information[key] changed = True + data["payload_information"] = payload_information - # Handle payload_external_id and payload_id - if "payload_external_id" not in info or info["payload_external_id"] is None: - info["payload_external_id"] = info["payload_id"] + # Handle document_path in payload_document if attachments.zip exists + payload_document = data.get("payload_document") + if payload_document is not None and isinstance(payload_document, dict): + attachment_path = file_path.parent / "attachments.zip" + if attachment_path.is_file(): + # Compute relative path from root_path and make URL-compatible + relative_path = attachment_path.relative_to(root_path) + relative_path = relative_path.as_posix() + if payload_document.get("document_path") != relative_path: + payload_document["document_path"] = relative_path changed = True + data["payload_document"] = payload_document - # Remove unwanted keys - for key in ["payload_collector_type", "payload_collector", "payload_id"]: - if key in info: - del info[key] - changed = True - - # Handle document_path in payload_document if attachments.zip exists - payload_doc = data.get("payload_document") - if payload_doc is not None and isinstance(payload_doc, dict): - dir_path = os.path.dirname(file_path) - attachment_path = os.path.join(dir_path, "attachments.zip") - if os.path.isfile(attachment_path): - # Compute relative path from parent_dir and make URL-compatible - rel_path = os.path.relpath(attachment_path, parent_dir).replace( - os.sep, "/" - ) - if payload_doc.get("document_path") != rel_path: - payload_doc["document_path"] = rel_path - changed = True - - if changed: - with open(file_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) + if changed: + bindata = orjson.dumps(data, default=str, option=ORJSON_OPTION) + file_path.write_bytes(bindata) + + return data + +def fix_and_load_json(file_path, root_path, raise_on_unknown=False): + """route the file data in the proper processing function according to format""" + print(f"Processing {file_path}") + try: + data = orjson.loads(file_path.read_bytes()) + + if is_valid_json_api(data): + print("File detected as matching the JSON:API format") + data = process_json_api(data, file_path, root_path) + elif is_valid_json_flat(data): + print("File detected as matching the legacy JSON flat format") + data = process_json_flat(data, file_path, root_path) + else: + if raise_on_unknown: + print("File is neither JSON:API nor legacy JSON flat format") + raise ValueError() return data except Exception as e: print(f"Error loading {file_path}: {e}") return None +def find_json_files(root_path, ignore_path): + """recursively check for JSON files under root_path""" + return [ + file for file + in root_path.glob("**/*.json") + if file != ignore_path + ] def merge_json_files(json_files, parent_dir): merged = [] @@ -78,13 +182,13 @@ def merge_json_files(json_files, parent_dir): if __name__ == "__main__": - parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) - output_path = os.path.join(parent_dir, "manifest.json") + root_path = Path(__file__).resolve().parent + output_path = root_path / "manifest.json" - json_files = find_json_files(parent_dir, output_path) + json_files = find_json_files(root_path, output_path) print(f"Found {len(json_files)} JSON files.") - merged_data = merge_json_files(json_files, parent_dir) - with open(output_path, "w", encoding="utf-8") as out: - json.dump(merged_data, out, indent=2, ensure_ascii=False) + merged_data = merge_json_files(json_files, root_path) + bindata = orjson.dumps(merged_data, default=str, option=ORJSON_OPTION) + output_path.write_bytes(bindata) print(f"Merged JSON saved to {output_path}") diff --git a/scripts/requirements.txt b/scripts/requirements.txt new file mode 100644 index 0000000..56150f7 --- /dev/null +++ b/scripts/requirements.txt @@ -0,0 +1,2 @@ +json-api-doc >= 0.15.0 +orjson >= 3.10,<4