Skip to content

Improve ray tests #3846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 92 additions & 75 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,27 @@ def setup_sentry(transport=None):
)


def read_error_from_log(job_id):
log_dir = "/tmp/ray/session_latest/logs/"
log_file = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
with open(os.path.join(log_dir, log_file), "r") as file:
lines = file.readlines()

try:
# parse error object from log line
error = json.loads(lines[4][:-1])
except IndexError:
error = None

return error


@pytest.mark.forked
def test_ray_tracing():
def test_tracing_in_ray_tasks():
setup_sentry()

ray.init(
Expand All @@ -50,6 +69,7 @@ def test_ray_tracing():
}
)

# Setup ray task
@ray.remote
def example_task():
with sentry_sdk.start_span(op="task", name="example task step"):
Expand All @@ -62,63 +82,42 @@ def example_task():

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
assert client_transaction["transaction"] == "ray test transaction"
assert client_transaction["transaction_info"] == {"source": "custom"}

worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
worker_transaction["transaction"]
== "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
)
assert worker_transaction["transaction_info"] == {"source": "task"}

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)

for span in worker_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)


@pytest.mark.forked
def test_ray_spans():
setup_sentry()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry,
"working_dir": "./",
}
(span,) = client_transaction["spans"]
assert span["op"] == "queue.submit.ray"
assert span["origin"] == "auto.queue.ray"
assert (
span["description"]
== "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
)
assert span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"]
assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"]

@ray.remote
def example_task():
return sentry_sdk.get_client().transport.envelopes
(span,) = worker_transaction["spans"]
assert span["op"] == "task"
assert span["origin"] == "manual"
assert span["description"] == "example task step"
assert span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"]
assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"]

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
worker_envelopes = ray.get(example_task.remote())

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

for span in client_transaction["spans"]:
assert span["op"] == "queue.submit.ray"
assert span["origin"] == "auto.queue.ray"

for span in worker_transaction["spans"]:
assert span["op"] == "queue.task.ray"
assert span["origin"] == "auto.queue.ray"
assert (
client_transaction["contexts"]["trace"]["trace_id"]
== worker_transaction["contexts"]["trace"]["trace_id"]
)


@pytest.mark.forked
def test_ray_errors():
def test_errors_in_ray_tasks():
setup_sentry_with_logging_transport()

ray.init(
Expand All @@ -128,6 +127,7 @@ def test_ray_errors():
}
)

# Setup ray task
@ray.remote
def example_task():
1 / 0
Expand All @@ -138,30 +138,19 @@ def example_task():
ray.get(future)

job_id = future.job_id().hex()

# Read the worker log output containing the error
log_dir = "/tmp/ray/session_latest/logs/"
log_file = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
with open(os.path.join(log_dir, log_file), "r") as file:
lines = file.readlines()
# parse error object from log line
error = json.loads(lines[4][:-1])
error = read_error_from_log(job_id)

assert error["level"] == "error"
assert (
error["transaction"]
== "tests.integrations.ray.test_ray.test_ray_errors.<locals>.example_task"
) # its in the worker, not the client thus not "ray test transaction"
== "tests.integrations.ray.test_ray.test_errors_in_ray_tasks.<locals>.example_task"
)
assert error["exception"]["values"][0]["mechanism"]["type"] == "ray"
assert not error["exception"]["values"][0]["mechanism"]["handled"]


@pytest.mark.forked
def test_ray_actor():
def test_tracing_in_ray_actors():
setup_sentry()

ray.init(
Expand All @@ -171,13 +160,14 @@ def test_ray_actor():
}
)

# Setup ray actor
@ray.remote
class Counter:
def __init__(self):
self.n = 0

def increment(self):
with sentry_sdk.start_span(op="task", name="example task step"):
with sentry_sdk.start_span(op="task", name="example actor execution"):
self.n += 1

return sentry_sdk.get_client().transport.envelopes
Expand All @@ -186,20 +176,47 @@ def increment(self):
counter = Counter.remote()
worker_envelopes = ray.get(counter.increment.remote())

# Currently no transactions/spans are captured in actors
assert worker_envelopes == []

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
# Spans for submitting the actor task are not created (actors are not supported yet)
assert client_transaction["spans"] == []

# Transaction are not yet created when executing ray actors (actors are not supported yet)
assert worker_envelopes == []


@pytest.mark.forked
def test_errors_in_ray_actors():
setup_sentry_with_logging_transport()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry_with_logging_transport,
"working_dir": "./",
}
)

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)
# Setup ray actor
@ray.remote
class Counter:
def __init__(self):
self.n = 0

def increment(self):
with sentry_sdk.start_span(op="task", name="example actor execution"):
1 / 0

return sentry_sdk.get_client().transport.envelopes

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
with pytest.raises(ZeroDivisionError):
counter = Counter.remote()
future = counter.increment.remote()
ray.get(future)

job_id = future.job_id().hex()
error = read_error_from_log(job_id)

# We do not capture errors in ray actors yet
assert error is None
Loading