diff --git a/agent/runner/jobs/mongodb_restore_job.go b/agent/runner/jobs/mongodb_restore_job.go index 5597fa0488..750036ce34 100644 --- a/agent/runner/jobs/mongodb_restore_job.go +++ b/agent/runner/jobs/mongodb_restore_job.go @@ -27,6 +27,7 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/percona/pmm/agent/utils/poll" agentv1 "github.com/percona/pmm/api/agent/v1" ) @@ -211,36 +212,38 @@ func (j *MongoDBRestoreJob) startRestore(ctx context.Context, backupName string) j.l.Infof("starting backup restore for: %s.", backupName) var restoreOutput pbmRestore - var err error startTime := time.Now() - - ticker := time.NewTicker(statusCheckInterval) - defer ticker.Stop() retryCount := 500 + started := false - for { - select { - case <-ticker.C: + err := poll.UntilContextTimeout(ctx, statusCheckInterval, func(ctx context.Context) (bool, error) { + // Preserve previous behavior: first restore command runs after the first tick. + if !started { + started = true + return false, nil + } - if j.pitrTimestamp.Unix() == 0 { - err = execPBMCommand(ctx, j.dbURL, &restoreOutput, "restore", backupName) - } else { - err = execPBMCommand(ctx, j.dbURL, &restoreOutput, "restore", "--time="+j.pitrTimestamp.Format("2006-01-02T15:04:05")) - } + var cmdErr error + if j.pitrTimestamp.Unix() == 0 { + cmdErr = execPBMCommand(ctx, j.dbURL, &restoreOutput, "restore", backupName) + } else { + cmdErr = execPBMCommand(ctx, j.dbURL, &restoreOutput, "restore", "--time="+j.pitrTimestamp.Format("2006-01-02T15:04:05")) + } - if err != nil { - if strings.HasSuffix(err.Error(), "another operation in progress") && retryCount > 0 { - retryCount-- - continue - } - return nil, fmt.Errorf("pbm restore error: %w", err) + if cmdErr != nil { + if strings.HasSuffix(cmdErr.Error(), "another operation in progress") && retryCount > 0 { + retryCount-- + return false, nil } - - restoreOutput.StartedAt = startTime - return &restoreOutput, nil - - case <-ctx.Done(): - return nil, ctx.Err() + return false, fmt.Errorf("pbm restore error: %w", cmdErr) } + + restoreOutput.StartedAt = startTime + return true, nil + }) + if err != nil { + return nil, err } + + return &restoreOutput, nil } diff --git a/agent/runner/jobs/pbm_helpers.go b/agent/runner/jobs/pbm_helpers.go index 7f7eda7121..4cd9ac9e8b 100644 --- a/agent/runner/jobs/pbm_helpers.go +++ b/agent/runner/jobs/pbm_helpers.go @@ -28,6 +28,8 @@ import ( "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" + + "github.com/percona/pmm/agent/utils/poll" ) const ( @@ -35,8 +37,25 @@ const ( resyncTimeout = 5 * time.Minute statusCheckInterval = 5 * time.Second maxRestoreChecks = 100 + + maxDescribeRetries = 5 + // PBM waits up to ~33s for backup metadata. Allow extra margin for describe-backup. + describeStartupGrace = 60 * time.Second + // After the operation stops, status/list metadata can lag behind describe failures. + describeCompletionGrace = 5 * time.Minute + describeRunningWarnInterval = 5 * time.Minute + + pbmCmdBackup = "backup" + pbmCmdRestore = "restore" + + pbmStatusDone = "done" + pbmStatusCanceled = "canceled" + pbmStatusError = "error" + pbmStatusPartlyDone = "partlyDone" ) +var errPBMOperationFailed = errors.New("operation failed") + type pbmSeverity int type describeInfo struct { @@ -48,6 +67,8 @@ type describeInfo struct { type replSet struct { Name string `json:"name"` Status string `json:"status"` + Error string `json:"error,omitempty"` + Node string `json:"node,omitempty"` Nodes []node `json:"nodes"` } @@ -213,23 +234,13 @@ func retrieveLogs(ctx context.Context, dsn string, event string) ([]pbmLogEntry, func waitForPBMNoRunningOperations(ctx context.Context, l logrus.FieldLogger, dsn string) error { l.Info("Waiting for no running pbm operations.") - ticker := time.NewTicker(statusCheckInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - status, err := getPBMStatus(ctx, dsn) - if err != nil { - return err - } - if status.Running.Type == "" { - return nil - } - case <-ctx.Done(): - return ctx.Err() + return poll.UntilContextTimeout(ctx, statusCheckInterval, func(ctx context.Context) (bool, error) { + status, err := getPBMStatus(ctx, dsn) + if err != nil { + return false, err } - } + return status.Running.Type == "", nil + }) } func isShardedCluster(ctx context.Context, dsn string) (bool, error) { @@ -254,42 +265,303 @@ func getPBMStatus(ctx context.Context, dsn string) (*pbmStatus, error) { return &status, nil } +type describePoller struct { + l logrus.FieldLogger + dsn string + operation string + name string + startedAt time.Time + finishedAt time.Time + lastRunningWarn time.Time + pollEvery time.Duration + retries int + fetchDescribe func(context.Context) (describeInfo, error) + fetchStatus func(context.Context, string) (*pbmStatus, error) + fetchRestoreList func(context.Context) ([]pbmListRestore, error) + isRunning func(*pbmStatus) bool + findSnapshot func(*pbmStatus) *pbmSnapshot +} + +func (cfg *describePoller) interval() time.Duration { + if cfg.pollEvery > 0 { + return cfg.pollEvery + } + return statusCheckInterval +} + +func newDescribePoller(l logrus.FieldLogger, dsn, operation, name string, fetchDescribe func(context.Context) (describeInfo, error)) *describePoller { + return &describePoller{ + l: l, + dsn: dsn, + operation: operation, + name: name, + startedAt: time.Now(), + retries: maxDescribeRetries, + fetchDescribe: fetchDescribe, + } +} + func waitForPBMBackup(ctx context.Context, l logrus.FieldLogger, dsn string, name string) error { l.Infof("waiting for pbm backup: %s", name) - ticker := time.NewTicker(statusCheckInterval) - defer ticker.Stop() - - retryCount := 500 - - for { - select { - case <-ticker.C: - var info describeInfo - err := execPBMCommand(ctx, dsn, &info, "describe-backup", name) - if err != nil { - // for the first couple of seconds after backup process starts describe-backup command may return this error - if (strings.HasSuffix(err.Error(), "no such file") || - strings.HasSuffix(err.Error(), "file is empty")) && retryCount > 0 { - retryCount-- - continue - } - return fmt.Errorf("failed to get backup status: %w", err) - } + return waitDescribe(ctx, newDescribePoller(l, dsn, pbmCmdBackup, name, func(ctx context.Context) (describeInfo, error) { + var info describeInfo + err := execPBMCommand(ctx, dsn, &info, "describe-backup", name) + return info, err + })) +} - switch info.Status { - case "done": - return nil - case "canceled": - return errors.New("backup was canceled") - case "error": - return errors.New(info.Error) - } +func waitDescribe(ctx context.Context, cfg *describePoller) error { + return poll.UntilContextTimeout(ctx, cfg.interval(), func(ctx context.Context) (bool, error) { + err := ctx.Err() + if err != nil { + return false, err + } + + done, err := pollDescribeOnce(ctx, cfg) + if err != nil { + return false, err + } + return done, nil + }) +} + +func pollDescribeOnce(ctx context.Context, cfg *describePoller) (bool, error) { + info, describeErr := cfg.fetchDescribe(ctx) + if describeErr == nil { + cfg.retries = maxDescribeRetries + return checkDescribe(info, cfg.operation) + } + + status, statusErr := cfg.getPBMStatus(ctx) + if statusErr != nil { + return false, fmt.Errorf("failed to get pbm status: %w", statusErr) + } + + running := cfg.opRunning(status) + cfg.trackFinished(running) + + if running { + cfg.warnRunningDescribe(describeErr) + cfg.logRunningDescribeErr(describeErr) + return false, nil + } + + if done, err := cfg.statusFallback(ctx, status); done { + return true, err + } + + if cfg.retryDescribeErr(describeErr) { + return false, nil + } + + return false, fmt.Errorf("failed to get %s status: %w", cfg.operation, describeErr) +} + +func (cfg *describePoller) getPBMStatus(ctx context.Context) (*pbmStatus, error) { + if cfg.fetchStatus != nil { + return cfg.fetchStatus(ctx, cfg.dsn) + } + return getPBMStatus(ctx, cfg.dsn) +} + +func (cfg *describePoller) trackFinished(running bool) { + if running { + cfg.finishedAt = time.Time{} + return + } + if cfg.finishedAt.IsZero() { + cfg.finishedAt = time.Now() + } +} + +func (cfg *describePoller) describeCmd() string { + return "describe-" + cfg.operation +} + +func (cfg *describePoller) warnRunningDescribe(describeErr error) { + if cfg.startedAt.IsZero() { + return + } + if time.Since(cfg.lastWarnAt()) < describeRunningWarnInterval { + return + } + cfg.lastRunningWarn = time.Now() + cfg.l.Warnf("%s %q is still running but %s keeps failing: %v", + cfg.operation, cfg.name, cfg.describeCmd(), describeErr) +} + +func (cfg *describePoller) lastWarnAt() time.Time { + if !cfg.lastRunningWarn.IsZero() { + return cfg.lastRunningWarn + } + return cfg.startedAt +} + +func (cfg *describePoller) logRunningDescribeErr(describeErr error) { + if retryTransient(describeErr, cfg, true) { + cfg.l.Debugf("%s transient error while %s %q is still running: %v", + cfg.describeCmd(), cfg.operation, cfg.name, describeErr) + return + } + cfg.l.Debugf("%s error while %s %q is still running: %v", + cfg.describeCmd(), cfg.operation, cfg.name, describeErr) +} + +func (cfg *describePoller) retryDescribeErr(describeErr error) bool { + if retryTransient(describeErr, cfg, false) { + cfg.l.Debugf("%s transient error while waiting for %s %q completion metadata: %v", + cfg.describeCmd(), cfg.operation, cfg.name, describeErr) + return true + } + return cfg.retryDescribeCmd(describeErr) +} + +func (cfg *describePoller) retryDescribeCmd(err error) bool { + if cfg.retries <= 0 { + return false + } + cfg.retries-- + cfg.l.Warnf("%s failed and will retry: %s", cfg.describeCmd(), err) + return true +} + +func (cfg *describePoller) statusFallback(ctx context.Context, status *pbmStatus) (bool, error) { + if snapshot := cfg.targetSnapshot(status); snapshot != nil { + return checkStatus(snapshot.Status, snapshot.Error, cfg.operation) + } + + if cfg.operation != pbmCmdRestore { + return false, nil + } + + list, err := cfg.listRestores(ctx) + if err != nil { + cfg.l.Debugf("failed to get restore list for fallback: %s", err) + return false, nil + } + if restore := restoreByName(list, cfg.name); restore != nil { + return checkStatus(restore.Status, restore.Error, cfg.operation) + } + return false, nil +} + +func (cfg *describePoller) listRestores(ctx context.Context) ([]pbmListRestore, error) { + if cfg.fetchRestoreList != nil { + return cfg.fetchRestoreList(ctx) + } + var list []pbmListRestore + err := execPBMCommand(ctx, cfg.dsn, &list, "list", "--restore") + return list, err +} - case <-ctx.Done(): - return ctx.Err() +func retryTransient(err error, cfg *describePoller, running bool) bool { + if !isTransientDescribeErr(err) { + return false + } + if running { + return time.Since(cfg.startedAt) < describeStartupGrace + } + since := cfg.startedAt + if !cfg.finishedAt.IsZero() { + since = cfg.finishedAt + } + return time.Since(since) < describeCompletionGrace +} + +// isTransientDescribeErr reports whether describe-backup/restore may fail +// temporarily while PBM metadata is not ready yet. Matches are based on known +// PBM CLI error texts and should be updated when PBM changes them. +func isTransientDescribeErr(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "no such file") || + strings.Contains(msg, "file is empty") || + strings.Contains(msg, "missed file") || + (strings.Contains(msg, "get backup meta") && strings.Contains(msg, "not found")) || + strings.Contains(msg, "get snapshot size") +} + +func (cfg *describePoller) opRunning(status *pbmStatus) bool { + if cfg.isRunning != nil { + return cfg.isRunning(status) + } + if cfg.operation != pbmCmdBackup && cfg.operation != pbmCmdRestore { + return false + } + return status.Running.Type == cfg.operation && status.Running.Name == cfg.name +} + +func (cfg *describePoller) targetSnapshot(status *pbmStatus) *pbmSnapshot { + if cfg.findSnapshot != nil { + return cfg.findSnapshot(status) + } + if cfg.operation == pbmCmdBackup { + return snapshotByName(status, cfg.name) + } + return nil +} + +func snapshotByName(status *pbmStatus, name string) *pbmSnapshot { + i := slices.IndexFunc(status.Backups.Snapshot, func(s pbmSnapshot) bool { + return s.Name == name + }) + if i < 0 { + return nil + } + return &status.Backups.Snapshot[i] +} + +func restoreByName(list []pbmListRestore, name string) *pbmListRestore { + i := slices.IndexFunc(list, func(r pbmListRestore) bool { + return r.Name == name + }) + if i < 0 { + return nil + } + return &list[i] +} + +func checkDescribe(info describeInfo, operation string) (bool, error) { + switch info.Status { + case pbmStatusError: + return true, describeErr(info, operation) + case pbmStatusPartlyDone: + return true, groupDescribeErrs(info) + default: + return checkStatus(info.Status, info.Error, operation) + } +} + +func checkStatus(status, errMsg, operation string) (bool, error) { + switch status { + case pbmStatusDone: + return true, nil + case pbmStatusCanceled: + return true, fmt.Errorf("%s was canceled", operation) + case pbmStatusError: + if errMsg != "" { + return true, errors.New(errMsg) + } + return true, fmt.Errorf("%s failed", operation) + case pbmStatusPartlyDone: + if errMsg != "" { + return true, errors.New(errMsg) } + return true, fmt.Errorf("%s partly completed", operation) + default: + return false, nil + } +} + +func describeErr(info describeInfo, operation string) error { + err := groupDescribeErrs(info) + if err != nil && !errors.Is(err, errPBMOperationFailed) { + return err } + return fmt.Errorf("%s failed", operation) } func findPITRRestore(list []pbmListRestore, restoreInfoPITRTime int64, startedAt time.Time) *pbmListRestore { @@ -320,32 +592,45 @@ func findPITRRestoreName(ctx context.Context, dsn string, restoreInfo *pbmRestor return "", err } - ticker := time.NewTicker(statusCheckInterval) - defer ticker.Stop() - + var name string checks := 0 - for { - select { - case <-ticker.C: - checks++ - var list []pbmListRestore - err := execPBMCommand(ctx, dsn, &list, "list", "--restore") - if err != nil { - return "", fmt.Errorf("pbm status error: %w", err) - } - entry := findPITRRestore(list, restoreInfoPITRTime.Unix(), restoreInfo.StartedAt) - if entry == nil { - if checks > maxRestoreChecks { - return "", errors.New("failed to start restore") - } - continue - } else { - return entry.Name, nil - } - case <-ctx.Done(): - return "", ctx.Err() + err = poll.UntilContextTimeout(ctx, statusCheckInterval, func(ctx context.Context) (bool, error) { + err = ctx.Err() + if err != nil { + return false, err + } + + checks++ + var list []pbmListRestore + err = execPBMCommand(ctx, dsn, &list, "list", "--restore") + if err != nil { + return false, fmt.Errorf("pbm status error: %w", err) } + entry := findPITRRestore(list, restoreInfoPITRTime.Unix(), restoreInfo.StartedAt) + if entry != nil { + name = entry.Name + return true, nil + } + if checks > maxRestoreChecks { + return false, errors.New("failed to start restore") + } + return false, nil + }) + if err != nil { + return "", err } + + return name, nil +} + +func fetchRestoreDescribe(ctx context.Context, dsn, name, backupType, confFile string) (describeInfo, error) { + var info describeInfo + args := []string{"describe-restore", name} + if backupType == "physical" { + args = append(args, "--config="+confFile) + } + err := execPBMCommand(ctx, dsn, &info, args...) + return info, err } func waitForPBMRestore(ctx context.Context, l logrus.FieldLogger, dsn string, restoreInfo *pbmRestore, backupType, confFile string) error { @@ -365,47 +650,9 @@ func waitForPBMRestore(ctx context.Context, l logrus.FieldLogger, dsn string, re l.Infof("waiting for pbm restore: %s", name) - ticker := time.NewTicker(statusCheckInterval) - defer ticker.Stop() - - maxRetryCount := 5 - for { - select { - case <-ticker.C: - var info describeInfo - if backupType == "physical" { - err = execPBMCommand(ctx, dsn, &info, "describe-restore", name, "--config="+confFile) - } else { - err = execPBMCommand(ctx, dsn, &info, "describe-restore", name) - } - if err != nil { - if maxRetryCount > 0 { - maxRetryCount-- - l.Warnf("PMM failed to get backup restore status and will retry: %s", err) - continue - } else { //nolint:revive - return fmt.Errorf("failed to get restore status: %w", err) - } - } - // reset maxRetryCount if we were able to successfully get the current restore status - maxRetryCount = 5 - - switch info.Status { - case "done": - return nil - case "canceled": - return errors.New("restore was canceled") - case "error": - return errors.New(info.Error) - // We consider partlyDone as an error because we cannot automatically recover cluster from this status to fully working. - case "partlyDone": - return groupPartlyDoneErrors(info) - } - - case <-ctx.Done(): - return ctx.Err() - } - } + return waitDescribe(ctx, newDescribePoller(l, dsn, pbmCmdRestore, name, func(ctx context.Context) (describeInfo, error) { + return fetchRestoreDescribe(ctx, dsn, name, backupType, confFile) + })) } func pbmConfigure(ctx context.Context, l logrus.FieldLogger, params pbmConfigParams) error { @@ -538,18 +785,29 @@ func createPBMConfig(locationConfig *BackupLocationConfig, prefix string, pitr b return conf, nil } -func groupPartlyDoneErrors(info describeInfo) error { +func groupDescribeErrs(info describeInfo) error { var errMsgs []string + if info.Error != "" { + errMsgs = append(errMsgs, info.Error) + } + for _, rs := range info.ReplSets { - if rs.Status == "partlyDone" { - for _, node := range rs.Nodes { - if node.Status == "error" { - errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, node: %s, error: %s", rs.Name, node.Name, node.Error)) + if rs.Error != "" { + errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, error: %s", rs.Name, rs.Error)) + } + if rs.Status == pbmStatusPartlyDone { + for _, n := range rs.Nodes { + if n.Status == pbmStatusError { + errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, node: %s, error: %s", rs.Name, n.Name, n.Error)) } } } } + + if len(errMsgs) == 0 { + return errPBMOperationFailed + } return errors.New(strings.Join(errMsgs, "; ")) } @@ -572,31 +830,29 @@ func pbmGetSnapshotTimestamp(ctx context.Context, l logrus.FieldLogger, dsn stri // getSnapshots returns all PBM snapshots found in configured location. func getSnapshots(ctx context.Context, l logrus.FieldLogger, dsn string) ([]pbmSnapshot, error) { // Sometimes PBM returns empty list of snapshots, that's why we're trying to get them several times. - ticker := time.NewTicker(listCheckInterval) - defer ticker.Stop() - + var snapshots []pbmSnapshot checks := 0 - for { - select { - case <-ticker.C: - checks++ - status, err := getPBMStatus(ctx, dsn) - if err != nil { - return nil, err - } + err := poll.UntilContextTimeout(ctx, listCheckInterval, func(ctx context.Context) (bool, error) { + checks++ + status, err := getPBMStatus(ctx, dsn) + if err != nil { + return false, err + } - if len(status.Backups.Snapshot) == 0 { - l.Debugf("Attempt %d to get a list of PBM artifacts has failed.", checks) - if checks > maxListChecks { - return nil, fmt.Errorf("got no one snapshot: %w", ErrNotFound) - } - continue + if len(status.Backups.Snapshot) == 0 { + l.Debugf("Attempt %d to get a list of PBM artifacts has failed.", checks) + if checks > maxListChecks { + return false, fmt.Errorf("got no one snapshot: %w", ErrNotFound) } - - return status.Backups.Snapshot, nil - - case <-ctx.Done(): - return nil, ctx.Err() + return false, nil } + + snapshots = status.Backups.Snapshot + return true, nil + }) + if err != nil { + return nil, err } + + return snapshots, nil } diff --git a/agent/runner/jobs/pbm_helpers_test.go b/agent/runner/jobs/pbm_helpers_test.go index ccc1031da5..2e01eda88a 100644 --- a/agent/runner/jobs/pbm_helpers_test.go +++ b/agent/runner/jobs/pbm_helpers_test.go @@ -15,13 +15,39 @@ package jobs import ( + "context" + "errors" + "os" + "strings" "testing" "time" + "github.com/sirupsen/logrus" + logrustest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func newTestPoller(t *testing.T, opts ...func(*describePoller)) *describePoller { + t.Helper() + + cfg := &describePoller{ + l: logrus.New(), + dsn: "mongodb://localhost", + operation: pbmCmdBackup, + name: "2024-01-01T00:00:00Z", + startedAt: time.Now(), + retries: maxDescribeRetries, + fetchDescribe: func(context.Context) (describeInfo, error) { + return describeInfo{Status: pbmStatusDone}, nil + }, + } + for _, opt := range opts { + opt(cfg) + } + return cfg +} + func TestCreatePBMConfig(t *testing.T) { s3Config := S3LocationConfig{ Endpoint: "test_endpoint", @@ -126,6 +152,683 @@ func TestCreatePBMConfig(t *testing.T) { } } +func TestIsTransientDescribeErr(t *testing.T) { + t.Parallel() + + assert.False(t, isTransientDescribeErr(nil)) + + for _, tc := range []struct { + name string + err error + want bool + }{ + { + name: "no such file", + err: errors.New(`get file 2024-01-01T00:00:00Z/rs0/metadata.json: no such file`), + want: true, + }, + { + name: "file is empty", + err: errors.New("get file foo: file is empty"), + want: true, + }, + { + name: "backup meta not found", + err: errors.New("get backup meta: not found"), + want: true, + }, + { + name: "get snapshot size", + err: errors.New("get snapshot size: missed file"), + want: true, + }, + { + name: "backup meta permission denied", + err: errors.New("get backup meta: permission denied"), + want: false, + }, + { + name: "generic not found", + err: errors.New("authentication failed: user not found"), + want: false, + }, + { + name: "real failure", + err: errors.New("permission denied"), + want: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.want, isTransientDescribeErr(tc.err)) + }) + } +} + +func TestOpRunning(t *testing.T) { + t.Parallel() + + backupCfg := &describePoller{ + operation: pbmCmdBackup, + name: "backup-1", + } + status := &pbmStatus{} + status.Running.Type = pbmCmdBackup + status.Running.Name = "backup-1" + assert.True(t, backupCfg.opRunning(status)) + + restoreCfg := &describePoller{ + operation: pbmCmdRestore, + name: "restore-1", + } + status.Running.Type = pbmCmdRestore + status.Running.Name = "restore-1" + assert.True(t, restoreCfg.opRunning(status)) + status.Running.Name = "restore-2" + assert.False(t, restoreCfg.opRunning(status)) + + customCfg := &describePoller{ + isRunning: func(*pbmStatus) bool { return true }, + } + assert.True(t, customCfg.opRunning(status)) + + unknownCfg := &describePoller{operation: "unknown"} + assert.False(t, unknownCfg.opRunning(status)) +} + +func TestTargetSnapshot(t *testing.T) { + t.Parallel() + + cfg := &describePoller{ + operation: pbmCmdBackup, + name: "snap-1", + } + status := &pbmStatus{} + status.Backups.Snapshot = []pbmSnapshot{{Name: "snap-1"}} + assert.NotNil(t, cfg.targetSnapshot(status)) + + cfg.operation = pbmCmdRestore + assert.Nil(t, cfg.targetSnapshot(status)) +} + +func TestRetryTransient(t *testing.T) { + t.Parallel() + + transientErr := errors.New("no such file") + + t.Run("startup grace while running", func(t *testing.T) { + t.Parallel() + cfg := &describePoller{startedAt: time.Now()} + assert.True(t, retryTransient(transientErr, cfg, true)) + cfg.startedAt = time.Now().Add(-describeStartupGrace) + assert.False(t, retryTransient(transientErr, cfg, true)) + assert.False(t, retryTransient(errors.New("permission denied"), &describePoller{startedAt: time.Now()}, true)) + }) + + t.Run("completion grace after operation finished", func(t *testing.T) { + t.Parallel() + cfg := &describePoller{ + startedAt: time.Now().Add(-2 * time.Hour), + finishedAt: time.Now().Add(-1 * time.Minute), + } + assert.True(t, retryTransient(transientErr, cfg, false)) + cfg.finishedAt = time.Now().Add(-describeCompletionGrace) + assert.False(t, retryTransient(transientErr, cfg, false)) + }) +} + +func TestRetryDescribeCmd(t *testing.T) { + t.Parallel() + + cfg := &describePoller{ + l: logrus.New(), + operation: pbmCmdBackup, + retries: 1, + } + + assert.True(t, cfg.retryDescribeCmd(errors.New("temporary"))) + assert.Equal(t, 0, cfg.retries) + assert.False(t, cfg.retryDescribeCmd(errors.New("temporary"))) +} + +func TestDescribeErr(t *testing.T) { + t.Parallel() + + err := describeErr(describeInfo{Status: pbmStatusError}, pbmCmdBackup) + require.EqualError(t, err, "backup failed") + + err = describeErr(describeInfo{Status: pbmStatusError, Error: "oplog gap"}, pbmCmdBackup) + require.EqualError(t, err, "oplog gap") +} + +func TestGroupDescribeErrs_AllBranches(t *testing.T) { + t.Parallel() + + err := groupDescribeErrs(describeInfo{}) + require.ErrorIs(t, err, errPBMOperationFailed) + + err = groupDescribeErrs(describeInfo{Error: "top level"}) + require.EqualError(t, err, "top level") + + err = groupDescribeErrs(describeInfo{ + ReplSets: []replSet{{Name: "rs0", Error: "rs failed"}}, + }) + require.EqualError(t, err, "replset: rs0, error: rs failed") +} + +func TestPollDescribeOnce(t *testing.T) { + t.Parallel() + + t.Run("describe done", func(t *testing.T) { + t.Parallel() + done, err := pollDescribeOnce(context.Background(), newTestPoller(t)) + require.NoError(t, err) + assert.True(t, done) + }) + + t.Run("describe in progress", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{Status: "running"}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("describe canceled", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{Status: pbmStatusCanceled}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.EqualError(t, err, "backup was canceled") + assert.True(t, done) + }) + + t.Run("describe partly done", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{ + Status: pbmStatusPartlyDone, + ReplSets: []replSet{{ + Name: "rs0", + Status: pbmStatusPartlyDone, + Nodes: []node{{ + Name: "node1", + Status: pbmStatusError, + Error: "failed node", + }}, + }}, + }, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.EqualError(t, err, "replset: rs0, node: node1, error: failed node") + assert.True(t, done) + }) + + t.Run("status fetch error", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("describe failed") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return nil, errors.New("status unavailable") + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.ErrorContains(t, err, "failed to get pbm status") + assert.False(t, done) + }) + + t.Run("running backup with transient describe error", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("no such file") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Running.Type = pbmCmdBackup + status.Running.Name = c.name + return status, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("running backup does not consume retries", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.startedAt = time.Now().Add(-describeStartupGrace) + c.retries = maxDescribeRetries + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Running.Type = pbmCmdBackup + status.Running.Name = c.name + return status, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + assert.Equal(t, maxDescribeRetries, cfg.retries) + }) + + t.Run("snapshot done when describe fails", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("no such file") + } + c.startedAt = time.Now().Add(-describeStartupGrace) + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Backups.Snapshot = []pbmSnapshot{{ + Name: c.name, + Status: pbmStatusDone, + }} + return status, nil + } + c.findSnapshot = func(status *pbmStatus) *pbmSnapshot { + return snapshotByName(status, c.name) + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.True(t, done) + }) + + t.Run("snapshot terminal error", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("no such file") + } + c.startedAt = time.Now().Add(-describeStartupGrace) + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Backups.Snapshot = []pbmSnapshot{{ + Name: c.name, + Status: pbmStatusError, + Error: "storage error", + }} + return status, nil + } + c.findSnapshot = func(status *pbmStatus) *pbmSnapshot { + return snapshotByName(status, c.name) + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.EqualError(t, err, "storage error") + assert.True(t, done) + }) + + t.Run("restore done when describe fails", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.operation = pbmCmdRestore + c.name = "2024-01-01T12:00:00Z" + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("no such file") + } + c.startedAt = time.Now().Add(-describeStartupGrace) + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + c.fetchRestoreList = func(context.Context) ([]pbmListRestore, error) { + return []pbmListRestore{{ + Name: c.name, + Status: pbmStatusDone, + }}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.True(t, done) + }) + + t.Run("restore terminal error when describe fails", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.operation = pbmCmdRestore + c.name = "2024-01-01T12:00:00Z" + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.startedAt = time.Now().Add(-describeStartupGrace) + c.retries = 0 + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + c.fetchRestoreList = func(context.Context) ([]pbmListRestore, error) { + return []pbmListRestore{{ + Name: c.name, + Status: pbmStatusError, + Error: "node copy failed", + }}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.EqualError(t, err, "node copy failed") + assert.True(t, done) + }) + + t.Run("startup grace for transient error", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("file is empty") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("retries after startup grace when retries remain", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.startedAt = time.Now().Add(-describeStartupGrace) + c.finishedAt = time.Now().Add(-describeCompletionGrace) + c.retries = 2 + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + assert.Equal(t, 1, cfg.retries) + }) + + t.Run("running backup with exhausted retries keeps waiting", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.startedAt = time.Now().Add(-describeStartupGrace) + c.retries = 0 + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Running.Type = pbmCmdBackup + status.Running.Name = c.name + return status, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("describe failure without running backup", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.startedAt = time.Now().Add(-describeCompletionGrace) + c.finishedAt = time.Now().Add(-describeCompletionGrace) + c.retries = 0 + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.ErrorContains(t, err, "failed to get backup status") + assert.False(t, done) + }) + + t.Run("restore list fetch error keeps polling", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.operation = pbmCmdRestore + c.name = "2024-01-01T12:00:00Z" + c.startedAt = time.Now().Add(-describeStartupGrace) + c.finishedAt = time.Now().Add(-30 * time.Second) + c.retries = 0 + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("no such file") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + c.fetchRestoreList = func(context.Context) ([]pbmListRestore, error) { + return nil, errors.New("list unavailable") + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("transient error after operation finished uses completion grace", func(t *testing.T) { + t.Parallel() + cfg := newTestPoller(t, func(c *describePoller) { + c.startedAt = time.Now().Add(-2 * time.Hour) + c.finishedAt = time.Now().Add(-30 * time.Second) + c.retries = 0 + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("file is empty") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + return &pbmStatus{}, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + }) + + t.Run("warns when describe keeps failing while running", func(t *testing.T) { + t.Parallel() + logger, hook := logrustest.NewNullLogger() + cfg := newTestPoller(t, func(c *describePoller) { + c.l = logger + c.startedAt = time.Now().Add(-describeRunningWarnInterval - time.Second) + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{}, errors.New("permission denied") + } + c.fetchStatus = func(context.Context, string) (*pbmStatus, error) { + status := &pbmStatus{} + status.Running.Type = pbmCmdBackup + status.Running.Name = c.name + return status, nil + } + }) + done, err := pollDescribeOnce(context.Background(), cfg) + require.NoError(t, err) + assert.False(t, done) + var found bool + for _, entry := range hook.Entries { + if entry.Level == logrus.WarnLevel && strings.Contains(entry.Message, "still running") { + found = true + break + } + } + assert.True(t, found) + }) +} + +func TestWaitDescribe(t *testing.T) { + t.Run("completes when describe reports done", func(t *testing.T) { + cfg := newTestPoller(t, func(c *describePoller) { + c.pollEvery = time.Millisecond + }) + err := waitDescribe(context.Background(), cfg) + require.NoError(t, err) + }) + + t.Run("returns describe error", func(t *testing.T) { + cfg := newTestPoller(t, func(c *describePoller) { + c.pollEvery = time.Millisecond + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{Status: pbmStatusCanceled}, nil + } + }) + err := waitDescribe(context.Background(), cfg) + require.EqualError(t, err, "backup was canceled") + }) + + t.Run("context canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := waitDescribe(ctx, newTestPoller(t, func(c *describePoller) { + c.pollEvery = time.Millisecond + c.fetchDescribe = func(context.Context) (describeInfo, error) { + return describeInfo{Status: "running"}, nil + } + })) + require.ErrorIs(t, err, context.Canceled) + }) +} + +func TestWritePBMConfigFile(t *testing.T) { + t.Parallel() + + conf, err := createPBMConfig(&BackupLocationConfig{ + Type: FilesystemBackupLocationType, + FilesystemStorageConfig: &FilesystemBackupLocationConfig{ + Path: "/tmp/pbm", + }, + }, "artifact", false) + require.NoError(t, err) + + path, err := writePBMConfigFile(conf) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.Remove(path)) + }) + require.FileExists(t, path) +} + +func TestCheckDescribe(t *testing.T) { + t.Parallel() + + done, err := checkDescribe(describeInfo{Status: pbmStatusDone}, pbmCmdBackup) + require.NoError(t, err) + assert.True(t, done) + + done, err = checkDescribe(describeInfo{Status: pbmStatusCanceled}, pbmCmdBackup) + require.EqualError(t, err, "backup was canceled") + assert.True(t, done) + + done, err = checkDescribe(describeInfo{Status: pbmStatusError, Error: "oplog has insufficient range"}, pbmCmdBackup) + require.EqualError(t, err, "oplog has insufficient range") + assert.True(t, done) + + done, err = checkDescribe(describeInfo{Status: pbmStatusPartlyDone, Error: "partial"}, pbmCmdBackup) + require.EqualError(t, err, "partial") + assert.True(t, done) + + done, err = checkDescribe(describeInfo{Status: "running"}, pbmCmdBackup) + require.NoError(t, err) + assert.False(t, done) +} + +func TestCheckStatus(t *testing.T) { + t.Parallel() + + done, err := checkStatus(pbmStatusDone, "", pbmCmdBackup) + require.NoError(t, err) + assert.True(t, done) + + done, err = checkStatus(pbmStatusCanceled, "", pbmCmdBackup) + require.EqualError(t, err, "backup was canceled") + assert.True(t, done) + + done, err = checkStatus(pbmStatusError, "storage unavailable", pbmCmdBackup) + require.EqualError(t, err, "storage unavailable") + assert.True(t, done) + + done, err = checkStatus(pbmStatusError, "", pbmCmdBackup) + require.EqualError(t, err, "backup failed") + assert.True(t, done) + + done, err = checkStatus(pbmStatusPartlyDone, "", pbmCmdRestore) + require.EqualError(t, err, "restore partly completed") + assert.True(t, done) + + done, err = checkStatus("running", "", pbmCmdBackup) + require.NoError(t, err) + assert.False(t, done) +} + +func TestRestoreByName(t *testing.T) { + t.Parallel() + + list := []pbmListRestore{ + {Name: "2024-01-01T00:00:00Z", Status: pbmStatusDone}, + {Name: "2024-01-02T00:00:00Z", Status: pbmStatusError, Error: "failed"}, + } + + assert.Nil(t, restoreByName(list, "missing")) + require.NotNil(t, restoreByName(list, "2024-01-02T00:00:00Z")) +} + +func TestSnapshotByName(t *testing.T) { + t.Parallel() + + status := &pbmStatus{} + status.Backups.Snapshot = []pbmSnapshot{ + {Name: "2024-01-01T00:00:00Z", Status: pbmStatusDone}, + {Name: "2024-01-02T00:00:00Z", Status: pbmStatusError, Error: "failed"}, + } + + assert.Nil(t, snapshotByName(status, "missing")) + require.NotNil(t, snapshotByName(status, "2024-01-02T00:00:00Z")) +} + +func TestGroupDescribeErrs(t *testing.T) { + t.Parallel() + + err := groupDescribeErrs(describeInfo{ + Status: pbmStatusPartlyDone, + ReplSets: []replSet{{ + Name: "rs0", + Status: pbmStatusPartlyDone, + Nodes: []node{{ + Name: "node1", + Status: pbmStatusError, + Error: "copy failed", + }}, + }}, + }) + require.EqualError(t, err, "replset: rs0, node: node1, error: copy failed") +} + +func TestFindPITRRestoreSkipsInvalidEntries(t *testing.T) { + t.Parallel() + + startedAt, err := time.Parse(time.RFC3339Nano, "2022-10-11T14:53:20.000000000Z") + require.NoError(t, err) + + list := []pbmListRestore{ + {Name: "invalid-name", Type: "pitr", PITR: 1000000000}, + {Name: "2022-10-11T14:53:20.000000001Z", Type: "snapshot", Snapshot: "snap"}, + } + assert.Nil(t, findPITRRestore(list, 1000000000, startedAt)) +} + func TestFindPITRRestore(t *testing.T) { // Tested func searches from the end, so we place records to be skipped at the end. testList := []pbmListRestore{ diff --git a/agent/utils/poll/poll.go b/agent/utils/poll/poll.go new file mode 100644 index 0000000000..00d370be38 --- /dev/null +++ b/agent/utils/poll/poll.go @@ -0,0 +1,68 @@ +// Copyright (C) 2023 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package poll provides helpers for polling until a condition is met. +package poll + +import ( + "context" + "fmt" + "time" +) + +// ConditionFunc returns true when the condition is successfully met. +type ConditionFunc func(ctx context.Context) (done bool, err error) + +// UntilContextTimeout polls until condition returns done=true, err!=nil, or ctx is canceled. +func UntilContextTimeout(ctx context.Context, interval time.Duration, condition ConditionFunc) error { + if interval <= 0 { + return fmt.Errorf("interval must be positive: %s", interval) + } + + err := ctx.Err() + if err != nil { + return err + } + + done, err := condition(ctx) + if err != nil { + return err + } + if done { + return nil + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + err = ctx.Err() + if err != nil { + return err + } + + done, err := condition(ctx) + if err != nil { + return err + } + if done { + return nil + } + } + } +} diff --git a/agent/utils/poll/poll_test.go b/agent/utils/poll/poll_test.go new file mode 100644 index 0000000000..cd4984fcdc --- /dev/null +++ b/agent/utils/poll/poll_test.go @@ -0,0 +1,103 @@ +// Copyright (C) 2023 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package poll + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUntilContextTimeout(t *testing.T) { + t.Parallel() + + t.Run("immediate success", func(t *testing.T) { + t.Parallel() + + calls := 0 + err := UntilContextTimeout(t.Context(), time.Millisecond, func(context.Context) (bool, error) { + calls++ + return true, nil + }) + require.NoError(t, err) + assert.Equal(t, 1, calls) + }) + + t.Run("success after retries", func(t *testing.T) { + t.Parallel() + + calls := 0 + err := UntilContextTimeout(t.Context(), time.Millisecond, func(context.Context) (bool, error) { + calls++ + return calls == 3, nil + }) + require.NoError(t, err) + assert.Equal(t, 3, calls) + }) + + t.Run("condition error", func(t *testing.T) { + t.Parallel() + + expected := errors.New("boom") + err := UntilContextTimeout(t.Context(), time.Millisecond, func(context.Context) (bool, error) { + return false, expected + }) + require.ErrorIs(t, err, expected) + }) + + t.Run("context canceled", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + calls := 0 + err := UntilContextTimeout(ctx, time.Millisecond, func(context.Context) (bool, error) { + calls++ + return false, nil + }) + require.ErrorIs(t, err, context.Canceled) + assert.Equal(t, 0, calls) + }) + + t.Run("context timeout", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), 20*time.Millisecond) + defer cancel() + + err := UntilContextTimeout(ctx, 5*time.Millisecond, func(context.Context) (bool, error) { + return false, nil + }) + require.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("invalid interval", func(t *testing.T) { + t.Parallel() + + calls := 0 + err := UntilContextTimeout(t.Context(), 0, func(context.Context) (bool, error) { + calls++ + return true, nil + }) + require.Error(t, err) + require.ErrorContains(t, err, "interval must be positive") + assert.Equal(t, 0, calls) + }) +}