Skip to content

Commit 5d41039

Browse files
authored
Contribute OTLP UDP Exporter (#329)
*Description of changes:* Contributing our OTLP UDP exporter. **Test plan:** Built wheel file and pip installed to an isolated python `venv` using a modified version of [this sample app](https://github.com/aws-observability/aws-otel-community/tree/master/sample-apps/python-manual-instrumentation-sample-app) which uses the udp exporter: <img width="1512" alt="Screenshot 2025-03-03 at 10 22 16 AM" src="https://github.com/user-attachments/assets/6d610cce-b817-46aa-80ed-cf15e3530621" /> Ran unit tests: <img width="1512" alt="Screenshot 2025-03-03 at 10 22 16 AM" src="https://github.com/user-attachments/assets/7debe0c0-312b-4499-a67e-e73a8d9756bd" /> By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 67ef4d3 commit 5d41039

File tree

5 files changed

+274
-0
lines changed

5 files changed

+274
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
AWS OpenTelemetry OTLP UDP Exporter
2+
===================================
3+
4+
Installation
5+
------------
6+
7+
::
8+
9+
pip install aws-otel-otlp-udp-exporter
10+
11+
12+
This package provides a UDP exporter for OpenTelemetry.
13+
14+
References
15+
----------
16+
17+
* `OpenTelemetry Project <https://opentelemetry.io/>`_
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "aws-otel-otlp-udp-exporter"
7+
version = "0.1.0"
8+
description = "OTLP UDP Exporter for OpenTelemetry"
9+
readme = "README.rst"
10+
license = "Apache-2.0"
11+
requires-python = ">=3.8"
12+
authors = [
13+
{ name = "Amazon Web Services" }
14+
]
15+
classifiers = [
16+
"Development Status :: 4 - Beta",
17+
"Intended Audience :: Developers",
18+
"License :: OSI Approved :: Apache Software License",
19+
"Programming Language :: Python",
20+
"Programming Language :: Python :: 3",
21+
"Programming Language :: Python :: 3.8",
22+
"Programming Language :: Python :: 3.9",
23+
"Programming Language :: Python :: 3.10",
24+
"Programming Language :: Python :: 3.11",
25+
]
26+
27+
dependencies = [
28+
"opentelemetry-sdk == 1.27.0",
29+
"opentelemetry-exporter-otlp-proto-common == 1.27.0",
30+
]
31+
32+
[project.urls]
33+
Homepage = "https://github.com/aws-observability/aws-otel-python-instrumentation/tree/main/exporters/aws-otel-otlp-udp-exporter"
34+
35+
[tool.hatch.build.targets.wheel]
36+
packages = ["src/amazon"]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from .exporter import (
5+
DEFAULT_ENDPOINT,
6+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
7+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
8+
PROTOCOL_HEADER,
9+
OTLPUdpSpanExporter,
10+
UdpExporter,
11+
)
12+
13+
__all__ = [
14+
"UdpExporter",
15+
"OTLPUdpSpanExporter",
16+
"DEFAULT_ENDPOINT",
17+
"FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX",
18+
"FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX",
19+
"PROTOCOL_HEADER",
20+
]
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import base64
4+
import os
5+
import socket
6+
from logging import Logger, getLogger
7+
from typing import Optional, Sequence, Tuple
8+
9+
from typing_extensions import override
10+
11+
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
12+
from opentelemetry.sdk.trace import ReadableSpan
13+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
14+
15+
DEFAULT_ENDPOINT = "127.0.0.1:2000"
16+
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
17+
18+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"
19+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"
20+
21+
_logger: Logger = getLogger(__name__)
22+
23+
24+
class UdpExporter:
25+
def __init__(self, endpoint: Optional[str] = None):
26+
if endpoint is None and "AWS_LAMBDA_FUNCTION_NAME" in os.environ:
27+
# If in an AWS Lambda Environment, `AWS_XRAY_DAEMON_ADDRESS` will be defined
28+
endpoint = os.environ.get("AWS_XRAY_DAEMON_ADDRESS", DEFAULT_ENDPOINT)
29+
30+
self._endpoint = endpoint or DEFAULT_ENDPOINT
31+
self._host, self._port = self._parse_endpoint(self._endpoint)
32+
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
33+
self._socket.setblocking(False)
34+
35+
def send_data(self, data: bytes, signal_format_prefix: str):
36+
# base64 encoding and then converting to string with utf-8
37+
base64_encoded_string: str = base64.b64encode(data).decode("utf-8")
38+
message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"
39+
40+
try:
41+
_logger.debug("Sending UDP data: %s", message)
42+
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
43+
except Exception as exc: # pylint: disable=broad-except
44+
_logger.error("Error sending UDP data: %s", exc)
45+
raise
46+
47+
def shutdown(self):
48+
self._socket.close()
49+
50+
# pylint: disable=no-self-use
51+
def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
52+
try:
53+
vals = endpoint.split(":")
54+
host = vals[0]
55+
port = int(vals[1])
56+
except Exception as exc: # pylint: disable=broad-except
57+
raise ValueError(f"Invalid endpoint: {endpoint}") from exc
58+
59+
return host, port
60+
61+
62+
class OTLPUdpSpanExporter(SpanExporter):
63+
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
64+
self._udp_exporter = UdpExporter(endpoint=endpoint)
65+
self._sampled = sampled
66+
67+
@override
68+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
69+
serialized_data = encode_spans(spans).SerializeToString()
70+
71+
try:
72+
prefix = (
73+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
74+
if self._sampled
75+
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
76+
)
77+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
78+
return SpanExportResult.SUCCESS
79+
except Exception as exc: # pylint: disable=broad-except
80+
_logger.error("Error exporting spans: %s", exc)
81+
return SpanExportResult.FAILURE
82+
83+
# pylint: disable=no-self-use
84+
@override
85+
def force_flush(self, timeout_millis: int = 30000) -> bool:
86+
# TODO: implement force flush
87+
return True
88+
89+
@override
90+
def shutdown(self) -> None:
91+
self._udp_exporter.shutdown()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import base64
4+
import socket
5+
import unittest
6+
from unittest import TestCase
7+
from unittest.mock import MagicMock, patch
8+
9+
from amazon.opentelemetry.exporters.otlp.udp import (
10+
DEFAULT_ENDPOINT,
11+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
12+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
13+
PROTOCOL_HEADER,
14+
OTLPUdpSpanExporter,
15+
UdpExporter,
16+
)
17+
from opentelemetry.sdk.trace.export import SpanExportResult
18+
19+
20+
class TestUdpExporter(TestCase):
21+
22+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
23+
def test_udp_exporter_init_default(self, mock_socket):
24+
exporter = UdpExporter()
25+
self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT)
26+
self.assertEqual(exporter._host, "127.0.0.1")
27+
self.assertEqual(exporter._port, 2000)
28+
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
29+
mock_socket().setblocking.assert_called_once_with(False)
30+
31+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
32+
def test_udp_exporter_init_with_endpoint(self, mock_socket):
33+
exporter = UdpExporter(endpoint="localhost:5000")
34+
self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT)
35+
self.assertEqual(exporter._host, "localhost")
36+
self.assertEqual(exporter._port, 5000)
37+
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
38+
mock_socket().setblocking.assert_called_once_with(False)
39+
40+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
41+
def test_udp_exporter_init_invalid_endpoint(self, mock_socket):
42+
with self.assertRaises(ValueError):
43+
UdpExporter(endpoint="invalidEndpoint:port")
44+
45+
# pylint: disable=no-self-use
46+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
47+
def test_send_data(self, mock_socket):
48+
mock_socket_instance = mock_socket.return_value
49+
exporter = UdpExporter()
50+
input_bytes: bytes = b"hello"
51+
encoded_bytes: bytes = base64.b64encode(input_bytes)
52+
exporter.send_data(input_bytes, "signal_prefix")
53+
expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8")
54+
mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000))
55+
56+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
57+
def test_shutdown(self, mock_socket):
58+
mock_socket_instance = mock_socket.return_value
59+
exporter = UdpExporter()
60+
exporter.shutdown()
61+
mock_socket_instance.close.assert_called_once()
62+
63+
64+
class TestOTLPUdpSpanExporter(unittest.TestCase):
65+
66+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
67+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
68+
def test_export_unsampled_span(self, mock_udp_exporter, mock_encode_spans):
69+
mock_udp_exporter_instance = mock_udp_exporter.return_value
70+
mock_encoded_data = MagicMock()
71+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
72+
exporter = OTLPUdpSpanExporter(sampled=False)
73+
result = exporter.export(MagicMock())
74+
mock_udp_exporter_instance.send_data.assert_called_once_with(
75+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
76+
)
77+
self.assertEqual(result, SpanExportResult.SUCCESS)
78+
79+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
80+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
81+
def test_export_sampled_span(self, mock_udp_exporter, mock_encode_spans):
82+
mock_udp_exporter_instance = mock_udp_exporter.return_value
83+
mock_encoded_data = MagicMock()
84+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
85+
exporter = OTLPUdpSpanExporter()
86+
result = exporter.export(MagicMock())
87+
mock_udp_exporter_instance.send_data.assert_called_once_with(
88+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
89+
)
90+
self.assertEqual(result, SpanExportResult.SUCCESS)
91+
92+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
93+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
94+
def test_export_with_exception(self, mock_udp_exporter, mock_encode_spans):
95+
mock_udp_exporter_instance = mock_udp_exporter.return_value
96+
mock_encoded_data = MagicMock()
97+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
98+
mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong")
99+
exporter = OTLPUdpSpanExporter()
100+
result = exporter.export(MagicMock())
101+
self.assertEqual(result, SpanExportResult.FAILURE)
102+
103+
# pylint: disable=no-self-use
104+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
105+
def test_shutdown(self, mock_udp_exporter):
106+
mock_udp_exporter_instance = mock_udp_exporter.return_value
107+
exporter = OTLPUdpSpanExporter()
108+
exporter.shutdown()
109+
exporter.force_flush()
110+
mock_udp_exporter_instance.shutdown.assert_called_once()

0 commit comments

Comments
 (0)