Skip to content

Commit 1dfad08

Browse files
author
flowcore-platform
committed
feat(duckdb): ✨ Add duckdb-async support and enhance DuckDB tools
1 parent 0df6421 commit 1dfad08

File tree

7 files changed

+220
-116
lines changed

7 files changed

+220
-116
lines changed

bun.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"aws-sdk": "^2.1692.0",
1818
"dayjs": "^1.11.13",
1919
"duckdb": "^1.2.0",
20+
"duckdb-async": "^1.2.0",
2021
"jwt-decode": "^4.0.0",
2122
"mock-aws-s3": "^4.0.2",
2223
"nock": "^14.0.1",
@@ -209,6 +210,8 @@
209210

210211
"duckdb": ["[email protected]", "", { "dependencies": { "@mapbox/node-pre-gyp": "^2.0.0", "node-addon-api": "^7.0.0", "node-gyp": "^9.3.0" } }, "sha512-zAHHRTMoZhWIwvOsyNkgV9c1nq0gR0j+ZyX0uTCRFZTNOlYO4lnErP5Fddt/6iKMXsTNL9v1oTG9E76S5jMh7w=="],
211212

213+
"duckdb-async": ["[email protected]", "", { "dependencies": { "duckdb": "^1.2.0" } }, "sha512-Ri8PL/TGZRhbW+53a/zqD4nJdH3p+mRB6owb7Hl5tXu3e1rvXVLil543aiVEnVKKxSHSBUwHyoutt+rOHrhc6g=="],
214+
212215
"dunder-proto": ["dunder-proto@1.0.1", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.1", "es-errors": "^1.3.0", "gopd": "^1.2.0" } }, "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A=="],
213216

214217
"eastasianwidth": ["eastasianwidth@0.2.0", "", {}, "sha512-I88TYZWc9XiYHRQ4/3c5rjjfgkjhLyW2luGIheGERbNQ6OY7yTybanSpDXZa8y7VUP9YmDcYa+eyq4ca7iLqWA=="],

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"aws-sdk": "^2.1692.0",
4545
"dayjs": "^1.11.13",
4646
"duckdb": "^1.2.0",
47+
"duckdb-async": "^1.2.0",
4748
"jwt-decode": "^4.0.0",
4849
"mock-aws-s3": "^4.0.2",
4950
"nock": "^14.0.1",

src/duckdb/index.ts

Lines changed: 61 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,50 @@
1-
import { type DuckDBConnection, DuckDBInstance } from "@duckdb/node-api"
1+
import { type Connection, Database } from "duckdb-async"
22

3-
let instance: DuckDBInstance | null = null
4-
let connection: DuckDBConnection | null = null
3+
let db: Database | null = null
4+
let connection: Connection | null = null
55

66
/**
77
* Initializes an in-memory DuckDB database
88
* @returns A reference to the initialized database
99
*/
10-
export async function initializeDuckDB(databaseFile?: string) {
11-
if (instance !== null) {
12-
return { success: true, databaseInstance: instance }
13-
}
14-
10+
export async function initializeDuckDB(
11+
file?: string,
12+
): Promise<{ success: boolean; databaseInstance?: Database; error?: string }> {
1513
try {
16-
// Create a new DuckDB instance with a persistent database file
17-
instance = await DuckDBInstance.create(databaseFile || ":memory:")
18-
connection = await instance.connect()
14+
// Create a new DuckDB instance with a persistent database file or in-memory
15+
db = await Database.create(file || ":memory:")
16+
connection = await db.connect()
17+
18+
// Verify connection is working
19+
await connection.exec("SELECT 1")
1920

20-
return { success: true, databaseInstance: instance }
21+
return { success: true, databaseInstance: db }
2122
} catch (error) {
2223
return { success: false, error: `Failed to initialize DuckDB: ${error}` }
2324
}
2425
}
2526

27+
export async function closeDuckDB() {
28+
if (db) {
29+
await db.close()
30+
}
31+
}
32+
2633
/**
2734
* Creates a table in the DuckDB database using raw SQL
2835
* @param createTableSQL The SQL CREATE TABLE statement
2936
* @returns Object indicating success or failure
3037
*/
31-
export async function createTableWithSQL(createTableSQL: string) {
32-
if (!instance || !connection) {
38+
export async function createTableWithSQL(createTableSQL: string): Promise<{ success: boolean; message: string }> {
39+
if (!db || !connection) {
3340
return {
3441
success: false,
3542
message: "Database not initialized. Call initializeDuckDB first.",
3643
}
3744
}
3845

3946
try {
40-
await connection.run(createTableSQL)
41-
47+
await connection.exec(createTableSQL)
4248
return {
4349
success: true,
4450
message: "Table created successfully",
@@ -61,8 +67,8 @@ export async function createTableWithSQL(createTableSQL: string) {
6167
export async function createProjectionTable(
6268
tableName: string,
6369
columns: Array<{ name: string; type: string; constraints?: string }>,
64-
) {
65-
if (!instance || !connection) {
70+
): Promise<{ success: boolean; message: string }> {
71+
if (!db || !connection) {
6672
return {
6773
success: false,
6874
message: "Database not initialized. Call initializeDuckDB first.",
@@ -77,7 +83,7 @@ export async function createProjectionTable(
7783

7884
const createTableSQL = `CREATE TABLE IF NOT EXISTS ${tableName} (${columnsSQL})`
7985

80-
await connection.run(createTableSQL)
86+
await connection.exec(createTableSQL)
8187

8288
return {
8389
success: true,
@@ -97,8 +103,11 @@ export async function createProjectionTable(
97103
* @param record Record to insert
98104
* @returns Object indicating success or failure
99105
*/
100-
export async function insertRecordIntoTable(tableName: string, record: Record<string, unknown>) {
101-
if (!instance || !connection) {
106+
export async function insertRecordIntoTable(
107+
tableName: string,
108+
record: Record<string, unknown>,
109+
): Promise<{ success: boolean; message: string }> {
110+
if (!db || !connection) {
102111
return {
103112
success: false,
104113
message: "Database not initialized. Call initializeDuckDB first.",
@@ -107,46 +116,35 @@ export async function insertRecordIntoTable(tableName: string, record: Record<st
107116

108117
try {
109118
const columnNames = Object.keys(record).join(", ")
110-
const placeholders = Object.keys(record)
111-
.map((_, index) => `$${index + 1}`)
112-
.join(", ")
113-
const values = Object.values(record)
114119

115-
const insertSQL = `INSERT INTO ${tableName} (${columnNames}) VALUES (${placeholders})`
116-
117-
try {
118-
const prepared = await connection.prepare(insertSQL)
120+
// Format values according to their type
121+
const formattedValues = Object.values(record)
122+
.map((value) => {
123+
if (value === null) {
124+
return "NULL"
125+
}
119126

120-
// Bind values based on their types
121-
values.forEach((value, index) => {
122-
const paramIndex = index + 1
123127
if (typeof value === "string") {
124-
prepared.bindVarchar(paramIndex, value)
125-
} else if (typeof value === "number") {
126-
if (Number.isInteger(value)) {
127-
prepared.bindInteger(paramIndex, value)
128-
} else {
129-
prepared.bindDouble(paramIndex, value)
130-
}
131-
} else if (typeof value === "boolean") {
132-
prepared.bindBoolean(paramIndex, value)
133-
} else if (value === null) {
134-
prepared.bindNull(paramIndex)
135-
} else if (typeof value === "bigint") {
136-
prepared.bindBigInt(paramIndex, value)
137-
} else {
138-
// For other types, convert to string
139-
prepared.bindVarchar(paramIndex, String(value))
128+
// Escape single quotes in strings
129+
return `'${String(value).replace(/'/g, "''")}'`
140130
}
131+
132+
if (typeof value === "boolean") {
133+
return value ? "TRUE" : "FALSE"
134+
}
135+
136+
if (value instanceof Date) {
137+
return `'${value.toISOString()}'`
138+
}
139+
140+
return String(value)
141141
})
142+
.join(", ")
143+
144+
const insertSQL = `INSERT INTO ${tableName} (${columnNames}) VALUES (${formattedValues})`
145+
146+
await connection.exec(insertSQL)
142147

143-
await prepared.run()
144-
} catch (error) {
145-
return {
146-
success: false,
147-
message: `Failed to insert record into ${tableName}: ${error}`,
148-
}
149-
}
150148
return {
151149
success: true,
152150
message: `Record inserted into ${tableName} successfully`,
@@ -164,8 +162,10 @@ export async function insertRecordIntoTable(tableName: string, record: Record<st
164162
* @param query SQL query to execute
165163
* @returns Object with query results or error
166164
*/
167-
export async function queryDatabase(query: string) {
168-
if (!instance || !connection) {
165+
export async function queryDatabase(
166+
query: string,
167+
): Promise<{ success: boolean; message: string; results: unknown[] | null }> {
168+
if (!db || !connection) {
169169
return {
170170
success: false,
171171
message: "Database not initialized. Call initializeDuckDB first.",
@@ -174,11 +174,12 @@ export async function queryDatabase(query: string) {
174174
}
175175

176176
try {
177-
const result = await connection.run(query)
177+
const rows = await connection.all(query)
178+
178179
return {
179180
success: true,
180181
message: "Query executed successfully",
181-
results: result,
182+
results: rows,
182183
}
183184
} catch (error) {
184185
return {
@@ -194,7 +195,7 @@ export async function queryDatabase(query: string) {
194195
* @returns The database instance or null if not initialized
195196
*/
196197
export function getDatabaseInstance() {
197-
return instance
198+
return db
198199
}
199200

200201
/**

src/duckdb/projector.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,33 @@ export async function createEventProjector(name: string, absoluteFilePath: strin
2626
// Create a unique ID for the projector
2727
const projectorId = `${name}-${Date.now()}`
2828

29-
const projectorFunction = await import(absoluteFilePath)
29+
try {
30+
const projectorFunction = await import(absoluteFilePath)
3031

31-
// Store the projector info
32-
projectors[name] = {
33-
name,
34-
function: projectorFunction.default as ProjectionFunction,
35-
}
32+
if (!projectorFunction.default && projectorFunction.projectEvent) {
33+
// Store the projector info
34+
projectors[name] = {
35+
name,
36+
function: projectorFunction.projectEvent as ProjectionFunction,
37+
}
38+
} else if (projectorFunction.default) {
39+
// Store the projector info
40+
projectors[name] = {
41+
name,
42+
function: projectorFunction.default as ProjectionFunction,
43+
}
44+
} else {
45+
throw new Error(`No valid projector function found in ${absoluteFilePath}`)
46+
}
3647

37-
return {
38-
success: true,
39-
projectorId,
40-
name,
48+
return {
49+
success: true,
50+
projectorId,
51+
name,
52+
}
53+
} catch (error) {
54+
console.error(`Failed to create event projector '${name}' from ${absoluteFilePath}:`, error)
55+
throw error
4156
}
4257
}
4358

@@ -47,7 +62,8 @@ export async function createEventProjector(name: string, absoluteFilePath: strin
4762
* @returns The projector or undefined if not found
4863
*/
4964
export function getProjectorByName(name: string): ProjectorInfo | undefined {
50-
return projectors[name]
65+
const projector = projectors[name]
66+
return projector
5167
}
5268

5369
/**

src/duckdb/stream.ts

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ dayjs.extend(utc)
99
// Store active streams
1010
const activeStreams: Record<string, StreamInfo> = {}
1111

12+
const activeDataPumps: Record<string, FlowcoreDataPump> = {}
13+
1214
interface StreamInfo {
1315
id: string
14-
status: "INITIALIZING" | "RUNNING" | "COMPLETED" | "ERROR"
16+
status: "INITIALIZING" | "RUNNING" | "COMPLETED" | "ERROR" | "STOPPED"
1517
eventCount: number
1618
tenant: string
1719
dataCore: string
@@ -104,11 +106,10 @@ async function processEventStream(getBearerToken: () => Promise<string>, streamI
104106

105107
try {
106108
streamInfo.status = "RUNNING"
107-
108109
const startState = getState(streamInfo.startDate)
109110
const endState = getState(streamInfo.endDate)
110111

111-
const dataPump = FlowcoreDataPump.create({
112+
activeDataPumps[streamId] = FlowcoreDataPump.create({
112113
auth: {
113114
getBearerToken: async () => await getBearerToken(),
114115
},
@@ -133,10 +134,21 @@ async function processEventStream(getBearerToken: () => Promise<string>, streamI
133134
return
134135
}
135136

137+
let successCount = 0
138+
let errorCount = 0
139+
136140
for (const event of events) {
137-
const projectionResult = await projectEvent(event, streamInfo.targetTable, streamInfo.projectorName)
138-
if (projectionResult.success) {
139-
streamInfo.eventCount++
141+
try {
142+
const projectionResult = await projectEvent(event, streamInfo.targetTable, streamInfo.projectorName)
143+
144+
if (projectionResult.success) {
145+
successCount++
146+
streamInfo.eventCount++
147+
} else {
148+
errorCount++
149+
}
150+
} catch (error) {
151+
errorCount++
140152
}
141153
}
142154
},
@@ -148,7 +160,7 @@ async function processEventStream(getBearerToken: () => Promise<string>, streamI
148160
logger: console,
149161
})
150162

151-
await dataPump.start((error?: Error) => {
163+
await activeDataPumps[streamId].start((error?: Error) => {
152164
if (error) {
153165
streamInfo.status = "ERROR"
154166
streamInfo.error = `${error}`
@@ -163,6 +175,28 @@ async function processEventStream(getBearerToken: () => Promise<string>, streamI
163175
}
164176
}
165177

178+
export function stopEventStreamProjection(streamId: string) {
179+
const streamInfo = activeStreams[streamId]
180+
181+
if (!streamInfo) {
182+
throw new Error(`Stream ${streamId} not found`)
183+
}
184+
185+
if (!activeDataPumps[streamId]) {
186+
throw new Error(`Data pump for stream ${streamId} not found`)
187+
}
188+
189+
streamInfo.status = "STOPPED"
190+
191+
activeDataPumps[streamId].stop()
192+
}
193+
194+
export function stopAllEventStreamProjections() {
195+
for (const streamId in activeStreams) {
196+
stopEventStreamProjection(streamId)
197+
}
198+
}
199+
166200
function getState(dateString: string) {
167201
const date = dayjs(dateString)
168202
return {

0 commit comments

Comments
 (0)