diff --git a/.drone.star b/.drone.star index 64f7da67ef0..9b1deba8c3c 100644 --- a/.drone.star +++ b/.drone.star @@ -834,11 +834,13 @@ def wopiValidatorTests(ctx, storage, accounts_hash_difficulty = 4): "image": OC_CI_ALPINE, "environment": {}, "commands": [ - "curl -k 'https://ocis-server:9200/remote.php/webdav/test.wopitest' --fail --retry-connrefused --retry 7 --retry-all-errors -X PUT -u admin:admin -D headers.txt", + "curl -v -X PUT 'https://ocis-server:9200/remote.php/webdav/test.wopitest' -k --fail --retry-connrefused --retry 7 --retry-all-errors -u admin:admin -D headers.txt", + "cat headers.txt", "export FILE_ID=$(cat headers.txt | sed -n -e 's/^.*Oc-Fileid: //p')", "export URL=\"https://ocis-server:9200/app/open?app_name=FakeOffice&file_id=$FILE_ID\"", "export URL=$(echo $URL | tr -d '[:cntrl:]')", - "curl -k -X POST \"$URL\" -u admin:admin -v > open.json", + "curl -v -X POST \"$URL\" -k --fail --retry-connrefused --retry 7 --retry-all-errors -u admin:admin > open.json", + "cat open.json", "cat open.json | jq .form_parameters.access_token | tr -d '\"' > accesstoken", "cat open.json | jq .form_parameters.access_token_ttl | tr -d '\"' > accesstokenttl", "echo -n 'http://wopiserver:8880/wopi/files/' > wopisrc", diff --git a/changelog/unreleased/async-postprocessing.md b/changelog/unreleased/async-postprocessing.md new file mode 100644 index 00000000000..59695ea4e70 --- /dev/null +++ b/changelog/unreleased/async-postprocessing.md @@ -0,0 +1,5 @@ +Enhancement: Async Postprocessing + +Provides functionality for async postprocessing. This will allow the system to do the postprocessing (virusscan, copying of bytes to their final destination, ...) asynchronous to the users request. Major change when active. + +https://github.com/owncloud/ocis/pull/5207 diff --git a/go.mod b/go.mod index 13e627ac6de..f1e0ff5a3b3 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/blevesearch/bleve/v2 v2.3.5 github.com/coreos/go-oidc/v3 v3.4.0 github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 - github.com/cs3org/reva/v2 v2.12.0 + github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902 github.com/disintegration/imaging v1.6.2 github.com/ggwhite/go-masker v1.0.9 github.com/go-chi/chi/v5 v5.0.7 diff --git a/go.sum b/go.sum index 36d464c6311..3be9009eb58 100644 --- a/go.sum +++ b/go.sum @@ -341,8 +341,8 @@ github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4= github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A= github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 h1:y4n2j68LLnvac+zw/al8MfPgO5aQiIwLmHM/JzYN8AM= github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.12.0 h1:KGQnNje13BbWuQBnxnWKyk+JjYTrETE8Q71KqKpzQQo= -github.com/cs3org/reva/v2 v2.12.0/go.mod h1:+lH5G0UmNjMNj4F0bDhbh+HqL1UihlbL8zPBa57Y2QI= +github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902 h1:r8K9y0RMFXjQlrbx17iQziWYhNyAYmh70ixaXbQHsHY= +github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902/go.mod h1:GpocVB1w6yxeSr1VBsO9jztmt1SyNC4lCwudLwDzxHQ= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 968019372a7..da0b57e625a 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -19,6 +19,7 @@ import ( notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/config" ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/config" ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/config" + postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/config" search "github.com/owncloud/ocis/v2/services/search/pkg/config" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config" @@ -76,33 +77,34 @@ type Config struct { AdminUserID string `yaml:"admin_user_id" env:"OCIS_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges."` Runtime Runtime `yaml:"runtime"` - AppProvider *appProvider.Config `yaml:"app_provider"` - AppRegistry *appRegistry.Config `yaml:"app_registry"` - Audit *audit.Config `yaml:"audit"` - AuthBasic *authbasic.Config `yaml:"auth_basic"` - AuthBearer *authbearer.Config `yaml:"auth_bearer"` - AuthMachine *authmachine.Config `yaml:"auth_machine"` - Frontend *frontend.Config `yaml:"frontend"` - Gateway *gateway.Config `yaml:"gateway"` - Graph *graph.Config `yaml:"graph"` - Groups *groups.Config `yaml:"groups"` - IDM *idm.Config `yaml:"idm"` - IDP *idp.Config `yaml:"idp"` - Nats *nats.Config `yaml:"nats"` - Notifications *notifications.Config `yaml:"notifications"` - OCDav *ocdav.Config `yaml:"ocdav"` - OCS *ocs.Config `yaml:"ocs"` - Proxy *proxy.Config `yaml:"proxy"` - Settings *settings.Config `yaml:"settings"` - Sharing *sharing.Config `yaml:"sharing"` - StorageSystem *storagesystem.Config `yaml:"storage_system"` - StoragePublicLink *storagepublic.Config `yaml:"storage_public"` - StorageShares *storageshares.Config `yaml:"storage_shares"` - StorageUsers *storageusers.Config `yaml:"storage_users"` - Store *store.Config `yaml:"store"` - Thumbnails *thumbnails.Config `yaml:"thumbnails"` - Users *users.Config `yaml:"users"` - Web *web.Config `yaml:"web"` - WebDAV *webdav.Config `yaml:"webdav"` - Search *search.Config `yaml:"search"` + AppProvider *appProvider.Config `yaml:"app_provider"` + AppRegistry *appRegistry.Config `yaml:"app_registry"` + Audit *audit.Config `yaml:"audit"` + AuthBasic *authbasic.Config `yaml:"auth_basic"` + AuthBearer *authbearer.Config `yaml:"auth_bearer"` + AuthMachine *authmachine.Config `yaml:"auth_machine"` + Frontend *frontend.Config `yaml:"frontend"` + Gateway *gateway.Config `yaml:"gateway"` + Graph *graph.Config `yaml:"graph"` + Groups *groups.Config `yaml:"groups"` + IDM *idm.Config `yaml:"idm"` + IDP *idp.Config `yaml:"idp"` + Nats *nats.Config `yaml:"nats"` + Notifications *notifications.Config `yaml:"notifications"` + OCDav *ocdav.Config `yaml:"ocdav"` + OCS *ocs.Config `yaml:"ocs"` + Postprocessing *postprocessing.Config `yaml:"postprocessing"` + Proxy *proxy.Config `yaml:"proxy"` + Settings *settings.Config `yaml:"settings"` + Sharing *sharing.Config `yaml:"sharing"` + StorageSystem *storagesystem.Config `yaml:"storage_system"` + StoragePublicLink *storagepublic.Config `yaml:"storage_public"` + StorageShares *storageshares.Config `yaml:"storage_shares"` + StorageUsers *storageusers.Config `yaml:"storage_users"` + Store *store.Config `yaml:"store"` + Thumbnails *thumbnails.Config `yaml:"thumbnails"` + Users *users.Config `yaml:"users"` + Web *web.Config `yaml:"web"` + WebDAV *webdav.Config `yaml:"webdav"` + Search *search.Config `yaml:"search"` } diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index 261db77b4cf..b9f959223a1 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -17,6 +17,7 @@ import ( notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/config/defaults" ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/config/defaults" ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/config/defaults" + postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults" proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/config/defaults" search "github.com/owncloud/ocis/v2/services/search/pkg/config/defaults" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config/defaults" @@ -56,6 +57,7 @@ func DefaultConfig() *Config { Notifications: notifications.DefaultConfig(), OCDav: ocdav.DefaultConfig(), OCS: ocs.DefaultConfig(), + Postprocessing: postprocessing.DefaultConfig(), Proxy: proxy.DefaultConfig(), Search: search.FullDefaultConfig(), Settings: settings.DefaultConfig(), diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index f809b2d3481..f05e5fc2002 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -34,6 +34,7 @@ import ( notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/command" ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/command" ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/command" + postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/command" proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/command" search "github.com/owncloud/ocis/v2/services/search/pkg/command" settings "github.com/owncloud/ocis/v2/services/settings/pkg/command" @@ -125,6 +126,7 @@ func NewService(options ...Option) (*Service, error) { s.ServicesRegistry[opts.Config.AppProvider.Service.Name] = appProvider.NewSutureService s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService + s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService // populate delayed services s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService @@ -262,7 +264,7 @@ func (s *Service) generateRunSet(cfg *ociscfg.Config) { } // List running processes for the Service Controller. -func (s *Service) List(args struct{}, reply *string) error { +func (s *Service) List(_ struct{}, reply *string) error { tableString := &strings.Builder{} table := tablewriter.NewWriter(tableString) table.SetHeader([]string{"Service"}) diff --git a/services/postprocessing/Makefile b/services/postprocessing/Makefile new file mode 100644 index 00000000000..0bdc1157223 --- /dev/null +++ b/services/postprocessing/Makefile @@ -0,0 +1,37 @@ +SHELL := bash +NAME := postprocessing + +include ../../.make/recursion.mk + +############ tooling ############ +ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI +include ../../.bingo/Variables.mk +endif + +############ go tooling ############ +include ../../.make/go.mk + +############ release ############ +include ../../.make/release.mk + +############ docs generate ############ +include ../../.make/docs.mk + +.PHONY: docs-generate +docs-generate: config-docs-generate + +############ generate ############ +include ../../.make/generate.mk + +.PHONY: ci-go-generate +ci-go-generate: # CI runs ci-node-generate automatically before this target + +.PHONY: ci-node-generate +ci-node-generate: + +############ licenses ############ +.PHONY: ci-node-check-licenses +ci-node-check-licenses: + +.PHONY: ci-node-save-licenses +ci-node-save-licenses: diff --git a/services/postprocessing/cmd/postprocessing/main.go b/services/postprocessing/cmd/postprocessing/main.go new file mode 100644 index 00000000000..ddc191e3a39 --- /dev/null +++ b/services/postprocessing/cmd/postprocessing/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "os" + + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/command" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults" +) + +func main() { + if err := command.Execute(defaults.DefaultConfig()); err != nil { + os.Exit(1) + } +} diff --git a/services/postprocessing/pkg/command/health.go b/services/postprocessing/pkg/command/health.go new file mode 100644 index 00000000000..0a9fe13bdf5 --- /dev/null +++ b/services/postprocessing/pkg/command/health.go @@ -0,0 +1,18 @@ +package command + +import ( + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/urfave/cli/v2" +) + +// Health is the entrypoint for the health command. +func Health(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "health", + Usage: "Check health status", + Action: func(c *cli.Context) error { + // Not implemented + return nil + }, + } +} diff --git a/services/postprocessing/pkg/command/root.go b/services/postprocessing/pkg/command/root.go new file mode 100644 index 00000000000..0b558d9b787 --- /dev/null +++ b/services/postprocessing/pkg/command/root.go @@ -0,0 +1,56 @@ +package command + +import ( + "context" + "os" + + "github.com/owncloud/ocis/v2/ocis-pkg/clihelper" + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/thejerf/suture/v4" + "github.com/urfave/cli/v2" +) + +// GetCommands provides all commands for this service +func GetCommands(cfg *config.Config) cli.Commands { + return []*cli.Command{ + // start this service + Server(cfg), + + // interaction with this service + + // infos about this service + Health(cfg), + Version(cfg), + } +} + +// Execute is the entry point for the postprocessing command. +func Execute(cfg *config.Config) error { + app := clihelper.DefaultApp(&cli.App{ + Name: "postprocessing", + Usage: "starts postprocessing service", + Commands: GetCommands(cfg), + }) + + return app.Run(os.Args) +} + +// SutureService allows for the postprocessing command to be embedded and supervised by a suture supervisor tree. +type SutureService struct { + cfg *config.Config +} + +// NewSutureService creates a new postprocessing.SutureService +func NewSutureService(cfg *ociscfg.Config) suture.Service { + cfg.Postprocessing.Commons = cfg.Commons + return SutureService{ + cfg: cfg.Postprocessing, + } +} + +// Serve to implement Server interface +func (s SutureService) Serve(ctx context.Context) error { + s.cfg.Context = ctx + return Execute(s.cfg) +} diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go new file mode 100644 index 00000000000..09540f4abb8 --- /dev/null +++ b/services/postprocessing/pkg/command/server.go @@ -0,0 +1,75 @@ +package command + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + + "github.com/cs3org/reva/v2/pkg/events/server" + "github.com/go-micro/plugins/v4/events/natsjs" + ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/logging" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/service" + "github.com/urfave/cli/v2" +) + +// Server is the entrypoint for the server command. +func Server(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "server", + Usage: fmt.Sprintf("start %s service without runtime (unsupervised mode)", cfg.Service.Name), + Category: "server", + Before: func(c *cli.Context) error { + err := parser.ParseConfig(cfg) + if err != nil { + fmt.Printf("%v", err) + os.Exit(1) + } + return err + }, + Action: func(c *cli.Context) error { + logger := logging.Configure(cfg.Service.Name, cfg.Log) + + evtsCfg := cfg.Postprocessing.Events + var tlsConf *tls.Config + + if !evtsCfg.TLSInsecure { + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return err + } + + rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) + if err != nil { + return err + } + evtsCfg.TLSInsecure = false + } + + tlsConf = &tls.Config{ + RootCAs: rootCAPool, + } + } + + bus, err := server.NewNatsStream( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) + if err != nil { + return err + } + + svc, err := service.NewPostprocessingService(bus, logger, cfg.Postprocessing) + if err != nil { + return err + } + return svc.Run() + }, + } +} diff --git a/services/postprocessing/pkg/command/version.go b/services/postprocessing/pkg/command/version.go new file mode 100644 index 00000000000..5d52c94a8ca --- /dev/null +++ b/services/postprocessing/pkg/command/version.go @@ -0,0 +1,19 @@ +package command + +import ( + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/urfave/cli/v2" +) + +// Version prints the service versions of all running instances. +func Version(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "print the version of this binary and the running extension instances", + Category: "info", + Action: func(c *cli.Context) error { + // not implemented + return nil + }, + } +} diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go new file mode 100644 index 00000000000..479b8db8fbb --- /dev/null +++ b/services/postprocessing/pkg/config/config.go @@ -0,0 +1,37 @@ +package config + +import ( + "context" + "time" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" +) + +// Config combines all available configuration parts. +type Config struct { + Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service + + Service Service `yaml:"-"` + + Log *Log `yaml:"log"` + + Postprocessing Postprocessing `yaml:"postprocessing"` + + Context context.Context `yaml:"-"` +} + +// Postprocessing definces the config options for the postprocessing service. +type Postprocessing struct { + Events Events `yaml:"events"` + Virusscan bool `yaml:"virusscan" env:"POSTPROCESSING_VIRUSSCAN" desc:"should the system do a virusscan? Needs antivirus service"` + Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"the sytem sleeps for this time while postprocessing"` +} + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `yaml:"endpoint" env:"POSTPROCESSING_EVENTS_ENDPOINT" desc:"Endpoint of the event system."` + Cluster string `yaml:"cluster" env:"POSTPROCESSING_EVENTS_CLUSTER" desc:"Cluster ID of the event system."` + + TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` + TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"SEARCH_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SEARCH_EVENTS_TLS_INSECURE will be seen as false."` +} diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go new file mode 100644 index 00000000000..70dd45ac110 --- /dev/null +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -0,0 +1,47 @@ +package defaults + +import ( + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" +) + +// FullDefaultConfig returns a full sanitized config +func FullDefaultConfig() *config.Config { + cfg := DefaultConfig() + EnsureDefaults(cfg) + Sanitize(cfg) + return cfg +} + +// DefaultConfig is the default configuration +func DefaultConfig() *config.Config { + return &config.Config{ + Service: config.Service{ + Name: "postprocessing", + }, + Postprocessing: config.Postprocessing{ + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + }, + }, + } +} + +// EnsureDefaults ensures defaults on a config +func EnsureDefaults(cfg *config.Config) { + // provide with defaults for shared logging, since we need a valid destination address for BindEnv. + if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil { + cfg.Log = &config.Log{ + Level: cfg.Commons.Log.Level, + Pretty: cfg.Commons.Log.Pretty, + Color: cfg.Commons.Log.Color, + File: cfg.Commons.Log.File, + } + } else if cfg.Log == nil { + cfg.Log = &config.Log{} + } +} + +// Sanitize does nothing atm +func Sanitize(cfg *config.Config) { +} diff --git a/services/postprocessing/pkg/config/log.go b/services/postprocessing/pkg/config/log.go new file mode 100644 index 00000000000..bdbd36bd54e --- /dev/null +++ b/services/postprocessing/pkg/config/log.go @@ -0,0 +1,9 @@ +package config + +// Log defines the available log configuration. +type Log struct { + Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;POSTPROCESSING_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."` + Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;POSTPROCESSING_LOG_PRETTY" desc:"Activates pretty log output."` + Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;POSTPROCESSING_LOG_COLOR" desc:"Activates colorized log output."` + File string `mapstructure:"file" env:"OCIS_LOG_FILE;POSTPROCESSING_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."` +} diff --git a/services/postprocessing/pkg/config/parser/parse.go b/services/postprocessing/pkg/config/parser/parse.go new file mode 100644 index 00000000000..03441f9415f --- /dev/null +++ b/services/postprocessing/pkg/config/parser/parse.go @@ -0,0 +1,37 @@ +package parser + +import ( + "errors" + + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode" +) + +// ParseConfig loads configuration from known paths. +func ParseConfig(cfg *config.Config) error { + _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg) + if err != nil { + return err + } + + defaults.EnsureDefaults(cfg) + + // load all env variables relevant to the config in the current context. + if err := envdecode.Decode(cfg); err != nil { + // no environment variable set for this config is an expected "error" + if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) { + return err + } + } + + defaults.Sanitize(cfg) + + return Validate(cfg) +} + +func Validate(cfg *config.Config) error { + return nil +} diff --git a/services/postprocessing/pkg/config/service.go b/services/postprocessing/pkg/config/service.go new file mode 100644 index 00000000000..d1eac383f0b --- /dev/null +++ b/services/postprocessing/pkg/config/service.go @@ -0,0 +1,6 @@ +package config + +// Service defines the available service configuration. +type Service struct { + Name string `yaml:"-"` +} diff --git a/services/postprocessing/pkg/logging/logging.go b/services/postprocessing/pkg/logging/logging.go new file mode 100644 index 00000000000..ae5a674a0ea --- /dev/null +++ b/services/postprocessing/pkg/logging/logging.go @@ -0,0 +1,17 @@ +package logging + +import ( + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" +) + +// LoggerFromConfig initializes a service-specific logger instance. +func Configure(name string, cfg *config.Log) log.Logger { + return log.NewLogger( + log.Name(name), + log.Level(cfg.Level), + log.Pretty(cfg.Pretty), + log.Color(cfg.Color), + log.File(cfg.File), + ) +} diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go new file mode 100644 index 00000000000..7875285a9dc --- /dev/null +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -0,0 +1,118 @@ +package postprocessing + +import ( + "time" + + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" +) + +// Postprocessing handles postprocessing of a file +type Postprocessing struct { + id string + url string + u *user.User + m map[events.Postprocessingstep]interface{} + filename string + filesize uint64 + resourceID *provider.ResourceId + c config.Postprocessing + steps []events.Postprocessingstep +} + +// New returns a new postprocessing instance +func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, c config.Postprocessing) *Postprocessing { + return &Postprocessing{ + id: uploadID, + url: uploadURL, + u: user, + m: make(map[events.Postprocessingstep]interface{}), + c: c, + filename: filename, + filesize: filesize, + resourceID: resourceID, + steps: getSteps(c), + } +} + +// Init is the first step of the postprocessing +func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { + pp.m["init"] = ev + + if len(pp.steps) == 0 { + return pp.finished(events.PPOutcomeContinue) + } + + return pp.nextStep(pp.steps[0]) +} + +// Virusscan is the virusscanning step of the postprocessing +func (pp *Postprocessing) Virusscan(ev events.VirusscanFinished) interface{} { + pp.m[events.PPStepAntivirus] = ev + + switch ev.Outcome { + case events.PPOutcomeContinue: + return pp.next(events.PPStepAntivirus) + default: + return pp.finished(ev.Outcome) + + } +} + +// Delay will sleep the configured time then continue +func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { + pp.m[events.PPStepDelay] = ev + time.Sleep(pp.c.Delayprocessing) + return pp.next(events.PPStepDelay) +} + +func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} { + l := len(pp.steps) + for i, s := range pp.steps { + if s == current && i+1 < l { + return pp.nextStep(pp.steps[i+1]) + } + } + return pp.finished(events.PPOutcomeContinue) +} + +func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep { + return events.StartPostprocessingStep{ + UploadID: pp.id, + URL: pp.url, + ExecutingUser: pp.u, + Filename: pp.filename, + Filesize: pp.filesize, + ResourceID: pp.resourceID, + StepToStart: next, + } +} + +func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished { + return events.PostprocessingFinished{ + UploadID: pp.id, + Result: pp.m, + ExecutingUser: pp.u, + Filename: pp.filename, + Outcome: outcome, + } +} + +func getSteps(c config.Postprocessing) []events.Postprocessingstep { + // NOTE: first version only contains very basic configuration options + // But we aim for a system where postprocessing steps and their order can be configured per space + // ideally by the spaceadmin itself + // We need to iterate over configuring PP service when we see fit + var steps []events.Postprocessingstep + if c.Delayprocessing != 0 { + steps = append(steps, events.PPStepDelay) + } + + if c.Virusscan { + steps = append(steps, events.PPStepAntivirus) + } + + return steps +} diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go new file mode 100644 index 00000000000..e442cb76a26 --- /dev/null +++ b/services/postprocessing/pkg/service/service.go @@ -0,0 +1,75 @@ +package service + +import ( + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing" +) + +// PostprocessingService is an instance of the service handling postprocessing of files +type PostprocessingService struct { + log log.Logger + events <-chan interface{} + pub events.Publisher + c config.Postprocessing +} + +// NewPostprocessingService returns a new instance of a postprocessing service +func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) { + evs, err := events.Consume(stream, "postprocessing", + events.BytesReceived{}, + events.StartPostprocessingStep{}, + events.VirusscanFinished{}, + events.UploadReady{}, + ) + if err != nil { + return nil, err + } + + return &PostprocessingService{ + log: logger, + events: evs, + pub: stream, + c: c, + }, nil +} + +// Run to fulfil Runner interface +func (pps *PostprocessingService) Run() error { + current := make(map[string]*postprocessing.Postprocessing) + for e := range pps.events { + var next interface{} + switch ev := e.(type) { + case events.BytesReceived: + pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.c) + current[ev.UploadID] = pp + next = pp.Init(ev) + case events.VirusscanFinished: + pp := current[ev.UploadID] + if pp == nil { + // no current upload - this was an on demand scan + continue + } + next = pp.Virusscan(ev) + case events.StartPostprocessingStep: + if ev.StepToStart != events.PPStepDelay { + continue + } + pp := current[ev.UploadID] + next = pp.Delay(ev) + case events.UploadReady: + // the storage provider thinks the upload is done - so no need to keep it any more + delete(current, ev.UploadID) + } + + if next != nil { + if err := events.Publish(pps.pub, next); err != nil { + pps.log.Error().Err(err).Msg("unable to publish event") + return err // we can't publish -> we are screwed + } + } + + } + return nil +} diff --git a/services/storage-users/pkg/config/config.go b/services/storage-users/pkg/config/config.go index 2d847f821b3..25fd07e50bf 100644 --- a/services/storage-users/pkg/config/config.go +++ b/services/storage-users/pkg/config/config.go @@ -24,6 +24,8 @@ type Config struct { Driver string `yaml:"driver" env:"STORAGE_USERS_DRIVER" desc:"The storage driver which should be used by the service"` Drivers Drivers `yaml:"drivers"` DataServerURL string `yaml:"data_server_url" env:"STORAGE_USERS_DATA_SERVER_URL" desc:"URL of the data server, needs to be reachable by the data gateway provided by the frontend service or the user if directly exposed."` + DataGatewayURL string `yaml:"data_gateway_url" env:"STORAGE_USERS_DATA_GATEWAY_URL" desc:"URL of the data gateway server"` + TransferExpires int64 `yaml:"transfer_expires" env:"STORAGE_USERS_TRANSFER_EXPIRES" desc:"the time after which the token for upload postprocessing expires"` Events Events `yaml:"events"` Cache Cache `yaml:"cache"` MountID string `yaml:"mount_id" env:"STORAGE_USERS_MOUNT_ID" desc:"Mount ID of this storage."` @@ -98,6 +100,7 @@ type OCISDriver struct { ShareFolder string `yaml:"share_folder" env:"STORAGE_USERS_OCIS_SHARE_FOLDER" desc:"Name of the folder jailing all shares."` MaxAcquireLockCycles int `yaml:"max_acquire_lock_cycles" env:"STORAGE_USERS_OCIS_MAX_ACQUIRE_LOCK_CYCLES" desc:"When trying to lock files, ocis will try this amount of times to acquire the lock before failing. After each try it will wait for an increasing amount of time. Values of 0 or below will be ignored and the default value of 20 will be used."` LockCycleDurationFactor int `yaml:"lock_cycle_duration_factor" env:"STORAGE_USERS_OCIS_LOCK_CYCLE_DURATION_FACTOR" desc:"When trying to lock files, ocis will multiply the cycle with this factor and use it as a millisecond timeout. Values of 0 or below will be ignored and the default value of 30 will be used."` + AsyncUploads bool `yaml:"async_uploads" env:"STORAGE_USERS_OCIS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads."` } type S3NGDriver struct { @@ -143,6 +146,7 @@ type Events struct { TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;STORAGE_USERS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` TLSRootCaCertPath string `yaml:"tls_root_ca_cert_path" env:"STORAGE_USERS_EVENTS_TLS_ROOT_CA_CERT" desc:"The root CA certificate used to validate the server's TLS certificate. If provided STORAGE_USERS_EVENTS_TLS_INSECURE will be seen as false."` EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;STORAGE_USERS_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."` + NumConsumers int `yaml:"num_consumers" env:"STORAGE_USERS_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for post-processing files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands. The setting has no effect when the STORAGE_USERS_OCIS_ASYNC_UPLOADS is set to false. The default and minimum value is 1."` } // Cache holds cache config diff --git a/services/storage-users/pkg/config/defaults/defaultconfig.go b/services/storage-users/pkg/config/defaults/defaultconfig.go index 999627402bb..83e2d6470f9 100644 --- a/services/storage-users/pkg/config/defaults/defaultconfig.go +++ b/services/storage-users/pkg/config/defaults/defaultconfig.go @@ -39,6 +39,8 @@ func DefaultConfig() *config.Config { }, Reva: shared.DefaultRevaConfig(), DataServerURL: "http://localhost:9158/data", + DataGatewayURL: "https://localhost:9200/data", + TransferExpires: 86400, UploadExpiration: 24 * 60 * 60, Driver: "ocis", Drivers: config.Drivers{ diff --git a/services/storage-users/pkg/revaconfig/user.go b/services/storage-users/pkg/revaconfig/user.go index 61d54e64900..0e6830408da 100644 --- a/services/storage-users/pkg/revaconfig/user.go +++ b/services/storage-users/pkg/revaconfig/user.go @@ -98,6 +98,25 @@ func UserDrivers(cfg *config.Config) map[string]interface{} { "permissionssvc_tls_mode": cfg.Commons.GRPCClientTLS.Mode, "max_acquire_lock_cycles": cfg.Drivers.OCIS.MaxAcquireLockCycles, "lock_cycle_duration_factor": cfg.Drivers.OCIS.LockCycleDurationFactor, + "asyncfileuploads": cfg.Drivers.OCIS.AsyncUploads, + "statcache": map[string]interface{}{ + "cache_store": cfg.Cache.Store, + "cache_nodes": cfg.Cache.Nodes, + "cache_database": cfg.Cache.Database, + }, + "events": map[string]interface{}{ + "natsaddress": cfg.Events.Addr, + "natsclusterid": cfg.Events.ClusterID, + "tlsinsecure": cfg.Events.TLSInsecure, + "tlsrootcacertificate": cfg.Events.TLSRootCaCertPath, + "numconsumers": cfg.Events.NumConsumers, + }, + "tokens": map[string]interface{}{ + "transfer_shared_secret": cfg.Commons.TransferSecret, + "transfer_expires": cfg.TransferExpires, + "download_endpoint": cfg.DataServerURL, + "datagateway_endpoint": cfg.DataGatewayURL, + }, }, "s3": map[string]interface{}{ "enable_home": false, @@ -125,6 +144,25 @@ func UserDrivers(cfg *config.Config) map[string]interface{} { "s3.bucket": cfg.Drivers.S3NG.Bucket, "max_acquire_lock_cycles": cfg.Drivers.S3NG.MaxAcquireLockCycles, "lock_cycle_duration_factor": cfg.Drivers.S3NG.LockCycleDurationFactor, + "asyncfileuploads": cfg.Drivers.OCIS.AsyncUploads, + "statcache": map[string]interface{}{ + "cache_store": cfg.Cache.Store, + "cache_nodes": cfg.Cache.Nodes, + "cache_database": cfg.Cache.Database, + }, + "events": map[string]interface{}{ + "natsaddress": cfg.Events.Addr, + "natsclusterid": cfg.Events.ClusterID, + "tlsinsecure": cfg.Events.TLSInsecure, + "tlsrootcacertificate": cfg.Events.TLSRootCaCertPath, + "numconsumers": cfg.Events.NumConsumers, + }, + "tokens": map[string]interface{}{ + "transfer_shared_secret": cfg.Commons.TransferSecret, + "transfer_expires": cfg.TransferExpires, + "download_endpoint": cfg.DataServerURL, + "datagateway_endpoint": cfg.DataGatewayURL, + }, }, } }