Skip to content

Commit 2f27fdd

Browse files
committed
refactor: migrate to structured event models and improve syscall parsing
- Replace raw string event handling with Pydantic models for type safety - Add support for multiple syscall types (execve, fork, clone, connect) - Create base event model with common validation - Move event models to dedicated domain module - Update tests to use new event models - Improve trace reader to parse and validate events - Remove deprecated event_models.py
1 parent 0fa259d commit 2f27fdd

File tree

11 files changed

+157
-82
lines changed

11 files changed

+157
-82
lines changed

linux_edr/app.py

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from collections import defaultdict
33
import re
44
import os
5-
from typing import Dict, List, Optional, Any, NamedTuple, Iterator, Set, Tuple
5+
from typing import Dict, List, Optional, Any, NamedTuple, Iterator, Set, Tuple, Union
66
from apscheduler.schedulers.background import BackgroundScheduler
77
from .trace import TraceReader
88
from .aggregator import Aggregator
@@ -11,6 +11,7 @@
1111
from .config import Config
1212
from .report_manager import ReportManager
1313
from .models import Cell
14+
from .domain.models.events import BaseSyscallEvent, ExecveEvent
1415

1516

1617
def setup_logging(debug: bool = False) -> None:
@@ -316,62 +317,35 @@ def _summarize(self) -> None:
316317

317318
logging.info(f"Created cell report {cell.report_id} with {cell.total} events")
318319

319-
def _process_event(self, evt: str) -> None:
320+
def _process_event(self, evt: BaseSyscallEvent) -> None:
320321
"""
321322
Process a single event from the trace reader.
322323
323324
Args:
324325
evt: Raw event string from trace_pipe
325326
"""
326-
# Log raw event in debug mode
327-
if self.debug:
328-
# Always log basic event info
329-
logging.debug(f"Raw event: {evt}")
330-
331-
# Log detailed parsed info if verbose debug is enabled
332-
if self.verbose_debug:
333-
self._log_parsed_event(evt)
334-
335-
# If event is already a dict (e.g., when injected by tests or future extensions),
336-
# we assume it's been validated and directly buffer it.
337-
if isinstance(evt, dict):
338-
self.agg.add(evt)
339-
return
340-
341-
# Otherwise, treat it as raw text from trace_pipe and try to parse/validate.
342-
parsed = parse_execve(evt)
327+
if self.verbose_debug:
328+
self._log_debug_event(evt)
343329

344-
if not parsed:
345-
# Not an execve line – skip buffering
330+
# If the trace reader already produced a validated ExecveEvent model, buffer it directly.
331+
if isinstance(evt, BaseSyscallEvent):
332+
self.agg.add(evt.model_dump() if hasattr(evt, "model_dump") else evt.dict())
346333
return
334+
else:
335+
logging.warning(f"Invalid event type: {type(evt)}")
347336

348-
# Validate with Pydantic schema (ensures correct types/structure)
349-
try:
350-
from .domain.models.event_models import ExecveEvent as ExecveEventModel # Local import to avoid cycles
351-
352-
model_event = ExecveEventModel.from_namedtuple(parsed)
353-
354-
# Buffer as plain dict (safer for serialization & downstream processing)
355-
self.agg.add(model_event.model_dump())
356-
except Exception as e:
357-
# Any validation or conversion error – log and drop the event
358-
logging.warning("Invalid event skipped: %s", e)
359-
360-
def _log_parsed_event(self, evt: str) -> None:
337+
def _log_debug_event(self, evt: BaseSyscallEvent) -> None:
361338
"""
362339
Parse and log detailed event information.
363340
364341
Args:
365342
evt: Raw event string from trace_pipe
366343
"""
344+
if not self.debug or not self.verbose_debug:
345+
return
346+
367347
try:
368-
parsed_evt = parse_execve(evt)
369-
if parsed_evt:
370-
logging.debug(
371-
f"Parsed execve: timestamp={parsed_evt.timestamp}, "
372-
f"pid={parsed_evt.pid}, command={parsed_evt.command}, "
373-
f"args={parsed_evt.args}"
374-
)
348+
logging.debug(f"{evt}")
375349
except Exception as e:
376350
logging.debug(f"Parse error: {str(e)}")
377351

linux_edr/domain/models/event_models.py

