
Commit c76cb73

supporting blog content: onelake connector part II (#445)
1 parent b58a387

File tree

3 files changed: +592 −0 lines changed

Lines changed: 230 additions & 0 deletions
@@ -0,0 +1,230 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

import os

from envyaml import EnvYAML

from connectors.logger import logger

DEFAULT_ELASTICSEARCH_MAX_RETRIES = 5
DEFAULT_ELASTICSEARCH_RETRY_INTERVAL = 10

DEFAULT_MAX_FILE_SIZE = 10485760  # 10MB


def load_config(config_file):
    logger.info(f"Loading config from {config_file}")
    yaml_config = EnvYAML(config_file, flatten=False).export()
    nested_yaml_config = {}
    for key, value in yaml_config.items():
        _nest_configs(nested_yaml_config, key, value)
    configuration = dict(_merge_dicts(_default_config(), nested_yaml_config))
    _ent_search_config(configuration)

    return configuration
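
# A quick sketch of what load_config produces (illustrative; "config.yml" and
# its contents are assumptions, not part of this change): a top-level dotted
# key is nested by _nest_configs, then merged over _default_config():
#
#   # config.yml contains: elasticsearch.bulk.chunk_size: 500
#   config = load_config("config.yml")
#   config["elasticsearch"]["bulk"]["chunk_size"]  # -> 500
#   config["elasticsearch"]["bulk"]["max_concurrency"]  # -> 5 (default kept)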

def add_defaults(config, default_config=None):
    if default_config is None:
        default_config = _default_config()
    configuration = dict(_merge_dicts(default_config, config))
    return configuration


# Left - in Enterprise Search; Right - in Connectors
config_mappings = {
    "elasticsearch.host": "elasticsearch.host",
    "elasticsearch.username": "elasticsearch.username",
    "elasticsearch.password": "elasticsearch.password",
    "elasticsearch.headers": "elasticsearch.headers",
    "log_level": "service.log_level",
}

# Enterprise Search is written in Ruby and always uses lower-case log levels,
# so we map them to Python's here for now.
# Ruby-supported log levels: 'debug', 'info', 'warn', 'error', 'fatal', 'unknown'
# Python-supported log levels: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NOTSET'
log_level_mappings = {
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARNING",
    "error": "ERROR",
    "fatal": "CRITICAL",
    "unknown": "NOTSET",
}

def _default_config():
    return {
        "elasticsearch": {
            "host": "http://localhost:9200",
            "username": "elastic",
            "password": "changeme",
            "ssl": True,
            "verify_certs": True,
            "bulk": {
                "queue_max_size": 1024,
                "queue_max_mem_size": 25,
                "queue_refresh_interval": 1,
                "queue_refresh_timeout": 600,
                "display_every": 100,
                "chunk_size": 1000,
                "max_concurrency": 5,
                "chunk_max_mem_size": 5,
                "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
                "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
                "concurrent_downloads": 10,
                "enable_operations_logging": False,
            },
            "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
            "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
            "retry_on_timeout": True,
            "request_timeout": 120,
            "max_wait_duration": 120,
            "initial_backoff_duration": 1,
            "backoff_multiplier": 2,
            "log_level": "info",
            "feature_use_connectors_api": True,
        },
        "service": {
            "idling": 30,
            "heartbeat": 300,
            "preflight_max_attempts": 10,
            "preflight_idle": 30,
            "max_errors": 20,
            "max_errors_span": 600,
            "max_concurrent_content_syncs": 1,
            "max_concurrent_access_control_syncs": 1,
            "max_file_download_size": DEFAULT_MAX_FILE_SIZE,
            "job_cleanup_interval": 300,
            "log_level": "INFO",
        },
        "sources": {
            "onelake": "connectors.sources.onelake:OneLakeDataSource",
            "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource",
            "box": "connectors.sources.box:BoxDataSource",
            "confluence": "connectors.sources.confluence:ConfluenceDataSource",
            "dir": "connectors.sources.directory:DirectoryDataSource",
            "dropbox": "connectors.sources.dropbox:DropboxDataSource",
            "github": "connectors.sources.github:GitHubDataSource",
            "gmail": "connectors.sources.gmail:GMailDataSource",
            "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource",
            "google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",
            "graphql": "connectors.sources.graphql:GraphQLDataSource",
            "jira": "connectors.sources.jira:JiraDataSource",
            "microsoft_teams": "connectors.sources.microsoft_teams:MicrosoftTeamsDataSource",
            "mongodb": "connectors.sources.mongo:MongoDataSource",
            "mssql": "connectors.sources.mssql:MSSQLDataSource",
            "mysql": "connectors.sources.mysql:MySqlDataSource",
            "network_drive": "connectors.sources.network_drive:NASDataSource",
            "notion": "connectors.sources.notion:NotionDataSource",
            "onedrive": "connectors.sources.onedrive:OneDriveDataSource",
            "oracle": "connectors.sources.oracle:OracleDataSource",
            "outlook": "connectors.sources.outlook:OutlookDataSource",
            "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource",
            "redis": "connectors.sources.redis:RedisDataSource",
            "s3": "connectors.sources.s3:S3DataSource",
            "salesforce": "connectors.sources.salesforce:SalesforceDataSource",
            "servicenow": "connectors.sources.servicenow:ServiceNowDataSource",
            "sharepoint_online": "connectors.sources.sharepoint_online:SharepointOnlineDataSource",
            "sharepoint_server": "connectors.sources.sharepoint_server:SharepointServerDataSource",
            "slack": "connectors.sources.slack:SlackDataSource",
            "zoom": "connectors.sources.zoom:ZoomDataSource",
        },
    }

def _ent_search_config(configuration):
    if "ENT_SEARCH_CONFIG_PATH" not in os.environ:
        return
    logger.info("Found ENT_SEARCH_CONFIG_PATH, loading ent-search config")
    ent_search_config = EnvYAML(os.environ["ENT_SEARCH_CONFIG_PATH"])
    for es_field in config_mappings.keys():
        if es_field not in ent_search_config:
            continue

        connector_field = config_mappings[es_field]
        es_field_value = ent_search_config[es_field]

        if es_field == "log_level":
            if es_field_value not in log_level_mappings:
                msg = f"Unexpected log level: {es_field_value}. Allowed values: {', '.join(log_level_mappings.keys())}"
                raise ValueError(msg)
            es_field_value = log_level_mappings[es_field_value]

        _nest_configs(configuration, connector_field, es_field_value)

        logger.debug(f"Overridden {connector_field}")
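
# Override flow sketch (the path and file contents below are assumptions):
# if ENT_SEARCH_CONFIG_PATH points at a YAML file containing "log_level: warn",
# config_mappings routes it to "service.log_level", log_level_mappings
# translates "warn" to "WARNING", and _nest_configs stores it so that
# configuration["service"]["log_level"] == "WARNING".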

def _nest_configs(configuration, field, value):
    """
    Update configuration field value taking into account the nesting.

    Configuration is a hash of hashes, so we need to dive inside to do proper assignment.

    E.g. _nest_configs({}, "elasticsearch.bulk.queuesize", 20) will result in the following config:
    {
        "elasticsearch": {
            "bulk": {
                "queuesize": 20
            }
        }
    }
    """
    subfields = field.split(".")
    last_key = subfields[-1]

    current_leaf = configuration
    for subfield in subfields[:-1]:
        if subfield not in current_leaf:
            current_leaf[subfield] = {}
        current_leaf = current_leaf[subfield]

    if isinstance(current_leaf.get(last_key), dict):
        current_leaf[last_key] = dict(_merge_dicts(current_leaf[last_key], value))
    else:
        current_leaf[last_key] = value

def _merge_dicts(hsh1, hsh2):
    for k in set(hsh1.keys()).union(hsh2.keys()):
        if k in hsh1 and k in hsh2:
            if isinstance(hsh1[k], dict) and isinstance(
                hsh2[k], dict
            ):  # only merge objects
                yield (k, dict(_merge_dicts(hsh1[k], hsh2[k])))
            else:
                yield (k, hsh2[k])
        elif k in hsh1:
            yield (k, hsh1[k])
        else:
            yield (k, hsh2[k])
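
# Behavior sketch for _merge_dicts (illustrative values): nested dicts merge
# recursively, while for non-dict conflicts the right-hand value wins:
#
#   dict(_merge_dicts({"a": 1, "b": {"c": 2}}, {"a": 9, "b": {"d": 3}}))
#   # -> {"a": 9, "b": {"c": 2, "d": 3}}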

class DataSourceFrameworkConfig:
    """
    The configs that will be exposed to DataSource instances.
    This abstraction prevents DataSource instances from having access to all configuration, while also
    preventing them from requiring substantial changes to access new configs that may be added.
    """

    def __init__(self, max_file_size):
        """
        Should not be called directly. Use the Builder.
        """
        self.max_file_size = max_file_size

    class Builder:
        def __init__(self):
            self.max_file_size = DEFAULT_MAX_FILE_SIZE

        def with_max_file_size(self, max_file_size):
            self.max_file_size = max_file_size
            return self

        def build(self):
            return DataSourceFrameworkConfig(self.max_file_size)
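
# Usage sketch for the Builder above (illustrative only; the 50MB value is an
# assumption, not something this commit sets):
#
#   framework_config = (
#       DataSourceFrameworkConfig.Builder()
#       .with_max_file_size(52428800)  # raise the 10MB default to 50MB
#       .build()
#   )
#   framework_config.max_file_size  # -> 52428800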
