NOTE(review): this patch was re-flowed from a copy whose newlines and indentation had been
collapsed. Blank context lines, continuation indentation and intra-line alignment were
re-derived from the hunk line counts and may not match the tree byte-for-byte; regenerate
with `git diff` (which also refreshes the `index` hashes) before applying.

diff --git a/software/glasgow/access/direct/demultiplexer.py b/software/glasgow/access/direct/demultiplexer.py
index eebc0967d..76f1320c3 100644
--- a/software/glasgow/access/direct/demultiplexer.py
+++ b/software/glasgow/access/direct/demultiplexer.py
@@ -151,7 +151,9 @@ async def claim_interface(self, applet, mux_interface, args, pull_low=set(), pul
 
 class DirectDemultiplexerInterface(AccessDemultiplexerInterface):
     def __init__(self, device, applet, mux_interface,
-                 read_buffer_size=None, write_buffer_size=None):
+                 read_buffer_size=None, write_buffer_size=None,
+                 read_packets_per_xfer=_packets_per_xfer, read_xfers_per_queue=_xfers_per_queue,
+                 write_packets_per_xfer=_packets_per_xfer, write_xfers_per_queue=_xfers_per_queue):
         super().__init__(device, applet)
 
         self._write_buffer_size = write_buffer_size
@@ -162,6 +164,11 @@ def __init__(self, device, applet, mux_interface,
         self._pipe_num = mux_interface._pipe_num
         self._addr_reset = mux_interface._addr_reset
 
+        self.read_packets_per_xfer = read_packets_per_xfer
+        self.read_xfers_per_queue = read_xfers_per_queue
+        self.write_packets_per_xfer = write_packets_per_xfer
+        self.write_xfers_per_queue = write_xfers_per_queue
+
         config_num = self.device.usb_handle.getConfiguration()
         for config in self.device.usb_handle.getDevice().iterConfigurations():
             if config.getConfigurationValue() == config_num:
@@ -214,7 +221,7 @@ async def reset(self):
         # streaming data, there are no overflows. (This is perhaps not the best way to implement
         # an applet, but we can support it easily enough, and it avoids surprise overflows.)
         self.logger.trace("FIFO: pipelining reads")
-        for _ in range(_xfers_per_queue):
+        for _ in range(self.read_xfers_per_queue):
             self._in_tasks.submit(self._in_task())
         # Give the IN tasks a chance to submit their transfers before deasserting reset.
         await asyncio.sleep(0)
@@ -229,7 +236,7 @@ async def _in_task(self):
                     self.logger.trace("FIFO: read pushback")
                     await self._in_pushback.wait()
 
-        size = self._in_packet_size * _packets_per_xfer
+        size = self._in_packet_size * self.read_packets_per_xfer
         data = await self.device.bulk_read(self._endpoint_in, size)
         self._in_buffer.write(data)
 
@@ -277,7 +284,7 @@ async def read(self, length=None, *, flush=True):
 
     def _out_slice(self):
         # Fast path: read as much contiguous data as possible, up to our transfer size.
-        size = self._out_packet_size * _packets_per_xfer
+        size = self._out_packet_size * self.write_packets_per_xfer
         data = self._out_buffer.read(size)
 
         if len(data) < self._out_packet_size:
@@ -293,7 +300,7 @@ def _out_slice(self):
 
     @property
     def _out_threshold(self):
-        out_xfer_size = self._out_packet_size * _packets_per_xfer
+        out_xfer_size = self._out_packet_size * self.write_packets_per_xfer
         if self._write_buffer_size is None:
             return out_xfer_size
         else:
@@ -330,7 +337,7 @@ async def write(self, data):
         # The write scheduling algorithm attempts to satisfy several partially conflicting goals:
         # * We want to schedule writes as early as possible, because this reduces buffer bloat and
         #   can dramatically improve responsiveness of the system.
-        # * We want to schedule writes that are as large as possible, up to _packets_per_xfer,
+        # * We want to schedule writes that are as large as possible, up to write_packets_per_xfer,
         #   because this reduces CPU utilization and improves latency.
         # * We never want to automatically schedule writes smaller than _out_packet_size,
         #   because they occupy a whole microframe anyway.
@@ -338,17 +345,17 @@ async def write(self, data):
         # We use an approach that performs well when fed with a steady sequence of very large
         # FIFO chunks, yet scales down to packet-size and byte-size FIFO chunks as well.
         # * We only submit a write automatically once the buffer level crosses the threshold of
-        #   `_out_packet_size * _packets_per_xfer`. In this case, _slice_packet always returns
-        #   `_out_packet_size * n` bytes, where n is between 1 and _packet_per_xfer.
+        #   `_out_packet_size * write_packets_per_xfer`. In this case, _slice_packet always returns
+        #   `_out_packet_size * n` bytes, where n is between 1 and write_packets_per_xfer.
         # * We submit enough writes that there is at least one write for each transfer worth
-        #   of data in the buffer, up to _xfers_per_queue outstanding writes.
+        #   of data in the buffer, up to write_xfers_per_queue outstanding writes.
         # * We submit another write once one finishes, if the buffer level is still above
         #   the threshold, even if no more explicit write calls are performed.
         #
-        # This provides predictable write behavior; only _packets_per_xfer packet writes are
+        # This provides predictable write behavior; only write_packets_per_xfer packet writes are
         # automatically submitted, and only the minimum necessary number of tasks are scheduled on
         # calls to `write`.
-        while len(self._out_tasks) < _xfers_per_queue and \
+        while len(self._out_tasks) < self.write_xfers_per_queue and \
                 len(self._out_buffer) >= self._out_threshold:
             self._out_tasks.submit(self._out_task(self._out_slice()))
 
@@ -357,16 +364,16 @@ async def flush(self, wait=True):
 
         # First, we ensure we can submit one more task. (There can be more tasks than
-        # _xfers_per_queue because a task may spawn another one just before it terminates.)
-        if len(self._out_tasks) >= _xfers_per_queue:
+        # write_xfers_per_queue because a task may spawn another one just before it terminates.)
+        if len(self._out_tasks) >= self.write_xfers_per_queue:
             self._out_stalls += 1
-        while len(self._out_tasks) >= _xfers_per_queue:
+        while len(self._out_tasks) >= self.write_xfers_per_queue:
             await self._out_tasks.wait_one()
 
-        # At this point, the buffer can contain at most _packets_per_xfer packets worth
+        # At this point, the buffer can contain at most write_packets_per_xfer packets worth
         # of data, as anything beyond that crosses the threshold of automatic submission.
         # So, we can simply submit the rest of data, which by definition fits into a single
         # transfer.
-        assert len(self._out_buffer) <= self._out_packet_size * _packets_per_xfer
+        assert len(self._out_buffer) <= self._out_packet_size * self.write_packets_per_xfer
         if self._out_buffer:
             data = bytearray()
             while self._out_buffer:
diff --git a/software/glasgow/applet/internal/fixed_throughput/__init__.py b/software/glasgow/applet/internal/fixed_throughput/__init__.py
new file mode 100644
index 000000000..932b2e0b2
--- /dev/null
+++ b/software/glasgow/applet/internal/fixed_throughput/__init__.py
@@ -0,0 +1,133 @@
+import logging
+import asyncio
+import argparse
+import time
+from amaranth import *
+
+from ... import *
+
+
+class FixedThroughputSubtarget(Elaboratable):
+    """Write one byte to the IN FIFO on ``rate_reg + 1`` out of every 256 clock cycles.
+
+    Each byte is 0 until a write is attempted while the FIFO is not ready; after that,
+    every byte is 1, which lets the host detect that an overflow happened.
+    """
+
+    def __init__(self, rate_reg, in_fifo):
+        self.rate_reg = rate_reg
+        self.in_fifo = in_fifo
+
+    def elaborate(self, platform):
+        m = Module()
+
+        data_rate_count = Signal(8)
+        data_valid = Signal()
+        data = Signal(8)
+        overflow = Signal()
+
+        m.d.comb += data.eq(Cat(overflow, Const(0, 7)))
+
+        # First-order accumulator (similar to a delta-sigma modulator): the carry out of the
+        # 8-bit sum becomes `data_valid`, so it is set on `rate_reg + 1` of every 256 cycles.
+        m.d.sync += Cat(data_rate_count, data_valid).eq(data_rate_count + self.rate_reg + 1)
+
+        with m.If(data_valid):
+            with m.If(self.in_fifo.w_rdy):
+                m.d.comb += [
+                    self.in_fifo.w_data.eq(data),
+                    self.in_fifo.w_en.eq(1),
+                ]
+            with m.Else():
+                # Sticky flag: nothing in this module ever clears it again.
+                m.d.sync += overflow.eq(1)
+
+        return m
+
+
+class FixedThroughputApplet(GlasgowApplet):
+    logger = logging.getLogger(__name__)
+    help = "evaluate fixed read throughput performance"
+    description = """
+    Evaluate fixed read throughput performance and check for in FIFO overflows
+    """
+
+    @classmethod
+    def add_build_arguments(cls, parser, access):
+        pass
+
+    def build(self, target, args):
+        self.mux_interface = iface = \
+            target.multiplexer.claim_interface(self, args=None, throttle="none")
+
+        rate_reg, self.__addr_rate = target.registers.add_rw(8)
+
+        iface.add_subtarget(
+            FixedThroughputSubtarget(
+                rate_reg,
+                iface.get_in_fifo(auto_flush=False)
+            )
+        )
+
+    @classmethod
+    def add_run_arguments(cls, parser, access):
+        parser.add_argument(
+            "--rpkts", metavar="READ-PACKETS", type=int,
+            help="How many packets per read transfer")
+
+        parser.add_argument(
+            "--rxfers", metavar="READ-TRANSFERS", type=int,
+            help="How many read transfers to have active at a time")
+
+    async def run(self, device, args):
+        # Only forward the options that were given, so the demultiplexer defaults apply otherwise.
+        kwargs = {}
+        if args.rpkts is not None:
+            kwargs["read_packets_per_xfer"] = args.rpkts
+        if args.rxfers is not None:
+            kwargs["read_xfers_per_queue"] = args.rxfers
+
+        return await device.demultiplexer.claim_interface(self, self.mux_interface, args=None, **kwargs)
+
+    @classmethod
+    def add_interact_arguments(cls, parser):
+        parser.add_argument(
+            "rate_mbps", metavar="rate", type=float,
+            help="data rate in Mbps")
+
+    async def interact(self, device, args, iface):
+        # The rate register is 8 bits wide and the gateware emits `rate_reg + 1` bytes every
+        # 256 cycles (the formula assumes a 48 MHz clock), so only 0..255 can be programmed.
+        data_rate = round(args.rate_mbps/8 * 1e6 / 48e6 * 256) - 1
+        if not 0 <= data_rate <= 255:
+            raise ValueError(
+                f"rate {args.rate_mbps} Mbps is out of range; "
+                f"representable rates are approximately 1.5 to 384 Mbps")
+        await device.write_register(self.__addr_rate, data_rate)
+        await iface.reset()
+
+        count = 0
+        overflow = False
+        begin = time.perf_counter()
+        try:
+            while not overflow:
+                # No per-chunk list copy; assumes `iface.read()` returns a bytes-like object.
+                data = await iface.read()
+                count += len(data)
+                overflow = 1 in data
+        finally:
+            duration = time.perf_counter() - begin
+            # Guard against a zero-length interval so that an early failure is not masked
+            # by a ZeroDivisionError raised from this `finally` block.
+            mbps = count*8 / duration / 1e6 if duration > 0 else 0.0
+            expected_mbps = (data_rate+1)/256 * 8 * 48
+
+            print(f"{overflow=}")
+            print(f"Elapsed: {duration}")
+            print(f"Mbps: {mbps}")
+            print(f"Expected Mbps: {expected_mbps}")
+
+    @classmethod
+    def tests(cls):
+        from . import test
+        return test.FixedThroughputAppletTestCase
diff --git a/software/glasgow/applet/internal/fixed_throughput/test.py b/software/glasgow/applet/internal/fixed_throughput/test.py
new file mode 100644
index 000000000..2192ec654
--- /dev/null
+++ b/software/glasgow/applet/internal/fixed_throughput/test.py
@@ -0,0 +1,8 @@
+from ... import *
+from . import FixedThroughputApplet
+
+
+class FixedThroughputAppletTestCase(GlasgowAppletTestCase, applet=FixedThroughputApplet):
+    @synthesis_test
+    def test_build(self):
+        self.assertBuilds()
diff --git a/software/pyproject.toml b/software/pyproject.toml
index edcad145e..bf120cc5b 100644
--- a/software/pyproject.toml
+++ b/software/pyproject.toml
@@ -78,6 +78,7 @@ glasgow = "glasgow.cli:run_main"
 [project.entry-points."glasgow.applet"]
 selftest = "glasgow.applet.internal.selftest:SelfTestApplet"
 benchmark = "glasgow.applet.internal.benchmark:BenchmarkApplet"
+fixed-throughput = "glasgow.applet.internal.fixed_throughput:FixedThroughputApplet"
 
 analyzer = "glasgow.applet.interface.analyzer:AnalyzerApplet"
 uart = "glasgow.applet.interface.uart:UARTApplet"