Skip to content

Commit 91b2e51

Browse files
committed
Add health_check_timeout for detecting hung processes.
1 parent 21099b5 commit 91b2e51

File tree

7 files changed

+142
-60
lines changed

7 files changed

+142
-60
lines changed

async-container.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ Gem::Specification.new do |spec|
2424

2525
spec.required_ruby_version = ">= 3.1"
2626

27-
spec.add_dependency "async", "~> 2.10"
27+
spec.add_dependency "async", "~> 2.22"
2828
end

examples/health_check/test.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
# Released under the MIT License.
5+
# Copyright, 2022, by Anton Sozontov.
6+
# Copyright, 2024, by Samuel Williams.
7+
8+
# Load the controller relative to this file rather than the current working directory, so the example can be started from anywhere:
require_relative "../../lib/async/container/controller"

# Names assigned at random to the child instances.
NAMES = [
	"Cupcake", "Donut", "Eclair", "Froyo", "Gingerbread", "Honeycomb", "Ice Cream Sandwich", "Jelly Bean", "KitKat", "Lollipop", "Marshmallow", "Nougat", "Oreo", "Pie", "Apple Tart"
].freeze
13+
14+
# Example controller whose children must keep reporting readiness in order to pass the health check.
class Controller < Async::Container::Controller
	# Populate the container with 10 restartable children, each with a health check timeout of 1.
	#
	# @parameter container The container to run the children in.
	def setup(container)
		container.run(count: 10, restart: true, health_check_timeout: 1) do |instance|
			message = if container.statistics.failed?
				"Child process restarted #{container.statistics.restarts} times."
			else
				"Child process started."
			end
			
			Console.debug(self, message)
			
			instance.name = NAMES.sample
			
			instance.ready!
			
			loop do
				# The status must be updated more frequently than the health check timeout.
				# NOTE(review): rand*1.2 can exceed the timeout of 1 given above, so some children will presumably fail the health check and be restarted - this looks intentional for the demonstration; confirm.
				sleep(rand*1.2)
				
				instance.ready!
			end
		end
	end
end
36+
37+
# Create the example controller and run it.
# The original left `(container_class: Async::Container::Threaded)` as a commented-out argument to `Controller.new`, presumably as an alternative container to experiment with.
Controller.new.run

examples/test.rb

Lines changed: 0 additions & 54 deletions
This file was deleted.

lib/async/container/forked.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ def initialize(name: nil)
116116
self.close_write
117117
end
118118

119+
# Build a hash describing this child process, suitable for serialization.
#
# Any arguments are accepted and ignored.
#
# @returns [Hash] The name, pid and status of the child process. The status is converted with `to_i`, or is `nil` when `@status` is `nil`.
def as_json(...)
	status = @status&.to_i
	
	{name: @name, pid: @pid, status: status}
end
129+
130+
# Serialize this child process to JSON, based on the hash returned by {as_json}.
#
# All arguments are forwarded to the hash's `to_json`.
#
# @returns [String] The child process as JSON.
def to_json(...)
	attributes = as_json
	
	attributes.to_json(...)
end
136+
119137
# Set the name of the process.
120138
# Invokes {::Process.setproctitle} if invoked in the child process.
121139
def name= value

lib/async/container/generic.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# Copyright, 2019-2024, by Samuel Williams.
55

66
require "etc"
7+
require "async/clock"
78

89
require_relative "group"
910
require_relative "keyed"
@@ -141,7 +142,8 @@ def stop(timeout = true)
141142
# @parameter name [String] The name of the child instance.
142143
# @parameter restart [Boolean] Whether to restart the child instance if it fails.
143144
# @parameter key [Symbol] A key used for reloading child instances.
144-
def spawn(name: nil, restart: false, key: nil, &block)
145+
# @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.
146+
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block)
145147
name ||= UNNAMED
146148

147149
if mark?(key)
@@ -157,9 +159,24 @@ def spawn(name: nil, restart: false, key: nil, &block)
157159

158160
state = insert(key, child)
159161

