Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 17 additions & 51 deletions internal/event/event.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package event

import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-go-library/database"
baseEv "github.com/icinga/icinga-go-library/notifications/event"
"github.com/icinga/icinga-go-library/types"
"github.com/jmoiron/sqlx"
"go.uber.org/zap/zapcore"
"net/url"
"strings"
"time"
Expand All @@ -29,7 +27,6 @@ var ErrSuperfluousMuteUnmuteEvent = errors.New("ignoring superfluous (un)mute ev
type Event struct {
Time time.Time `json:"-"`
SourceId int64 `json:"-"`
ID int64 `json:"-"`

baseEv.Event `json:",inline"`
}
Expand Down Expand Up @@ -101,52 +98,21 @@ func (e *Event) SetMute(muted bool, reason string) {
e.MuteReason = reason
}

func (e *Event) String() string {
return fmt.Sprintf("[time=%s type=%q severity=%s]", e.Time, e.Type, e.Severity.String())
}

// Sync transforms this event to *event.EventRow and synchronises with the database.
func (e *Event) Sync(ctx context.Context, tx *sqlx.Tx, db *database.DB, objectId types.Binary) error {
if e.ID != 0 {
func (e *Event) String() string { return e.Name }

// MarshalLogObject implements the [zapcore.ObjectMarshaler] interface to allow logging the event as a structured object.
func (e *Event) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("name", e.Name)
encoder.AddTime("time", e.Time)
encoder.AddInt64("source_id", e.SourceId)
encoder.AddString("type", e.Type.String())
encoder.AddString("severity", e.Severity.String())
encoder.AddString("username", e.Username)
_ = encoder.AddObject("tags", zapcore.ObjectMarshalerFunc(func(objectEncoder zapcore.ObjectEncoder) error {
for key, value := range e.Tags {
objectEncoder.AddString(key, value)
}
return nil
}

eventRow := NewEventRow(e, objectId)
eventID, err := database.InsertObtainID(ctx, tx, database.BuildInsertStmtWithout(db, eventRow, "id"), eventRow)
if err == nil {
e.ID = eventID
}

return err
}

// EventRow represents a single event database row and isn't an in-memory representation of an event.
type EventRow struct {
ID int64 `db:"id"`
Time types.UnixMilli `db:"time"`
ObjectID types.Binary `db:"object_id"`
Type types.String `db:"type"`
Severity baseEv.Severity `db:"severity"`
Username types.String `db:"username"`
Message types.String `db:"message"`
Mute types.Bool `db:"mute"`
MuteReason types.String `db:"mute_reason"`
}

// TableName implements the contracts.TableNamer interface.
func (er *EventRow) TableName() string {
return "event"
}

func NewEventRow(e *Event, objectId types.Binary) *EventRow {
return &EventRow{
Time: types.UnixMilli(e.Time),
ObjectID: objectId,
Type: types.MakeString(e.Type.String(), types.TransformEmptyStringToNull),
Severity: e.Severity,
Username: types.MakeString(e.Username, types.TransformEmptyStringToNull),
Message: types.MakeString(e.Message, types.TransformEmptyStringToNull),
Mute: e.Mute,
MuteReason: types.MakeString(e.MuteReason, types.TransformEmptyStringToNull),
}
}))
return nil
}
12 changes: 0 additions & 12 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,6 @@ import (
"github.com/jmoiron/sqlx"
)

// EventRow represents a single incident event database entry.
type EventRow struct {
IncidentID int64 `db:"incident_id"`
EventID int64 `db:"event_id"`
}

// TableName implements the contracts.TableNamer interface.
func (e *EventRow) TableName() string {
return "incident_event"
}

// ContactRow represents a single incident contact database entry.
type ContactRow struct {
IncidentID int64 `db:"incident_id"`
Expand Down Expand Up @@ -68,7 +57,6 @@ type HistoryRow struct {
ID int64 `db:"id"`
IncidentID int64 `db:"incident_id"`
RuleEscalationID types.Int `db:"rule_escalation_id"`
EventID types.Int `db:"event_id"`
recipient.Key `db:",inline"`
RuleID types.Int `db:"rule_id"`
Time types.UnixMilli `db:"time"`
Expand Down
29 changes: 1 addition & 28 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
}
defer func() { _ = tx.Rollback() }()

if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil {
i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err))
return err
}

isNew := i.StartedAt.Time().IsZero()
if isNew {
err = i.processIncidentOpenedEvent(ctx, tx, ev)
Expand All @@ -159,11 +154,6 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
i.logger = i.logger.With(zap.String("incident", i.String()))
}

