diff --git a/README.md b/README.md
index 8bf4717..51ccb44 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ This repository hosts default payloads collected by the OpenAEV datasets collect
### 1. Create your payload within OpenAEV
-The first step is to create the payload in an OpenAEV Platform. Be sure to fill the description, associate with relevant MITRE ATT&CK TTPs and put any relevant tagging.
+The first step is to create the payload in an OpenAEV Platform. Be sure to fill in the description, associate it with the relevant MITRE ATT&CK TTPs, and add any relevant tags.

@@ -34,13 +34,14 @@ In the example above, just take `Activate Guest Account.zip` and extract it to c
### 5. Verify directory structure and generate manifest
-In a payload directory, you've only the `payload.json` file and an optional `attachments.zip` containing a potential malicious file (encrypted archive). *Do not unzip this file, let it as it is*.
+In a payload directory, you should only have the `payload.json` file and an optional `attachments.zip` containing a potentially malicious file (encrypted archive). *Do not unzip this file; leave it as it is*.

-Before opening your pull request, just run the Python script `scripts/generate_manifest.py`.
+Before opening your pull request, the JSON file(s) must be passed through our convenience script. You'll first need to install the requirements with `python3 -m pip install -r scripts/requirements.txt`, then run the Python script `scripts/generate_manifest.py` (minimum Python version: 3.12).
```bash
+$ python3 -m pip install -r scripts/requirements.txt
$ python3 scripts/generate_manifest.py
```
@@ -66,4 +67,4 @@ Then, test the payload and validate it works before marging the pull request.
OpenAEV is a product designed and developed by the company [Filigran](https://filigran.io).
-
\ No newline at end of file
+
diff --git a/scripts/generate_manifest.py b/scripts/generate_manifest.py
index 81ef1f7..9321b24 100644
--- a/scripts/generate_manifest.py
+++ b/scripts/generate_manifest.py
@@ -1,68 +1,172 @@
-import os
-import json
+from pathlib import Path
+import json_api_doc
+import orjson
-def find_json_files(root_dir, ignore_path):
- json_files = []
- for root, dirs, files in os.walk(root_dir):
- for file in files:
- if file.lower().endswith(".json"):
- file_path = os.path.abspath(os.path.join(root, file))
- if os.path.abspath(ignore_path) == file_path:
- continue
- json_files.append(file_path)
- return json_files
+# Serialization flags passed to the orjson.dumps calls below: 2-space
+# indentation, sorted keys, naive datetimes treated as UTC and microseconds
+# omitted.
+ORJSON_OPTION = orjson.OPT_INDENT_2 | orjson.OPT_NAIVE_UTC | orjson.OPT_OMIT_MICROSECONDS | orjson.OPT_SORT_KEYS
+def is_valid_json_api(json_data):
+    """Return True when the parsed JSON document looks like a JSON:API payload.
+
+    A document is treated as JSON:API when it is a JSON object with a
+    top-level ``data`` member. Any other JSON value (array, string, number,
+    ``null``) is reported as not matching instead of raising.
+    """
+    return isinstance(json_data, dict) and "data" in json_data
-def fix_and_load_json(file_path, parent_dir):
- try:
- with open(file_path, "r", encoding="utf-8") as f:
- data = json.load(f)
- changed = False
-
- info = data.get("payload_information", None)
- if info and isinstance(info, dict):
- # Set required values
- if info.get("payload_source") != "FILIGRAN":
- info["payload_source"] = "FILIGRAN"
- changed = True
- if info.get("payload_status") != "VERIFIED":
- info["payload_status"] = "VERIFIED"
+def is_valid_json_flat(json_data):
+    """Return True when the parsed JSON document uses the legacy flat format.
+
+    A document is treated as legacy flat when it is a JSON object with a
+    top-level ``payload_information`` member. Any other JSON value (array,
+    string, number, ``null``) is reported as not matching instead of raising.
+    """
+    return isinstance(json_data, dict) and "payload_information" in json_data
+
+def process_json_api(data, file_path, root_path):
+ """Convert a JSON:API payload document to the legacy flat layout.
+
+ The document is deserialized with json_api_doc, reshaped into a dict with
+ the keys payload_information, payload_tags, payload_document and
+ payload_attack_patterns, written back to file_path and returned.
+ """
+ flat_data = json_api_doc.deserialize(data)
+
+ # extracting tags, cleaning them for future use and simplifying them in the flat_data
+ payload_tags = flat_data.get("payload_tags", [])
+ for idx in range(len(payload_tags)):
+ if "id" in payload_tags[idx]:
+ del payload_tags[idx]["id"]
+ if "type" in payload_tags[idx]:
+ del payload_tags[idx]["type"]
+ flat_data["payload_tags"] = [tag["tag_id"] for tag in payload_tags]
+
+ # extracting domains and simplifying them in the flat_data
+ payload_domains = flat_data.get("payload_domains", [])
+ flat_data["payload_domains"] = [
+ {"domain_name": domain["domain_name"], "domain_color": domain["domain_color"]}
+ for domain in payload_domains
+ ]
+
+ # extracting attack_patterns, cleaning them for future use and rewriting them in flat_data
+ payload_attack_patterns = flat_data.get("payload_attack_patterns", [])
+ for idx in range(len(payload_attack_patterns)):
+ if "id" in payload_attack_patterns[idx]:
+ del payload_attack_patterns[idx]["id"]
+ if "type" in payload_attack_patterns[idx]:
+ del payload_attack_patterns[idx]["type"]
+ flat_data["payload_attack_patterns"] = payload_attack_patterns
+
+ # looking for relevant document(s) and formatting them to the previous format
+ payload_document = {}
+ file_lookup = [
+ key for key
+ in flat_data
+ if isinstance(flat_data[key], dict) and flat_data[key].get("type") == "documents"
+ ]
+ if len(file_lookup)>1:
+ print("Warning, more than one file detected as attachment, fallback to first found")
+ if file_lookup:
+ file_key = file_lookup[0]
+ payload_document = flat_data.pop(file_key)
+ # keep only the document id under the original key; the full record is exported as payload_document
+ flat_data[file_key] = payload_document.get("document_id")
+ if "id" in payload_document:
+ del payload_document["id"]
+ if "type" in payload_document:
+ del payload_document["type"]
+ payload_document["document_tags"] = [
+ tag["tag_id"] for tag
+ in payload_document.get("document_tags", [])
+ ]
+
+ attachment_path = file_path.parent / "attachments.zip"
+ if attachment_path.is_file():
+ # Compute relative path from root_path and make URL-compatible
+ relative_path = attachment_path.relative_to(root_path)
+ relative_path = relative_path.as_posix()
+ if payload_document.get("document_path") != relative_path:
+ payload_document["document_path"] = relative_path
+ flat_data["payload_document"] = payload_document
+
+ # payload_id is removed further down, so its value is kept as the external id when none is set
+ if "payload_external_id" not in flat_data or flat_data["payload_external_id"] is None:
+ flat_data["payload_external_id"] = flat_data["payload_id"]
+ if flat_data.get("payload_source") != "FILIGRAN":
+ flat_data["payload_source"] = "FILIGRAN"
+ if flat_data.get("payload_status") != "VERIFIED":
+ flat_data["payload_status"] = "VERIFIED"
+
+ # strip the id/type fields and the collector keys from the exported information
+ for key in ["id", "type", "payload_id", "payload_collector", "payload_collector_type"]:
+ if key in flat_data:
+ del flat_data[key]
+
+ final_data = {
+ "payload_information": flat_data,
+ "payload_tags": payload_tags,
+ "payload_document": payload_document,
+ "payload_attack_patterns": payload_attack_patterns,
+ }
+
+ bindata = orjson.dumps(final_data, default=str, option=ORJSON_OPTION)
+ file_path.write_bytes(bindata)
+
+ return final_data
+
+def process_json_flat(data, file_path, root_path):
+ """Normalize a payload document that already uses the legacy flat layout.
+
+ Forces payload_source and payload_status, fills payload_external_id from
+ payload_id when it is missing, drops the collector keys and payload_id, and
+ refreshes document_path when an attachments.zip sits next to file_path.
+ file_path is rewritten only when one of these changes was made; data is
+ returned either way.
+ """
+ changed = False
+
+ payload_information = data.get("payload_information", {})
+ if payload_information and isinstance(payload_information, dict):
+ # Set required values
+ if payload_information.get("payload_source") != "FILIGRAN":
+ payload_information["payload_source"] = "FILIGRAN"
+ changed = True
+ if payload_information.get("payload_status") != "VERIFIED":
+ payload_information["payload_status"] = "VERIFIED"
+ changed = True
+
+ # Handle payload_external_id and payload_id
+ if "payload_external_id" not in payload_information or payload_information["payload_external_id"] is None:
+ payload_information["payload_external_id"] = payload_information["payload_id"]
+ changed = True
+
+ # Remove unwanted keys
+ for key in ["payload_collector_type", "payload_collector", "payload_id"]:
+ if key in payload_information:
+ del payload_information[key]
changed = True
+ data["payload_information"] = payload_information
- # Handle payload_external_id and payload_id
- if "payload_external_id" not in info or info["payload_external_id"] is None:
- info["payload_external_id"] = info["payload_id"]
+ # Handle document_path in payload_document if attachments.zip exists
+ payload_document = data.get("payload_document")
+ if payload_document is not None and isinstance(payload_document, dict):
+ attachment_path = file_path.parent / "attachments.zip"
+ if attachment_path.is_file():
+ # Compute relative path from root_path and make URL-compatible
+ relative_path = attachment_path.relative_to(root_path)
+ relative_path = relative_path.as_posix()
+ if payload_document.get("document_path") != relative_path:
+ payload_document["document_path"] = relative_path
changed = True
+ data["payload_document"] = payload_document
- # Remove unwanted keys
- for key in ["payload_collector_type", "payload_collector", "payload_id"]:
- if key in info:
- del info[key]
- changed = True
-
- # Handle document_path in payload_document if attachments.zip exists
- payload_doc = data.get("payload_document")
- if payload_doc is not None and isinstance(payload_doc, dict):
- dir_path = os.path.dirname(file_path)
- attachment_path = os.path.join(dir_path, "attachments.zip")
- if os.path.isfile(attachment_path):
- # Compute relative path from parent_dir and make URL-compatible
- rel_path = os.path.relpath(attachment_path, parent_dir).replace(
- os.sep, "/"
- )
- if payload_doc.get("document_path") != rel_path:
- payload_doc["document_path"] = rel_path
- changed = True
-
- if changed:
- with open(file_path, "w", encoding="utf-8") as f:
- json.dump(data, f, indent=2, ensure_ascii=False)
+ # only touch the file on disk when a value was actually modified above
+ if changed:
+ bindata = orjson.dumps(data, default=str, option=ORJSON_OPTION)
+ file_path.write_bytes(bindata)
+
+ return data
+
+def fix_and_load_json(file_path, root_path, raise_on_unknown=False):
+    """Load one payload file and normalize it according to its detected format.
+
+    Args:
+        file_path: pathlib.Path of the JSON file to read (and possibly rewrite).
+        root_path: Directory that relative attachment paths are computed from.
+        raise_on_unknown: When true, a file matching neither known format is
+            reported as an error instead of being returned unchanged.
+
+    Returns:
+        The processed data, or None when loading or processing failed.
+    """
+    print(f"Processing {file_path}")
+    try:
+        data = orjson.loads(file_path.read_bytes())
+
+        if is_valid_json_api(data):
+            print("File detected as matching the JSON:API format")
+            data = process_json_api(data, file_path, root_path)
+        elif is_valid_json_flat(data):
+            print("File detected as matching the legacy JSON flat format")
+            data = process_json_flat(data, file_path, root_path)
+        elif raise_on_unknown:
+            # Carry the reason in the exception so the handler below prints it
+            # next to the file name instead of an empty message.
+            raise ValueError("File is neither JSON:API nor legacy JSON flat format")
         return data
     except Exception as e:
         print(f"Error loading {file_path}: {e}")
         return None
+def find_json_files(root_path, ignore_path):
+    """Recursively collect the JSON files located under root_path.
+
+    Args:
+        root_path: Directory (pathlib.Path) to search.
+        ignore_path: Path to leave out of the result, e.g. the generated
+            manifest itself.
+
+    Returns:
+        Sorted list of pathlib.Path objects for every regular file whose name
+        ends with ``.json`` in any letter case.
+    """
+    # The extension is compared case-insensitively, as the previous
+    # os.walk-based version did; a "*.json" glob would miss "FOO.JSON" on
+    # case-sensitive filesystems. Sorting keeps the manifest order stable
+    # between runs and platforms.
+    return sorted(
+        file
+        for file in root_path.rglob("*")
+        if file.name.lower().endswith(".json")
+        and file.is_file()
+        and file != ignore_path
+    )
def merge_json_files(json_files, parent_dir):
merged = []
@@ -78,13 +182,13 @@ def merge_json_files(json_files, parent_dir):
if __name__ == "__main__":
- parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
- output_path = os.path.join(parent_dir, "manifest.json")
+ # This script lives in scripts/; the payload directories and manifest.json
+ # sit one level up, at the repository root (same location the previous
+ # os.path-based code resolved with "..").
+ root_path = Path(__file__).resolve().parent.parent
+ output_path = root_path / "manifest.json"
- json_files = find_json_files(parent_dir, output_path)
+ json_files = find_json_files(root_path, output_path)
print(f"Found {len(json_files)} JSON files.")
- merged_data = merge_json_files(json_files, parent_dir)
- with open(output_path, "w", encoding="utf-8") as out:
- json.dump(merged_data, out, indent=2, ensure_ascii=False)
+ merged_data = merge_json_files(json_files, root_path)
+ bindata = orjson.dumps(merged_data, default=str, option=ORJSON_OPTION)
+ output_path.write_bytes(bindata)
print(f"Merged JSON saved to {output_path}")
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 0000000..56150f7
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+json-api-doc >= 0.15.0
+orjson >= 3.10,<4