-
-
Notifications
You must be signed in to change notification settings - Fork 729
V4 dequeue performance (return faster) #1989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Error: Could not generate a valid Mermaid diagram after multiple attempts. ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
internal-packages/run-engine/src/run-queue/index.ts (1)
395-436
: Single-pass dequeue may under-deliver messagesThe refactor iterates over each queue exactly once. If
maxCount
is 10 but there are 5 envs × 1 message ready per queue, the loop will collect only 5 and return early, forcing another round-trip to Redis on the next API call.Unless that latency is desirable, consider repeating the traversal until either
a)messages.length === maxCount
or
b) no additional messages were dequeued in the last pass.- for (const env of envQueues) { + let progress = true; + while (progress && messages.length < maxCount) { + progress = false; + for (const env of envQueues) { ... if (message) { messages.push(message); + progress = true; } - // If we've reached maxCount, we don't want to look at this env anymore + if (messages.length >= maxCount) break; } }This preserves the “don’t spin forever” goal while still maximising throughput.
references/hello-world/src/trigger/deadlocks.ts (1)
27-29
: Queue saturation risk
deadlockNestedTask
is pinned todeadlockQueue
(concurrency = 1). If the mutual-wait loop above is not addressed, the single slot becomes blocked indefinitely and every subsequent trigger will queue-pile, effectively acting as a DoS on that queue.Consider:
- Raising the concurrency, or
- Enabling
releaseConcurrencyOnWaitpoint: true
, or- Removing the circular wait.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
apps/webapp/app/env.server.ts
(0 hunks)apps/webapp/app/v3/runEngine.server.ts
(0 hunks)internal-packages/run-engine/src/engine/index.ts
(0 hunks)internal-packages/run-engine/src/engine/types.ts
(0 hunks)internal-packages/run-engine/src/run-queue/index.ts
(2 hunks)references/hello-world/src/trigger/deadlocks.ts
(1 hunks)
💤 Files with no reviewable changes (4)
- internal-packages/run-engine/src/engine/types.ts
- apps/webapp/app/v3/runEngine.server.ts
- apps/webapp/app/env.server.ts
- internal-packages/run-engine/src/engine/index.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
33-34
:✅ Verification successful
Confirm
tryCatch
contract & import path
tryCatch
is destructured as[error, message]
, implying it returns a tuple in the form[Error | undefined, T | undefined]
.
- Please double-check that the helper exported from
@trigger.dev/core
actually follows that signature and isawait
-able – some utility wrappers are synchronous. A mismatch will break the runtime destructuring.- If the return type is a discriminated union (
{ error?; data? }
) or throws, this new code will silently mis-handle errors.Run the snippet below to grep for the helper definition and verify its return type:
🏁 Script executed:
#!/bin/bash ast-grep --pattern $'export function tryCatch($$$) {' | head -n 20Length of output: 67
🏁 Script executed:
#!/bin/bash set -e # Search for all occurrences of tryCatch to locate its definition and signature rg "tryCatch" -n -C3 | head -n 50Length of output: 3950
🏁 Script executed:
#!/bin/bash set -e # Find where tryCatch is defined and inspect its signature (including async flag) rg -E "export (async )?(function|const) tryCatch" -n . | head -n 20Length of output: 157
🏁 Script executed:
#!/bin/bash # Search for tryCatch definition in the monorepo (excluding node_modules) rg --ignore-dir node_modules "export.*tryCatch" -n . || true rg --ignore-dir node_modules "const tryCatch" -n . || true rg --ignore-dir node_modules "function tryCatch" -n . || trueLength of output: 1513
🏁 Script executed:
#!/bin/bash set -e # List top-level directories and packages to locate the core package echo "Root directory contents:" ls -1 echo echo "packages directory contents:" ls -1 packagesLength of output: 561
🏁 Script executed:
#!/bin/bash set -e echo "Searching for tryCatch in the core package source:" rg "tryCatch" -n packages/core/src | head -n 50Length of output: 3281
🏁 Script executed:
#!/bin/bash set -e echo "=== tryCatch implementation (packages/core/src/v3/tryCatch.ts) ===" sed -n '1,200p' packages/core/src/v3/tryCatch.ts || true echo echo "=== Utils re-export (packages/core/src/utils.ts) ===" sed -n '1,200p' packages/core/src/utils.ts || true echo echo "=== Result type definitions in core ===" rg "type Result" -n packages/core/src || trueLength of output: 1487
🏁 Script executed:
#!/bin/bash set -e echo "packages/core/src contents:" ls -1 packages/core/src || true echo echo "packages/core/src/index.ts (if exists):" if [ -f packages/core/src/index.ts ]; then sed -n '1,200p' packages/core/src/index.ts else echo "No index.ts in packages/core/src" fi echo echo "package.json fields in packages/core:" grep -E '"(main|exports|module|types)"' -n packages/core/package.json || true echo echo "First 200 lines of packages/core/package.json:" sed -n '1,200p' packages/core/package.jsonLength of output: 11030
Confirmed
tryCatch
is async and returns the expected tuple – no changes needed
The helper in@trigger.dev/core/src/utils.ts
(and re-exported insrc/index.ts
) is declared asexport async function tryCatch<T, E = Error>( promise: Promise<T> | undefined ): Promise<[null, T] | [E, null]> { … }– it is
await
-able, always returns a two-element tuple, and never throws. Importing it directly from@trigger.dev/core
is valid. No updates required.
const [error, message] = await tryCatch( | ||
this.#callDequeueMessage({ | ||
messageQueue: queue, | ||
}) | ||
); | ||
|
||
if (error) { | ||
this.logger.error( | ||
`[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`, | ||
{ | ||
error, | ||
} | ||
); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Errors are swallowed – consider surfacing them to callers
tryCatch
logs the error but then execution continues as if nothing happened.
In production this can create “black-hole” failures where a queue is permanently unhealthy yet no call-site notices. At minimum:
if (error) {
this.logger.error(
`[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`,
{
error,
}
);
+ // Bubble a warning to the tracing span so alerts can fire
+ span.recordException(error as Error);
}
Better: propagate a metric / circuit-breaker so repeated failures trigger recovery logic.
Committable suggestion skipped: line range outside the PR's diff.
await deadlockNestedTask.triggerAndWait({ | ||
message: "Hello, world!", | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Infinite mutual triggering – system can still dead-lock
deadlockTester
now triggers deadlockNestedTask
once (instead of twice) but nothing prevents the callee from immediately triggering the caller again (see lines 27-29). Without a depth or retry guard this still creates an unbounded recursion chain:
Tester → Nested → Tester → Nested …
Add a counter or a unique correlation id to detect and stop after n hops, or use trigger
(fire-and-forget) instead of triggerAndWait
if synchronous wait isn’t required.
When dequeuing in v4 we now only look at each tenant and queue a maximum of one time, returning as soon as we have enough messages.
This prevents a situation where we have stale queues and keep considering them again and again. We want to return ASAP so we can get fresh queues again. Instead focusing on having multiple separate simultaneous calls to the overall dequeuing.
Summary by CodeRabbit