Skip to content

Commit a3b4d8e

Browse files
committed
contribute otlp udp exporter
1 parent 1fae89f commit a3b4d8e

File tree

10 files changed

+359
-0
lines changed

10 files changed

+359
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
AWS OpenTelemetry OTLP UDP Exporter
2+
===================================
3+
4+
Installation
5+
------------
6+
7+
::
8+
9+
pip install aws-otel-otlp-udp-exporter
10+
11+
12+
This package provides a UDP exporter for OpenTelemetry.
13+
14+
References
15+
----------
16+
17+
* `OpenTelemetry Project <https://opentelemetry.io/>`_
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "aws-otel-otlp-udp-exporter"
7+
version = "0.1.0"
8+
description = "OTLP UDP Exporter for OpenTelemetry"
9+
readme = "README.rst"
10+
license = "Apache-2.0"
11+
requires-python = ">=3.8"
12+
authors = [
13+
{ name = "Amazon Web Services" }
14+
]
15+
classifiers = [
16+
"Development Status :: 4 - Beta",
17+
"Intended Audience :: Developers",
18+
"License :: OSI Approved :: Apache Software License",
19+
"Programming Language :: Python",
20+
"Programming Language :: Python :: 3",
21+
"Programming Language :: Python :: 3.8",
22+
"Programming Language :: Python :: 3.9",
23+
"Programming Language :: Python :: 3.10",
24+
"Programming Language :: Python :: 3.11",
25+
]
26+
27+
dependencies = [
28+
"opentelemetry-sdk == 1.27.0",
29+
"opentelemetry-exporter-otlp-proto-grpc == 1.27.0",
30+
]
31+
32+
[project.urls]
33+
Homepage = "https://github.com/aws-observability/aws-otel-python-instrumentation/tree/main/exporters"
34+
35+
[tool.hatch.build.targets.wheel]
36+
packages = ["src/amazon"]

exporters/aws-otel-otlp-udp-exporter/src/__init__.py

Whitespace-only changes.

exporters/aws-otel-otlp-udp-exporter/src/amazon/__init__.py

Whitespace-only changes.

exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/__init__.py

Whitespace-only changes.

exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/__init__.py

Whitespace-only changes.

exporters/aws-otel-otlp-udp-exporter/src/amazon/opentelemetry/exporters/otlp/__init__.py