Lines changed: 0 additions & 21 deletions
This file was deleted.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from .base import BaseSyscallEvent
2+
from .execve import ExecveEvent
3+
from .fork import ForkEvent
4+
from .clone import CloneEvent
5+
from .connect import ConnectEvent
6+
7+
__all__ = [
8+
"BaseSyscallEvent",
9+
"ExecveEvent",
10+
"ForkEvent",
11+
"CloneEvent",
12+
"ConnectEvent",
13+
]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from pydantic import BaseModel, Field, field_validator
2+
from datetime import datetime
3+
4+
class BaseSyscallEvent(BaseModel):
5+
"""Common attributes for all syscall events."""
6+
7+
timestamp: str = Field(..., description="Kernel timestamp (can be converted to datetime later)")
8+
pid: int = Field(..., ge=0, description="Process ID that triggered the syscall")
9+
10+
# --- validators -------------------------------------------------------
11+
@field_validator("timestamp")
12+
@classmethod
13+
def _validate_iso_or_numeric(cls, v: str) -> str: # pragma: no cover
14+
"""Accepts isoformat or numeric timestamps but ensures non-empty."""
15+
if not v:
16+
raise ValueError("timestamp cannot be empty")
17+
return v
18+
19+
@field_validator("pid")
20+
@classmethod
21+
def _validate_pid(cls, v: int) -> int: # pragma: no cover
22+
if v < 0:
23+
raise ValueError("pid must be non-negative")
24+
return v
25+
26+
# --- helpers ----------------------------------------------------------
27+
def __str__(self) -> str: # pragma: no cover – convenience only
28+
return f"[{self.timestamp}] pid={self.pid}"
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from pydantic import Field
2+
from .base import BaseSyscallEvent
3+
4+
class CloneEvent(BaseSyscallEvent):
5+
"""clone syscall event."""
6+
7+
child_pid: int = Field(..., ge=0, description="PID of the cloned task")
8+
flags: str = Field(..., description="Clone flags")
9+
10+
def __str__(self) -> str: # pragma: no cover
11+
return f"{super().__str__()} clone -> child_pid={self.child_pid} flags={self.flags}"
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from pydantic import Field
2+
from .base import BaseSyscallEvent
3+
4+
class ConnectEvent(BaseSyscallEvent):
5+
"""connect syscall event."""
6+
7+
fd: int = Field(..., ge=0, description="Socket file descriptor")
8+
address: str = Field(..., description="Destination address (ip:port or path)")
9+
10+
def __str__(self) -> str: # pragma: no cover
11+
return f"{super().__str__()} connect -> fd={self.fd} addr={self.address}"
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from typing import List
2+
from pydantic import Field
3+
4+
from .base import BaseSyscallEvent
5+
6+
class ExecveEvent(BaseSyscallEvent):
7+
"""execve syscall event."""
8+
9+
command: str = Field(..., description="Executable invoked (basename)")
10+
args: List[str] = Field(default_factory=list, description="Arguments supplied to the executable")
11+
12+
def __str__(self) -> str: # pragma: no cover
13+
cmd_line = " ".join([self.command, *self.args])
14+
return f"{super().__str__()} execve -> {cmd_line}"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from pydantic import Field
2+
from .base import BaseSyscallEvent
3+
4+
class ForkEvent(BaseSyscallEvent):
5+
"""fork syscall event."""
6+
7+
child_pid: int = Field(..., ge=0, description="Child process PID created by fork")
8+
9+
def __str__(self) -> str: # pragma: no cover
10+
return f"{super().__str__()} fork -> child_pid={self.child_pid}"

linux_edr/trace.py

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@
44
import errno
55
import logging
66
import time
7-
from typing import Generator, Optional
7+
import re
8+
from typing import Generator, Optional, Union
89

910
# Default path to the kernel's trace_pipe
1011
TRACE_PATH = "/sys/kernel/tracing/trace_pipe"
1112
# Maximum time to wait when reading (in seconds)
12-
DEFAULT_TIMEOUT = 10
13+
DEFAULT_TIMEOUT = 1.0
1314

1415
logger = logging.getLogger(__name__)
1516

17+
# Pre-compiled regexes for supported syscalls
18+
EXECVE_PATTERN = re.compile(r"(\S+)\s+\[(\d+)\]\s+.*execve.*\((.*?)\)")
19+
FORK_PATTERN = re.compile(r"(\S+)\s+\[(\d+)\]\s+.*fork.*child_pid=(\d+)")
20+
CLONE_PATTERN = re.compile(r"(\S+)\s+\[(\d+)\]\s+.*clone.*child_pid=(\d+)\s+flags=(\S+)")
21+
CONNECT_PATTERN = re.compile(r"(\S+)\s+\[(\d+)\]\s+.*connect.*fd=(\d+)\s+addr=(.+)")
22+
23+
from .domain.models.events import ExecveEvent, ForkEvent, CloneEvent, ConnectEvent, BaseSyscallEvent
1624

