-
-
Notifications
You must be signed in to change notification settings - Fork 729
Multiple queue consumers per supervisor #1947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
WalkthroughThe changes introduce support for configuring multiple queue consumers in the supervisor session. A new environment variable, Changes
Sequence Diagram(s)sequenceDiagram
participant Env
participant ManagedSupervisor
participant SupervisorSession
participant RunQueueConsumer
Env->>ManagedSupervisor: Provide TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT
ManagedSupervisor->>SupervisorSession: Pass maxConsumerCount option
loop maxConsumerCount times
SupervisorSession->>RunQueueConsumer: Instantiate consumer
end
SupervisorSession->>RunQueueConsumer: start() all consumers concurrently
SupervisorSession->>RunQueueConsumer: stop() all consumers concurrently
Poem
Tip ⚡💬 Agentic Chat (Pro Plan, General Availability)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/core/src/v3/runEngineWorker/supervisor/session.ts (2)
187-187
: Consider adding error handling for consumer start failures.Using
Promise.allSettled
is a good choice to ensure all consumers attempt to start. Consider adding some logging if any individual consumer fails to start.- await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); + const startResults = await Promise.allSettled(this.queueConsumers.map(async (q, index) => q.start())); + startResults.forEach((result, index) => { + if (result.status === 'rejected') { + console.error(`[SupervisorSession] Failed to start queue consumer ${index}`, { error: result.reason }); + } + });
202-202
: Consider adding error handling for consumer stop failures.Similarly to the start method, consider adding error handling for consumers that fail to stop properly.
- await Promise.allSettled(this.queueConsumers.map(async (q) => q.stop())); + const stopResults = await Promise.allSettled(this.queueConsumers.map(async (q, index) => q.stop())); + stopResults.forEach((result, index) => { + if (result.status === 'rejected') { + console.error(`[SupervisorSession] Failed to stop queue consumer ${index}`, { error: result.reason }); + } + });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
apps/supervisor/src/env.ts
(1 hunks)apps/supervisor/src/index.ts
(1 hunks)packages/core/src/v3/runEngineWorker/supervisor/session.ts
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/supervisor/src/env.ts (1)
36-36
: LGTM: Environment variable definition is well-structured.The new environment variable
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT
is properly defined with appropriate type coercion to integer and a sensible default value of 1, which maintains backward compatibility.apps/supervisor/src/index.ts (1)
121-121
: LGTM: Parameter correctly passed to SupervisorSession.The environment variable is appropriately passed to the SupervisorSession constructor, enabling dynamic configuration of the number of queue consumers.
packages/core/src/v3/runEngineWorker/supervisor/session.ts (3)
21-21
: LGTM: Well-defined option type extension.The
SupervisorSessionOptions
type is correctly extended with the optionalmaxConsumerCount
parameter.
31-31
: LGTM: Appropriate type change for multiple consumers.Good change from a single consumer instance to an array of consumers, matching the new functionality.
43-52
: LGTM: Clean implementation of multiple consumers initialization.The constructor efficiently creates the specified number of queue consumers using
Array.from
, with a sensible default of 1 if no count is specified.
Consumer count can now be tweaked via env vars in addition to replica count.
Summary by CodeRabbit