Skip to content

Capturing Initialization and Timeout errors for AWS Lambda Integration #756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions sentry_sdk/integrations/aws_lambda.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta
from os import environ
import sys
import json

from sentry_sdk.hub import Hub, _should_send_default_pii
from sentry_sdk._compat import reraise
Expand All @@ -9,6 +10,7 @@
capture_internal_exceptions,
event_from_exception,
logger,
TimeoutThread,
)
from sentry_sdk.integrations import Integration
from sentry_sdk.integrations._wsgi_common import _filter_headers
Expand All @@ -25,6 +27,45 @@

F = TypeVar("F", bound=Callable[..., Any])

# Constants
# Buffer time, in milliseconds, required to send the timeout warning to Sentry.
# It is compared against the remaining time (in milliseconds) reported by the
# Lambda context, and subtracted from it to decide when the warning fires.
TIMEOUT_WARNING_BUFFER = 1500
# Divisor used to convert millisecond values to seconds.
MILLIS_TO_SECONDS = 1000.0


def _wrap_init_error(init_error):
# type: (F) -> F
def sentry_init_error(*args, **kwargs):
# type: (*Any, **Any) -> Any

hub = Hub.current
integration = hub.get_integration(AwsLambdaIntegration)
if integration is None:
return init_error(*args, **kwargs)

# Fetch Initialization error details from arguments
error = json.loads(args[1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be more defensive here. This will blow up if called with fewer than two positional arguments or if the argument is not valid JSON.

Can be done in a follow up.


# If an integration is there, a client has to be there.
client = hub.client # type: Any

with hub.push_scope() as scope:
with capture_internal_exceptions():
scope.clear_breadcrumbs()
# Checking if there is any error/exception which is raised in the runtime
# environment from arguments and, re-raising it to capture it as an event.
if error.get("errorType"):
exc_info = sys.exc_info()
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "aws_lambda", "handled": False},
)
hub.capture_event(event, hint=hint)

return init_error(*args, **kwargs)

return sentry_init_error # type: ignore


def _wrap_handler(handler):
# type: (F) -> F
Expand All @@ -37,12 +78,31 @@ def sentry_handler(event, context, *args, **kwargs):

# If an integration is there, a client has to be there.
client = hub.client # type: Any
configured_time = context.get_remaining_time_in_millis()

with hub.push_scope() as scope:
with capture_internal_exceptions():
scope.clear_breadcrumbs()
scope.transaction = context.function_name
scope.add_event_processor(_make_request_event_processor(event, context))
scope.add_event_processor(
_make_request_event_processor(event, context, configured_time)
)
# Starting the Timeout thread only if the configured time is greater than Timeout warning
# buffer and timeout_warning parameter is set True.
if (
integration.timeout_warning
and configured_time > TIMEOUT_WARNING_BUFFER
):
waiting_time = (
configured_time - TIMEOUT_WARNING_BUFFER
) / MILLIS_TO_SECONDS

timeout_thread = TimeoutThread(
waiting_time, configured_time / MILLIS_TO_SECONDS
)

# Starting the thread to raise timeout warning exception
timeout_thread.start()

try:
return handler(event, context, *args, **kwargs)
Expand Down Expand Up @@ -73,6 +133,10 @@ def _drain_queue():
class AwsLambdaIntegration(Integration):
identifier = "aws_lambda"

def __init__(self, timeout_warning=False):
# type: (bool) -> None
self.timeout_warning = timeout_warning

@staticmethod
def setup_once():
# type: () -> None
Expand Down Expand Up @@ -126,6 +190,10 @@ def sentry_to_json(*args, **kwargs):

lambda_bootstrap.to_json = sentry_to_json
else:
lambda_bootstrap.LambdaRuntimeClient.post_init_error = _wrap_init_error(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this will be executed for Python 3.7 only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For the Python 2.7 and 3.6 environments the runtime already handles these issues, so I don't need to handle those scenarios here. In those cases the runtime generates a FaultException for an initialization error.

lambda_bootstrap.LambdaRuntimeClient.post_init_error
)

old_handle_event_request = lambda_bootstrap.handle_event_request

