From c4b9375bd5808994e99d897945351b6e9e860222 Mon Sep 17 00:00:00 2001 From: Lawrence Enehizena Date: Wed, 7 Dec 2022 21:26:54 +0100 Subject: [PATCH 1/8] abstract out reusable util functions --- api/v1/api.go | 100 ++++++++++++++++++++++----------------------- api/v1/handlers.go | 66 +++++++----------------------- api/v1/pinning.go | 2 +- util/auth.go | 16 ++++++++ util/content.go | 35 ++++++++++++++++ 5 files changed, 116 insertions(+), 103 deletions(-) diff --git a/api/v1/api.go b/api/v1/api.go index 7b6adacc..7741dda7 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -86,61 +86,61 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) { e.POST("/register", s.handleRegisterUser) e.POST("/login", s.handleLoginUser) e.GET("/health", s.handleHealth) - e.GET("/viewer", withUser(s.handleGetViewer), s.AuthRequired(util.PermLevelUpload)) + e.GET("/viewer", util.WithUser(s.handleGetViewer), s.AuthRequired(util.PermLevelUpload)) e.GET("/retrieval-candidates/:cid", s.handleGetRetrievalCandidates) e.GET("/gw/:path", s.handleGateway) - e.POST("/put", util.WithMultipartFormDataChecker(withUser(s.handleAdd)), s.AuthRequired(util.PermLevelUpload)) + e.POST("/put", util.WithMultipartFormDataChecker(util.WithUser(s.handleAdd)), s.AuthRequired(util.PermLevelUpload)) e.GET("/get/:cid", s.handleGetFullContentbyCid) // e.HEAD("/get/:cid", s.handleGetContentByCid) user := e.Group("/user") user.Use(s.AuthRequired(util.PermLevelUser)) - user.GET("/api-keys", withUser(s.handleUserGetApiKeys)) - user.POST("/api-keys", withUser(s.handleUserCreateApiKey)) - user.DELETE("/api-keys/:key_or_hash", withUser(s.handleUserRevokeApiKey)) - user.GET("/export", withUser(s.handleUserExportData)) - user.PUT("/password", withUser(s.handleUserChangePassword)) - user.PUT("/address", withUser(s.handleUserChangeAddress)) - user.GET("/stats", withUser(s.handleGetUserStats)) + user.GET("/api-keys", util.WithUser(s.handleUserGetApiKeys)) + user.POST("/api-keys", util.WithUser(s.handleUserCreateApiKey)) + user.DELETE("/api-keys/:key_or_hash", util.WithUser(s.handleUserRevokeApiKey)) + user.GET("/export", util.WithUser(s.handleUserExportData)) + user.PUT("/password", util.WithUser(s.handleUserChangePassword)) + user.PUT("/address", util.WithUser(s.handleUserChangeAddress)) + user.GET("/stats", util.WithUser(s.handleGetUserStats)) userMiner := user.Group("/miner") - userMiner.POST("/claim", withUser(s.handleUserClaimMiner)) - userMiner.GET("/claim/:miner", withUser(s.handleUserGetClaimMinerMsg)) - userMiner.POST("/suspend/:miner", withUser(s.handleSuspendMiner)) - userMiner.PUT("/unsuspend/:miner", withUser(s.handleUnsuspendMiner)) - userMiner.PUT("/set-info/:miner", withUser(s.handleMinersSetInfo)) + userMiner.POST("/claim", util.WithUser(s.handleUserClaimMiner)) + userMiner.GET("/claim/:miner", util.WithUser(s.handleUserGetClaimMinerMsg)) + userMiner.POST("/suspend/:miner", util.WithUser(s.handleSuspendMiner)) + userMiner.PUT("/unsuspend/:miner", util.WithUser(s.handleUnsuspendMiner)) + userMiner.PUT("/set-info/:miner", util.WithUser(s.handleMinersSetInfo)) contmeta := e.Group("/content") uploads := contmeta.Group("", s.AuthRequired(util.PermLevelUpload)) - uploads.POST("/add", util.WithMultipartFormDataChecker(withUser(s.handleAdd))) - uploads.POST("/add-ipfs", withUser(s.handleAddIpfs)) - uploads.POST("/add-car", util.WithContentLengthCheck(withUser(s.handleAddCar))) - uploads.POST("/create", withUser(s.handleCreateContent)) + uploads.POST("/add", util.WithMultipartFormDataChecker(util.WithUser(s.handleAdd))) + uploads.POST("/add-ipfs", util.WithUser(s.handleAddIpfs)) + uploads.POST("/add-car", util.WithContentLengthCheck(util.WithUser(s.handleAddCar))) + uploads.POST("/create", util.WithUser(s.handleCreateContent)) content := contmeta.Group("", s.AuthRequired(util.PermLevelUser)) content.GET("/by-cid/:cid", s.handleGetContentByCid) - content.GET("/:cont_id", withUser(s.handleGetContent)) - content.GET("/stats", withUser(s.handleStats)) + content.GET("/:cont_id", util.WithUser(s.handleGetContent)) + content.GET("/stats", util.WithUser(s.handleStats)) content.GET("/ensure-replication/:datacid", s.handleEnsureReplication) - content.GET("/status/:id", withUser(s.handleContentStatus)) - content.GET("/list", withUser(s.handleListContent)) - content.GET("/deals", withUser(s.handleListContentWithDeals)) - content.GET("/failures/:content", withUser(s.handleGetContentFailures)) - content.GET("/bw-usage/:content", withUser(s.handleGetContentBandwidth)) - content.GET("/staging-zones", withUser(s.handleGetStagingZoneForUser)) - content.GET("/aggregated/:content", withUser(s.handleGetAggregatedForContent)) - content.GET("/all-deals", withUser(s.handleGetAllDealsForUser)) + content.GET("/status/:id", util.WithUser(s.handleContentStatus)) + content.GET("/list", util.WithUser(s.handleListContent)) + content.GET("/deals", util.WithUser(s.handleListContentWithDeals)) + content.GET("/failures/:content", util.WithUser(s.handleGetContentFailures)) + content.GET("/bw-usage/:content", util.WithUser(s.handleGetContentBandwidth)) + content.GET("/staging-zones", util.WithUser(s.handleGetStagingZoneForUser)) + content.GET("/aggregated/:content", util.WithUser(s.handleGetAggregatedForContent)) + content.GET("/all-deals", util.WithUser(s.handleGetAllDealsForUser)) // TODO: the commented out routes here are still fairly useful, but maybe // need to have some sort of 'super user' permission level in order to use // them? Can easily cause harm using them deals := e.Group("/deals") deals.Use(s.AuthRequired(util.PermLevelUser)) - deals.GET("/status/:deal", withUser(s.handleGetDealStatus)) - deals.GET("/status-by-proposal/:propcid", withUser(s.handleGetDealStatusByPropCid)) + deals.GET("/status/:deal", util.WithUser(s.handleGetDealStatus)) + deals.GET("/status-by-proposal/:propcid", util.WithUser(s.handleGetDealStatusByPropCid)) deals.GET("/query/:miner", s.handleQueryAsk) - deals.POST("/make/:miner", withUser(s.handleMakeDeal)) + deals.POST("/make/:miner", util.WithUser(s.handleMakeDeal)) //deals.POST("/transfer/start/:miner/:propcid/:datacid", s.handleTransferStart) deals.GET("/transfer/status/:id", s.handleTransferStatusByID) deals.POST("/transfer/status", s.handleTransferStatus) @@ -149,29 +149,29 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) { deals.POST("/estimate", s.handleEstimateDealCost) deals.GET("/proposal/:propcid", s.handleGetProposal) deals.GET("/info/:dealid", s.handleGetDealInfo) - deals.GET("/failures", withUser(s.handleStorageFailures)) + deals.GET("/failures", util.WithUser(s.handleStorageFailures)) cols := e.Group("/collections") cols.Use(s.AuthRequired(util.PermLevelUser)) - cols.GET("", withUser(s.handleListCollections)) - cols.POST("", withUser(s.handleCreateCollection)) - cols.DELETE("/:coluuid", withUser(s.handleDeleteCollection)) - cols.POST("/:coluuid", withUser(s.handleAddContentsToCollection)) - cols.GET("/:coluuid", withUser(s.handleGetCollectionContents)) - cols.DELETE("/:coluuid/contents", withUser(s.handleDeleteContentFromCollection)) - cols.POST("/:coluuid/commit", withUser(s.handleCommitCollection)) + cols.GET("", util.WithUser(s.handleListCollections)) + cols.POST("", util.WithUser(s.handleCreateCollection)) + cols.DELETE("/:coluuid", util.WithUser(s.handleDeleteCollection)) + cols.POST("/:coluuid", util.WithUser(s.handleAddContentsToCollection)) + cols.GET("/:coluuid", util.WithUser(s.handleGetCollectionContents)) + cols.DELETE("/:coluuid/contents", util.WithUser(s.handleDeleteContentFromCollection)) + cols.POST("/:coluuid/commit", util.WithUser(s.handleCommitCollection)) colfs := cols.Group("/fs") - colfs.POST("/add", withUser(s.handleColfsAdd)) + colfs.POST("/add", util.WithUser(s.handleColfsAdd)) pinning := e.Group("/pinning") pinning.Use(util.OpenApiMiddleware(s.log)) pinning.Use(s.AuthRequired(util.PermLevelUser)) - pinning.GET("/pins", withUser(s.handleListPins)) - pinning.GET("/pins/:pinid", withUser(s.handleGetPin)) - pinning.DELETE("/pins/:pinid", withUser(s.handleDeletePin)) + pinning.GET("/pins", util.WithUser(s.handleListPins)) + pinning.GET("/pins/:pinid", util.WithUser(s.handleGetPin)) + pinning.DELETE("/pins/:pinid", util.WithUser(s.handleDeletePin)) pinning.Use(util.JSONPayloadMiddleware) - pinning.POST("/pins", withUser(s.handleAddPin)) - pinning.POST("/pins/:pinid", withUser(s.handleReplacePin)) + pinning.POST("/pins", util.WithUser(s.handleAddPin)) + pinning.POST("/pins/:pinid", util.WithUser(s.handleReplacePin)) // explicitly public, for now public := e.Group("/public") @@ -204,14 +204,14 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) { admin.GET("/dealstats", s.handleDealStats) admin.GET("/disk-info", s.handleDiskSpaceCheck) admin.GET("/stats", s.handleAdminStats) - admin.GET("/system/config", withUser(s.handleGetSystemConfig)) + admin.GET("/system/config", util.WithUser(s.handleGetSystemConfig)) // miners admin.POST("/miners/add/:miner", s.handleAdminAddMiner) admin.POST("/miners/rm/:miner", s.handleAdminRemoveMiner) - admin.POST("/miners/suspend/:miner", withUser(s.handleSuspendMiner)) - admin.PUT("/miners/unsuspend/:miner", withUser(s.handleUnsuspendMiner)) - admin.PUT("/miners/set-info/:miner", withUser(s.handleMinersSetInfo)) + admin.POST("/miners/suspend/:miner", util.WithUser(s.handleSuspendMiner)) + admin.PUT("/miners/unsuspend/:miner", util.WithUser(s.handleUnsuspendMiner)) + admin.PUT("/miners/set-info/:miner", util.WithUser(s.handleMinersSetInfo)) admin.GET("/miners", s.handleAdminGetMiners) admin.GET("/miners/stats", s.handleAdminGetMinerStats) admin.GET("/miners/transfers/:miner", s.handleMinerTransferDiagnostics) @@ -248,7 +248,7 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) { admin.GET("/retrieval/querytest/:content", s.handleRetrievalCheck) admin.GET("/retrieval/stats", s.handleGetRetrievalInfo) - admin.POST("/invite/:code", withUser(s.handleAdminCreateInvite)) + admin.POST("/invite/:code", util.WithUser(s.handleAdminCreateInvite)) admin.GET("/invites", s.handleAdminGetInvites) admin.GET("/fixdeals", s.handleFixupDeals) diff --git a/api/v1/handlers.go b/api/v1/handlers.go index 32c4ef88..6a251884 100644 --- a/api/v1/handlers.go +++ b/api/v1/handlers.go @@ -107,20 +107,6 @@ type statsResp struct { PinningStatus pinningtypes.PinningStatus `json:"pinningStatus"` } -func withUser(f func(echo.Context, *util.User) error) func(echo.Context) error { - return func(c echo.Context) error { - u, ok := c.Get("user").(*util.User) - if !ok { - return &util.HttpError{ - Code: http.StatusUnauthorized, - Reason: util.ERR_INVALID_AUTH, - Details: "endpoint not called with proper authentication", - } - } - return f(c, u) - } -} - // handleStats godoc // @Summary Get content statistics // @Description This endpoint is used to get content statistics. Every content stored in the network (estuary) is tracked by a unique ID which can be used to get information about the content. This endpoint will allow the consumer to get the collected stats of a content @@ -388,7 +374,7 @@ func (s *apiV1) handleAddIpfs(c echo.Context, u *util.User) error { defaultPath := "/" + filename colp := defaultPath if params.CollectionDir != "" { - p, err := sanitizePath(params.CollectionDir) + p, err := util.SanitizePath(params.CollectionDir) if err != nil { return err } @@ -653,7 +639,7 @@ func (s *apiV1) handleAdd(c echo.Context, u *util.User) error { col = &srchCol } - path, err := constructDirectoryPath(c.QueryParam(ColDir)) + path, err := util.ConstructDirectoryPath(c.QueryParam(ColDir)) if err != nil { return err } @@ -735,20 +721,6 @@ func (s *apiV1) handleAdd(c echo.Context, u *util.User) error { }) } -func constructDirectoryPath(dir string) (string, error) { - defaultPath := "/" - path := defaultPath - if cp := dir; cp != "" { - sp, err := sanitizePath(cp) - if err != nil { - return "", err - } - - path = sp - } - return path, nil -} - // redirectContentAdding is called when localContentAddingDisabled is true // it finds available shuttles and adds the desired content in one of them func (s *apiV1) redirectContentAdding(c echo.Context, u *util.User) error { @@ -3297,7 +3269,7 @@ func (s *apiV1) handleAddContentsToCollection(c echo.Context, u *util.User) erro return fmt.Errorf("%d specified content(s) were not found or user missing permissions", len(contentIDs)-len(contents)) } - path, err := constructDirectoryPath(c.QueryParam(ColDir)) + path, err := util.ConstructDirectoryPath(c.QueryParam(ColDir)) var colrefs []collections.CollectionRef for _, cont := range contents { fullPath := filepath.Join(path, cont.Name) @@ -4659,7 +4631,7 @@ func (s *apiV1) handleCreateContent(c echo.Context, u *util.User) error { req.CollectionDir = "/" } - sp, err := sanitizePath(req.CollectionDir) + sp, err := util.SanitizePath(req.CollectionDir) if err != nil { return err } @@ -5062,25 +5034,15 @@ const ( ColDir string = "dir" ) -func sanitizePath(p string) (string, error) { - if len(p) == 0 { - return "", fmt.Errorf("can't sanitize empty path") - } - - if p[0] != '/' { - return "", fmt.Errorf("paths must start with /") - } - - // TODO: prevent use of special weird characters - - cleanPath := filepath.Clean(p) - - // if original path ends in /, append / to cleaned path - // needed for full path vs dir+filename magic to work in handleAddIpfs - if strings.HasSuffix(p, "/") { - cleanPath = cleanPath + "/" - } - return cleanPath, nil +type collectionListResponse struct { + Name string `json:"name"` + Type CidType `json:"type"` + Size int64 `json:"size"` + ContID uint `json:"contId"` + Cid *util.DbCID `json:"cid,omitempty"` + Dir string `json:"dir"` + ColUuid string `json:"coluuid"` + UpdatedAt time.Time `json:"updatedAt"` } // handleColfsAdd godoc @@ -5134,7 +5096,7 @@ func (s *apiV1) handleColfsAdd(c echo.Context, u *util.User) error { var path *string if npath != "" { - p, err := sanitizePath(npath) + p, err := util.SanitizePath(npath) if err != nil { return err } diff --git a/api/v1/pinning.go b/api/v1/pinning.go index 652e916d..1dd06c32 100644 --- a/api/v1/pinning.go +++ b/api/v1/pinning.go @@ -265,7 +265,7 @@ func (s *apiV1) handleAddPin(e echo.Context, u *util.User) error { var colpath *string colp, ok := pin.Meta["colpath"].(string) if ok { - p, err := sanitizePath(colp) + p, err := util.SanitizePath(colp) if err != nil { return err } diff --git a/util/auth.go b/util/auth.go index b4ad52c8..ced62e7f 100644 --- a/util/auth.go +++ b/util/auth.go @@ -5,6 +5,8 @@ import ( b64 "encoding/base64" "fmt" "net/http" + + "github.com/labstack/echo/v4" ) func isEntityOwner(uID, entityID uint, entity string) error { @@ -50,3 +52,17 @@ func GetTokenHash(token string) string { // needs to be URL-encodable to send revoke token requests by hash return b64.RawURLEncoding.EncodeToString(tokenHashBytes[:]) } + +func WithUser(f func(echo.Context, *User) error) func(echo.Context) error { + return func(c echo.Context) error { + u, ok := c.Get("user").(*User) + if !ok { + return &HttpError{ + Code: http.StatusUnauthorized, + Reason: ERR_INVALID_AUTH, + Details: "endpoint not called with proper authentication", + } + } + return f(c, u) + } +} diff --git a/util/content.go b/util/content.go index abed798b..18cf01fe 100644 --- a/util/content.go +++ b/util/content.go @@ -213,3 +213,38 @@ func GetContent(contentid string, db *gorm.DB, u *User) (Content, error) { } return content, nil } + +func ConstructDirectoryPath(dir string) (string, error) { + defaultPath := "/" + path := defaultPath + if cp := dir; cp != "" { + sp, err := SanitizePath(cp) + if err != nil { + return "", err + } + + path = sp + } + return path, nil +} + +func SanitizePath(p string) (string, error) { + if len(p) == 0 { + return "", fmt.Errorf("can't sanitize empty path") + } + + if p[0] != '/' { + return "", fmt.Errorf("paths must start with /") + } + + // TODO: prevent use of special weird characters + + cleanPath := filepath.Clean(p) + + // if original path ends in /, append / to cleaned path + // needed for full path vs dir+filename magic to work in handleAddIpfs + if strings.HasSuffix(p, "/") { + cleanPath = cleanPath + "/" + } + return cleanPath, nil +} From 17ee3cd8d529a44db0372cbea17abe6899d6550f Mon Sep 17 00:00:00 2001 From: Lawrence Enehizena Date: Wed, 7 Dec 2022 21:27:51 +0100 Subject: [PATCH 2/8] add api v2 for contents --- api/v2/api.go | 36 ++-- api/v2/auth.go | 99 ++++++++++ api/v2/contents.go | 461 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 583 insertions(+), 13 deletions(-) create mode 100644 api/v2/auth.go create mode 100644 api/v2/contents.go diff --git a/api/v2/api.go b/api/v2/api.go index c718512e..ec5bc1e3 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -7,6 +7,7 @@ import ( "github.com/application-research/estuary/node" "github.com/application-research/estuary/pinner" "github.com/application-research/estuary/stagingbs" + "github.com/application-research/estuary/util" "github.com/application-research/estuary/util/gateway" "github.com/application-research/filclient" "github.com/filecoin-project/lotus/api" @@ -19,13 +20,13 @@ import ( type apiV2 struct { cfg *config.Estuary - DB *gorm.DB + db *gorm.DB tracer trace.Tracer - Node *node.Node - FilClient *filclient.FilClient - Api api.Gateway - CM *contentmgr.ContentManager - StagingMgr *stagingbs.StagingBSMgr + node *node.Node + filClient *filclient.FilClient + api api.Gateway + cm *contentmgr.ContentManager + stagingMgr *stagingbs.StagingBSMgr gwayHandler *gateway.GatewayHandler cacher *memo.Cacher minerManager miner.IMinerManager @@ -48,13 +49,13 @@ func NewAPIV2( ) *apiV2 { return &apiV2{ cfg: cfg, - DB: db, + db: db, tracer: trc, - Node: nd, - FilClient: fc, - Api: gwApi, - CM: cm, - StagingMgr: sbm, + node: nd, + filClient: fc, + api: gwApi, + cm: cm, + stagingMgr: sbm, gwayHandler: gateway.NewGatewayHandler(nd.Blockstore), cacher: memo.NewCacher(), minerManager: mm, @@ -81,5 +82,14 @@ func NewAPIV2( // @securityDefinitions.Bearer.in header // @securityDefinitions.Bearer.name Authorization func (s *apiV2) RegisterRoutes(e *echo.Echo) { - _ = e.Group("/v2") + e2 := e.Group("/v2") + + // to upload contents you only need an upload key + // to see info about contents you need a user-level key (see contents group) + e2.POST("/contents", util.WithUser(s.handleAdd), s.AuthRequired(util.PermLevelUpload)) + // e2.GET("/contents", util.WithUser(s.handleListContent), s.AuthRequired(util.PermLevelUser)) + // e2.GET("/contents/:contentid", util.WithUser(s.handleGetContent), s.AuthRequired(util.PermLevelUser)) + // e2.GET("/contents/:cid/ensure-replication", s.handleEnsureReplication, s.AuthRequired(util.PermLevelUser)) + // e2.GET("/contents/:contentid/status", util.WithUser(s.handleContentStatus), s.AuthRequired(util.PermLevelUser)) + } diff --git a/api/v2/auth.go b/api/v2/auth.go new file mode 100644 index 00000000..8f96aaed --- /dev/null +++ b/api/v2/auth.go @@ -0,0 +1,99 @@ +package api + +import ( + "fmt" + "net/http" + "time" + + "github.com/application-research/estuary/util" + "github.com/labstack/echo/v4" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/xerrors" + "gorm.io/gorm" +) + +func (s *apiV2) AuthRequired(level int) echo.MiddlewareFunc { + return func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + + // Check first if the Token is available. We should not continue if the + // token isn't even available. + auth, err := util.ExtractAuth(c) + if err != nil { + return err + } + + ctx, span := s.tracer.Start(c.Request().Context(), "authCheck") + defer span.End() + c.SetRequest(c.Request().WithContext(ctx)) + + u, err := s.checkTokenAuth(auth) + if err != nil { + return err + } + + span.SetAttributes(attribute.Int("user", int(u.ID))) + + if u.AuthToken.UploadOnly && level >= util.PermLevelUser { + s.log.Warnw("api key is upload only", "user", u.ID, "perm", u.Perm, "required", level) + + return &util.HttpError{ + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "api key is upload only", + } + } + + if u.Perm >= level { + c.Set("user", u) + return next(c) + } + + s.log.Warnw("user not authorized", "user", u.ID, "perm", u.Perm, "required", level) + + return &util.HttpError{ + Code: http.StatusForbidden, + Reason: util.ERR_NOT_AUTHORIZED, + Details: "user not authorized", + } + } + } +} + +func (s *apiV2) checkTokenAuth(token string) (*util.User, error) { + var authToken util.AuthToken + tokenHash := util.GetTokenHash(token) + if err := s.db.First(&authToken, "token = ? OR token_hash = ?", token, tokenHash).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return nil, &util.HttpError{ + Code: http.StatusUnauthorized, + Reason: util.ERR_INVALID_TOKEN, + Details: "api key does not exist", + } + } + return nil, err + } + + if authToken.Expiry.Before(time.Now()) { + return nil, &util.HttpError{ + Code: http.StatusUnauthorized, + Reason: util.ERR_TOKEN_EXPIRED, + Details: fmt.Sprintf("token for user %d expired %s", authToken.User, authToken.Expiry), + } + } + + var user util.User + if err := s.db.First(&user, "id = ?", authToken.User).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return nil, &util.HttpError{ + Code: http.StatusUnauthorized, + Reason: util.ERR_INVALID_TOKEN, + Details: "no user exists for the spicified api key", + } + } + return nil, err + } + + user.AuthToken = authToken + return &user, nil +} diff --git a/api/v2/contents.go b/api/v2/contents.go new file mode 100644 index 00000000..c134ba10 --- /dev/null +++ b/api/v2/contents.go @@ -0,0 +1,461 @@ +package api + +import ( + "context" + "fmt" + "math/rand" + "net/http" + "net/http/httputil" + "net/url" + "path" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/application-research/estuary/collections" + "github.com/application-research/estuary/model" + "github.com/application-research/estuary/util" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipld/go-car" + "github.com/labstack/echo/v4" + "github.com/labstack/gommon/log" + "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "golang.org/x/xerrors" +) + +type UploadedContent struct { + Length int64 + Filename string + CID cid.Cid + Origins []*peer.AddrInfo +} + +type UploadType string + +const ( + UploadTypeDefault UploadType = "" + UploadTypeFile UploadType = "file" + UploadTypeCID UploadType = "cid" + UploadTypeCar UploadType = "car" + UploadTypeUrl UploadType = "url" +) + +type CidType string + +const ( + Dir CidType = "directory" + File CidType = "file" + ColDir string = "dir" +) + +// handleAdd godoc +// @Summary Add new content +// @Description This endpoint is used to upload new content. +// @Tags contents +// @Produce json +// @Accept multipart/form-data +// @Param type query type false "Type of content to upload ('car', 'cid', 'file' or 'url'). Defaults to 'file'" +// @Param car body string false "Car file to upload" +// @Param body body util.ContentAddIpfsBody false "IPFS Body" +// @Param data formData file false "File to upload" +// @Param filename formData string false "Filename to use for upload" +// @Param uuid query string false "Collection UUID" +// @Param replication query int false "Replication value" +// @Param ignore-dupes query string false "Ignore Dupes true/false" +// @Param lazy-provide query string false "Lazy Provide true/false" +// @Param dir query string false "Directory in collection" +// @Success 200 {object} util.ContentAddResponse +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Router /contents [post] +func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { + ctx, span := s.tracer.Start(c.Request().Context(), "handleAdd", trace.WithAttributes(attribute.Int("user", int(u.ID)))) + defer span.End() + + if err := util.ErrorIfContentAddingDisabled(s.isContentAddingDisabled(u)); err != nil { + return err + } + + if s.cfg.Content.DisableLocalAdding { + return s.redirectContentAdding(c, u) + } + + // replication from query params + replication := s.cfg.Replication + replVal := c.QueryParam("replication") + if replVal != "" { + parsed, err := strconv.Atoi(replVal) + if err != nil { + log.Errorf("failed to parse replication value in form data, assuming default for now: %s", err) + } else { + replication = parsed + } + } + + // collection uuid from query params + collectionuuid := c.QueryParam("uuid") + var collection *collections.Collection + if collectionuuid != "" { + var srchcollection collections.Collection + if err := s.db.First(&srchcollection, "uuid = ? and user_id = ?", collectionuuid, u.ID).Error; err != nil { + return err + } + collection = &srchcollection + } + + path, err := util.ConstructDirectoryPath(c.QueryParam(ColDir)) + if err != nil { + return err + } + + uploadType := UploadType(c.QueryParam("type")) + if uploadType == UploadTypeDefault { + uploadType = UploadTypeFile + } + + bsid, bs, err := s.stagingMgr.AllocNew() + if err != nil { + return err + } + + defer func() { + go func() { + if err := s.stagingMgr.CleanUp(bsid); err != nil { + log.Errorf("failed to clean up staging blockstore: %s", err) + } + }() + }() + + bserv := blockservice.New(bs, nil) + dserv := merkledag.NewDAGService(bserv) + + uploadedContent, err := s.loadContentFromRequest(c, ctx, uploadType, bs, dserv) + + // if splitting is disabled and uploaded content size is greater than content size limit + // reject the upload, as it will only get stuck and deals will never be made for it + if !u.FlagSplitContent() && uploadedContent.Length > s.cfg.Content.MaxSize { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", uploadedContent.Length, s.cfg.Content.MaxSize), + } + } + + if c.QueryParam("ignore-dupes") == "true" { + isDup, err := s.isDupCIDContent(c, uploadedContent.CID, u) + if err != nil || isDup { + return err + } + } + + // when pinning a CID we need to add a file to handle the special case + // of calling PinContent on the content manager + // TODO(gabe): PinContent adds to database tracking. decouple logic from that + if uploadType == UploadTypeCID { + makeDeal := true + cols := []*collections.CollectionRef{ + { + Collection: collection.ID, + Path: &path, + }, + } + pinstatus, pinOp, err := s.cm.PinContent(ctx, u.ID, uploadedContent.CID, uploadedContent.Filename, cols, uploadedContent.Origins, 0, nil, makeDeal) + if err != nil { + return err + } + s.pinMgr.Add(pinOp) + return c.JSON(http.StatusAccepted, pinstatus) + } + + content, err := s.cm.AddDatabaseTracking(ctx, u, dserv, uploadedContent.CID, uploadedContent.Filename, replication) + if err != nil { + return xerrors.Errorf("encountered problem computing object references: %w", err) + } + fullPath := filepath.Join(path, content.Name) + + // create collection if need be + if collection != nil { + log.Debugf("COLLECTION CREATION: %d, %d", collection.ID, content.ID) + if err := s.db.Create(&collections.CollectionRef{ + Collection: collection.ID, + Content: content.ID, + Path: &fullPath, + }).Error; err != nil { + log.Errorf("failed to add content to requested collection: %s", err) + } + } + + if err := util.DumpBlockstoreTo(ctx, s.tracer, bs, s.node.Blockstore); err != nil { + return xerrors.Errorf("failed to move data from staging to main blockstore: %w", err) + } + + go func() { + s.cm.ToCheck(content.ID) + }() + + if c.QueryParam("lazy-provide") != "true" { + subctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + if err := s.node.FullRT.Provide(subctx, uploadedContent.CID, true); err != nil { + span.RecordError(fmt.Errorf("provide error: %w", err)) + log.Errorf("fullrt provide call errored: %s", err) + } + } + + go func() { + if err := s.node.Provider.Provide(uploadedContent.CID); err != nil { + log.Warnf("failed to announce providers: %s", err) + } + }() + + return c.JSON(http.StatusOK, &util.ContentAddResponse{ + Cid: uploadedContent.CID.String(), + RetrievalURL: util.CreateDwebRetrievalURL(uploadedContent.CID.String()), + EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(uploadedContent.CID.String()), + EstuaryId: content.ID, + Providers: s.cm.PinDelegatesForContent(*content), + }) +} + +// LoadContentFromRequest reads a POST /contents request and loads the content from it +// It treats every different case of content upload: file (formData, CID, CAR or URL) +// Returns (UploadedContent, contentLen, filename, error) +func (s *apiV2) loadContentFromRequest(c echo.Context, ctx context.Context, uploadType UploadType, bs blockstore.Blockstore, dserv ipld.DAGService) (UploadedContent, error) { + // for all three upload types + // get len + // get filename + // import file and get cid + content := UploadedContent{} + switch uploadType { + case UploadTypeFile: + // get file from formData + form, err := c.MultipartForm() + if err != nil { + return UploadedContent{}, xerrors.Errorf("invalid formData for 'file' upload option: %w", err) + } + defer form.RemoveAll() + mpf, err := c.FormFile("data") + if err != nil { + return UploadedContent{}, xerrors.Errorf("invalid formData for 'file' upload option: %w", err) + } + + // Get len + content.Length = mpf.Size + + // Get filename + content.Filename = mpf.Filename + if fvname := c.FormValue("filename"); fvname != "" { + content.Filename = fvname + } + + // import file and get UploadTypeCID + fi, err := mpf.Open() + if err != nil { + return UploadedContent{}, err + } + defer fi.Close() + nd, err := util.ImportFile(dserv, fi) + if err != nil { + return UploadedContent{}, err + } + content.CID = nd.Cid() + + case UploadTypeCar: + // get CAR file from request body + // import file and get UploadTypeCID + defer c.Request().Body.Close() + header, err := car.LoadCar(ctx, bs, c.Request().Body) + if err != nil { + return UploadedContent{}, err + } + if len(header.Roots) != 1 { + // if someone wants this feature, let me know + return UploadedContent{}, xerrors.Errorf("cannot handle uploading car files with multiple roots") + } + content.CID = header.Roots[0] + + // Get filename + // TODO: how to specify filename? + content.Filename = content.CID.String() + if qpname := c.QueryParam("filename"); qpname != "" { + content.Filename = qpname + } + + // Get len + // TODO: uncomment and fix this + // bdWriter := &bytes.Buffer{} + // bdReader := io.TeeReader(c.Request().Body, bdWriter) + + // bdSize, err := io.Copy(ioutil.Discard, bdReader) + // if err != nil { + // return err + // } + + // if bdSize > util.MaxDealContentSize { + // return &util.HttpError{ + // Code: http.StatusBadRequest, + // Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + // Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.MaxDealContentSize), + // } + // } + + // c.Request().Body = ioutil.NopCloser(bdWriter) + content.Length = 0 // zero since we're not checking the length of this content so it doesn't break the limit check (bad) + + case UploadTypeCID: + // get UploadTypeCID from POST body + var params util.ContentAddIpfsBody + if err := c.Bind(¶ms); err != nil { + return UploadedContent{}, err + } + + // Get filename + content.Filename = params.Name + if content.Filename == "" { + content.Filename = params.Root + } + + // get UploadTypeCID + cid, err := cid.Decode(params.Root) + if err != nil { + return UploadedContent{}, err + } + content.CID = cid + + // Can't get len (will be gotten during pinning) + content.Length = 0 + + // origins are needed for pinning later on + var origins []*peer.AddrInfo + for _, p := range params.Peers { + ai, err := peer.AddrInfoFromString(p) + if err != nil { + return UploadedContent{}, err + } + origins = append(origins, ai) + } + content.Origins = origins + + case UploadTypeUrl: + url := string(UploadTypeUrl) + filename := path.Base(url) + content.Filename = filename + + resp, err := http.Get(url) + if err != nil { + return UploadedContent{}, err + } + defer resp.Body.Close() + + nd, err := util.ImportFile(dserv, resp.Body) + if err != nil { + return UploadedContent{}, err + } + content.CID = nd.Cid() + + default: + return UploadedContent{}, xerrors.Errorf("invalid type, need 'file', 'cid' or 'car'. Got %s", uploadType) + } + return content, nil +} + +func (s *apiV2) isDupCIDContent(c echo.Context, rootCID cid.Cid, u *util.User) (bool, error) { + var count int64 + if err := s.db.Model(util.Content{}).Where("cid = ? and user_id = ?", rootCID.Bytes(), u.ID).Count(&count).Error; err != nil { + return false, err + } + if count > 0 { + return true, c.JSON(409, map[string]string{"message": fmt.Sprintf("this content is already preserved under cid:%s", rootCID.String())}) + } + return false, nil +} + +func (s *apiV2) isContentAddingDisabled(u *util.User) bool { + return (s.cfg.Content.DisableGlobalAdding && s.cfg.Content.DisableLocalAdding) || u.StorageDisabled +} + +// redirectContentAdding is called when localContentAddingDisabled is true +// it finds available shuttles and adds the desired content in one of them +func (s *apiV2) redirectContentAdding(c echo.Context, u *util.User) error { + uep, err := s.getPreferredUploadEndpoints(u) + if err != nil { + return fmt.Errorf("failed to get preferred upload endpoints: %s", err) + } + if len(uep) <= 0 { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_ADDING_DISABLED, + Details: "uploading content to this node is not allowed at the moment", + } + } + + //#nosec G404: ignore weak random number generator + shURL, err := url.Parse(uep[rand.Intn(len(uep))]) + if err != nil { + return err + } + shURL.Path = "" + shURL.RawQuery = "" + shURL.Fragment = "" + + proxy := httputil.NewSingleHostReverseProxy(shURL) + proxy.ServeHTTP(c.Response(), c.Request()) + return nil +} + +func (s *apiV2) getPreferredUploadEndpoints(u *util.User) ([]string, error) { + // TODO: this should be a lotttttt smarter + s.cm.ShuttlesLk.Lock() + defer s.cm.ShuttlesLk.Unlock() + var shuttles []model.Shuttle + for hnd, sh := range s.cm.Shuttles { + if sh.ContentAddingDisabled { + s.log.Debugf("shuttle %+v content adding is disabled", sh) + continue + } + + if sh.Hostname == "" { + s.log.Debugf("shuttle %+v has empty hostname", sh) + continue + } + + var shuttle model.Shuttle + if err := s.db.First(&shuttle, "handle = ?", hnd).Error; err != nil { + s.log.Errorf("failed to look up shuttle by handle: %s", err) + continue + } + + if !shuttle.Open { + s.log.Debugf("shuttle %+v is not open, skipping", shuttle) + continue + } + shuttles = append(shuttles, shuttle) + } + + sort.Slice(shuttles, func(i, j int) bool { + return shuttles[i].Priority > shuttles[j].Priority + }) + + var out []string + for _, sh := range shuttles { + host := "https://" + sh.Host + if strings.HasPrefix(sh.Host, "http://") || strings.HasPrefix(sh.Host, "https://") { + host = sh.Host + } + out = append(out, host+"/content/add") + } + + if !s.cfg.Content.DisableLocalAdding { + out = append(out, s.cfg.Hostname+"/content/add") + } + return out, nil +} From d9fa2fdfffa862cdc6d48c19fd31029b97b1e7ad Mon Sep 17 00:00:00 2001 From: Gabriel Cruz Date: Thu, 8 Dec 2022 10:22:20 -0300 Subject: [PATCH 3/8] add error handling --- api/v2/contents.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/api/v2/contents.go b/api/v2/contents.go index c134ba10..558881e3 100644 --- a/api/v2/contents.go +++ b/api/v2/contents.go @@ -139,6 +139,9 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { dserv := merkledag.NewDAGService(bserv) uploadedContent, err := s.loadContentFromRequest(c, ctx, uploadType, bs, dserv) + if err != nil { + return err + } // if splitting is disabled and uploaded content size is greater than content size limit // reject the upload, as it will only get stuck and deals will never be made for it @@ -164,10 +167,13 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { makeDeal := true cols := []*collections.CollectionRef{ { - Collection: collection.ID, - Path: &path, + Path: &path, }, } + if collection != nil { + cols[0].Collection = collection.ID + } + pinstatus, pinOp, err := s.cm.PinContent(ctx, u.ID, uploadedContent.CID, uploadedContent.Filename, cols, uploadedContent.Origins, 0, nil, makeDeal) if err != nil { return err From 2036677ba56991f6d0fce42b700ef9d0c9479e16 Mon Sep 17 00:00:00 2001 From: Gabriel Cruz Date: Thu, 8 Dec 2022 14:38:20 -0300 Subject: [PATCH 4/8] add other contents endpoints --- api/v2/api.go | 8 +- api/v2/contents.go | 429 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 426 insertions(+), 11 deletions(-) diff --git a/api/v2/api.go b/api/v2/api.go index ec5bc1e3..6779c973 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -87,9 +87,9 @@ func (s *apiV2) RegisterRoutes(e *echo.Echo) { // to upload contents you only need an upload key // to see info about contents you need a user-level key (see contents group) e2.POST("/contents", util.WithUser(s.handleAdd), s.AuthRequired(util.PermLevelUpload)) - // e2.GET("/contents", util.WithUser(s.handleListContent), s.AuthRequired(util.PermLevelUser)) - // e2.GET("/contents/:contentid", util.WithUser(s.handleGetContent), s.AuthRequired(util.PermLevelUser)) - // e2.GET("/contents/:cid/ensure-replication", s.handleEnsureReplication, s.AuthRequired(util.PermLevelUser)) - // e2.GET("/contents/:contentid/status", util.WithUser(s.handleContentStatus), s.AuthRequired(util.PermLevelUser)) + e2.GET("/contents", util.WithUser(s.handleListContent), s.AuthRequired(util.PermLevelUser)) + e2.GET("/contents/:contentid", util.WithUser(s.handleGetContent), s.AuthRequired(util.PermLevelUser)) + e2.GET("/contents/:cid/ensure-replication", s.handleEnsureReplication, s.AuthRequired(util.PermLevelUser)) + e2.GET("/contents/:contentid/status", util.WithUser(s.handleContentStatus), s.AuthRequired(util.PermLevelUser)) } diff --git a/api/v2/contents.go b/api/v2/contents.go index 558881e3..234de7e6 100644 --- a/api/v2/contents.go +++ b/api/v2/contents.go @@ -12,11 +12,15 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/application-research/estuary/collections" "github.com/application-research/estuary/model" "github.com/application-research/estuary/util" + "github.com/application-research/filclient" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -24,10 +28,12 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipld/go-car" "github.com/labstack/echo/v4" - "github.com/labstack/gommon/log" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multihash" + "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" "golang.org/x/xerrors" ) @@ -95,7 +101,7 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { if replVal != "" { parsed, err := strconv.Atoi(replVal) if err != nil { - log.Errorf("failed to parse replication value in form data, assuming default for now: %s", err) + s.log.Errorf("failed to parse replication value in form data, assuming default for now: %s", err) } else { replication = parsed } @@ -130,7 +136,7 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { defer func() { go func() { if err := s.stagingMgr.CleanUp(bsid); err != nil { - log.Errorf("failed to clean up staging blockstore: %s", err) + s.log.Errorf("failed to clean up staging blockstore: %s", err) } }() }() @@ -190,13 +196,13 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { // create collection if need be if collection != nil { - log.Debugf("COLLECTION CREATION: %d, %d", collection.ID, content.ID) + s.log.Debugf("COLLECTION CREATION: %d, %d", collection.ID, content.ID) if err := s.db.Create(&collections.CollectionRef{ Collection: collection.ID, Content: content.ID, Path: &fullPath, }).Error; err != nil { - log.Errorf("failed to add content to requested collection: %s", err) + s.log.Errorf("failed to add content to requested collection: %s", err) } } @@ -213,13 +219,13 @@ func (s *apiV2) handleAdd(c echo.Context, u *util.User) error { defer cancel() if err := s.node.FullRT.Provide(subctx, uploadedContent.CID, true); err != nil { span.RecordError(fmt.Errorf("provide error: %w", err)) - log.Errorf("fullrt provide call errored: %s", err) + s.log.Errorf("fullrt provide call errored: %s", err) } } go func() { if err := s.node.Provider.Provide(uploadedContent.CID); err != nil { - log.Warnf("failed to announce providers: %s", err) + s.log.Warnf("failed to announce providers: %s", err) } }() @@ -465,3 +471,412 @@ func (s *apiV2) getPreferredUploadEndpoints(u *util.User) ([]string, error) { } return out, nil } + +// handleListContent godoc +// @Summary List all pinned content +// @Description This endpoint lists all content +// @Tags contents +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param deals query bool false "If 'true', only list content with deals made" +// @Param cid query string false "CID of content to look for" +// @Router /content [get] +func (s *apiV2) handleListContent(c echo.Context, u *util.User) error { + if cidStr := c.QueryParam("cid"); cidStr != "" { + out, err := s.getContentByCid(cidStr) + if err != nil { + return err + } + return c.JSON(http.StatusOK, out) + } + if deals := c.QueryParam("deals"); deals == "true" { + return s.handleListContentWithDeals(c, u) + } + var contents []util.Content + if err := s.db.Find(&contents, "active and user_id = ?", u.ID).Error; err != nil { + return err + } + + return c.JSON(http.StatusOK, contents) +} + +type expandedContent struct { + util.Content + AggregatedFiles int64 `json:"aggregatedFiles"` +} + +// handleListContentWithDeals godoc +// @Summary Content with deals +// @Description This endpoint lists all content with deals +// @Tags contents +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param limit query int false "Limit" +// @Param offset query int false "Offset" +// @Router /content/deals [get] +func (s *apiV2) handleListContentWithDeals(c echo.Context, u *util.User) error { + + var limit = 20 + if limstr := c.QueryParam("limit"); limstr != "" { + l, err := strconv.Atoi(limstr) + if err != nil { + return err + } + limit = l + } + + var offset int + if offstr := c.QueryParam("offset"); offstr != "" { + o, err := strconv.Atoi(offstr) + if err != nil { + return err + } + offset = o + } + + var contents []util.Content + err := s.db.Model(&util.Content{}). + Limit(limit). + Offset(offset). + Order("contents.id desc"). + Joins("inner join content_deals on contents.id = content_deals.content"). + Where("contents.active and contents.user_id = ? and not contents.aggregated_in > 0", u.ID). + Group("contents.id"). + Scan(&contents).Error + + if err != nil { + return err + } + + out := make([]expandedContent, 0, len(contents)) + for _, content := range contents { + ec := expandedContent{ + Content: content, + } + if content.Aggregate { + if err := s.db.Model(util.Content{}).Where("aggregated_in = ?", content.ID).Count(&ec.AggregatedFiles).Error; err != nil { + return err + } + + } + out = append(out, ec) + } + + return c.JSON(http.StatusOK, out) +} + +type getContentResponse struct { + Content *util.Content `json:"content"` + AggregatedIn *util.Content `json:"aggregatedIn,omitempty"` + Selector string `json:"selector,omitempty"` + Deals []*model.ContentDeal `json:"deals"` +} + +func (s *apiV2) getContentByCid(cidStr string) ([]getContentResponse, error) { + obj, err := cid.Decode(cidStr) + if err != nil { + return []getContentResponse{}, errors.Wrapf(err, "invalid cid") + } + + v0 := cid.Undef + dec, err := multihash.Decode(obj.Hash()) + if err == nil { + if dec.Code == multihash.SHA2_256 || dec.Length == 32 { + v0 = cid.NewCidV0(obj.Hash()) + } + } + v1 := cid.NewCidV1(obj.Prefix().Codec, obj.Hash()) + + var contents []util.Content + if err := s.db.Find(&contents, "(cid=? or cid=?) and active", v0.Bytes(), v1.Bytes()).Error; err != nil { + return []getContentResponse{}, err + } + + out := make([]getContentResponse, 0) + for i, content := range contents { + resp := getContentResponse{ + Content: &contents[i], + } + + id := content.ID + + if content.AggregatedIn > 0 { + var aggr util.Content + if err := s.db.First(&aggr, "id = ?", content.AggregatedIn).Error; err != nil { + return []getContentResponse{}, err + } + + resp.AggregatedIn = &aggr + + // no need to early return here, the selector is mostly cosmetic atm + if selector, err := s.calcSelector(content.AggregatedIn, content.ID); err == nil { + resp.Selector = selector + } + + id = content.AggregatedIn + } + + var deals []*model.ContentDeal + if err := s.db.Find(&deals, "content = ? and deal_id > 0 and not failed", id).Error; err != nil { + return []getContentResponse{}, err + } + + resp.Deals = deals + + out = append(out, resp) + } + + return out, nil +} + +func (s *apiV2) calcSelector(aggregatedIn uint, contentID uint) (string, error) { + // sort the known content IDs aggregated in a CAR, and use the index in the sorted list + // to build the CAR sub-selector + + var ordinal uint + result := s.db.Raw(`SELECT ordinal - 1 FROM ( + SELECT + id, ROW_NUMBER() OVER ( ORDER BY CAST(id AS TEXT) ) AS ordinal + FROM contents + WHERE aggregated_in = ? + ) subq + WHERE id = ? + `, aggregatedIn, contentID).Scan(&ordinal) + + if result.Error != nil { + return "", result.Error + } + + return fmt.Sprintf("/Links/%d/Hash", ordinal), nil +} + +// handleGetContentByCid godoc +// @Summary Get Content by Cid +// @Description This endpoint returns the content record associated with a CID +// @Tags public +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param cid path string true "Cid" +// @Router /public/by-cid/{cid} [get] +func (s *apiV2) handleGetContentByCid(c echo.Context) error { + cidStr := c.Param("cid") + out, err := s.getContentByCid(cidStr) + if err != nil { + return err + } + return c.JSON(http.StatusOK, out) +} + +// handleGetContent godoc +// @Summary Content +// @Description This endpoint returns a content by its ID +// @Tags contents +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param contentid path int true "Content ID" +// @Router /contents/{contentid} [get] +func (s *apiV2) handleGetContent(c echo.Context, u *util.User) error { + contID, err := strconv.Atoi(c.Param("contentid")) + if err != nil { + return err + } + + var content util.Content + if err := s.db.First(&content, "id = ?", contID).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("content: %d was not found", contID), + } + } + return err + } + + if err := util.IsContentOwner(u.ID, content.UserID); err != nil { + return err + } + + return c.JSON(http.StatusOK, content) +} + +type EnsureReplicationResponse struct { + Content util.Content `json:"content"` +} + +// handleEnsureReplication godoc +// @Summary Ensure Replication +// @Description This endpoint ensures that the content is replicated to the specified number of providers +// @Tags contents +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param cid path string true "CID" +// @Router /content/{cid}/ensure-replication [get] +func (s *apiV2) handleEnsureReplication(c echo.Context) error { + cid, err := cid.Decode(c.Param("cid")) + if err != nil { + return err + } + + var content util.Content + if err := s.db.Find(&content, "cid = ?", cid.Bytes()).Error; err != nil { + return err + } + + fmt.Println("Content: ", content.Cid.CID, cid) + + s.cm.ToCheck(content.ID) + return c.JSON(http.StatusOK, EnsureReplicationResponse{Content: content}) +} + +type onChainDealState struct { + SectorStartEpoch abi.ChainEpoch `json:"sectorStartEpoch"` + LastUpdatedEpoch abi.ChainEpoch `json:"lastUpdatedEpoch"` + SlashEpoch abi.ChainEpoch `json:"slashEpoch"` +} + +type dealStatus struct { + Deal model.ContentDeal `json:"deal"` + TransferStatus *filclient.ChannelState `json:"transfer"` + OnChainState *onChainDealState `json:"onChainState"` +} + +type ContentStatusResponse struct { + Content util.Content `json:"content"` + Deals []dealStatus `json:"deals"` + FailuresCount int64 `json:"failuresCount"` + Failures []model.DfeRecord `json:"failures"` + TotalOutBw int64 `json:"totalOutBw"` + Aggregated []util.Content `json:"aggregated"` + Offloaded bool `json:"offloaded"` // TODO(gabe): this needs to be set on handleContentStatus, but we're not offloading data as of now +} + +// handleContentStatus godoc +// @Summary Content Status +// @Description This endpoint returns the status of a content +// @Tags contents +// @Produce json +// @Success 200 {object} ContentStatusResponse +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param contentid path int true "Content ID" +// @Router /contents/{contentid}/status [get] +func (s *apiV2) handleContentStatus(c echo.Context, u *util.User) error { + ctx := c.Request().Context() + contID, err := strconv.Atoi(c.Param("contentid")) + if err != nil { + return err + } + + var content util.Content + if err := s.db.First(&content, "id = ?", contID).Error; err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + return &util.HttpError{ + Code: http.StatusNotFound, + Reason: util.ERR_CONTENT_NOT_FOUND, + Details: fmt.Sprintf("content: %d was not found", contID), + } + } + return err + } + + if err := util.IsContentOwner(u.ID, content.UserID); err != nil { + return err + } + + var deals []model.ContentDeal + if err := s.db.Find(&deals, "content = ?", content.ID).Error; err != nil { + return err + } + + ds := make([]dealStatus, len(deals)) + var wg sync.WaitGroup + for i, d := range deals { + dl := d + wg.Add(1) + go func(i int) { + defer wg.Done() + d := deals[i] + dstatus := dealStatus{ + Deal: d, + } + + chanst, err := s.cm.GetTransferStatus(ctx, &dl, content.Cid.CID, content.Location) + if err != nil { + s.log.Errorf("failed to get transfer status: %s", err) + // the UI needs to display a transfer state even for intermittent errors + chanst = &filclient.ChannelState{ + StatusStr: "Error", + } + } + + // the transfer state is yet to be been announced - the UI needs to display a transfer state + if chanst == nil && d.DTChan == "" { + chanst = &filclient.ChannelState{ + StatusStr: "Initializing", + } + } + + dstatus.TransferStatus = chanst + + if d.DealID > 0 { + markDeal, err := s.api.StateMarketStorageDeal(ctx, abi.DealID(d.DealID), types.EmptyTSK) + if err != nil { + s.log.Warnw("failed to get deal info from market actor", "dealID", d.DealID, "error", err) + } else { + dstatus.OnChainState = &onChainDealState{ + SectorStartEpoch: markDeal.State.SectorStartEpoch, + LastUpdatedEpoch: markDeal.State.LastUpdatedEpoch, + SlashEpoch: markDeal.State.SlashEpoch, + } + } + } + ds[i] = dstatus + }(i) + } + + wg.Wait() + + sort.Slice(ds, func(i, j int) bool { + return ds[i].Deal.CreatedAt.Before(ds[j].Deal.CreatedAt) + }) + + var failCount int64 + var errs []model.DfeRecord + if err := s.db.Model(&model.DfeRecord{}).Where("content = ?", content.ID).Find(&errs).Count(&failCount).Error; err != nil { + return err + } + + var bw int64 + if err := s.db.Model(util.ObjRef{}). + Select("SUM(size * reads)"). + Where("obj_refs.content = ?", content.ID). + Joins("left join objects on obj_refs.object = objects.id"). + Scan(&bw).Error; err != nil { + return err + } + + var sub []util.Content + if err := s.db.Find(&sub, "aggregated_in = ?", content.ID).Error; err != nil { + return err + } + + return c.JSON(http.StatusOK, ContentStatusResponse{ + Content: content, + Deals: ds, + FailuresCount: failCount, + Failures: errs, + TotalOutBw: bw, + Aggregated: sub, + }) +} From 08e2a62dd680c5f699c4b45ecb8778289338cd53 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 6 Dec 2022 11:18:16 +0000 Subject: [PATCH 5/8] Bump go.opencensus.io from 0.23.0 to 0.24.0 Bumps [go.opencensus.io](https://github.com/census-instrumentation/opencensus-go) from 0.23.0 to 0.24.0. - [Release notes](https://github.com/census-instrumentation/opencensus-go/releases) - [Commits](https://github.com/census-instrumentation/opencensus-go/compare/v0.23.0...v0.24.0) --- updated-dependencies: - dependency-name: go.opencensus.io dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] From 819578f867bd19438906cdb482a758a905d4cfee Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 8 Dec 2022 16:18:04 +0000 Subject: [PATCH 6/8] Bump github.com/ipfs/go-merkledag from 0.8.0 to 0.8.1 Bumps [github.com/ipfs/go-merkledag](https://github.com/ipfs/go-merkledag) from 0.8.0 to 0.8.1. - [Release notes](https://github.com/ipfs/go-merkledag/releases) - [Commits](https://github.com/ipfs/go-merkledag/compare/v0.8.0...v0.8.1) --- updated-dependencies: - dependency-name: github.com/ipfs/go-merkledag dependency-type: direct:production ... Signed-off-by: dependabot[bot] From 01cc03029b7334e88b32efa58b12d78cfa1cc7eb Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Thu, 8 Dec 2022 16:47:46 +0000 Subject: [PATCH 7/8] fixes duplicate collection contents (#705) * fix * tests * refactor * fix tests From 70952067d4f0256dc34f6e4d3704777838f0bd21 Mon Sep 17 00:00:00 2001 From: Gabriel Cruz Date: Thu, 15 Dec 2022 12:14:36 -0300 Subject: [PATCH 8/8] add support for ignore-dupes on shuttles /content/add-car endpoints --- cmd/estuary-shuttle/main.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/estuary-shuttle/main.go b/cmd/estuary-shuttle/main.go index 9e4b0b3a..e8f99d62 100644 --- a/cmd/estuary-shuttle/main.go +++ b/cmd/estuary-shuttle/main.go @@ -1249,6 +1249,7 @@ func (s *Shuttle) handleLogLevel(c echo.Context) error { // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError +// @Param ignore-dupes query string false "Ignore Dupes" // @Router /content/add [post] func (s *Shuttle) handleAdd(c echo.Context, u *User) error { ctx := c.Request().Context() @@ -1311,7 +1312,7 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { return err } - contid, err := s.createContent(ctx, u, nd.Cid(), filename, cic) + contid, err := s.createContent(ctx, u, nd.Cid(), filename, cic, c.QueryParam("ignore-dupes")) if err != nil { return err } @@ -1386,6 +1387,8 @@ func (s *Shuttle) Provide(ctx context.Context, c cid.Cid) error { // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError +// @Param ignore-dupes query string false "Ignore Dupes" +// @Param filename query string false "Filename" // @Router /content/add-car [post] func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { ctx := c.Request().Context() @@ -1454,7 +1457,7 @@ func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { contid, err := s.createContent(ctx, u, root, filename, util.ContentInCollection{ CollectionID: c.QueryParam(ColUuid), CollectionDir: c.QueryParam(ColDir), - }) + }, c.QueryParam("ignore-dupes")) if err != nil { return err } @@ -1510,7 +1513,7 @@ func (s *Shuttle) addrsForShuttle() []string { return out } -func (s *Shuttle) createContent(ctx context.Context, u *User, root cid.Cid, filename string, cic util.ContentInCollection) (uint, error) { +func (s *Shuttle) createContent(ctx context.Context, u *User, root cid.Cid, filename string, cic util.ContentInCollection, ignoreDupes string) (uint, error) { log.Debugf("createContent> cid: %v, filename: %s, collection: %+v", root, filename, cic) data, err := json.Marshal(util.ContentCreateBody{ @@ -1533,6 +1536,10 @@ func (s *Shuttle) createContent(ctx context.Context, u *User, root cid.Cid, file return 0, err } + q := req.URL.Query() + q.Add("ignore-dupes", ignoreDupes) + req.URL.RawQuery = q.Encode() + req.Header.Set("Authorization", "Bearer "+u.AuthToken) req.Header.Set("Content-Type", "application/json") @@ -2288,6 +2295,7 @@ type importDealBody struct { // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError +// @Param ignore-dupes query string false "Ignore Dupes" // @Param body body main.importDealBody true "Import a deal" // @Router /content/importdeal [post] func (s *Shuttle) handleImportDeal(c echo.Context, u *User) error { @@ -2351,7 +2359,7 @@ func (s *Shuttle) handleImportDeal(c echo.Context, u *User) error { break } - contid, err := s.createContent(ctx, u, cc, body.Name, body.ContentInCollection) + contid, err := s.createContent(ctx, u, cc, body.Name, body.ContentInCollection, c.QueryParam("ignore-dupes")) if err != nil { return err }