diff --git a/software/glasgow/abstract.py b/software/glasgow/abstract.py index 2d145bcd2..1b009073e 100644 --- a/software/glasgow/abstract.py +++ b/software/glasgow/abstract.py @@ -1,5 +1,5 @@ from abc import ABCMeta, abstractmethod -from typing import Self, Any, Literal +from typing import Self, Any, Literal, assert_never from collections.abc import Generator from collections.abc import Mapping from dataclasses import dataclass @@ -35,6 +35,26 @@ class PullState(enum.Enum): def enabled(self): return self != self.Float + def combine(self, other: "PullState") -> "PullState": + if self == PullState.Float: + return other + elif other == PullState.Float: + return self + elif self == other: + return self + else: + raise ValueError(f"Can't combine conflicting pulls {self} and {other}") + + def to_bit(self) -> bool: + if self == PullState.Float: + raise ValueError("Can't convert floating PullState into concrete bit value") + elif self == PullState.Low: + return False + elif self == PullState.High: + return True + + assert_never(self) + def __invert__(self): match self: case self.Float: return self @@ -138,6 +158,14 @@ def _legacy_number(self): case GlasgowPort.B: return 8 + self.number case _: assert False + @property + def loc_name(self) -> str: + """Name of the pin location. + + The same as the ``__str__`` of the pin, but without the inversion symbol. + """ + return f"{self.port}{self.number}" + def __invert__(self) -> Self: return GlasgowPin(self.port, self.number, invert=not self.invert) diff --git a/software/glasgow/hardware/assembly.py b/software/glasgow/hardware/assembly.py index 18363a270..853ad4cf5 100644 --- a/software/glasgow/hardware/assembly.py +++ b/software/glasgow/hardware/assembly.py @@ -529,7 +529,7 @@ def add_applet(self, applet: Any) -> Generator[None, None, None]: def add_platform_pin(self, pin: GlasgowPin, port_name: str) -> io.PortLike: assert self._artifact is None, "cannot add a port to a sealed assembly" # TODO: make this a proper error and not an assertion - pin_name = f"{pin.port}{pin.number}" + pin_name = pin.loc_name assert pin_name in self._platform.glasgow_pins, f"unknown or already used pin {pin_name}" self._logger.debug("assigning pin %s to %s%s", port_name, pin_name, " (inverted)" if pin.invert else "") diff --git a/software/glasgow/simulation/assembly.py b/software/glasgow/simulation/assembly.py index 0dc8bcf35..ffd15a25e 100644 --- a/software/glasgow/simulation/assembly.py +++ b/software/glasgow/simulation/assembly.py @@ -1,6 +1,11 @@ +# amaranth: UnusedElaboratable=no +# Suppress warning about unused elaboratable emitted when a simulation run is +# exited via an exception before elaboration + from typing import Any from collections.abc import Generator from contextlib import contextmanager +import itertools import logging from amaranth import * @@ -86,14 +91,91 @@ async def set(self, value): self._parent._context.set(self._signal, value) +class SimulationPin: + _logger: logging.Logger + _name: str + _pin: GlasgowPin + _port: io.SimulationPort | None + _pull_state: PullState | None + + def __init__(self, logger: logging.Logger, pin: GlasgowPin, name: str): + self._logger = logger + self._name = name + self._pin = pin + self._port = None + self._port_taken = False + self._pull_state = None + + @property + def name(self) -> str: + return self._name + + def bind_port(self) -> io.SimulationPort: + assert self._port is None, "Pin has already been used" + self._port = io.SimulationPort("io", 1, name=self._name, invert=self.pin.invert) + return self._port + + @property + def port(self) -> io.SimulationPort: + assert self._port is not None, "Pin has not had a port bound to it yet" + return self._port + + @property + def pin(self) -> GlasgowPin: + return self._pin + + @pin.setter + def pin(self, new_config: GlasgowPin): + if self._pin != new_config: + if self._port is not None: + raise ValueError( + f"Attempted to change pin configuration from {self._pin} to {new_config} " + "after a port has already been bound to it" + ) + self._logger.debug(f"Changing pin config from {self._pin} to {new_config}") + self._pin = new_config + + @property + def pull_state(self) -> PullState | None: + return self._pull_state + + @pull_state.setter + def pull_state(self, new_state: PullState): + if self._pull_state is None: + self._logger.debug(f"Setting pull state to {new_state} for {self.name}") + self._pull_state = new_state + else: + self._logger.debug( + f"Changing pull state from {self._pull_state} to {new_state} for {self.name}" + ) + self._pull_state = new_state + + @property + def effective_pull_state(self) -> PullState: + non_inverted = PullState.Float if self._pull_state is None else self._pull_state + return ~non_inverted if self._pin.invert else non_inverted + + def __repr__(self) -> str: + return ( + "SimulationPin(" + f"name={self._name}, pin={self._pin}, port={self._port}, pull_state={self._pull_state}" + ")" + ) + + class SimulationAssembly(AbstractAssembly): + _pins: dict[str, SimulationPin] + _jumpers: dict[str, int] # {pin_name: jumper_group} + _next_jumper_group: int + def __init__(self): - self._logger = logger - self._pins = {} # {name: io.PortLike} - self._modules = [] # (elaboratable, name) - self._benches = [] # (constructor, background) - self._jumpers = [] # (pin_name...) - self.__context = None + self._logger = logger + self._pins = {} + self._modules = [] # (elaboratable, name) + self._benches = [] # (constructor, background) + self._jumpers = {} + self._next_jumper_group = 0 + self.__context = None @property def sys_clk_period(self) -> "Period": @@ -109,16 +191,43 @@ def add_applet(self, applet: Any) -> Generator[None, None, None]: self._logger = logger def add_platform_pin(self, pin: GlasgowPin, port_name: str) -> io.PortLike: - pin_name = f"{pin.port}{pin.number}" - port = io.SimulationPort("io", 1, name=pin_name) - self._pins[pin_name] = port - return port + # TODO: do we want to support use case of multiple buffers attaching to the same pin? + # (mustn't output at the same time at different levels) + pin_name = pin.loc_name + + sim_pin: SimulationPin + if pin_name in self._pins: + sim_pin = self._pins[pin_name] + # Update pin inversion if set differently by e.g. set_pin_pull + sim_pin.pin = pin + else: + sim_pin = SimulationPin(self._logger, pin, pin_name) + self._pins[pin_name] = sim_pin + + return sim_pin.bind_port() def get_pin(self, pin_name: str) -> io.SimulationPort: - return self._pins[pin_name] + return self._pins[pin_name].port def connect_pins(self, *pin_names: str): - self._jumpers.append(pin_names) + now_connected_existing_groups = { + self._jumpers[pin_name] + for pin_name in pin_names + if pin_name in self._jumpers + } + + # load-bearing list (don't use a generator expression here) + # we want to get all old pins to update and the update the dict + # from which we are getting them (no data racing plz) + self._jumpers.update([ + (pin_name, self._next_jumper_group) + for pin_name, old_group in self._jumpers.items() + if old_group in now_connected_existing_groups + ]) + self._jumpers.update( + (pin_name, self._next_jumper_group) for pin_name in pin_names + ) + self._next_jumper_group += 1 def add_in_pipe(self, in_stream, *, in_flush=C(0), fifo_depth=None, buffer_size=None) -> AbstractInPipe: @@ -193,10 +302,20 @@ def set_port_voltage(self, port: GlasgowPort, vio: GlasgowVio): pass def set_pin_pull(self, pin: GlasgowPin, state: PullState): - pass # TODO: record pull state? + # NOTE: Currently only works with pins connected to each other via a jumper + # Implemented below in jumper elaboration + pin_name = pin.loc_name + if pin_name not in self._pins: + self._pins[pin_name] = SimulationPin(self._logger, pin, pin_name) + self._pins[pin_name].pull_state = state async def configure_ports(self): - pass # TODO: log and use pull state for default pin state? + if self.__context is not None: + raise NotImplementedError( + "Runtime modification of ports has not yet been implemented for simulations" + ) + + pass @property def _context(self): @@ -210,22 +329,42 @@ def run(self, fn, *, vcd_file=None, gtkw_file=None): dummy = Signal() m.d.sync += dummy.eq(0) # make sure the domain exists - for jumper in self._jumpers: + jumped_pin_groups: dict[int, list[str]] = {} + for pin_name, group in self._jumpers.items(): + if group not in jumped_pin_groups: + jumped_pin_groups[group] = [] + jumped_pin_groups[group].append(pin_name) + + for jumper in jumped_pin_groups.values(): net = Signal(name=f"jumper_{'_'.join(jumper)}") pins = [self._pins[name] for name in jumper] + pull_state = PullState.Float for pin in pins: - m.d.comb += pin.i.eq(net) - with m.If(pin.oe): - m.d.comb += net.eq(pin.o) + pull_state = pull_state.combine(pin.effective_pull_state) + + m.d.comb += pin.port.i.eq(net) + with m.If(pin.port.oe): + m.d.comb += net.eq(pin.port.o) + + any_zero = Cat((pin.port.o == 0) & pin.port.oe for pin in pins).any() + any_one = Cat((pin.port.o == 1) & pin.port.oe for pin in pins).any() + m.d.comb += Assert( - sum(Cat(pin.oe for pin in pins)) <= 1, + ~(any_zero & any_one), Format( f"electrical contention on a jumper: " - f"{' '.join(f'{name}.oe={{}}' for name in jumper)}", - *(self._pins[name].oe for name in jumper) - ) + f"{' '.join(f'{name}(oe={{}}, o={{}})' for name in jumper)}", + *itertools.chain.from_iterable( + (self._pins[name].port.oe, self._pins[name].port.o) + for name in jumper + ), + ), ) + if pull_state != PullState.Float: + with m.If(Cat(pin.port.oe for pin in pins) == 0): + m.d.comb += net.eq(pull_state.to_bit()) + for elaboratable, name in self._modules: m.submodules[name] = elaboratable diff --git a/software/glasgow/simulation/test.py b/software/glasgow/simulation/test.py new file mode 100644 index 000000000..f8e2b5116 --- /dev/null +++ b/software/glasgow/simulation/test.py @@ -0,0 +1,255 @@ +import functools +import logging +import sys +from typing import Any +from collections.abc import Awaitable, Callable, Mapping +import unittest + +from amaranth import * +from amaranth.lib import io +from amaranth.sim.pysim import TestbenchContext + +from glasgow.simulation.assembly import SimulationAssembly +from glasgow.abstract import PullState + +logger = logging.getLogger(__name__) +logger.level = logging.TRACE + +stream_handler = logging.StreamHandler(sys.stderr) +logger.addHandler(stream_handler) + + +def simulation_assembly_test( + prepare: Callable[[Any, SimulationAssembly], Any] | None = None, +): + + def decorator( + case: Callable[ + [Any, SimulationAssembly, TestbenchContext, Any], Awaitable[None] + ], + ): + @functools.wraps(case) + def wrapper(self): + assembly = SimulationAssembly() + prepare_res = None + if prepare is not None: + prepare_res = prepare(self, assembly) + + async def launch(ctx: TestbenchContext): + await case(self, assembly, ctx, prepare_res) + + assembly.run(launch) + + return wrapper + + return decorator + + +def _jump_pins( + *jumped_pin_groups: list[str], + pulls: Mapping[str, PullState] = {}, +) -> Callable[[Any, SimulationAssembly], dict[str, io.Buffer]]: + def prepare(self, assembly: SimulationAssembly) -> dict[str, io.Buffer]: + buffers = {} + m = Module() + + for pin in {pin for group in jumped_pin_groups for pin in group}: + pin_without_inv = pin.rstrip("#") + port = assembly.add_port(pin, pin_without_inv) + buffers[pin_without_inv] = buffer = io.Buffer("io", port) + m.submodules[f"io_{pin_without_inv}"] = buffer + + for group in jumped_pin_groups: + assembly.connect_pins(*(p.rstrip("#") for p in group)) + assembly.add_submodule(m) + + assembly.use_pulls(pulls) + + return buffers + + return prepare + + +class SimulationAssemblyTestCase(unittest.TestCase): + + @simulation_assembly_test(prepare=_jump_pins(["A0", "B0"])) + async def test_jumper( + self, + assembly: SimulationAssembly, + ctx: TestbenchContext, + buffers: dict[str, io.Buffer], + ): + a0 = buffers["A0"] + b0 = buffers["B0"] + + ctx.set(a0.oe, 1) + ctx.set(a0.o, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(b0.i), 0) + + ctx.set(a0.o, 1) + + await ctx.tick() + + self.assertEqual(ctx.get(b0.i), 1) + + @simulation_assembly_test(prepare=_jump_pins(["A0#", "B0"])) + async def test_jumper_pin_invert( + self, + assembly: SimulationAssembly, + ctx: TestbenchContext, + buffers: dict[str, io.Buffer], + ): + a0 = buffers["A0"] + b0 = buffers["B0"] + + ctx.set(a0.oe, 1) + ctx.set(a0.o, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(b0.i), 1) + + ctx.set(a0.o, 1) + + await ctx.tick() + + self.assertEqual(ctx.get(b0.i), 0) + + @simulation_assembly_test(prepare=_jump_pins(["A0", "A1"], ["A1", "A2"])) + async def test_jumper_transitivity( + self, + assembly: SimulationAssembly, + ctx: TestbenchContext, + buffers: dict[str, io.Buffer], + ): + a0 = buffers["A0"] + a2 = buffers["A2"] + + ctx.set(a0.oe, 1) + ctx.set(a0.o, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(a2.i), 0) + + ctx.set(a0.o, 1) + + await ctx.tick() + + self.assertEqual(ctx.get(a2.i), 1) + + @simulation_assembly_test( + prepare=_jump_pins(["A0", "B0"], pulls={"A0": PullState.High}) + ) + async def test_jumper_pull_up( + self, + assembly: SimulationAssembly, + ctx: TestbenchContext, + buffers: dict[str, io.Buffer], + ): + a0 = buffers["A0"] + b0 = buffers["B0"] + + await ctx.tick() + + self.assertEqual(ctx.get(a0.i), 1) + self.assertEqual(ctx.get(b0.i), 1) + + ctx.set(a0.oe, 1) + ctx.set(a0.o, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(b0.i), 0) + + ctx.set(a0.oe, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(a0.i), 1) + self.assertEqual(ctx.get(b0.i), 1) + + ctx.set(b0.oe, 1) + ctx.set(b0.o, 0) + + await ctx.tick() + + self.assertEqual(ctx.get(a0.i), 0) + + @simulation_assembly_test( + prepare=_jump_pins( + ["A0", "A1", "A2"], + pulls={"A0": PullState.Low, "A1": PullState.Low, "A2": PullState.Float}, + ) + ) + async def test_jumper_non_conflicting_pulls( + self, + assembly: SimulationAssembly, + ctx: TestbenchContext, + buffers: dict[str, io.Buffer], + ): + await ctx.tick() + + def test_jumper_conflicting_pulls(self): + assembly = SimulationAssembly() + assembly.add_port("A0", "A0") + assembly.add_port("B0", "B0") + assembly.use_pulls({"A0": PullState.High, "B0": PullState.Low}) + + async def tb(ctx: TestbenchContext): + self.assertFalse(True, "Must not be able to run with conflicting pull-ups") + + with self.assertRaisesRegex(ValueError, r"\bconflict"): + assembly.connect_pins("A0", "B0") + assembly.run(tb) + + def test_jumper_contention(self): + assembly = SimulationAssembly() + port_a0 = assembly.add_port("A0", "A0") + port_b0 = assembly.add_port("B0", "B0") + + m = Module() + + a0 = io.Buffer("io", port_a0) + b0 = io.Buffer("io", port_b0) + + m.submodules["io_A0"] = a0 + m.submodules["io_B0"] = b0 + + assembly.add_submodule(m) + + assembly.connect_pins("A0", "B0") + + end_of_allowed_reached = False + + async def tb( + ctx: TestbenchContext, + ): + # get the variables from outside of this function + nonlocal end_of_allowed_reached + nonlocal a0 + nonlocal b0 + + ctx.set(a0.oe, 1) + ctx.set(a0.o, 0) + ctx.set(b0.oe, 1) + ctx.set(b0.o, 0) + + await ctx.tick() + + end_of_allowed_reached = True + + ctx.set(a0.o, 1) + + await ctx.tick() + + with self.assertRaisesRegex(AssertionError, r"\bcontention"): + assembly.run(tb) + + self.assertTrue( + end_of_allowed_reached, + "Should be able run with two pin outputs set to the same value", + )