From d102a2a59a90867e6fc894b7c540fde3eae26376 Mon Sep 17 00:00:00 2001 From: hweawer Date: Mon, 14 Jul 2025 12:50:14 +0200 Subject: [PATCH 1/3] Improve kraken origin logging --- origin/blobserver/config.go | 50 ++++ origin/blobserver/server.go | 541 ++++++++++++++++++++++++++++++++---- 2 files changed, 537 insertions(+), 54 deletions(-) diff --git a/origin/blobserver/config.go b/origin/blobserver/config.go index d6719311f..ea0306dd9 100644 --- a/origin/blobserver/config.go +++ b/origin/blobserver/config.go @@ -23,11 +23,61 @@ import ( type Config struct { Listener listener.Config `yaml:"listener"` DuplicateWriteBackStagger time.Duration `yaml:"duplicate_write_back_stagger"` + + // Timeout configurations + DownloadTimeout time.Duration `yaml:"download_timeout"` + UploadTimeout time.Duration `yaml:"upload_timeout"` + ReplicationTimeout time.Duration `yaml:"replication_timeout"` + BackendTimeout time.Duration `yaml:"backend_timeout"` + ReadinessTimeout time.Duration `yaml:"readiness_timeout"` + + // Limit configurations + MaxConcurrentDownloads int `yaml:"max_concurrent_downloads"` + MaxConcurrentUploads int `yaml:"max_concurrent_uploads"` + MaxRequestSize int64 `yaml:"max_request_size"` + + // Retry configurations + MaxRetries int `yaml:"max_retries"` + RetryDelay time.Duration `yaml:"retry_delay"` + RetryMaxDelay time.Duration `yaml:"retry_max_delay"` } func (c Config) applyDefaults() Config { if c.DuplicateWriteBackStagger == 0 { c.DuplicateWriteBackStagger = 30 * time.Minute } + if c.DownloadTimeout == 0 { + c.DownloadTimeout = 5 * time.Minute + } + if c.UploadTimeout == 0 { + c.UploadTimeout = 10 * time.Minute + } + if c.ReplicationTimeout == 0 { + c.ReplicationTimeout = 3 * time.Minute + } + if c.BackendTimeout == 0 { + c.BackendTimeout = 2 * time.Minute + } + if c.ReadinessTimeout == 0 { + c.ReadinessTimeout = 30 * time.Second + } + if c.MaxConcurrentDownloads == 0 { + c.MaxConcurrentDownloads = 10 + } + if c.MaxConcurrentUploads == 0 { + c.MaxConcurrentUploads = 5 + } + if c.MaxRequestSize == 0 { + c.MaxRequestSize = 1024 * 1024 * 1024 // 1GB + } + if c.MaxRetries == 0 { + c.MaxRetries = 3 + } + if c.RetryDelay == 0 { + c.RetryDelay = 100 * time.Millisecond + } + if c.RetryMaxDelay == 0 { + c.RetryMaxDelay = 5 * time.Second + } return c } diff --git a/origin/blobserver/server.go b/origin/blobserver/server.go index 3abd83d74..e49ca5987 100644 --- a/origin/blobserver/server.go +++ b/origin/blobserver/server.go @@ -14,9 +14,11 @@ package blobserver import ( + "context" "encoding/json" "fmt" "io" + "math/rand" "net/http" _ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux. "os" @@ -73,6 +75,21 @@ type Server struct { // a given torrent, however this requires blob server to understand the // context of the p2p client running alongside it. pctx core.PeerContext + + // Resource management + downloadSemaphore chan struct{} + uploadSemaphore chan struct{} + + // Metrics + downloadTimer tally.Timer + uploadTimer tally.Timer + replicationTimer tally.Timer + downloadCounter tally.Counter + uploadCounter tally.Counter + replicationCounter tally.Counter + errorCounter tally.Counter + timeoutCounter tally.Counter + resourceLeakCounter tally.Counter } // New initializes a new Server. 
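NOTE: the new Config fields above map one-to-one onto YAML keys via their tags. A minimal sketch of the resulting stanza, using the patch-1 defaults from applyDefaults (the `blobserver:` nesting and duration-string parsing are assumptions about the surrounding config loader, not part of this patch):

    blobserver:
      download_timeout: 5m
      upload_timeout: 10m
      replication_timeout: 3m
      backend_timeout: 2m
      readiness_timeout: 30s
      max_concurrent_downloads: 10
      max_concurrent_uploads: 5
      max_request_size: 1073741824  # 1GB
      max_retries: 3
      retry_delay: 100ms
      retry_max_delay: 5s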
@@ -98,20 +115,31 @@ func New( }) return &Server{ - config: config, - stats: stats, - clk: clk, - addr: addr, - hashRing: hashRing, - cas: cas, - clientProvider: clientProvider, - clusterProvider: clusterProvider, - backends: backends, - blobRefresher: blobRefresher, - metaInfoGenerator: metaInfoGenerator, - uploader: newUploader(cas), - writeBackManager: writeBackManager, - pctx: pctx, + config: config, + stats: stats, + clk: clk, + addr: addr, + hashRing: hashRing, + cas: cas, + clientProvider: clientProvider, + clusterProvider: clusterProvider, + backends: backends, + blobRefresher: blobRefresher, + metaInfoGenerator: metaInfoGenerator, + uploader: newUploader(cas), + writeBackManager: writeBackManager, + pctx: pctx, + downloadSemaphore: make(chan struct{}, config.MaxConcurrentDownloads), + uploadSemaphore: make(chan struct{}, config.MaxConcurrentUploads), + downloadTimer: stats.Timer("download_duration"), + uploadTimer: stats.Timer("upload_duration"), + replicationTimer: stats.Timer("replication_duration"), + downloadCounter: stats.Counter("downloads"), + uploadCounter: stats.Counter("uploads"), + replicationCounter: stats.Counter("replications"), + errorCounter: stats.Counter("errors"), + timeoutCounter: stats.Counter("timeouts"), + resourceLeakCounter: stats.Counter("resource_leaks"), }, nil } @@ -126,6 +154,8 @@ func (s *Server) Handler() http.Handler { r.Use(middleware.StatusCounter(s.stats)) r.Use(middleware.LatencyTimer(s.stats)) + r.Use(s.requestTracingMiddleware) + r.Use(s.requestValidationMiddleware) // Public endpoints: @@ -169,6 +199,51 @@ func (s *Server) Handler() http.Handler { return r } +// requestTracingMiddleware adds structured logging with request tracing +func (s *Server) requestTracingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestID := fmt.Sprintf("%d", rand.Int63()) + start := time.Now() + + // Add request ID to context for downstream handlers + ctx := context.WithValue(r.Context(), "request_id", requestID) + r = r.WithContext(ctx) + + // Add request ID to response headers for debugging + w.Header().Set("X-Request-ID", requestID) + + log.With( + "request_id", requestID, + "method", r.Method, + "path", r.URL.Path, + "remote_addr", r.RemoteAddr, + ).Info("Request started") + + defer func() { + duration := time.Since(start) + log.With( + "request_id", requestID, + "method", r.Method, + "path", r.URL.Path, + "duration_ms", duration.Milliseconds(), + ).Info("Request completed") + }() + + next.ServeHTTP(w, r) + }) +} + +// requestValidationMiddleware validates request size and other basic requirements +func (s *Server) requestValidationMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > s.config.MaxRequestSize { + http.Error(w, "Request too large", http.StatusRequestEntityTooLarge) + return + } + next.ServeHTTP(w, r) + }) +} + // ListenAndServe is a blocking call which runs s. 
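NOTE: requestTracingMiddleware stores the request ID under the bare string key "request_id"; staticcheck (SA1029) flags built-in types as context keys because unrelated packages can collide on the same string. A sketch of the collision-safe alternative, assuming getRequestID switches to the same key (not part of this patch):

    // ctxKey is an unexported type, so no other package can construct
    // a colliding key value.
    type ctxKey int

    const requestIDKey ctxKey = iota

    func withRequestID(ctx context.Context, id string) context.Context {
    	return context.WithValue(ctx, requestIDKey, id)
    }

    func requestIDFrom(ctx context.Context) string {
    	if id, ok := ctx.Value(requestIDKey).(string); ok {
    		return id
    	}
    	return "unknown"
    }

Separately, rand.Int63() without seeding repeats the same ID sequence across restarts on Go versions before 1.20; crypto/rand or a UUID would make the IDs unique across processes.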
func (s *Server) ListenAndServe(h http.Handler) error { log.Infof("Starting blob server on %s", s.config.Listener) @@ -181,41 +256,77 @@ func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) erro } func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error { - err := s.backends.CheckReadiness() - if err != nil { - return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable) + ctx, cancel := context.WithTimeout(r.Context(), s.config.ReadinessTimeout) + defer cancel() + + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "readiness_check") + + done := make(chan error, 1) + go func() { + done <- s.backends.CheckReadiness() + }() + + select { + case err := <-done: + if err != nil { + logger.Errorf("Readiness check failed: %s", err) + return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable) + } + logger.Info("Readiness check passed") + fmt.Fprintln(w, "OK") + return nil + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + logger.Error("Readiness check timed out") + return handler.Errorf("readiness check timed out").Status(http.StatusServiceUnavailable) } - fmt.Fprintln(w, "OK") - return nil } // statHandler returns blob info if it exists. func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error { + ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout) + defer cancel() + + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "stat") + checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false")) if err != nil { + logger.Errorf("Failed to parse local parameter: %s", err) return handler.Errorf("parse arg `local` as bool: %s", err) } + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { + logger.Errorf("Failed to parse namespace parameter: %s", err) return err } + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } - bi, err := s.stat(namespace, d, checkLocal) + logger = logger.With("namespace", namespace, "digest", d.Hex(), "local", checkLocal) + logger.Info("Starting blob stat") + + bi, err := s.stat(ctx, namespace, d, checkLocal) if os.IsNotExist(err) { + logger.Info("Blob not found") return handler.ErrorStatus(http.StatusNotFound) } else if err != nil { + logger.Errorf("Blob stat failed: %s", err) return fmt.Errorf("stat: %s", err) } + w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10)) - log.Debugf("successfully check blob %s exists", d.Hex()) + logger.With("size", bi.Size).Info("Blob stat completed successfully") return nil } -func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) { +func (s *Server) stat(ctx context.Context, namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) { fi, err := s.cas.GetCacheFileStat(d.Hex()) if err == nil { return core.NewBlobInfo(fi.Size()), nil @@ -225,12 +336,32 @@ func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.B if err != nil { return nil, fmt.Errorf("get backend client: %s", err) } - if bi, err := client.Stat(namespace, d.Hex()); err == nil { - return bi, nil - } else if err == backenderrors.ErrBlobNotFound { - return nil, os.ErrNotExist - } else { - return nil, fmt.Errorf("backend stat: %s", err) + + done := make(chan struct { + bi *core.BlobInfo + err error + }, 1) + + go func() { + bi, err := 
client.Stat(namespace, d.Hex()) + done <- struct { + bi *core.BlobInfo + err error + }{bi, err} + }() + + select { + case result := <-done: + if result.err == nil { + return result.bi, nil + } else if result.err == backenderrors.ErrBlobNotFound { + return nil, os.ErrNotExist + } else { + return nil, fmt.Errorf("backend stat: %s", result.err) + } + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + return nil, fmt.Errorf("backend stat timed out: %s", ctx.Err()) } } return nil, err // os.ErrNotExist @@ -240,39 +371,94 @@ func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.B } func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error { + ctx, cancel := context.WithTimeout(r.Context(), s.config.DownloadTimeout) + defer cancel() + + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "download") + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { + logger.Errorf("Failed to parse namespace parameter: %s", err) return err } + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } - if err := s.downloadBlob(namespace, d, w); err != nil { - log.With("namespace", namespace).Errorf("Error downloading blob: %s", err) + + logger = logger.With("namespace", namespace, "digest", d.Hex()) + logger.Info("Starting blob download") + + // Acquire download semaphore + select { + case s.downloadSemaphore <- struct{}{}: + defer func() { <-s.downloadSemaphore }() + case <-ctx.Done(): + logger.Error("Download semaphore acquisition timed out") + return handler.Errorf("download queue full").Status(http.StatusServiceUnavailable) + } + + s.downloadCounter.Inc(1) + timer := s.downloadTimer.Start() + defer timer.Stop() + + if err := s.downloadBlob(ctx, namespace, d, w); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Download failed: %s", err) return err } + setOctetStreamContentType(w) + logger.Info("Download completed successfully") return nil } func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error { + ctx, cancel := context.WithTimeout(r.Context(), s.config.ReplicationTimeout) + defer cancel() + + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "replicate_to_remote") + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { + logger.Errorf("Failed to parse namespace parameter: %s", err) return err } + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + remote, err := httputil.ParseParam(r, "remote") if err != nil { + logger.Errorf("Failed to parse remote parameter: %s", err) + return err + } + + logger = logger.With("namespace", namespace, "digest", d.Hex(), "remote", remote) + logger.Info("Starting remote replication") + + s.replicationCounter.Inc(1) + timer := s.replicationTimer.Start() + defer timer.Stop() + + if err := s.replicateToRemote(ctx, namespace, d, remote); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Remote replication failed: %s", err) return err } - return s.replicateToRemote(namespace, d, remote) + + logger.Info("Remote replication completed successfully") + return nil } -func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS string) error { +func (s *Server) replicateToRemote(ctx context.Context, namespace string, d core.Digest, remoteDNS string) error { f, err := s.cas.GetCacheFileReader(d.Hex()) if err != nil 
{ if os.IsNotExist(err) { @@ -280,76 +466,157 @@ func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS st } return handler.Errorf("file store: %s", err) } - defer f.Close() + defer func() { + if closeErr := f.Close(); closeErr != nil { + s.resourceLeakCounter.Inc(1) + log.Errorf("Failed to close file reader: %s", closeErr) + } + }() remote, err := s.clusterProvider.Provide(remoteDNS) if err != nil { return handler.Errorf("remote cluster provider: %s", err) } - return remote.UploadBlob(namespace, d, f) + + done := make(chan error, 1) + go func() { + done <- remote.UploadBlob(namespace, d, f) + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + return handler.Errorf("remote replication timed out: %s", ctx.Err()) + } } // deleteBlobHandler deletes blob data. func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "delete") + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + + logger = logger.With("digest", d.Hex()) + logger.Info("Starting blob deletion") + if err := s.deleteBlob(d); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Blob deletion failed: %s", err) return err } + setContentLength(w, 0) w.WriteHeader(http.StatusAccepted) - log.Debugf("successfully delete blob %s", d.Hex()) + logger.Info("Blob deletion completed successfully") return nil } func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "get_locations") + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + + logger = logger.With("digest", d.Hex()) + logger.Info("Getting blob locations") + locs := s.hashRing.Locations(d) w.Header().Set("Origin-Locations", strings.Join(locs, ",")) w.WriteHeader(http.StatusOK) + + logger.With("locations", locs).Info("Blob locations retrieved successfully") return nil } // getPeerContextHandler returns the Server's peer context as JSON. 
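NOTE: readinessCheckHandler, stat, and replicateToRemote all repeat the same shape: run a blocking call in a goroutine with a buffered result channel, then select on it against ctx.Done(). The buffer guarantees the goroutine can finish its send and exit even after a timeout, but the underlying backend call is not cancelled; it keeps running to completion in the background. A generic helper consolidating the pattern might look like this (a sketch, Go 1.18+, not part of the patch):

    // runWithTimeout runs fn and waits for either its result or ctx
    // expiry. On timeout the goroutine is not killed; the buffered
    // channel just lets it complete its send and exit quietly.
    func runWithTimeout[T any](ctx context.Context, fn func() (T, error)) (T, error) {
    	type result struct {
    		v   T
    		err error
    	}
    	done := make(chan result, 1)
    	go func() {
    		v, err := fn()
    		done <- result{v, err}
    	}()
    	select {
    	case r := <-done:
    		return r.v, r.err
    	case <-ctx.Done():
    		var zero T
    		return zero, ctx.Err()
    	}
    }

The backend stat above would then read `bi, err := runWithTimeout(ctx, func() (*core.BlobInfo, error) { return client.Stat(namespace, d.Hex()) })`. One caveat for replicateToRemote specifically: on timeout the handler returns and its deferred f.Close() runs while UploadBlob may still be reading f; moving the Close into the uploading goroutine (and dropping the handler's defer) would avoid that race.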
func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "get_peer_context") + + logger.Info("Getting peer context") + if err := json.NewEncoder(w).Encode(s.pctx); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to encode peer context: %s", err) return handler.Errorf("error converting peer context to json: %s", err) } + + logger.Info("Peer context retrieved successfully") return nil } func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error { + ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout) + defer cancel() + + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "get_metainfo") + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { + logger.Errorf("Failed to parse namespace parameter: %s", err) return err } + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } - raw, err := s.getMetaInfo(namespace, d) + + logger = logger.With("namespace", namespace, "digest", d.Hex()) + logger.Info("Getting metainfo") + + raw, err := s.getMetaInfo(ctx, namespace, d) if err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to get metainfo: %s", err) return err } + w.Write(raw) + logger.Info("Metainfo retrieved successfully") return nil } func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "overwrite_metainfo") + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64) if err != nil { + logger.Errorf("Failed to parse piece_length parameter: %s", err) return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest) } - return s.overwriteMetaInfo(d, pieceLength) + + logger = logger.With("digest", d.Hex(), "piece_length", pieceLength) + logger.Info("Overwriting metainfo") + + if err := s.overwriteMetaInfo(d, pieceLength); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to overwrite metainfo: %s", err) + return err + } + + logger.Info("Metainfo overwritten successfully") + return nil } // overwriteMetaInfo generates metainfo configured with pieceLength for d and @@ -360,13 +627,22 @@ func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error { if err != nil { return handler.Errorf("get cache file: %s", err) } + defer func() { + if closeErr := f.Close(); closeErr != nil { + s.resourceLeakCounter.Inc(1) + log.Errorf("Failed to close file reader in overwriteMetaInfo: %s", closeErr) + } + }() + mi, err := core.NewMetaInfo(d, f, pieceLength) if err != nil { return handler.Errorf("create metainfo: %s", err) } + if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil { return handler.Errorf("set metainfo: %s", err) } + return nil } @@ -374,7 +650,7 @@ func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error { // the blob from the storage backend configured for namespace will be initiated. // This download is asynchronous and getMetaInfo will immediately return a // "202 Accepted" server error. 
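NOTE: getMetaInfoHandler below ignores the error from w.Write(raw). There is nothing to retry once the status line is out, but logging a short write keeps the structured trail introduced by this patch complete (a sketch):

    if _, err := w.Write(raw); err != nil {
    	logger.Errorf("Failed to write metainfo response: %s", err)
    }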
-func (s *Server) getMetaInfo(namespace string, d core.Digest) ([]byte, error) { +func (s *Server) getMetaInfo(ctx context.Context, namespace string, d core.Digest) ([]byte, error) { var tm metadata.TorrentMeta if err := s.cas.GetCacheFileMetadata(d.Hex(), &tm); os.IsNotExist(err) { return nil, s.startRemoteBlobDownload(namespace, d, true) @@ -390,13 +666,14 @@ type localReplicationHook struct { func (h *localReplicationHook) Run(d core.Digest) { timer := h.server.stats.Timer("replicate_blob").Start() + defer timer.Stop() + if err := h.server.replicateBlobLocally(d); err != nil { // Don't return error here as we only want to cache storage backend errors. log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err) h.server.stats.Counter("replicate_blob_errors").Inc(1) return } - timer.Stop() } func (s *Server) startRemoteBlobDownload( @@ -420,21 +697,42 @@ func (s *Server) startRemoteBlobDownload( } func (s *Server) replicateBlobLocally(d core.Digest) error { - return s.applyToReplicas(d, func(i int, client blobclient.Client) error { + ctx, cancel := context.WithTimeout(context.Background(), s.config.ReplicationTimeout) + defer cancel() + + return s.applyToReplicas(ctx, d, func(i int, client blobclient.Client) error { f, err := s.cas.GetCacheFileReader(d.Hex()) if err != nil { return fmt.Errorf("get cache reader: %s", err) } - if err := client.TransferBlob(d, f); err != nil { - return fmt.Errorf("transfer blob: %s", err) + defer func() { + if closeErr := f.Close(); closeErr != nil { + s.resourceLeakCounter.Inc(1) + log.Errorf("Failed to close file reader in replicateBlobLocally: %s", closeErr) + } + }() + + done := make(chan error, 1) + go func() { + done <- client.TransferBlob(d, f) + }() + + select { + case err := <-done: + if err != nil { + return fmt.Errorf("transfer blob: %s", err) + } + return nil + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + return fmt.Errorf("transfer blob timed out: %s", ctx.Err()) } - return nil }) } // applyToReplicas applies f to the replicas of d concurrently in random order, // not including the current origin. Passes the index of the iteration to f. -func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Client) error) error { +func (s *Server) applyToReplicas(ctx context.Context, d core.Digest, f func(i int, c blobclient.Client) error) error { replicas := stringset.FromSlice(s.hashRing.Locations(d)) replicas.Remove(s.addr) @@ -455,28 +753,57 @@ func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Clien }(i, replica) i++ } - wg.Wait() - - return errutil.Join(errs) + + // Wait for all goroutines to complete or context to be cancelled + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + return errutil.Join(errs) + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + return fmt.Errorf("replicas operation timed out: %s", ctx.Err()) + } } // downloadBlob downloads blob for d into dst. If no blob exists under d, a // download of the blob from the storage backend configured for namespace will // be initiated. This download is asynchronous and downloadBlob will immediately // return a "202 Accepted" handler error. 
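NOTE: when the timeout branch of applyToReplicas fires, the function returns while the worker goroutines and the wg.Wait waiter are still running; they exit once their calls return, but any per-replica errors arriving after the deadline are dropped. golang.org/x/sync/errgroup expresses the same fan-out with propagated cancellation, assuming f grew a ctx parameter so workers can stop early (a sketch; the Provide wiring mirrors the elided loop body):

    g, ctx := errgroup.WithContext(ctx)
    i := 0
    for replica := range replicas { // stringset.Set is a map[string]struct{}
    	i, replica := i, replica // loop-variable capture, pre-Go 1.22
    	g.Go(func() error {
    		// Hypothetical signature: f takes ctx so it can honor cancellation.
    		return f(ctx, i, s.clientProvider.Provide(replica))
    	})
    	i++
    }
    return g.Wait() // first error, after every worker has exited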
-func (s *Server) downloadBlob(namespace string, d core.Digest, dst io.Writer) error { +func (s *Server) downloadBlob(ctx context.Context, namespace string, d core.Digest, dst io.Writer) error { f, err := s.cas.GetCacheFileReader(d.Hex()) if os.IsNotExist(err) { return s.startRemoteBlobDownload(namespace, d, true) } else if err != nil { return handler.Errorf("get cache file: %s", err) } - defer f.Close() - - if _, err := io.Copy(dst, f); err != nil { - return handler.Errorf("copy blob: %s", err) + defer func() { + if closeErr := f.Close(); closeErr != nil { + s.resourceLeakCounter.Inc(1) + log.Errorf("Failed to close file reader in downloadBlob: %s", closeErr) + } + }() + + done := make(chan error, 1) + go func() { + _, err := io.Copy(dst, f) + done <- err + }() + + select { + case err := <-done: + if err != nil { + return handler.Errorf("copy blob: %s", err) + } + return nil + case <-ctx.Done(): + s.timeoutCounter.Inc(1) + return handler.Errorf("download blob timed out: %s", ctx.Err()) } - return nil } func (s *Server) deleteBlob(d core.Digest) error { @@ -491,39 +818,74 @@ func (s *Server) deleteBlob(d core.Digest) error { // startTransferHandler initializes an upload for internal blob transfers. func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "start_transfer") + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + + logger = logger.With("digest", d.Hex()) + logger.Info("Starting internal transfer") + if ok, err := blobExists(s.cas, d); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to check blob existence: %s", err) return handler.Errorf("check blob: %s", err) } else if ok { + logger.Info("Blob already exists") return handler.ErrorStatus(http.StatusConflict) } + uid, err := s.uploader.start(d) if err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to start upload: %s", err) return err } + setUploadLocation(w, uid) w.WriteHeader(http.StatusOK) + logger.With("upload_id", uid).Info("Internal transfer started successfully") return nil } // patchTransferHandler uploads a chunk of a blob for internal uploads. func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error { + requestID := s.getRequestID(r) + logger := log.With("request_id", requestID, "operation", "patch_transfer") + d, err := httputil.ParseDigest(r, "digest") if err != nil { + logger.Errorf("Failed to parse digest parameter: %s", err) return err } + uid, err := httputil.ParseParam(r, "uid") if err != nil { + logger.Errorf("Failed to parse uid parameter: %s", err) return err } + start, end, err := parseContentRange(r.Header) if err != nil { + logger.Errorf("Failed to parse content range: %s", err) + return err + } + + logger = logger.With("digest", d.Hex(), "upload_id", uid, "start", start, "end", end) + logger.Info("Patching internal transfer") + + if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil { + s.errorCounter.Inc(1) + logger.Errorf("Failed to patch transfer: %s", err) return err } - return s.uploader.patch(d, uid, r.Body, start, end) + + logger.Info("Internal transfer patched successfully") + return nil } // commitTransferHandler commits the upload of an internal blob transfer. 
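NOTE: in downloadBlob above, the io.Copy goroutine keeps writing to dst after a timeout, and dst is the http.ResponseWriter, which must not be touched once the handler returns. Keeping the copy on the handler goroutine and aborting between reads avoids that (a sketch; a Read already in flight is still not interrupted mid-call):

    // ctxReader fails the next Read once ctx is done, so a plain
    // io.Copy(dst, ctxReader{ctx, f}) stops between chunks without
    // leaving a goroutine writing to the ResponseWriter.
    type ctxReader struct {
    	ctx context.Context
    	r   io.Reader
    }

    func (cr ctxReader) Read(p []byte) (int, error) {
    	if err := cr.ctx.Err(); err != nil {
    		return 0, err
    	}
    	return cr.r.Read(p)
    }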
@@ -759,3 +1121,74 @@ func (s *Server) maybeDelete(name string, ttl time.Duration) (deleted bool, err } return false, nil } + +// getRequestID extracts the request ID from the request context +func (s *Server) getRequestID(r *http.Request) string { + if id, ok := r.Context().Value("request_id").(string); ok { + return id + } + return "unknown" +} + +// setOctetStreamContentType sets the content type to application/octet-stream +func setOctetStreamContentType(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/octet-stream") +} + +// setContentLength sets the content length header +func setContentLength(w http.ResponseWriter, length int) { + w.Header().Set("Content-Length", strconv.Itoa(length)) +} + +// setUploadLocation sets the upload location header +func setUploadLocation(w http.ResponseWriter, uid string) { + w.Header().Set("Location", uid) +} + +// blobExists checks if a blob exists in the cache +func blobExists(cas *store.CAStore, d core.Digest) (bool, error) { + _, err := cas.GetCacheFileStat(d.Hex()) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} + +// parseContentRange parses the content range header +func parseContentRange(headers http.Header) (start, end int64, err error) { + rangeHeader := headers.Get("Content-Range") + if rangeHeader == "" { + return 0, 0, fmt.Errorf("missing Content-Range header") + } + + // Parse "bytes start-end/total" format + parts := strings.Split(rangeHeader, " ") + if len(parts) != 2 || parts[0] != "bytes" { + return 0, 0, fmt.Errorf("invalid Content-Range format") + } + + rangeParts := strings.Split(parts[1], "/") + if len(rangeParts) != 2 { + return 0, 0, fmt.Errorf("invalid Content-Range format") + } + + startEndParts := strings.Split(rangeParts[0], "-") + if len(startEndParts) != 2 { + return 0, 0, fmt.Errorf("invalid Content-Range format") + } + + start, err = strconv.ParseInt(startEndParts[0], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid start range: %s", err) + } + + end, err = strconv.ParseInt(startEndParts[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid end range: %s", err) + } + + return start, end, nil +} From 466c24f2a4086a4d5f125603ce12abcabf1c473f Mon Sep 17 00:00:00 2001 From: hweawer Date: Wed, 16 Jul 2025 09:54:44 +0200 Subject: [PATCH 2/3] Refactor cmd --- origin/blobserver/server.go | 63 +------------ origin/cmd/cmd.go | 180 ++++++++++++++++++++++++++++++------ 2 files changed, 154 insertions(+), 89 deletions(-) diff --git a/origin/blobserver/server.go b/origin/blobserver/server.go index e49ca5987..ef9f35ebf 100644 --- a/origin/blobserver/server.go +++ b/origin/blobserver/server.go @@ -987,7 +987,7 @@ func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Reque if err := s.writeBack(namespace, d, 0); err != nil { return err } - err = s.applyToReplicas(d, func(i int, client blobclient.Client) error { + err = s.applyToReplicas(r.Context(), d, func(i int, client blobclient.Client) error { delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1) f, err := s.cas.GetCacheFileReader(d.Hex()) if err != nil { @@ -1130,65 +1130,4 @@ func (s *Server) getRequestID(r *http.Request) string { return "unknown" } -// setOctetStreamContentType sets the content type to application/octet-stream -func setOctetStreamContentType(w http.ResponseWriter) { - w.Header().Set("Content-Type", "application/octet-stream") -} - -// setContentLength sets the content length header -func setContentLength(w 
http.ResponseWriter, length int) { - w.Header().Set("Content-Length", strconv.Itoa(length)) -} - -// setUploadLocation sets the upload location header -func setUploadLocation(w http.ResponseWriter, uid string) { - w.Header().Set("Location", uid) -} - -// blobExists checks if a blob exists in the cache -func blobExists(cas *store.CAStore, d core.Digest) (bool, error) { - _, err := cas.GetCacheFileStat(d.Hex()) - if err == nil { - return true, nil - } - if os.IsNotExist(err) { - return false, nil - } - return false, err -} -// parseContentRange parses the content range header -func parseContentRange(headers http.Header) (start, end int64, err error) { - rangeHeader := headers.Get("Content-Range") - if rangeHeader == "" { - return 0, 0, fmt.Errorf("missing Content-Range header") - } - - // Parse "bytes start-end/total" format - parts := strings.Split(rangeHeader, " ") - if len(parts) != 2 || parts[0] != "bytes" { - return 0, 0, fmt.Errorf("invalid Content-Range format") - } - - rangeParts := strings.Split(parts[1], "/") - if len(rangeParts) != 2 { - return 0, 0, fmt.Errorf("invalid Content-Range format") - } - - startEndParts := strings.Split(rangeParts[0], "-") - if len(startEndParts) != 2 { - return 0, 0, fmt.Errorf("invalid Content-Range format") - } - - start, err = strconv.ParseInt(startEndParts[0], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("invalid start range: %s", err) - } - - end, err = strconv.ParseInt(startEndParts[1], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("invalid end range: %s", err) - } - - return start, end, nil -} diff --git a/origin/cmd/cmd.go b/origin/cmd/cmd.go index cb0c3f80e..436d26fba 100644 --- a/origin/cmd/cmd.go +++ b/origin/cmd/cmd.go @@ -14,6 +14,7 @@ package cmd import ( + "crypto/tls" "encoding/json" "flag" "fmt" @@ -44,6 +45,7 @@ import ( "github.com/andres-erbsen/clock" "github.com/go-chi/chi" + "github.com/jmoiron/sqlx" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -110,18 +112,43 @@ func WithLogger(l *zap.Logger) Option { // Run runs the origin. 
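NOTE: with Run decomposed below, the functional options read naturally at the call site. A hypothetical caller (ParseFlags and the zap constructor here are assumptions, not part of this diff):

    func main() {
    	flags := cmd.ParseFlags()
    	logger, err := zap.NewProduction()
    	if err != nil {
    		panic(err)
    	}
    	cmd.Run(flags, cmd.WithLogger(logger))
    }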
func Run(flags *Flags, opts ...Option) { + validateFlags(flags) + + var overrides options + for _, o := range opts { + o(&overrides) + } + + config := setupConfiguration(flags, &overrides) + logger := setupLogging(config, &overrides) + defer func() { + if logger != nil { + logger.Sync() + } + }() + + stats, statsCloser := setupMetrics(config, flags, &overrides) + defer statsCloser() + + hostname := setupHostname(flags) + peerIP := setupPeerIP(flags) + + components := setupCoreComponents(config, flags, hostname, peerIP, stats) + server := setupBlobServer(config, flags, hostname, components, stats) + + startServices(config, flags, server, components.scheduler) +} + +func validateFlags(flags *Flags) { if flags.PeerPort == 0 { panic("must specify non-zero peer port") } if flags.BlobServerPort == 0 { panic("must specify non-zero blob server port") } +} - var overrides options - for _, o := range opts { - o(&overrides) - } - +func setupConfiguration(flags *Flags, overrides *options) Config { var config Config if overrides.config != nil { config = *overrides.config @@ -135,26 +162,34 @@ func Run(flags *Flags, opts ...Option) { } } } + return config +} +func setupLogging(config Config, overrides *options) *zap.Logger { if overrides.logger != nil { log.SetGlobalLogger(overrides.logger.Sugar()) + return overrides.logger } else { zlog := log.ConfigureLogger(config.ZapLogging) - defer zlog.Sync() + return zlog.Desugar() } +} - stats := overrides.metrics - if stats == nil { - s, closer, err := metrics.New(config.Metrics, flags.KrakenCluster) - if err != nil { - log.Fatalf("Failed to init metrics: %s", err) - } - stats = s - defer closer.Close() +func setupMetrics(config Config, flags *Flags, overrides *options) (tally.Scope, func()) { + if overrides.metrics != nil { + return overrides.metrics, func() {} } - go metrics.EmitVersion(stats) + s, closer, err := metrics.New(config.Metrics, flags.KrakenCluster) + if err != nil { + log.Fatalf("Failed to init metrics: %s", err) + } + go metrics.EmitVersion(s) + return s, func() { closer.Close() } +} + +func setupHostname(flags *Flags) string { var hostname string if flags.BlobServerHostName == "" { var err error @@ -166,36 +201,96 @@ func Run(flags *Flags, opts ...Option) { hostname = flags.BlobServerHostName } log.Infof("Configuring origin with hostname '%s'", hostname) + return hostname +} +func setupPeerIP(flags *Flags) string { if flags.PeerIP == "" { localIP, err := netutil.GetLocalIP() if err != nil { log.Fatalf("Error getting local ip: %s", err) } - flags.PeerIP = localIP + return localIP } + return flags.PeerIP +} +type coreComponents struct { + cas *store.CAStore + pctx core.PeerContext + backendManager *backend.Manager + writeBackManager persistedretry.Manager + metaInfoGen *metainfogen.Generator + blobRefresher *blobrefresh.Refresher + scheduler scheduler.ReloadableScheduler + hashRing hashring.Ring + tls *tls.Config +} + +func setupCoreComponents(config Config, flags *Flags, hostname, peerIP string, stats tally.Scope) *coreComponents { + cas := setupCAStore(config, stats) + pctx := setupPeerContext(config, flags, peerIP) + backendManager := setupBackendManager(config, stats) + + localDB := setupLocalDB(config) + writeBackManager := setupWriteBackManager(config, stats, cas, backendManager, localDB) + metaInfoGen := setupMetaInfoGenerator(config, cas) + blobRefresher := setupBlobRefresher(config, stats, cas, backendManager, metaInfoGen) + + netevents := setupNetworkEvents(config) + schedulerInstance := setupScheduler(config, stats, pctx, cas, 
netevents, blobRefresher) + + cluster := setupCluster(config) + tlsConfig := setupTLS(config) + hashRing := setupHashRing(config, flags, hostname, cluster, tlsConfig, backendManager) + + return &coreComponents{ + cas: cas, + pctx: pctx, + backendManager: backendManager, + writeBackManager: writeBackManager, + metaInfoGen: metaInfoGen, + blobRefresher: blobRefresher, + scheduler: schedulerInstance, + hashRing: hashRing, + tls: tlsConfig, + } +} + +func setupCAStore(config Config, stats tally.Scope) *store.CAStore { cas, err := store.NewCAStore(config.CAStore, stats) if err != nil { log.Fatalf("Failed to create castore: %s", err) } + return cas +} +func setupPeerContext(config Config, flags *Flags, peerIP string) core.PeerContext { pctx, err := core.NewPeerContext( - config.PeerIDFactory, flags.Zone, flags.KrakenCluster, flags.PeerIP, flags.PeerPort, true) + config.PeerIDFactory, flags.Zone, flags.KrakenCluster, peerIP, flags.PeerPort, true) if err != nil { log.Fatalf("Failed to create peer context: %s", err) } + return pctx +} +func setupBackendManager(config Config, stats tally.Scope) *backend.Manager { backendManager, err := backend.NewManager(config.BackendManager, config.Backends, config.Auth, stats) if err != nil { log.Fatalf("Error creating backend manager: %s", err) } + return backendManager +} +func setupLocalDB(config Config) *sqlx.DB { localDB, err := localdb.New(config.LocalDB) if err != nil { log.Fatalf("Error creating local db: %s", err) } + return localDB +} +func setupWriteBackManager(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, localDB *sqlx.DB) persistedretry.Manager { writeBackManager, err := persistedretry.NewManager( config.WriteBack, stats, @@ -204,35 +299,55 @@ func Run(flags *Flags, opts ...Option) { if err != nil { log.Fatalf("Error creating write-back manager: %s", err) } + return writeBackManager +} +func setupMetaInfoGenerator(config Config, cas *store.CAStore) *metainfogen.Generator { metaInfoGenerator, err := metainfogen.New(config.MetaInfoGen, cas) if err != nil { log.Fatalf("Error creating metainfo generator: %s", err) } + return metaInfoGenerator +} - blobRefresher := blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGenerator) +func setupBlobRefresher(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, metaInfoGen *metainfogen.Generator) *blobrefresh.Refresher { + return blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGen) +} +func setupNetworkEvents(config Config) networkevent.Producer { netevents, err := networkevent.NewProducer(config.NetworkEvent) if err != nil { log.Fatalf("Error creating network event producer: %s", err) } + return netevents +} +func setupScheduler(config Config, stats tally.Scope, pctx core.PeerContext, cas *store.CAStore, netevents networkevent.Producer, blobRefresher *blobrefresh.Refresher) scheduler.ReloadableScheduler { sched, err := scheduler.NewOriginScheduler( config.Scheduler, stats, pctx, cas, netevents, blobRefresher) if err != nil { log.Fatalf("Error creating scheduler: %s", err) } + return sched +} +func setupCluster(config Config) hostlist.List { cluster, err := hostlist.New(config.Cluster) if err != nil { log.Fatalf("Error creating cluster host list: %s", err) } + return cluster +} +func setupTLS(config Config) *tls.Config { tls, err := config.TLS.BuildClient() if err != nil { log.Fatalf("Error building client tls config: %s", err) } + return tls +} +func setupHashRing(config Config, flags *Flags, 
hostname string, cluster hostlist.List, tls *tls.Config, backendManager *backend.Manager) hashring.Ring { healthCheckFilter := healthcheck.NewFilter(config.HealthCheck, healthcheck.Default(tls)) hashRing := hashring.New( @@ -242,6 +357,7 @@ func Run(flags *Flags, opts ...Option) { hashring.WithWatcher(backend.NewBandwidthWatcher(backendManager))) go hashRing.Monitor(nil) + // Validate that this origin is in the hash ring addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort) if !hashRing.Contains(addr) { // When DNS is used for hash ring membership, the members will be IP @@ -258,24 +374,34 @@ func Run(flags *Flags, opts ...Option) { } } + return hashRing +} + +func setupBlobServer(config Config, flags *Flags, hostname string, components *coreComponents, stats tally.Scope) *blobserver.Server { + addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort) + server, err := blobserver.New( config.BlobServer, stats, clock.New(), addr, - hashRing, - cas, - blobclient.NewProvider(blobclient.WithTLS(tls)), - blobclient.NewClusterProvider(blobclient.WithTLS(tls)), - pctx, - backendManager, - blobRefresher, - metaInfoGenerator, - writeBackManager) + components.hashRing, + components.cas, + blobclient.NewProvider(blobclient.WithTLS(components.tls)), + blobclient.NewClusterProvider(blobclient.WithTLS(components.tls)), + components.pctx, + components.backendManager, + components.blobRefresher, + components.metaInfoGen, + components.writeBackManager) if err != nil { log.Fatalf("Error initializing blob server: %s", err) } + return server +} + +func startServices(config Config, flags *Flags, server *blobserver.Server, sched scheduler.ReloadableScheduler) { h := addTorrentDebugEndpoints(server.Handler(), sched) go func() { log.Fatal(server.ListenAndServe(h)) }() From 28f4b6f1d42442025bd51f15a708ff40cf08ac4e Mon Sep 17 00:00:00 2001 From: hweawer Date: Wed, 16 Jul 2025 11:11:23 +0200 Subject: [PATCH 3/3] Fix --- origin/blobserver/config.go | 28 +++-- origin/blobserver/server.go | 198 +++++++++++++++++------------------- 2 files changed, 103 insertions(+), 123 deletions(-) diff --git a/origin/blobserver/config.go b/origin/blobserver/config.go index ea0306dd9..76c5539f2 100644 --- a/origin/blobserver/config.go +++ b/origin/blobserver/config.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -23,19 +23,18 @@ import ( type Config struct { Listener listener.Config `yaml:"listener"` DuplicateWriteBackStagger time.Duration `yaml:"duplicate_write_back_stagger"` - + // Timeout configurations - DownloadTimeout time.Duration `yaml:"download_timeout"` - UploadTimeout time.Duration `yaml:"upload_timeout"` - ReplicationTimeout time.Duration `yaml:"replication_timeout"` - BackendTimeout time.Duration `yaml:"backend_timeout"` - ReadinessTimeout time.Duration `yaml:"readiness_timeout"` - + DownloadTimeout time.Duration `yaml:"download_timeout"` + UploadTimeout time.Duration `yaml:"upload_timeout"` + ReplicationTimeout time.Duration `yaml:"replication_timeout"` + BackendTimeout time.Duration `yaml:"backend_timeout"` + ReadinessTimeout time.Duration `yaml:"readiness_timeout"` + // Limit configurations MaxConcurrentDownloads int `yaml:"max_concurrent_downloads"` MaxConcurrentUploads int `yaml:"max_concurrent_uploads"` - MaxRequestSize int64 `yaml:"max_request_size"` - + // Retry configurations MaxRetries int `yaml:"max_retries"` RetryDelay time.Duration `yaml:"retry_delay"` @@ -53,7 +52,7 @@ func (c Config) applyDefaults() Config { c.UploadTimeout = 10 * time.Minute } if c.ReplicationTimeout == 0 { - c.ReplicationTimeout = 3 * time.Minute + c.ReplicationTimeout = 5 * time.Minute } if c.BackendTimeout == 0 { c.BackendTimeout = 2 * time.Minute @@ -62,13 +61,10 @@ func (c Config) applyDefaults() Config { c.ReadinessTimeout = 30 * time.Second } if c.MaxConcurrentDownloads == 0 { - c.MaxConcurrentDownloads = 10 + c.MaxConcurrentDownloads = 20 } if c.MaxConcurrentUploads == 0 { - c.MaxConcurrentUploads = 5 - } - if c.MaxRequestSize == 0 { - c.MaxRequestSize = 1024 * 1024 * 1024 // 1GB + c.MaxConcurrentUploads = 10 } if c.MaxRetries == 0 { c.MaxRetries = 3 diff --git a/origin/blobserver/server.go b/origin/blobserver/server.go index ef9f35ebf..e87571ccb 100644 --- a/origin/blobserver/server.go +++ b/origin/blobserver/server.go @@ -44,7 +44,6 @@ import ( "github.com/uber/kraken/utils/httputil" "github.com/uber/kraken/utils/listener" "github.com/uber/kraken/utils/log" - "github.com/uber/kraken/utils/memsize" "github.com/uber/kraken/utils/stringset" "github.com/andres-erbsen/clock" @@ -52,8 +51,6 @@ import ( "github.com/uber-go/tally" ) -const _uploadChunkSize = 16 * memsize.MB - // Server defines a server that serves blob data for agent. type Server struct { config Config @@ -75,21 +72,21 @@ type Server struct { // a given torrent, however this requires blob server to understand the // context of the p2p client running alongside it. 
pctx core.PeerContext - + // Resource management downloadSemaphore chan struct{} uploadSemaphore chan struct{} - + // Metrics - downloadTimer tally.Timer - uploadTimer tally.Timer - replicationTimer tally.Timer - downloadCounter tally.Counter - uploadCounter tally.Counter - replicationCounter tally.Counter - errorCounter tally.Counter - timeoutCounter tally.Counter - resourceLeakCounter tally.Counter + downloadTimer tally.Timer + uploadTimer tally.Timer + replicationTimer tally.Timer + downloadCounter tally.Counter + uploadCounter tally.Counter + replicationCounter tally.Counter + errorCounter tally.Counter + timeoutCounter tally.Counter + resourceLeakCounter tally.Counter } // New initializes a new Server. @@ -155,7 +152,6 @@ func (s *Server) Handler() http.Handler { r.Use(middleware.StatusCounter(s.stats)) r.Use(middleware.LatencyTimer(s.stats)) r.Use(s.requestTracingMiddleware) - r.Use(s.requestValidationMiddleware) // Public endpoints: @@ -204,21 +200,21 @@ func (s *Server) requestTracingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { requestID := fmt.Sprintf("%d", rand.Int63()) start := time.Now() - + // Add request ID to context for downstream handlers ctx := context.WithValue(r.Context(), "request_id", requestID) r = r.WithContext(ctx) - + // Add request ID to response headers for debugging w.Header().Set("X-Request-ID", requestID) - + log.With( "request_id", requestID, "method", r.Method, "path", r.URL.Path, "remote_addr", r.RemoteAddr, ).Info("Request started") - + defer func() { duration := time.Since(start) log.With( @@ -228,18 +224,7 @@ func (s *Server) requestTracingMiddleware(next http.Handler) http.Handler { "duration_ms", duration.Milliseconds(), ).Info("Request completed") }() - - next.ServeHTTP(w, r) - }) -} -// requestValidationMiddleware validates request size and other basic requirements -func (s *Server) requestValidationMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > s.config.MaxRequestSize { - http.Error(w, "Request too large", http.StatusRequestEntityTooLarge) - return - } next.ServeHTTP(w, r) }) } @@ -258,15 +243,15 @@ func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) erro func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error { ctx, cancel := context.WithTimeout(r.Context(), s.config.ReadinessTimeout) defer cancel() - + requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "readiness_check") - + done := make(chan error, 1) go func() { done <- s.backends.CheckReadiness() }() - + select { case err := <-done: if err != nil { @@ -287,22 +272,22 @@ func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) e func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error { ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout) defer cancel() - + requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "stat") - + checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false")) if err != nil { logger.Errorf("Failed to parse local parameter: %s", err) return handler.Errorf("parse arg `local` as bool: %s", err) } - + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { logger.Errorf("Failed to parse namespace parameter: %s", err) return err } - + d, err := httputil.ParseDigest(r, "digest") if err != nil { 
logger.Errorf("Failed to parse digest parameter: %s", err) @@ -320,7 +305,7 @@ func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error { logger.Errorf("Blob stat failed: %s", err) return fmt.Errorf("stat: %s", err) } - + w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10)) logger.With("size", bi.Size).Info("Blob stat completed successfully") return nil @@ -330,26 +315,27 @@ func (s *Server) stat(ctx context.Context, namespace string, d core.Digest, chec fi, err := s.cas.GetCacheFileStat(d.Hex()) if err == nil { return core.NewBlobInfo(fi.Size()), nil - } else if os.IsNotExist(err) { + } + if os.IsNotExist(err) { if !checkLocal { client, err := s.backends.GetClient(namespace) if err != nil { return nil, fmt.Errorf("get backend client: %s", err) } - + done := make(chan struct { - bi *core.BlobInfo + bi *core.BlobInfo err error }, 1) - + go func() { bi, err := client.Stat(namespace, d.Hex()) done <- struct { - bi *core.BlobInfo + bi *core.BlobInfo err error }{bi, err} }() - + select { case result := <-done: if result.err == nil { @@ -373,25 +359,25 @@ func (s *Server) stat(ctx context.Context, namespace string, d core.Digest, chec func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error { ctx, cancel := context.WithTimeout(r.Context(), s.config.DownloadTimeout) defer cancel() - + requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "download") - + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { logger.Errorf("Failed to parse namespace parameter: %s", err) return err } - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + logger = logger.With("namespace", namespace, "digest", d.Hex()) logger.Info("Starting blob download") - + // Acquire download semaphore select { case s.downloadSemaphore <- struct{}{}: @@ -400,17 +386,17 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err logger.Error("Download semaphore acquisition timed out") return handler.Errorf("download queue full").Status(http.StatusServiceUnavailable) } - + s.downloadCounter.Inc(1) timer := s.downloadTimer.Start() defer timer.Stop() - + if err := s.downloadBlob(ctx, namespace, d, w); err != nil { s.errorCounter.Inc(1) logger.Errorf("Download failed: %s", err) return err } - + setOctetStreamContentType(w) logger.Info("Download completed successfully") return nil @@ -419,41 +405,41 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error { ctx, cancel := context.WithTimeout(r.Context(), s.config.ReplicationTimeout) defer cancel() - + requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "replicate_to_remote") - + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { logger.Errorf("Failed to parse namespace parameter: %s", err) return err } - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + remote, err := httputil.ParseParam(r, "remote") if err != nil { logger.Errorf("Failed to parse remote parameter: %s", err) return err } - + logger = logger.With("namespace", namespace, "digest", d.Hex(), "remote", remote) logger.Info("Starting remote replication") - + s.replicationCounter.Inc(1) timer := s.replicationTimer.Start() defer timer.Stop() - + if err := 
s.replicateToRemote(ctx, namespace, d, remote); err != nil { s.errorCounter.Inc(1) logger.Errorf("Remote replication failed: %s", err) return err } - + logger.Info("Remote replication completed successfully") return nil } @@ -477,12 +463,12 @@ func (s *Server) replicateToRemote(ctx context.Context, namespace string, d core if err != nil { return handler.Errorf("remote cluster provider: %s", err) } - + done := make(chan error, 1) go func() { done <- remote.UploadBlob(namespace, d, f) }() - + select { case err := <-done: return err @@ -496,22 +482,22 @@ func (s *Server) replicateToRemote(ctx context.Context, namespace string, d core func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "delete") - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + logger = logger.With("digest", d.Hex()) logger.Info("Starting blob deletion") - + if err := s.deleteBlob(d); err != nil { s.errorCounter.Inc(1) logger.Errorf("Blob deletion failed: %s", err) return err } - + setContentLength(w, 0) w.WriteHeader(http.StatusAccepted) logger.Info("Blob deletion completed successfully") @@ -521,20 +507,20 @@ func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "get_locations") - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + logger = logger.With("digest", d.Hex()) logger.Info("Getting blob locations") - + locs := s.hashRing.Locations(d) w.Header().Set("Origin-Locations", strings.Join(locs, ",")) w.WriteHeader(http.StatusOK) - + logger.With("locations", locs).Info("Blob locations retrieved successfully") return nil } @@ -543,15 +529,15 @@ func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) err func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "get_peer_context") - + logger.Info("Getting peer context") - + if err := json.NewEncoder(w).Encode(s.pctx); err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to encode peer context: %s", err) return handler.Errorf("error converting peer context to json: %s", err) } - + logger.Info("Peer context retrieved successfully") return nil } @@ -559,32 +545,32 @@ func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) e func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error { ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout) defer cancel() - + requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "get_metainfo") - + namespace, err := httputil.ParseParam(r, "namespace") if err != nil { logger.Errorf("Failed to parse namespace parameter: %s", err) return err } - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + logger = logger.With("namespace", namespace, "digest", d.Hex()) logger.Info("Getting metainfo") - + raw, err := s.getMetaInfo(ctx, namespace, d) if err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to get metainfo: %s", err) return err } - + 
w.Write(raw) logger.Info("Metainfo retrieved successfully") return nil @@ -593,28 +579,28 @@ func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) erro func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "overwrite_metainfo") - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64) if err != nil { logger.Errorf("Failed to parse piece_length parameter: %s", err) return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest) } - + logger = logger.With("digest", d.Hex(), "piece_length", pieceLength) logger.Info("Overwriting metainfo") - + if err := s.overwriteMetaInfo(d, pieceLength); err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to overwrite metainfo: %s", err) return err } - + logger.Info("Metainfo overwritten successfully") return nil } @@ -633,16 +619,16 @@ func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error { log.Errorf("Failed to close file reader in overwriteMetaInfo: %s", closeErr) } }() - + mi, err := core.NewMetaInfo(d, f, pieceLength) if err != nil { return handler.Errorf("create metainfo: %s", err) } - + if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil { return handler.Errorf("set metainfo: %s", err) } - + return nil } @@ -667,7 +653,7 @@ type localReplicationHook struct { func (h *localReplicationHook) Run(d core.Digest) { timer := h.server.stats.Timer("replicate_blob").Start() defer timer.Stop() - + if err := h.server.replicateBlobLocally(d); err != nil { // Don't return error here as we only want to cache storage backend errors. 
log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err) @@ -699,7 +685,7 @@ func (s *Server) startRemoteBlobDownload( func (s *Server) replicateBlobLocally(d core.Digest) error { ctx, cancel := context.WithTimeout(context.Background(), s.config.ReplicationTimeout) defer cancel() - + return s.applyToReplicas(ctx, d, func(i int, client blobclient.Client) error { f, err := s.cas.GetCacheFileReader(d.Hex()) if err != nil { @@ -711,12 +697,12 @@ func (s *Server) replicateBlobLocally(d core.Digest) error { log.Errorf("Failed to close file reader in replicateBlobLocally: %s", closeErr) } }() - + done := make(chan error, 1) go func() { done <- client.TransferBlob(d, f) }() - + select { case err := <-done: if err != nil { @@ -753,14 +739,14 @@ func (s *Server) applyToReplicas(ctx context.Context, d core.Digest, f func(i in }(i, replica) i++ } - + // Wait for all goroutines to complete or context to be cancelled done := make(chan struct{}) go func() { wg.Wait() close(done) }() - + select { case <-done: return errutil.Join(errs) @@ -793,7 +779,7 @@ func (s *Server) downloadBlob(ctx context.Context, namespace string, d core.Dige _, err := io.Copy(dst, f) done <- err }() - + select { case err := <-done: if err != nil { @@ -820,16 +806,16 @@ func (s *Server) deleteBlob(d core.Digest) error { func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "start_transfer") - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + logger = logger.With("digest", d.Hex()) logger.Info("Starting internal transfer") - + if ok, err := blobExists(s.cas, d); err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to check blob existence: %s", err) @@ -838,14 +824,14 @@ func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) er logger.Info("Blob already exists") return handler.ErrorStatus(http.StatusConflict) } - + uid, err := s.uploader.start(d) if err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to start upload: %s", err) return err } - + setUploadLocation(w, uid) w.WriteHeader(http.StatusOK) logger.With("upload_id", uid).Info("Internal transfer started successfully") @@ -856,34 +842,34 @@ func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) er func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error { requestID := s.getRequestID(r) logger := log.With("request_id", requestID, "operation", "patch_transfer") - + d, err := httputil.ParseDigest(r, "digest") if err != nil { logger.Errorf("Failed to parse digest parameter: %s", err) return err } - + uid, err := httputil.ParseParam(r, "uid") if err != nil { logger.Errorf("Failed to parse uid parameter: %s", err) return err } - + start, end, err := parseContentRange(r.Header) if err != nil { logger.Errorf("Failed to parse content range: %s", err) return err } - + logger = logger.With("digest", d.Hex(), "upload_id", uid, "start", start, "end", end) logger.Info("Patching internal transfer") - + if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil { s.errorCounter.Inc(1) logger.Errorf("Failed to patch transfer: %s", err) return err } - + logger.Info("Internal transfer patched successfully") return nil } @@ -1129,5 +1115,3 @@ func (s *Server) getRequestID(r *http.Request) string { } return "unknown" } - -
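NOTE: the buffered-channel semaphores kept through patch 3 (MaxConcurrentDownloads now 20, MaxConcurrentUploads now 10) are equivalent to golang.org/x/sync/semaphore, which bakes the context-aware acquire into the API. A sketch under those defaults:

    import (
    	"context"

    	"golang.org/x/sync/semaphore"
    )

    var downloads = semaphore.NewWeighted(20) // config.MaxConcurrentDownloads

    // acquireDownloadSlot blocks until a slot frees or ctx expires,
    // mirroring the select on downloadSemaphore in downloadBlobHandler.
    func acquireDownloadSlot(ctx context.Context) (release func(), err error) {
    	if err := downloads.Acquire(ctx, 1); err != nil {
    		return nil, err // timed out or cancelled while queued
    	}
    	return func() { downloads.Release(1) }, nil
    }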