Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/WPB-24076
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add meeting cleaner job in `background-worker`.
10 changes: 5 additions & 5 deletions charts/integration/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data:
port: 8080

backgroundWorker:
host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local
host: background-worker.{{ .Release.Namespace }}.svc.cluster.local
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting finding 👍

port: 8080
# Background jobs defaults for integration tests
backgroundJobs:
Expand Down Expand Up @@ -145,7 +145,7 @@ data:
port: 8080

backgroundWorker:
host: backgroundWorker.{{ .Release.Namespace }}-fed2.svc.cluster.local
host: background-worker.{{ .Release.Namespace }}-fed2.svc.cluster.local
port: 8080

stern:
Expand Down Expand Up @@ -212,7 +212,7 @@ data:
host: proxy.wire-federation-v0.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v0.svc.cluster.local
host: background-worker.wire-federation-v0.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v0.svc.cluster.local
Expand Down Expand Up @@ -255,7 +255,7 @@ data:
host: proxy.wire-federation-v1.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v1.svc.cluster.local
host: background-worker.wire-federation-v1.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v1.svc.cluster.local
Expand Down Expand Up @@ -298,7 +298,7 @@ data:
host: proxy.wire-federation-v2.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v2.svc.cluster.local
host: background-worker.wire-federation-v2.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v2.svc.cluster.local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ data:
{{toYaml .backendNotificationPusher | indent 6 }}
{{- with .backgroundJobs }}
backgroundJobs:
{{ toYaml . | indent 6 }}
{{- end }}
{{- with .meetingsCleanup }}
meetingsCleanup:
{{ toYaml . | indent 6 }}
{{- end }}
{{- if .postgresMigration }}
Expand Down
9 changes: 9 additions & 0 deletions charts/wire-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,15 @@ background-worker:
# Total attempts, including the first try
maxAttempts: 3

# Meetings cleanup configuration
meetingsCleanup:
# Delete meetings older than this many hours (48 hours = 2 days)
cleanOlderThanHours: 48.0
# Maximum number of meetings to delete per batch
batchSize: 1000
# Cron schedule for the cleanup job (0 * * * * = every hour)
schedule: "0 * * * *"

# Controls where conversation data is stored/accessed
postgresMigration:
conversation: cassandra
Expand Down
6 changes: 6 additions & 0 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ galley:
# See helmfile for the real value
federationDomain: integration.example.com
disabledAPIVersions: []
meetings:
validityPeriod: "5s"

# These values are insecure, against anyone getting hold of the hash,
# but its not a concern for the integration tests.
Expand Down Expand Up @@ -644,6 +646,10 @@ background-worker:
concurrency: 8
jobTimeout: 60s
maxAttempts: 3
meetingsCleanup:
cleanOlderThanHours: 0.0014
batchSize: 100
schedule: "* * * * *"
# Cassandra clusters used by background-worker
cassandra:
host: {{ .Values.cassandraHost }}
Expand Down
96 changes: 96 additions & 0 deletions integration/test/Test/Meetings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ module Test.Meetings where

import API.Galley
import qualified API.GalleyInternal as I
import Control.Monad.Reader (ask)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Time.Clock
import qualified Data.Time.Format as Time
import SetupHelpers
import System.Timeout (timeout)
import Testlib.Prelude
import Text.Regex.TDFA ((=~))
import UnliftIO.Concurrent (threadDelay)

-- Helper to extract meetingId and domain from a meeting JSON object
getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String)
Expand Down Expand Up @@ -359,3 +365,93 @@ testMeetingDeleteUnauthorized = do
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting
deleteMeeting otherUser domain meetingId >>= assertStatus 404

testMeetingCleanup :: (HasCallStack) => App ()
testMeetingCleanup = do
env <- ask
timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do
-- 2 minutes timeout
(owner, _tid, _members) <- createTeam OwnDomain 1
now <- liftIO getCurrentTime
-- Create a meeting that ends now.
-- Configured retention is 0.0014 hours (~5 seconds).
-- cutoffTime will be now' - 5s.
-- We need end_date < cutoffTime.
-- If we wait 6 seconds, now' = now + 6s.
-- cutoffTime = now + 6s - 5s = now + 1s.
-- end_date (now) < cutoffTime (now + 1s).
let startTime = addUTCTime (negate 3600) now
endTime = now
newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime []

r1 <- postMeetings owner newMeeting
assertSuccess r1
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting

-- Wait 6 seconds to ensure meeting is old enough
liftIO $ threadDelay 6_000_000

-- Wait for cleanup job to run
waitForCleanupJob OwnDomain

-- Check it's gone
getMeeting owner domain meetingId >>= assertStatus 404

case timedOutResult of
Just () -> pure ()
Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes"

waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App ()
waitForCleanupJob domain = do
initialMetrics <- getMetricsBody domain
let initialCount = getRunCount initialMetrics

waitForIncrease domain initialCount
where
getMetricsBody d = do
getMetrics d BackgroundWorker `bindResponse` \resp -> do
resp.status `shouldMatchInt` 200
pure $ Text.unpack $ Text.decodeUtf8 resp.body

getRunCount metrics =
let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)")
in case matches of
[val] -> read val :: Int
_ -> 0

waitForIncrease d oldVal = do
metrics <- getMetricsBody d
let newVal = getRunCount metrics
-- We wait until it increases.
-- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run.
-- If it runs, it should become >= 1.
-- But wait, if matches is empty, we return 0.
-- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0).
-- If it runs, it increments.
when (newVal <= oldVal) $ do
liftIO $ threadDelay 1_000_000 -- Wait 1s
waitForIncrease d oldVal

