Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions datastore/postgres/index_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -61,6 +62,118 @@ func TestIndexE2E(t *testing.T) {
}
}

// TestRequeueIndexPartialsBatchAndSharedLayers exercises
// RequeueIndexPartials against a real database.
//
// Three manifests are stored as aged "IndexPartial" reports and one as an
// "IndexFinished" report that shares a layer with the first partial manifest.
// Requeueing with a limit of 2 is expected to reset only the first two
// partial manifests and their exclusive layers, leaving the third partial
// manifest, the finished manifest, and the shared layer marked as scanned.
func TestRequeueIndexPartialsBatchAndSharedLayers(t *testing.T) {
	integration.NeedDB(t)
	ctx := test.Logging(t)
	pool := pgtest.TestIndexerDB(ctx, t)
	store := NewIndexerStore(pool)

	scanner := mockScnr{name: "test-scanner", kind: "test", version: "v0.0.1"}
	scanners := indexer.VersionedScanners{scanner}
	if err := store.RegisterScanners(ctx, scanners); err != nil {
		t.Fatalf("failed to register scanner: %v", err)
	}

	// Small constructors for fixtures with random digests.
	newLayer := func() *claircore.Layer {
		return &claircore.Layer{Hash: test.RandomSHA256Digest(t)}
	}
	newManifest := func(layers ...*claircore.Layer) claircore.Manifest {
		return claircore.Manifest{
			Hash:   test.RandomSHA256Digest(t),
			Layers: layers,
		}
	}

	partialOnlyA := newLayer()
	sharedWithFinished := newLayer()
	partialOnlyB := newLayer()
	partialOnlyC := newLayer()

	partialA := newManifest(partialOnlyA, sharedWithFinished)
	finished := newManifest(sharedWithFinished)
	partialB := newManifest(partialOnlyB)
	partialC := newManifest(partialOnlyC)

	// Persist every manifest and mark each of its layers as scanned.
	everyManifest := []claircore.Manifest{partialA, finished, partialB, partialC}
	for _, m := range everyManifest {
		if err := store.PersistManifest(ctx, m); err != nil {
			t.Fatalf("failed to persist manifest %s: %v", m.Hash, err)
		}
		for _, l := range m.Layers {
			if err := store.SetLayerScanned(ctx, l.Hash, scanner); err != nil {
				t.Fatalf("failed to set layer %s scanned: %v", l.Hash, err)
			}
		}
	}

	// Store the partial reports and backdate them past the one hour minimum
	// age used below. They are aged in A, B, C order.
	for _, m := range []claircore.Manifest{partialA, partialB, partialC} {
		report := &claircore.IndexReport{Hash: m.Hash, State: "IndexPartial"}
		if err := store.SetIndexPartial(ctx, report, scanners); err != nil {
			t.Fatalf("failed to set partial index report %s: %v", m.Hash, err)
		}
		if err := ageIndexReport(ctx, pool, m.Hash, 2*time.Hour); err != nil {
			t.Fatalf("failed to age partial index report %s: %v", m.Hash, err)
		}
	}
	finishedReport := &claircore.IndexReport{Hash: finished.Hash, State: "IndexFinished"}
	if err := store.SetIndexFinished(ctx, finishedReport, scanners); err != nil {
		t.Fatalf("failed to set finished index report: %v", err)
	}

	// Only two of the three eligible partial reports fit in the batch.
	requeued, err := store.RequeueIndexPartials(ctx, time.Hour, 2)
	switch {
	case err != nil:
		t.Fatalf("failed to requeue partial index reports: %v", err)
	case requeued != 2:
		t.Fatalf("requeued %d partial index reports, want 2", requeued)
	}

	wantManifestScanned := func(m claircore.Manifest, want bool) {
		t.Helper()
		scanned, err := store.ManifestScanned(ctx, m.Hash, scanners)
		if err != nil {
			t.Fatalf("failed to query manifest %s scanned: %v", m.Hash, err)
		}
		if scanned != want {
			t.Fatalf("manifest %s scanned = %v, want %v", m.Hash, scanned, want)
		}
	}
	wantLayerScanned := func(l *claircore.Layer, want bool) {
		t.Helper()
		scanned, err := store.LayerScanned(ctx, l.Hash, scanner)
		if err != nil {
			t.Fatalf("failed to query layer %s scanned: %v", l.Hash, err)
		}
		if scanned != want {
			t.Fatalf("layer %s scanned = %v, want %v", l.Hash, scanned, want)
		}
	}

	// A and B were requeued; C fell outside the batch and the finished
	// manifest was never a candidate.
	wantManifestScanned(partialA, false)
	wantManifestScanned(partialB, false)
	wantManifestScanned(partialC, true)
	wantManifestScanned(finished, true)

	// Layers exclusive to requeued manifests are reset; the layer shared
	// with the finished manifest must keep its scan record.
	wantLayerScanned(partialOnlyA, false)
	wantLayerScanned(partialOnlyB, false)
	wantLayerScanned(partialOnlyC, true)
	wantLayerScanned(sharedWithFinished, true)
}