def sentry_handle_event_request( # type: ignore
Expand Down Expand Up @@ -158,19 +226,23 @@ def inner(*args, **kwargs):
)


def _make_request_event_processor(aws_event, aws_context):
# type: (Any, Any) -> EventProcessor
def _make_request_event_processor(aws_event, aws_context, configured_timeout):
# type: (Any, Any, Any) -> EventProcessor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type here is well-known, float.

start_time = datetime.now()

def event_processor(event, hint, start_time=start_time):
# type: (Event, Hint, datetime) -> Optional[Event]
remaining_time_in_milis = aws_context.get_remaining_time_in_millis()
exec_duration = configured_timeout - remaining_time_in_milis

extra = event.setdefault("extra", {})
extra["lambda"] = {
"function_name": aws_context.function_name,
"function_version": aws_context.function_version,
"invoked_function_arn": aws_context.invoked_function_arn,
"remaining_time_in_millis": aws_context.get_remaining_time_in_millis(),
"aws_request_id": aws_context.aws_request_id,
"execution_duration_in_millis": exec_duration,
"remaining_time_in_millis": remaining_time_in_milis,
}

extra["cloudwatch logs"] = {
Expand Down
38 changes: 38 additions & 0 deletions sentry_sdk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import os
import sys
import time
import threading

from datetime import datetime

Expand Down Expand Up @@ -871,3 +873,39 @@ def transaction_from_function(func):


disable_capture_event = ContextVar("disable_capture_event")


class ServerlessTimeoutWarning(Exception):
    """Raised when a serverless method is about to reach its timeout."""


class TimeoutThread(threading.Thread):
    """Thread that waits for ``waiting_time`` seconds and then raises
    ServerlessTimeoutWarning, unless ``stop()`` has been called first.

    :param waiting_time: seconds to wait before raising the warning.
    :param configured_timeout: the function's configured timeout in seconds;
        it is only used to build the warning message and is rounded up to the
        next whole second there.

    The exception is raised from ``run()``, i.e. inside this thread, not in
    the thread that started it.
    NOTE(review): presumably the SDK's threading integration is what turns
    that uncaught exception into an event -- confirm against the caller.
    """

    def __init__(self, waiting_time, configured_timeout):
        # type: (float, float) -> None
        threading.Thread.__init__(self)
        self.waiting_time = waiting_time
        self.configured_timeout = configured_timeout
        # Set by stop() to cancel the pending warning.
        self._stop_event = threading.Event()

    def stop(self):
        # type: () -> None
        """Cancel the pending warning.

        Call this once the guarded function has returned so that no spurious
        timeout warning is raised after the work has already completed.
        """
        self._stop_event.set()

    def run(self):
        # type: () -> None
        """Wait for ``waiting_time`` seconds, then raise the timeout warning.

        Returns without raising if ``stop()`` was called before the wait
        elapsed.

        :raises ServerlessTimeoutWarning: when the wait elapses uncancelled.
        """
        # Event.wait() blocks like time.sleep() but returns early on stop().
        self._stop_event.wait(self.waiting_time)
        if self._stop_event.is_set():
            return

        # Round the configured timeout up to a whole number of seconds:
        # int() truncates, so bump the value when a fractional part was lost.
        integer_configured_timeout = int(self.configured_timeout)
        if integer_configured_timeout < self.configured_timeout:
            integer_configured_timeout += 1

        # Raising the exception once the waiting time has elapsed
        raise ServerlessTimeoutWarning(
            "WARNING : Function is expected to get timed out. Configured timeout duration = {} seconds.".format(
                integer_configured_timeout
            )
        )
81 changes: 76 additions & 5 deletions tests/integrations/aws_lambda/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
import json
from sentry_sdk.transport import HttpTransport

FLUSH_EVENT = True
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhcarvalho @untitaker I defined a global constant to parameterize the timeout error test so that the event data is captured.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that works for me


class TestTransport(HttpTransport):
def _send_event(self, event):
# Delay event output like this to test proper shutdown
# Note that AWS Lambda truncates the log output to 4kb, so you better
# pray that your events are smaller than that or else tests start
# failing.
time.sleep(1)
if FLUSH_EVENT:
time.sleep(1)
Comment on lines +33 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard to understand for maintenance why FLUSH_EVENT toggles calling time.sleep(1).

I'd think it would control calling sentry.flush() 😕

Might be a matter of naming?

print("\\nEVENT:", json.dumps(event))

def init_sdk(**extra_init_args):
def init_sdk(timeout_warning=False, **extra_init_args):
sentry_sdk.init(
dsn="https://[email protected]/123",
transport=TestTransport,
integrations=[AwsLambdaIntegration()],
integrations=[AwsLambdaIntegration(timeout_warning=timeout_warning)],
shutdown_timeout=10,
**extra_init_args
)
Expand All @@ -60,7 +63,7 @@ def run_lambda_function(tmpdir, lambda_client, request, relay_normalize):
if request.param == "python3.8":
pytest.xfail("Python 3.8 is currently broken")

def inner(code, payload):
def inner(code, payload, syntax_check=True):
runtime = request.param
tmpdir.ensure_dir("lambda_tmp").remove()
tmp = tmpdir.ensure_dir("lambda_tmp")
Expand All @@ -70,7 +73,8 @@ def inner(code, payload):
# Check file for valid syntax first, and that the integration does not
# crash when not running in Lambda (but rather a local deployment tool
# such as chalice's)
subprocess.check_call([sys.executable, str(tmp.join("test_lambda.py"))])
if syntax_check:
subprocess.check_call([sys.executable, str(tmp.join("test_lambda.py"))])

tmp.join("setup.cfg").write("[install]\nprefix=")
subprocess.check_call([sys.executable, "setup.py", "sdist", "-d", str(tmpdir)])
Expand All @@ -88,6 +92,7 @@ def inner(code, payload):
Handler="test_lambda.test_handler",
Code={"ZipFile": tmpdir.join("ball.zip").read(mode="rb")},
Description="Created as part of testsuite for getsentry/sentry-python",
Timeout=4,
)

