Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 286 additions & 18 deletions lib/cuckoo/common/network_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# See the file 'docs/LICENSE' for copying permission.

import datetime
import re
from collections import defaultdict
from contextlib import suppress
from urllib.parse import urlparse

Expand Down Expand Up @@ -64,6 +66,9 @@
}


_HEX_HANDLE_RE = re.compile(r"^(?:0x)?([0-9a-fA-F]+)$")


def _norm_domain(d):
if not d or not isinstance(d, str):
return None
Expand All @@ -86,7 +91,7 @@ def _parse_behavior_ts(ts_str):

def _get_call_args_dict(call):
"""Convert arguments list to a dictionary for O(1) access."""
return {a["name"]: a["value"] for a in call.get("arguments", []) if "name" in a}
return {a["name"].lower(): a["value"] for a in call.get("arguments", []) if "name" in a}


def _extract_domain_from_call(call, args_map):
Expand All @@ -98,16 +103,14 @@ def _extract_domain_from_call(call, args_map):
"nodename",
"name",
"domain",
"szName",
"pszName",
"lpName",
"szname",
"pszname",
"lpname",
"query",
"queryname",
"dns_name",
"QueryName",
"lpstrName",
"pName",
"ServerName",
"lpstrname",
"pname",
"servername",
):
v = args_map.get(name)
Expand All @@ -127,8 +130,8 @@ def _extract_domain_from_call(call, args_map):
def _get_arg_any(args_map, *names):
"""Return the first matching argument value for any of the provided names."""
for n in names:
if n in args_map:
return args_map[n]
if n.lower() in args_map:
return args_map[n.lower()]
return None


Expand Down Expand Up @@ -200,6 +203,8 @@ def _http_host_from_buf(buf):

def _safe_int(x):
with suppress(Exception):
if isinstance(x, str) and x.lower().startswith("0x"):
return int(x, 16)
return int(x)
return None

Expand Down Expand Up @@ -253,19 +258,19 @@ def _extract_tls_server_name(call, args_map):
"""
Best-effort server name extraction for TLS/SChannel/SSPI.
"""
def _is_valid_domain_chars(s):
for c in s:
if not (c.isalnum() or c in ".-_"):
return False
return True

for name in (
"sni",
"SNI",
"ServerName",
"servername",
"server_name",
"TargetName",
"targetname",
"Host",
"host",
"hostname",
"Url",
"URL",
"url",
):
v = args_map.get(name)
Expand All @@ -274,7 +279,7 @@ def _extract_tls_server_name(call, args_map):
u = _extract_first_url(s)
if u:
return _host_from_url(u) or s
if "." in s and " " not in s and len(s) < 260:
if "." in s and " " not in s and len(s) < 260 and _is_valid_domain_chars(s):
return s

for v in args_map.values():
Expand All @@ -284,6 +289,269 @@ def _extract_tls_server_name(call, args_map):
u = _extract_first_url(s)
if u:
return _host_from_url(u) or s
return s
if _is_valid_domain_chars(s):
return s

return None


def _parse_handle(v):
"""Normalize handles into '0x...' lowercase. Return None if invalid/zero."""
if v is None:
return None
if isinstance(v, int):
if v <= 0:
return None
return "0x%x" % v
with suppress(Exception):
s = str(v).strip()
if not s:
return None
m = _HEX_HANDLE_RE.match(s)
if not m:
return None
n = int(m.group(1), 16)
if n <= 0:
return None
return "0x%x" % n
return None


def _get_call_ret_handle(call):
return _parse_handle(call.get("return") or call.get("retval") or call.get("ret"))


def _call_ok(call):
"""
In your data, status is boolean.
Keep tolerant for other shapes.
"""
v = call.get("status")
if isinstance(v, bool):
return v
if isinstance(v, str):
return v.lower() in ("success", "true", "1")
return True


def _winhttp_get_proc_state(state, process):
pid = process.get("process_id")
pname = process.get("process_name", "") or ""
procs = state.setdefault("processes", {})
key = pid if pid is not None else (pname or "unknown")
pstate = procs.get(key)
if pstate is None:
pstate = {
"process_id": pid,
"process_name": pname,
"sessions": {}, # session_handle -> session dict
"connects": {}, # connect_handle -> connect dict
"requests": {}, # request_handle -> request dict
}
procs[key] = pstate
return pstate


def winhttp_update_from_call(pstate, api_lc, args_map, ret_handle):
"""
Update WinHTTP state from one call.
args_map keys are lowercased by _get_call_args_dict().
"""
# WinHttpOpen -> session handle
if api_lc == "winhttpopen" and ret_handle:
sess = pstate["sessions"].get(ret_handle)
if sess is None:
sess = {
"handle": ret_handle,
"user_agent": "",
"access_type": "",
"proxy_name": "",
"proxy_bypass": "",
"flags": "",
"options": [],
"connections": [], # list of connect objects
}
pstate["sessions"][ret_handle] = sess

