Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/WPB-24076
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add meeting cleaner job in `background-worker`.
10 changes: 5 additions & 5 deletions charts/integration/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data:
port: 8080

backgroundWorker:
host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local
host: background-worker.{{ .Release.Namespace }}.svc.cluster.local
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting finding 👍

port: 8080
# Background jobs defaults for integration tests
backgroundJobs:
Expand Down Expand Up @@ -145,7 +145,7 @@ data:
port: 8080

backgroundWorker:
host: backgroundWorker.{{ .Release.Namespace }}-fed2.svc.cluster.local
host: background-worker.{{ .Release.Namespace }}-fed2.svc.cluster.local
port: 8080

stern:
Expand Down Expand Up @@ -212,7 +212,7 @@ data:
host: proxy.wire-federation-v0.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v0.svc.cluster.local
host: background-worker.wire-federation-v0.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v0.svc.cluster.local
Expand Down Expand Up @@ -255,7 +255,7 @@ data:
host: proxy.wire-federation-v1.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v1.svc.cluster.local
host: background-worker.wire-federation-v1.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v1.svc.cluster.local
Expand Down Expand Up @@ -298,7 +298,7 @@ data:
host: proxy.wire-federation-v2.svc.cluster.local
port: 8080
backgroundWorker:
host: backgroundWorker.wire-federation-v2.svc.cluster.local
host: background-worker.wire-federation-v2.svc.cluster.local
port: 8080
stern:
host: stern.wire-federation-v2.svc.cluster.local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ data:
{{toYaml .backendNotificationPusher | indent 6 }}
{{- with .backgroundJobs }}
backgroundJobs:
{{ toYaml . | indent 6 }}
{{- end }}
{{- with .meetingsCleanup }}
meetingsCleanup:
{{ toYaml . | indent 6 }}
{{- end }}
{{- if .postgresMigration }}
Expand Down
9 changes: 9 additions & 0 deletions charts/wire-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,15 @@ background-worker:
# Total attempts, including the first try
maxAttempts: 3

# Meetings cleanup configuration
meetingsCleanup:
# Delete meetings older than this many hours (48 hours = 2 days)
cleanOlderThanHours: 48.0
# Maximum number of meetings to delete per batch
batchSize: 1000
# Cron schedule for the cleanup job (0 * * * * = every hour)
schedule: "0 * * * *"

# Controls where conversation data is stored/accessed
postgresMigration:
conversation: cassandra
Expand Down
6 changes: 6 additions & 0 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ galley:
# See helmfile for the real value
federationDomain: integration.example.com
disabledAPIVersions: []
meetings:
validityPeriod: "5s"

# These values are insecure, against anyone getting hold of the hash,
# but its not a concern for the integration tests.
Expand Down Expand Up @@ -644,6 +646,10 @@ background-worker:
concurrency: 8
jobTimeout: 60s
maxAttempts: 3
meetingsCleanup:
cleanOlderThanHours: 0.0014
batchSize: 100
schedule: "* * * * *"
# Cassandra clusters used by background-worker
cassandra:
host: {{ .Values.cassandraHost }}
Expand Down
96 changes: 96 additions & 0 deletions integration/test/Test/Meetings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ module Test.Meetings where

import API.Galley
import qualified API.GalleyInternal as I
import Control.Monad.Reader (ask)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Time.Clock
import qualified Data.Time.Format as Time
import SetupHelpers
import System.Timeout (timeout)
import Testlib.Prelude
import Text.Regex.TDFA ((=~))
import UnliftIO.Concurrent (threadDelay)

-- Helper to extract meetingId and domain from a meeting JSON object
getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String)
Expand Down Expand Up @@ -359,3 +365,93 @@ testMeetingDeleteUnauthorized = do
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting
deleteMeeting otherUser domain meetingId >>= assertStatus 404

testMeetingCleanup :: (HasCallStack) => App ()
testMeetingCleanup = do
env <- ask
timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do
-- 2 minutes timeout
(owner, _tid, _members) <- createTeam OwnDomain 1
now <- liftIO getCurrentTime
-- Create a meeting that ends now.
-- Configured retention is 0.0014 hours (~5 seconds).
-- cutoffTime will be now' - 5s.
-- We need end_date < cutoffTime.
-- If we wait 6 seconds, now' = now + 6s.
-- cutoffTime = now + 6s - 5s = now + 1s.
-- end_date (now) < cutoffTime (now + 1s).
let startTime = addUTCTime (negate 3600) now
endTime = now
newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime []

r1 <- postMeetings owner newMeeting
assertSuccess r1
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting

-- Wait 6 seconds to ensure meeting is old enough
liftIO $ threadDelay 6_000_000

-- Wait for cleanup job to run
waitForCleanupJob OwnDomain

-- Check it's gone
getMeeting owner domain meetingId >>= assertStatus 404

case timedOutResult of
Just () -> pure ()
Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes"

waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App ()
waitForCleanupJob domain = do
initialMetrics <- getMetricsBody domain
let initialCount = getRunCount initialMetrics

waitForIncrease domain initialCount
where
getMetricsBody d = do
getMetrics d BackgroundWorker `bindResponse` \resp -> do
resp.status `shouldMatchInt` 200
pure $ Text.unpack $ Text.decodeUtf8 resp.body

getRunCount metrics =
let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)")
in case matches of
[val] -> read val :: Int
_ -> 0

waitForIncrease d oldVal = do
metrics <- getMetricsBody d
let newVal = getRunCount metrics
-- We wait until it increases.
-- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run.
-- If it runs, it should become >= 1.
-- But wait, if matches is empty, we return 0.
-- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0).
-- If it runs, it increments.
when (newVal <= oldVal) $ do
liftIO $ threadDelay 1_000_000 -- Wait 1s
waitForIncrease d oldVal

testMeetingExpiration :: (HasCallStack) => App ()
testMeetingExpiration = do
(owner, _tid, _members) <- createTeam OwnDomain 1
now <- liftIO getCurrentTime
let startTime = addUTCTime (negate 3600) now
-- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml
endTime = now
newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime []

r1 <- postMeetings owner newMeeting
assertSuccess r1
meeting <- getJSON 201 r1
(meetingId, domain) <- getMeetingIdAndDomain meeting

-- Check it is accessible immediately (endDate = now, so valid until now + 5s)
getMeeting owner domain meetingId >>= assertStatus 200

-- Wait 6 seconds
liftIO $ threadDelay 6_000_000

-- Check it is expired
getMeeting owner domain meetingId >>= assertStatus 404
20 changes: 20 additions & 0 deletions libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Wire.ConversationStore.Cassandra
( interpretMLSCommitLockStoreToCassandra,
interpretConversationStoreToCassandra,
interpretConversationStoreToCassandraAndPostgres,
interpretConversationStoreByMigration,
MigrationError (..),
)
where
Expand Down Expand Up @@ -80,6 +81,7 @@ import Wire.ConversationStore.Migration.Cleanup
import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres)
import Wire.MigrationLock
import Wire.Postgres
import Wire.PostgresMigrationOpts (StorageLocation (..))
import Wire.Sem.Paging.Cassandra
import Wire.StoredConversation
import Wire.StoredConversation qualified as StoreConv
Expand Down Expand Up @@ -1602,3 +1604,21 @@ withMigrationLocksAndUserCleanup cassClient lockType maxWait userIds action =
. runInputConst cassClient
$ cleanupIfNecessary (Right <$> userIds)
action