Whitespace-only changes.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from .exporter import (
5+
DEFAULT_ENDPOINT,
6+
FORMAT_OTEL_METRICS_BINARY_PREFIX,
7+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
8+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
9+
PROTOCOL_HEADER,
10+
OTLPUdpMetricExporter,
11+
OTLPUdpSpanExporter,
12+
UdpExporter,
13+
)
14+
15+
__all__ = [
16+
"UdpExporter",
17+
"OTLPUdpMetricExporter",
18+
"OTLPUdpSpanExporter",
19+
"DEFAULT_ENDPOINT",
20+
"FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX",
21+
"FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX",
22+
"PROTOCOL_HEADER",
23+
"FORMAT_OTEL_METRICS_BINARY_PREFIX",
24+
]
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import base64
4+
import os
5+
import socket
6+
from logging import Logger, getLogger
7+
from typing import Dict, Optional, Sequence, Tuple
8+
9+
from typing_extensions import override
10+
11+
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics
12+
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
13+
from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality
14+
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
15+
from opentelemetry.sdk.metrics._internal.point import MetricsData
16+
from opentelemetry.sdk.metrics.export import MetricExporter
17+
from opentelemetry.sdk.metrics.view import Aggregation
18+
from opentelemetry.sdk.trace import ReadableSpan
19+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
20+
21+
DEFAULT_ENDPOINT = "127.0.0.1:2000"
22+
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
23+
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
24+
25+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"
26+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"
27+
28+
_logger: Logger = getLogger(__name__)
29+
30+
31+
class UdpExporter:
32+
def __init__(self, endpoint: Optional[str] = None):
33+
if endpoint is None and "AWS_LAMBDA_FUNCTION_NAME" in os.environ:
34+
# If in an AWS Lambda Environment, `AWS_XRAY_DAEMON_ADDRESS` will be defined
35+
endpoint = os.environ.get("AWS_XRAY_DAEMON_ADDRESS", DEFAULT_ENDPOINT)
36+
37+
self._endpoint = endpoint or DEFAULT_ENDPOINT
38+
self._host, self._port = self._parse_endpoint(self._endpoint)
39+
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
40+
self._socket.setblocking(False)
41+
42+
def send_data(self, data: bytes, signal_format_prefix: str):
43+
# base64 encoding and then converting to string with utf-8
44+
base64_encoded_string: str = base64.b64encode(data).decode("utf-8")
45+
message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"
46+
47+
try:
48+
_logger.debug("Sending UDP data: %s", message)
49+
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
50+
except Exception as exc: # pylint: disable=broad-except
51+
_logger.error("Error sending UDP data: %s", exc)
52+
raise
53+
54+
def shutdown(self):
55+
self._socket.close()
56+
57+
# pylint: disable=no-self-use
58+
def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
59+
try:
60+
vals = endpoint.split(":")
61+
host = vals[0]
62+
port = int(vals[1])
63+
except Exception as exc: # pylint: disable=broad-except
64+
raise ValueError(f"Invalid endpoint: {endpoint}") from exc
65+
66+
return host, port
67+
68+
69+
class OTLPUdpMetricExporter(MetricExporter):
70+
def __init__(
71+
self,
72+
endpoint: Optional[str] = None,
73+
preferred_temporality: Dict[type, AggregationTemporality] = None,
74+
preferred_aggregation: Dict[type, Aggregation] = None,
75+
):
76+
super().__init__(
77+
preferred_temporality=preferred_temporality,
78+
preferred_aggregation=preferred_aggregation,
79+
)
80+
self._udp_exporter = UdpExporter(endpoint=endpoint)
81+
82+
@override
83+
def export(
84+
self,
85+
metrics_data: MetricsData,
86+
timeout_millis: float = 10_000,
87+
**kwargs,
88+
) -> MetricExportResult:
89+
serialized_data = encode_metrics(metrics_data).SerializeToString()
90+
91+
try:
92+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX)
93+
return MetricExportResult.SUCCESS
94+
except Exception as exc: # pylint: disable=broad-except
95+
_logger.error("Error exporting metrics: %s", exc)
96+
return MetricExportResult.FAILURE
97+
98+
# pylint: disable=no-self-use
99+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
100+
# TODO: implement force flush
101+
return True
102+
103+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
104+
self._udp_exporter.shutdown()
105+
106+
107+
class OTLPUdpSpanExporter(SpanExporter):
108+
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
109+
self._udp_exporter = UdpExporter(endpoint=endpoint)
110+
self._sampled = sampled
111+
112+
@override
113+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
114+
serialized_data = encode_spans(spans).SerializeToString()
115+
116+
try:
117+
prefix = (
118+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
119+
if self._sampled
120+
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
121+
)
122+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
123+
return SpanExportResult.SUCCESS
124+
except Exception as exc: # pylint: disable=broad-except
125+
_logger.error("Error exporting spans: %s", exc)
126+
return SpanExportResult.FAILURE
127+
128+
# pylint: disable=no-self-use
129+
@override
130+
def force_flush(self, timeout_millis: int = 30000) -> bool:
131+
# TODO: implement force flush
132+
return True
133+
134+
@override
135+
def shutdown(self) -> None:
136+
self._udp_exporter.shutdown()
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import base64
4+
import socket
5+
import unittest
6+
from unittest import TestCase
7+
from unittest.mock import MagicMock, patch
8+
9+
from amazon.opentelemetry.exporters.otlp.udp import (
10+
DEFAULT_ENDPOINT,
11+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX,
12+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX,
13+
PROTOCOL_HEADER,
14+
OTLPUdpMetricExporter,
15+
OTLPUdpSpanExporter,
16+
UdpExporter,
17+
)
18+
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
19+
from opentelemetry.sdk.trace.export import SpanExportResult
20+
21+
22+
class TestUdpExporter(TestCase):
23+
24+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
25+
def test_udp_exporter_init_default(self, mock_socket):
26+
exporter = UdpExporter()
27+
self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT)
28+
self.assertEqual(exporter._host, "127.0.0.1")
29+
self.assertEqual(exporter._port, 2000)
30+
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
31+
mock_socket().setblocking.assert_called_once_with(False)
32+
33+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
34+
def test_udp_exporter_init_with_endpoint(self, mock_socket):
35+
exporter = UdpExporter(endpoint="localhost:5000")
36+
self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT)
37+
self.assertEqual(exporter._host, "localhost")
38+
self.assertEqual(exporter._port, 5000)
39+
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
40+
mock_socket().setblocking.assert_called_once_with(False)
41+
42+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
43+
def test_udp_exporter_init_invalid_endpoint(self, mock_socket):
44+
with self.assertRaises(ValueError):
45+
UdpExporter(endpoint="invalidEndpoint:port")
46+
47+
# pylint: disable=no-self-use
48+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
49+
def test_send_data(self, mock_socket):
50+
mock_socket_instance = mock_socket.return_value
51+
exporter = UdpExporter()
52+
input_bytes: bytes = b"hello"
53+
encoded_bytes: bytes = base64.b64encode(input_bytes)
54+
exporter.send_data(input_bytes, "signal_prefix")
55+
expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8")
56+
mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000))
57+
58+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.socket.socket")
59+
def test_shutdown(self, mock_socket):
60+
mock_socket_instance = mock_socket.return_value
61+
exporter = UdpExporter()
62+
exporter.shutdown()
63+
mock_socket_instance.close.assert_called_once()
64+
65+
66+
class TestOTLPUdpMetricExporter(unittest.TestCase):
67+
68+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_metrics")
69+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
70+
def test_export(self, mock_udp_exporter, mock_encode_metrics):
71+
mock_udp_exporter_instance = mock_udp_exporter.return_value
72+
mock_encoded_data = MagicMock()
73+
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
74+
exporter = OTLPUdpMetricExporter()
75+
result = exporter.export(MagicMock())
76+
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="M1")
77+
self.assertEqual(result, MetricExportResult.SUCCESS)
78+
79+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_metrics")
80+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
81+
def test_export_with_exception(self, mock_udp_exporter, mock_encode_metrics):
82+
mock_udp_exporter_instance = mock_udp_exporter.return_value
83+
mock_encoded_data = MagicMock()
84+
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
85+
mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong")
86+
exporter = OTLPUdpMetricExporter()
87+
result = exporter.export(MagicMock())
88+
self.assertEqual(result, MetricExportResult.FAILURE)
89+
90+
# pylint: disable=no-self-use
91+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
92+
def test_shutdown(self, mock_udp_exporter):
93+
mock_udp_exporter_instance = mock_udp_exporter.return_value
94+
exporter = OTLPUdpMetricExporter()
95+
exporter.force_flush()
96+
exporter.shutdown()
97+
mock_udp_exporter_instance.shutdown.assert_called_once()
98+
99+
100+
class TestOTLPUdpSpanExporter(unittest.TestCase):
101+
102+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
103+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
104+
def test_export_unsampled_span(self, mock_udp_exporter, mock_encode_spans):
105+
mock_udp_exporter_instance = mock_udp_exporter.return_value
106+
mock_encoded_data = MagicMock()
107+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
108+
exporter = OTLPUdpSpanExporter(sampled=False)
109+
result = exporter.export(MagicMock())
110+
mock_udp_exporter_instance.send_data.assert_called_once_with(
111+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
112+
)
113+
self.assertEqual(result, SpanExportResult.SUCCESS)
114+
115+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
116+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
117+
def test_export_sampled_span(self, mock_udp_exporter, mock_encode_spans):
118+
mock_udp_exporter_instance = mock_udp_exporter.return_value
119+
mock_encoded_data = MagicMock()
120+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
121+
exporter = OTLPUdpSpanExporter()
122+
result = exporter.export(MagicMock())
123+
mock_udp_exporter_instance.send_data.assert_called_once_with(
124+
data=mock_encoded_data, signal_format_prefix=FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
125+
)
126+
self.assertEqual(result, SpanExportResult.SUCCESS)
127+
128+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.encode_spans")
129+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
130+
def test_export_with_exception(self, mock_udp_exporter, mock_encode_spans):
131+
mock_udp_exporter_instance = mock_udp_exporter.return_value
132+
mock_encoded_data = MagicMock()
133+
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
134+
mock_udp_exporter_instance.send_data.side_effect = Exception("Something went wrong")
135+
exporter = OTLPUdpSpanExporter()
136+
result = exporter.export(MagicMock())
137+
self.assertEqual(result, SpanExportResult.FAILURE)
138+
139+
# pylint: disable=no-self-use
140+
@patch("amazon.opentelemetry.exporters.otlp.udp.exporter.UdpExporter")
141+
def test_shutdown(self, mock_udp_exporter):
142+
mock_udp_exporter_instance = mock_udp_exporter.return_value
143+
exporter = OTLPUdpSpanExporter()
144+
exporter.shutdown()
145+
exporter.force_flush()
146+
mock_udp_exporter_instance.shutdown.assert_called_once()

0 commit comments

Comments
 (0)