@request.addfinalizer
Expand Down Expand Up @@ -124,6 +129,8 @@ def test_basic(run_lambda_function):
+ dedent(
"""
init_sdk()


def test_handler(event, context):
raise Exception("something went wrong")
"""
Expand Down Expand Up @@ -237,3 +244,67 @@ def test_handler(event, context):
"query_string": {"bonkers": "true"},
"url": "https://iwsz2c7uwi.execute-api.us-east-1.amazonaws.com/asd",
}


def test_init_error(run_lambda_function):
events, response = run_lambda_function(
LAMBDA_PRELUDE
+ dedent(
"""
init_sdk()
func()

def test_handler(event, context):
return 0
"""
),
b'{"foo": "bar"}',
syntax_check=False,
)

log_result = (base64.b64decode(response["LogResult"])).decode("utf-8")
expected_text = "name 'func' is not defined"
assert expected_text in log_result


def test_timeout_error(run_lambda_function):
events, response = run_lambda_function(
LAMBDA_PRELUDE
+ dedent(
"""
init_sdk(timeout_warning=True)
FLUSH_EVENT=False


def test_handler(event, context):
time.sleep(10)
return 0
"""
),
b'{"foo": "bar"}',
)

(event,) = events
assert event["level"] == "error"
(exception,) = event["exception"]["values"]
assert exception["type"] == "ServerlessTimeoutWarning"
assert (
exception["value"]
== "WARNING : Function is expected to get timed out. Configured timeout duration = 4 seconds."
)

assert exception["mechanism"] == {"type": "threading", "handled": False}

assert event["extra"]["lambda"]["function_name"].startswith("test_function_")

logs_url = event["extra"]["cloudwatch logs"]["url"]
assert logs_url.startswith("https://console.aws.amazon.com/cloudwatch/home?region=")
assert not re.search("(=;|=$)", logs_url)
assert event["extra"]["cloudwatch logs"]["log_group"].startswith(
"/aws/lambda/test_function_"
)

log_stream_re = "^[0-9]{4}/[0-9]{2}/[0-9]{2}/\\[[^\\]]+][a-f0-9]+$"
log_stream = event["extra"]["cloudwatch logs"]["log_stream"]

assert re.match(log_stream_re, log_stream)