Skip to content

Commit e2185f9

Browse files
committed
contribute otlp udp exporter
1 parent 1fae89f commit e2185f9

File tree

5 files changed

+189
-0
lines changed

5 files changed

+189
-0
lines changed

exporters/aws-otel-otlp-udp-exporter/README.rst

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .aws_otel_otlp_udp_exporter import UdpExporter, OTLPUdpMetricExporter, OTLPUdpSpanExporter
2+
3+
__all__ = ["UdpExporter", "OTLPUdpMetricExporter", "OTLPUdpSpanExporter"]
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import base64
4+
import socket
5+
from logging import Logger, getLogger
6+
from typing import Dict, Optional, Sequence, Tuple
7+
8+
from typing_extensions import override
9+
10+
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics
11+
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
12+
from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality
13+
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
14+
from opentelemetry.sdk.metrics._internal.point import MetricsData
15+
from opentelemetry.sdk.metrics.export import MetricExporter
16+
from opentelemetry.sdk.metrics.view import Aggregation
17+
from opentelemetry.sdk.trace import ReadableSpan
18+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
19+
20+
DEFAULT_ENDPOINT = "127.0.0.1:2000"
21+
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
22+
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
23+
24+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"
25+
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"
26+
27+
_logger: Logger = getLogger(__name__)
28+
29+
30+
class UdpExporter:
31+
def __init__(self, endpoint: Optional[str] = None):
32+
self._endpoint = endpoint or DEFAULT_ENDPOINT
33+
self._host, self._port = self._parse_endpoint(self._endpoint)
34+
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
35+
self._socket.setblocking(False)
36+
37+
def send_data(self, data: bytes, signal_format_prefix: str):
38+
# base64 encoding and then converting to string with utf-8
39+
base64_encoded_string: str = base64.b64encode(data).decode("utf-8")
40+
message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"
41+
42+
try:
43+
_logger.debug("Sending UDP data: %s", message)
44+
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
45+
except Exception as exc: # pylint: disable=broad-except
46+
_logger.error("Error sending UDP data: %s", exc)
47+
raise
48+
49+
def shutdown(self):
50+
self._socket.close()
51+
52+
# pylint: disable=no-self-use
53+
def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
54+
try:
55+
vals = endpoint.split(":")
56+
host = vals[0]
57+
port = int(vals[1])
58+
except Exception as exc: # pylint: disable=broad-except
59+
raise ValueError(f"Invalid endpoint: {endpoint}") from exc
60+
61+
return host, port
62+
63+
64+
class OTLPUdpMetricExporter(MetricExporter):
65+
def __init__(
66+
self,
67+
endpoint: Optional[str] = None,
68+
preferred_temporality: Dict[type, AggregationTemporality] = None,
69+
preferred_aggregation: Dict[type, Aggregation] = None,
70+
):
71+
super().__init__(
72+
preferred_temporality=preferred_temporality,
73+
preferred_aggregation=preferred_aggregation,
74+
)
75+
self._udp_exporter = UdpExporter(endpoint=endpoint)
76+
77+
@override
78+
def export(
79+
self,
80+
metrics_data: MetricsData,
81+
timeout_millis: float = 10_000,
82+
**kwargs,
83+
) -> MetricExportResult:
84+
serialized_data = encode_metrics(metrics_data).SerializeToString()
85+
86+
try:
87+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX)
88+
return MetricExportResult.SUCCESS
89+
except Exception as exc: # pylint: disable=broad-except
90+
_logger.error("Error exporting metrics: %s", exc)
91+
return MetricExportResult.FAILURE
92+
93+
# pylint: disable=no-self-use
94+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
95+
# TODO: implement force flush
96+
return True
97+
98+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
99+
self._udp_exporter.shutdown()
100+
101+
102+
class OTLPUdpSpanExporter(SpanExporter):
103+
def __init__(self, endpoint: Optional[str] = None, sampled: bool = True):
104+
self._udp_exporter = UdpExporter(endpoint=endpoint)
105+
self._sampled = sampled
106+
107+
@override
108+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
109+
serialized_data = encode_spans(spans).SerializeToString()
110+
111+
try:
112+
prefix = (
113+
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
114+
if self._sampled
115+
else FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX
116+
)
117+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=prefix)
118+
return SpanExportResult.SUCCESS
119+
except Exception as exc: # pylint: disable=broad-except
120+
_logger.error("Error exporting spans: %s", exc)
121+
return SpanExportResult.FAILURE
122+
123+
# pylint: disable=no-self-use
124+
@override
125+
def force_flush(self, timeout_millis: int = 30000) -> bool:
126+
# TODO: implement force flush
127+
return True
128+
129+
@override
130+
def shutdown(self) -> None:
131+
self._udp_exporter.shutdown()
132+
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "aws-otel-otlp-udp-exporter"
7+
version = "0.1.0"
8+
description = "OTLP UDP Exporter for OpenTelemetry"
9+
readme = "README.rst"
10+
license = "Apache-2.0"
11+
requires-python = ">=3.8"
12+
authors = [
13+
{ name = "Amazon Web Services" }
14+
]
15+
classifiers = [
16+
"Development Status :: 4 - Beta",
17+
"Intended Audience :: Developers",
18+
"License :: OSI Approved :: Apache Software License",
19+
"Programming Language :: Python",
20+
"Programming Language :: Python :: 3",
21+
"Programming Language :: Python :: 3.8",
22+
"Programming Language :: Python :: 3.9",
23+
"Programming Language :: Python :: 3.10",
24+
"Programming Language :: Python :: 3.11",
25+
]
26+
27+
dependencies = [
28+
"opentelemetry-sdk == 1.27.0",
29+
"opentelemetry-exporter-otlp-proto-grpc == 1.27.0",
30+
]
31+
32+
[project.urls]
33+
Homepage = "https://github.com/aws-observability/aws-otel-python-instrumentation/tree/main/exporters"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from aws_otel_otlp_udp_exporter import UdpExporter
2+
3+
def main():
4+
# Initialize the UDP exporter
5+
exporter = UdpExporter(endpoint="127.0.0.1:2000")
6+
7+
# Sample data to send
8+
sample_data = b"Test data for UDP exporter"
9+
10+
# Send the data
11+
try:
12+
exporter.send_data(data=sample_data, signal_format_prefix="T1S")
13+
print("Data send successfuly.")
14+
except Exception as e:
15+
print(f"An error occurred: {e}")
16+
17+
# Shutdown the exporter
18+
exporter.shutdown()
19+
20+
if __name__ == "__main__":
21+
main()

0 commit comments

Comments
 (0)