162+
# If a health check is specified, we will monitor the child process and terminate it if it does not update its state within the specified time.
163+
if health_check_timeout
164+
age_clock = state[:age] = Clock.start
165+
end
166+
160167
begin
161168
status = @group.wait_for(child) do |message|
162-
state.update(message)
169+
case message
170+
when :health_check!
171+
if health_check_timeout&.<(age_clock.total)
172+
Console.warn(self, "Child failed health check!", child: child, age: age_clock.total, health_check_timeout: health_check_timeout)
173+
# If the child has failed the health check, we assume the worst and terminate it (SIGTERM).
174+
child.terminate!
175+
end
176+
else
177+
state.update(message)
178+
age_clock&.reset!
179+
end
163180
end
164181
ensure
165182
delete(key, child)

lib/async/container/group.rb

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ module Container
1313
# Manages a group of running processes.
1414
class Group
1515
# Initialize an empty group.
16-
def initialize
16+
#
17+
# @parameter health_check_interval [Numeric | Nil] The (biggest) interval at which health checks are performed.
18+
def initialize(health_check_interval: 1.0)
19+
@health_check_interval = health_check_interval
20+
21+
# The running fibers, indexed by IO:
1722
@running = {}
1823

1924
# This queue allows us to wait for processes to complete, without spawning new processes as a result.
@@ -57,8 +62,36 @@ def sleep(duration)
5762
# Invoke {resume}, then keep waiting for children for as long as {with_health_checks} yields.
#
# Each wait is bounded by the duration supplied by {with_health_checks}.
def wait
	self.resume
	
	with_health_checks { |duration| self.wait_for_children(duration) }
end
69+
70+
# Yield repeatedly while there are running processes, performing a health check whenever more than the health check interval has elapsed since the previous one.
#
# When no health check interval is configured, `nil` is yielded on every iteration and no health checks are performed.
#
# @yields {|duration| ...} Invoked once per iteration.
# 	@parameter duration [Numeric | Nil] The time remaining until the next health check is due (never negative), or `nil` when there is no interval.
private def with_health_checks
	unless @health_check_interval
		yield nil while self.running?
		return
	end
	
	clock = Clock.start
	
	while self.running?
		remaining = @health_check_interval - clock.total
		
		# The clock may already be past the interval, so never yield a negative duration:
		yield(remaining.negative? ? 0 : remaining)
		
		next unless clock.total > @health_check_interval
		
		self.health_check!
		clock.reset!
	end
end
90+
91+
# Perform a health check on all running processes, by resuming each running fiber with the `:health_check!` message.
def health_check!
	@running.each_value { |fiber| fiber.resume(:health_check!) }
end
6497

@@ -119,15 +152,19 @@ def wait_for(channel)
119152
@running[io] = Fiber.current
120153

121154
while @running.key?(io)
155+
# Wait for some event on the channel:
122156
result = Fiber.yield
123157

124158
if result == Interrupt
125159
channel.interrupt!
126160
elsif result == Terminate
127161
channel.terminate!
162+
elsif result
163+
yield result
128164
elsif message = channel.receive
129165
yield message
130166
else
167+
# Wait for the channel to exit:
131168
return channel.wait
132169
end
133170
end

lib/async/container/threaded.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,23 @@ def initialize(name: nil)
125125
end
126126
end
127127

128+
# Build a hash describing this child thread, suitable for serialization.
#
# Any arguments are accepted and ignored.
#
# @returns [Hash] The name of the thread and its status. The status is converted with `as_json`, or is `nil` when `@status` is `nil`.
def as_json(...)
	name = @thread.name
	status = @status&.as_json
	
	{name: name, status: status}
end
137+
138+
# Serialize this child thread to JSON, based on the hash returned by {as_json}.
#
# All arguments are forwarded to the hash's `to_json`.
#
# @returns [String] The child thread as JSON.
def to_json(...)
	attributes = as_json
	
	attributes.to_json(...)
end
144+
128145
# Set the name of the thread.
129146
# @parameter value [String] The name to set.
130147
def name= value
@@ -191,6 +208,14 @@ def success?
191208
@error.nil?
192209
end
193210

211+
# Convert the status to a value suitable for serialization.
#
# Any arguments are accepted and ignored.
#
# @returns [String | Boolean] The inspected error if there is one, otherwise `true`.
def as_json(...)
	@error ? @error.inspect : true
end
218+
194219
# A human readable representation of the status.
195220
def to_s
196221
"\#<#{self.class} #{success? ? "success" : "failure"}>"

0 commit comments

Comments
 (0)