Skip to content

Commit f71808a

Browse files
committed
finalise the bulk inserts
1 parent c9498ba commit f71808a

File tree

5 files changed

+195
-81
lines changed

5 files changed

+195
-81
lines changed

cardano-db/cardano-db.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ library
6969
Cardano.Db.Statement.Base
7070
Cardano.Db.Statement.EpochAndProtocol
7171
Cardano.Db.Statement.GovernanceAndVoting
72+
Cardano.Db.Statement.Helpers
7273
Cardano.Db.Statement.MultiAsset
7374
Cardano.Db.Statement.OffChain
7475
Cardano.Db.Statement.Pool
@@ -89,6 +90,7 @@ library
8990
, containers
9091
, conduit-extra
9192
, contra-tracer
93+
, contravariant-extras
9294
, cryptonite
9395
, directory
9496
, esqueleto

cardano-db/src/Cardano/Db/Schema/Core/Base.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,8 @@ txInDecoder =
220220
txInEncoder :: E.Params TxIn
221221
txInEncoder =
222222
mconcat
223-
[ txInId >$< idEncoder getTxInId
224-
, txInTxInId >$< idEncoder getTxId
223+
[ -- txInId >$< idEncoder getTxInId
224+
txInTxInId >$< idEncoder getTxId
225225
, txInTxOutId >$< idEncoder getTxId
226226
, txInTxOutIndex >$< E.param (E.nonNullable $ fromIntegral >$< E.int8)
227227
, txInRedeemerId >$< maybeIdEncoder getRedeemerId

cardano-db/src/Cardano/Db/Statement/Base.hs

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,26 @@
33
module Cardano.Db.Statement.Base where
44

55
import Cardano.Db.Schema.Core (Block)
6-
import Cardano.Db.Schema.Ids (BlockId (..), idDecoder)
7-
import qualified Hasql.Transaction as SqlTx
8-
import Cardano.Prelude (MonadIO)
9-
import Cardano.Db.Error (AsDbError)
10-
import Cardano.Db.Types (DbAction, runDbTx, DbTxMode (..))
11-
import Cardano.Db.Schema.Core.Base (blockEncoder)
12-
import qualified Hasql.Statement as SqlStmt
13-
import qualified Hasql.Decoders as SqlDecode
6+
import Cardano.Db.Schema.Core.Base ( TxIn (..), blockEncoder )
7+
import Cardano.Db.Schema.Ids (BlockId (..), idDecoder, TxInId (..), TxId (..), RedeemerId (..))
8+
import Cardano.Db.Types (DbAction, DbTxMode (..))
9+
import Cardano.Prelude (MonadIO, Word64)
10+
import qualified Hasql.Decoders as HsqlD
11+
import qualified Hasql.Statement as HsqlS
12+
import qualified Hasql.Transaction as HsqlT
13+
import qualified Hasql.Encoders as HsqlE
14+
import Data.Functor.Contravariant ((>$<))
15+
import Contravariant.Extras (contrazip4)
16+
import Cardano.Db.Statement.Helpers (runDbT, mkDbTransaction, bulkInsert)
1417

1518
-- The wrapped version that provides the DbAction context
16-
insertBlockTx :: (MonadIO m, AsDbError e) => Block -> DbAction e m BlockId
17-
insertBlockTx block = runDbTx Write $ insertBlockStm block
19+
insertBlock :: MonadIO m => Block -> DbAction m BlockId
20+
insertBlock block =
21+
runDbT Write $ mkDbTransaction "" $ insertBlockStm block
1822

19-
insertBlockStm :: Block -> SqlTx.Transaction BlockId
23+
insertBlockStm :: Block -> HsqlT.Transaction BlockId
2024
insertBlockStm block =
21-
SqlTx.statement block $ SqlStmt.Statement sql blockEncoder (SqlDecode.singleRow $ idDecoder BlockId) True
25+
HsqlT.statement block $ HsqlS.Statement sql blockEncoder (HsqlD.singleRow $ idDecoder BlockId) True
2226
where
2327
sql =
2428
"INSERT INTO block \
@@ -30,6 +34,34 @@ insertBlockStm block =
3034
\RETURNING id"
3135

3236

37+
insertManyTxIn :: MonadIO m => [TxIn] -> DbAction m [TxInId]
38+
insertManyTxIn txIns = runDbT Write $ mkDbTransaction "insertManyTxIn" (insertManyTxInStm txIns)
39+
40+
insertManyTxInStm :: [TxIn] -> HsqlT.Transaction [TxInId]
41+
insertManyTxInStm txIns =
42+
bulkInsert
43+
"tx_in"
44+
["tx_in_id", "tx_out_id", "tx_out_index", "redeemer_id"]
45+
["bigint[]", "bigint[]", "int8[]", "int8[]"]
46+
extractTxIn
47+
encodeTxIn
48+
(HsqlD.rowList $ idDecoder TxInId)
49+
txIns
50+
51+
where
52+
extractTxIn :: [TxIn] -> ([TxId], [TxId], [Word64], [Maybe RedeemerId])
53+
extractTxIn xs = ( map txInTxInId xs
54+
, map txInTxOutId xs
55+
, map txInTxOutIndex xs
56+
, map txInRedeemerId xs
57+
)
58+
59+
encodeTxIn :: HsqlE.Params ([TxId], [TxId], [Word64], [Maybe RedeemerId])
60+
encodeTxIn = contrazip4
61+
(HsqlE.param $ HsqlE.nonNullable $ HsqlE.foldableArray $ HsqlE.nonNullable $ getTxId >$< HsqlE.int8)
62+
(HsqlE.param $ HsqlE.nonNullable $ HsqlE.foldableArray $ HsqlE.nonNullable $ getTxId >$< HsqlE.int8)
63+
(HsqlE.param $ HsqlE.nonNullable $ HsqlE.foldableArray $ HsqlE.nonNullable $ fromIntegral >$< HsqlE.int8)
64+
(HsqlE.param $ HsqlE.nonNullable $ HsqlE.foldableArray $ HsqlE.nullable $ getRedeemerId >$< HsqlE.int8)
3365

3466
-- These tables store fundamental blockchain data, such as blocks, transactions, and UTXOs.
3567

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
{-# LANGUAGE RecordWildCards #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
4+
module Cardano.Db.Statement.Helpers where
5+
6+
import Cardano.BM.Trace (logDebug)
7+
import Cardano.Db.Error (CallSite (..), DbError (..))
8+
import Cardano.Db.Types (DbAction (..), DbTxMode (..), DbTransaction (..), DbEnv (..))
9+
import Cardano.Prelude (MonadIO (..), ask, when, MonadError (..))
10+
import Data.Time (getCurrentTime, diffUTCTime)
11+
import GHC.Stack (HasCallStack, getCallStack, callStack, SrcLoc (..))
12+
import qualified Data.Text as Text
13+
import qualified Hasql.Decoders as HsqlD
14+
import qualified Hasql.Encoders as HsqlE
15+
import qualified Hasql.Session as HsqlS
16+
import qualified Hasql.Statement as HsqlS
17+
import qualified Hasql.Transaction as HsqlT
18+
import qualified Hasql.Transaction.Sessions as HsqlT
19+
import qualified Data.Text.Encoding as TextEnc
20+
21+
-- | Runs a database transaction with optional logging.
22+
--
23+
-- This function executes a `DbTransaction` within the `DbAction` monad, handling
24+
-- the transaction mode (read-only or write) and logging execution details if
25+
-- enabled in the `DbEnv`. It captures timing information and call site details
26+
-- for debugging purposes when logging is active.
27+
--
28+
-- ==== Parameters
29+
-- * @mode@: The transaction mode (`Write` or `ReadOnly`).
30+
-- * @DbTransaction{..}@: The transaction to execute, containing the function name,
31+
-- call site, and the `Hasql` transaction.
32+
--
33+
-- ==== Returns
34+
-- * @DbAction m a@: The result of the transaction wrapped in the `DbAction` monad.
35+
runDbT
36+
:: MonadIO m
37+
=> DbTxMode
38+
-> DbTransaction a
39+
-> DbAction m a
40+
runDbT mode DbTransaction{..} = DbAction $ do
41+
dbEnv <- ask
42+
let logMsg msg = when (dbEnableLogging dbEnv) $ liftIO $ logDebug (dbTracer dbEnv) msg
43+
44+
-- Run the session and handle the result
45+
let runSession = do
46+
result <- liftIO $ HsqlS.run session (dbConnection dbEnv)
47+
case result of
48+
Left err -> throwError $ QueryError "Transaction failed" dtCallSite err
49+
Right val -> pure val
50+
51+
if dbEnableLogging dbEnv
52+
then do
53+
logMsg $ "Starting transaction: " <> dtFunctionName <> locationInfo
54+
start <- liftIO getCurrentTime
55+
result <- runSession
56+
end <- liftIO getCurrentTime
57+
let duration = diffUTCTime end start
58+
logMsg $ "Transaction completed: " <> dtFunctionName <> locationInfo <> " in " <> Text.pack (show duration)
59+
pure result
60+
else runSession
61+
where
62+
session = HsqlT.transaction HsqlT.Serializable txMode dtTx
63+
txMode = case mode of
64+
Write -> HsqlT.Write
65+
ReadOnly -> HsqlT.Read
66+
locationInfo = " at " <> csModule dtCallSite <> ":" <>
67+
csFile dtCallSite <> ":" <> Text.pack (show $ csLine dtCallSite)
68+
69+
-- | Creates a `DbTransaction` with a function name and call site.
70+
--
71+
-- Constructs a `DbTransaction` record for use with `runDbT`, capturing the
72+
-- function name and call site from the current stack trace. This is useful
73+
-- for logging and debugging database operations.
74+
--
75+
-- ==== Parameters
76+
-- * @funcName@: The name of the function or operation being performed.
77+
-- * @transx@: The `Hasql` transaction to encapsulate.
78+
--
79+
-- ==== Returns
80+
-- * @DbTransaction a@: A transaction record with metadata.
81+
mkDbTransaction :: Text.Text -> HsqlT.Transaction a -> DbTransaction a
82+
mkDbTransaction funcName transx =
83+
DbTransaction
84+
{ dtFunctionName = funcName
85+
, dtCallSite = mkCallSite
86+
, dtTx = transx
87+
}
88+
where
89+
mkCallSite :: HasCallStack => CallSite
90+
mkCallSite =
91+
case reverse (getCallStack callStack) of
92+
(_, srcLoc) : _ -> CallSite
93+
{ csModule = Text.pack $ srcLocModule srcLoc
94+
, csFile = Text.pack $ srcLocFile srcLoc
95+
, csLine = srcLocStartLine srcLoc
96+
}
97+
[] -> error "No call stack info"
98+
99+
-- | Inserts multiple records into a table in a single transaction using UNNEST.
100+
--
101+
-- This function performs a bulk insert into a specified table, using PostgreSQL’s
102+
-- `UNNEST` to expand arrays of field values into rows. It’s designed for efficiency,
103+
-- executing all inserts in one SQL statement, and returns the generated IDs.
104+
--
105+
-- ==== Parameters
106+
-- * @table@: Text - The name of the table to insert into.
107+
-- * @cols@: [Text] - List of column names (excluding the ID column).
108+
-- * @types@: [Text] - List of PostgreSQL type casts for each column (e.g., "bigint[]").
109+
-- * @extract@: ([a] -> [b]) - Function to extract fields from a list of records into a tuple of lists.
110+
-- * @enc@: HsqlE.Params [b] - Encoder for the extracted fields as a tuple of lists.
111+
-- * @dec@: HsqlD.Result [c] - Decoder for the returned IDs.
112+
-- * @xs@: [a] - List of records to insert.
113+
--
114+
-- ==== Returns
115+
-- * @DbAction m [c]@: The list of generated IDs wrapped in the `DbAction` monad.
116+
bulkInsert
117+
:: Text.Text -- Table name
118+
-> [Text.Text] -- Column names
119+
-> [Text.Text] -- Type casts for UNNEST
120+
-> ([a] -> b) -- Field extractor (e.g., to tuple)
121+
-> HsqlE.Params b -- Bulk encoder
122+
-> HsqlD.Result [c] -- ID decoder
123+
-> [a] -- Records
124+
-> HsqlT.Transaction [c] -- Resulting IDs
125+
bulkInsert table cols types extract enc dec xs =
126+
HsqlT.statement params $ HsqlS.Statement sql enc dec True
127+
where
128+
params = extract xs
129+
sql = TextEnc.encodeUtf8 $
130+
"INSERT INTO " <> table <> " (" <> Text.intercalate ", " cols <> ") \
131+
\SELECT * FROM UNNEST (" <> Text.intercalate ", " (zipWith (\i t -> "$" <> Text.pack (show i) <> "::" <> t) [1..] types) <> ") \
132+
\RETURNING id"

cardano-db/src/Cardano/Db/Types.hs

Lines changed: 15 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
module Cardano.Db.Types (
1212
DbAction (..),
1313
DbTxMode (..),
14+
DbTransaction (..),
1415
DbEnv (..),
1516
Ada (..),
1617
AnchorType (..),
@@ -34,9 +35,6 @@ module Cardano.Db.Types (
3435
VoterRole (..),
3536
GovActionType (..),
3637
BootstrapState (..),
37-
runDbTx,
38-
mkCallSite,
39-
mkDbTransaction,
4038
dbInt65Decoder,
4139
dbInt65Encoder,
4240
rewardSourceDecoder,
@@ -117,38 +115,38 @@ import Data.Word (Word16, Word64)
117115
import GHC.Generics (Generic)
118116
import Quiet (Quiet (..))
119117
import Data.Int (Int64)
120-
import Cardano.Prelude (Bifunctor(..), MonadError (..), MonadIO (..), ask)
118+
import Cardano.Prelude (Bifunctor(..), MonadError (..), MonadIO (..), ask, MonadReader, when)
121119
import Data.Bits (Bits(..))
122120
import qualified Hasql.Decoders as D
123121
import qualified Hasql.Encoders as E
124122
import Data.Functor.Contravariant ((>$<))
125123
import Data.WideWord (Word128 (..))
126124
import Control.Monad.Trans.Except (ExceptT)
127125
import Control.Monad.Trans.Reader (ReaderT)
128-
import Control.Monad.Logger (LoggingT, MonadLogger)
129-
import Cardano.Db.Error (AsDbError, DbError (..), toDbError, CallSite (..))
130-
import qualified Hasql.Connection as HasqlC
131-
import qualified Hasql.Session as HasqlS
132-
import qualified Hasql.Transaction as HasqlTx
133-
import qualified Hasql.Transaction.Sessions as HasqlTx
126+
import Cardano.Db.Error (DbError (..), CallSite (..))
127+
import qualified Hasql.Connection as HsqlC
128+
import qualified Hasql.Session as HsqlS
129+
import qualified Hasql.Transaction as HsqlT
130+
import qualified Hasql.Transaction.Sessions as HsqlT
134131
import Cardano.BM.Trace (Trace, logDebug)
135132
import GHC.Stack (SrcLoc (..), HasCallStack, getCallStack, callStack)
136133
import Data.Time (getCurrentTime, diffUTCTime)
137134

138-
-- | The database action monad.
139-
newtype DbAction e m a = DbAction
140-
{ runDbAction :: ExceptT e (ReaderT DbEnv (LoggingT m)) a }
135+
136+
newtype DbAction m a = DbAction
137+
{ runDbAction :: ExceptT DbError (ReaderT DbEnv m) a }
141138
deriving newtype
142139
( Functor, Applicative, Monad
143-
, MonadError e
144-
, MonadIO, MonadLogger
140+
, MonadError DbError
141+
, MonadReader DbEnv
142+
, MonadIO
145143
)
146144

147145
data DbTxMode = Write | ReadOnly
148146

149147
-- Environment with transaction settings
150148
data DbEnv = DbEnv
151-
{ dbConnection :: !HasqlC.Connection
149+
{ dbConnection :: !HsqlC.Connection
152150
, dbEnableLogging :: !Bool
153151
,dbTracer :: !(Trace IO Text)
154152
}
@@ -157,59 +155,9 @@ data DbEnv = DbEnv
157155
data DbTransaction a = DbTransaction
158156
{ dtFunctionName :: !Text
159157
, dtCallSite :: !CallSite
160-
, dtTx :: !(HasqlTx.Transaction a)
158+
, dtTx :: !(HsqlT.Transaction a)
161159
}
162160

163-
mkCallSite :: HasCallStack => CallSite
164-
mkCallSite =
165-
case reverse (getCallStack callStack) of
166-
(_, srcLoc) : _ -> CallSite
167-
{ csModule = Text.pack $ srcLocModule srcLoc
168-
, csFile = Text.pack $ srcLocFile srcLoc
169-
, csLine = srcLocStartLine srcLoc
170-
}
171-
[] -> error "No call stack info"
172-
173-
mkDbTransaction :: Text -> CallSite -> HasqlTx.Transaction a -> DbTransaction a
174-
mkDbTransaction funcName callSite transx =
175-
DbTransaction { dtFunctionName = funcName
176-
, dtCallSite = callSite
177-
, dtTx = transx
178-
}
179-
180-
runDbTx :: (MonadIO m, AsDbError e)
181-
=> DbTxMode
182-
-> DbTransaction a
183-
-> DbAction e m a
184-
runDbTx mode DbTransaction{..} = DbAction $ do
185-
env <- ask
186-
let session = HasqlTx.transaction HasqlTx.Serializable txMode dtTx
187-
txMode = case mode of
188-
Write -> HasqlTx.Write
189-
ReadOnly -> HasqlTx.Read
190-
if not (dbEnableLogging env)
191-
then do
192-
-- Just run the transaction without any logging overhead
193-
result <- liftIO $ HasqlS.run session (dbConnection env)
194-
either (throwError . toDbError . QueryError "Transaction failed" dtCallSite) pure result
195-
else do
196-
-- Logging path with timing and location info
197-
let locationInfo = " at " <> csModule dtCallSite <> ":" <>
198-
csFile dtCallSite <> ":" <> Text.pack (show $ csLine dtCallSite)
199-
200-
logDbDebug env $ "Starting transaction: " <> dtFunctionName <> locationInfo
201-
start <- liftIO getCurrentTime
202-
result <- liftIO $ HasqlS.run session (dbConnection env)
203-
end <- liftIO getCurrentTime
204-
let duration = diffUTCTime end start
205-
logDbDebug env $ "Transaction completed: "
206-
<> dtFunctionName <> locationInfo <> " in " <> Text.pack (show duration)
207-
either (throwError . toDbError . QueryError "Transaction failed" dtCallSite) pure result
208-
209-
logDbDebug :: MonadIO m => DbEnv -> Text -> m ()
210-
logDbDebug dbEnv msg =
211-
liftIO $ logDebug (dbTracer dbEnv) msg
212-
213161
newtype Ada = Ada
214162
{ unAda :: Micro
215163
}

0 commit comments

Comments
 (0)