Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions software/glasgow/access/direct/demultiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ async def claim_interface(self, applet, mux_interface, args, pull_low=set(), pul

class DirectDemultiplexerInterface(AccessDemultiplexerInterface):
def __init__(self, device, applet, mux_interface,
read_buffer_size=None, write_buffer_size=None):
read_buffer_size=None, write_buffer_size=None,
read_packets_per_xfer=_packets_per_xfer, read_xfers_per_queue=_xfers_per_queue,
write_packets_per_xfer=_packets_per_xfer, write_xfers_per_queue=_xfers_per_queue):
super().__init__(device, applet)

self._write_buffer_size = write_buffer_size
Expand All @@ -162,6 +164,11 @@ def __init__(self, device, applet, mux_interface,
self._pipe_num = mux_interface._pipe_num
self._addr_reset = mux_interface._addr_reset

self.read_packets_per_xfer = read_packets_per_xfer
self.read_xfers_per_queue = read_xfers_per_queue
self.write_packets_per_xfer = write_packets_per_xfer
self.write_xfers_per_queue = write_xfers_per_queue

config_num = self.device.usb_handle.getConfiguration()
for config in self.device.usb_handle.getDevice().iterConfigurations():
if config.getConfigurationValue() == config_num:
Expand Down Expand Up @@ -214,7 +221,7 @@ async def reset(self):
# streaming data, there are no overflows. (This is perhaps not the best way to implement
# an applet, but we can support it easily enough, and it avoids surprise overflows.)
self.logger.trace("FIFO: pipelining reads")
for _ in range(_xfers_per_queue):
for _ in range(self.read_xfers_per_queue):
self._in_tasks.submit(self._in_task())
# Give the IN tasks a chance to submit their transfers before deasserting reset.
await asyncio.sleep(0)
Expand All @@ -229,7 +236,7 @@ async def _in_task(self):
self.logger.trace("FIFO: read pushback")
await self._in_pushback.wait()

size = self._in_packet_size * _packets_per_xfer
size = self._in_packet_size * self.read_packets_per_xfer
data = await self.device.bulk_read(self._endpoint_in, size)
self._in_buffer.write(data)

Expand Down Expand Up @@ -277,7 +284,7 @@ async def read(self, length=None, *, flush=True):

def _out_slice(self):
# Fast path: read as much contiguous data as possible, up to our transfer size.
size = self._out_packet_size * _packets_per_xfer
size = self._out_packet_size * self.write_packets_per_xfer
data = self._out_buffer.read(size)

if len(data) < self._out_packet_size:
Expand All @@ -293,7 +300,7 @@ def _out_slice(self):

@property
def _out_threshold(self):
out_xfer_size = self._out_packet_size * _packets_per_xfer
out_xfer_size = self._out_packet_size * self.write_packets_per_xfer
if self._write_buffer_size is None:
return out_xfer_size
else:
Expand Down Expand Up @@ -330,25 +337,25 @@ async def write(self, data):
# The write scheduling algorithm attempts to satisfy several partially conflicting goals:
# * We want to schedule writes as early as possible, because this reduces buffer bloat and
# can dramatically improve responsiveness of the system.
# * We want to schedule writes that are as large as possible, up to _packets_per_xfer,
# * We want to schedule writes that are as large as possible, up to write_packets_per_xfer,
# because this reduces CPU utilization and improves latency.
# * We never want to automatically schedule writes smaller than _out_packet_size,
# because they occupy a whole microframe anyway.
#
# We use an approach that performs well when fed with a steady sequence of very large
# FIFO chunks, yet scales down to packet-size and byte-size FIFO chunks as well.
# * We only submit a write automatically once the buffer level crosses the threshold of
# `_out_packet_size * _packets_per_xfer`. In this case, _slice_packet always returns
# `_out_packet_size * n` bytes, where n is between 1 and _packet_per_xfer.
# `_out_packet_size * write_packets_per_xfer`. In this case, _slice_packet always returns
# `_out_packet_size * n` bytes, where n is between 1 and write_packets_per_xfer.
# * We submit enough writes that there is at least one write for each transfer worth
# of data in the buffer, up to _xfers_per_queue outstanding writes.
# of data in the buffer, up to write_xfers_per_queue outstanding writes.
# * We submit another write once one finishes, if the buffer level is still above
# the threshold, even if no more explicit write calls are performed.
#
# This provides predictable write behavior; only _packets_per_xfer packet writes are
# This provides predictable write behavior; only write_packets_per_xfer packet writes are
# automatically submitted, and only the minimum necessary number of tasks are scheduled on
# calls to `write`.
while len(self._out_tasks) < _xfers_per_queue and \
while len(self._out_tasks) < self.write_xfers_per_queue and \
len(self._out_buffer) >= self._out_threshold:
self._out_tasks.submit(self._out_task(self._out_slice()))

Expand All @@ -357,16 +364,16 @@ async def flush(self, wait=True):

        # First, we ensure we can submit one more task. (There can be more tasks than
        # write_xfers_per_queue because a task may spawn another one just before it terminates.)
if len(self._out_tasks) >= _xfers_per_queue:
if len(self._out_tasks) >= self.write_xfers_per_queue:
self._out_stalls += 1
while len(self._out_tasks) >= _xfers_per_queue:
while len(self._out_tasks) >= self.write_xfers_per_queue:
await self._out_tasks.wait_one()

# At this point, the buffer can contain at most _packets_per_xfer packets worth
# At this point, the buffer can contain at most write_packets_per_xfer packets worth
# of data, as anything beyond that crosses the threshold of automatic submission.
# So, we can simply submit the rest of data, which by definition fits into a single
# transfer.
assert len(self._out_buffer) <= self._out_packet_size * _packets_per_xfer
assert len(self._out_buffer) <= self._out_packet_size * self.write_packets_per_xfer
if self._out_buffer:
data = bytearray()
while self._out_buffer:
Expand Down
119 changes: 119 additions & 0 deletions software/glasgow/applet/internal/fixed_throughput/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
import asyncio
import argparse
import time
from amaranth import *

from ... import *


class FixedThroughputSubtarget(Elaboratable):
    """Gateware that streams bytes into the IN FIFO at a configurable fixed rate.

    An 8-bit phase accumulator is incremented by ``rate_reg + 1`` every clock
    cycle; its carry-out pulses at an average rate of ``(rate_reg + 1) / 256``
    bytes per cycle (first-order delta-sigma style rate generation). Each
    emitted byte carries a sticky overflow flag in bit 0 so the host can detect
    whether the FIFO ever failed to keep up.
    """
    def __init__(self, rate_reg, in_fifo):
        # 8-bit rate register; average byte rate is (rate_reg + 1)/256 per cycle.
        self.rate_reg = rate_reg
        # IN FIFO toward the host; written on every cycle where a byte is due.
        self.in_fifo = in_fifo

    def elaborate(self, platform):
        m = Module()

        # Phase accumulator for the rate generator.
        data_rate_count = Signal(8)
        # Carry-out of the accumulator; asserted on cycles where a byte is due.
        data_valid = Signal()
        data = Signal(8)
        # Sticky flag: set once a byte was due while the FIFO was not writable.
        overflow = Signal()

        # The emitted byte encodes the overflow flag in bit 0; bits 7:1 are zero.
        m.d.comb += data.eq(Cat(overflow, Const(0, 7)))

        # delta sigma ish
        # 9-bit sum: low 8 bits recirculate into the accumulator, the carry
        # lands in data_valid, pulsing (rate_reg + 1) times per 256 cycles.
        m.d.sync += Cat(data_rate_count, data_valid).eq(data_rate_count + self.rate_reg + 1)

        with m.If(data_valid):
            with m.If(self.in_fifo.w_rdy):
                m.d.comb += [
                    self.in_fifo.w_data.eq(data),
                    self.in_fifo.w_en.eq(1),
                ]
            with m.Else():
                # Backpressure while a byte was due: latch the overflow flag
                # permanently so the host-side measurement can stop.
                m.d.sync += overflow.eq(1)

        return m


class FixedThroughputApplet(GlasgowApplet):
    logger = logging.getLogger(__name__)
    help = "evaluate fixed read throughput performance"
    description = """
    Evaluate fixed read throughput performance and check for in FIFO overflows
    """

    @classmethod
    def add_build_arguments(cls, parser, access):
        # No build-time options; the rate is set at run time via a register.
        pass

    def build(self, target, args):
        """Instantiate the rate-generator gateware and its control register."""
        self.mux_interface = iface = \
            target.multiplexer.claim_interface(self, args=None, throttle="none")

        # 8-bit run-time register selecting the data rate (see interact()).
        rate_reg, self.__addr_rate = target.registers.add_rw(8)

        iface.add_subtarget(
            FixedThroughputSubtarget(
                rate_reg,
                iface.get_in_fifo(auto_flush=False)
            )
        )

    @classmethod
    def add_run_arguments(cls, parser, access):
        parser.add_argument(
            "--rpkts", metavar="READ-PACKETS", type=int,
            help="How many packets per read transfer")

        parser.add_argument(
            "--rxfers", metavar="READ-TRANSFERS", type=int,
            help="How many read transfers to have active at a time")

    async def run(self, device, args):
        # Only forward the tuning knobs the user actually supplied, so the
        # demultiplexer's defaults apply otherwise.
        kwargs = {}
        if args.rpkts is not None:
            kwargs["read_packets_per_xfer"] = args.rpkts
        if args.rxfers is not None:
            kwargs["read_xfers_per_queue"] = args.rxfers

        return await device.demultiplexer.claim_interface(self, self.mux_interface, args=None, **kwargs)

    @classmethod
    def add_interact_arguments(cls, parser):
        parser.add_argument(
            "rate_mbps", metavar="rate", type=float,
            help="data rate in Mbps")

    async def interact(self, device, args, iface):
        """Stream data at the requested rate until an overflow is observed,
        then report achieved vs. expected throughput.

        Raises ValueError if the requested rate does not fit the 8-bit
        rate register (i.e. is not representable at a 48 MHz byte clock).
        """
        # Convert Mbps to the accumulator increment: bytes per 48 MHz cycle,
        # scaled by 256, minus one (the gateware adds rate_reg + 1 per cycle).
        data_rate = round(args.rate_mbps/8 * 1e6 / 48e6 * 256) - 1
        if not 0 <= data_rate <= 255:
            raise ValueError(
                f"rate {args.rate_mbps} Mbps is out of range for the 8-bit rate register "
                f"(computed register value {data_rate})")
        await device.write_register(self.__addr_rate, data_rate)
        await iface.reset()

        count = 0
        overflow = False
        # Initialize before `try` so `finally` can never hit an unbound name;
        # monotonic clock so the measurement is immune to wall-clock jumps.
        begin = time.monotonic()
        try:
            while not overflow:
                data = await iface.read()
                chunk = bytes(data)
                count += len(chunk)
                # Bit 0 of each byte is the gateware's sticky overflow flag.
                overflow = 1 in chunk
        finally:
            duration = time.monotonic() - begin
            # Guard against a zero-length run so a ZeroDivisionError cannot
            # mask an exception propagating out of the loop above.
            mbps = (count*8 / duration / 1e6) if duration > 0 else float("inf")
            expected_mbps = (data_rate+1)/256 * 8 * 48

            print(f"{overflow=}")
            print(f"Elapsed: {duration}")
            print(f"Mbps: {mbps}")
            print(f"Expected Mbps: {expected_mbps}")

    @classmethod
    def tests(cls):
        from . import test
        return test.FixedThroughputAppletTestCase
8 changes: 8 additions & 0 deletions software/glasgow/applet/internal/fixed_throughput/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from ... import *
from . import FixedThroughputApplet


class FixedThroughputAppletTestCase(GlasgowAppletTestCase, applet=FixedThroughputApplet):
    """Smoke test for the fixed-throughput applet."""
    @synthesis_test
    def test_build(self):
        # Only verifies that the applet's gateware synthesizes; no hardware needed.
        self.assertBuilds()
1 change: 1 addition & 0 deletions software/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ glasgow = "glasgow.cli:run_main"
[project.entry-points."glasgow.applet"]
selftest = "glasgow.applet.internal.selftest:SelfTestApplet"
benchmark = "glasgow.applet.internal.benchmark:BenchmarkApplet"
fixed-throughput = "glasgow.applet.internal.fixed_throughput:FixedThroughputApplet"

analyzer = "glasgow.applet.interface.analyzer:AnalyzerApplet"
uart = "glasgow.applet.interface.uart:UARTApplet"
Expand Down