Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/pg-reconnect
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reconnect and retry queries when the PostgreSQL server restarts
26 changes: 22 additions & 4 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@
};

hasql-migration = {
url = "github:wireapp/hasql-migration?ref=allow-no-transaction";
url = "github:wireapp/hasql-migration?ref=upgrade-hasql";
flake = false;
};

postgresql-connection-string = {
url = "github:wireapp/postgresql-connection-string?ref=expose-from-key-value-params";
flake = false;
};
};
Expand Down
2 changes: 2 additions & 0 deletions libs/extended/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
, memory
, metrics-wai
, monad-control
, postgresql-connection-string
, prometheus-client
, retry
, servant
Expand Down Expand Up @@ -80,6 +81,7 @@ mkDerivation {
memory
metrics-wai
monad-control
postgresql-connection-string
prometheus-client
retry
servant
Expand Down
1 change: 1 addition & 0 deletions libs/extended/extended.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ library
, memory
, metrics-wai
, monad-control
, postgresql-connection-string
, prometheus-client
, retry
, servant
Expand Down
17 changes: 7 additions & 10 deletions libs/extended/src/Hasql/Pool/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import Data.Map as Map
import Data.Misc
import Data.Set qualified as Set
import Data.UUID
import Hasql.Connection.Setting qualified as HasqlSetting
import Hasql.Connection.Setting.Connection qualified as HasqlConn
import Hasql.Connection.Setting.Connection.Param qualified as HasqlConfig
import Hasql.Connection.Settings qualified as HasqlConnSettings
import Hasql.Pool as HasqlPool
import Hasql.Pool.Config qualified as HasqlPool
import Hasql.Pool.Observation
import Imports
import PostgresqlConnectionString qualified
import Prometheus
import Util.Options

Expand All @@ -50,23 +49,21 @@ instance FromJSON PoolConfig where

-- | Creates a pool from postgres config params
--
-- HasqlConn.params translates pgParams into connection (which just holds the connection string and is not a real connection)
-- HasqlSetting.connection unwraps the connection string out of connection
-- HasqlPool.staticConnectionSettings translates the connection string to the pool settings
-- HasqlPool.staticConnectionSettings translates the connection settings to the pool settings
-- HasqlPool.settings translates the pool settings into pool config
-- HasqlPool.acquire creates the pool.
-- ezpz.
initPostgresPool :: PoolConfig -> Map Text Text -> Maybe FilePathSecrets -> IO HasqlPool.Pool
initPostgresPool config pgConfig mFpSecrets = do
mPw <- for mFpSecrets initCredentials
let pgConfigWithPw = maybe pgConfig (\pw -> Map.insert "password" pw pgConfig) mPw
pgParams = Map.foldMapWithKey (\k v -> [HasqlConfig.other k v]) pgConfigWithPw
let pgSettings =
HasqlConnSettings.connectionString (PostgresqlConnectionString.toUrl $ PostgresqlConnectionString.fromKeyValueParams pgConfig)
<> foldMap HasqlConnSettings.password mPw
metrics <- initHasqlPoolMetrics
connsRef <- newIORef $ Connections mempty mempty mempty
HasqlPool.acquire $
HasqlPool.settings
[ HasqlPool.staticConnectionSettings $
[HasqlSetting.connection $ HasqlConn.params pgParams],
[ HasqlPool.staticConnectionSettings pgSettings,
HasqlPool.size config.size,
HasqlPool.acquisitionTimeout config.acquisitionTimeout.duration,
HasqlPool.agingTimeout config.agingTimeout.duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ saveConvToPostgres allConvData = do
meta.cnvmCellsState,
meta.cnvmParent
)
runTransaction ReadCommitted Write $ do
runTransactionWithRetry ReadCommitted Write $ do
Comment thread
fisx marked this conversation as resolved.
Transaction.statement convRow insertConv
Transaction.statement localMemberColumns insertLocalMembers
Transaction.statement remoteMemberColumns insertRemoteMembers
Expand Down Expand Up @@ -468,7 +468,7 @@ getRemoteMemberStatusFromCassandra uid = withCassandra $ do

saveRemoteMemberStatusToPostgres :: (PGConstraints r) => UserId -> Map (Remote ConvId) MemberStatus -> Sem r ()
saveRemoteMemberStatusToPostgres uid statusses =
runTransaction ReadCommitted Write $ do
runTransactionWithRetry ReadCommitted Write $ do
Transaction.statement statusColumns insertStatuses
Transaction.statement (DeleteUser, uid) markDeletionPendingStmt
where
Expand Down
12 changes: 6 additions & 6 deletions libs/wire-subsystems/src/Wire/ConversationStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ upsertConversationImpl lcnv nc = do
meta.cnvmParent,
fmap (.depth) hconfig
)
runTransaction ReadCommitted Write $ do
runTransactionWithRetry ReadCommitted Write $ do
Transaction.statement convRow insertConvStatement
upsertMembersTransaction storedConv.id_ $ UserList localUsers remoteUsers
pure storedConv
Expand Down Expand Up @@ -206,7 +206,7 @@ deleteConversationImpl cid =

getConversationImpl :: (PGConstraints r) => ConvId -> Sem r (Maybe StoredConversation)
getConversationImpl cid =
runTransaction ReadCommitted Read $ do
runTransactionWithRetry ReadCommitted Read $ do
mConvRow <- Transaction.statement cid selectConvMetadata
case mConvRow of
Nothing -> pure Nothing
Expand Down Expand Up @@ -624,7 +624,7 @@ deleteTeamConversationsImpl tid =
-- MEMBER OPERATIONS
upsertMembersImpl :: (PGConstraints r) => ConvId -> UserList (UserId, RoleName) -> Sem r ([LocalMember], [RemoteMember])
upsertMembersImpl convId users@(UserList lusers rusers) = do
runTransaction ReadCommitted Write $ upsertMembersTransaction convId users
runTransactionWithRetry ReadCommitted Write $ upsertMembersTransaction convId users
pure (map newMemberWithRole lusers, map newRemoteMemberWithRole rusers)

upsertMembersTransaction :: ConvId -> UserList (UserId, RoleName) -> Transaction ()
Expand Down Expand Up @@ -680,7 +680,7 @@ createBotMemberImpl serviceRef botId convId = do
getLocalMemberImpl :: (PGConstraints r) => ConvId -> UserId -> Sem r (Maybe LocalMember)
getLocalMemberImpl convId userId = do
mRow <-
runSession $ do
runSessionWithRetry $ do
mDirectMember <- HasqlSession.statement (convId, userId) selectMember
case mDirectMember of
Nothing -> HasqlSession.statement (convId, userId) selectParentMember
Expand Down Expand Up @@ -769,7 +769,7 @@ type RemoteMemberRow = (ConvId, Domain, UserId, RoleName)
getRemoteMemberImpl :: (PGConstraints r) => ConvId -> Remote UserId -> Sem r (Maybe RemoteMember)
getRemoteMemberImpl convId (tUntagged -> Qualified uid domain) = do
mRow <-
runSession $ do
runSessionWithRetry $ do
mDirectMember <- HasqlSession.statement (convId, domain, uid) selectMember
case mDirectMember of
Nothing -> HasqlSession.statement (convId, domain, uid) selectParentMember
Expand Down Expand Up @@ -958,7 +958,7 @@ setOtherRemoteMember cid (tUntagged -> Qualified uid domain) upd =

deleteMembersImpl :: (PGConstraints r) => ConvId -> UserList UserId -> Sem r ()
deleteMembersImpl cid users =
runTransaction ReadCommitted Write $ do
runTransactionWithRetry ReadCommitted Write $ do
Transaction.statement (cid, users.ulLocals) deleteLocalsStmt
for_ (bucketRemote users.ulRemotes) $ \(tUntagged -> Qualified remotes domain) ->
Transaction.statement (cid, domain, remotes) deleteRemotesStmt
Expand Down
60 changes: 14 additions & 46 deletions libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,17 @@ import Data.Time.Clock
import Data.UUID (UUID, nil)
import Data.Vector qualified as V
import Hasql.Pool
import Hasql.Session
import Hasql.Statement
import Hasql.TH
import Imports
import Polysemy
import Polysemy.Error (Error, throw)
import Polysemy.Error (Error)
import Polysemy.Input
import Wire.API.Meeting (Recurrence)
import Wire.API.PostgresMarshall (PostgresMarshall (..), PostgresUnmarshall (..), dimapPG)
import Wire.API.User.Identity (EmailAddress, fromEmail)
import Wire.MeetingsStore
import Wire.Postgres (PGConstraints)
import Wire.Postgres

interpretMeetingsStoreToPostgres ::
(PGConstraints r) =>
Expand Down Expand Up @@ -80,7 +79,6 @@ createMeetingImpl ::
Bool ->
Sem r StoredMeeting
createMeetingImpl title creator startTime endTime recurrence convId emails trial = do
pool <- input
now <- liftIO getCurrentTime
let sm =
StoredMeeting
Expand All @@ -96,8 +94,7 @@ createMeetingImpl title creator startTime endTime recurrence convId emails trial
createdAt = now,
updatedAt = now
}
result <- liftIO $ use pool $ statement sm insertStatement
either throw pure result
runStatement sm insertStatement