1725
class TraceReader:
1826
"""
@@ -121,7 +129,34 @@ def _reopen_if_needed(self) -> bool:
121129
return False
122130
return True
123131

124-
def __iter__(self) -> Generator[str, None, None]:
132+
def _parse_line(self, line: str) -> Optional[BaseSyscallEvent]:
133+
"""Attempt to parse a supported syscall line into a Pydantic model."""
134+
# execve
135+
if m := EXECVE_PATTERN.search(line):
136+
ts, pid_str, cmd_args = m.groups()
137+
pid = int(pid_str)
138+
parts = cmd_args.split() if cmd_args else []
139+
if parts:
140+
return ExecveEvent(timestamp=ts, pid=pid, command=parts[0].strip('"'), args=parts[1:])
141+
142+
# fork
143+
if m := FORK_PATTERN.search(line):
144+
ts, pid_str, child_pid_str = m.groups()
145+
return ForkEvent(timestamp=ts, pid=int(pid_str), child_pid=int(child_pid_str))
146+
147+
# clone
148+
if m := CLONE_PATTERN.search(line):
149+
ts, pid_str, child_pid_str, flags = m.groups()
150+
return CloneEvent(timestamp=ts, pid=int(pid_str), child_pid=int(child_pid_str), flags=flags)
151+
152+
# connect
153+
if m := CONNECT_PATTERN.search(line):
154+
ts, pid_str, fd_str, addr = m.groups()
155+
return ConnectEvent(timestamp=ts, pid=int(pid_str), fd=int(fd_str), address=addr)
156+
157+
return None
158+
159+
def __iter__(self) -> Generator[Union[str, BaseSyscallEvent], None, None]:
125160
"""
126161
Iterate over lines from the trace pipe.
127162
@@ -168,8 +203,12 @@ def __iter__(self) -> Generator[str, None, None]:
168203
text = data.decode("utf-8", errors="replace")
169204

170205
for line in text.splitlines():
171-
if line.strip(): # Skip empty lines
172-
yield line
206+
if not line.strip():
207+
continue
208+
209+
parsed_evt = self._parse_line(line)
210+
# Yield the parsed object if recognized, else the raw line for backward-compat.
211+
yield parsed_evt if parsed_evt else line
173212
except OSError as e:
174213
if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
175214
continue

tests/test_app.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
SyscallTracer,
1212
)
1313

14-
from linux_edr.domain.models.event_models import ExecveEvent
14+
from linux_edr.domain.models.events import ExecveEvent
1515

1616

1717
class TestApp(unittest.TestCase):
@@ -266,30 +266,25 @@ def test_process_event(self, mock_parse_execve, mock_log_info, mock_log_debug):
266266
# Import the method to test it independently
267267
from linux_edr.app import LinuxEDRApp
268268

269-
# Call the method directly
270-
LinuxEDRApp._process_event(app, "test_event")
269+
# Call the method directly with a validated event
270+
LinuxEDRApp._process_event(app, parsed_event)
271271

272272
# The aggregator should receive a validated dict version of the parsed event
273-
expected_dict = {
274-
"timestamp": "12345.6789",
275-
"pid": 1000,
276-
"command": "test_cmd",
277-
"args": ["-a", "-b"],
278-
}
273+
expected_dict = parsed_event.model_dump()
279274
app.agg.add.assert_called_once_with(expected_dict)
280275

281276
# Reset mock and test with verbose_debug=False
282277
mock_log_debug.reset_mock()
283278
app.verbose_debug = False
284279

285-
LinuxEDRApp._process_event(app, "test_event2")
280+
LinuxEDRApp._process_event(app, parsed_event)
286281
app.agg.add.assert_called_with(expected_dict)
287282

288283
# Test with debug=False
289284
mock_log_debug.reset_mock()
290285
app.debug = False
291286

292-
LinuxEDRApp._process_event(app, "test_event3")
287+
LinuxEDRApp._process_event(app, parsed_event)
293288
app.agg.add.assert_called_with(expected_dict)
294289
mock_log_debug.assert_not_called()
295290

0 commit comments

Comments
 (0)