// ageIndexReport backdates the updated_at column of the index report that
// belongs to the manifest identified by hash, setting it to the database's
// now() minus age. It returns any error reported by the pool.
func ageIndexReport(ctx context.Context, pool *pgxpool.Pool, hash claircore.Digest, age time.Duration) error {
	const backdate = `
UPDATE indexreport ir
SET
updated_at = now() - $2::interval
FROM manifest m
WHERE
ir.manifest_id = m.id
AND m.hash = $1;
`
	if _, err := pool.Exec(ctx, backdate, hash, age); err != nil {
		return err
	}
	return nil
}

// RunAll executes all test steps in sequence
func (e *indexE2e) RunAll(t testing.TB) {
steps := []struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Track when each index report was last written. IF NOT EXISTS keeps the
-- statement safe to re-run; rows that already exist receive the column
-- default.
ALTER TABLE indexreport
ADD COLUMN IF NOT EXISTS updated_at timestamptz NOT NULL DEFAULT now();

-- Backfill the state column from the "state" key of the stored scan_result
-- JSON for rows where it is NULL or empty. Because of the WHERE clause,
-- NULLIF(state, '') is always NULL here, so the COALESCE resolves to the
-- JSON value.
UPDATE indexreport
SET
state = COALESCE(NULLIF(state, ''), scan_result ->> 'state')
WHERE
state IS NULL
OR state = '';

-- Partial index covering only 'IndexPartial' rows, keyed by updated_at.
-- NOTE(review): presumably intended for queries that select partial reports
-- by age; confirm against the requeue query.
CREATE INDEX IF NOT EXISTS indexreport_partial_retry_idx ON indexreport (updated_at)
WHERE
state = 'IndexPartial';
13 changes: 8 additions & 5 deletions datastore/postgres/setindexfinished.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ INSERT
INTO
scanned_manifest (manifest_id, scanner_id)
VALUES
((SELECT manifest_id FROM manifests), $2);
((SELECT manifest_id FROM manifests), $2)
ON CONFLICT DO NOTHING;
`
upsertIndexReport = `
WITH
Expand All @@ -66,13 +67,15 @@ WITH
)
INSERT
INTO
indexreport (manifest_id, scan_result)
indexreport (manifest_id, state, scan_result, updated_at)
VALUES
((SELECT manifest_id FROM manifests), $2)
((SELECT manifest_id FROM manifests), $2, $3, now())
ON CONFLICT
(manifest_id)
DO
UPDATE SET scan_result = excluded.scan_result;
UPDATE SET state = excluded.state,
scan_result = excluded.scan_result,
updated_at = excluded.updated_at;
`
)

Expand All @@ -99,7 +102,7 @@ DO
}

start := time.Now()
_, err = tx.Exec(ctx, upsertIndexReport, ir.Hash, ir)
_, err = tx.Exec(ctx, upsertIndexReport, ir.Hash, ir.State, ir)
if err != nil {
return fmt.Errorf("failed to upsert scan result: %w", err)
}
Expand Down
83 changes: 83 additions & 0 deletions datastore/postgres/setindexpartial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package postgres

import (
"context"
"fmt"
"time"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
)

// SetIndexPartial persists a degraded index report and marks the manifest as
// scanned to prevent immediate retry loops.
//
// It delegates to SetIndexFinished with the same arguments and returns its
// error unchanged; no state is set here, so the caller is expected to have
// populated ir.State (presumably with "IndexPartial") before calling.
func (s *IndexerStore) SetIndexPartial(ctx context.Context, ir *claircore.IndexReport, scnrs indexer.VersionedScanners) error {
	return s.SetIndexFinished(ctx, ir, scnrs)
}

// RequeueIndexPartials makes aged partial index reports eligible for
// indexing again.
//
// In a single statement it:
//   - selects up to limit index reports whose state is 'IndexPartial' and
//     whose updated_at is at least minAge in the past, oldest first;
//   - deletes the scanned_manifest rows of the selected manifests;
//   - deletes the scanned_layer rows of their layers, except for layers that
//     are also referenced by a manifest outside the selection whose report is
//     missing or not in the 'IndexPartial' state;
//   - sets updated_at to now() on the selected reports.
//
// A limit below 1 is raised to 1. The returned count is the number of rows
// affected by the final UPDATE, i.e. the number of reports requeued.
func (s *IndexerStore) RequeueIndexPartials(ctx context.Context, minAge time.Duration, limit int) (int64, error) {
	const requeue = `