interpretConversationStoreByMigration ::
forall r a.
( Member TinyLog r,
PGConstraints r,
Member Async r,
Member (Error MigrationError) r,
Member Race r
) =>
StorageLocation ->
ClientState ->
Sem (ConversationStore ': r) a ->
Sem r a
interpretConversationStoreByMigration storageLocation client =
case storageLocation of
CassandraStorage -> interpretConversationStoreToCassandra client
MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres client
PostgresqlStorage -> interpretConversationStoreToPostgres
3 changes: 3 additions & 0 deletions libs/wire-subsystems/src/Wire/ConversationSubsystem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ data ConversationSubsystem m a where
ConnId ->
Local ConvId ->
ConversationSubsystem m (UpdateResult Event)
InternalDeleteLocalConversation ::
Local ConvId ->
ConversationSubsystem m ()
GetMLSPublicKeys ::
Maybe MLSPublicKeyFormat ->
ConversationSubsystem m (MLSKeysByPurpose (MLSKeys SomeKey))
Expand Down
21 changes: 21 additions & 0 deletions libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module Wire.ConversationSubsystem.Action
updateLocalConversationLeave,
updateLocalConversationMemberUpdate,
updateLocalConversationDelete,
updateLocalConversationDeleteUnchecked,
updateLocalConversationRename,
updateLocalConversationMessageTimerUpdate,
updateLocalConversationReceiptModeUpdate,
Expand Down Expand Up @@ -1088,6 +1089,26 @@ updateLocalConversationDelete ::
updateLocalConversationDelete lcnvId uid connId =
updateLocalConversation @'ConversationDeleteTag lcnvId uid connId ()

updateLocalConversationDeleteUnchecked ::
( Member (ErrorS 'InvalidOperation) r,
Member (ErrorS 'ConvNotFound) r,
Member CodeStore r,
Member E.ConversationStore r,
Member (ErrorS 'NotATeamMember) r,
Member ProposalStore r
) =>
Local ConvId ->
Sem r ()
updateLocalConversationDeleteUnchecked lcnv = do
let tag = sing @'ConversationDeleteTag
conv <- getConversationWithError lcnv
-- check that the action does not bypass the underlying protocol
unless (protocolValidAction conv.protocol tag ()) $
throwS @'InvalidOperation
-- perform all authorisation checks and, if successful, then update itself
let lconv = qualifyAs lcnv conv
void $ performAction @'ConversationDeleteTag lconv (error "not used") Nothing ()

updateLocalConversationRename ::
( Member (Error FederationError) r,
Member (ErrorS ('ActionDenied (ConversationActionPermission 'ConversationRenameTag))) r,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import Wire.CodeStore (CodeStore)
import Wire.ConversationStore (ConversationStore)
import Wire.ConversationStore qualified as ConvStore
import Wire.ConversationSubsystem (ConversationSubsystem (..))
import Wire.ConversationSubsystem.Action qualified as Action
import Wire.ConversationSubsystem.Action.Notify qualified as ActionNotify
import Wire.ConversationSubsystem.Clients as Clients
import Wire.ConversationSubsystem.Create qualified as Create
Expand Down Expand Up @@ -210,6 +211,8 @@ interpretConversationSubsystem = interpret $ \case
mapErrors $ Update.postProteusBroadcast lusr con msg
DeleteLocalConversation lusr con lcnv ->
mapErrors $ Update.deleteLocalConversation lusr con lcnv
InternalDeleteLocalConversation lcnv ->
mapErrors $ Action.updateLocalConversationDeleteUnchecked lcnv
GetMLSPublicKeys fmt ->
mapErrors $ MLS.getMLSPublicKeys fmt
ResetMLSConversation lusr reset ->
Expand Down
5 changes: 5 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,10 @@ data MeetingsStore m a where
MeetingId ->
[EmailAddress] ->
MeetingsStore m ()
-- Cleanup operations
GetOldMeetings ::
UTCTime ->
Int ->
MeetingsStore m [StoredMeeting]

makeSem ''MeetingsStore
34 changes: 34 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ interpretMeetingsStoreToPostgres =
addInvitedEmailsImpl meetingId email
RemoveInvitedEmails meetingId emails ->
removeInvitedEmailsImpl meetingId emails
GetOldMeetings cutoffTime batchSize ->
getOldMeetingsImpl cutoffTime batchSize

-- * Create

Expand Down Expand Up @@ -395,3 +397,35 @@ removeInvitedEmailsImpl meetingId emails = do
updated_at = NOW()
WHERE id = ($2 :: uuid)
|]

getOldMeetingsImpl ::
( Member (Input Pool) r,
Member (Embed IO) r,
Member (Error UsageError) r
) =>
UTCTime ->
Int ->
Sem r [StoredMeeting]
getOldMeetingsImpl cutoffTime batchSize = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
where
session :: Session [StoredMeeting]
session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement
listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting)
listStatement =
refineResult
(traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting))
$ [vectorStatement|
SELECT
id :: uuid, title :: text, creator :: uuid,
start_time :: timestamptz, end_time :: timestamptz,
recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?,
conversation_id :: uuid, invited_emails :: text[], trial :: boolean,
created_at :: timestamptz, updated_at :: timestamptz
FROM meetings
WHERE end_time < ($1 :: timestamptz)
ORDER BY end_time ASC
LIMIT ($2 :: int4)
|]
5 changes: 5 additions & 0 deletions libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Wire.MeetingsSubsystem where

import Data.Id
import Data.Qualified
import Data.Time.Clock (UTCTime)
import Imports
import Polysemy
import Wire.API.Meeting
Expand Down Expand Up @@ -59,5 +60,9 @@ data MeetingsSubsystem m a where
Qualified MeetingId ->
[EmailAddress] ->
MeetingsSubsystem m Bool
CleanupOldMeetings ::
UTCTime ->
Int ->
MeetingsSubsystem m Int64

makeSem ''MeetingsSubsystem
Loading