testMeetingExpiration :: (HasCallStack) => App ()
testMeetingExpiration = do
(owner, _tid, _members) <- createTeam OwnDomain 1
now <- liftIO getCurrentTime
let startTime = addUTCTime (negate 3600) now
-- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml
endTime = now
newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime []

r1 <- postMeetings owner newMeeting
assertSuccess r1
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting

-- Check it is accessible immediately (endDate = now, so valid until now + 5s)
getMeeting owner domain meetingId >>= assertStatus 200

-- Wait 6 seconds
liftIO $ threadDelay 6_000_000

-- Check it is expired
getMeeting owner domain meetingId >>= assertStatus 404
8 changes: 8 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,13 @@ data MeetingsStore m a where
MeetingId ->
[EmailAddress] ->
MeetingsStore m ()
-- Cleanup operations
GetOldMeetings ::
UTCTime ->
Int ->
MeetingsStore m [StoredMeeting]
DeleteMeetingBatch ::
[MeetingId] ->
MeetingsStore m Int64

makeSem ''MeetingsStore
57 changes: 57 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ interpretMeetingsStoreToPostgres =
addInvitedEmailsImpl meetingId email
RemoveInvitedEmails meetingId emails ->
removeInvitedEmailsImpl meetingId emails
GetOldMeetings cutoffTime batchSize ->
getOldMeetingsImpl cutoffTime batchSize
DeleteMeetingBatch meetingIds ->
deleteMeetingBatchImpl meetingIds

-- * Create

Expand Down Expand Up @@ -395,3 +399,56 @@ removeInvitedEmailsImpl meetingId emails = do
updated_at = NOW()
WHERE id = ($2 :: uuid)
|]

getOldMeetingsImpl ::
( Member (Input Pool) r,
Member (Embed IO) r,
Member (Error UsageError) r
) =>
UTCTime ->
Int ->
Sem r [StoredMeeting]
getOldMeetingsImpl cutoffTime batchSize = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
where
session :: Session [StoredMeeting]
session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement
listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting)
listStatement =
refineResult
(traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting))
$ [vectorStatement|
SELECT
id :: uuid, title :: text, creator :: uuid,
start_time :: timestamptz, end_time :: timestamptz,
recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?,
conversation_id :: uuid, invited_emails :: text[], trial :: boolean,
created_at :: timestamptz, updated_at :: timestamptz
FROM meetings
WHERE end_time < ($1 :: timestamptz)
ORDER BY end_time ASC
LIMIT ($2 :: int4)
|]

deleteMeetingBatchImpl ::
( Member (Input Pool) r,
Member (Embed IO) r,
Member (Error UsageError) r
) =>
[MeetingId] ->
Sem r Int64
deleteMeetingBatchImpl meetingIds = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
where
session :: Session Int64
session = statement (V.fromList (toUUID <$> meetingIds)) deleteStatement
deleteStatement :: Statement (V.Vector UUID) Int64
deleteStatement =
[rowsAffectedStatement|
DELETE FROM meetings
WHERE id IN (SELECT unnest($1::uuid[]))
|]
32 changes: 32 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{-# LANGUAGE TemplateHaskell #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.MeetingsSubsystemCleaning where

import Data.Time.Clock (UTCTime)
import Imports
import Polysemy

data MeetingsSubsystemCleaning m a where
CleanupOldMeetings ::
Comment thread
battermann marked this conversation as resolved.
Outdated
UTCTime ->
Int ->
MeetingsSubsystemCleaning m Int64

makeSem ''MeetingsSubsystemCleaning
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{-# LANGUAGE DuplicateRecordFields #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2026 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.MeetingsSubsystemCleaning.Interpreter where
Comment thread
battermann marked this conversation as resolved.
Outdated

import Data.Time.Clock (UTCTime)
import Imports
import Polysemy
import Wire.API.Conversation (GroupConvType (MeetingConversation), cnvmGroupConvType)
import Wire.ConversationStore qualified as ConvStore
import Wire.MeetingsStore qualified as Store
import Wire.MeetingsSubsystemCleaning
import Wire.StoredConversation (StoredConversation (..))

interpretMeetingsSubsystemCleaning ::
( Member Store.MeetingsStore r,
Member ConvStore.ConversationStore r
) =>
InterpreterFor MeetingsSubsystemCleaning r
interpretMeetingsSubsystemCleaning = interpret $ \case
CleanupOldMeetings cutoffTime batchSize ->
cleanupOldMeetingsImpl cutoffTime batchSize

cleanupOldMeetingsImpl ::
( Member Store.MeetingsStore r,
Member ConvStore.ConversationStore r
) =>
UTCTime ->
Int ->
Sem r Int64
cleanupOldMeetingsImpl cutoffTime batchSize = do
oldMeetings <- Store.getOldMeetings cutoffTime batchSize
if null oldMeetings
then pure 0
else do
-- 2. Delete associated conversations first (before meetings)
-- This ensures proper cleanup: conversation data should be removed before meeting records
-- We only delete conversations that are meeting conversations (GroupConvType = MeetingConversation)
for_ oldMeetings $ \meeting -> do
maybeConv <- ConvStore.getConversation meeting.conversationId
case maybeConv of
Just conv
| conv.metadata.cnvmGroupConvType == Just MeetingConversation,
conv.id_ == meeting.conversationId ->
ConvStore.deleteConversation meeting.conversationId
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that we should use the MeetingSubsystem to delete the meeting because that properly cleans up the conversation data. Here AFAICT we only delete the conversation via the ConversationStore but instead we should use the handler from the ConversationSubsystem because that takes care of deleting all the other stuff that is related to a conversation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to pair program on this tomorrow because regular meeting deletion requires a Qualified UserId and a ConnId

_ -> pure ()

-- 3. Now delete the meeting records from the database
Store.deleteMeetingBatch $ map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings
Loading