diff --git a/changelog.d/5-internal/WPB-24076 b/changelog.d/5-internal/WPB-24076 new file mode 100644 index 00000000000..caf6d968821 --- /dev/null +++ b/changelog.d/5-internal/WPB-24076 @@ -0,0 +1 @@ +Add meeting cleaner job in `background-worker`. diff --git a/charts/integration/templates/configmap.yaml b/charts/integration/templates/configmap.yaml index 82fc9895284..e6a2a9b7955 100644 --- a/charts/integration/templates/configmap.yaml +++ b/charts/integration/templates/configmap.yaml @@ -54,7 +54,7 @@ data: port: 8080 backgroundWorker: - host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local + host: background-worker.{{ .Release.Namespace }}.svc.cluster.local port: 8080 # Background jobs defaults for integration tests backgroundJobs: @@ -145,7 +145,7 @@ data: port: 8080 backgroundWorker: - host: backgroundWorker.{{ .Release.Namespace }}-fed2.svc.cluster.local + host: background-worker.{{ .Release.Namespace }}-fed2.svc.cluster.local port: 8080 stern: @@ -212,7 +212,7 @@ data: host: proxy.wire-federation-v0.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v0.svc.cluster.local + host: background-worker.wire-federation-v0.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v0.svc.cluster.local @@ -255,7 +255,7 @@ data: host: proxy.wire-federation-v1.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v1.svc.cluster.local + host: background-worker.wire-federation-v1.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v1.svc.cluster.local @@ -298,7 +298,7 @@ data: host: proxy.wire-federation-v2.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v2.svc.cluster.local + host: background-worker.wire-federation-v2.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v2.svc.cluster.local diff --git a/charts/wire-server/templates/background-worker/configmap.yaml 
b/charts/wire-server/templates/background-worker/configmap.yaml index 7c8ef9aee43..3b1bc15cbb4 100644 --- a/charts/wire-server/templates/background-worker/configmap.yaml +++ b/charts/wire-server/templates/background-worker/configmap.yaml @@ -90,6 +90,10 @@ data: {{toYaml .backendNotificationPusher | indent 6 }} {{- with .backgroundJobs }} backgroundJobs: +{{ toYaml . | indent 6 }} + {{- end }} + {{- with .meetingsCleanup }} + meetingsCleanup: {{ toYaml . | indent 6 }} {{- end }} {{- if .postgresMigration }} diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index 77dd4b6953d..3480344677f 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -977,6 +977,15 @@ background-worker: # Total attempts, including the first try maxAttempts: 3 + # Meetings cleanup configuration + meetingsCleanup: + # Delete meetings older than this many hours (48 hours = 2 days) + cleanOlderThanHours: 48.0 + # Maximum number of meetings to delete per batch + batchSize: 1000 + # Cron schedule for the cleanup job (0 * * * * = every hour) + schedule: "0 * * * *" + # Controls where conversation data is stored/accessed postgresMigration: conversation: cassandra diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index fe0a19d0682..c9f2e7aca0d 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -313,6 +313,8 @@ galley: # See helmfile for the real value federationDomain: integration.example.com disabledAPIVersions: [] + meetings: + validityPeriod: "5s" # These values are insecure, against anyone getting hold of the hash, # but its not a concern for the integration tests. 
@@ -644,6 +646,10 @@ background-worker: concurrency: 8 jobTimeout: 60s maxAttempts: 3 + meetingsCleanup: + cleanOlderThanHours: 0.0014 + batchSize: 100 + schedule: "* * * * *" # Cassandra clusters used by background-worker cassandra: host: {{ .Values.cassandraHost }} diff --git a/integration/test/Test/Meetings.hs b/integration/test/Test/Meetings.hs index f3d47dbb698..86fe6f87867 100644 --- a/integration/test/Test/Meetings.hs +++ b/integration/test/Test/Meetings.hs @@ -4,10 +4,16 @@ module Test.Meetings where import API.Galley import qualified API.GalleyInternal as I +import Control.Monad.Reader (ask) +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text import Data.Time.Clock import qualified Data.Time.Format as Time import SetupHelpers +import System.Timeout (timeout) import Testlib.Prelude +import Text.Regex.TDFA ((=~)) +import UnliftIO.Concurrent (threadDelay) -- Helper to extract meetingId and domain from a meeting JSON object getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String) @@ -359,3 +365,93 @@ testMeetingDeleteUnauthorized = do meeting <- getJSON 201 r1 (meetingId, domain) <- getMeetingIdAndDomain meeting deleteMeeting otherUser domain meetingId >>= assertStatus 404 + +testMeetingCleanup :: (HasCallStack) => App () +testMeetingCleanup = do + env <- ask + timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do + -- 2 minutes timeout + (owner, _tid, _members) <- createTeam OwnDomain 1 + now <- liftIO getCurrentTime + -- Create a meeting that ends now. + -- Configured retention is 0.0014 hours (~5 seconds). + -- cutoffTime will be now' - 5s. + -- We need end_date < cutoffTime. + -- If we wait 6 seconds, now' = now + 6s. + -- cutoffTime = now + 6s - 5s = now + 1s. + -- end_date (now) < cutoffTime (now + 1s). 
+ let startTime = addUTCTime (negate 3600) now + endTime = now + newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime [] + + r1 <- postMeetings owner newMeeting + assertSuccess r1 + meeting <- getJSON 201 r1 + (meetingId, domain) <- getMeetingIdAndDomain meeting + + -- Wait 6 seconds to ensure meeting is old enough + liftIO $ threadDelay 6_000_000 + + -- Wait for cleanup job to run + waitForCleanupJob OwnDomain + + -- Check it's gone + getMeeting owner domain meetingId >>= assertStatus 404 + + case timedOutResult of + Just () -> pure () + Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes" + +waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App () +waitForCleanupJob domain = do + initialMetrics <- getMetricsBody domain + let initialCount = getRunCount initialMetrics + + waitForIncrease domain initialCount + where + getMetricsBody d = do + getMetrics d BackgroundWorker `bindResponse` \resp -> do + resp.status `shouldMatchInt` 200 + pure $ Text.unpack $ Text.decodeUtf8 resp.body + + getRunCount metrics = + let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)") + in case matches of + [val] -> read val :: Int + _ -> 0 + + waitForIncrease d oldVal = do + metrics <- getMetricsBody d + let newVal = getRunCount metrics + -- We wait until it increases. + -- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run. + -- If it runs, it should become >= 1. + -- But wait, if matches is empty, we return 0. + -- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0). + -- If it runs, it increments. 
+ when (newVal <= oldVal) $ do + liftIO $ threadDelay 1_000_000 -- Wait 1s + waitForIncrease d oldVal + +testMeetingExpiration :: (HasCallStack) => App () +testMeetingExpiration = do + (owner, _tid, _members) <- createTeam OwnDomain 1 + now <- liftIO getCurrentTime + let startTime = addUTCTime (negate 3600) now + -- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml + endTime = now + newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime [] + + r1 <- postMeetings owner newMeeting + assertSuccess r1 + meeting <- getJSON 201 r1 + (meetingId, domain) <- getMeetingIdAndDomain meeting + + -- Check it is accessible immediately (endDate = now, so valid until now + 5s) + getMeeting owner domain meetingId >>= assertStatus 200 + + -- Wait 6 seconds + liftIO $ threadDelay 6_000_000 + + -- Check it is expired + getMeeting owner domain meetingId >>= assertStatus 404 diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs index bc36f8de6bc..08fb1501af0 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs @@ -19,6 +19,7 @@ module Wire.ConversationStore.Cassandra ( interpretMLSCommitLockStoreToCassandra, interpretConversationStoreToCassandra, interpretConversationStoreToCassandraAndPostgres, + interpretConversationStoreByMigration, MigrationError (..), ) where @@ -80,6 +81,7 @@ import Wire.ConversationStore.Migration.Cleanup import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) import Wire.MigrationLock import Wire.Postgres +import Wire.PostgresMigrationOpts (StorageLocation (..)) import Wire.Sem.Paging.Cassandra import Wire.StoredConversation import Wire.StoredConversation qualified as StoreConv @@ -1602,3 +1604,21 @@ withMigrationLocksAndUserCleanup cassClient lockType maxWait userIds action = . 
runInputConst cassClient $ cleanupIfNecessary (Right <$> userIds) action + +interpretConversationStoreByMigration :: + forall r a. + ( Member TinyLog r, + PGConstraints r, + Member Async r, + Member (Error MigrationError) r, + Member Race r + ) => + StorageLocation -> + ClientState -> + Sem (ConversationStore ': r) a -> + Sem r a +interpretConversationStoreByMigration storageLocation client = + case storageLocation of + CassandraStorage -> interpretConversationStoreToCassandra client + MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres client + PostgresqlStorage -> interpretConversationStoreToPostgres diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs index 44605b64467..cdf585c9150 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs @@ -339,6 +339,9 @@ data ConversationSubsystem m a where ConnId -> Local ConvId -> ConversationSubsystem m (UpdateResult Event) + InternalDeleteLocalConversation :: + Local ConvId -> + ConversationSubsystem m () GetMLSPublicKeys :: Maybe MLSPublicKeyFormat -> ConversationSubsystem m (MLSKeysByPurpose (MLSKeys SomeKey)) diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs index 3c81ed700f9..7ca234495fc 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs @@ -29,6 +29,7 @@ module Wire.ConversationSubsystem.Action updateLocalConversationLeave, updateLocalConversationMemberUpdate, updateLocalConversationDelete, + updateLocalConversationDeleteUnchecked, updateLocalConversationRename, updateLocalConversationMessageTimerUpdate, updateLocalConversationReceiptModeUpdate, @@ -1088,6 +1089,26 @@ updateLocalConversationDelete :: updateLocalConversationDelete lcnvId uid connId = updateLocalConversation 
@'ConversationDeleteTag lcnvId uid connId () +updateLocalConversationDeleteUnchecked :: + ( Member (ErrorS 'InvalidOperation) r, + Member (ErrorS 'ConvNotFound) r, + Member CodeStore r, + Member E.ConversationStore r, + Member (ErrorS 'NotATeamMember) r, + Member ProposalStore r + ) => + Local ConvId -> + Sem r () +updateLocalConversationDeleteUnchecked lcnv = do + let tag = sing @'ConversationDeleteTag + conv <- getConversationWithError lcnv + -- check that the action does not bypass the underlying protocol + unless (protocolValidAction conv.protocol tag ()) $ + throwS @'InvalidOperation + -- perform all authorisation checks and, if successful, then update itself + let lconv = qualifyAs lcnv conv + void $ performAction @'ConversationDeleteTag lconv (error "not used") Nothing () + updateLocalConversationRename :: ( Member (Error FederationError) r, Member (ErrorS ('ActionDenied (ConversationActionPermission 'ConversationRenameTag))) r, diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs index 672372cf8bc..d1e3f48b8b5 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs @@ -44,6 +44,7 @@ import Wire.CodeStore (CodeStore) import Wire.ConversationStore (ConversationStore) import Wire.ConversationStore qualified as ConvStore import Wire.ConversationSubsystem (ConversationSubsystem (..)) +import Wire.ConversationSubsystem.Action qualified as Action import Wire.ConversationSubsystem.Action.Notify qualified as ActionNotify import Wire.ConversationSubsystem.Clients as Clients import Wire.ConversationSubsystem.Create qualified as Create @@ -210,6 +211,8 @@ interpretConversationSubsystem = interpret $ \case mapErrors $ Update.postProteusBroadcast lusr con msg DeleteLocalConversation lusr con lcnv -> mapErrors $ Update.deleteLocalConversation lusr con lcnv + 
InternalDeleteLocalConversation lcnv -> + mapErrors $ Action.updateLocalConversationDeleteUnchecked lcnv GetMLSPublicKeys fmt -> mapErrors $ MLS.getMLSPublicKeys fmt ResetMLSConversation lusr reset -> diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore.hs b/libs/wire-subsystems/src/Wire/MeetingsStore.hs index e2e0c7b55d6..63ad71f3902 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore.hs @@ -167,5 +167,10 @@ data MeetingsStore m a where MeetingId -> [EmailAddress] -> MeetingsStore m () + -- Cleanup operations + GetOldMeetings :: + UTCTime -> + Int -> + MeetingsStore m [StoredMeeting] makeSem ''MeetingsStore diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs index 65f8f890626..05fd2a46333 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs @@ -65,6 +65,8 @@ interpretMeetingsStoreToPostgres = addInvitedEmailsImpl meetingId email RemoveInvitedEmails meetingId emails -> removeInvitedEmailsImpl meetingId emails + GetOldMeetings cutoffTime batchSize -> + getOldMeetingsImpl cutoffTime batchSize -- * Create @@ -395,3 +397,35 @@ removeInvitedEmailsImpl meetingId emails = do updated_at = NOW() WHERE id = ($2 :: uuid) |] + +getOldMeetingsImpl :: + ( Member (Input Pool) r, + Member (Embed IO) r, + Member (Error UsageError) r + ) => + UTCTime -> + Int -> + Sem r [StoredMeeting] +getOldMeetingsImpl cutoffTime batchSize = do + pool <- input + result <- liftIO $ use pool session + either throw pure result + where + session :: Session [StoredMeeting] + session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement + listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting) + listStatement = + refineResult + (traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting)) + $ [vectorStatement| + SELECT + id :: uuid, title :: text, 
creator :: uuid, + start_time :: timestamptz, end_time :: timestamptz, + recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?, + conversation_id :: uuid, invited_emails :: text[], trial :: boolean, + created_at :: timestamptz, updated_at :: timestamptz + FROM meetings + WHERE end_time < ($1 :: timestamptz) + ORDER BY end_time ASC + LIMIT ($2 :: int4) + |] diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs index 8a126f1be0b..ea17c8a5ee9 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs @@ -21,6 +21,7 @@ module Wire.MeetingsSubsystem where import Data.Id import Data.Qualified +import Data.Time.Clock (UTCTime) import Imports import Polysemy import Wire.API.Meeting @@ -59,5 +60,9 @@ data MeetingsSubsystem m a where Qualified MeetingId -> [EmailAddress] -> MeetingsSubsystem m Bool + CleanupOldMeetings :: + UTCTime -> + Int -> + MeetingsSubsystem m Int64 makeSem ''MeetingsSubsystem diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs index f038e9d6564..35fae84656d 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs @@ -26,13 +26,14 @@ import Data.Default (def) import Data.Domain (Domain) import Data.Id import Data.Map qualified as Map -import Data.Qualified (Local, Qualified (..), qualifyAs, tDomain, tUnqualified) +import Data.Qualified (Local, Qualified (..), inputQualifyLocal, qualifyAs, tDomain, tUnqualified) import Data.Range (Range, unsafeRange) import Data.Set qualified as Set import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime) import Imports import Polysemy import Polysemy.Error +import Polysemy.Input (Input) import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.Role 
(roleNameWireAdmin) import Wire.API.Meeting qualified as API @@ -59,7 +60,8 @@ interpretMeetingsSubsystem :: Member TeamSubsystem r, Member FeaturesConfigSubsystem r, Member Now r, - Member (Error MeetingError) r + Member (Error MeetingError) r, + Member (Input (Local ())) r ) => NominalDiffTime -> InterpreterFor MeetingsSubsystem r @@ -78,6 +80,8 @@ interpretMeetingsSubsystem validityPeriod = interpret $ \case addInvitedEmailsImpl zUser meetingId emails validityPeriod RemoveInvitedEmails zUser meetingId emails -> removeInvitedEmailsImpl zUser meetingId emails validityPeriod + CleanupOldMeetings cutoffTime batchSize -> + cleanupOldMeetingsImpl cutoffTime batchSize createMeetingImpl :: ( Member Store.MeetingsStore r, @@ -360,3 +364,36 @@ removeInvitedEmailsImpl zUser meetingId emails validityPeriod = do lift $ Store.removeInvitedEmails (qUnqualified meetingId) emails pure $ isJust result + +cleanupOldMeetingsImpl :: + ( Member Store.MeetingsStore r, + Member ConversationSubsystem r, + Member (Input (Local ())) r + ) => + UTCTime -> + Int -> + Sem r Int64 +cleanupOldMeetingsImpl cutoffTime batchSize = do + oldMeetings <- Store.getOldMeetings cutoffTime batchSize + if null oldMeetings + then pure 0 + else do + for_ oldMeetings forceDeleteMeeting + pure $ fromIntegral $ length oldMeetings + +forceDeleteMeeting :: + ( Member Store.MeetingsStore r, + Member ConversationSubsystem r, + Member (Input (Local ())) r + ) => + Store.StoredMeeting -> + Sem r () +forceDeleteMeeting meeting = do + maybeConv <- ConversationSubsystem.internalGetConversation meeting.conversationId + case maybeConv of + Just conv + | conv.metadata.cnvmGroupConvType == Just MeetingConversation, + conv.id_ == meeting.conversationId -> + ConversationSubsystem.internalDeleteLocalConversation =<< inputQualifyLocal meeting.conversationId + _ -> pure () + Store.deleteMeeting meeting.id diff --git a/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs 
b/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs index 4e14e34d6e0..e6c55c1de58 100644 --- a/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs +++ b/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs @@ -32,6 +32,7 @@ import Data.Time.Clock import Imports import Polysemy import Polysemy.Error +import Polysemy.Input import Polysemy.State import System.Random (StdGen, mkStdGen) import Test.Hspec @@ -70,6 +71,7 @@ type TestStack = GalleyAPIAccess, Now, State UTCTime, + Input (Local ()), Random, State StdGen, ErrorS 'TeamMemberNotFound, @@ -107,6 +109,7 @@ runTestStack now gen teams configs = . runError @(Tagged 'TeamMemberNotFound ()) . evalState gen . randomToStatefulStdGen + . runInputConst (toLocalUnsafe (Domain "my-domain") ()) . evalState now . interpretNowAsState . miniGalleyAPIAccess teams configs diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs index 27457d83433..38bd471efe6 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs @@ -84,6 +84,10 @@ inMemoryConversationSubsystemInterpreter = interpretH $ \case modify @(Map ConvId StoredConversation) (Map.delete (tUnqualified lcnv)) modify @ConversationMembers (Map.delete (tUnqualified lcnv)) pureT Unchanged + InternalDeleteLocalConversation lcnv -> do + modify @(Map ConvId StoredConversation) (Map.delete (tUnqualified lcnv)) + modify @ConversationMembers (Map.delete (tUnqualified lcnv)) + pureT () GetConversationIds _lusr _range _pagingState -> do pureT $ MultiTablePaging.MultiTablePage [] False (Public.ConversationPagingState MultiTablePaging.PagingLocals Nothing) GetConversations cids -> do diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs 
b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs index 60906b23c67..385eac0c791 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs @@ -103,3 +103,9 @@ inMemoryMeetingsStoreInterpreter = interpret $ \case } modify (Map.insert mid updatedMeeting) DeleteMeeting mid -> modify (Map.delete mid) + GetOldMeetings cutoffTime batchSize -> + gets $ + take batchSize + . List.sortOn (.endTime) + . filter (\sm -> sm.endTime < cutoffTime) + . Map.elems diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index e6433d047c5..a3943555eb7 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -21,6 +21,8 @@ library Wire.BackgroundWorker.Options Wire.BackgroundWorker.Util Wire.DeadUserNotificationWatcher + Wire.Effects + Wire.MeetingsCleanupWorker Wire.PostgresMigrations hs-source-dirs: src @@ -39,6 +41,7 @@ library , bytestring-conversion , cassandra-util , containers + , cron , data-timeout , exceptions , extended @@ -61,6 +64,7 @@ library , ssl-util , tagged , text + , time , tinylog , transformers , transformers-base diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index eaee5a414c4..12f01acb396 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -64,6 +64,12 @@ backgroundJobs: jobTimeout: 5s maxAttempts: 3 +# Meetings cleanup configuration for integration +meetingsCleanup: + cleanOlderThanHours: 0.0014 # Clean meetings older than ~5 seconds + batchSize: 100 + schedule: "* * * * *" # Run every minute + postgresMigration: conversation: postgresql conversationCodes: postgresql diff --git a/services/background-worker/default.nix 
b/services/background-worker/default.nix index 6137da6af87..524be8f3efa 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -11,6 +11,7 @@ , bytestring-conversion , cassandra-util , containers +, cron , data-default , data-timeout , exceptions @@ -44,6 +45,7 @@ , ssl-util , tagged , text +, time , tinylog , transformers , transformers-base @@ -71,6 +73,7 @@ mkDerivation { bytestring-conversion cassandra-util containers + cron data-timeout exceptions extended @@ -93,6 +96,7 @@ mkDerivation { ssl-util tagged text + time tinylog transformers transformers-base diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index e89ed926f43..10bdc020f7a 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -34,6 +34,7 @@ import Wire.BackgroundWorker.Health qualified as Health import Wire.BackgroundWorker.Jobs.Consumer qualified as Jobs import Wire.BackgroundWorker.Options import Wire.DeadUserNotificationWatcher qualified as DeadUserNotificationWatcher +import Wire.MeetingsCleanupWorker qualified as MeetingsCleanupWorker import Wire.Migration import Wire.Options.Galley qualified as Galley import Wire.PostgresMigrations qualified as Migrations @@ -75,16 +76,21 @@ run opts galleyOpts = do runAppT env $ withNamedLogger "background-job-consumer" $ Jobs.startWorker amqpEP + cleanupMeetings <- + runAppT env $ + withNamedLogger "meetings-cleanup" $ + MeetingsCleanupWorker.startWorker opts.meetingsCleanup let cleanup = void $ runConcurrently $ - (,,,,,) + (,,,,,,) <$> Concurrently cleanupDeadUserNotifWatcher <*> Concurrently cleanupBackendNotifPusher <*> Concurrently cleanupConvMigration <*> Concurrently cleanUpConvCodesMigration <*> Concurrently cleanupTeamFeaturesMigration <*> Concurrently cleanupJobs + <*> Concurrently cleanupMeetings let server = defaultServer (T.unpack 
opts.backgroundWorker.host) opts.backgroundWorker.port env.logger let settings = newSettings server diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 981ae2139f6..01754e066be 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -77,6 +77,7 @@ data Env = Env httpManager :: Manager, defederationTimeout :: ResponseTimeout, backendNotificationMetrics :: BackendNotificationMetrics, + meetingsCleanupMetrics :: MeetingsCleanupMetrics, backendNotificationsConfig :: BackendNotificationsConfig, backgroundJobsConfig :: BackgroundJobsConfig, workerRunningGauge :: Vector Text Gauge, @@ -112,6 +113,10 @@ data BackendNotificationMetrics = BackendNotificationMetrics stuckQueuesGauge :: Vector Text Gauge } +data MeetingsCleanupMetrics = MeetingsCleanupMetrics + { runsCounter :: Counter + } + mkBackendNotificationMetrics :: IO BackendNotificationMetrics mkBackendNotificationMetrics = BackendNotificationMetrics @@ -119,6 +124,11 @@ mkBackendNotificationMetrics = <*> register (vector "targetDomain" $ counter $ Prometheus.Info "wire_backend_notifications_errors" "Number of errors that occurred while pushing notifications") <*> register (vector "targetDomain" $ gauge $ Prometheus.Info "wire_backend_notifications_stuck_queues" "Set to 1 when pushing notifications is stuck") +mkMeetingsCleanupMetrics :: IO MeetingsCleanupMetrics +mkMeetingsCleanupMetrics = + MeetingsCleanupMetrics + <$> register (counter $ Prometheus.Info "wire_meetings_cleanup_runs_total" "Number of times the meetings cleanup job has run") + mkWorkerRunningGauge :: IO (Vector Text Gauge) mkWorkerRunningGauge = register (vector "worker" $ gauge $ Prometheus.Info "wire_background_worker_running_workers" "Set to 1 when a worker is running") @@ -146,6 +156,7 @@ mkEnv opts galleyOpts = do (BackgroundJobConsumer, False) ] backendNotificationMetrics 
<- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics let backendNotificationsConfig = opts.backendNotificationPusher backgroundJobsConfig = opts.backgroundJobs federationDomain = galleyOpts._settings._federationDomain diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs index 842dec6ec80..7e4bbcc0648 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs @@ -20,287 +20,22 @@ module Wire.BackgroundWorker.Jobs.Registry ) where -import Bilge qualified -import Bilge.Retry -import Cassandra (ClientState) -import Control.Monad.Catch -import Control.Retry -import Data.ByteString qualified as BS -import Data.ByteString.Lazy.Char8 qualified as LC8 -import Data.Id -import Data.Misc -import Data.Qualified -import Data.Tagged (Tagged) -import Data.Text qualified as T -import Data.Text.Lazy qualified as TL -import Galley.Types.Error (InternalError, internalErrorDescription, legalHoldServiceUnavailable) -import Hasql.Pool (UsageError) -import Hasql.Pool qualified as Hasql import Imports -import Network.HTTP.Client qualified as Http -import Network.Wai.Utilities.JSONResponse (JSONResponse (..)) -import OpenSSL.Session qualified as SSL -import Polysemy -import Polysemy.Async (asyncToIOFinal) -import Polysemy.Conc -import Polysemy.Error -import Polysemy.Input -import Polysemy.Resource (resourceToIOFinal) -import Polysemy.TinyLog qualified as P -import Ssl.Util -import System.Logger as Logger -import System.Logger.Class qualified as Log -import URI.ByteString (uriPath) import Wire.API.BackgroundJobs (Job (..)) -import Wire.API.Conversation.Config (ConversationSubsystemConfig (..)) -import Wire.API.Error (APIError (toResponse), DynError (..)) -import Wire.API.Error.Galley -import Wire.API.Federation.Error (FederationError) -import Wire.API.MLS.Keys 
(MLSKeysByPurpose, MLSPrivateKeys) -import Wire.API.Team.Collaborator (TeamCollaboratorsError) -import Wire.API.Team.Feature (LegalholdConfig) -import Wire.API.Team.FeatureFlags (FanoutLimit, FeatureDefaults (FeatureLegalHoldDisabledPermanently), currentFanoutLimit) -import Wire.BackendNotificationQueueAccess.RabbitMq qualified as BackendNotificationQueueAccess import Wire.BackgroundJobsPublisher.RabbitMQ (interpretBackgroundJobsPublisherRabbitMQ) import Wire.BackgroundJobsRunner (runJob) import Wire.BackgroundJobsRunner.Interpreter hiding (runJob) import Wire.BackgroundWorker.Env (AppT, Env (..)) -import Wire.BrigAPIAccess.Rpc -import Wire.ClientSubsystem.Error (ClientError) -import Wire.CodeStore.Cassandra (interpretCodeStoreToCassandra) -import Wire.CodeStore.DualWrite (interpretCodeStoreToCassandraAndPostgres) -import Wire.CodeStore.Postgres (interpretCodeStoreToPostgres) -import Wire.ConversationStore.Cassandra -import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) -import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (..), interpretConversationSubsystem) +import Wire.Effects import Wire.ExternalAccess.External -import Wire.FeaturesConfigSubsystem (getAllTeamFeaturesForServer) -import Wire.FeaturesConfigSubsystem.Interpreter (runFeaturesConfigSubsystem) -import Wire.FeaturesConfigSubsystem.Types (ExposeInvitationURLsAllowlist (..)) -import Wire.FederationAPIAccess.Interpreter (FederationAPIAccessConfig (..), interpretFederationAPIAccess) -import Wire.FederationSubsystem.Interpreter (runFederationSubsystem) -import Wire.FireAndForget (interpretFireAndForget) -import Wire.GalleyAPIAccess -import Wire.GalleyAPIAccess.Rpc (interpretGalleyAPIAccessToRpc) -import Wire.GundeckAPIAccess -import Wire.HashPassword.Interpreter (runHashPassword) -import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) -import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) -import 
Wire.NotificationSubsystem.Interpreter -import Wire.Options.Galley (GuestLinkTTLSeconds) -import Wire.ParseException -import Wire.PostgresMigrationOpts -import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra) -import Wire.RateLimit (RateLimitExceeded) -import Wire.RateLimit.Interpreter (interpretRateLimit) -import Wire.Rpc -import Wire.Sem.Concurrency (ConcurrencySafety (Unsafe)) -import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) -import Wire.Sem.Delay (runDelay) -import Wire.Sem.Logger (mapLogger) -import Wire.Sem.Logger.TinyLog (loggerToTinyLog) -import Wire.Sem.Now.IO (nowToIO) -import Wire.Sem.Random.IO (randomToIO) -import Wire.ServiceStore.Cassandra (interpretServiceStoreToCassandra) -import Wire.SparAPIAccess.Rpc (interpretSparAPIAccessToRpc) -import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) -import Wire.TeamCollaboratorsSubsystem.Interpreter (interpretTeamCollaboratorsSubsystem) -import Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) -import Wire.TeamFeatureStore.Error (TeamFeatureStoreError) -import Wire.TeamJournal.Aws (interpretTeamJournal) -import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) -import Wire.TeamSubsystem.Interpreter (TeamSubsystemConfig (..), interpretTeamSubsystem) -import Wire.UserClientIndexStore.Cassandra -import Wire.UserGroupStore.Postgres (interpretUserGroupStoreToPostgres) - --- Helper functions for LegalHoldEnv --- Adapted from Galley.External.LegalHoldService.Internal -makeVerifiedRequestWithManagerIO :: - Logger -> - Http.Manager -> - ([Fingerprint Rsa] -> SSL.SSL -> IO ()) -> - Fingerprint Rsa -> - HttpsUrl -> - (Http.Request -> Http.Request) -> - IO (Http.Response LC8.ByteString) -makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr (HttpsUrl url) reqBuilder = do - let verified = verifyFingerprints [fpr] - extHandleAll (errHandler logger) $ do - recovering legalHoldRetryPolicy httpHandlers $ - const $ - 
withVerifiedSslConnection verified mgr (reqBuilderMods . reqBuilder) $ - \req -> - Http.httpLbs req mgr - where - reqBuilderMods = - maybe id Bilge.host (Bilge.extHost url) - . Bilge.port (fromMaybe 443 (Bilge.extPort url)) - . Bilge.secure - . prependPath (uriPath url) - errHandler logger' e = do - Logger.info logger' $ Log.msg ("error making request to legalhold service: " <> displayException e) - throwM (legalHoldServiceUnavailable e) - prependPath :: BS.ByteString -> Http.Request -> Http.Request - prependPath pth req = req {Http.path = pth `BS.append` Http.path req} -- Modified to use BS.append - -- () from System.FilePath, but here we just need to append. - -- Assuming a simple append is sufficient for URI path segments for this context. - legalHoldRetryPolicy :: RetryPolicy - legalHoldRetryPolicy = limitRetries 3 <> exponentialBackoff 100000 - extHandleAll :: (MonadCatch m) => (SomeException -> m a) -> m a -> m a - extHandleAll f ma = - catches - ma - [ Handler $ \(ex :: SomeAsyncException) -> throwM ex, - Handler $ \(ex :: SomeException) -> f ex - ] - -makeVerifiedRequestIO :: Logger -> ExtEnv -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LC8.ByteString) -makeVerifiedRequestIO logger extEnv fpr url reqBuilder = do - let (mgr, verifyFingerprints) = extGetManager extEnv - makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder - -makeVerifiedRequestFreshManagerIO :: Logger -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LC8.ByteString) -makeVerifiedRequestFreshManagerIO logger fpr url reqBuilder = do - let disableTlsV1 = True - ExtEnv (mgr, verifyFingerprints) <- initExtEnv disableTlsV1 - makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder dispatchJob :: Job -> AppT IO (Either Text ()) dispatchJob job = do env <- ask @Env let disableTlsV1 = True extEnv <- liftIO (initExtEnv disableTlsV1) - liftIO $ runInterpreters env extEnv $ 
runJob job - where - convStoreInterpreter env = - case env.postgresMigration.conversation of - CassandraStorage -> interpretConversationStoreToCassandra env.cassandraGalley - MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres env.cassandraGalley - PostgresqlStorage -> interpretConversationStoreToPostgres - runInterpreters env extEnv = do - let federationAPIAccessConfig = - FederationAPIAccessConfig - { ownDomain = env.federationDomain, - federatorEndpoint = Just env.federatorInternal, - http2Manager = env.http2Manager, - requestId = job.requestId - } - teamSubsystemConfig = TeamSubsystemConfig {concurrentDeletionEvents = 1} - legalHoldEnv = - let makeReq fpr url rb = makeVerifiedRequestIO env.logger extEnv fpr url rb - makeReqFresh fpr url rb = makeVerifiedRequestFreshManagerIO env.logger fpr url rb - in LegalHoldEnv {makeVerifiedRequest = makeReq, makeVerifiedRequestFreshManager = makeReqFresh} - convCodesStoreInterpreter = - case env.postgresMigration.conversationCodes of - CassandraStorage -> interpretCodeStoreToCassandra - MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres - PostgresqlStorage -> interpretCodeStoreToPostgres - runFinal @IO - . unsafelyPerformConcurrency @_ @'Unsafe - . embedToFinal @IO - . asyncToIOFinal - . interpretRace - . runDelay - . resourceToIOFinal - . runError - . mapError @DynError (.eMessage) - . mapError @JSONResponse (T.pack . show . (.value)) - . mapError @ConversationSubsystemError toResponse - . mapError @ClientError (T.pack . displayException) - . mapError @FederationError (T.pack . displayException) - . mapError @UsageError (T.pack . show) - . mapError @ParseException (T.pack . displayException) - . mapError @MigrationError (T.pack . show) - . mapError @InternalError (TL.toStrict . internalErrorDescription) - . mapError @UnreachableBackends (T.pack . show) - . mapError @TeamCollaboratorsError (const ("Team collaborators error" :: Text)) - . 
mapError @TeamFeatureStoreError (const ("Team feature store error" :: Text)) - . mapError @(Tagged 'NotATeamMember ()) (const ("Not a team member" :: Text)) - . mapError @(Tagged 'ConvAccessDenied ()) (const ("Conversation access denied" :: Text)) - . mapError @(Tagged 'TeamNotFound ()) (const ("Team not found" :: Text)) - . mapError @(Tagged 'TeamMemberNotFound ()) (const ("Team member not found" :: Text)) - . mapError @(Tagged 'AccessDenied ()) (const ("Access denied" :: Text)) - . mapError @NonFederatingBackends (const ("Non federating backends" :: Text)) - . mapError @UnreachableBackendsLegacy (const ("Unreachable backends legacy" :: Text)) - . mapError @RateLimitExceeded (const ("Rate limit exceeded" :: Text)) - . interpretTinyLog env job.requestId job.jobId - . runInputConst @Hasql.Pool env.hasqlPool - . runInputConst @(Local ()) (toLocalUnsafe env.federationDomain ()) - . runInputConst @(FeatureDefaults LegalholdConfig) FeatureLegalHoldDisabledPermanently - . runInputConst @ClientState env.cassandraGalley - . runInputConst @LegalHoldEnv legalHoldEnv - . runInputConst @ExposeInvitationURLsAllowlist (ExposeInvitationURLsAllowlist $ fromMaybe [] env.exposeInvitationURLsTeamAllowlist) - . runInputConst @(Either HttpsUrl (Map Text HttpsUrl)) env.convCodeURI - . runInputConst @IntraListing (IntraListing env.intraListing) - . runInputConst @(Maybe GroupInfoCheckEnabled) (GroupInfoCheckEnabled <$> env.checkGroupInfo) - . runInputConst @(Maybe GuestLinkTTLSeconds) env.guestLinkTTLSeconds - . runInputConst @FanoutLimit (currentFanoutLimit env.maxTeamSize env.maxFanoutSize) - . interpretMLSCommitLockStoreToCassandra env.cassandraGalley - . interpretProposalStoreToCassandra - . interpretServiceStoreToCassandra env.cassandraBrig - . interpretUserGroupStoreToPostgres - . interpretTeamFeatureStoreToCassandra - . interpretUserClientIndexStoreToCassandra env.cassandraGalley - . convStoreInterpreter env - . interpretTeamStoreToCassandra - . 
interpretTeamCollaboratorsStoreToPostgres - . interpretLegalHoldStoreToCassandra FeatureLegalHoldDisabledPermanently - . interpretTeamJournal Nothing - . interpretBackgroundJobsPublisherRabbitMQ job.requestId env.amqpJobsPublisherChannel - . nowToIO - . randomToIO - . interpretFireAndForget - . BackendNotificationQueueAccess.interpretBackendNotificationQueueAccess (Just $ backendQueueEnv env) - . runRpcWithHttp env.httpManager job.requestId - . runGundeckAPIAccess env.gundeckEndpoint - -- FUTUREWORK: Currently the brig access effect is needed for the interpreter of ExternalAccess. - -- At the time of implementation the only function used from ExternalAccess is deliverAsync, which will not call brig access. - -- However, to prevent the background worker to require HTTP access to brig, we should consider refactoring this at some point. - . interpretBrigAccess env.brigEndpoint - . interpretGalleyAPIAccessToRpc mempty env.galleyEndpoint - . runInputSem getConversationSubsystemConfig - . runInputSem @(Maybe (MLSKeysByPurpose MLSPrivateKeys)) (inputs @ConversationSubsystemConfig (.mlsKeys)) - . runInputSem getConfiguredFeatureFlags - . runHashPassword env.passwordHashingOptions - . interpretRateLimit env.passwordHashingRateLimitEnv - . convCodesStoreInterpreter - . interpretExternalAccess extEnv - . interpretSparAPIAccessToRpc env.sparEndpoint - . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig job.requestId) - . interpretFederationAPIAccess federationAPIAccessConfig - . interpretTeamSubsystem teamSubsystemConfig - . ( \m -> do - p <- inputs @ConversationSubsystemConfig (.federationProtocols) - runFederationSubsystem p m - ) - . runFeaturesConfigSubsystem - . runInputSem getAllTeamFeaturesForServer - . interpretTeamCollaboratorsSubsystem - . interpretConversationSubsystem - . 
interpretBackgroundJobsRunner - - getConversationSubsystemConfig :: - (Member GalleyAPIAccess r) => - Sem r ConversationSubsystemConfig - getConversationSubsystemConfig = getConversationConfig - - backendQueueEnv :: Env -> BackendNotificationQueueAccess.Env - backendQueueEnv env = - BackendNotificationQueueAccess.Env - { channelMVar = env.amqpBackendNotificationsChannel, - logger = env.logger, - local = toLocalUnsafe env.federationDomain (), - requestId = job.requestId - } - -interpretTinyLog :: - (Member (Embed IO) r) => - Env -> - RequestId -> - JobId -> - Sem (P.TinyLog ': r) a -> - Sem r a -interpretTinyLog e reqId jobId = - loggerToTinyLog e.logger - . mapLogger ((field "request" (unRequestId reqId) . field "job" (idToText jobId)) .) - . raiseUnder @P.TinyLog + liftIO + $ runBackgroundWorkerEffects env extEnv job.requestId (Just job.jobId) + . interpretBackgroundJobsPublisherRabbitMQ job.requestId env.amqpJobsPublisherChannel + . interpretBackgroundJobsRunner + $ runJob job diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index c616d1e5a4e..7b8abe9f7bc 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE RecordWildCards #-} + -- This file is part of the Wire Server implementation. 
-- -- Copyright (C) 2025 Wire Swiss GmbH @@ -18,12 +20,14 @@ module Wire.BackgroundWorker.Options where import Data.Aeson +import Data.Aeson.Types (JSONPathElement (Key), parserThrowError) import Data.Misc import Data.Range (Range) import GHC.Generics import Hasql.Pool.Extended import Imports import Network.AMQP.Extended +import System.Cron (CronSchedule, parseCronSchedule) import System.Logger.Extended import Util.Options import Wire.Migration @@ -50,6 +54,7 @@ data Opts = Opts migrateConversationsOptions :: !MigrationOptions, migrateConversationCodes :: !Bool, migrateTeamFeatures :: !Bool, + meetingsCleanup :: MeetingsCleanupConfig, backgroundJobs :: BackgroundJobsConfig } deriving (Show, Generic) @@ -92,3 +97,28 @@ data BackgroundJobsConfig = BackgroundJobsConfig } deriving (Show, Generic) deriving (FromJSON) via Generically BackgroundJobsConfig + +data MeetingsCleanupConfig = MeetingsCleanupConfig + { -- | Delete meetings older than this many hours + cleanOlderThanHours :: Double, + -- | Maximum number of meetings to delete per batch + batchSize :: Int, + -- | Cron schedule for the cleanup job + schedule :: CronSchedule + } + deriving (Show, Generic) + +instance FromJSON MeetingsCleanupConfig where + parseJSON = + withObject "MeetingsCleanupConfig" $ \o -> do + cleanOlderThanHours <- o .: "cleanOlderThanHours" + batchSize <- o .: "batchSize" + when (batchSize <= 0) $ + parserThrowError [Key "batchSize"] $ + "batchSize must be greater than 0, got: " <> show batchSize + scheduleRaw <- o .: "schedule" + schedule <- + case parseCronSchedule scheduleRaw of + Left e -> parserThrowError [Key "schedule"] $ "Cannot parse cronjob syntax: " <> e + Right x -> pure x + pure $ MeetingsCleanupConfig {..} diff --git a/services/background-worker/src/Wire/Effects.hs b/services/background-worker/src/Wire/Effects.hs new file mode 100644 index 00000000000..5f474cd1078 --- /dev/null +++ b/services/background-worker/src/Wire/Effects.hs @@ -0,0 +1,389 @@ +-- This file is part of the 
Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.Effects + ( runBackgroundWorkerEffects, + ) +where + +import Bilge qualified +import Bilge.Retry +import Cassandra (ClientState) +import Control.Monad.Catch +import Control.Retry +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Id +import Data.Misc +import Data.Qualified +import Data.Tagged (Tagged) +import Data.Text qualified as T +import Data.Text.Lazy qualified as TL +import Galley.Types.Error (InternalError, internalErrorDescription, legalHoldServiceUnavailable) +import Hasql.Pool (UsageError) +import Hasql.Pool qualified as Hasql +import Imports +import Network.HTTP.Client qualified as Http +import Network.Wai.Utilities.JSONResponse (JSONResponse (..)) +import OpenSSL.Session qualified as SSL +import Polysemy +import Polysemy.Async (Async, asyncToIOFinal) +import Polysemy.Conc +import Polysemy.Error +import Polysemy.Input +import Polysemy.Resource (Resource, resourceToIOFinal) +import Polysemy.TinyLog qualified as P +import Ssl.Util +import System.Logger as Logger +import System.Logger qualified as Log +import URI.ByteString (uriPath) +import Wire.API.Conversation.Config (ConversationSubsystemConfig (..)) +import Wire.API.Error (APIError (toResponse), DynError (..)) +import 
Wire.API.Error.Galley +import Wire.API.Federation.Client (FederatorClient) +import Wire.API.Federation.Error (FederationError) +import Wire.API.MLS.Keys (MLSKeysByPurpose, MLSPrivateKeys) +import Wire.API.Team.Collaborator (TeamCollaboratorsError) +import Wire.API.Team.Feature (AllTeamFeatures, LegalholdConfig) +import Wire.API.Team.FeatureFlags (FanoutLimit, FeatureDefaults (FeatureLegalHoldDisabledPermanently), FeatureFlags, currentFanoutLimit) +import Wire.BackendNotificationQueueAccess (BackendNotificationQueueAccess) +import Wire.BackendNotificationQueueAccess.RabbitMq qualified as BackendNotificationQueueAccess +import Wire.BackgroundWorker.Env (Env (..)) +import Wire.BrigAPIAccess (BrigAPIAccess) +import Wire.BrigAPIAccess.Rpc +import Wire.ClientSubsystem.Error (ClientError) +import Wire.CodeStore (CodeStore) +import Wire.CodeStore.Cassandra (interpretCodeStoreToCassandra) +import Wire.CodeStore.DualWrite (interpretCodeStoreToCassandraAndPostgres) +import Wire.CodeStore.Postgres (interpretCodeStoreToPostgres) +import Wire.ConversationStore (ConversationStore, MLSCommitLockStore) +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) +import Wire.ConversationSubsystem (ConversationSubsystem) +import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (..), interpretConversationSubsystem) +import Wire.ExternalAccess (ExternalAccess) +import Wire.ExternalAccess.External +import Wire.FeaturesConfigSubsystem (FeaturesConfigSubsystem, getAllTeamFeaturesForServer) +import Wire.FeaturesConfigSubsystem.Interpreter (runFeaturesConfigSubsystem) +import Wire.FeaturesConfigSubsystem.Types (ExposeInvitationURLsAllowlist (..)) +import Wire.FederationAPIAccess (FederationAPIAccess) +import Wire.FederationAPIAccess.Interpreter (FederationAPIAccessConfig (..), interpretFederationAPIAccess) +import Wire.FederationSubsystem 
(FederationSubsystem) +import Wire.FederationSubsystem.Interpreter (runFederationSubsystem) +import Wire.FireAndForget (FireAndForget, interpretFireAndForget) +import Wire.GalleyAPIAccess +import Wire.GalleyAPIAccess.Rpc (interpretGalleyAPIAccessToRpc) +import Wire.GundeckAPIAccess +import Wire.HashPassword (HashPassword) +import Wire.HashPassword.Interpreter (runHashPassword) +import Wire.LegalHoldStore (LegalHoldStore) +import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) +import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) +import Wire.NotificationSubsystem (NotificationSubsystem) +import Wire.NotificationSubsystem.Interpreter +import Wire.Options.Galley (GuestLinkTTLSeconds) +import Wire.ParseException +import Wire.PostgresMigrationOpts +import Wire.ProposalStore (ProposalStore) +import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra) +import Wire.RateLimit (RateLimit, RateLimitExceeded) +import Wire.RateLimit.Interpreter (interpretRateLimit) +import Wire.Rpc +import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe)) +import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) +import Wire.Sem.Delay (Delay, runDelay) +import Wire.Sem.Logger (mapLogger) +import Wire.Sem.Logger.TinyLog (loggerToTinyLog) +import Wire.Sem.Now (Now) +import Wire.Sem.Now.IO (nowToIO) +import Wire.Sem.Random (Random) +import Wire.Sem.Random.IO (randomToIO) +import Wire.ServiceStore (ServiceStore) +import Wire.ServiceStore.Cassandra (interpretServiceStoreToCassandra) +import Wire.SparAPIAccess (SparAPIAccess) +import Wire.SparAPIAccess.Rpc (interpretSparAPIAccessToRpc) +import Wire.TeamCollaboratorsStore (TeamCollaboratorsStore) +import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) +import Wire.TeamCollaboratorsSubsystem (TeamCollaboratorsSubsystem) +import Wire.TeamCollaboratorsSubsystem.Interpreter (interpretTeamCollaboratorsSubsystem) +import Wire.TeamFeatureStore (TeamFeatureStore) +import 
Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) +import Wire.TeamFeatureStore.Error (TeamFeatureStoreError) +import Wire.TeamJournal (TeamJournal) +import Wire.TeamJournal.Aws (interpretTeamJournal) +import Wire.TeamStore (TeamStore) +import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) +import Wire.TeamSubsystem (TeamSubsystem) +import Wire.TeamSubsystem.Interpreter (TeamSubsystemConfig (..), interpretTeamSubsystem) +import Wire.UserClientIndexStore (UserClientIndexStore) +import Wire.UserClientIndexStore.Cassandra +import Wire.UserGroupStore (UserGroupStore) +import Wire.UserGroupStore.Postgres (interpretUserGroupStoreToPostgres) + +makeVerifiedRequestWithManagerIO :: + Logger -> + Http.Manager -> + ([Fingerprint Rsa] -> SSL.SSL -> IO ()) -> + Fingerprint Rsa -> + HttpsUrl -> + (Http.Request -> Http.Request) -> + IO (Http.Response LBS.ByteString) +makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr (HttpsUrl url) reqBuilder = do + let verified = verifyFingerprints [fpr] + extHandleAll (errHandler logger) $ do + recovering legalHoldRetryPolicy httpHandlers $ + const $ + withVerifiedSslConnection verified mgr (reqBuilderMods . reqBuilder) $ + \req -> + Http.httpLbs req mgr + where + reqBuilderMods = + maybe id Bilge.host (Bilge.extHost url) + . Bilge.port (fromMaybe 443 (Bilge.extPort url)) + . Bilge.secure + . 
prependPath (uriPath url) + errHandler logger' e = do + Logger.info logger' $ Log.msg ("error making request to legalhold service: " <> displayException e) + throwM (legalHoldServiceUnavailable e) + prependPath :: BS.ByteString -> Http.Request -> Http.Request + prependPath pth req = req {Http.path = pth `BS.append` Http.path req} + legalHoldRetryPolicy :: RetryPolicy + legalHoldRetryPolicy = limitRetries 3 <> exponentialBackoff 100000 + extHandleAll :: (MonadCatch m) => (SomeException -> m a) -> m a -> m a + extHandleAll f ma = + catches + ma + [ Handler $ \(ex :: SomeAsyncException) -> throwM ex, + Handler $ \(ex :: SomeException) -> f ex + ] + +makeVerifiedRequestIO :: Logger -> ExtEnv -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LBS.ByteString) +makeVerifiedRequestIO logger extEnv fpr url reqBuilder = do + let (mgr, verifyFingerprints) = extGetManager extEnv + makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder + +makeVerifiedRequestFreshManagerIO :: Logger -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LBS.ByteString) +makeVerifiedRequestFreshManagerIO logger fpr url reqBuilder = do + let disableTlsV1 = True + ExtEnv (mgr, verifyFingerprints) <- initExtEnv disableTlsV1 + makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder + +type BackgroundWorkerEffects = + '[ ConversationSubsystem, + TeamCollaboratorsSubsystem, + Input AllTeamFeatures, + FeaturesConfigSubsystem, + FederationSubsystem, + TeamSubsystem, + FederationAPIAccess FederatorClient, + NotificationSubsystem, + SparAPIAccess, + ExternalAccess, + RateLimit, + HashPassword, + Input FeatureFlags, + Input (Maybe (MLSKeysByPurpose MLSPrivateKeys)), + Input ConversationSubsystemConfig, + GalleyAPIAccess, + BrigAPIAccess, + GundeckAPIAccess, + Rpc, + CodeStore, + BackendNotificationQueueAccess, + FireAndForget, + Random, + Now, + TeamJournal, + LegalHoldStore, + 
TeamCollaboratorsStore, + TeamStore, + ConversationStore, + UserClientIndexStore, + TeamFeatureStore, + UserGroupStore, + ServiceStore, + ProposalStore, + MLSCommitLockStore, + Input FanoutLimit, + Input (Maybe GuestLinkTTLSeconds), + Input (Maybe GroupInfoCheckEnabled), + Input IntraListing, + Input (Either HttpsUrl (Map Text HttpsUrl)), + Input ExposeInvitationURLsAllowlist, + Input LegalHoldEnv, + Input ClientState, + Input (FeatureDefaults LegalholdConfig), + Input (Local ()), + Input Hasql.Pool, + P.TinyLog, + Error RateLimitExceeded, + Error UnreachableBackendsLegacy, + Error NonFederatingBackends, + Error (Tagged AccessDenied ()), + Error (Tagged TeamMemberNotFound ()), + Error (Tagged TeamNotFound ()), + Error (Tagged ConvAccessDenied ()), + Error (Tagged NotATeamMember ()), + Error TeamFeatureStoreError, + Error TeamCollaboratorsError, + Error UnreachableBackends, + Error InternalError, + Error MigrationError, + Error ParseException, + Error UsageError, + Error FederationError, + Error ClientError, + Error ConversationSubsystemError, + Error JSONResponse, + Error DynError, + Error Text, + Resource, + Delay, + Race, + Async, + Embed IO, + Concurrency Unsafe, + Final IO + ] + +runBackgroundWorkerEffects :: + Env -> + ExtEnv -> + RequestId -> + Maybe JobId -> + Sem BackgroundWorkerEffects a -> + IO (Either Text a) +runBackgroundWorkerEffects env extEnv requestId mJobId = + runFinal @IO + . unsafelyPerformConcurrency @_ @'Unsafe + . embedToFinal @IO + . asyncToIOFinal + . interpretRace + . runDelay + . resourceToIOFinal + . runError + . mapError @DynError (.eMessage) + . mapError @JSONResponse (T.pack . show . (.value)) + . mapError @ConversationSubsystemError toResponse + . mapError @ClientError (T.pack . displayException) + . mapError @FederationError (T.pack . displayException) + . mapError @UsageError (T.pack . show) + . mapError @ParseException (T.pack . displayException) + . mapError @MigrationError (T.pack . show) + . 
mapError @InternalError (TL.toStrict . internalErrorDescription) + . mapError @UnreachableBackends (T.pack . show) + . mapError @TeamCollaboratorsError (const ("Team collaborators error" :: Text)) + . mapError @TeamFeatureStoreError (const ("Team feature store error" :: Text)) + . mapError @(Tagged 'NotATeamMember ()) (const ("Not a team member" :: Text)) + . mapError @(Tagged 'ConvAccessDenied ()) (const ("Conversation access denied" :: Text)) + . mapError @(Tagged 'TeamNotFound ()) (const ("Team not found" :: Text)) + . mapError @(Tagged 'TeamMemberNotFound ()) (const ("Team member not found" :: Text)) + . mapError @(Tagged 'AccessDenied ()) (const ("Access denied" :: Text)) + . mapError @NonFederatingBackends (const ("Non federating backends" :: Text)) + . mapError @UnreachableBackendsLegacy (const ("Unreachable backends legacy" :: Text)) + . mapError @RateLimitExceeded (const ("Rate limit exceeded" :: Text)) + . interpretTinyLog + . runInputConst @Hasql.Pool env.hasqlPool + . runInputConst @(Local ()) (toLocalUnsafe env.federationDomain ()) + . runInputConst @(FeatureDefaults LegalholdConfig) FeatureLegalHoldDisabledPermanently + . runInputConst @ClientState env.cassandraGalley + . runInputConst @LegalHoldEnv legalHoldEnv + . runInputConst @ExposeInvitationURLsAllowlist (ExposeInvitationURLsAllowlist $ fromMaybe [] env.exposeInvitationURLsTeamAllowlist) + . runInputConst @(Either HttpsUrl (Map Text HttpsUrl)) env.convCodeURI + . runInputConst @IntraListing (IntraListing env.intraListing) + . runInputConst @(Maybe GroupInfoCheckEnabled) (GroupInfoCheckEnabled <$> env.checkGroupInfo) + . runInputConst @(Maybe GuestLinkTTLSeconds) env.guestLinkTTLSeconds + . runInputConst @FanoutLimit (currentFanoutLimit env.maxTeamSize env.maxFanoutSize) + . interpretMLSCommitLockStoreToCassandra env.cassandraGalley + . interpretProposalStoreToCassandra + . interpretServiceStoreToCassandra env.cassandraBrig + . interpretUserGroupStoreToPostgres + . 
interpretTeamFeatureStoreToCassandra + . interpretUserClientIndexStoreToCassandra env.cassandraGalley + . interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley + . interpretTeamStoreToCassandra + . interpretTeamCollaboratorsStoreToPostgres + . interpretLegalHoldStoreToCassandra FeatureLegalHoldDisabledPermanently + . interpretTeamJournal Nothing + . nowToIO + . randomToIO + . interpretFireAndForget + . BackendNotificationQueueAccess.interpretBackendNotificationQueueAccess (Just backendQueueEnv) + . convCodesStoreInterpreter + . runRpcWithHttp env.httpManager requestId + . runGundeckAPIAccess env.gundeckEndpoint + -- FUTUREWORK: Currently the brig access effect is needed for the interpreter of ExternalAccess. + -- At the time of implementation the only function used from ExternalAccess is deliverAsync, which will not call brig access. + -- However, to prevent the background worker to require HTTP access to brig, we should consider refactoring this at some point. + . interpretBrigAccess env.brigEndpoint + . interpretGalleyAPIAccessToRpc mempty env.galleyEndpoint + . runInputSem getConversationSubsystemConfig + . runInputSem @(Maybe (MLSKeysByPurpose MLSPrivateKeys)) (inputs @ConversationSubsystemConfig (.mlsKeys)) + . runInputSem getConfiguredFeatureFlags + . runHashPassword env.passwordHashingOptions + . interpretRateLimit env.passwordHashingRateLimitEnv + . interpretExternalAccess extEnv + . interpretSparAPIAccessToRpc env.sparEndpoint + . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig requestId) + . interpretFederationAPIAccess federationAPIAccessConfig + . interpretTeamSubsystem teamSubsystemConfig + . ( \m -> do + p <- inputs @ConversationSubsystemConfig (.federationProtocols) + runFederationSubsystem p m + ) + . runFeaturesConfigSubsystem + . runInputSem getAllTeamFeaturesForServer + . interpretTeamCollaboratorsSubsystem + . 
interpretConversationSubsystem + where + convCodesStoreInterpreter = + case env.postgresMigration.conversationCodes of + CassandraStorage -> interpretCodeStoreToCassandra + MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres + PostgresqlStorage -> interpretCodeStoreToPostgres + legalHoldEnv = + let makeReq fpr url rb = makeVerifiedRequestIO env.logger extEnv fpr url rb + makeReqFresh fpr url rb = makeVerifiedRequestFreshManagerIO env.logger fpr url rb + in LegalHoldEnv {makeVerifiedRequest = makeReq, makeVerifiedRequestFreshManager = makeReqFresh} + teamSubsystemConfig = TeamSubsystemConfig {concurrentDeletionEvents = 1} + federationAPIAccessConfig = + FederationAPIAccessConfig + { ownDomain = env.federationDomain, + federatorEndpoint = Just env.federatorInternal, + http2Manager = env.http2Manager, + requestId = requestId + } + getConversationSubsystemConfig :: + (Member GalleyAPIAccess r) => + Sem r ConversationSubsystemConfig + getConversationSubsystemConfig = getConversationConfig + backendQueueEnv = + BackendNotificationQueueAccess.Env + { channelMVar = env.amqpBackendNotificationsChannel, + logger = env.logger, + local = toLocalUnsafe env.federationDomain (), + requestId = requestId + } + interpretTinyLog :: (Member (Embed IO) r) => Sem (P.TinyLog ': r) a -> Sem r a + interpretTinyLog = + loggerToTinyLog env.logger + . mapLogger (loggerFields .) + . raiseUnder @P.TinyLog + loggerFields :: Log.Msg -> Log.Msg + loggerFields = + case mJobId of + Nothing -> field "request" (unRequestId requestId) + Just jId -> field "request" (unRequestId requestId) . field "job" (idToText jId) diff --git a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs new file mode 100644 index 00000000000..b3068af2152 --- /dev/null +++ b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs @@ -0,0 +1,138 @@ +-- This file is part of the Wire Server implementation. 
--
-- Copyright (C) 2026 Wire Swiss GmbH
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

-- | Periodic background job that deletes meetings older than a configured
-- retention period, in batches, on a cron schedule.
module Wire.MeetingsCleanupWorker
  ( startWorker,
    CleanupConfig (..),
  )
where

import Data.Id (RequestId (RequestId))
import Data.Text qualified as T
import Data.Time.Clock
import Imports
import Polysemy.Error (runError)
import Prometheus (incCounter)
import System.Cron (Job (..), forkJob)
import System.Logger qualified as Log
import Wire.BackgroundWorker.Env (AppT, Env (..), MeetingsCleanupMetrics (..), runAppT)
import Wire.BackgroundWorker.Options (MeetingsCleanupConfig (..))
import Wire.BackgroundWorker.Util (CleanupAction)
import Wire.Effects
import Wire.ExternalAccess.External
import Wire.MeetingsStore.Postgres (interpretMeetingsStoreToPostgres)
import Wire.MeetingsSubsystem
import Wire.MeetingsSubsystem.Interpreter

-- | Runtime parameters for a cleanup run, derived from 'MeetingsCleanupConfig'.
data CleanupConfig = CleanupConfig
  { -- | Meetings older than this many hours are deleted.
    retentionHours :: Double,
    -- | Maximum number of meetings deleted per database round trip.
    batchSize :: Int
  }
  deriving (Show, Eq)

-- | Start the meetings cleanup worker thread.
--
-- Forks a cron job on the configured schedule; each run deletes old
-- meetings in batches and bumps the runs counter metric.
--
-- NOTE(review): the forked job thread is discarded ('void'), so the
-- returned 'CleanupAction' is a no-op and the job does not participate in
-- graceful shutdown — consider returning an action that stops the thread.
startWorker ::
  MeetingsCleanupConfig ->
  AppT IO CleanupAction
startWorker config = do
  env <- ask
  Log.info env.logger $
    Log.msg (Log.val "Starting meetings cleanup worker")
      . Log.field "schedule" (show config.schedule)
      . Log.field "clean_older_than_hours" config.cleanOlderThanHours

  void . liftIO $ do
    forkJob $
      Job config.schedule $
        runAppT env $ do
          Log.info env.logger $ Log.msg (Log.val "Starting scheduled meetings cleanup")
          runCleanupOldMeetings (configFromOptions config)
          liftIO $ incCounter env.meetingsCleanupMetrics.runsCounter

  pure $ pure ()

-- | Convert the options-level 'MeetingsCleanupConfig' to the worker's
-- internal 'CleanupConfig'.
configFromOptions :: MeetingsCleanupConfig -> CleanupConfig
configFromOptions cfg =
  CleanupConfig
    { retentionHours = cfg.cleanOlderThanHours,
      batchSize = cfg.batchSize
    }

-- | Main cleanup function that orchestrates the cleanup process.
--
-- Computes the cutoff time from the retention period, then deletes
-- batches until a batch comes back smaller than 'batchSize'.
runCleanupOldMeetings :: CleanupConfig -> AppT IO ()
runCleanupOldMeetings config = do
  env <- ask
  now <- liftIO getCurrentTime
  let cutoffTime = addUTCTime (negate $ realToFrac config.retentionHours * 3600) now

  Log.info env.logger $
    Log.msg (Log.val "Starting cleanup of old meetings")
      . Log.field "cutoff_time" (show cutoffTime)
      . Log.field "retention_hours" config.retentionHours
      . Log.field "batch_size" config.batchSize

  -- Loop until a partial (or empty) batch signals there is nothing left.
  totalDeleted <- cleanupLoop env config cutoffTime 0

  Log.info env.logger $
    Log.msg (Log.val "Completed cleanup of old meetings")
      . Log.field "total_deleted" totalDeleted

-- | Delete one batch and recurse while full batches keep coming back.
--
-- A non-positive batch size is rejected by the guard: besides being
-- invalid, it would loop forever (@deletedCount >= 0@ always holds), so
-- we log and stop instead of crashing the worker thread with 'error'.
cleanupLoop :: Env -> CleanupConfig -> UTCTime -> Int64 -> AppT IO Int64
cleanupLoop env config cutoffTime totalSoFar
  | config.batchSize <= 0 = do
      Log.err env.logger $
        Log.msg (Log.val "Invalid batch size: must be greater than 0")
          . Log.field "batch_size" config.batchSize
      pure totalSoFar
  | otherwise = do
      -- Run the subsystem to handle cleanup logic.
      result <- liftIO $ runMeetingsCleanup env config cutoffTime
      case result of
        Left err -> do
          -- A failed batch aborts the run; the next scheduled run retries.
          Log.err env.logger $
            Log.msg (Log.val "Failed to cleanup old meetings batch")
              . Log.field "error" (show err)
              . Log.field "total_deleted_so_far" totalSoFar
          pure totalSoFar
        Right deletedCount -> do
          let newTotal = totalSoFar + deletedCount
          Log.info env.logger $
            Log.msg (Log.val "Cleaned up meetings batch")
              . Log.field "batch_deleted" deletedCount
              . Log.field "total_deleted" newTotal
          -- Continue if we deleted a full batch (meaning there might be more).
          if deletedCount >= fromIntegral config.batchSize
            then cleanupLoop env config cutoffTime newTotal
            else pure newTotal

-- | Run one cleanup batch through the meetings subsystem, flattening the
-- effect-stack error and 'MeetingError' channels into 'Text'.
runMeetingsCleanup :: Env -> CleanupConfig -> UTCTime -> IO (Either Text Int64)
runMeetingsCleanup env config cutoffTime = do
  let disableTlsV1 = True
  -- NOTE(review): 'initExtEnv' runs once per batch; if it is expensive
  -- (e.g. sets up an HTTP/TLS manager), hoist it out of the batch loop.
  extEnv <- initExtEnv disableTlsV1
  -- Derive the interpreter's validity period from the configured
  -- retention, not the batch size: the previous code multiplied
  -- 'batchSize' by 3600, an apparent copy-paste slip of the
  -- hours-to-seconds conversion used in 'runCleanupOldMeetings'.
  let validityPeriod = realToFrac config.retentionHours * 3600
  let mergeErrors = either (Left . T.pack . show) Right
  fmap (either Left mergeErrors)
    . runBackgroundWorkerEffects env extEnv (RequestId "meetings-cleanup") Nothing
    . interpretMeetingsStoreToPostgres
    . runError @MeetingError
    . interpretMeetingsSubsystem validityPeriod
    $ Wire.MeetingsSubsystem.cleanupOldMeetings cutoffTime config.batchSize
diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
index a3730bdaf27..59c25f75122 100644
--- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
+++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs
@@ -57,7 +57,7 @@
 import Test.Hspec
 import Test.QuickCheck
 import Test.Wire.Util
 import UnliftIO.Async
-import Util.Options (Endpoint (..), PasswordHashingOptions (..))
+import Util.Options
 import Wire.API.Conversation.Action
 import Wire.API.Federation.API
 import Wire.API.Federation.API.Brig
@@ -384,6 +384,7 @@ spec = do
       passwordHashingRateLimitEnv <- newRateLimitEnv defTestRateLimitConfig
       backendNotificationMetrics <- mkBackendNotificationMetrics
+      meetingsCleanupMetrics <- mkMeetingsCleanupMetrics
      workerRunningGauge <-
mkWorkerRunningGauge domains <- runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] @@ -435,6 +436,7 @@ spec = do passwordHashingRateLimitEnv <- newRateLimitEnv defTestRateLimitConfig backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics workerRunningGauge <- mkWorkerRunningGauge domainsThread <- async $ runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index 6aa6afa8c91..8a08533d8df 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -48,6 +48,7 @@ testEnv = do } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics workerRunningGauge <- mkWorkerRunningGauge httpManager <- newManager defaultManagerSettings let federatorInternal = Endpoint "localhost" 0 diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index 10a7499f798..0ec0a4c58de 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -93,7 +93,7 @@ settings: maxRateLimitedKeys: 100000 # Estimated memory usage: 4 MB meetings: - validityPeriod: "48h" + validityPeriod: "5s" # We explicitly do not disable any API version. Please make sure the configuration value is the same in all these configs: # brig, cannon, cargohold, galley, gundeck, proxy, spar. 
diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index e27c5e45ce0..c387e2bc0c5 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -103,8 +103,7 @@ import Wire.CodeStore.Cassandra import Wire.CodeStore.DualWrite import Wire.CodeStore.Postgres import Wire.ConversationStore (ConversationStore, MLSCommitLockStore) -import Wire.ConversationStore.Cassandra -import Wire.ConversationStore.Postgres +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) import Wire.ConversationSubsystem import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (IntraListing), interpretConversationSubsystem) import Wire.CustomBackendStore @@ -142,7 +141,6 @@ import Wire.Options.Galley hiding (brig, endpoint, federator) import Wire.Options.Galley qualified as O import Wire.Options.Keys import Wire.ParseException -import Wire.Postgres (PGConstraints) import Wire.ProposalStore (ProposalStore) import Wire.ProposalStore.Cassandra import Wire.RateLimit @@ -400,21 +398,8 @@ logAndMapError fErr fLog logMsg action = evalGalley :: Env -> Sem GalleyEffects a -> ExceptT JSONResponse IO a evalGalley e = - let convStoreInterpreter :: - forall r a. - ( Member TinyLog r, - PGConstraints r, - Member Async r, - Member (Error MigrationError) r, - Member Race r - ) => - Sem (ConversationStore ': r) a -> - Sem r a - convStoreInterpreter = - case (e ^. options . postgresMigration).conversation of - CassandraStorage -> interpretConversationStoreToCassandra (e ^. cstate) - MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres (e ^. cstate) - PostgresqlStorage -> interpretConversationStoreToPostgres + let convStoreInterpreter = + interpretConversationStoreByMigration (e ^. options . postgresMigration).conversation (e ^. cstate) convCodesStoreInterpreter = case (e ^. options . 
postgresMigration).conversationCodes of CassandraStorage -> interpretCodeStoreToCassandra