if err = i.AddEvent(ctx, tx, ev); err != nil {
i.logger.Errorw("Cannot insert incident event to the database", zap.Error(err))
return err
}

switch ev.Type {
case baseEv.TypeState:
if !isNew {
Expand Down Expand Up @@ -261,15 +251,6 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
var notifications []*NotificationEntry
ctx := context.Background()
err = i.db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
err := ev.Sync(ctx, tx, i.db, i.Object.ID)
if err != nil {
return err
}

if err = i.AddEvent(ctx, tx, ev); err != nil {
return fmt.Errorf("cannot insert incident event to the database: %w", err)
}

if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil {
return err
}
Expand Down Expand Up @@ -306,7 +287,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,

hr := &HistoryRow{
IncidentID: i.Id,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Time: types.UnixMilli(time.Now()),
Type: IncidentSeverityChanged,
NewSeverity: newSeverity,
Expand All @@ -327,7 +307,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,

hr = &HistoryRow{
IncidentID: i.Id,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Time: i.RecoveredAt,
Type: Closed,
}
Expand Down Expand Up @@ -365,7 +344,6 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,
IncidentID: i.Id,
Type: Opened,
Time: types.UnixMilli(ev.Time),
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
NewSeverity: i.Severity,
Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull),
}
Expand All @@ -391,7 +369,6 @@ func (i *Incident) handleUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.Even

hr := &HistoryRow{
IncidentID: i.Id,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Time: types.UnixMilli(time.Now()),
Type: Unmuted,
// On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore.
Expand All @@ -413,7 +390,6 @@ func (i *Incident) handleMute(ctx context.Context, tx *sqlx.Tx, ev *event.Event)

hr := &HistoryRow{
IncidentID: i.Id,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Time: types.UnixMilli(time.Now()),
Type: Muted,
// Since the object may have already been muted with previous events before this incident even
Expand Down Expand Up @@ -463,7 +439,6 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even
hr := &HistoryRow{
IncidentID: i.Id,
Time: types.UnixMilli(time.Now()),
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull),
Type: RuleMatched,
}
Expand Down Expand Up @@ -573,7 +548,6 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even
hr := &HistoryRow{
IncidentID: i.Id,
Time: state.TriggeredAt,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
RuleEscalationID: types.MakeInt(state.RuleEscalationID, types.TransformZeroIntToNull),
RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull),
Type: EscalationTriggered,
Expand All @@ -587,7 +561,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even
return err
}

if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
if err := i.AddRecipient(ctx, tx, escalation); err != nil {
return err
}
}
Expand Down Expand Up @@ -688,7 +662,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
hr := &HistoryRow{
IncidentID: i.Id,
Key: recipientKey,
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Type: RecipientRoleChanged,
Time: types.UnixMilli(time.Now()),
NewRecipientRole: newRole,
Expand Down
30 changes: 4 additions & 26 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -228,11 +227,6 @@ func ProcessEvent(
runtimeConfig *config.RuntimeConfig,
ev *event.Event,
) error {
var wasObjectMuted bool
if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil {
wasObjectMuted = obj.IsMuted()
}

obj, err := object.FromEvent(ctx, db, ev)
if err != nil {
return fmt.Errorf("cannot sync event object: %w", err)
Expand All @@ -251,29 +245,13 @@ func ProcessEvent(
}

if currentIncident == nil {
switch {
case ev.Severity == baseEv.SeverityNone:
// We need to ignore superfluous mute and unmute events here, as would be the case with an existing
// incident, otherwise the event stream catch-up phase will generate useless events after each
// Icinga 2 reload and overwhelm the database with the very same mute/unmute events.
if wasObjectMuted && ev.Type == baseEv.TypeMute {
return event.ErrSuperfluousMuteUnmuteEvent
} else if !wasObjectMuted && ev.Type == baseEv.TypeUnmute {
return event.ErrSuperfluousMuteUnmuteEvent
}

// There is no active incident, but the event appears to be relevant, so try to persist it in the DB.
err = db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) })
if err != nil {
return errors.New("cannot sync non-state event to the database")
}

return nil
case ev.Severity != baseEv.SeverityOK:
if ev.Severity != baseEv.SeverityOK && ev.Severity != baseEv.SeverityNone {
panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity))
default:
}
if ev.Severity == baseEv.SeverityOK {
return fmt.Errorf("%w: ok state event from source %d", event.ErrSuperfluousStateChange, ev.SourceId)
}
return nil
}

return currentIncident.ProcessEvent(ctx, ev)
Expand Down
13 changes: 1 addition & 12 deletions internal/incident/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,9 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat
return err
}