ua = args_map.get("useragent")
if ua and not sess["user_agent"]:
sess["user_agent"] = str(ua)
if args_map.get("accesstype") is not None:
sess["access_type"] = str(args_map.get("accesstype"))
if args_map.get("proxyname") is not None:
sess["proxy_name"] = str(args_map.get("proxyname"))
if args_map.get("proxybypass") is not None:
sess["proxy_bypass"] = str(args_map.get("proxybypass"))
if args_map.get("flags") is not None:
sess["flags"] = str(args_map.get("flags"))
return

# WinHttpConnect -> connect handle (binds to session)
if api_lc == "winhttpconnect" and ret_handle:
sh = _parse_handle(args_map.get("sessionhandle"))
server = args_map.get("servername")
port = args_map.get("serverport")

conn = pstate["connects"].get(ret_handle)
if conn is None:
conn = {
"handle": ret_handle,
"session_handle": sh,
"server": str(server or ""),
"port": None,
"options": [],
"requests": [], # list of request objects
}
pstate["connects"][ret_handle] = conn

if sh and not conn.get("session_handle"):
conn["session_handle"] = sh
if server and not conn.get("server"):
conn["server"] = str(server)
if conn.get("port") is None and port is not None:
with suppress(Exception):
conn["port"] = int(port)

if sh:
sess = pstate["sessions"].get(sh)
if sess is not None:
# ensure uniqueness by handle
for c in sess["connections"]:
if isinstance(c, dict) and c.get("handle") == ret_handle:
return
sess["connections"].append(conn)
return

# WinHttpOpenRequest -> request handle (binds to connect)
if api_lc == "winhttpopenrequest" and ret_handle:
ch = _parse_handle(args_map.get("internethandle"))
req = pstate["requests"].get(ret_handle)
if req is None:
req = {
"handle": ret_handle,
"connect_handle": ch,
"verb": str(args_map.get("verb") or ""),
"object": str(args_map.get("objectname") or ""),
"flags": str(args_map.get("flags") or ""),
"version": str(args_map.get("version") or ""),
"referrer": str(args_map.get("referrer") or ""),
"options": [],
"url": "",
}
pstate["requests"][ret_handle] = req
else:
if ch and not req.get("connect_handle"):
req["connect_handle"] = ch

if ch:
conn = pstate["connects"].get(ch)
if conn is not None:
for r in conn["requests"]:
if isinstance(r, dict) and r.get("handle") == ret_handle:
break
else:
conn["requests"].append(req)

if conn.get("server") and req.get("object"):
scheme = "https" if conn.get("port") == 443 else "http"
req["url"] = "%s://%s%s" % (scheme, conn["server"], req["object"])
return

# WinHttpSetOption -> applies to session/connect/request by handle
if api_lc == "winhttpsetoption":
h = _parse_handle(args_map.get("internethandle"))
if not h:
return
opt_entry = {"option": str(args_map.get("option") or ""), "buffer": str(args_map.get("buffer") or "")}
if h in pstate["requests"]:
pstate["requests"][h]["options"].append(opt_entry)
elif h in pstate["connects"]:
pstate["connects"][h]["options"].append(opt_entry)
elif h in pstate["sessions"]:
pstate["sessions"][h]["options"].append(opt_entry)
return


def winhttp_finalize_sessions(state):
"""
Returns per-process domain grouping with only:
- url (scheme derived: https if port == 443 else http)
- verb
- user_agent
- proxy info (access_type, proxy_name, proxy_bypass)
"""
out = []
procs = (state or {}).get("processes") or {}

for _, p in procs.items():
sessions = (p.get("sessions") or {})
if not sessions:
continue

sessions_by_domain = {}
sessions_by_domain_keys = defaultdict(set)

for s in sessions.values():
ua = s.get("user_agent") or ""
access_type = s.get("access_type") or ""
proxy_name = s.get("proxy_name") or ""
proxy_bypass = s.get("proxy_bypass") or ""

for c in s.get("connections") or []:
if not isinstance(c, dict):
continue

server = c.get("server") or ""
dom = _norm_domain(server)
if not dom:
continue

port = c.get("port")
scheme = "https" if port == 443 else "http"

for r in c.get("requests") or []:
if not isinstance(r, dict):
continue

obj = r.get("object") or ""
if not isinstance(obj, str):
obj = str(obj)

obj = obj.strip()
if not obj:
continue

if not obj.startswith("/"):
obj = "/" + obj

verb = r.get("verb") or ""
if not isinstance(verb, str):
verb = str(verb)

verb = verb.strip().upper() or "GET"
request = f"{verb} {obj} \r\nUser-Agent: {ua}\r\nHost: {dom}\r\n"
entry = {
"uri": obj,
"dport": port,
"method": verb,
"protocol": scheme,
"user_agent": ua,
"request": request,
"access_type": access_type,
"proxy_name": proxy_name,
"proxy_bypass": proxy_bypass,
}

key = (obj, verb, ua, access_type, proxy_name, proxy_bypass)
if key not in sessions_by_domain_keys[dom]:
sessions_by_domain.setdefault(dom, []).append(entry)
sessions_by_domain_keys[dom].add(key)

if sessions_by_domain:
out.append({
"process_id": p.get("process_id"),
"process_name": p.get("process_name", ""),
"sessions": sessions_by_domain,
})

return out
Loading
Loading