From 8a771ffd8ced1fea1bf3b8df0dacfd56c1eecf0f Mon Sep 17 00:00:00 2001 From: Rob Taylor Date: Mon, 28 Apr 2025 16:47:37 +0100 Subject: [PATCH 1/7] wip --- amaranth/sim/_base.py | 25 ++- amaranth/sim/_vcdwriter.py | 313 ++++++++++++++++++++++++++++++++++++ amaranth/sim/pysim.py | 321 +++---------------------------------- amaranth/sim/trace.py | 275 +++++++++++++++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 631 insertions(+), 304 deletions(-) create mode 100644 amaranth/sim/_vcdwriter.py create mode 100644 amaranth/sim/trace.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 7e58112a4..e2c784a02 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -1,4 +1,25 @@ -__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine"] +from abc import ABCMeta, abstractmethod + +__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer"] + + +class Observer(metaclass=ABCMeta): + @property + @abstractmethod + def fs_per_delta(self) -> int: + return 0 + + @abstractmethod + def update_signal(self, timestamp, signal): + ... + + @abstractmethod + def update_memory(self, timestamp, memory, addr): + ... + + @abstractmethod + def close(self, timestamp): + assert False class BaseProcess: @@ -97,5 +118,5 @@ def step_design(self): def advance(self): raise NotImplementedError # :nocov: - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + def observe(self, observer: Observer): raise NotImplementedError # :nocov: diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py new file mode 100644 index 000000000..da5c4323d --- /dev/null +++ b/amaranth/sim/_vcdwriter.py @@ -0,0 +1,313 @@ +import enum as py_enum +import itertools +import os.path +import re +import warnings + +from contextlib import contextmanager +from typing import IO + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * +from ._pyeval import eval_format, eval_value, eval_assign +from ._pyrtl import _FragmentCompiler +from ._pyclock import PyClockProcess + + +class _VCDWriter(Observer): + @staticmethod + def decode_to_vcd(format, value): + return format.format(value).expandtabs().replace(" ", "_") + + def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): + self.state = state + self._fs_per_delta = fs_per_delta + + # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that + # the simulator is still usable if it's not installed for some reason. + import vcd, vcd.gtkw + + self.close_vcd = False + self.close_gtkw = False + if isinstance(vcd_file, str): + vcd_file = open(vcd_file, "w") + self.close_vcd = True + if isinstance(gtkw_file, str): + gtkw_file = open(gtkw_file, "w") + self.close_gtkw = True + + self.vcd_signal_vars = SignalDict() + self.vcd_memory_vars = {} + self.vcd_file = vcd_file + self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, + timescale="1 fs", comment="Generated by Amaranth") + + self.gtkw_signal_names = SignalDict() + self.gtkw_memory_names = {} + self.gtkw_file = gtkw_file + self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) + + self.traces = traces + + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + if self.vcd_writer is None: + return + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self.vcd_signal_vars[signal] = [] + self.gtkw_signal_names[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + vcd_var = self.vcd_writer.register_var( + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self.gtkw_signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + @property + def fs_per_delta(self) -> int: + return self._fs_per_delta + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index d98ef7703..eccef7d00 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -1,4 +1,4 @@ -from contextlib import contextmanager +from contextlib import contextmanager, closing import itertools import re import os.path @@ -13,299 +13,11 @@ from ._pyeval import eval_format, eval_value, eval_assign from ._pyrtl import _FragmentCompiler from ._pyclock import PyClockProcess - +from ._vcdwriter import _VCDWriter __all__ = ["PySimEngine"] -class _VCDWriter: - @staticmethod - def decode_to_vcd(format, value): - return format.format(value).expandtabs().replace(" ", "_") - - def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): - self.state = state - self.fs_per_delta = fs_per_delta - - # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that - # the simulator is still usable if it's not installed for some reason. - import vcd, vcd.gtkw - - self.close_vcd = False - self.close_gtkw = False - if isinstance(vcd_file, str): - vcd_file = open(vcd_file, "w") - self.close_vcd = True - if isinstance(gtkw_file, str): - gtkw_file = open(gtkw_file, "w") - self.close_gtkw = True - - self.vcd_signal_vars = SignalDict() - self.vcd_memory_vars = {} - self.vcd_file = vcd_file - self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, - timescale="1 fs", comment="Generated by Amaranth") - - self.gtkw_signal_names = SignalDict() - self.gtkw_memory_names = {} - self.gtkw_file = gtkw_file - self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) - - self.traces = traces - - signal_names = SignalDict() - memories = {} - for fragment, fragment_info in design.fragments.items(): - fragment_name = ("bench", *fragment_info.name) - for signal, signal_name in fragment_info.signal_names.items(): - if signal not in signal_names: - signal_names[signal] = set() - signal_names[signal].add((*fragment_name, signal_name)) - if isinstance(fragment, MemoryInstance): - memories[fragment._data] = fragment_name - - trace_names = SignalDict() - assigned_names = set() - def traverse_traces(traces): - if isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(trace, MemoryData._Row): - memory = trace._memory - if not memory in memories: - if memory.name not in assigned_names: - name = memory.name - else: - name = f"{memory.name}${len(assigned_names)}" - assert name not in assigned_names - memories[memory] = ("bench", name) - assigned_names.add(name) - else: - for trace_signal in trace._rhs_signals(): - if trace_signal not in signal_names: - if trace_signal.name not in assigned_names: - name = trace_signal.name - else: - name = f"{trace_signal.name}${len(assigned_names)}" - assert name not in assigned_names - trace_names[trace_signal] = {("bench", name)} - assigned_names.add(name) - elif isinstance(traces, MemoryData): - if not traces in memories: - if traces.name not in assigned_names: - name = traces.name - else: - name = f"{traces.name}${len(assigned_names)}" - assert name not in assigned_names - memories[traces] = ("bench", name) - assigned_names.add(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - for name in traces.signature.members: - traverse_traces(getattr(traces, name)) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for trace in traces.values(): - traverse_traces(trace) - else: - raise TypeError(f"{traces!r} is not a traceable object") - traverse_traces(traces) - - if self.vcd_writer is None: - return - - for signal, names in itertools.chain(signal_names.items(), trace_names.items()): - self.vcd_signal_vars[signal] = [] - self.gtkw_signal_names[signal] = [] - - def add_var(path, var_type, var_size, var_init, value): - vcd_var = None - for (*var_scope, var_name) in names: - if re.search(r"[ \t\r\n]", var_name): - raise NameError("Signal '{}.{}' contains a whitespace character" - .format(".".join(var_scope), var_name)) - - field_name = var_name - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - if path: - field_name = "\\" + field_name - - if vcd_var is None: - vcd_var = self.vcd_writer.register_var( - scope=var_scope, name=field_name, - var_type=var_type, size=var_size, init=var_init) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - self.gtkw_signal_names[signal].append( - ".".join((*var_scope, field_name)) + suffix) - else: - self.vcd_writer.register_alias( - scope=var_scope, name=field_name, - var=vcd_var) - - self.vcd_signal_vars[signal].append((vcd_var, value)) - - def add_wire_var(path, value): - add_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_format_var(path, fmt): - add_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_wire_var(path, fmt._chunks[0][0]) - else: - add_format_var(path, fmt) - - if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): - add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) - else: - add_format((), signal._format) - - for memory, memory_name in memories.items(): - self.vcd_memory_vars[memory] = vcd_vars = [] - self.gtkw_memory_names[memory] = gtkw_names = [] - - for idx, row in enumerate(memory): - row_vcd_vars = [] - row_gtkw_names = [] - var_scope = memory_name[:-1] - - def add_mem_var(path, var_type, var_size, var_init, value): - field_name = "\\" + memory_name[-1] + f"[{idx}]" - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - row_vcd_vars.append((self.vcd_writer.register_var( - scope=var_scope, name=field_name, var_type=var_type, - size=var_size, init=var_init - ), value)) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) - - def add_mem_wire_var(path, value): - add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_mem_format_var(path, fmt): - add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_mem_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_mem_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_mem_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_mem_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_mem_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_mem_wire_var(path, fmt._chunks[0][0]) - else: - add_mem_format_var(path, fmt) - - if isinstance(memory._shape, ShapeCastable): - fmt = memory._shape.format(memory._shape(row), "") - add_mem_format((), fmt) - else: - add_mem_wire_var((), row) - - vcd_vars.append(row_vcd_vars) - gtkw_names.append(row_gtkw_names) - - def update_signal(self, timestamp, signal): - for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - elif isinstance(repr, (Format, Format.Enum)): - var_value = eval_format(self.state, repr) - else: - # decoder - var_value = repr(eval_value(self.state, signal)) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def update_memory(self, timestamp, memory, addr): - if memory not in self.vcd_memory_vars: - return - for vcd_var, repr in self.vcd_memory_vars[memory][addr]: - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - else: - var_value = eval_format(self.state, repr) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def close(self, timestamp): - if self.vcd_writer is not None: - self.vcd_writer.close(timestamp) - - if self.gtkw_save is not None: - self.gtkw_save.dumpfile(self.vcd_file.name) - self.gtkw_save.dumpfile_size(self.vcd_file.tell()) - - self.gtkw_save.treeopen("top") - - def traverse_traces(traces): - if isinstance(traces, data.View): - with self.gtkw_save.group("view"): - traverse_traces(Value.cast(traces)) - elif isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(traces, MemoryData._Row): - for name in self.gtkw_memory_names[traces._memory][traces._index]: - self.gtkw_save.trace(name) - else: - for trace_signal in trace._rhs_signals(): - for name in self.gtkw_signal_names[trace_signal]: - self.gtkw_save.trace(name) - elif isinstance(traces, MemoryData): - for row_names in self.gtkw_memory_names[traces]: - for name in row_names: - self.gtkw_save.trace(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - with self.gtkw_save.group("interface"): - for _, _, member in traces.signature.flatten(traces): - traverse_traces(member) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for name, trace in traces.items(): - with self.gtkw_save.group(name): - traverse_traces(trace) - else: - assert False # :nocov: - traverse_traces(self.traces) - - if self.close_vcd: - self.vcd_file.close() - if self.close_gtkw: - self.gtkw_file.close() - - class _PyTimeline: def __init__(self): self.now = 0 @@ -607,7 +319,7 @@ def __init__(self, design): self._processes = _FragmentCompiler(self._state)(self._design.fragment) self._testbenches = [] self._delta_cycles = 0 - self._vcd_writers = [] + self._observers = [] self._active_triggers = set() @property @@ -658,7 +370,7 @@ def step_design(self): # Performs the three phases of a delta cycle in a loop: converged = False while not converged: - changed = set() if self._vcd_writers else None + changed = set() if self._observers else None # 1a. trigger: run every active trigger, sampling values and waking up processes; for trigger_state in self._active_triggers: @@ -677,15 +389,15 @@ def step_design(self): # 2. commit: apply queued signal changes, activating any awaited triggers. converged = self._state.commit(changed) - for vcd_writer in self._vcd_writers: - now_plus_deltas = self._now_plus_deltas(vcd_writer.fs_per_delta) + for observer in self._observers: + now_plus_deltas = self._now_plus_deltas(observer.fs_per_delta) for change in changed: if type(change) is _PySignalState: signal_state = change - vcd_writer.update_signal(now_plus_deltas, + observer.update_signal(now_plus_deltas, signal_state.signal) elif type(change) is _PyMemoryChange: - vcd_writer.update_memory(now_plus_deltas, change.state.memory, + observer.update_memory(now_plus_deltas, change.state.memory, change.addr) else: assert False # :nocov: @@ -721,12 +433,17 @@ def advance(self): return False @contextmanager - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): - vcd_writer = _VCDWriter(self._state, self._design, - vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta) + def observe(self, observer: Observer): try: - self._vcd_writers.append(vcd_writer) + self._observers.append(observer) yield finally: - vcd_writer.close(self._now_plus_deltas(vcd_writer.fs_per_delta)) - self._vcd_writers.remove(vcd_writer) + observer.close(self._now_plus_deltas(observer.fs_per_delta)) + self._observers.remove(observer) + + @contextmanager + def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + observer = _VCDWriter(self._state, self._design, vcd_file=vcd_file, gtkw_file=gtkw_file, + traces=traces, fs_per_delta=fs_per_delta) + with self.observe(observer): + yield diff --git a/amaranth/sim/trace.py b/amaranth/sim/trace.py new file mode 100644 index 000000000..485d93a8a --- /dev/null +++ b/amaranth/sim/trace.py @@ -0,0 +1,275 @@ +import itertools +import re +import enum as py_enum + +import pystore + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * + + +class TimeSeriesWriter: + def __init__(self, state, design, *, pystore_path, collection_name, traces=()): + self.state = state + + self._signal_vars = SignalDict() + self._memory_vars = {} + + self.traces = traces + + pystore.set_path(pystore_path) # ugh api fail + store = pystore.store('amaranth') + self._collection = store.collection(collection_name) + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self._signal_vars[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + self.register_var( + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self._signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/pyproject.toml b/pyproject.toml index 60a1ea849..2764cb028 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "jschon~=0.11.1", # for amaranth.lib.meta "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim "Jinja2~=3.0", # for amaranth.build + "pystore>=0.1.24", ] [project.optional-dependencies] From 72611e6e15cc77795766996cb4880236e8684103 Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Mon, 14 Jul 2025 23:11:50 +0800 Subject: [PATCH 2/7] started observer & toggle coverage implementation --- amaranth/sim/_base.py | 62 ++++++++++++++++++++++++++++++++++++-- amaranth/sim/_coverage.py | 54 +++++++++++++++++++++++++++++++++ amaranth/sim/_test_base.py | 57 +++++++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 amaranth/sim/_coverage.py create mode 100644 amaranth/sim/_test_base.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index e2c784a02..e0d3d7770 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -1,6 +1,6 @@ from abc import ABCMeta, abstractmethod -__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer"] +__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer", "DummyEngine", "PrintObserver"] class Observer(metaclass=ABCMeta): @@ -65,7 +65,9 @@ def reset(self): raise NotImplementedError # :nocov: def get_signal(self, signal): - raise NotImplementedError # :nocov: + val = self._sim.read_signal(signal) + print(f"[DEBUG] Raw value read: {val} for signal: {signal}") + return int(val) def get_memory(self, memory): raise NotImplementedError # :nocov: @@ -83,6 +85,26 @@ def add_memory_waker(self, memory, waker): class BaseEngine: + # add storage for observers + def __init__(self): + self._observers = [] + + # append observer to list + def add_observer(self, observer: Observer): + self._observers.append(observer) + + def notify_signal_change(self, signal): + for observer in self._observers: + observer.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for observer in self._observers: + observer.update_memory(self.now, memory, addr) + + def notify_close(self): + for observer in self._observers: + observer.close(self.now) + @property def state(self) -> BaseEngineState: raise NotImplementedError # :nocov: @@ -120,3 +142,39 @@ def advance(self): def observe(self, observer: Observer): raise NotImplementedError # :nocov: + +class DummyEngine(BaseEngine): + def __init__(self): + super().__init__() + self._now = 0 + + @property + def now(self): + return self._now + + def notify_signal_change(self, signal): + for obs in self._observers: + obs.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for obs in self._observers: + obs.update_memory(self.now, memory, addr) + + def notify_close(self): + for obs in self._observers: + obs.close(self.now) + + +class PrintObserver(Observer): + @property + def fs_per_delta(self) -> int: + return 1 + + def update_signal(self, timestamp, signal): + print(f"[{timestamp}] Signal changed: {signal}") + + def update_memory(self, timestamp, memory, addr): + print(f"[{timestamp}] Memory write at {addr}") + + def close(self, timestamp): + print(f"[{timestamp}] Simulation ended") diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py new file mode 100644 index 000000000..c8042f29f --- /dev/null +++ b/amaranth/sim/_coverage.py @@ -0,0 +1,54 @@ +from ._base import Observer + +class ToggleCoverageObserver(Observer): + def __init__(self, state): + self.state = state + self._prev_values = {} + self._toggles = {} + self._signal_names = {} + + @property + def fs_per_delta(self) -> int: + return 0 + + def update_signal(self, timestamp, signal): + if getattr(signal, "name", "") != "out": + return + + sig_id = id(signal) + curr_val = int(self.state.get_signal(signal)) #FIX??? + print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") + + if sig_id not in self._prev_values: + self._prev_values[sig_id] = curr_val + self._toggles[sig_id] = {"0->1": False, "1->0": False} + self._signal_names[sig_id] = signal.name + return + + prev_val = self._prev_values[sig_id] + + if prev_val == 0 and curr_val == 1: + self._toggles[sig_id]["0->1"] = True + elif prev_val == 1 and curr_val == 0: + self._toggles[sig_id]["1->0"] = True + + self._prev_values[sig_id] = curr_val + + def update_memory(self, timestamp, memory, addr): + pass + + def get_results(self): + return { + self._signal_names[sig_id]: toggles + for sig_id, toggles in self._toggles.items() + } + + def close(self, timestamp): + results = self.get_results() + print("=== Toggle Coverage Report ===") + for signal, toggles in results.items(): + print(f"{signal}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + + + diff --git a/amaranth/sim/_test_base.py b/amaranth/sim/_test_base.py new file mode 100644 index 000000000..9487ed964 --- /dev/null +++ b/amaranth/sim/_test_base.py @@ -0,0 +1,57 @@ +from _base import DummyEngine, PrintObserver + +# def test_print_observer(): +# engine = DummyEngine() +# observer = PrintObserver() +# engine.add_observer(observer) + +# engine.notify_signal_change("CLK") +# engine.notify_memory_change("RAM", 0x10) +# engine.notify_close() + +# if __name__ == "__main__": +# test_print_observer() + +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + + +def run_toggle_coverage_test(): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(8): # Run for 8 cycles + yield Tick() + sim._engine.notify_signal_change(dut.out) + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + assert results["out"]["0->1"] + assert results["out"]["1->0"] + + +if __name__ == "__main__": + run_toggle_coverage_test() From f6580d4d2204a6ba0517651ae1b37c599c7bd974 Mon Sep 17 00:00:00 2001 From: Rob Taylor Date: Thu, 17 Jul 2025 12:05:28 +0100 Subject: [PATCH 3/7] Add setting of fs_per_delta to base observer class --- amaranth/sim/_base.py | 10 ++++++++-- amaranth/sim/_coverage.py | 11 ++++------- amaranth/sim/_vcdwriter.py | 4 ++-- amaranth/sim/trace.py | 17 +++++++++-------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index e0d3d7770..7a8abc0f2 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -4,10 +4,12 @@ class Observer(metaclass=ABCMeta): + def __init__(self, fs_per_delta=0): + self._fs_per_delta = fs_per_delta + @property - @abstractmethod def fs_per_delta(self) -> int: - return 0 + return self._fs_per_delta @abstractmethod def update_signal(self, timestamp, signal): @@ -166,6 +168,10 @@ def notify_close(self): class PrintObserver(Observer): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + @property def fs_per_delta(self) -> int: return 1 diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index c8042f29f..96ad6c339 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -1,20 +1,17 @@ from ._base import Observer class ToggleCoverageObserver(Observer): - def __init__(self, state): + def __init__(self, state, **kwargs): self.state = state self._prev_values = {} - self._toggles = {} + self._toggles = {} self._signal_names = {} - - @property - def fs_per_delta(self) -> int: - return 0 + super().__init__(**kwargs) def update_signal(self, timestamp, signal): if getattr(signal, "name", "") != "out": return - + sig_id = id(signal) curr_val = int(self.state.get_signal(signal)) #FIX??? print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py index da5c4323d..a93ec5a4e 100644 --- a/amaranth/sim/_vcdwriter.py +++ b/amaranth/sim/_vcdwriter.py @@ -23,9 +23,9 @@ class _VCDWriter(Observer): def decode_to_vcd(format, value): return format.format(value).expandtabs().replace(" ", "_") - def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): + def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), **kwargs): + super.__init__(**kwargs) self.state = state - self._fs_per_delta = fs_per_delta # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that # the simulator is still usable if it's not installed for some reason. diff --git a/amaranth/sim/trace.py b/amaranth/sim/trace.py index 485d93a8a..2df6e77f6 100644 --- a/amaranth/sim/trace.py +++ b/amaranth/sim/trace.py @@ -2,7 +2,7 @@ import re import enum as py_enum -import pystore +import nanots from ..hdl import * from ..hdl._mem import MemoryInstance @@ -12,8 +12,10 @@ from ._async import * -class TimeSeriesWriter: - def __init__(self, state, design, *, pystore_path, collection_name, traces=()): +class TimeSeriesWriter(Observer): + def __init__(self, state, design, *, db_filename, traces=(), **kwargs): + super().__init__(**kwargs) + self.state = state self._signal_vars = SignalDict() @@ -21,9 +23,8 @@ def __init__(self, state, design, *, pystore_path, collection_name, traces=()): self.traces = traces - pystore.set_path(pystore_path) # ugh api fail - store = pystore.store('amaranth') - self._collection = store.collection(collection_name) + self.writer = nanots.Writer(db_filename, auto_reclaim=False) + signal_names = SignalDict() memories = {} for fragment, fragment_info in design.fragments.items(): @@ -52,7 +53,7 @@ def traverse_traces(traces): assigned_names.add(name) else: for trace_signal in trace._rhs_signals(): - if trace_signal not in signal_names: + if trace_signal and trace_signal not in signal_names: if trace_signal.name not in assigned_names: name = trace_signal.name else: @@ -102,7 +103,7 @@ def add_var(path, var_type, var_size, var_init, value): field_name = "\\" + field_name if vcd_var is None: - self.register_var( + context = writer.create_context(f"{var_scope}@{field_name}" scope=var_scope, name=field_name, var_type=var_type, size=var_size, init=var_init) if var_size > 1: From 1da678b21483f8500fce5bc37e7e04a8168e0623 Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Thu, 17 Jul 2025 20:33:13 +0800 Subject: [PATCH 4/7] moved test_base.py to ./tests and into a unittest --- amaranth/sim/_test_base.py | 57 --------------------------------- amaranth/sim/_vcdwriter.py | 2 +- amaranth/sim/test_vcd_writer.py | 31 ++++++++++++++++++ pdm_build.py | 14 ++++++-- pyproject.toml | 3 +- tests/test_base.py | 45 ++++++++++++++++++++++++++ 6 files changed, 91 insertions(+), 61 deletions(-) delete mode 100644 amaranth/sim/_test_base.py create mode 100644 amaranth/sim/test_vcd_writer.py create mode 100644 tests/test_base.py diff --git a/amaranth/sim/_test_base.py b/amaranth/sim/_test_base.py deleted file mode 100644 index 9487ed964..000000000 --- a/amaranth/sim/_test_base.py +++ /dev/null @@ -1,57 +0,0 @@ -from _base import DummyEngine, PrintObserver - -# def test_print_observer(): -# engine = DummyEngine() -# observer = PrintObserver() -# engine.add_observer(observer) - -# engine.notify_signal_change("CLK") -# engine.notify_memory_change("RAM", 0x10) -# engine.notify_close() - -# if __name__ == "__main__": -# test_print_observer() - -from amaranth import * -from amaranth.sim import Tick, Simulator -from amaranth.sim._coverage import ToggleCoverageObserver - -class ToggleDUT(Elaboratable): - def __init__(self): - self.out = Signal(name="out") - - def elaborate(self, platform): - m = Module() - counter = Signal(2, name="counter") - m.d.sync += counter.eq(counter + 1) - m.d.comb += self.out.eq(counter[1]) - return m - - -def run_toggle_coverage_test(): - dut = ToggleDUT() - sim = Simulator(dut) - - toggle_cov = ToggleCoverageObserver(sim._engine.state) - sim._engine.add_observer(toggle_cov) - - def process(): - for _ in range(8): # Run for 8 cycles - yield Tick() - sim._engine.notify_signal_change(dut.out) - - sim.add_clock(1e-6) - sim.add_testbench(process) - sim.run() - - results = toggle_cov.get_results() - print("Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") - - assert results["out"]["0->1"] - assert results["out"]["1->0"] - - -if __name__ == "__main__": - run_toggle_coverage_test() diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py index a93ec5a4e..7627e5fe2 100644 --- a/amaranth/sim/_vcdwriter.py +++ b/amaranth/sim/_vcdwriter.py @@ -24,7 +24,7 @@ def decode_to_vcd(format, value): return format.format(value).expandtabs().replace(" ", "_") def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), **kwargs): - super.__init__(**kwargs) + super().__init__(**kwargs) self.state = state # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that diff --git a/amaranth/sim/test_vcd_writer.py b/amaranth/sim/test_vcd_writer.py new file mode 100644 index 000000000..cb83035d1 --- /dev/null +++ b/amaranth/sim/test_vcd_writer.py @@ -0,0 +1,31 @@ +from amaranth import Elaboratable, Module, Signal +from amaranth.sim import Simulator + +class Top(Elaboratable): + def __init__(self): + self.a = Signal() + + def elaborate(self, platform): + m = Module() + count = Signal(4) + m.d.sync += [ + count.eq(count + 1), + self.a.eq(count[-1]) + ] + return m + +# Create design and simulator +dut = Top() +sim = Simulator(dut) +sim.add_clock(1e-6) # 1 MHz + +def process(): + for _ in range(10): + yield + +sim.add_sync_process(process) + +# Write VCD output +with open("test_output.vcd", "w") as vcd_file: + with sim.write_vcd(vcd_file=vcd_file, gtkw_file=None, traces=[dut.a]): + sim.run() diff --git a/pdm_build.py b/pdm_build.py index 3ad2682e4..c6215ccac 100644 --- a/pdm_build.py +++ b/pdm_build.py @@ -4,15 +4,25 @@ from pdm.backend._vendor.packaging.version import Version +# def format_version(version: SCMVersion) -> str: +# major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) +# dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" +# if version.distance is None: +# return f"{major}.{minor}.{patch}{dirty}" +# else: +# return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" + def format_version(version: SCMVersion) -> str: - major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) + parts = str(version.version).split(".") + major = int(parts[0]) + minor = int(parts[1]) if len(parts) > 1 else 0 + patch = int(parts[2]) if len(parts) > 2 else 0 dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" if version.distance is None: return f"{major}.{minor}.{patch}{dirty}" else: return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" - def pdm_build_initialize(context): version = Version(context.config.metadata["version"]) diff --git a/pyproject.toml b/pyproject.toml index 2764cb028..8d4f4f68b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,10 +15,11 @@ license = { text = "BSD-2-clause" } requires-python = "~=3.9" dependencies = [ - "jschon~=0.11.1", # for amaranth.lib.meta + "jschon>=0.11.1", # for amaranth.lib.meta "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim "Jinja2~=3.0", # for amaranth.build "pystore>=0.1.24", + "pytest>=8.4.1", ] [project.optional-dependencies] diff --git a/tests/test_base.py b/tests/test_base.py new file mode 100644 index 000000000..4ac7e979e --- /dev/null +++ b/tests/test_base.py @@ -0,0 +1,45 @@ +import unittest +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + + +class ToggleCoverageTest(unittest.TestCase): + def test_toggle_coverage(self): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(8): + yield Tick() + sim._engine.notify_signal_change(dut.out) + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + + +if __name__ == "__main__": + unittest.main() From 493dbbc662a5267af5da616fb788519561df886c Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Fri, 18 Jul 2025 20:00:47 +0800 Subject: [PATCH 5/7] implemented toggle coverage with regular & irregular toggleDUT, tracking num of signal transitions --- amaranth/sim/_base.py | 4 +- amaranth/sim/_coverage.py | 16 +++++-- pyproject.toml | 98 ++++++++++++++++++++------------------- tests/test_base.py | 45 ------------------ tests/test_coverage.py | 81 ++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 99 deletions(-) delete mode 100644 tests/test_base.py create mode 100644 tests/test_coverage.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 7a8abc0f2..f3a5b194c 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -67,9 +67,7 @@ def reset(self): raise NotImplementedError # :nocov: def get_signal(self, signal): - val = self._sim.read_signal(signal) - print(f"[DEBUG] Raw value read: {val} for signal: {signal}") - return int(val) + raise NotImplementedError # :nocov: def get_memory(self, memory): raise NotImplementedError # :nocov: diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index 96ad6c339..a5c2d99e3 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -1,4 +1,5 @@ from ._base import Observer +from amaranth.sim._vcdwriter import eval_value, eval_format class ToggleCoverageObserver(Observer): def __init__(self, state, **kwargs): @@ -13,21 +14,28 @@ def update_signal(self, timestamp, signal): return sig_id = id(signal) - curr_val = int(self.state.get_signal(signal)) #FIX??? + try: + val = eval_value(self.state, signal) + except Exception: + val = int(self.state.get_signal(signal)) + try: + curr_val = int(val) + except TypeError: + curr_val = val print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") if sig_id not in self._prev_values: self._prev_values[sig_id] = curr_val - self._toggles[sig_id] = {"0->1": False, "1->0": False} + self._toggles[sig_id] = {"0->1": 0, "1->0": 0} self._signal_names[sig_id] = signal.name return prev_val = self._prev_values[sig_id] if prev_val == 0 and curr_val == 1: - self._toggles[sig_id]["0->1"] = True + self._toggles[sig_id]["0->1"] += 1 elif prev_val == 1 and curr_val == 0: - self._toggles[sig_id]["1->0"] = True + self._toggles[sig_id]["1->0"] += 1 self._prev_values[sig_id] = curr_val diff --git a/pyproject.toml b/pyproject.toml index 8d4f4f68b..546e895a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,56 @@ # Project metadata + [tool.pdm.version] source = "scm" version_format = "pdm_build:format_version" +[tool.pdm.build] +# If amaranth 0.3 is checked out with git (e.g. as a part of a persistent editable install or +# a git worktree cached by tools like poetry), it can have an empty `nmigen` directory left over, +# which causes a hard error because setuptools cannot determine the top-level package. +# Add a workaround to improve experience for people upgrading from old checkouts. +includes = ["amaranth/"] + +source-includes = [ + ".gitignore", + ".coveragerc", + ".env.toolchain", + "CONTRIBUTING.txt", +] + +# Development workflow configuration + +[tool.pdm.dev-dependencies] +# This version requirement needs to be synchronized with the one in pyproject.toml above! +docs = [ + "sphinx~=7.1", + "sphinxcontrib-platformpicker~=1.4", + "sphinxcontrib-yowasp-wavedrom==1.8", # exact version to avoid changes in rendering + "sphinx-rtd-theme~=2.0", + "sphinx-autobuild", +] +examples = [ + "amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git" +] + +[tool.pdm.scripts] +_.env_file = ".env.toolchain" + +test.composite = ["test-code", "test-docs", "coverage-xml"] +test-code.env = {PYTHONWARNINGS = "error"} +test-code.cmd = "python -m coverage run -m unittest discover -t . -s tests -v" +test-docs.cmd = "sphinx-build -b doctest docs/ docs/_build" + +document.cmd = "sphinx-build docs/ docs/_build/ -W --keep-going" +document-live.cmd = "sphinx-autobuild docs/ docs/_build/ --watch amaranth" +document-linkcheck.cmd = "sphinx-build docs/ docs/_linkcheck/ -b linkcheck" + +coverage-text.cmd = "python -m coverage report" +coverage-html.cmd = "python -m coverage html" +coverage-xml.cmd = "python -m coverage xml" + +extract-schemas.call = "amaranth.lib.meta:_extract_schemas('amaranth', base_uri='https://amaranth-lang.org/schema/amaranth')" [project] dynamic = ["version"] @@ -48,53 +95,10 @@ amaranth-rpc = "amaranth.rpc:main" requires = ["pdm-backend~=2.3.0"] build-backend = "pdm.backend" -[tool.pdm.build] -# If amaranth 0.3 is checked out with git (e.g. as a part of a persistent editable install or -# a git worktree cached by tools like poetry), it can have an empty `nmigen` directory left over, -# which causes a hard error because setuptools cannot determine the top-level package. -# Add a workaround to improve experience for people upgrading from old checkouts. -includes = ["amaranth/"] -source-includes = [ - ".gitignore", - ".coveragerc", - ".env.toolchain", - "CONTRIBUTING.txt", -] - -# Development workflow configuration - -[tool.pdm.dev-dependencies] -# This version requirement needs to be synchronized with the one in pyproject.toml above! +[dependency-groups] test = [ - "yowasp-yosys>=0.40", - "coverage", -] -docs = [ - "sphinx~=7.1", - "sphinxcontrib-platformpicker~=1.4", - "sphinxcontrib-yowasp-wavedrom==1.8", # exact version to avoid changes in rendering - "sphinx-rtd-theme~=2.0", - "sphinx-autobuild", -] -examples = [ - "amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git" + "yowasp-yosys>=0.40", + "coverage", + "pytest>=8.4.1", ] - -[tool.pdm.scripts] -_.env_file = ".env.toolchain" - -test.composite = ["test-code", "test-docs", "coverage-xml"] -test-code.env = {PYTHONWARNINGS = "error"} -test-code.cmd = "python -m coverage run -m unittest discover -t . -s tests -v" -test-docs.cmd = "sphinx-build -b doctest docs/ docs/_build" - -document.cmd = "sphinx-build docs/ docs/_build/ -W --keep-going" -document-live.cmd = "sphinx-autobuild docs/ docs/_build/ --watch amaranth" -document-linkcheck.cmd = "sphinx-build docs/ docs/_linkcheck/ -b linkcheck" - -coverage-text.cmd = "python -m coverage report" -coverage-html.cmd = "python -m coverage html" -coverage-xml.cmd = "python -m coverage xml" - -extract-schemas.call = "amaranth.lib.meta:_extract_schemas('amaranth', base_uri='https://amaranth-lang.org/schema/amaranth')" diff --git a/tests/test_base.py b/tests/test_base.py deleted file mode 100644 index 4ac7e979e..000000000 --- a/tests/test_base.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest -from amaranth import * -from amaranth.sim import Tick, Simulator -from amaranth.sim._coverage import ToggleCoverageObserver - -class ToggleDUT(Elaboratable): - def __init__(self): - self.out = Signal(name="out") - - def elaborate(self, platform): - m = Module() - counter = Signal(2, name="counter") - m.d.sync += counter.eq(counter + 1) - m.d.comb += self.out.eq(counter[1]) - return m - - -class ToggleCoverageTest(unittest.TestCase): - def test_toggle_coverage(self): - dut = ToggleDUT() - sim = Simulator(dut) - - toggle_cov = ToggleCoverageObserver(sim._engine.state) - sim._engine.add_observer(toggle_cov) - - def process(): - for _ in range(8): - yield Tick() - sim._engine.notify_signal_change(dut.out) - - sim.add_clock(1e-6) - sim.add_testbench(process) - sim.run() - - results = toggle_cov.get_results() - print("Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") - - self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") - self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_coverage.py b/tests/test_coverage.py new file mode 100644 index 000000000..66eb4e499 --- /dev/null +++ b/tests/test_coverage.py @@ -0,0 +1,81 @@ +import unittest +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + +class IrregularToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(4, name="counter") + toggle = Signal() + + m.d.sync += counter.eq(counter + 1) + with m.If((counter == 1) | (counter == 3) | (counter == 6)): + m.d.sync += toggle.eq(~toggle) + m.d.comb += self.out.eq(toggle) + + return m + +class ToggleCoverageTest(unittest.TestCase): + def test_toggle_coverage_regular(self): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Regular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + + def test_toggle_coverage_irregular(self): + dut = IrregularToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Irregular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + self.assertGreaterEqual(results["out"]["0->1"], 1) + self.assertGreaterEqual(results["out"]["1->0"], 1) + From 58ebf99ce26a063aa93d3fc0bb6200c8c0e662db Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Tue, 22 Jul 2025 10:46:32 +0800 Subject: [PATCH 6/7] fixed reviewer's comments, multi-bit toggle DUT works, 0- --- ...rage.Ivys-MacBook-Air.local.61006.XgklDXGx | Bin 0 -> 69632 bytes amaranth/sim/_coverage.py | 29 +++++-- pyproject.toml | 5 ++ tests/test_coverage.py | 79 ++++++++++-------- 4 files changed, 69 insertions(+), 44 deletions(-) create mode 100644 .coverage.Ivys-MacBook-Air.local.61006.XgklDXGx diff --git a/.coverage.Ivys-MacBook-Air.local.61006.XgklDXGx b/.coverage.Ivys-MacBook-Air.local.61006.XgklDXGx new file mode 100644 index 0000000000000000000000000000000000000000..29fff4de07e9b1d962b53f01702625b81666dc80 GIT binary patch literal 69632 zcmeI5O>iV-8OOWl`@1)UWe8!}=3~i*O)`Y#E1PVR*(3|QP^=)RRTd0AJ)KOmGu^Y@ zJxQh%*V#p=vgDwpCn-26UPMn^SPw!KsHJl7;6W~m9>gk@DlN+cQ04RXM`mVsL(y-j zLc+f*Gt<-E@4uht_df6UWaje^-mg2FylPra#gPvSdjwGw7G+rwgbY1~=n-s5+KB~k z=vD07-)=V}Og%lGm%bq663+_KWBF6kWbV2A&Dr1On(1$5pHC`jl}=y-0T2KI5cq5p z=suWB7Dqqbpi6-PJLWT&nLSGR4g#u_?0Mok^NaLP^3_9}XfQaRchNwh4zsaPHPwAPt$ zFYxc?Xq%3AfJ8KX&0tr^2mR#ZvZbwRmS(7$?OAb9uZ?HM&!5dCi^Id>Gn(s@VyX0( z9q@(@^jtZyN)tL2FOEJ{CW>3h)*an6WNlMZ z+m2Sdtb-E_TCfqkes4(YaputDUP}L7eRO(>FO$;0+)unp#u;XSwU?0Hni9J<8|`M5 zkU!CGwpiv+3h%~_xn0Wb(mJN^>_^U<%j9Lx-+}B5kh2{1Qnd~JOk3N^{0X_2a>8}P zFehZ@1VzAsY@#?bED98r+NOP`LH(_Y(soSueT6c=GEGmpr-o9=;{N^OGiTgDVR$N4 z-LbcJ(gQ)Wb$FZ5VduAx8&*mNgM^i$Mng7N-6F$M%Un(LaStZ7sJS+837qyvdUXF8oI zuI%3~bhbjEGQG8%>5e3l#nDmmT-psIKezmshh(wv%_TgSUgdj-MgLP%!JA`drqAHa>OPtRUYp!YstPgjk*ms9cnoN3!SHLI-I zvX2Bu<*~YAvs%a=ldGoL&=kXK8z{vIo9?OGvTMVotOArl4_Q)%?R$ z=z{V^JI$tI)cp2}HN&1N)%~n+vio?I2ql!qs8HHeL@&||ME$d@#tQVOTEv8@HkF!J z+*zjdDQ?uYOXR%PXjySIM{jB}yTmod4Xh2tmQ_omlhl@uk3~N$3|MYerr9$aC8tuDs009sH0T2KI5C8!X009sH0T2Lz-A5oQM#Ll=|3{>^ z1?gRS!Uh5$00JNY0w4eaAOHd&00JNY0wD0|B#?_l_esG=Jf7+L0Am}AOHd&00JNY0w4eaAOHd&00JNo5XeRMiNP#DBoiG?`;!21 zx`2KDpSB_V4?sW*uOI*dAOHd&00JNY0w4eaAOHd&u$u_%nT?P;qVzN7l;pkaCja0I z2!H?xfB*=900@8p2!H?xfB*>W1_Fh}ST`z`9QaM^B5k200@8p2!H?xfB*=900@8p2!H_A{}BTq00JNY0w4eaAOHd& z00JNY0wA#a31rf53D?D65~Q!^-^hPedaW>2csBR%+!Ohk+)v~0Chm;Cl-NvMkTR+B zsbi@~az33-{VMrd;{D{=?2XyKrB9^4lX)!jLT)@;&;BU=R_0HcQE4SQAGr`e5dB4b zE%v3z?a}kG=OQ0OHe1dheJ$#B8l{S(H#JhYk?ZGoT?G%r4g4l-u3GE|e6Qy>!O$%dkQZWIF?3$KKBzTHzn|X# z#bz;i-Eyp(zbu|1tY%_>*o0k>C4a@Ut`SN zAzyUnO_r<5s5qwCQ0t0rlscx}VNGG?zF2o8v`2kHeSW2nM$d+qYCC#^S&}NeDa`Xn z30I6NwyjwXyL&=f5sq@-{YbPE+Htj^(|u!f&som!k+KL5Wj^?)G#T50_asqaxzY#;yvAOHd& z00N(B0$kHe>zYwBtx{!EZ|KG*^-1pJ>dyu;Hp!@_QEjx@6wTbpShpCuUl(YdY3u4~ z8aC)`2sFbrG7qF^0?yG@X3`O^0Z5>;W^U+g_G)%Vtd)kNS!{%T_l}(EO~BkW7kx4` z=3v+m=NWW3mZEBG1}n)miDv7~k}E+9hqwaHRnTmwvaZ%D8+`1yFN%IfN8q1s#cYIW;ip>;m;m;Sv?ij2pJIv3cSLCO8M$A@=-9O6Z z0-KH6&@J6y^P5F(3EQzWg%z|bxe6MyqN%yllUH-wK$Fg<)gc>(c&a{Tg3jXl3T_Wp z^;ISyvCsd{(j2Sws&rObmePga7QR(@ps*+Zdj2c=(*_8$or93A{&vz;@`y|ijRs{2)_=dc9&;kLN_0F4&&km@%Rx^8RAvxHP0tz zNT;K0`NGNnSCJ^ea|1Vt4L!_^BTP~77=<0?EZ1vF> zuHv=9U;kg&%PXK6ZPvvvU&-sl)+k|Vm?T2SC*#mpW6oU>70&WRhuh6B5Ak}jHO`pd zLlU8jxf@pQ`u~DN=|fj_EWJO{I94Em&^>~|_5Y1Bcb puz>&wfB*=900@8p2!H?xfB*=900{ga5jZ)=Gv5=emk)kE?tjd*;D!JI literal 0 HcmV?d00001 diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index a5c2d99e3..3aba31e5f 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -1,6 +1,11 @@ +from enum import Enum, auto from ._base import Observer from amaranth.sim._vcdwriter import eval_value, eval_format +class ToggleDirection(Enum): + ZERO_TO_ONE = auto() + ONE_TO_ZERO = auto() + class ToggleCoverageObserver(Observer): def __init__(self, state, **kwargs): self.state = state @@ -16,7 +21,7 @@ def update_signal(self, timestamp, signal): sig_id = id(signal) try: val = eval_value(self.state, signal) - except Exception: + except (KeyError, AttributeError): val = int(self.state.get_signal(signal)) try: curr_val = int(val) @@ -26,16 +31,22 @@ def update_signal(self, timestamp, signal): if sig_id not in self._prev_values: self._prev_values[sig_id] = curr_val - self._toggles[sig_id] = {"0->1": 0, "1->0": 0} + self._toggles[sig_id] = { + i: {ToggleDirection.ZERO_TO_ONE: 0, ToggleDirection.ONE_TO_ZERO: 0} + for i in range(signal.shape().width) + } self._signal_names[sig_id] = signal.name return prev_val = self._prev_values[sig_id] - if prev_val == 0 and curr_val == 1: - self._toggles[sig_id]["0->1"] += 1 - elif prev_val == 1 and curr_val == 0: - self._toggles[sig_id]["1->0"] += 1 + for bit in range(signal.shape().width): + prev_bit = (prev_val >> bit) & 1 + curr_bit = (curr_val >> bit) & 1 + if prev_bit == 0 and curr_bit == 1: + self._toggles[sig_id][bit][ToggleDirection.ZERO_TO_ONE] += 1 + elif prev_bit == 1 and curr_bit == 0: + self._toggles[sig_id][bit][ToggleDirection.ONE_TO_ZERO] += 1 self._prev_values[sig_id] = curr_val @@ -51,8 +62,10 @@ def get_results(self): def close(self, timestamp): results = self.get_results() print("=== Toggle Coverage Report ===") - for signal, toggles in results.items(): - print(f"{signal}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + for signal, bit_toggles in results.items(): + print(f"{signal}:") + for bit, counts in bit_toggles.items(): + print(f" Bit {bit}: 0→1={counts[ToggleDirection.ZERO_TO_ONE]}, 1→0={counts[ToggleDirection.ONE_TO_ZERO]}") diff --git a/pyproject.toml b/pyproject.toml index 546e895a9..bb03e19b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,3 +102,8 @@ test = [ "coverage", "pytest>=8.4.1", ] +dev = [ + "pytest>=8.4.1", + "pytest-cov>=6.2.1", + "coverage>=7.9.2", +] diff --git a/tests/test_coverage.py b/tests/test_coverage.py index 66eb4e499..a6d181803 100644 --- a/tests/test_coverage.py +++ b/tests/test_coverage.py @@ -1,7 +1,8 @@ import unittest from amaranth import * -from amaranth.sim import Tick, Simulator -from amaranth.sim._coverage import ToggleCoverageObserver +from amaranth.sim import Simulator, Tick +from amaranth.sim._coverage import ToggleCoverageObserver, ToggleDirection + class ToggleDUT(Elaboratable): def __init__(self): @@ -13,7 +14,8 @@ def elaborate(self, platform): m.d.sync += counter.eq(counter + 1) m.d.comb += self.out.eq(counter[1]) return m - + + class IrregularToggleDUT(Elaboratable): def __init__(self): self.out = Signal(name="out") @@ -22,19 +24,27 @@ def elaborate(self, platform): m = Module() counter = Signal(4, name="counter") toggle = Signal() - m.d.sync += counter.eq(counter + 1) with m.If((counter == 1) | (counter == 3) | (counter == 6)): m.d.sync += toggle.eq(~toggle) m.d.comb += self.out.eq(toggle) + return m + +class MultiBitToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(3, name="out") # 3-bit output + def elaborate(self, platform): + m = Module() + counter = Signal(3) + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter) return m + class ToggleCoverageTest(unittest.TestCase): - def test_toggle_coverage_regular(self): - dut = ToggleDUT() + def run_simulation(self, dut): sim = Simulator(dut) - toggle_cov = ToggleCoverageObserver(sim._engine.state) sim._engine.add_observer(toggle_cov) @@ -45,37 +55,34 @@ def process(): sim.add_clock(1e-6) sim.add_testbench(process) sim.run() + return toggle_cov.get_results() - results = toggle_cov.get_results() - print("[Regular] Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + def assert_bit_toggled(self, toggles, signal_name, bit=0): + self.assertIn(signal_name, toggles) + self.assertIn(bit, toggles[signal_name]) + bit_toggles = toggles[signal_name][bit] + print(f"{signal_name}[{bit}]: 0→1={bit_toggles[ToggleDirection.ZERO_TO_ONE]}, 1→0={bit_toggles[ToggleDirection.ONE_TO_ZERO]}") + self.assertGreaterEqual(bit_toggles[ToggleDirection.ZERO_TO_ONE], 1) + self.assertGreaterEqual(bit_toggles[ToggleDirection.ONE_TO_ZERO], 1) - self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") - self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + def test_toggle_coverage_regular(self): + print("\n[TEST] Regular toggle coverage") + dut = ToggleDUT() + results = self.run_simulation(dut) + self.assert_bit_toggled(results, "out", bit=0) def test_toggle_coverage_irregular(self): + print("\n[TEST] Irregular toggle coverage") dut = IrregularToggleDUT() - sim = Simulator(dut) - - toggle_cov = ToggleCoverageObserver(sim._engine.state) - sim._engine.add_observer(toggle_cov) - - def process(): - for _ in range(16): - yield Tick() - - sim.add_clock(1e-6) - sim.add_testbench(process) - sim.run() - - results = toggle_cov.get_results() - print("[Irregular] Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") - - self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") - self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") - self.assertGreaterEqual(results["out"]["0->1"], 1) - self.assertGreaterEqual(results["out"]["1->0"], 1) - + results = self.run_simulation(dut) + self.assert_bit_toggled(results, "out", bit=0) + + def test_toggle_coverage_multibit(self): + print("\n[TEST] Multibit toggle coverage") + dut = MultiBitToggleDUT() + results = self.run_simulation(dut) + for bit in range(3): + self.assert_bit_toggled(results, "out", bit=bit) + +if __name__ == "__main__": + unittest.main() From f8306d531e60260cdfd7ac411b8708f70568ed7a Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Tue, 29 Jul 2025 16:03:15 +0800 Subject: [PATCH 7/7] started statement coverage, works for basic DUT --- 1 changed from string to enum | 3 ++ amaranth/sim/_coverage.py | 23 ++++++++ pyproject.toml | 1 + tests/coverage/test_statement.py | 53 +++++++++++++++++++ .../test_toggle.py} | 1 - 5 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 1 changed from string to enum create mode 100644 tests/coverage/test_statement.py rename tests/{test_coverage.py => coverage/test_toggle.py} (99%) diff --git a/1 changed from string to enum b/1 changed from string to enum new file mode 100644 index 000000000..c94cfff95 --- /dev/null +++ b/1 changed from string to enum @@ -0,0 +1,3 @@ +[ivy 58ebf99] fixed reviewer's comments, multi-bit toggle DUT works, 0- + 4 files changed, 69 insertions(+), 44 deletions(-) + create mode 100644 .coverage.Ivys-MacBook-Air.local.61006.XgklDXGx diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index 3aba31e5f..4fca9fc7d 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -70,3 +70,26 @@ def close(self, timestamp): +class StatementCoverageObserver(Observer): + def __init__(self, **kwargs): + self._statement_hits = {} + super().__init__(**kwargs) + + def record_statement_hit(self, statement_id: str): + if statement_id not in self._statement_hits: + self._statement_hits[statement_id] = 0 + self._statement_hits[statement_id] += 1 + + def update_signal(self, timestamp, signal): + pass + + def update_memory(self, timestamp, memory, addr): + pass + + def get_result(self): + return self._statement_hits + + def close(self, timestamp): + print("=== Statement Coverage Report ===") + for stmt_id, count in sorted(self._statement_hits.items()): + print(f"{stmt_id}:{'HIT' if count > 0 else 'MISS'} ({count} times)") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index bb03e19b4..add3c8d11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ dependencies = [ "Jinja2~=3.0", # for amaranth.build "pystore>=0.1.24", "pytest>=8.4.1", + "coverage>=7.10.1", ] [project.optional-dependencies] diff --git a/tests/coverage/test_statement.py b/tests/coverage/test_statement.py new file mode 100644 index 000000000..d0f5b9d43 --- /dev/null +++ b/tests/coverage/test_statement.py @@ -0,0 +1,53 @@ +from amaranth import * +from amaranth.sim import Simulator, Tick +from amaranth.sim._coverage import StatementCoverageObserver +import unittest + +class StatementDUT(Elaboratable): + def __init__(self): + self.out = Signal() + + def elaborate(self, platform): + m = Module() + self.counter = Signal(3) + m.d.sync += self.counter.eq(self.counter + 1) + m.d.sync += self.out.eq(self.counter == 0) + return m + + +class StatementCoverageTest(unittest.TestCase): + def run_simulation(self): + observer = StatementCoverageObserver() + dut = StatementDUT() + sim = Simulator(dut) + + def process(): + for _ in range(4): + counter_val = (yield dut.counter) + + if counter_val == 0: + observer.record_statement_hit("if_counter_0") + else: + observer.record_statement_hit("else_counter_nonzero") + + observer.record_statement_hit("counter_increment") + + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + return observer.get_result() + + def test_statement_coverage(self): + print("\n[TEST] Statement coverage") + results = self.run_simulation() + + for stmt in ["if_counter_0", "else_counter_nonzero", "counter_increment"]: + self.assertIn(stmt, results) + self.assertGreater(results[stmt], 0, f"Statement '{stmt}' was never hit.") + print(f"Statement '{stmt}' hit {results[stmt]} time(s)") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_coverage.py b/tests/coverage/test_toggle.py similarity index 99% rename from tests/test_coverage.py rename to tests/coverage/test_toggle.py index a6d181803..432f3cf0b 100644 --- a/tests/test_coverage.py +++ b/tests/coverage/test_toggle.py @@ -15,7 +15,6 @@ def elaborate(self, platform): m.d.comb += self.out.eq(counter[1]) return m - class IrregularToggleDUT(Elaboratable): def __init__(self): self.out = Signal(name="out")