Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions internal/ingest/handler_units.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ func (p *Pipeline) handleUnitEvent(eventType string, payload []byte) error {
}
}

// Call-alert events: upsert target unit and store target_unit in metadata_json
effectiveTargetUnitTag := data.TargetUnitAlphaTag
if eventType == "call_alert" && data.TargetUnit > 0 {
if dbTag, err := p.db.UpsertUnit(ctx, identity.SystemID, data.TargetUnit,
data.TargetUnitAlphaTag, "call_alert_target", ts, 0,
); err != nil {
p.log.Warn().Err(err).Int("target_unit", data.TargetUnit).Msg("failed to upsert target unit")
} else if dbTag != "" {
effectiveTargetUnitTag = dbTag
}

meta := map[string]any{
"target_unit": data.TargetUnit,
"target_unit_alpha_tag": effectiveTargetUnitTag,
}
if b, err := json.Marshal(meta); err == nil {
row.MetadataJSON = b
}
}

if err := p.db.InsertUnitEvent(ctx, row); err != nil {
return fmt.Errorf("insert unit event: %w", err)
}
Expand All @@ -184,6 +204,10 @@ func (p *Pipeline) handleUnitEvent(eventType string, payload []byte) error {
ssePayload["signaling_type"] = data.SignalingType
ssePayload["signal_type"] = data.SignalType
}
if eventType == "call_alert" {
ssePayload["target_unit"] = data.TargetUnit
ssePayload["target_unit_alpha_tag"] = effectiveTargetUnitTag
}

p.PublishEvent(EventData{
Type: "unit_event",
Expand Down
192 changes: 158 additions & 34 deletions internal/ingest/handler_units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,45 +113,169 @@ func TestParseUnitEventData(t *testing.T) {
t.Error("expected error for invalid JSON")
}
})

t.Run("call_alert_event", func(t *testing.T) {
// Payload format produced by the mqtt_status plugin's send_json():
// {"type":"call_alert","call_alert":{...},"timestamp":...,"instance_id":"..."}
payload := []byte(`{
"type": "call_alert",
"call_alert": {
"sys_num": 1,
"sys_name": "pscsite4",
"unit": 4810011,
"unit_alpha_tag": "",
"target_unit": 4811289,
"target_unit_alpha_tag": "OFFICER JONES"
},
"timestamp": 1712789072,
"instance_id": "trunk-recorder"
}`)
env, data, err := parseUnitEventData(payload, "call_alert")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if env.InstanceID != "trunk-recorder" {
t.Errorf("InstanceID = %q, want %q", env.InstanceID, "trunk-recorder")
}
if env.Timestamp != 1712789072 {
t.Errorf("Timestamp = %d, want 1712789072", env.Timestamp)
}
if data.Unit != 4810011 {
t.Errorf("Unit = %d, want 4810011", data.Unit)
}
if data.SysName != "pscsite4" {
t.Errorf("SysName = %q, want %q", data.SysName, "pscsite4")
}
if data.TargetUnit != 4811289 {
t.Errorf("TargetUnit = %d, want 4811289", data.TargetUnit)
}
if data.TargetUnitAlphaTag != "OFFICER JONES" {
t.Errorf("TargetUnitAlphaTag = %q, want %q", data.TargetUnitAlphaTag, "OFFICER JONES")
}
// call_alert has no talkgroup
if data.Talkgroup != 0 {
t.Errorf("Talkgroup = %d, want 0 (no talkgroup on call_alert)", data.Talkgroup)
}
})

t.Run("call_alert_no_alpha_tags", func(t *testing.T) {
// Typical real-world payload where unit tags are not configured
payload := []byte(`{
"type": "call_alert",
"call_alert": {
"sys_num": 1,
"sys_name": "pscsite4",
"unit": 4810011,
"unit_alpha_tag": "",
"target_unit": 4811292,
"target_unit_alpha_tag": ""
},
"timestamp": 1712789100,
"instance_id": "trunk-recorder"
}`)
_, data, err := parseUnitEventData(payload, "call_alert")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if data.TargetUnit != 4811292 {
t.Errorf("TargetUnit = %d, want 4811292", data.TargetUnit)
}
if data.TargetUnitAlphaTag != "" {
t.Errorf("TargetUnitAlphaTag = %q, want empty", data.TargetUnitAlphaTag)
}
})
}

