
Commit 4aa527a (parent: dfcdadb)

Workflow Engine's process_data_sources pipeline method (#78526)
## Description

📜 [Tech Spec](https://www.notion.so/sentry/Alerts-Create-Issues-9114a498098143178839d584c293fe75?pvs=4#0f513ed0ba46424fa79cd0ba0788c6f2)

- Adds `DataPacket` as a Protocol, so we can enforce a base set of attributes
- Adds the `process_data_sources` method
- Adds telemetry (a sketch of the instrumentation pattern follows this list)
  - Tracks volume by data packet type
  - Adds a span to the DB queries so we can monitor performance
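For the telemetry bullets above, the commit pairs a counter tagged by packet type with a span around the database fetch. A minimal sketch of that pattern, using the same `metrics.incr` and `sentry_sdk.start_span` calls as the new processor further down; the `instrumented_fetch` wrapper and its `fetch_rows` callable are illustrative only, not part of the commit:

```python
import sentry_sdk

from sentry.utils import metrics


def instrumented_fetch(query_type, fetch_rows):
    # Count each pipeline invocation, tagged by packet type, so volume
    # per query_type can be tracked.
    metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

    # Wrap the database lookup in a span so its duration shows up in traces.
    with sentry_sdk.start_span(
        op="sentry.workflow_engine.process_data_sources.fetch_data_sources"
    ):
        return fetch_rows()
```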

File tree: 6 files changed, +155 −2 lines changed


src/sentry/workflow_engine/models/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@
     "DataCondition",
     "DataConditionGroup",
     "DataConditionGroupAction",
+    "DataPacket",
     "DataSource",
     "DataSourceDetector",
     "Detector",
@@ -16,7 +17,7 @@
 from .data_condition import DataCondition
 from .data_condition_group import DataConditionGroup
 from .data_condition_group_action import DataConditionGroupAction
-from .data_source import DataSource
+from .data_source import DataPacket, DataSource
 from .data_source_detector import DataSourceDetector
 from .detector import Detector
 from .detector_state import DetectorState

src/sentry/workflow_engine/models/data_source.py

Lines changed: 7 additions & 0 deletions
@@ -1,3 +1,5 @@
+from typing import Protocol
+
 from django.db import models

 from sentry.backup.scopes import RelocationScope
@@ -10,12 +12,17 @@
 from sentry.workflow_engine.models.data_source_detector import DataSourceDetector


+class DataPacket(Protocol):
+    query_id: int
+
+
 @region_silo_model
 class DataSource(DefaultFieldsModel):
     __relocation_scope__ = RelocationScope.Organization

     class Type(models.IntegerChoices):
         SNUBA_QUERY_SUBSCRIPTION = 1
+        SNUBA_QUERY = 2

     organization = FlexibleForeignKey("sentry.Organization")
     query_id = BoundedBigIntegerField()
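Because `DataPacket` is a typing `Protocol` rather than a base class, any object that exposes a `query_id: int` attribute satisfies it structurally; nothing has to inherit from it. A small sketch under that assumption (the `SnubaResultPacket` dataclass and the `describe` helper are hypothetical, for illustration only):

```python
from dataclasses import dataclass

from sentry.workflow_engine.models import DataPacket


@dataclass
class SnubaResultPacket:
    # Hypothetical packet built from a Snuba query result; it satisfies
    # DataPacket purely by carrying a `query_id: int` attribute.
    query_id: int
    values: dict[str, float]


def describe(packet: DataPacket) -> str:
    # Type-checks because DataPacket is a structural Protocol.
    return f"data packet for query {packet.query_id}"


print(describe(SnubaResultPacket(query_id=42, values={"count()": 3.0})))
```

The Protocol keeps the processors decoupled from whatever concrete packet types upstream producers emit, as long as each packet carries the `query_id` used to look up its `DataSource`.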
src/sentry/workflow_engine/processors/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+__all__ = [
+    "process_data_sources",
+]
+
+from .data_source import process_data_sources
src/sentry/workflow_engine/processors/data_source.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+import logging
+
+import sentry_sdk
+from django.db.models import Prefetch
+
+from sentry.utils import metrics
+from sentry.workflow_engine.models import DataPacket, DataSource, Detector
+
+logger = logging.getLogger("sentry.workflow_engine.process_data_source")
+
+
+def process_data_sources(
+    data_packets: list[DataPacket], query_type: DataSource.Type = DataSource.Type.SNUBA_QUERY
+) -> list[tuple[DataPacket, list[Detector]]]:
+    metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})
+
+    data_packet_ids = {packet.query_id for packet in data_packets}
+
+    # Fetch all data sources and associated detectors for the given data packets
+    with sentry_sdk.start_span(op="sentry.workflow_engine.process_data_sources.fetch_data_sources"):
+        data_sources = DataSource.objects.filter(
+            query_id__in=data_packet_ids, type=query_type
+        ).prefetch_related(Prefetch("detectors"))
+
+    # Build a lookup dict for query_id to detectors
+    query_id_to_detectors = {ds.query_id: list(ds.detectors.all()) for ds in data_sources}
+
+    # Create the result tuples
+    result = []
+    for packet in data_packets:
+        detectors = query_id_to_detectors.get(packet.query_id)
+
+        if detectors:
+            data_packet_tuple = (packet, detectors)
+            result.append(data_packet_tuple)
+        else:
+            logger.warning(
+                "No detectors found", extra={"query_id": packet.query_id, "query_type": query_type}
+            )
+            metrics.incr(
+                "sentry.workflow_engine.process_data_sources.no_detectors",
+                tags={"query_type": query_type},
+            )
+
+    return result
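A sketch of how a caller might consume the `(packet, detectors)` pairs returned above, assuming packets arrive from an upstream query result; the `run_pipeline` wrapper and the print placeholder for the detector-processing step are illustrative, not part of this commit:

```python
from sentry.workflow_engine.models import DataPacket, DataSource
from sentry.workflow_engine.processors import process_data_sources


def run_pipeline(data_packets: list[DataPacket]) -> None:
    # Each packet is paired with every Detector attached to the matching
    # DataSource; packets with no matching detectors are omitted from the
    # result (process_data_sources logs a warning for them).
    for packet, detectors in process_data_sources(data_packets, DataSource.Type.SNUBA_QUERY):
        for detector in detectors:
            # Placeholder for the next pipeline stage (detector evaluation).
            print(f"query {packet.query_id} -> detector {detector.id}")
```

Note the `prefetch_related(Prefetch("detectors"))` in the processor keeps the detector lookup at a constant two queries regardless of how many packets are passed in.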

tests/sentry/workflow_engine/models/test_workflow.py

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 from sentry.testutils.cases import TestCase
-from sentry.workflow_engine.models.workflow import Workflow
+from sentry.workflow_engine.models import Workflow


 class WorkflowTest(TestCase):
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+from sentry.snuba.models import SnubaQuery
+from sentry.testutils.cases import TestCase
+from sentry.workflow_engine.models import DataSource
+from sentry.workflow_engine.processors import process_data_sources
+
+
+class TestProcessDataSources(TestCase):
+    def create_snuba_query(self, **kwargs):
+        return SnubaQuery.objects.create(
+            type=SnubaQuery.Type.ERROR.value,
+            dataset="events",
+            aggregate="count()",
+            time_window=60,
+            resolution=60,
+            **kwargs,
+        )
+
+    def setUp(self):
+        self.query = self.create_snuba_query()
+        self.query_two = self.create_snuba_query()
+
+        self.detector_one = self.create_detector(name="test_detector1")
+        self.detector_two = self.create_detector(name="test_detector2")
+
+        self.ds1 = self.create_data_source(query_id=self.query.id, type=DataSource.Type.SNUBA_QUERY)
+        self.ds1.detectors.set([self.detector_one])
+
+        self.ds2 = self.create_data_source(
+            query_id=self.query_two.id, type=DataSource.Type.SNUBA_QUERY
+        )
+        self.ds2.detectors.set([self.detector_two])
+
+        self.packet = self.query
+        self.packet_two = self.query_two
+
+        # turn a query into a data packet, "simulating" the result from the snuba query
+        self.packet.query_id = self.query.id
+        self.packet_two.query_id = self.query_two.id
+
+        self.data_packets = [self.query, self.query_two]
+
+    def test_single_data_packet(self):
+        self.data_packets = [self.query]
+        assert process_data_sources(self.data_packets, DataSource.Type.SNUBA_QUERY) == [
+            (self.query, [self.detector_one])
+        ]
+
+    def test_multiple_data_packets(self):
+        assert process_data_sources(self.data_packets, DataSource.Type.SNUBA_QUERY) == [
+            (self.query, [self.detector_one]),
+            (self.query_two, [self.detector_two]),
+        ]
+
+    def test_multiple_detectors(self):
+        self.detector_three = self.create_detector(name="test_detector3")
+        self.detector_four = self.create_detector(name="test_detector4")
+        self.detector_five = self.create_detector(name="test_detector5")
+        self.detector_five = self.create_detector(name="test_detector5")
+
+        self.ds2.detectors.add(self.detector_three)
+        self.ds2.detectors.add(self.detector_four)
+        self.ds2.detectors.add(self.detector_five)
+
+        assert process_data_sources(self.data_packets, DataSource.Type.SNUBA_QUERY) == [
+            (self.query, [self.detector_one]),
+            (
+                self.query_two,
+                [self.detector_two, self.detector_three, self.detector_four, self.detector_five],
+            ),
+        ]
+
+    def test_no_results(self):
+        self.ds1.detectors.clear()
+        self.ds2.detectors.clear()
+
+        assert process_data_sources(self.data_packets, DataSource.Type.SNUBA_QUERY) == []
+
+    def test_different_data_packet_type__no_results(self):
+        assert (
+            process_data_sources(self.data_packets, DataSource.Type.SNUBA_QUERY_SUBSCRIPTION) == []
+        )
+
+    def test_different_data_packet_type__with_results(self):
+        self.ds1.type = DataSource.Type.SNUBA_QUERY_SUBSCRIPTION
+        self.ds1.save()
+
+        self.ds2.type = DataSource.Type.SNUBA_QUERY_SUBSCRIPTION
+        self.ds2.save()
+
+        assert process_data_sources(
+            self.data_packets, DataSource.Type.SNUBA_QUERY_SUBSCRIPTION
+        ) == [
+            (self.query, [self.detector_one]),
+            (self.query_two, [self.detector_two]),
+        ]