// AddEvent Inserts incident history record to the database and returns an error on db failure.
func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
ie := &EventRow{IncidentID: i.Id, EventID: ev.ID}
stmt, _ := i.db.BuildInsertStmt(ie)
_, err := tx.NamedExecContext(ctx, stmt, ie)

return err
}

// AddRecipient adds recipient from the given *rule.Escalation to this incident.
// Syncs also all the recipients with the database and returns an error on db failure.
func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error {
func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation) error {
newRole := RoleRecipient
if i.HasManager() {
newRole = RoleSubscriber
Expand All @@ -90,7 +81,6 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru

hr := &HistoryRow{
IncidentID: i.Id,
EventID: types.MakeInt(eventId, types.TransformZeroIntToNull),
Key: cr.Key,
Time: types.UnixMilli(time.Now()),
Type: RecipientRoleChanged,
Expand Down Expand Up @@ -148,7 +138,6 @@ func (i *Incident) generateNotifications(
hr := &HistoryRow{
IncidentID: i.Id,
Key: recipient.ToKey(contact),
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
Time: types.UnixMilli(time.Now()),
Type: Notified,
ChannelID: types.MakeInt(chID, types.TransformZeroIntToNull),
Expand Down
2 changes: 2 additions & 0 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) {
}

l.logger.Infow("Processing event", zap.String("event", ev.String()))
l.logger.Debugw("Event details", zap.Object("event", &ev))

err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev)
if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) {
l.abort(w, http.StatusNotAcceptable, &ev, "%v", err)
Expand Down
28 changes: 0 additions & 28 deletions schema/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,6 @@ CREATE TABLE object_extra_tag (
CONSTRAINT fk_object_extra_tag_object FOREIGN KEY (object_id) REFERENCES object(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE event (
id bigint NOT NULL AUTO_INCREMENT,
time bigint NOT NULL,
object_id binary(32) NOT NULL,
-- NOT NULL is enforced via CHECK not to default to 'acknowledgement-cleared'
type enum('acknowledgement-cleared', 'acknowledgement-set', 'custom', 'downtime-end', 'downtime-removed', 'downtime-start', 'flapping-end', 'flapping-start', 'incident-age', 'mute', 'state', 'unmute'),
severity enum('ok', 'debug', 'info', 'notice', 'warning', 'err', 'crit', 'alert', 'emerg'),
message mediumtext,
username text COLLATE utf8mb4_unicode_ci,
mute enum('n', 'y'),
mute_reason mediumtext,

CONSTRAINT pk_event PRIMARY KEY (id),
CONSTRAINT ck_event_type_notnull CHECK (type IS NOT NULL),
CONSTRAINT fk_event_object FOREIGN KEY (object_id) REFERENCES object(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE rule (
id bigint NOT NULL AUTO_INCREMENT,
name text NOT NULL COLLATE utf8mb4_unicode_ci,
Expand Down Expand Up @@ -358,15 +341,6 @@ CREATE TABLE incident (
CONSTRAINT fk_incident_object FOREIGN KEY (object_id) REFERENCES object(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE incident_event (
incident_id bigint NOT NULL,
event_id bigint NOT NULL,

CONSTRAINT pk_incident_event PRIMARY KEY (incident_id, event_id),
CONSTRAINT fk_incident_event_incident FOREIGN KEY (incident_id) REFERENCES incident(id),
CONSTRAINT fk_incident_event_event FOREIGN KEY (event_id) REFERENCES event(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE incident_contact (
incident_id bigint NOT NULL,
contact_id bigint,
Expand Down Expand Up @@ -409,7 +383,6 @@ CREATE TABLE incident_history (
id bigint NOT NULL AUTO_INCREMENT,
incident_id bigint NOT NULL,
rule_escalation_id bigint,
event_id bigint,
contact_id bigint,
contactgroup_id bigint,
schedule_id bigint,
Expand All @@ -432,7 +405,6 @@ CREATE TABLE incident_history (
CONSTRAINT fk_incident_history_incident_rule_escalation_state FOREIGN KEY (incident_id, rule_escalation_id) REFERENCES incident_rule_escalation_state(incident_id, rule_escalation_id),
CONSTRAINT fk_incident_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id),
CONSTRAINT fk_incident_history_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id),
CONSTRAINT fk_incident_history_event FOREIGN KEY (event_id) REFERENCES event(id),
CONSTRAINT fk_incident_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id),
CONSTRAINT fk_incident_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id),
CONSTRAINT fk_incident_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id),
Expand Down
3 changes: 3 additions & 0 deletions schema/mysql/upgrades/1.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE incident_event;
ALTER TABLE incident_history DROP CONSTRAINT fk_incident_history_event;
DROP TABLE event;
Loading
Loading