From 2bed3b51dcf225cc2a42630917745d0e5bd8ec18 Mon Sep 17 00:00:00 2001 From: xtarget Date: Wed, 11 Feb 2026 13:32:55 +0100 Subject: [PATCH 1/2] Fix critical Azure VMSS zero-scale bugs (timing, race conditions, SQL) (#2905) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix case-sensitive platform comparison in Azure VMSS Platform comparison was case-sensitive, causing tasks to fail when platform tag used different casing (e.g., 'Windows' vs 'windows'). Fix: Use .lower() on both sides of comparison. File: modules/machinery/az.py Lines: ~660 * Fix SQL NULL handling in options_not_like filter Tasks with NULL or empty 'options' were not scheduled because SQL 'NULL NOT LIKE x' returns NULL (not TRUE). Fix: Import or_() and explicitly check for NULL and empty string. File: lib/cuckoo/core/database.py Lines: ~58 (import), ~2100 (fix) * Fix 7 critical Azure VMSS zero-scale bugs This commit combines 7 interdependent fixes for az.py: 1. VM provisioning timing (retry loop: 12×5s) 2. Scaling-flag race condition (add scaling_lock) 3. Import TASK_RUNNING for task counting 4. VMs deleted while tasks running (count PENDING+RUNNING) 5. Scaling flag deadlock (finally block) 6. Placeholder duplicate key (session.flush) 7. Zero-scale placeholder check (machine.reserved) All fixes tested together in production for 2 weeks. 
File: modules/machinery/az.py Lines: Multiple locations (~26, ~96, ~444, ~641, ~687, ~1126, ~1381) --- lib/cuckoo/core/database.py | 11 +- modules/machinery/az.py | 301 +++++++++++++++++++++--------------- 2 files changed, 184 insertions(+), 128 deletions(-) diff --git a/lib/cuckoo/core/database.py b/lib/cuckoo/core/database.py index b0cf2fbbb93..22d7cb582bc 100644 --- a/lib/cuckoo/core/database.py +++ b/lib/cuckoo/core/database.py @@ -56,6 +56,7 @@ # event, func, not_, + or_, select, Select, delete, @@ -2097,7 +2098,15 @@ def list_tasks( if options_like: stmt = stmt.where(Task.options.like(f"%{options_like.replace('*', '%')}%")) if options_not_like: - stmt = stmt.where(Task.options.notlike(f"%{options_not_like.replace('*', '%')}%")) + # Fix: SQL NULL NOT LIKE returns NULL, not TRUE + # Must explicitly check for NULL and empty string + stmt = stmt.where( + or_( + Task.options.is_(None), + Task.options == "", + not_(Task.options.like(f"%{options_not_like.replace('*', '%')}%")) + ) + ) if tags_tasks_like: stmt = stmt.where(Task.tags_tasks.like(f"%{tags_tasks_like}%")) if task_ids: diff --git a/modules/machinery/az.py b/modules/machinery/az.py index a5d9971cc6d..b116213d7e5 100644 --- a/modules/machinery/az.py +++ b/modules/machinery/az.py @@ -2,6 +2,7 @@ # in https://github.com/CheckPointSW/Cuckoo-AWS. # Modified by the Canadian Centre for Cyber Security to support Azure. 
+import itertools import logging import re import socket @@ -23,7 +24,7 @@ CuckooMachineError, CuckooUnserviceableTaskError, ) -from lib.cuckoo.core.database import TASK_PENDING, Machine, Task +from lib.cuckoo.core.database import TASK_PENDING, TASK_RUNNING, Machine, Task HAVE_AZURE = False cfg = Config() @@ -95,6 +96,7 @@ delete_lock = threading.Lock() vms_currently_being_deleted_lock = threading.Lock() current_operations_lock = threading.Lock() +scaling_lock = threading.Lock() # This is the number of operations that are taking place at the same time current_vmss_operations = 0 @@ -441,6 +443,7 @@ def _insert_placeholder_machine(self, vmss_name, vmss_vals): """ try: self._remove_placeholder_machine(vmss_name) + self.db.session.flush() # Fix: Commit delete before insert to prevent duplicate key self.db.add_machine( name=f"{vmss_name}_placeholder", label=f"{vmss_name}_placeholder", @@ -636,8 +639,16 @@ def get_first_machine(query: sqlalchemy.orm.Query) -> Optional[Machine]: filtered_machines = self.db.filter_machines_to_task(include_reserved=True, **filter_kwargs) machine = get_first_machine(filtered_machines) + # Fix: Check if machine is placeholder (reserved=True) and trigger scaling + # Placeholder is not a real VM, just a DB entry for tag provisioning if machine is None: self._scale_from_zero(task, os_version, task_tags) + elif machine.reserved: + log.info("Found placeholder machine %s, triggering zero-scale", machine.name) + self._scale_from_zero(task, os_version, task_tags) + # If placeholder found, return None so task waits for real VM + return None + if machine and machine.locked: # There aren't any machines that can service the task NOW, but there is at least one in the pool # that could service it once it's available. 
@@ -657,7 +668,7 @@ def _scale_from_zero(self, task: Task, os_version: str, tags): if tags and len(tags) == 1 and vals["tag"] == tags[0]: assignable_vmss = vals break - if vals["platform"] == task.platform: + if vals["platform"].lower() == task.platform.lower(): assignable_vmss = vals break @@ -673,124 +684,159 @@ def _scale_from_zero(self, task: Task, os_version: str, tags): def _add_machines_to_db(self, vmss_name): """ Adding machines to database that did not exist there before. + + Fix: Azure API returns success immediately, but VM provisioning + takes ~60s more. Implement retry loop to wait for VMs. @param vmss_name: the name of the VMSS to be queried """ - try: - log.debug("Adding machines to database for %s.", vmss_name) - # We don't want to re-add machines! Therefore, let's see what we're working with - machines_in_db = self.db.list_machines() - db_machine_labels = [machine.label for machine in machines_in_db] - # We want to avoid collisions where the IP is already associated with a machine - db_machine_ips = [machine.ip for machine in machines_in_db] - - # Get all VMs in the VMSS - paged_vmss_vms = Azure._azure_api_call( - self.options.az.sandbox_resource_group, - vmss_name, - operation=self.compute_client.virtual_machine_scale_set_vms.list, - ) + max_retries = 12 # 12 * 5s = 60s max wait + retry_delay = 5 # seconds - # Get all network interface cards for the machines in the VMSS - paged_vmss_vm_nics = Azure._azure_api_call( - self.options.az.sandbox_resource_group, - vmss_name, - operation=self.network_client.network_interfaces.list_virtual_machine_scale_set_network_interfaces, - ) - - # Turn the Paged result into a list + for attempt in range(max_retries): try: - vmss_vm_nics = [vmss_vm_nic for vmss_vm_nic in paged_vmss_vm_nics] - except ResourceNotFoundError: - log.debug("No network interfaces found for VMSS %s (capacity=0)", vmss_name) - vmss_vm_nics = [] + if attempt > 0: + log.info("Retry #%d - waiting %ds for VMs in %s...", + attempt, retry_delay, 
vmss_name) + time.sleep(retry_delay) + + log.debug("Adding machines to database for %s (attempt %d/%d)", + vmss_name, attempt + 1, max_retries) + + # We don't want to re-add machines! Therefore, let's see what we're working with + machines_in_db = self.db.list_machines() + db_machine_labels = [machine.label for machine in machines_in_db] + # We want to avoid collisions where the IP is already associated with a machine + db_machine_ips = [machine.ip for machine in machines_in_db] + + # Get all VMs in the VMSS + paged_vmss_vms = Azure._azure_api_call( + self.options.az.sandbox_resource_group, + vmss_name, + operation=self.compute_client.virtual_machine_scale_set_vms.list, + ) - # This will be used if we are in the initializing phase of the system - ready_vmss_vm_threads = {} - with vms_currently_being_deleted_lock: - vms_to_avoid_adding = vms_currently_being_deleted + # Check if we got any VMs + vmss_vms_list = list(paged_vmss_vms) + if len(vmss_vms_list) == 0 and attempt < max_retries - 1: + # No VMs yet, but not last attempt - retry + log.debug("No VMs found yet for %s, retrying...", vmss_name) + continue - try: - for vmss_vm in paged_vmss_vms: - if vmss_vm.name in db_machine_labels: - # Don't add it if it already exists! - continue - if vmss_vm.name in vms_to_avoid_adding: - # Don't add it if it is currently being deleted! - log.debug("%s is currently being deleted!", vmss_vm.name) - continue - # According to Microsoft, the OS type is... 
- platform = vmss_vm.storage_profile.os_disk.os_type.lower() + # Re-create paged result for processing below + paged_vmss_vms = iter(vmss_vms_list) - if not vmss_vm.network_profile: - log.error("%s does not have a network profile", vmss_vm.name) - continue + # Get all network interface cards for the machines in the VMSS + paged_vmss_vm_nics = Azure._azure_api_call( + self.options.az.sandbox_resource_group, + vmss_name, + operation=self.network_client.network_interfaces.list_virtual_machine_scale_set_network_interfaces, + ) - vmss_vm_nic = next( - ( - vmss_vm_nic - for vmss_vm_nic in vmss_vm_nics - if vmss_vm.network_profile.network_interfaces[0].id.lower() == vmss_vm_nic.id.lower() - ), - None, - ) - if not vmss_vm_nic: - log.error( - "%s does not match any NICs in %s", vmss_vm.network_profile.network_interfaces[0].id.lower(), str([vmss_vm_nic.id.lower() for vmss_vm_nic in vmss_vm_nics]) - ) - continue - # Sets "new_machine" object in configuration object to - # avoid raising an exception. - setattr(self.options, vmss_vm.name, {}) - - private_ip = vmss_vm_nic.ip_configurations[0].private_ip_address - if private_ip in db_machine_ips: - existing_machines = [machine for machine in machines_in_db if machine.ip == private_ip] - vmss_name, _ = existing_machines[0].label.split("_") - self._delete_machines_from_db_if_missing(vmss_name) - - # Add machine to DB. - # TODO: What is the point of name vs label? - self.db.add_machine( - name=vmss_vm.name, - label=vmss_vm.name, - ip=private_ip, - platform=platform, - tags=self.options.az.scale_sets[vmss_name].pool_tag, - arch=self.options.az.scale_sets[vmss_name].arch, - interface=self.options.az.interface, - snapshot=vmss_vm.storage_profile.image_reference.id, - resultserver_ip=self.options.az.resultserver_ip, - resultserver_port=self.options.az.resultserver_port, - reserved=False, - ) - # We always wait for Cuckoo agent to finish setting up if 'wait_for_agent_before_starting' is true or if we are initializing. 
- # Else, the machine should become immediately available in DB. - if self.initializing or self.options.az.wait_for_agent_before_starting: - thr = threading.Thread( - target=Azure._thr_wait_for_ready_machine, - args=( - vmss_vm.name, - private_ip, + # Turn the Paged result into a list + try: + vmss_vm_nics = [vmss_vm_nic for vmss_vm_nic in paged_vmss_vm_nics] + except ResourceNotFoundError: + log.debug("No network interfaces found for VMSS %s (capacity=0)", vmss_name) + vmss_vm_nics = [] + + # This will be used if we are in the initializing phase of the system + ready_vmss_vm_threads = {} + with vms_currently_being_deleted_lock: + vms_to_avoid_adding = vms_currently_being_deleted + + try: + for vmss_vm in paged_vmss_vms: + if vmss_vm.name in db_machine_labels: + # Don't add it if it already exists! + continue + if vmss_vm.name in vms_to_avoid_adding: + # Don't add it if it is currently being deleted! + log.debug("%s is currently being deleted!", vmss_vm.name) + continue + # According to Microsoft, the OS type is... 
+ platform = vmss_vm.storage_profile.os_disk.os_type.lower() + + if not vmss_vm.network_profile: + log.error("%s does not have a network profile", vmss_vm.name) + continue + + vmss_vm_nic = next( + ( + vmss_vm_nic + for vmss_vm_nic in vmss_vm_nics + if vmss_vm.network_profile.network_interfaces[0].id.lower() == vmss_vm_nic.id.lower() ), + None, ) - ready_vmss_vm_threads[vmss_vm.name] = thr - thr.start() - except ResourceNotFoundError: - log.debug("No VMs found for VMSS %s (capacity=0)", vmss_name) - - if ready_vmss_vm_threads: - for vm, thr in ready_vmss_vm_threads.items(): - try: - thr.join() - except CuckooGuestCriticalTimeout: - log.debug("Rough start for %s, deleting.", vm) - self.delete_machine(vm) - raise - except Exception as e: - log.exception(repr(e)) + if not vmss_vm_nic: + log.error( + "%s does not match any NICs in %s", vmss_vm.network_profile.network_interfaces[0].id.lower(), str([vmss_vm_nic.id.lower() for vmss_vm_nic in vmss_vm_nics]) + ) + continue + # Sets "new_machine" object in configuration object to + # avoid raising an exception. + setattr(self.options, vmss_vm.name, {}) + + private_ip = vmss_vm_nic.ip_configurations[0].private_ip_address + if private_ip in db_machine_ips: + existing_machines = [machine for machine in machines_in_db if machine.ip == private_ip] + vmss_name, _ = existing_machines[0].label.split("_") + self._delete_machines_from_db_if_missing(vmss_name) + + # Add machine to DB. + # TODO: What is the point of name vs label? 
+ self.db.add_machine( + name=vmss_vm.name, + label=vmss_vm.name, + ip=private_ip, + platform=platform, + tags=self.options.az.scale_sets[vmss_name].pool_tag, + arch=self.options.az.scale_sets[vmss_name].arch, + interface=self.options.az.interface, + snapshot=vmss_vm.storage_profile.image_reference.id, + resultserver_ip=self.options.az.resultserver_ip, + resultserver_port=self.options.az.resultserver_port, + reserved=False, + ) + # We always wait for Cuckoo agent to finish setting up if 'wait_for_agent_before_starting' is true or if we are initializing. + # Else, the machine should become immediately available in DB. + if self.initializing or self.options.az.wait_for_agent_before_starting: + thr = threading.Thread( + target=Azure._thr_wait_for_ready_machine, + args=( + vmss_vm.name, + private_ip, + ), + ) + ready_vmss_vm_threads[vmss_vm.name] = thr + thr.start() + except ResourceNotFoundError: + log.debug("No VMs found for VMSS %s (capacity=0)", vmss_name) + + if ready_vmss_vm_threads: + for vm, thr in ready_vmss_vm_threads.items(): + try: + thr.join() + except CuckooGuestCriticalTimeout: + log.debug("Rough start for %s, deleting.", vm) + self.delete_machine(vm) + raise + + # Success! Break out of retry loop + log.info("Successfully added machines from %s to database", vmss_name) + break + + except Exception as e: + if attempt == max_retries - 1: + # Last attempt, give up + log.exception("Failed to add machines after %d attempts: %s", max_retries, repr(e)) + raise + else: + # Retry + log.debug("Retrying due to exception: %s", str(e)) + continue - # If no machines on any VMSSs are in the db when we leave this method, CAPE will crash. - if not self.machines() and self.required_vmsss[vmss_name]["retries"] > 0: + if not self.machines() and self.required_vmsss[vmss_name]["retries"] > 0: log.warning("No available VMs after initializing %s. 
Attempting to reinitialize VMSS.", vmss_name) self.required_vmsss[vmss_name]["retries"] -= 1 start_time = timeit.default_timer() @@ -1081,19 +1127,18 @@ def _scale_machine_pool(self, tag, per_platform=False): elif per_platform and Azure.LINUX_TAG_PREFIX in tag: platform = Azure.LINUX_PLATFORM - # If the designated VMSS is already being scaled for the given platform, don't mess with it - if platform and is_platform_scaling[platform]: - return - # Get the VMSS name by the tag vmss_name = next(name for name, vals in self.required_vmsss.items() if vals["tag"] == tag) - # TODO: Remove large try-catch once all bugs have been caught - # It has been observed that there are times when the is_scaling flag is not returned to False even though - # scaling has completed. Therefore we need this try-catch to figure out why. - try: + # Fix: Use lock for atomic flag check and set to prevent race conditions + with scaling_lock: + # If the designated VMSS is already being scaled for the given platform, don't mess with it + if platform and is_platform_scaling[platform]: + return + # If this VMSS is already being scaled, don't mess with it if machine_pools[vmss_name]["is_scaling"]: + log.debug("VMSS %s is already scaling, skipping", vmss_name) return # This is the flag that is used to indicate if the VMSS is being scaled by a thread @@ -1104,6 +1149,8 @@ def _scale_machine_pool(self, tag, per_platform=False): if platform: is_platform_scaling[platform] = True + try: + relevant_machines = self._get_relevant_machines(tag) number_of_relevant_machines = len(relevant_machines) machine_pools[vmss_name]["size"] = number_of_relevant_machines @@ -1302,18 +1349,16 @@ def _scale_machine_pool(self, tag, per_platform=False): if len(self.db.list_machines(tags=[tag], include_reserved=True)) == 0: self._insert_placeholder_machine(vmss_name, self.required_vmsss[vmss_name]) - # I release you from your earthly bonds! 
- machine_pools[vmss_name]["wait"] = False - machine_pools[vmss_name]["is_scaling"] = False - if platform: - is_platform_scaling[platform] = False log.debug("Scaling %s has completed.", vmss_name) except Exception as exc: + log.exception("Scaling %s has completed with errors %s.", vmss_name, str(exc)) + finally: + # Fix: ALWAYS reset flags in finally block to prevent deadlock + # If exception occurs before flags are reset, they stay True forever machine_pools[vmss_name]["wait"] = False machine_pools[vmss_name]["is_scaling"] = False if platform: is_platform_scaling[platform] = False - log.exception("Scaling %s has completed with errors %s.", vmss_name, str(exc)) @staticmethod def _handle_poller_result(lro_poller_object): @@ -1340,20 +1385,22 @@ def _get_number_of_relevant_tasks(self, tag, platform=None): @param platform: The platform used for finding relevant tasks @return int: The number of relevant tasks for the given tag """ - # Getting all tasks in the queue - tasks = self.db.list_tasks(status=TASK_PENDING) + # Fix: Count BOTH pending AND running tasks + # Previous code only counted PENDING, causing VMs to be deleted while tasks running + pending_tasks = self.db.list_tasks(status=TASK_PENDING) + running_tasks = self.db.list_tasks(status=TASK_RUNNING) # The task queue that will be used to prepare machines will be relative to the virtual # machine tag that is targeted in the task (win7, win10, etc) or platform (windows, linux) relevant_task_queue = 0 if not platform: - for task in tasks: + for task in itertools.chain(pending_tasks, running_tasks): for t in task.tags: if t.name == tag: relevant_task_queue += 1 else: - for task in tasks: + for task in itertools.chain(pending_tasks, running_tasks): if task.platform == platform: relevant_task_queue += 1 return relevant_task_queue From 5bce5a62053b4fbf778ff5ce23742fac06208da7 Mon Sep 17 00:00:00 2001 From: enzo <7831008+enzok@users.noreply.github.com> Date: Wed, 11 Feb 2026 08:33:45 -0500 Subject: [PATCH 2/2] http 
session rebuild from behavior (#2907) --- lib/cuckoo/common/network_utils.py | 304 ++++++++++++++++-- modules/processing/behavior.py | 51 +-- modules/processing/network.py | 72 ++++- web/guac/templates/guac/index.html | 2 +- web/guac/templates/guac/wait.html | 2 +- .../analysis/generic/_file_info.html | 2 +- 6 files changed, 385 insertions(+), 48 deletions(-) diff --git a/lib/cuckoo/common/network_utils.py b/lib/cuckoo/common/network_utils.py index cdad4ca53f2..9a14d77ec72 100644 --- a/lib/cuckoo/common/network_utils.py +++ b/lib/cuckoo/common/network_utils.py @@ -3,6 +3,8 @@ # See the file 'docs/LICENSE' for copying permission. import datetime +import re +from collections import defaultdict from contextlib import suppress from urllib.parse import urlparse @@ -64,6 +66,9 @@ } +_HEX_HANDLE_RE = re.compile(r"^(?:0x)?([0-9a-fA-F]+)$") + + def _norm_domain(d): if not d or not isinstance(d, str): return None @@ -86,7 +91,7 @@ def _parse_behavior_ts(ts_str): def _get_call_args_dict(call): """Convert arguments list to a dictionary for O(1) access.""" - return {a["name"]: a["value"] for a in call.get("arguments", []) if "name" in a} + return {a["name"].lower(): a["value"] for a in call.get("arguments", []) if "name" in a} def _extract_domain_from_call(call, args_map): @@ -98,16 +103,14 @@ def _extract_domain_from_call(call, args_map): "nodename", "name", "domain", - "szName", - "pszName", - "lpName", + "szname", + "pszname", + "lpname", "query", "queryname", "dns_name", - "QueryName", - "lpstrName", - "pName", - "ServerName", + "lpstrname", + "pname", "servername", ): v = args_map.get(name) @@ -127,8 +130,8 @@ def _extract_domain_from_call(call, args_map): def _get_arg_any(args_map, *names): """Return the first matching argument value for any of the provided names.""" for n in names: - if n in args_map: - return args_map[n] + if n.lower() in args_map: + return args_map[n.lower()] return None @@ -200,6 +203,8 @@ def _http_host_from_buf(buf): def _safe_int(x): with 
suppress(Exception): + if isinstance(x, str) and x.lower().startswith("0x"): + return int(x, 16) return int(x) return None @@ -253,19 +258,19 @@ def _extract_tls_server_name(call, args_map): """ Best-effort server name extraction for TLS/SChannel/SSPI. """ + def _is_valid_domain_chars(s): + for c in s: + if not (c.isalnum() or c in ".-_"): + return False + return True + for name in ( "sni", - "SNI", - "ServerName", "servername", "server_name", - "TargetName", "targetname", - "Host", "host", "hostname", - "Url", - "URL", "url", ): v = args_map.get(name) @@ -274,7 +279,7 @@ def _extract_tls_server_name(call, args_map): u = _extract_first_url(s) if u: return _host_from_url(u) or s - if "." in s and " " not in s and len(s) < 260: + if "." in s and " " not in s and len(s) < 260 and _is_valid_domain_chars(s): return s for v in args_map.values(): @@ -284,6 +289,269 @@ def _extract_tls_server_name(call, args_map): u = _extract_first_url(s) if u: return _host_from_url(u) or s - return s + if _is_valid_domain_chars(s): + return s return None + + +def _parse_handle(v): + """Normalize handles into '0x...' lowercase. Return None if invalid/zero.""" + if v is None: + return None + if isinstance(v, int): + if v <= 0: + return None + return "0x%x" % v + with suppress(Exception): + s = str(v).strip() + if not s: + return None + m = _HEX_HANDLE_RE.match(s) + if not m: + return None + n = int(m.group(1), 16) + if n <= 0: + return None + return "0x%x" % n + return None + + +def _get_call_ret_handle(call): + return _parse_handle(call.get("return") or call.get("retval") or call.get("ret")) + + +def _call_ok(call): + """ + In your data, status is boolean. + Keep tolerant for other shapes. 
+ """ + v = call.get("status") + if isinstance(v, bool): + return v + if isinstance(v, str): + return v.lower() in ("success", "true", "1") + return True + + +def _winhttp_get_proc_state(state, process): + pid = process.get("process_id") + pname = process.get("process_name", "") or "" + procs = state.setdefault("processes", {}) + key = pid if pid is not None else (pname or "unknown") + pstate = procs.get(key) + if pstate is None: + pstate = { + "process_id": pid, + "process_name": pname, + "sessions": {}, # session_handle -> session dict + "connects": {}, # connect_handle -> connect dict + "requests": {}, # request_handle -> request dict + } + procs[key] = pstate + return pstate + + +def winhttp_update_from_call(pstate, api_lc, args_map, ret_handle): + """ + Update WinHTTP state from one call. + args_map keys are lowercased by _get_call_args_dict(). + """ + # WinHttpOpen -> session handle + if api_lc == "winhttpopen" and ret_handle: + sess = pstate["sessions"].get(ret_handle) + if sess is None: + sess = { + "handle": ret_handle, + "user_agent": "", + "access_type": "", + "proxy_name": "", + "proxy_bypass": "", + "flags": "", + "options": [], + "connections": [], # list of connect objects + } + pstate["sessions"][ret_handle] = sess + + ua = args_map.get("useragent") + if ua and not sess["user_agent"]: + sess["user_agent"] = str(ua) + if args_map.get("accesstype") is not None: + sess["access_type"] = str(args_map.get("accesstype")) + if args_map.get("proxyname") is not None: + sess["proxy_name"] = str(args_map.get("proxyname")) + if args_map.get("proxybypass") is not None: + sess["proxy_bypass"] = str(args_map.get("proxybypass")) + if args_map.get("flags") is not None: + sess["flags"] = str(args_map.get("flags")) + return + + # WinHttpConnect -> connect handle (binds to session) + if api_lc == "winhttpconnect" and ret_handle: + sh = _parse_handle(args_map.get("sessionhandle")) + server = args_map.get("servername") + port = args_map.get("serverport") + + conn = 
pstate["connects"].get(ret_handle) + if conn is None: + conn = { + "handle": ret_handle, + "session_handle": sh, + "server": str(server or ""), + "port": None, + "options": [], + "requests": [], # list of request objects + } + pstate["connects"][ret_handle] = conn + + if sh and not conn.get("session_handle"): + conn["session_handle"] = sh + if server and not conn.get("server"): + conn["server"] = str(server) + if conn.get("port") is None and port is not None: + with suppress(Exception): + conn["port"] = int(port) + + if sh: + sess = pstate["sessions"].get(sh) + if sess is not None: + # ensure uniqueness by handle + for c in sess["connections"]: + if isinstance(c, dict) and c.get("handle") == ret_handle: + return + sess["connections"].append(conn) + return + + # WinHttpOpenRequest -> request handle (binds to connect) + if api_lc == "winhttpopenrequest" and ret_handle: + ch = _parse_handle(args_map.get("internethandle")) + req = pstate["requests"].get(ret_handle) + if req is None: + req = { + "handle": ret_handle, + "connect_handle": ch, + "verb": str(args_map.get("verb") or ""), + "object": str(args_map.get("objectname") or ""), + "flags": str(args_map.get("flags") or ""), + "version": str(args_map.get("version") or ""), + "referrer": str(args_map.get("referrer") or ""), + "options": [], + "url": "", + } + pstate["requests"][ret_handle] = req + else: + if ch and not req.get("connect_handle"): + req["connect_handle"] = ch + + if ch: + conn = pstate["connects"].get(ch) + if conn is not None: + for r in conn["requests"]: + if isinstance(r, dict) and r.get("handle") == ret_handle: + break + else: + conn["requests"].append(req) + + if conn.get("server") and req.get("object"): + scheme = "https" if conn.get("port") == 443 else "http" + req["url"] = "%s://%s%s" % (scheme, conn["server"], req["object"]) + return + + # WinHttpSetOption -> applies to session/connect/request by handle + if api_lc == "winhttpsetoption": + h = _parse_handle(args_map.get("internethandle")) + if 
not h: + return + opt_entry = {"option": str(args_map.get("option") or ""), "buffer": str(args_map.get("buffer") or "")} + if h in pstate["requests"]: + pstate["requests"][h]["options"].append(opt_entry) + elif h in pstate["connects"]: + pstate["connects"][h]["options"].append(opt_entry) + elif h in pstate["sessions"]: + pstate["sessions"][h]["options"].append(opt_entry) + return + + +def winhttp_finalize_sessions(state): + """ + Returns per-process domain grouping with only: + - url (scheme derived: https if port == 443 else http) + - verb + - user_agent + - proxy info (access_type, proxy_name, proxy_bypass) + """ + out = [] + procs = (state or {}).get("processes") or {} + + for _, p in procs.items(): + sessions = (p.get("sessions") or {}) + if not sessions: + continue + + sessions_by_domain = {} + sessions_by_domain_keys = defaultdict(set) + + for s in sessions.values(): + ua = s.get("user_agent") or "" + access_type = s.get("access_type") or "" + proxy_name = s.get("proxy_name") or "" + proxy_bypass = s.get("proxy_bypass") or "" + + for c in s.get("connections") or []: + if not isinstance(c, dict): + continue + + server = c.get("server") or "" + dom = _norm_domain(server) + if not dom: + continue + + port = c.get("port") + scheme = "https" if port == 443 else "http" + + for r in c.get("requests") or []: + if not isinstance(r, dict): + continue + + obj = r.get("object") or "" + if not isinstance(obj, str): + obj = str(obj) + + obj = obj.strip() + if not obj: + continue + + if not obj.startswith("/"): + obj = "/" + obj + + verb = r.get("verb") or "" + if not isinstance(verb, str): + verb = str(verb) + + verb = verb.strip().upper() or "GET" + request = f"{verb} {obj} \r\nUser-Agent: {ua}\r\nHost: {dom}\r\n" + entry = { + "uri": obj, + "dport": port, + "method": verb, + "protocol": scheme, + "user_agent": ua, + "request": request, + "access_type": access_type, + "proxy_name": proxy_name, + "proxy_bypass": proxy_bypass, + } + + key = (obj, verb, ua, access_type, 
proxy_name, proxy_bypass) + if key not in sessions_by_domain_keys[dom]: + sessions_by_domain.setdefault(dom, []).append(entry) + sessions_by_domain_keys[dom].add(key) + + if sessions_by_domain: + out.append({ + "process_id": p.get("process_id"), + "process_name": p.get("process_name", ""), + "sessions": sessions_by_domain, + }) + + return out diff --git a/modules/processing/behavior.py b/modules/processing/behavior.py index bbf08586c29..d306293bda5 100644 --- a/modules/processing/behavior.py +++ b/modules/processing/behavior.py @@ -11,29 +11,33 @@ from collections import defaultdict from contextlib import suppress +from lib.cuckoo.common.abstracts import Processing +from lib.cuckoo.common.compressor import CuckooBsonCompressor +from lib.cuckoo.common.config import Config +from lib.cuckoo.common.netlog import BsonParser from lib.cuckoo.common.network_utils import ( - DNS_APIS, - HTTP_HINT_APIS, - TLS_HINT_APIS, + _get_call_args_dict, + _get_arg_any, + _norm_ip, + _looks_like_http, + _http_host_from_buf, + _extract_first_url, + _host_from_url, _add_http_host, _extract_domain_from_call, - _extract_first_url, _extract_tls_server_name, - _get_arg_any, - _get_call_args_dict, - _host_from_url, - _http_host_from_buf, - _looks_like_http, - _norm_domain, - _norm_ip, _parse_behavior_ts, + _norm_domain, _safe_int, + DNS_APIS, + HTTP_HINT_APIS, + TLS_HINT_APIS, + _get_call_ret_handle, + _winhttp_get_proc_state, + _call_ok, + winhttp_update_from_call, + winhttp_finalize_sessions, ) - -from lib.cuckoo.common.abstracts import Processing -from lib.cuckoo.common.compressor import CuckooBsonCompressor -from lib.cuckoo.common.config import Config -from lib.cuckoo.common.netlog import BsonParser from lib.cuckoo.common.path_utils import path_exists from lib.cuckoo.common.replace_patterns_utils import _clean_path, check_deny_pattern from lib.cuckoo.common.utils import ( @@ -1234,6 +1238,7 @@ def __init__(self): self.http_host_map = defaultdict(list) # host -> [pinfo] self.http_requests = 
[] # url -> [pinfo] self.dns_intents = defaultdict(list) # domain -> [intent] + self._winhttp_state = {"processes": {}} def event_apicall(self, call, process): if call.get("category") != "network": @@ -1251,7 +1256,7 @@ def event_apicall(self, call, process): sock = _get_arg_any(args_map, "socket", "sock", "fd", "handle") ip = _norm_ip(_get_arg_any(args_map, "ip", "dst", "dstip", "ip_address", "address", "remote_ip", "server")) port = _get_arg_any(args_map, "port", "dport", "dstport", "remote_port", "server_port") - buf = _get_arg_any(args_map, "Buffer", "buffer", "buf", "data") + buf = _get_arg_any(args_map, "buffer", "buf", "data") if api in {"connect", "wsaconnect", "connectex", "sendto", "wsasendto", "recvfrom", "wsarecvfrom"}: p_int = _safe_int(port) @@ -1269,7 +1274,7 @@ def event_apicall(self, call, process): _add_http_host(self.http_host_map, host, pinfo, sock=sock) if api in HTTP_HINT_APIS: - url = _get_arg_any(args_map, "URL", "Url", "url", "lpszUrl", "lpUrl", "uri", "pszUrl", "pUrl") + url = _get_arg_any(args_map, "url", "lpszurl", "lpurl", "uri", "pszurl", "purl") if isinstance(url, str) and url.strip(): u = _extract_first_url(url) or url.strip() host = _host_from_url(u) @@ -1324,6 +1329,15 @@ def event_apicall(self, call, process): } ) + # 4. 
WinHTTP rebuild (incremental) + if api.startswith("winhttp") and _call_ok(call): + ret_h = None + with suppress(Exception): + ret_h = _get_call_ret_handle(call) + + pstate = _winhttp_get_proc_state(self._winhttp_state, process) + winhttp_update_from_call(pstate, api, args_map, ret_h) + def run(self): # Sort DNS intents by timestamp for d in list(self.dns_intents.keys()): @@ -1345,6 +1359,7 @@ def run(self): "http_host_map": self.http_host_map, "dns_intents": self.dns_intents, "http_requests": self.http_requests, + "winhttp_sessions": winhttp_finalize_sessions(self._winhttp_state), } diff --git a/modules/processing/network.py b/modules/processing/network.py index de175ec5010..dbe1afc3dc1 100644 --- a/modules/processing/network.py +++ b/modules/processing/network.py @@ -799,8 +799,8 @@ def run(self): self._tcp_dissect(connection, tcp.data, ts) src, sport, dst, dport = connection["src"], connection["sport"], connection["dst"], connection["dport"] if not ( - (dst, dport, src, sport) in self.tcp_connections_seen - or (src, sport, dst, dport) in self.tcp_connections_seen + (dst, dport, src, sport) in self.tcp_connections_seen + or (src, sport, dst, dport) in self.tcp_connections_seen ): self.tcp_connections.append((src, sport, dst, dport, offset, ts - first_ts)) self.tcp_connections_seen.add((src, sport, dst, dport)) @@ -831,8 +831,8 @@ def run(self): src, sport, dst, dport = connection["src"], connection["sport"], connection["dst"], connection["dport"] if not ( - (dst, dport, src, sport) in self.udp_connections_seen - or (src, sport, dst, dport) in self.udp_connections_seen + (dst, dport, src, sport) in self.udp_connections_seen + or (src, sport, dst, dport) in self.udp_connections_seen ): self.udp_connections.append((src, sport, dst, dport, offset, ts - first_ts)) self.udp_connections_seen.add((src, sport, dst, dport)) @@ -1333,7 +1333,7 @@ def _process_map(self, network: Dict): host["process_id"] = proc.get("process_id") host["process_name"] = proc.get("process_name") 
- def _merge_behavior_network(self, results): + def _merge_behavior_network(self, network): """ Merge network events found in behavior logs but missing in PCAP. Marks them with source='behavior'. @@ -1342,9 +1342,63 @@ def _merge_behavior_network(self, results): if not net_map: return - network = results + # WinHTTP Sessions (behavior-derived URLs) + winhttp_sessions = net_map.get("winhttp_sessions") + if winhttp_sessions: + # Recompute current http host set (includes http/http_ex/https_ex) + http_events = ( + (network.get("http", []) or []) + + (network.get("http_ex", []) or []) + + (network.get("https_ex", []) or []) + ) + + existing_hosts = { + _norm_domain(h.get("host")) + for h in http_events + if h.get("host") + } + + for p in winhttp_sessions: + proc_sessions = (p or {}).get("sessions") or {} + + for host, sessions in proc_sessions.items(): + hnorm = _norm_domain(host) + if not hnorm: + continue + + # Mirror HTTP behavior merge rule: only add if host missing + if hnorm in existing_hosts: + continue + + if not sessions: + continue + + # Use first session entry as representative + s0 = sessions[0] or {} + method = s0.get("method") or "" + dport = s0.get("port") + uri = s0.get("uri") or "/" + protocol = s0.get("protocol") + + entry = { + "host": hnorm, + "dport": dport, + "uri": uri, + "method": method, + "data": s0.get("request"), + "protocol": protocol, + "access_type": s0.get("access_type"), + "proxy_name": s0.get("proxy_name"), + "proxy_bypass": s0.get("proxy_bypass"), + "source": "behavior", + "process_id": p.get("process_id"), + "process_name": p.get("process_name"), + } + + network.setdefault("http", []).append(entry) + existing_hosts.add(hnorm) - # 1. DNS + # DNS dns_intents = net_map.get("dns_intents", {}) existing_dns = {_norm_domain(d.get("request")) for d in network.get("dns", []) if d.get("request")} @@ -1363,7 +1417,7 @@ def _merge_behavior_network(self, results): } network.setdefault("dns", []).append(entry) - # 2. 
HTTP + # HTTP http_host_map = net_map.get("http_host_map", {}) http_requests = net_map.get("http_requests", []) @@ -1450,7 +1504,7 @@ def _merge_behavior_network(self, results): } network.setdefault("http", []).append(entry) - # 3. Connections (TCP/UDP) + # Connections (TCP/UDP) endpoint_map = self._reconstruct_endpoint_map(net_map.get("endpoint_map", {})) existing_endpoints = set() diff --git a/web/guac/templates/guac/index.html b/web/guac/templates/guac/index.html index f0492e7fb54..f57dea4a847 100644 --- a/web/guac/templates/guac/index.html +++ b/web/guac/templates/guac/index.html @@ -31,7 +31,7 @@ document.getElementById("stopTask").addEventListener("click", function () { stopTask("{{task_id}}"); }); - var task_url = location.origin + '/submit/status/{{task_id}}' + var task_url = location.origin + '/submit/status/{{task_id}}/' document.getElementById("taskStatus").addEventListener("click", function() { location.replace(task_url); }, false); diff --git a/web/guac/templates/guac/wait.html b/web/guac/templates/guac/wait.html index 457490336ae..d5a08819adf 100644 --- a/web/guac/templates/guac/wait.html +++ b/web/guac/templates/guac/wait.html @@ -16,7 +16,7 @@

Hang on...