WITH partial_manifests AS (
SELECT
ir.manifest_id
FROM
indexreport ir
WHERE
ir.state = 'IndexPartial'
AND ir.updated_at <= now() - $1::interval
ORDER BY
ir.updated_at ASC,
ir.manifest_id ASC
LIMIT $2
),
partial_layers AS (
SELECT DISTINCT
ml.layer_id
FROM
manifest_layer ml
JOIN partial_manifests pm ON pm.manifest_id = ml.manifest_id
WHERE
NOT EXISTS (
SELECT
1
FROM
manifest_layer ml2
LEFT JOIN indexreport ir2 ON ir2.manifest_id = ml2.manifest_id
WHERE
ml2.layer_id = ml.layer_id
AND ml2.manifest_id NOT IN (
SELECT
manifest_id
FROM
partial_manifests
)
AND COALESCE(ir2.state, '') <> 'IndexPartial'
)
),
deleted_manifest_scans AS (
DELETE FROM scanned_manifest sm
USING partial_manifests pm
WHERE sm.manifest_id = pm.manifest_id
RETURNING sm.manifest_id
),
deleted_layer_scans AS (
DELETE FROM scanned_layer sl
USING partial_layers pl
WHERE sl.layer_id = pl.layer_id
RETURNING sl.layer_id
)
UPDATE indexreport ir
SET
updated_at = now()
FROM partial_manifests pm
WHERE ir.manifest_id = pm.manifest_id;
`

	// Always process at least one report per call.
	batch := limit
	if batch < 1 {
		batch = 1
	}

	res, err := s.pool.Exec(ctx, requeue, minAge, batch)
	if err != nil {
		return 0, fmt.Errorf("failed to requeue partial index reports: %w", err)
	}
	return res.RowsAffected(), nil
}
10 changes: 6 additions & 4 deletions datastore/postgres/setindexreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ WITH
)
INSERT
INTO
indexreport (manifest_id, scan_result)
indexreport (manifest_id, state, scan_result, updated_at)
VALUES
((SELECT manifest_id FROM manifests), $2)
((SELECT manifest_id FROM manifests), $2, $3, now())
ON CONFLICT
(manifest_id)
DO
UPDATE SET scan_result = excluded.scan_result;
UPDATE SET state = excluded.state,
scan_result = excluded.scan_result,
updated_at = excluded.updated_at;
`
start := time.Now()
_, err := s.pool.Exec(ctx, query, ir.Hash, ir)
_, err := s.pool.Exec(ctx, query, ir.Hash, ir.State, ir)
if err != nil {
return fmt.Errorf("failed to upsert index report: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions indexer/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Controller struct {
report *claircore.IndexReport
// a fatal error halting the scanning process
err error
// partial records whether the scan completed with degraded scanner data.
partial bool
// the current state of the controller
currentState State
// Realizer is scoped to a single request
Expand Down
13 changes: 12 additions & 1 deletion indexer/controller/indexfinished.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ import (
// indexer to the IndexFinished state the indexer will no longer transition
// and return an IndexReport to the caller
func indexFinished(ctx context.Context, s *Controller) (State, error) {
s.report.Success = true
slog.InfoContext(ctx, "finishing scan")

if s.partial {
s.report.Success = false
s.report.State = IndexPartial.String()
err := s.Store.SetIndexPartial(ctx, s.report, s.Vscnrs)
if err != nil {
return Terminal, fmt.Errorf("failed finish partial scan: %w", err)
}
slog.InfoContext(ctx, "manifest partially scanned")
return Terminal, nil
}

s.report.Success = true
err := s.Store.SetIndexFinished(ctx, s.report, s.Vscnrs)
if err != nil {
return Terminal, fmt.Errorf("failed finish scan: %w", err)
Expand Down
9 changes: 9 additions & 0 deletions indexer/controller/scanlayers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package controller

import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/quay/claircore/indexer"
)

// scanLayers will run all scanner types against all layers if deemed necessary
Expand All @@ -13,6 +16,12 @@ func scanLayers(ctx context.Context, c *Controller) (State, error) {
defer slog.InfoContext(ctx, "layers scan done")
err := c.LayerScanner.Scan(ctx, c.manifest.Hash, c.manifest.Layers)
if err != nil {
if errors.Is(err, indexer.ErrScanPartial) {
c.partial = true
c.report.Err = err.Error()
slog.WarnContext(ctx, "layers scan completed with partial results", "reason", err)
return Coalesce, nil
}
return Terminal, fmt.Errorf("failed to scan all layer contents: %w", err)
}
slog.DebugContext(ctx, "layers scan ok")
Expand Down
6 changes: 6 additions & 0 deletions indexer/controller/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
// to the caller of Scan()
// Transitions: Terminal
IndexFinished
// IndexPartial is a terminal state indicating indexing completed with
// degraded scanner data and should be retried later.
IndexPartial
)

func (ss State) String() string {
Expand All @@ -51,6 +54,7 @@ func (ss State) String() string {
"IndexManifest",
"IndexError",
"IndexFinished",
"IndexPartial",
}
return names[ss]
}
Expand All @@ -73,6 +77,8 @@ func (ss *State) FromString(state string) {
*ss = IndexError
case "IndexFinished":
*ss = IndexFinished
case "IndexPartial":
*ss = IndexPartial
}
}

Expand Down
Loading
Loading