// ── round-trip: route parsing + payload parsing ─────────────────────

func TestUnitEventParseRoundTrip(t *testing.T) {
topic := "trengine/units/butco/join"
payload, _ := json.Marshal(map[string]any{
"type": "unit_event",
"timestamp": 1700000000,
"instance_id": "tr-1",
"join": map[string]any{
"sys_name": "butco",
"unit": 12345,
"talkgroup": 100,
},
t.Run("join", func(t *testing.T) {
topic := "trengine/units/butco/join"
payload, _ := json.Marshal(map[string]any{
"type": "unit_event",
"timestamp": 1700000000,
"instance_id": "tr-1",
"join": map[string]any{
"sys_name": "butco",
"unit": 12345,
"talkgroup": 100,
},
})

route := ParseTopic(topic)
if route == nil {
t.Fatal("ParseTopic returned nil")
}
if route.Handler != "unit_event" {
t.Fatalf("Handler = %q, want %q", route.Handler, "unit_event")
}
if route.EventType != "join" {
t.Fatalf("EventType = %q, want %q", route.EventType, "join")
}

env, data, err := parseUnitEventData(payload, route.EventType)
if err != nil {
t.Fatalf("parseUnitEventData: %v", err)
}
if env.InstanceID != "tr-1" {
t.Errorf("InstanceID = %q, want %q", env.InstanceID, "tr-1")
}
if data.Unit != 12345 {
t.Errorf("Unit = %d, want 12345", data.Unit)
}
if data.Talkgroup != 100 {
t.Errorf("Talkgroup = %d, want 100", data.Talkgroup)
}
})

route := ParseTopic(topic)
if route == nil {
t.Fatal("ParseTopic returned nil")
}
if route.Handler != "unit_event" {
t.Fatalf("Handler = %q, want %q", route.Handler, "unit_event")
}
if route.EventType != "join" {
t.Fatalf("EventType = %q, want %q", route.EventType, "join")
}

env, data, err := parseUnitEventData(payload, route.EventType)
if err != nil {
t.Fatalf("parseUnitEventData: %v", err)
}
if env.InstanceID != "tr-1" {
t.Errorf("InstanceID = %q, want %q", env.InstanceID, "tr-1")
}
if data.Unit != 12345 {
t.Errorf("Unit = %d, want 12345", data.Unit)
}
if data.Talkgroup != 100 {
t.Errorf("Talkgroup = %d, want 100", data.Talkgroup)
}
t.Run("call_alert", func(t *testing.T) {
topic := "tr/units/pscsite4/call_alert"
payload, _ := json.Marshal(map[string]any{
"type": "call_alert",
"timestamp": 1712789072,
"instance_id": "trunk-recorder",
"call_alert": map[string]any{
"sys_num": 1,
"sys_name": "pscsite4",
"unit": 4810011,
"unit_alpha_tag": "",
"target_unit": 4811289,
"target_unit_alpha_tag": "OFFICER JONES",
},
})

route := ParseTopic(topic)
if route == nil {
t.Fatal("ParseTopic returned nil for call_alert topic")
}
if route.Handler != "unit_event" {
t.Fatalf("Handler = %q, want unit_event", route.Handler)
}
if route.EventType != "call_alert" {
t.Fatalf("EventType = %q, want call_alert", route.EventType)
}
if route.SysName != "pscsite4" {
t.Fatalf("SysName = %q, want pscsite4", route.SysName)
}

env, data, err := parseUnitEventData(payload, route.EventType)
if err != nil {
t.Fatalf("parseUnitEventData: %v", err)
}
if env.InstanceID != "trunk-recorder" {
t.Errorf("InstanceID = %q, want trunk-recorder", env.InstanceID)
}
if data.Unit != 4810011 {
t.Errorf("Unit = %d, want 4810011", data.Unit)
}
if data.TargetUnit != 4811289 {
t.Errorf("TargetUnit = %d, want 4811289", data.TargetUnit)
}
if data.TargetUnitAlphaTag != "OFFICER JONES" {
t.Errorf("TargetUnitAlphaTag = %q, want OFFICER JONES", data.TargetUnitAlphaTag)
}
if data.Talkgroup != 0 {
t.Errorf("Talkgroup = %d, want 0", data.Talkgroup)
}
})
}
3 changes: 3 additions & 0 deletions internal/ingest/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ type UnitEventData struct {
// Signal-specific fields (from signal events)
SignalingType string `json:"signaling_type,omitempty"`
SignalType string `json:"signal_type,omitempty"`
// Call-alert-specific fields (from call_alert events)
TargetUnit int `json:"target_unit,omitempty"`
TargetUnitAlphaTag string `json:"target_unit_alpha_tag,omitempty"`
}

// UnitEventMsg wraps a unit event message. The event data is keyed by the event type.
Expand Down
8 changes: 4 additions & 4 deletions internal/ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import "strings"

// Route describes a parsed MQTT topic.
type Route struct {
Handler string // handler name: "status", "console", "systems", "system", "calls_active", "call_start", "call_end", "audio", "recorders", "recorder", "rates", "trunking_message", "unit_event" (includes signal)
Handler string // handler name: "status", "console", "systems", "system", "calls_active", "call_start", "call_end", "audio", "recorders", "recorder", "rates", "trunking_message", "unit_event" (includes signal, call_alert)
SysName string // set for unit event and trunking message topics
EventType string // set for unit events: "on", "off", "call", "end", "join", etc.
EventType string // set for unit events: "on", "off", "call", "end", "join", "call_alert", etc.
}

// ParseTopic maps an MQTT topic string to a Route.
Expand Down Expand Up @@ -36,7 +36,7 @@ type Route struct {
//
// Unit event topics ({unit_topic}/...):
//
// .../{sys_name}/{event_type} → unit_event
// .../{sys_name}/{event_type} → unit_event (on, off, call, end, join, location, ackresp, data, signal, call_alert)
func ParseTopic(topic string) *Route {
parts := strings.Split(topic, "/")
n := len(parts)
Expand Down Expand Up @@ -75,7 +75,7 @@ func ParseTopic(topic string) *Route {

// Unit events: .../{sys_name}/{event_type}
switch last {
case "on", "off", "call", "end", "join", "location", "ackresp", "data", "signal":
case "on", "off", "call", "end", "join", "location", "ackresp", "data", "signal", "call_alert":
if n >= 2 {
return &Route{Handler: "unit_event", SysName: parts[n-2], EventType: last}
}
Expand Down
1 change: 1 addition & 0 deletions internal/ingest/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestParseTopic(t *testing.T) {
{name: "unit_join", topic: "trengine/units/butco/join", want: &Route{Handler: "unit_event", SysName: "butco"}},
{name: "unit_ackresp", topic: "trengine/units/butco/ackresp", want: &Route{Handler: "unit_event", SysName: "butco"}},
{name: "unit_data", topic: "trengine/units/butco/data", want: &Route{Handler: "unit_event", SysName: "butco"}},
{name: "unit_call_alert", topic: "tr/units/pscsite4/call_alert", want: &Route{Handler: "unit_event", SysName: "pscsite4", EventType: "call_alert"}},

// Custom prefixes — router only cares about trailing segments
{name: "custom_prefix_feed", topic: "myradio/whatever/call_start", want: &Route{Handler: "call_start"}},
Expand Down
17 changes: 16 additions & 1 deletion openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3700,7 +3700,7 @@ components:

EventType:
type: string
enum: [on, off, join, call, end, data, ans_req, location, ackresp, signal]
enum: [on, off, join, call, end, data, ans_req, location, ackresp, signal, call_alert]
description: |
Unit event types from trunk-recorder MQTT:
- **on**: unit registration (radio powered on / entering system)
Expand All @@ -3713,6 +3713,7 @@ components:
- **location**: unit location update
- **ackresp**: acknowledge response
- **signal**: in-band signaling decode (MDC1200, FleetSync, STAR)
- **call_alert**: directed unit-to-unit page (P25 TSBK 0x1F); `target_unit` and `target_unit_alpha_tag` present in SSE payload and `metadata_json`

CallState:
type: string
Expand Down Expand Up @@ -4435,6 +4436,20 @@ components:
Only present in SSE payloads for `unit_event:signal` events.
example: normal

# Call-alert-specific fields (present only for event_type=call_alert, via SSE)
target_unit:
type: integer
description: |
Unit ID of the radio being paged (P25 Call Alert, TSBK 0x1F).
Only present in SSE payloads for `unit_event:call_alert` events.
example: 4811289
target_unit_alpha_tag:
type: string
description: |
Alpha tag for the paged unit, if known.
Only present in SSE payloads for `unit_event:call_alert` events.
example: "OFFICER JONES"

CallFrequency:
type: object
description: |
Expand Down
29 changes: 26 additions & 3 deletions web/events.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
.type-call_start { background: color-mix(in srgb, var(--info) 20%, transparent); color: var(--info); }
.type-call_end { background: color-mix(in srgb, var(--success) 20%, transparent); color: var(--success); }
.type-call_update { background: color-mix(in srgb, var(--cyan) 15%, transparent); color: var(--cyan); }
.type-call_alert { background: color-mix(in srgb, #cc2200 25%, transparent); color: #ff4422; font-weight: 700; }
.type-unit_event { background: color-mix(in srgb, var(--magenta) 20%, transparent); color: var(--magenta); }
.type-recorder_update { background: color-mix(in srgb, var(--warning) 15%, transparent); color: var(--warning); }
.type-rate_update { background: var(--bg-tile); color: var(--text-muted); }
Expand Down Expand Up @@ -146,6 +147,7 @@
<div class="filters">
<label><input type="checkbox" value="call_start" checked> call_start</label>
<label><input type="checkbox" value="call_end" checked> call_end</label>
<label><input type="checkbox" value="call_alert" checked> call_alert</label>
<label><input type="checkbox" value="unit_event" checked> unit_event</label>
<label><input type="checkbox" value="recorder_update" checked> recorder_update</label>
<label><input type="checkbox" value="rate_update"> rate_update</label>
Expand Down Expand Up @@ -173,7 +175,13 @@
const EVENT_TYPES = ['call_start', 'call_end', 'call_update', 'unit_event', 'recorder_update', 'rate_update', 'trunking_message', 'console'];

function getSelectedTypes() {
return Array.from(document.querySelectorAll('.filters input:checked')).map(cb => cb.value);
const checked = new Set(Array.from(document.querySelectorAll('.filters input:checked')).map(cb => cb.value));
// call_alert is a unit_event subtype — always subscribe to unit_event so we receive them,
// then filter client-side in addEvent based on the call_alert checkbox.
const ssTypes = new Set(checked);
if (checked.has('call_alert')) ssTypes.add('unit_event');
ssTypes.delete('call_alert');
return [...ssTypes];
}

function setStatus(state, text) {
Expand Down Expand Up @@ -204,6 +212,10 @@
` <span class="dim">dur=</span>${d.duration ? d.duration.toFixed(1) + 's' : '?'}` +
` <span class="dim">freq=</span>${(d.freq / 1e6).toFixed(4)}` +
(d.emergency ? ' <span class="emergency">EMERGENCY</span>' : '');
case 'call_alert':
return `<span class="val">${esc(d.unit_alpha_tag || 'Unit ' + d.unit_id)}</span>` +
` <span class="dim">→</span>` +
` <span class="val">${esc(d.target_unit_alpha_tag || 'Unit ' + d.target_unit)}</span>`;
case 'unit_event':
return `<span class="dim">${esc(d.event_type || '?')}</span>` +
` <span class="val">${esc(d.unit_alpha_tag || 'Unit ' + d.unit_id)}</span>` +
Expand Down Expand Up @@ -235,6 +247,17 @@
}

function addEvent(type, data) {
// Remap call_alert from unit_event subtype to its own display type
let displayType = type;
if (type === 'unit_event' && data.event_type === 'call_alert') {
displayType = 'call_alert';
}

// Client-side filter: check if displayType is enabled
const checked = new Set(Array.from(document.querySelectorAll('.filters input:checked')).map(cb => cb.value));
if (displayType === 'call_alert' && !checked.has('call_alert')) return;
if (displayType === 'unit_event' && !checked.has('unit_event')) return;

totalCount++;
eventCountEl.textContent = totalCount;

Expand All @@ -246,8 +269,8 @@

row.innerHTML =
`<span class="event-time">${time}</span>` +
`<span class="event-type type-${type}">${type.replace('_', ' ')}</span>` +
`<span class="event-detail">${formatDetail(type, data)}</span>`;
`<span class="event-type type-${displayType}">${displayType.replace(/_/g, ' ')}</span>` +
`<span class="event-detail">${formatDetail(displayType, data)}</span>`;

eventsEl.insertBefore(row, eventsEl.firstChild);

Expand Down
Loading