diff --git a/changelog.d/0-release-notes/WPB-22963 b/changelog.d/0-release-notes/WPB-22963 new file mode 100644 index 00000000000..db32715b668 --- /dev/null +++ b/changelog.d/0-release-notes/WPB-22963 @@ -0,0 +1,5 @@ +- `postgresMigration` now has a single source of truth in the Galley chart values. Galley, Brig, and background-worker all read their PostgreSQL migration settings from there. +- If your deployment overrides the full `postgresMigration` object, add the new `domainRegistration` field to that override. Otherwise services may fail to start because the config is incomplete. +- To migrate domain registration data to PostgreSQL, set `postgresMigration.domainRegistration` to `migration-to-postgresql`, run the background-worker migration with `migrateDomainRegistration: true`, and switch the setting to `postgresql` after completion. +- The domain registration migration covers these Cassandra tables: + `domain_registration`, `domain_registration_by_team`, and `domain_registration_challenge`. diff --git a/charts/wire-server/templates/background-worker/configmap.yaml b/charts/wire-server/templates/background-worker/configmap.yaml index 7c8ef9aee43..594bd5d1a92 100644 --- a/charts/wire-server/templates/background-worker/configmap.yaml +++ b/charts/wire-server/templates/background-worker/configmap.yaml @@ -83,6 +83,7 @@ data: migrateConversations: {{ .migrateConversations }} migrateConversationCodes: {{ .migrateConversationCodes }} migrateTeamFeatures: {{ .migrateTeamFeatures }} + migrateDomainRegistration: {{ .migrateDomainRegistration }} migrateConversationsOptions: {{toYaml .migrateConversationsOptions | indent 6 }} @@ -92,7 +93,7 @@ data: backgroundJobs: {{ toYaml . | indent 6 }} {{- end }} - {{- if .postgresMigration }} - postgresMigration: {{- toYaml .postgresMigration | nindent 6 }} + {{- if $.Values.galley.config.postgresMigration }} + postgresMigration: {{- toYaml $.Values.galley.config.postgresMigration | nindent 6 }} {{- end }} {{- end }} diff --git a/charts/wire-server/templates/brig/configmap.yaml b/charts/wire-server/templates/brig/configmap.yaml index 66435c8d799..ad9ee08bf8f 100644 --- a/charts/wire-server/templates/brig/configmap.yaml +++ b/charts/wire-server/templates/brig/configmap.yaml @@ -37,6 +37,7 @@ data: {{- if hasKey $.Values.brig.secrets "pgPassword" }} postgresqlPassword: /etc/wire/brig/secrets/pgPassword {{- end }} + postgresMigration: {{- toYaml $.Values.galley.config.postgresMigration | nindent 6 }} elasticsearch: url: {{ .elasticsearch.scheme }}://{{ .elasticsearch.host }}:{{ .elasticsearch.port }} diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index 77dd4b6953d..fc50c17dfad 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -89,6 +89,7 @@ galley: conversation: cassandra conversationCodes: cassandra teamFeatures: cassandra + domainRegistration: cassandra settings: httpPoolSize: 128 maxTeamSize: 10000 @@ -962,6 +963,10 @@ background-worker: # It's important to set `settings.postgresMigration.teamFeatures` to `migration-to-postgresql` # before starting the migration. migrateTeamFeatures: false + # This will start the migration of domain registration data. + # It's important to set `settings.postgresMigration.domainRegistration` to `migration-to-postgresql` + # before starting the migration. + migrateDomainRegistration: false backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms @@ -977,12 +982,6 @@ background-worker: # Total attempts, including the first try maxAttempts: 3 - # Controls where conversation data is stored/accessed - postgresMigration: - conversation: cassandra - conversationCodes: cassandra - teamFeatures: cassandra - secrets: {} diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index edda1f74f53..4a269a7fe45 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1877,12 +1877,13 @@ parameters](https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEY The `postgresqlPassword` file is read by `brig`, `galley`, and `background-worker`. Its content is used as `password` field. -### Using PostgreSQL for storing conversation data +### Using PostgreSQL for storing Cassandra-backed data #### New Installations -For new installations, configure both `galley` and `background-worker` to use -PostgreSQL for conversation data: +For new installations, configure `galley.config.postgresMigration` to use +PostgreSQL for migrated Cassandra-backed data. In the Helm charts, this is the single source +of truth and is consumed by `galley`, `brig`, and `background-worker`: ```yaml galley: @@ -1891,35 +1892,43 @@ galley: conversation: postgresql conversationCodes: postgresql teamFeatures: postgresql + domainRegistration: postgresql background-worker: config: - postgresMigration: - conversation: postgresql - conversationCodes: postgresql - teamFeatures: postgresql migrateConversations: false + migrateConversationCodes: false + migrateTeamFeatures: false + migrateDomainRegistration: false ``` #### Migration for existing installations -Existing installations should migrate conversation data to PostgreSQL from -Cassandra. This is necessary for channel search and management of channels from -the team-management UI. It is highly recommended to take a backup of the Galley -Cassandra before triggering the migration. +Existing installations should migrate Cassandra-backed data to PostgreSQL over +time. For conversations, this is necessary for channel search and management of +channels from the team-management UI. It is highly recommended to take a backup +of the affected Cassandra data before triggering a migration. Migrations are independent and can be run separately, in batches, or all at once. This is expected, because migrations will be released over time. The -pattern below applies per store. Use it for `conversation` and -`conversationCodes` now, and for future stores as they are added. +pattern below applies per `postgresMigration` setting. A single setting may +cover multiple Cassandra tables, depending on the store. -**Migration pattern per store(s)** +The current settings and their background-worker flags are: -1. Prepare the selected store(s) for migration by setting - `postgresMigration.` to `migration-to-postgresql`. This enables the - migration interpreter for that store, which ensures data is written to +- `conversation` -> `migrateConversations` +- `conversationCodes` -> `migrateConversationCodes` +- `teamFeatures` -> `migrateTeamFeatures` +- `domainRegistration` -> `migrateDomainRegistration` + +**Migration pattern per migration setting** + +1. Prepare the selected migration setting(s) for migration by setting + `postgresMigration.` to `migration-to-postgresql`. This enables the + migration interpreter for that setting, which ensures data is written to PostgreSQL (store-specific details are handled internally). - The configuration must be consistent across `galley` and - `background-worker`. + In the Helm charts, configure this only under `galley.config.postgresMigration`. + `brig` and `background-worker` consume the same settings from there, so the + migration configuration remains consistent across services. ```yaml galley: @@ -1928,21 +1937,19 @@ pattern below applies per store. Use it for `conversation` and conversation: migration-to-postgresql conversationCodes: migration-to-postgresql teamFeatures: migration-to-postgresql + domainRegistration: cassandra background-worker: config: - postgresMigration: - conversation: migration-to-postgresql - conversationCodes: migration-to-postgresql - teamFeatures: migration-to-postgresql - migrateConversations: false - migrateConversationCodes: false - migrateTeamFeatures: false + migrateConversations: false + migrateConversationCodes: false + migrateTeamFeatures: false + migrateDomainRegistration: false ``` - This change should restart all the galley pods, and new writes will follow - the migration interpreter. + This change should restart the affected pods, and new writes will follow the + migration interpreter. -2. Run the backfill for the selected store(s) via background-worker. +2. Run the backfill for the selected migration setting(s) via background-worker. ```yaml background-worker: @@ -1950,6 +1957,7 @@ pattern below applies per store. Use it for `conversation` and migrateConversations: true migrateConversationCodes: true migrateTeamFeatures: true + migrateDomainRegistration: true ``` During migration, Cassandra rows are not deleted. Writes and migration share @@ -1957,13 +1965,18 @@ pattern below applies per store. Use it for `conversation` and deferred to keep rollback options and to remove Cassandra only after a full cutover to PostgreSQL-only. - Wait for the store-specific migration metrics to reach `1.0`. For - conversations: `wire_local_convs_migration_finished` and - `wire_user_remote_convs_migration_finished`. For conversation codes: - `wire_conv_codes_migration_finished`. + Wait for the setting-specific migration metrics to reach `1.0`. Metric names + are store-specific. Current examples are: -3. Cut over reads and writes to PostgreSQL for the selected store(s). This - configuration must be used from now on for every new release. + - `conversation`: `wire_local_convs_migration_finished` and + `wire_user_remote_convs_migration_finished` + - `conversationCodes`: `wire_conv_codes_migration_finished` + - `teamFeatures`: `wire_team_features_migration_finished` + - `domainRegistration`: `wire_domain_registration_migration_finished` + +3. Cut over reads and writes to PostgreSQL for the selected migration + setting(s). This configuration must be used from now on for every new + release. ```yaml galley: @@ -1972,24 +1985,26 @@ pattern below applies per store. Use it for `conversation` and conversation: postgresql conversationCodes: postgresql teamFeatures: postgresql + domainRegistration: cassandra background-worker: config: - postgresMigration: - conversation: postgresql - conversationCodes: postgresql - teamFeatures: postgresql - migrateConversations: false - migrateConversationCodes: false - migrateTeamFeatures: false + migrateConversations: false + migrateConversationCodes: false + migrateTeamFeatures: false + migrateDomainRegistration: false ``` **How to run migrations independently or in batches** -- To migrate a single store, set only that store’s `postgresMigration.` - and `migrate` flags; leave others unchanged. -- To migrate a batch, set multiple stores to `migration-to-postgresql` and - enable only the matching `migrate` flags together. +- To migrate a single setting, set only that setting’s + `postgresMigration.` and matching `migrate<...>` flag; leave + others unchanged. +- To migrate a batch, set multiple settings to `migration-to-postgresql` and + enable only the matching `migrate<...>` flags together. - To reduce load, run large stores alone and group small stores together. +- Some settings cover multiple Cassandra tables. For example, + `postgresMigration.domainRegistration` covers `domain_registration`, + `domain_registration_by_team`, and `domain_registration_challenge`. ## Configure Cells @@ -2061,15 +2076,11 @@ postgresqlPool: agingTimeout: 1d idlenessTimeout: 10m -# Controls where conversation data is read/written -postgresMigration: - # Valid: cassandra | migration-to-postgresql | postgresql - conversation: postgresql - conversationCodes: postgresql - teamFeatures: postgresql - -# Start the migration worker when true +# Start migration workers when true migrateConversations: false +migrateConversationCodes: false +migrateTeamFeatures: false +migrateDomainRegistration: false # Background jobs consumer backgroundJobs: @@ -2089,7 +2100,7 @@ Notes - `postgresql` values follow libpq keywords; password is sourced via `secrets.pgPassword`. - RabbitMQ admin fields (`adminHost`, `adminPort`) are templated only when `config.enableFederation` is true. -- `postgresMigration.` must match between `galley` and `background-worker` during migration phases. -- `migrateConversations: true` triggers the conversation migration job; leave it `false` for new installs and after migration. +- In the Helm charts, `background-worker` reads `postgresMigration` from `galley.config.postgresMigration`. +- The `migrate...` flags control the corresponding PostgreSQL backfill jobs for the current migration settings; leave them `false` for new installs and after migration. - `concurrency`, `jobTimeout`, and `maxAttempts` control parallelism and retry behavior of the consumer. - `brig` and `gundeck` endpoints default to in-cluster services; override via `background-worker.config.brig` and `.gundeck` if your service DNS/ports differ. diff --git a/hack/helm_vars/common.yaml.gotmpl b/hack/helm_vars/common.yaml.gotmpl index 7cd9bb5fac5..17b0dbd6005 100644 --- a/hack/helm_vars/common.yaml.gotmpl +++ b/hack/helm_vars/common.yaml.gotmpl @@ -17,6 +17,7 @@ dynBackendDomain3: dynamic-backend-3.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster conversationStore: {{ $preferredStore }} conversationCodesStore: {{ $preferredStore }} teamFeaturesStore: {{ $preferredStore }} +domainRegistration: {{ $preferredStore }} {{- if (eq (env "UPLOAD_XML_S3_BASE_URL") "") }} uploadXml: {} diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index fe0a19d0682..8ed568c72ca 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -304,6 +304,7 @@ galley: conversation: {{ .Values.conversationStore }} conversationCodes: {{ .Values.conversationCodesStore }} teamFeatures: {{ .Values.teamFeaturesStore }} + domainRegistration: {{ .Values.domainRegistration }} settings: maxConvAndTeamSize: 16 maxTeamSize: 32 @@ -661,10 +662,6 @@ background-worker: name: "cassandra-jks-keystore" key: "ca.crt" {{- end }} - postgresMigration: - conversation: {{ .Values.conversationStore }} - conversationCodes: {{ .Values.conversationCodesStore }} - teamFeatures: {{ .Values.teamFeaturesStore }} rabbitmq: port: 5671 adminPort: 15671 diff --git a/integration/integration.cabal b/integration/integration.cabal index 2a4fb71b60d..49fc43bcf5a 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -176,6 +176,7 @@ library Test.MessageTimer Test.Migration.Conversation Test.Migration.ConversationCodes + Test.Migration.DomainRegistration Test.Migration.TeamFeatures Test.Migration.Util Test.MLS diff --git a/integration/test/Test/Migration/DomainRegistration.hs b/integration/test/Test/Migration/DomainRegistration.hs new file mode 100644 index 00000000000..3a7915b3242 --- /dev/null +++ b/integration/test/Test/Migration/DomainRegistration.hs @@ -0,0 +1,186 @@ +module Test.Migration.DomainRegistration (testDomainRegistrationMigration) where + +import qualified API.Brig as Brig +import qualified API.BrigInternal as BrigInternal +import API.Common +import qualified API.GalleyInternal as GalleyInternal +import Control.Error (MaybeT (..)) +import Control.Monad.Codensity +import Control.Monad.Reader +import SetupHelpers +import Test.DNSMock +import Test.Migration.Util (waitForMigration) +import Testlib.Prelude +import Testlib.ResourcePool + +data DomainRegistrationTestCase = TeamFlow TeamStep | OnPremFlow OnPremStep + +type EmailDomain = String + +type AuthToken = String + +type TeamId = String + +type Owner = Value + +type Config = Value + +type OwnershipToken = String + +data OnPremStep + = PreAuthorization EmailDomain + | SetupChallenge EmailDomain + | VerifyDomain EmailDomain ChallengeSetup + | PostConfig EmailDomain AuthToken Config + | OnPremVerify EmailDomain Config + | OnPremSuccess EmailDomain Config + +data TeamStep + = TeamSetupChallenge (Owner, TeamId) EmailDomain + | TeamVerifyDomain (Owner, TeamId) EmailDomain ChallengeSetup + | TeamAuthorizeTeam (Owner, TeamId) EmailDomain OwnershipToken + | TeamUpdateConfig (Owner, TeamId) EmailDomain + | TeamSuccess (Owner, TeamId) EmailDomain + +testDomainRegistrationMigration :: (HasCallStack) => App () +testDomainRegistrationMigration = do + resourcePool <- asks (.resourcePool) + runCodensity (acquireResources 1 resourcePool) $ \[backend] -> do + let domain = backend.berDomain + let initTestCases = do + [t1, t2, t3, t4] <- replicateM 4 $ OnPremFlow . PreAuthorization <$> randomDomain + [t5, t6, t7, t8] <- replicateM 4 $ do + (owner, tid, _) <- createTeam domain 1 + GalleyInternal.setTeamFeatureLockStatus owner tid "domainRegistration" "unlocked" + GalleyInternal.setTeamFeatureStatus owner tid "domainRegistration" "enabled" >>= assertSuccess + TeamFlow . TeamSetupChallenge (owner, tid) <$> randomDomain + + sequence + [ pure t1, + runStep domain t2, + runStep domain t3 >>= runStep domain, + runStep domain t4 >>= runStep domain >>= runStep domain, + pure t5, + runStep domain t6, + runStep domain t7 >>= runStep domain, + runStep domain t8 >>= runStep domain >>= runStep domain + ] + + testCases1 <- runCodensity (startDynamicBackend backend (conf "cassandra" False)) . const $ do + testCases0 <- initTestCases + nextStepCases <- for testCases0 (runStep domain) + newCases <- initTestCases + pure $ nextStepCases <> newCases + + testCases2 <- runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" False)) . const $ do + nextStepCases <- for testCases1 (runStep domain) + newCases <- initTestCases + pure $ nextStepCases <> newCases + + testCases3 <- runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" True)) . const $ do + nextStepCases <- for testCases2 (runStep domain) + newCases <- initTestCases + waitForMigration domain counterName + + nextStepCases' <- for (nextStepCases <> newCases) (runStep domain) + newCases' <- initTestCases + pure $ nextStepCases' <> newCases' + + runCodensity (startDynamicBackend backend (conf "postgresql" False)) . const $ do + for_ testCases3 (runAll domain) + where + runStep :: (HasCallStack) => String -> DomainRegistrationTestCase -> App DomainRegistrationTestCase + -- TEAM FLOW + runStep domain (TeamFlow (TeamSetupChallenge team emailDomain)) = do + challenge <- setupChallenge domain emailDomain + registerTechnitiumRecord challenge.technitiumToken emailDomain ("wire-domain." <> emailDomain) "TXT" challenge.dnsToken + pure $ TeamFlow $ TeamVerifyDomain team emailDomain challenge + runStep _ (TeamFlow (TeamVerifyDomain team@(owner, _) emailDomain challenge)) = do + token <- bindResponse (Brig.verifyDomainForTeam owner emailDomain challenge.challengeId challenge.challengeToken) $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "domain_ownership_token" & asString + pure $ TeamFlow $ TeamAuthorizeTeam team emailDomain token + runStep _ (TeamFlow (TeamAuthorizeTeam team@(owner, _) emailDomain token)) = do + Brig.authorizeTeam owner emailDomain token >>= assertStatus 200 + pure $ TeamFlow $ TeamUpdateConfig team emailDomain + runStep domain (TeamFlow (TeamUpdateConfig team@(owner, tid) emailDomain)) = do + bindResponse (Brig.updateTeamInvite owner emailDomain (object ["team_invite" .= "team", "team" .= tid])) $ \res -> do + res.status `shouldMatchInt` 200 + verifyTeamConfig domain tid emailDomain + pure $ TeamFlow $ TeamSuccess team emailDomain + runStep domain (TeamFlow (TeamSuccess team@(_, tid) emailDomain)) = do + verifyTeamConfig domain tid emailDomain + pure $ TeamFlow $ TeamSuccess team emailDomain + -- ON PREM FLOW + runStep domain (OnPremFlow (PreAuthorization emailDomain)) = do + BrigInternal.domainRegistrationPreAuthorize domain emailDomain >>= assertStatus 204 + pure $ OnPremFlow $ SetupChallenge emailDomain + runStep domain (OnPremFlow (SetupChallenge emailDomain)) = do + challenge <- setupChallenge domain emailDomain + registerTechnitiumRecord challenge.technitiumToken emailDomain ("wire-domain." <> emailDomain) "TXT" challenge.dnsToken + pure $ OnPremFlow $ VerifyDomain emailDomain challenge + runStep domain (OnPremFlow (VerifyDomain emailDomain challenge)) = do + bindResponse (BrigInternal.getDomainRegistration domain emailDomain) $ \res -> do + res.status `shouldMatchInt` 200 + token <- bindResponse (Brig.verifyDomain domain emailDomain challenge.challengeId challenge.challengeToken) $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "domain_ownership_token" & asString + let config = mkDomainRedirectBackend "https://wire.example.com" "https://webapp.wire.example.com" + pure $ OnPremFlow $ PostConfig emailDomain token config + runStep domain (OnPremFlow (PostConfig emailDomain token config)) = do + Brig.updateDomainRedirect domain Versioned emailDomain (Just token) config + >>= assertStatus 200 + pure $ OnPremFlow (OnPremVerify emailDomain config) + runStep domain (OnPremFlow (OnPremVerify emailDomain config)) = do + verifyOnPremConfig domain emailDomain config + pure $ OnPremFlow $ OnPremSuccess emailDomain config + runStep domain success@(OnPremFlow (OnPremSuccess emailDomain config)) = do + verifyOnPremConfig domain emailDomain config + pure success + + verifyOnPremConfig :: (HasCallStack) => String -> String -> Value -> App () + verifyOnPremConfig domain emailDomain config = + bindResponse (Brig.getDomainRegistrationFromEmail domain Versioned ("ruffy@" ++ emailDomain)) \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "domain_redirect" `shouldMatch` (config %. "domain_redirect") + let backendUrl v = runMaybeT $ lookupFieldM v "backend" >>= flip lookupFieldM "config_url" + webappUrl v = runMaybeT $ lookupFieldM v "backend" >>= flip lookupFieldM "webapp_url" + backendUrl resp.json `shouldMatch` backendUrl config + webappUrl resp.json `shouldMatch` webappUrl config + + verifyTeamConfig :: (HasCallStack) => String -> String -> String -> App () + verifyTeamConfig domain tid emailDomain = do + bindResponse (BrigInternal.getDomainRegistration domain emailDomain) $ \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "domain" `shouldMatch` emailDomain + resp.json %. "domain_redirect" `shouldMatch` "none" + resp.json %. "team_invite" `shouldMatch` "team" + resp.json %. "team" `shouldMatch` tid + + bindResponse (Brig.getDomainRegistrationFromEmail domain Versioned ("ruffy@" ++ emailDomain)) \resp -> do + resp.status `shouldMatchInt` 200 + resp.json %. "domain_redirect" `shouldMatch` "none" + + runAll :: (HasCallStack) => String -> DomainRegistrationTestCase -> App () + runAll domain success@(OnPremFlow (OnPremSuccess _ _)) = void $ runStep domain success + runAll domain success@(TeamFlow (TeamSuccess _ _)) = void $ runStep domain success + runAll domain inProgress = runAll domain =<< runStep domain inProgress + + mkDomainRedirectBackend :: String -> String -> Value + mkDomainRedirectBackend configUrl webappUrl = + object + [ "domain_redirect" .= "backend", + "backend" .= object ["config_url" .= configUrl, "webapp_url" .= webappUrl] + ] + + conf :: String -> Bool -> ServiceOverrides + conf db runMigration = + def + { brigCfg = setField "postgresMigration.domainRegistration" db, + backgroundWorkerCfg = + setField "postgresMigration.domainRegistration" db + >=> setField "migrateDomainRegistration" runMigration + } + + counterName :: String + counterName = "^wire_domain_registration_migration_finished" diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index a28f3505365..5dddcb56c4f 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -242,7 +242,8 @@ startDynamicBackend resource beOverrides = do gundeckCfg = setField "cassandra.keyspace" resource.berGundeckKeyspace, backgroundWorkerCfg = setField "cassandra.keyspace" resource.berGundeckKeyspace - >=> setField "cassandraGalley.keyspace" resource.berGalleyKeyspace, + >=> setField "cassandraGalley.keyspace" resource.berGalleyKeyspace + >=> setField "cassandraBrig.keyspace" resource.berBrigKeyspace, cannonCfg = setField "cassandra.keyspace" resource.berGundeckKeyspace } diff --git a/libs/wire-api/src/Wire/API/PostgresMarshall.hs b/libs/wire-api/src/Wire/API/PostgresMarshall.hs index 666b5b78c40..46a806c7fc7 100644 --- a/libs/wire-api/src/Wire/API/PostgresMarshall.hs +++ b/libs/wire-api/src/Wire/API/PostgresMarshall.hs @@ -36,12 +36,15 @@ import Data.Misc import Data.Profunctor import Data.Set qualified as Set import Data.Text qualified as Text +import Data.Text.Ascii qualified as Ascii import Data.Text.Encoding qualified as Text import Data.UUID import Data.Vector (Vector) import Data.Vector qualified as V import Hasql.Statement import Imports +import SAML2.WebSSO qualified as SAML +import Wire.API.EnterpriseLogin class PostgresMarshall db domain where postgresMarshall :: domain -> db @@ -538,6 +541,33 @@ instance PostgresMarshall Text Code.Key where instance PostgresMarshall Text Code.Value where postgresMarshall = Text.decodeUtf8 . toByteString' +instance PostgresMarshall ByteString HttpsUrl where + postgresMarshall = toByteString' + +instance PostgresMarshall ByteString Token where + postgresMarshall = (.unToken) + +instance PostgresMarshall Text DnsVerificationToken where + postgresMarshall = Ascii.toText . (.unDnsVerificationToken) + +instance PostgresMarshall Int32 DomainRedirectTag where + postgresMarshall = \case + NoneTag -> 1 + LockedTag -> 2 + SSOTag -> 3 + BackendTag -> 4 + NoRegistrationTag -> 5 + PreAuthorizedTag -> 6 + +instance PostgresMarshall Int32 TeamInviteTag where + postgresMarshall = \case + AllowedTag -> 1 + NotAllowedTag -> 2 + TeamTag -> 3 + +instance PostgresMarshall UUID SAML.IdPId where + postgresMarshall = SAML.fromIdPId + --- class PostgresUnmarshall db domain where @@ -869,6 +899,35 @@ instance PostgresUnmarshall Text Code.Key where instance PostgresUnmarshall Text Code.Value where postgresUnmarshall = mapLeft Text.pack . BSC.runParser BSC.parser . Text.encodeUtf8 +instance PostgresUnmarshall ByteString HttpsUrl where + postgresUnmarshall = first Text.pack . BSC.runParser BSC.parser + +instance PostgresUnmarshall ByteString Token where + postgresUnmarshall = Right . Token + +instance PostgresUnmarshall Text DnsVerificationToken where + postgresUnmarshall = first Text.pack . fmap DnsVerificationToken . Ascii.validate + +instance PostgresUnmarshall Int32 DomainRedirectTag where + postgresUnmarshall = \case + 1 -> Right NoneTag + 2 -> Right LockedTag + 3 -> Right SSOTag + 4 -> Right BackendTag + 5 -> Right NoRegistrationTag + 6 -> Right PreAuthorizedTag + n -> Left $ "Unexpected DomainRedirectTag value: " <> Text.pack (show n) + +instance PostgresUnmarshall Int32 TeamInviteTag where + postgresUnmarshall = \case + 1 -> Right AllowedTag + 2 -> Right NotAllowedTag + 3 -> Right TeamTag + n -> Left $ "Unexpected TeamInviteTag value: " <> Text.pack (show n) + +instance PostgresUnmarshall UUID SAML.IdPId where + postgresUnmarshall = Right . SAML.IdPId + --- lmapPG :: (PostgresMarshall db domain, Profunctor p) => p db x -> p domain x diff --git a/libs/wire-subsystems/postgres-migrations/20260420134603-domain_registration.sql b/libs/wire-subsystems/postgres-migrations/20260420134603-domain_registration.sql new file mode 100644 index 00000000000..9fd2e05998f --- /dev/null +++ b/libs/wire-subsystems/postgres-migrations/20260420134603-domain_registration.sql @@ -0,0 +1,27 @@ +CREATE TABLE domain_registration ( + domain text PRIMARY KEY, + authorized_team uuid, + domain_redirect integer, + team_invite integer, + idp_id uuid, + backend_url bytea, + team uuid, + dns_verification_token text, + ownership_token_hash bytea, + webapp_url bytea +); + +CREATE INDEX domain_registration_authorized_team_idx + ON domain_registration (authorized_team); + +CREATE TABLE domain_registration_challenge ( + id uuid PRIMARY KEY, + domain text NOT NULL, + challenge_token_hash bytea NOT NULL, + dns_verification_token text NOT NULL, + expires_at timestamptz NOT NULL +); + +-- index for deletes like `DELETE ... WHERE expires_at <= now()` +CREATE INDEX domain_registration_challenge_expires_at_idx + ON domain_registration_challenge (expires_at); diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore.hs index 489b4bc1ca3..ec6697dded8 100644 --- a/libs/wire-subsystems/src/Wire/DomainRegistrationStore.hs +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TemplateHaskell #-} -- This file is part of the Wire Server implementation. @@ -25,6 +26,11 @@ module Wire.DomainRegistrationStore lookup, lookupByTeam, delete, + DomainRegistrationRow, + upsertInternal, + lookupInternal, + lookupByTeamInternal, + deleteInternal, ) where @@ -33,9 +39,11 @@ import Data.ByteString.Conversion import Data.CaseInsensitive import Data.CaseInsensitive qualified as CI import Data.Domain as Domain +import Data.Hashable (hash) import Data.Id import Data.Misc import Data.Text as T +import Data.UUID (UUID) import Database.CQL.Protocol (Record (..), TupleType, recordInstance) import Imports hiding (lookup) import Polysemy @@ -45,6 +53,8 @@ import Polysemy.TinyLog qualified as Log import SAML2.WebSSO qualified as SAML import System.Logger.Message qualified as Log import Wire.API.EnterpriseLogin +import Wire.API.PostgresMarshall +import Wire.MigrationLock newtype DomainKey = DomainKey {unDomainKey :: CI Text} deriving stock (Eq, Ord, Show) @@ -61,6 +71,68 @@ instance Cql DomainKey where fromCql (CqlText txt) = pure . DomainKey . CI.mk $ txt fromCql _ = Left "DomainKey: Text expected" +instance PostgresMarshall Text DomainKey where + postgresMarshall = CI.foldedCase . unDomainKey + +instance PostgresUnmarshall Text DomainKey where + postgresUnmarshall = Right . DomainKey . CI.mk + +instance MigrationLockable DomainKey where + lockKey = fromIntegral . hash . CI.foldedCase . unDomainKey + lockScope = "domain_registration" + +type DomainRegistrationRow = + ( Text, + Maybe Int32, + Maybe Int32, + Maybe UUID, + Maybe ByteString, + Maybe UUID, + Maybe Text, + Maybe ByteString, + Maybe UUID, + Maybe ByteString + ) + +instance PostgresMarshall DomainRegistrationRow StoredDomainRegistration where + postgresMarshall StoredDomainRegistration {..} = + ( postgresMarshall domain, + postgresMarshall domainRedirect, + postgresMarshall teamInvite, + postgresMarshall idpId, + postgresMarshall backendUrl, + postgresMarshall team, + postgresMarshall dnsVerificationToken, + postgresMarshall authTokenHash, + postgresMarshall authorizedTeam, + postgresMarshall webappUrl + ) + +instance PostgresUnmarshall DomainRegistrationRow StoredDomainRegistration where + postgresUnmarshall + ( domain, + domainRedirect, + teamInvite, + idpId, + backendUrl, + team, + dnsVerificationToken, + authTokenHash, + authorizedTeam, + webappUrl + ) = + StoredDomainRegistration + <$> postgresUnmarshall domain + <*> postgresUnmarshall domainRedirect + <*> postgresUnmarshall teamInvite + <*> postgresUnmarshall idpId + <*> postgresUnmarshall backendUrl + <*> postgresUnmarshall team + <*> postgresUnmarshall dnsVerificationToken + <*> postgresUnmarshall authTokenHash + <*> postgresUnmarshall authorizedTeam + <*> postgresUnmarshall webappUrl + data StoredDomainRegistration = StoredDomainRegistration { domain :: DomainKey, domainRedirect :: Maybe DomainRedirectTag, @@ -83,6 +155,8 @@ data DomainRegistrationStore m a where LookupByTeamInternal :: TeamId -> DomainRegistrationStore m [StoredDomainRegistration] DeleteInternal :: DomainKey -> DomainRegistrationStore m () +makeSem ''DomainRegistrationStore + upsert :: (Member DomainRegistrationStore r) => DomainRegistration -> Sem r () upsert = send . UpsertInternal . toStored diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs new file mode 100644 index 00000000000..435a4c418f8 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs @@ -0,0 +1,59 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.DomainRegistrationStore.DualWrite + ( interpretDomainRegistrationStoreToCassandraAndPostgres, + ) +where + +import Cassandra (ClientState) +import Imports +import Polysemy +import Polysemy.Async +import Polysemy.Conc.Effect.Race +import Polysemy.Error +import Polysemy.Time +import Polysemy.TinyLog +import Wire.DomainRegistrationStore +import Wire.DomainRegistrationStore qualified as DomainRegistrationStore +import Wire.DomainRegistrationStore.Cassandra qualified as Cassandra +import Wire.DomainRegistrationStore.Postgres qualified as Postgres +import Wire.MigrationLock +import Wire.Postgres + +interpretDomainRegistrationStoreToCassandraAndPostgres :: + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Error MigrationLockError) r + ) => + ClientState -> + InterpreterFor DomainRegistrationStore r +interpretDomainRegistrationStoreToCassandraAndPostgres cs = interpret $ \case + UpsertInternal dr -> + withMigrationLocks LockShared (MilliSeconds 500) [dr.domain] $ do + Cassandra.interpretDomainRegistrationStoreToCassandra cs $ DomainRegistrationStore.upsertInternal dr + Postgres.interpretDomainRegistrationStoreToPostgres $ DomainRegistrationStore.upsertInternal dr + LookupInternal domain -> + Cassandra.interpretDomainRegistrationStoreToCassandra cs $ DomainRegistrationStore.lookupInternal domain + LookupByTeamInternal tid -> + Cassandra.interpretDomainRegistrationStoreToCassandra cs $ DomainRegistrationStore.lookupByTeamInternal tid + DeleteInternal domain -> + withMigrationLocks LockShared (MilliSeconds 500) [domain] $ do + Cassandra.interpretDomainRegistrationStoreToCassandra cs $ DomainRegistrationStore.deleteInternal domain + Postgres.interpretDomainRegistrationStoreToPostgres $ DomainRegistrationStore.deleteInternal domain diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs new file mode 100644 index 00000000000..8bebc7aa2e0 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs @@ -0,0 +1,189 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.DomainRegistrationStore.Migration + ( migrateDomainRegistrationsLoop, + ) +where + +import Cassandra +import Data.ByteString.Conversion +import Data.Conduit +import Data.Conduit.List qualified as C +import Data.Domain +import Data.Id +import Database.CQL.Protocol (Record (asRecord), TupleType) +import Hasql.Pool qualified as Hasql +import Imports hiding (lookup) +import Polysemy +import Polysemy.Async +import Polysemy.Conc (interpretRace) +import Polysemy.Conc.Effect.Race hiding (Timeout) +import Polysemy.Error +import Polysemy.Input +import Polysemy.State +import Polysemy.Time +import Polysemy.TinyLog +import Prometheus qualified +import System.Logger qualified as Log +import Util.Timeout +import Wire.API.EnterpriseLogin +import Wire.DomainRegistrationStore +import Wire.DomainRegistrationStore.Cassandra qualified as DomainRegistrationCassandra +import Wire.DomainRegistrationStore.Postgres qualified as DomainRegistrationPostgres +import Wire.DomainVerificationChallengeStore +import Wire.DomainVerificationChallengeStore.Postgres qualified as ChallengePostgres +import Wire.Migration +import Wire.MigrationLock +import Wire.Postgres +import Wire.Sem.Logger (mapLogger) +import Wire.Sem.Logger.TinyLog (loggerToTinyLog) + +type EffectStack = + [ State Int, + Input ClientState, + Input Hasql.Pool, + Async, + Race, + TinyLog, + Embed IO, + Final IO + ] + +migrateDomainRegistrationsLoop :: + MigrationOptions -> + ClientState -> + Hasql.Pool -> + Log.Logger -> + Prometheus.Counter -> + Prometheus.Counter -> + Prometheus.Counter -> + IO () +migrateDomainRegistrationsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = + migrationLoop + logger + "domain registrations" + migFinished + migFailed + (interpreter cassClient pgPool logger "domain registrations") + (migrateAllDomainRegistrations migOpts migCounter) + +interpreter :: ClientState -> Hasql.Pool -> Log.Logger -> ByteString -> Sem EffectStack a -> IO (Int, a) +interpreter cassClient pgPool logger name = + runFinal + . embedToFinal + . loggerToTinyLog logger + . mapLogger (Log.field "migration" (Log.val name) .) + . raiseUnder + . interpretRace + . asyncToIOFinal + . runInputConst pgPool + . runInputConst cassClient + . runState 0 + +migrateAllDomainRegistrations :: + ( Member (Input Hasql.Pool) r, + Member (Embed IO) r, + Member (Input ClientState) r, + Member TinyLog r, + Member (State Int) r, + Member Async r, + Member Race r + ) => + MigrationOptions -> + Prometheus.Counter -> + ConduitM () Void (Sem r) () +migrateAllDomainRegistrations migOpts migCounter = do + lift $ info $ Log.msg (Log.val "migrateAllDomainRegistrationChallenges") + withCount (paginateSem selectAllChallenges (paramsP LocalQuorum () migOpts.pageSize) x5) + .| logRetrievedPage migOpts.pageSize id + .| C.mapM_ (traverse_ (\row@(cid, _, _, _, _) -> handleErrors (toByteString' cid) (migrateDomainVerificationChallengeRow migCounter row))) + + lift $ info $ Log.msg (Log.val "migrateAllDomainRegistrations") + withCount (paginateSem selectAllRegistrations (paramsP LocalQuorum () migOpts.pageSize) x5) + .| logRetrievedPage migOpts.pageSize asRecord + .| C.mapM_ (traverse_ (\row -> handleRegistrationErrors (toByteString' (show row.domain)) (migrateDomainRegistrationRow migCounter row))) + +migrateDomainRegistrationRow :: + ( PGConstraints r, + Member (Input ClientState) r, + Member TinyLog r, + Member Async r, + Member (Error MigrationLockError) r, + Member Race r + ) => + Prometheus.Counter -> + StoredDomainRegistration -> + Sem r () +migrateDomainRegistrationRow migCounter row = do + void . withMigrationLocks LockExclusive (Seconds 10) [row.domain] $ do + isMigrated <- DomainRegistrationPostgres.exists row.domain + unless isMigrated $ do + cassClient <- input @ClientState + mCurrentRow <- + DomainRegistrationCassandra.interpretDomainRegistrationStoreToCassandra cassClient $ + lookupInternal row.domain + for_ mCurrentRow $ \currentRow -> do + DomainRegistrationPostgres.interpretDomainRegistrationStoreToPostgres $ upsertInternal currentRow + liftIO $ Prometheus.incCounter migCounter + +migrateDomainVerificationChallengeRow :: + (PGConstraints r) => + Prometheus.Counter -> + (ChallengeId, Domain, Token, DnsVerificationToken, Int32) -> + Sem r () +migrateDomainVerificationChallengeRow migCounter (cid, domain, challengeTokenHash, dnsVerificationToken, ttlSecs) = + when (ttlSecs > 0) $ do + let ttl = Timeout (fromIntegral ttlSecs) + row = + StoredDomainVerificationChallenge + { challengeId = cid, + domain = domain, + challengeTokenHash = challengeTokenHash, + dnsVerificationToken = dnsVerificationToken + } + ChallengePostgres.interpretDomainVerificationChallengeStoreToPostgres ttl $ insert row + liftIO $ Prometheus.incCounter migCounter + +selectAllRegistrations :: PrepQuery R () (TupleType StoredDomainRegistration) +selectAllRegistrations = + "SELECT domain, domain_redirect, team_invite, idp_id, backend_url, team, dns_verification_token, ownership_token_hash, authorized_team, webapp_url FROM domain_registration" + +selectAllChallenges :: PrepQuery R () (ChallengeId, Domain, Token, DnsVerificationToken, Int32) +selectAllChallenges = + "SELECT id, domain, challenge_token_hash, dns_verification_token, ttl(challenge_token_hash) FROM domain_registration_challenge" + +handleRegistrationErrors :: + ( Member (State Int) r, + Member TinyLog r + ) => + ByteString -> + (Sem (Error MigrationLockError : Error Hasql.UsageError : r) ()) -> + Sem r () +handleRegistrationErrors key action = do + eithErr <- runError (runError action) + case eithErr of + Right (Right _) -> pure () + Right (Left e) -> logError (show e) + Left e -> logError (show e) + where + logError e = do + warn $ + Log.msg (Log.val "error occurred during migration") + . Log.field "key" (show key) + . Log.field "error" e + modify (+ 1) diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Postgres.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Postgres.hs new file mode 100644 index 00000000000..179ac74b41a --- /dev/null +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Postgres.hs @@ -0,0 +1,125 @@ +{-# LANGUAGE RecordWildCards #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.DomainRegistrationStore.Postgres + ( interpretDomainRegistrationStoreToPostgres, + exists, + ) +where + +import Data.Id (TeamId) +import Data.UUID (UUID) +import Data.Vector (Vector) +import Data.Vector qualified as Vector +import Hasql.Statement qualified as Hasql +import Hasql.TH +import Imports hiding (lookup) +import Polysemy +import Wire.API.PostgresMarshall +import Wire.DomainRegistrationStore +import Wire.Postgres + +interpretDomainRegistrationStoreToPostgres :: + (PGConstraints r) => + InterpreterFor DomainRegistrationStore r +interpretDomainRegistrationStoreToPostgres = interpret $ \case + UpsertInternal dr -> upsertImpl dr + LookupInternal domain -> lookupImpl domain + LookupByTeamInternal tid -> lookupByTeamInternalImpl tid + DeleteInternal domain -> deleteImpl domain + +upsertImpl :: (PGConstraints r) => StoredDomainRegistration -> Sem r () +upsertImpl dr = + runStatement dr upsertStatement + where + upsertStatement :: Hasql.Statement StoredDomainRegistration () + upsertStatement = + lmapPG + [resultlessStatement|INSERT INTO domain_registration + (domain, domain_redirect, team_invite, idp_id, backend_url, + team, dns_verification_token, ownership_token_hash, authorized_team, webapp_url) + VALUES + ($1 :: text, $2 :: int?, $3 :: int?, $4 :: uuid?, $5 :: bytea?, + $6 :: uuid?, $7 :: text?, $8 :: bytea?, $9 :: uuid?, $10 :: bytea?) + ON CONFLICT (domain) DO UPDATE + SET domain_redirect = ($2 :: int?), + team_invite = ($3 :: int?), + idp_id = ($4 :: uuid?), + backend_url = ($5 :: bytea?), + team = ($6 :: uuid?), + dns_verification_token = ($7 :: text?), + ownership_token_hash = ($8 :: bytea?), + authorized_team = ($9 :: uuid?), + webapp_url = ($10 :: bytea?) + |] + +lookupImpl :: (PGConstraints r) => DomainKey -> Sem r (Maybe StoredDomainRegistration) +lookupImpl domain = + runStatement domain selectStatement + where + selectStatement :: Hasql.Statement DomainKey (Maybe StoredDomainRegistration) + selectStatement = + dimapPG @Text @DomainKey @(Maybe DomainRegistrationRow) @(Maybe StoredDomainRegistration) $ + [maybeStatement|SELECT (domain :: text), (domain_redirect :: int?), (team_invite :: int?), + (idp_id :: uuid?), (backend_url :: bytea?), (team :: uuid?), + (dns_verification_token :: text?), (ownership_token_hash :: bytea?), + (authorized_team :: uuid?), (webapp_url :: bytea?) + FROM domain_registration + WHERE domain = ($1 :: text) + |] + +lookupByTeamInternalImpl :: (PGConstraints r) => TeamId -> Sem r [StoredDomainRegistration] +lookupByTeamInternalImpl tid = do + rows <- runStatement tid selectByTeamStatement + pure $ Vector.toList rows + where + selectByTeamStatement :: Hasql.Statement TeamId (Vector StoredDomainRegistration) + selectByTeamStatement = + dimapPG @UUID @TeamId @(Vector DomainRegistrationRow) @(Vector StoredDomainRegistration) $ + [vectorStatement|SELECT (domain :: text), (domain_redirect :: int?), (team_invite :: int?), + (idp_id :: uuid?), (backend_url :: bytea?), (team :: uuid?), + (dns_verification_token :: text?), (ownership_token_hash :: bytea?), + (authorized_team :: uuid?), (webapp_url :: bytea?) + FROM domain_registration + WHERE authorized_team = ($1 :: uuid) + |] + +deleteImpl :: (PGConstraints r) => DomainKey -> Sem r () +deleteImpl domain = + runStatement domain deleteStatement + where + deleteStatement :: Hasql.Statement DomainKey () + deleteStatement = + lmapPG + [resultlessStatement|DELETE FROM domain_registration + WHERE domain = ($1 :: text) + |] + +exists :: (PGConstraints r) => DomainKey -> Sem r Bool +exists domain = + runStatement domain existsStatement + where + existsStatement :: Hasql.Statement DomainKey Bool + existsStatement = + lmapPG @Text @DomainKey + [singletonStatement|SELECT EXISTS ( + SELECT 1 + FROM domain_registration + WHERE domain = ($1 :: text) + ) :: bool|] diff --git a/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Cassandra.hs index 44ed929a560..bf32c931417 100644 --- a/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Cassandra.hs @@ -27,34 +27,31 @@ import Data.Id import Database.CQL.Protocol (Record (..), TupleType, asTuple) import Imports hiding (lookup) import Polysemy -import Polysemy.Embed import Polysemy.Input import Util.Timeout import Wire.DomainVerificationChallengeStore +import Wire.Util (embedClientInput) interpretDomainVerificationChallengeStoreToCassandra :: forall r. - (Member (Embed IO) r) => - ClientState -> + ( Member (Embed IO) r, + Member (Input ClientState) r + ) => Timeout -> InterpreterFor DomainVerificationChallengeStore r -interpretDomainVerificationChallengeStoreToCassandra casClient ttl = - runInputConst ttl - . runEmbedded (runClient casClient) - . interpret - ( \case - Insert challenge -> insertImpl challenge - Lookup challengeId -> lookupImpl challengeId - Delete challengeId -> deleteImpl challengeId - ) - . raiseUnder2 +interpretDomainVerificationChallengeStoreToCassandra ttl = + interpret + ( \case + Insert challenge -> embedClientInput $ insertImpl ttl challenge + Lookup challengeId -> embedClientInput $ lookupImpl challengeId + Delete challengeId -> embedClientInput $ deleteImpl challengeId + ) insertImpl :: - (Member (Embed Client) r, Member (Input Timeout) r) => + Timeout -> StoredDomainVerificationChallenge -> - Sem r () -insertImpl challenge = do - ttl <- input + Client () +insertImpl ttl challenge = do let q :: PrepQuery W (TupleType StoredDomainVerificationChallenge) () q = fromString $ @@ -62,22 +59,20 @@ insertImpl challenge = do \ (id, domain, challenge_token_hash, dns_verification_token)\ \ VALUES (?,?,?,?) using ttl " <> show (round (nominalDiffTimeToSeconds (timeoutDiff ttl)) :: Integer) - embed $ retry x5 $ write q (params LocalQuorum (asTuple challenge)) + retry x5 $ write q (params LocalQuorum (asTuple challenge)) lookupImpl :: - (Member (Embed Client) r) => ChallengeId -> - Sem r (Maybe StoredDomainVerificationChallenge) + Client (Maybe StoredDomainVerificationChallenge) lookupImpl challengeId = - embed $ - fmap asRecord - <$> retry x1 (query1 cqlSelect (params LocalQuorum (Identity challengeId))) + fmap asRecord + <$> retry x1 (query1 cqlSelect (params LocalQuorum (Identity challengeId))) cqlSelect :: PrepQuery R (Identity ChallengeId) (TupleType StoredDomainVerificationChallenge) cqlSelect = "SELECT id, domain, challenge_token_hash, dns_verification_token FROM domain_registration_challenge WHERE id = ?" -deleteImpl :: (Member (Embed Client) r) => ChallengeId -> Sem r () -deleteImpl challengeId = embed $ retry x5 $ write cqlDelete (params LocalQuorum (Identity challengeId)) +deleteImpl :: ChallengeId -> Client () +deleteImpl challengeId = retry x5 $ write cqlDelete (params LocalQuorum (Identity challengeId)) cqlDelete :: PrepQuery W (Identity ChallengeId) () cqlDelete = "DELETE FROM domain_registration_challenge WHERE id = ?" diff --git a/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/DualWrite.hs new file mode 100644 index 00000000000..333ab4d9601 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/DualWrite.hs @@ -0,0 +1,49 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.DomainVerificationChallengeStore.DualWrite + ( interpretDomainVerificationChallengeStoreToCassandraAndPostgres, + ) +where + +import Cassandra +import Imports +import Polysemy +import Polysemy.Input +import Util.Timeout +import Wire.DomainVerificationChallengeStore +import Wire.DomainVerificationChallengeStore qualified as DomainVerificationChallengeStore +import Wire.DomainVerificationChallengeStore.Cassandra qualified as Cassandra +import Wire.DomainVerificationChallengeStore.Postgres qualified as Postgres +import Wire.Postgres + +-- | Cassandra is the source of truth during migration; writes are mirrored to Postgres. +interpretDomainVerificationChallengeStoreToCassandraAndPostgres :: + ( Member (Input ClientState) r, + PGConstraints r + ) => + Timeout -> + InterpreterFor DomainVerificationChallengeStore r +interpretDomainVerificationChallengeStoreToCassandraAndPostgres to = interpret $ \case + Insert challenge -> do + Cassandra.interpretDomainVerificationChallengeStoreToCassandra to $ DomainVerificationChallengeStore.insert challenge + Postgres.interpretDomainVerificationChallengeStoreToPostgres to $ DomainVerificationChallengeStore.insert challenge + Lookup challengeId -> + Cassandra.interpretDomainVerificationChallengeStoreToCassandra to $ DomainVerificationChallengeStore.lookup challengeId + Delete challengeId -> do + Cassandra.interpretDomainVerificationChallengeStoreToCassandra to $ DomainVerificationChallengeStore.delete challengeId + Postgres.interpretDomainVerificationChallengeStoreToPostgres to $ DomainVerificationChallengeStore.delete challengeId diff --git a/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Postgres.hs b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Postgres.hs new file mode 100644 index 00000000000..c06d1f9c227 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/DomainVerificationChallengeStore/Postgres.hs @@ -0,0 +1,100 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.DomainVerificationChallengeStore.Postgres + ( interpretDomainVerificationChallengeStoreToPostgres, + ) +where + +import Data.Domain +import Data.Id +import Hasql.Statement qualified as Hasql +import Hasql.TH +import Imports hiding (lookup) +import Polysemy +import Util.Timeout +import Wire.API.EnterpriseLogin +import Wire.API.PostgresMarshall +import Wire.DomainVerificationChallengeStore +import Wire.Postgres + +interpretDomainVerificationChallengeStoreToPostgres :: + forall r. + (PGConstraints r) => + Timeout -> + InterpreterFor DomainVerificationChallengeStore r +interpretDomainVerificationChallengeStoreToPostgres ttl = + interpret $ + \case + Insert challenge -> insertImpl ttl challenge + Lookup challengeId -> lookupImpl challengeId + Delete challengeId -> deleteImpl challengeId + +deleteImpl :: (PGConstraints r) => ChallengeId -> Sem r () +deleteImpl cid = + runStatement cid deleteStmt + where + deleteStmt :: Hasql.Statement ChallengeId () + deleteStmt = + lmapPG + [resultlessStatement|DELETE FROM domain_registration_challenge + WHERE id = ($1 :: uuid) + |] + +lookupImpl :: (PGConstraints r) => ChallengeId -> Sem r (Maybe StoredDomainVerificationChallenge) +lookupImpl cid = do + mRow <- runStatement cid select + pure $ mk <$> mRow + where + mk :: (Token, DnsVerificationToken, Domain) -> StoredDomainVerificationChallenge + mk (hash, token, domain) = + StoredDomainVerificationChallenge + { challengeId = cid, + domain = domain, + challengeTokenHash = hash, + dnsVerificationToken = token + } + + select :: Hasql.Statement ChallengeId (Maybe (Token, DnsVerificationToken, Domain)) + select = + dimapPG + [maybeStatement|SELECT + (challenge_token_hash :: bytea), + (dns_verification_token :: text), + (domain :: text) + FROM domain_registration_challenge + WHERE id = ($1 :: uuid) AND expires_at > now () + |] + +insertImpl :: (PGConstraints r) => Timeout -> StoredDomainVerificationChallenge -> Sem r () +insertImpl ttl ch = + runStatement (ch.challengeId, ch.domain, ch.challengeTokenHash, ch.dnsVerificationToken, ttlSecs) insertStmt + where + ttlSecs = round (nominalDiffTimeToSeconds (timeoutDiff ttl)) :: Int32 + insertStmt :: Hasql.Statement (ChallengeId, Domain, Token, DnsVerificationToken, Int32) () + insertStmt = + lmapPG + [resultlessStatement|INSERT INTO domain_registration_challenge + (id, domain, challenge_token_hash, dns_verification_token, expires_at) + VALUES + ($1 :: uuid, $2 :: text, $3 :: bytea, $4 :: text, now() + make_interval(secs => $5 :: int)) + ON CONFLICT (id) DO UPDATE + SET domain = ($2 :: text), + challenge_token_hash = ($3 :: bytea), + dns_verification_token = ($4 :: text), + expires_at = now() + make_interval(secs => $5 :: int) + |] diff --git a/libs/wire-subsystems/src/Wire/MigrationLock.hs b/libs/wire-subsystems/src/Wire/MigrationLock.hs index 140d7342bba..a1e18b5099d 100644 --- a/libs/wire-subsystems/src/Wire/MigrationLock.hs +++ b/libs/wire-subsystems/src/Wire/MigrationLock.hs @@ -30,7 +30,7 @@ import Hasql.Statement qualified as Hasql import Hasql.TH import Imports import Network.HTTP.Types.Status (status500) -import Network.Wai.Utilities.Error qualified as WaiError +import Network.Wai.Utilities.Error qualified as Wai import Network.Wai.Utilities.JSONResponse import Polysemy import Polysemy.Async @@ -43,6 +43,7 @@ import Polysemy.TinyLog qualified as TinyLog import System.Logger.Message qualified as Log import Wire.API.Error import Wire.API.PostgresMarshall +import Wire.Error import Wire.Postgres class MigrationLockable a where @@ -62,7 +63,13 @@ data MigrationLockError = TimedOutAcquiringLock deriving (Show) instance APIError MigrationLockError where - toResponse _ = waiErrorToJSONResponse $ WaiError.mkError status500 "internal-server-error" "Internal Server Error" + toResponse = waiErrorToJSONResponse . migrationLockErrorToWai + +migrationLockErrorToHttpError :: MigrationLockError -> HttpError +migrationLockErrorToHttpError = StdError . migrationLockErrorToWai + +migrationLockErrorToWai :: MigrationLockError -> Wai.Error +migrationLockErrorToWai _ = Wai.mkError status500 "internal-server-error" "Internal Server Error" withMigrationLocks :: forall x a u r. diff --git a/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs index df635d14530..f02ade14b9b 100644 --- a/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs +++ b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs @@ -43,7 +43,8 @@ instance FromJSON StorageLocation where data PostgresMigrationOpts = PostgresMigrationOpts { conversation :: StorageLocation, conversationCodes :: StorageLocation, - teamFeatures :: StorageLocation + teamFeatures :: StorageLocation, + domainRegistration :: StorageLocation } deriving (Show) @@ -53,3 +54,4 @@ instance FromJSON PostgresMigrationOpts where <$> o .: "conversation" <*> o .: "conversationCodes" <*> o .: "teamFeatures" + <*> o .: "domainRegistration" diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index dd2e897a6c3..39886cb2de3 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -305,8 +305,13 @@ library Wire.DeleteQueue.InMemory Wire.DomainRegistrationStore Wire.DomainRegistrationStore.Cassandra + Wire.DomainRegistrationStore.DualWrite + Wire.DomainRegistrationStore.Migration + Wire.DomainRegistrationStore.Postgres Wire.DomainVerificationChallengeStore Wire.DomainVerificationChallengeStore.Cassandra + Wire.DomainVerificationChallengeStore.DualWrite + Wire.DomainVerificationChallengeStore.Postgres Wire.EmailSending Wire.EmailSending.SES Wire.EmailSending.SMTP diff --git a/postgres-schema.sql b/postgres-schema.sql index 070828aa351..36b3260dfcd 100644 --- a/postgres-schema.sql +++ b/postgres-schema.sql @@ -9,8 +9,8 @@ \restrict 79bbfb4630959c48307653a5cd3d83f2582b3c2210f75f10d79e3ebf0015620 --- Dumped from database version 17.7 --- Dumped by pg_dump version 17.7 +-- Dumped from database version 17.9 +-- Dumped by pg_dump version 17.9 SET statement_timeout = 0; SET lock_timeout = 0; @@ -178,6 +178,41 @@ CREATE TABLE public.conversation_out_of_sync ( ALTER TABLE public.conversation_out_of_sync OWNER TO "wire-server"; +-- +-- Name: domain_registration; Type: TABLE; Schema: public; Owner: wire-server +-- + +CREATE TABLE public.domain_registration ( + domain text NOT NULL, + authorized_team uuid, + domain_redirect integer, + team_invite integer, + idp_id uuid, + backend_url bytea, + team uuid, + dns_verification_token text, + ownership_token_hash bytea, + webapp_url bytea +); + + +ALTER TABLE public.domain_registration OWNER TO "wire-server"; + +-- +-- Name: domain_registration_challenge; Type: TABLE; Schema: public; Owner: wire-server +-- + +CREATE TABLE public.domain_registration_challenge ( + id uuid NOT NULL, + domain text NOT NULL, + challenge_token_hash bytea NOT NULL, + dns_verification_token text NOT NULL, + expires_at timestamp with time zone NOT NULL +); + + +ALTER TABLE public.domain_registration_challenge OWNER TO "wire-server"; + -- -- Name: local_conversation_remote_member; Type: TABLE; Schema: public; Owner: wire-server -- @@ -393,6 +428,22 @@ ALTER TABLE ONLY public.conversation ADD CONSTRAINT conversation_pkey PRIMARY KEY (id); +-- +-- Name: domain_registration_challenge domain_registration_challenge_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server +-- + +ALTER TABLE ONLY public.domain_registration_challenge + ADD CONSTRAINT domain_registration_challenge_pkey PRIMARY KEY (id); + + +-- +-- Name: domain_registration domain_registration_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server +-- + +ALTER TABLE ONLY public.domain_registration + ADD CONSTRAINT domain_registration_pkey PRIMARY KEY (domain); + + -- -- Name: local_conversation_remote_member local_conversation_remote_member_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server -- @@ -522,6 +573,20 @@ CREATE INDEX conversation_team_group_type_lower_name_id_idx ON public.conversati CREATE INDEX conversation_team_idx ON public.conversation USING btree (team); +-- +-- Name: domain_registration_authorized_team_idx; Type: INDEX; Schema: public; Owner: wire-server +-- + +CREATE INDEX domain_registration_authorized_team_idx ON public.domain_registration USING btree (authorized_team); + + +-- +-- Name: domain_registration_challenge_expires_at_idx; Type: INDEX; Schema: public; Owner: wire-server +-- + +CREATE INDEX domain_registration_challenge_expires_at_idx ON public.domain_registration_challenge USING btree (expires_at); + + -- -- Name: idx_meetings_conversation; Type: INDEX; Schema: public; Owner: wire-server -- diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index eaee5a414c4..fa398766188 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -57,6 +57,7 @@ migrateConversationsOptions: parallelism: 2 migrateConversationCodes: false migrateTeamFeatures: false +migrateDomainRegistration: false # Background jobs consumer configuration for integration backgroundJobs: @@ -68,3 +69,4 @@ postgresMigration: conversation: postgresql conversationCodes: postgresql teamFeatures: postgresql + domainRegistration: postgresql diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index e89ed926f43..315bea5bd3b 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -71,6 +71,13 @@ run opts galleyOpts = do withNamedLogger "migrate-team-features" $ Migrations.teamFeatures (MigrationOptions 1000 1) else pure $ pure () + cleanupDomainRegistrationMigration <- + if opts.migrateDomainRegistration + then + runAppT env $ + withNamedLogger "migrate-domain-registration" $ + Migrations.domainRegistration (MigrationOptions 1000 1) + else pure $ pure () cleanupJobs <- runAppT env $ withNamedLogger "background-job-consumer" $ @@ -78,12 +85,13 @@ run opts galleyOpts = do let cleanup = void $ runConcurrently $ - (,,,,,) + (,,,,,,) <$> Concurrently cleanupDeadUserNotifWatcher <*> Concurrently cleanupBackendNotifPusher <*> Concurrently cleanupConvMigration <*> Concurrently cleanUpConvCodesMigration <*> Concurrently cleanupTeamFeaturesMigration + <*> Concurrently cleanupDomainRegistrationMigration <*> Concurrently cleanupJobs let server = defaultServer (T.unpack opts.backgroundWorker.host) opts.backgroundWorker.port env.logger diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index c616d1e5a4e..2d1078fe8f1 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -50,6 +50,7 @@ data Opts = Opts migrateConversationsOptions :: !MigrationOptions, migrateConversationCodes :: !Bool, migrateTeamFeatures :: !Bool, + migrateDomainRegistration :: !Bool, backgroundJobs :: BackgroundJobsConfig } deriving (Show, Generic) diff --git a/services/background-worker/src/Wire/PostgresMigrations.hs b/services/background-worker/src/Wire/PostgresMigrations.hs index 541716d0aec..ea8212a9d35 100644 --- a/services/background-worker/src/Wire/PostgresMigrations.hs +++ b/services/background-worker/src/Wire/PostgresMigrations.hs @@ -25,6 +25,7 @@ import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Util import Wire.CodeStore.Migration import Wire.ConversationStore.Migration +import Wire.DomainRegistrationStore.Migration import Wire.Migration (MigrationOptions) import Wire.TeamFeatureStore.Migration @@ -84,3 +85,20 @@ teamFeatures migOpts = do pure $ do Log.info logger $ Log.msg (Log.val "cancelling team features migration") cancel migrationLoop + +domainRegistration :: MigrationOptions -> AppT IO CleanupAction +domainRegistration migOpts = do + cassClient <- asks (.cassandraBrig) + pgPool <- asks (.hasqlPool) + logger <- asks (.logger) + Log.info logger $ Log.msg (Log.val "starting domain registration migration") + count <- register $ counter $ Prometheus.Info "wire_domain_registration_migrated_to_pg" "Number of domain registration rows migrated to Postgresql" + finished <- register $ counter $ Prometheus.Info "wire_domain_registration_migration_finished" "Whether the domain registration migration to Postgresql is finished successfully" + failed <- register $ counter $ Prometheus.Info "wire_domain_registration_migration_failed" "Whether the domain registration migration to Postgresql has failed" + + migrationLoop <- async . lift $ migrateDomainRegistrationsLoop migOpts cassClient pgPool logger count finished failed + + Log.info logger $ Log.msg (Log.val "started domain registration migration") + pure $ do + Log.info logger $ Log.msg (Log.val "cancelling domain registration migration") + cancel migrationLoop diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index a3730bdaf27..1b4715d07a1 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -366,7 +366,8 @@ spec = do PostgresMigrationOpts { conversation = CassandraStorage, conversationCodes = CassandraStorage, - teamFeatures = CassandraStorage + teamFeatures = CassandraStorage, + domainRegistration = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined @@ -417,7 +418,8 @@ spec = do PostgresMigrationOpts { conversation = CassandraStorage, conversationCodes = CassandraStorage, - teamFeatures = CassandraStorage + teamFeatures = CassandraStorage, + domainRegistration = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index 6aa6afa8c91..a3e23d4ea56 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -44,7 +44,8 @@ testEnv = do PostgresMigrationOpts { conversation = CassandraStorage, conversationCodes = CassandraStorage, - teamFeatures = CassandraStorage + teamFeatures = CassandraStorage, + domainRegistration = CassandraStorage } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics diff --git a/services/brig/brig.integration.yaml b/services/brig/brig.integration.yaml index fc23b069f74..e59957d04a6 100644 --- a/services/brig/brig.integration.yaml +++ b/services/brig/brig.integration.yaml @@ -171,6 +171,12 @@ turn: configTTL: 3600 tokenTTL: 21600 +postgresMigration: + conversation: postgresql + conversationCodes: postgresql + teamFeatures: postgresql + domainRegistration: postgresql + optSettings: setActivationTimeout: 4 setVerificationTimeout: 4 diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index d110b47f644..1e50c5179ed 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -73,6 +73,7 @@ module Brig.App enableSFTFederationLens, rateLimitEnvLens, amqpJobsPublisherChannelLens, + postgresMigrationLens, initZAuth, initLogger, initPostgresPool, @@ -167,6 +168,7 @@ import Wire.EmailSending.SMTP qualified as SMTP import Wire.EmailSubsystem.Template (Localised, TemplateBranding, forLocale) import Wire.EmailSubsystem.Templates.User import Wire.ExternalAccess.External +import Wire.PostgresMigrationOpts import Wire.RateLimit.Interpreter import Wire.SessionStore import Wire.SessionStore.Cassandra @@ -217,7 +219,8 @@ data Env = Env disabledVersions :: Set Version, enableSFTFederation :: Maybe Bool, rateLimitEnv :: RateLimitEnv, - amqpJobsPublisherChannel :: MVar Q.Channel + amqpJobsPublisherChannel :: MVar Q.Channel, + postgresMigration :: PostgresMigrationOpts } makeLensesWith (lensRules & lensField .~ suffixNamer) ''Env @@ -314,7 +317,8 @@ newEnv opts = do disabledVersions = allDisabledVersions, enableSFTFederation = opts.multiSFT, rateLimitEnv, - amqpJobsPublisherChannel + amqpJobsPublisherChannel, + postgresMigration = opts.postgresMigration } where emailConn _ (Opt.EmailAWS aws) = pure (Just aws, Nothing) diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs index 270f1a7affa..06eb36f10ff 100644 --- a/services/brig/src/Brig/CanonicalInterpreter.hs +++ b/services/brig/src/Brig/CanonicalInterpreter.hs @@ -81,8 +81,12 @@ import Wire.ClientSubsystem.Interpreter import Wire.DeleteQueue import Wire.DomainRegistrationStore import Wire.DomainRegistrationStore.Cassandra +import Wire.DomainRegistrationStore.DualWrite +import Wire.DomainRegistrationStore.Postgres (interpretDomainRegistrationStoreToPostgres) import Wire.DomainVerificationChallengeStore import Wire.DomainVerificationChallengeStore.Cassandra +import Wire.DomainVerificationChallengeStore.DualWrite (interpretDomainVerificationChallengeStoreToCassandraAndPostgres) +import Wire.DomainVerificationChallengeStore.Postgres (interpretDomainVerificationChallengeStoreToPostgres) import Wire.EmailSending import Wire.EmailSending.SES import Wire.EmailSending.SMTP @@ -107,6 +111,7 @@ import Wire.IndexedUserStore import Wire.IndexedUserStore.ElasticSearch import Wire.InvitationStore (InvitationStore) import Wire.InvitationStore.Cassandra (interpretInvitationStoreToCassandra) +import Wire.MigrationLock import Wire.NotificationSubsystem import Wire.NotificationSubsystem.Interpreter (defaultNotificationSubsystemConfig, runNotificationSubsystemGundeck) import Wire.ParseException @@ -114,6 +119,7 @@ import Wire.PasswordResetCodeStore (PasswordResetCodeStore) import Wire.PasswordResetCodeStore.Cassandra (interpretClientToIO, passwordResetCodeStoreToCassandra) import Wire.PasswordStore (PasswordStore) import Wire.PasswordStore.Cassandra (interpretPasswordStore) +import Wire.PostgresMigrationOpts import Wire.PropertyStore import Wire.PropertyStore.Cassandra import Wire.PropertySubsystem @@ -200,10 +206,13 @@ type BrigLowerLevelEffects = BackgroundJobsPublisher, RateLimit, UserGroupStore, + DomainRegistrationStore, + DomainVerificationChallengeStore, Error AppSubsystemError, Error TeamCollaboratorsError, Error UsageError, Error EnterpriseLoginSubsystemError, + Error MigrationLockError, Error UserSubsystemError, Error UserGroupSubsystemError, Error TeamInvitationSubsystemError, @@ -216,8 +225,6 @@ type BrigLowerLevelEffects = ErrorS 'TeamNotFound, Error Wai.Error, Wire.FederationAPIAccess.FederationAPIAccess Wire.API.Federation.Client.FederatorClient, - DomainVerificationChallengeStore, - DomainRegistrationStore, CryptoSign, HashPassword, ClientStore, @@ -232,6 +239,7 @@ type BrigLowerLevelEffects = PropertyStore, SFT, ConnectionStore InternalPaging, + Input Cas.ClientState, Input Hasql.Pool, Input AppSubsystemConfig, Input UserSubsystemConfig, @@ -387,6 +395,15 @@ runBrigToIO e (AppT ma) = do local = localUnit, requestId = e.requestId } + domainRegistrationStore = case e.postgresMigration.domainRegistration of + CassandraStorage -> interpretDomainRegistrationStoreToCassandra e.casClient + PostgresqlStorage -> interpretDomainRegistrationStoreToPostgres + MigrationToPostgresql -> interpretDomainRegistrationStoreToCassandraAndPostgres e.casClient + + domainVerificationChallengeStore = case e.postgresMigration.domainRegistration of + CassandraStorage -> interpretDomainVerificationChallengeStoreToCassandra e.settings.challengeTTL + PostgresqlStorage -> interpretDomainVerificationChallengeStoreToPostgres e.settings.challengeTTL + MigrationToPostgresql -> interpretDomainVerificationChallengeStoreToCassandraAndPostgres e.settings.challengeTTL ( either throwM pure <=< ( runFinal @@ -426,6 +443,7 @@ runBrigToIO e (AppT ma) = do . runInputConst userSubsystemConfig . runInputConst appSubsystemConfig . runInputConst e.hasqlPool + . runInputConst e.casClient . connectionStoreToCassandra . interpretSFT e.httpManager . interpretPropertyStoreCassandra e.casClient @@ -440,8 +458,6 @@ runBrigToIO e (AppT ma) = do . interpretClientStoreCassandra clientStoreCassandraEnv . runHashPassword e.settings.passwordHashingOptions . runCryptoSign - . interpretDomainRegistrationStoreToCassandra e.casClient - . interpretDomainVerificationChallengeStoreToCassandra e.casClient e.settings.challengeTTL . interpretFederationAPIAccess federationApiAccessConfig . mapError StdError -- Wai.Error . mapError (const $ errorToWai @'TeamNotFound) -- ErrorS 'TeamNotFound @@ -454,10 +470,13 @@ runBrigToIO e (AppT ma) = do . mapError teamInvitationErrorToHttpError . mapError userGroupSubsystemErrorToHttpError . mapError userSubsystemErrorToHttpError + . mapError migrationLockErrorToHttpError . mapError enterpriseLoginSubsystemErrorToHttpError . mapError postgresUsageErrorToHttpError . mapError teamCollaboratorsSubsystemErrorToHttpError . mapError appSubsystemErrorToHttpError + . domainVerificationChallengeStore + . domainRegistrationStore . interpretUserGroupStoreToPostgres . interpretRateLimit e.rateLimitEnv . interpretBackgroundJobsPublisherRabbitMQ e.requestId e.amqpJobsPublisherChannel diff --git a/services/brig/src/Brig/Options.hs b/services/brig/src/Brig/Options.hs index cd2ae315b0b..da75b90138a 100644 --- a/services/brig/src/Brig/Options.hs +++ b/services/brig/src/Brig/Options.hs @@ -59,6 +59,7 @@ import Wire.AuthenticationSubsystem.Config (ZAuthSettings) import Wire.AuthenticationSubsystem.Cookie.Limit import Wire.EmailSending.SMTP (SMTPConnType (..)) import Wire.EmailSubsystem.Template (TeamOpts) +import Wire.PostgresMigrationOpts import Wire.RateLimit.Interpreter data ElasticSearchOpts = ElasticSearchOpts @@ -382,6 +383,7 @@ data Opts = Opts postgresql :: !(Map Text Text), postgresqlPassword :: !(Maybe FilePathSecrets), postgresqlPool :: !PoolConfig, + postgresMigration :: !PostgresMigrationOpts, -- | SFT Federation multiSFT :: !(Maybe Bool), -- | RabbitMQ settings, required when federation is enabled. diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index 10a7499f798..9d1c23291cb 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -252,3 +252,4 @@ postgresMigration: conversation: postgresql conversationCodes: postgresql teamFeatures: postgresql + domainRegistration: postgresql