insertStatement :: Statement StoredMeeting StoredMeeting
insertStatement =
Expand Down Expand Up @@ -187,18 +184,12 @@ updateMeetingImpl ::
Maybe (Maybe Recurrence) ->
Sem r (Maybe StoredMeeting)
updateMeetingImpl meetingId mTitle mStartDate mEndDate mRecurrence = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
case mRecurrence of
Nothing ->
runStatement (mTitle, mStartDate, mEndDate, meetingId) updateWithoutRecurrenceStatement
Just recurrence ->
runStatement (mTitle, mStartDate, mEndDate, recurrence, meetingId) updateWithRecurrenceStatement
where
session :: Session (Maybe StoredMeeting)
session =
case mRecurrence of
Nothing ->
statement (mTitle, mStartDate, mEndDate, meetingId) updateWithoutRecurrenceStatement
Just recurrence ->
statement (mTitle, mStartDate, mEndDate, recurrence, meetingId) updateWithRecurrenceStatement

updateWithRecurrenceStatement :: Statement UpdateMeetingWithRecurrenceTuple (Maybe StoredMeeting)
updateWithRecurrenceStatement =
dimapPG
Expand Down Expand Up @@ -256,12 +247,8 @@ deleteMeetingImpl ::
MeetingId ->
Sem r ()
deleteMeetingImpl meetingId = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
runStatement (toUUID meetingId) deleteStatement
where
session :: Session ()
session = statement (toUUID meetingId) deleteStatement
deleteStatement :: Statement UUID ()
deleteStatement =
[resultlessStatement|
Expand All @@ -276,9 +263,7 @@ getMeetingImpl ::
MeetingId ->
Sem r (Maybe StoredMeeting)
getMeetingImpl meetingId = do
pool <- input
result <- liftIO $ use pool $ statement (toUUID meetingId) getMeetingStatement
either throw pure result
runStatement (toUUID meetingId) getMeetingStatement

getMeetingStatement :: Statement UUID (Maybe StoredMeeting)
getMeetingStatement =
Expand All @@ -303,12 +288,8 @@ listMeetingsByUserImpl ::
UTCTime ->
Sem r [StoredMeeting]
listMeetingsByUserImpl userId cutoffTime = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
runStatement (toUUID userId, cutoffTime) $ V.toList <$> listStatement
where
session :: Session [StoredMeeting]
session = statement (toUUID userId, cutoffTime) $ V.toList <$> listStatement
listStatement :: Statement (UUID, UTCTime) (V.Vector StoredMeeting)
listStatement =
refineResult
Expand All @@ -331,12 +312,8 @@ listMeetingsByConversationImpl ::
UTCTime ->
Sem r [StoredMeeting]
listMeetingsByConversationImpl convId cutoffTime = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
runStatement (toUUID convId, cutoffTime) $ V.toList <$> listStatement
where
session :: Session [StoredMeeting]
session = statement (toUUID convId, cutoffTime) $ V.toList <$> listStatement
listStatement :: Statement (UUID, UTCTime) (V.Vector StoredMeeting)
listStatement =
refineResult
Expand All @@ -359,13 +336,8 @@ addInvitedEmailsImpl ::
[EmailAddress] ->
Sem r ()
addInvitedEmailsImpl meetingId emails = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
runStatement (V.fromList (fromEmail <$> emails), toUUID meetingId) addEmailStatement
where
session :: Session ()
session = statement (V.fromList (fromEmail <$> emails), toUUID meetingId) addEmailStatement

addEmailStatement :: Statement (V.Vector Text, UUID) ()
addEmailStatement =
[resultlessStatement|
Expand All @@ -381,12 +353,8 @@ removeInvitedEmailsImpl ::
[EmailAddress] ->
Sem r ()
removeInvitedEmailsImpl meetingId emails = do
pool <- input
result <- liftIO $ use pool session
either throw pure result
runStatement (V.fromList (fromEmail <$> emails), toUUID meetingId) removeEmailStatement
where
session :: Session ()
session = statement (V.fromList (fromEmail <$> emails), toUUID meetingId) removeEmailStatement
removeEmailStatement :: Statement (V.Vector Text, UUID) ()
removeEmailStatement =
[resultlessStatement|
Expand Down
Loading