
Commit 5a68af8

Standardise code style
1 parent 4fe995a commit 5a68af8

File tree

11 files changed, +143 -77 lines changed


.flake8

Lines changed: 3 additions & 0 deletions
@@ -5,3 +5,6 @@ per-file-ignores =
     proton_driver/bufferedreader.pyx: E225, E226, E227, E999
     proton_driver/bufferedwriter.pyx: E225, E226, E227, E999
     proton_driver/varint.pyx: E225, E226, E227, E999
+    # ignore example print warning.
+    example/*: T201,T001
+exclude = venv,.conda,build
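
Note: T201/T001 are the "print found" codes from the flake8-print plugin, so this per-file ignore keeps the example scripts lint-clean while they print their results. A minimal sketch of the kind of script this allows (the host, port and query are illustrative assumptions, not part of this commit):

# Hypothetical script under example/ -- print() is the intended output
# channel here, so the T201/T001 print warnings are ignored for example/*.
from proton_driver import client

c = client.Client(host='127.0.0.1', port=8463)  # assumed local Proton
rows = c.execute('SELECT 1')
print(rows)  # would be flagged by flake8-print without the ignore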

example/bytewax/hackernews.py

Lines changed: 24 additions & 14 deletions
@@ -19,11 +19,13 @@ class HNSource(SimplePollingSource):
     def next_item(self):
         return (
             "GLOBAL_ID",
-            requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json(),
+            requests.get(
+                "https://hacker-news.firebaseio.com/v0/maxitem.json"
+            ).json(),
         )
 
 
-def get_id_stream(old_max_id, new_max_id) -> Tuple[str,list]:
+def get_id_stream(old_max_id, new_max_id) -> Tuple[str, list]:
     if old_max_id is None:
         # Get the last 150 items on the first run.
         old_max_id = new_max_id - 150
@@ -51,12 +53,7 @@ def recurse_tree(metadata, og_metadata=None) -> any:
         parent_metadata = download_metadata(parent_id)
         return recurse_tree(parent_metadata[1], og_metadata)
     except KeyError:
-        return (metadata["id"],
-                {
-                    **og_metadata,
-                    "root_id":metadata["id"]
-                }
-        )
+        return (metadata["id"], {**og_metadata, "root_id": metadata["id"]})
 
 
 def key_on_parent(key__metadata) -> tuple:
@@ -68,19 +65,32 @@ def format(id__metadata):
     id, metadata = id__metadata
     return json.dumps(metadata)
 
+
 flow = Dataflow("hn_scraper")
 max_id = op.input("in", flow, HNSource(timedelta(seconds=15)))
-id_stream = op.stateful_map("range", max_id, lambda: None, get_id_stream).then(
-    op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1]).then(
-    op.redistribute, "redist")
+id_stream = \
+    op.stateful_map("range", max_id, lambda: None, get_id_stream) \
+    .then(op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1]) \
+    .then(op.redistribute, "redist")
+
 id_stream = op.filter_map("meta_download", id_stream, download_metadata)
-split_stream = op.branch("split_comments", id_stream, lambda item: item[1]["type"] == "story")
+split_stream = op.branch(
+    "split_comments", id_stream, lambda item: item[1]["type"] == "story"
+)
 story_stream = split_stream.trues
 story_stream = op.map("format_stories", story_stream, format)
 comment_stream = split_stream.falses
 comment_stream = op.map("key_on_parent", comment_stream, key_on_parent)
 comment_stream = op.map("format_comments", comment_stream, format)
 op.inspect("stories", story_stream)
 op.inspect("comments", comment_stream)
-op.output("stories-out", story_stream, ProtonSink("hn_stories_raw", os.environ.get("PROTON_HOST","127.0.0.1")))
-op.output("comments-out", comment_stream, ProtonSink("hn_comments_raw", os.environ.get("PROTON_HOST","127.0.0.1")))
+op.output(
+    "stories-out",
+    story_stream,
+    ProtonSink("hn_stories_raw", os.environ.get("PROTON_HOST", "127.0.0.1")),
+)
+op.output(
+    "comments-out",
+    comment_stream,
+    ProtonSink("hn_comments_raw", os.environ.get("PROTON_HOST", "127.0.0.1")),
+)

example/bytewax/proton.py

Lines changed: 12 additions & 11 deletions
@@ -9,29 +9,32 @@
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
+
 class _ProtonSinkPartition(StatelessSinkPartition):
     def __init__(self, stream: str, host: str):
-        self.client=client.Client(host=host, port=8463)
-        self.stream=stream
-        sql=f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
+        self.client = client.Client(host=host, port=8463)
+        self.stream = stream
+        sql = f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
         logger.debug(sql)
         self.client.execute(sql)
 
     def write_batch(self, items):
         logger.debug(f"inserting data {items}")
-        rows=[]
+        rows = []
         for item in items:
-            rows.append([item]) # single column in each row
+            rows.append([item])  # single column in each row
         sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
         logger.debug(f"inserting data {sql}")
-        self.client.execute(sql,rows)
+        self.client.execute(sql, rows)
+
 
 class ProtonSink(DynamicSink):
     def __init__(self, stream: str, host: str):
         self.stream = stream
         self.host = host if host is not None and host != "" else "127.0.0.1"
-
-    """Write each output item to Proton on that worker.
+
+    """
+    Write each output item to Proton on that worker.
 
     Items consumed from the dataflow must look like a string. Use a
     proceeding map step to do custom formatting.
@@ -40,9 +43,7 @@ def __init__(self, stream: str, host: str):
 
     Can support at-least-once processing. Messages from the resume
     epoch will be duplicated right after resume.
-
     """
-
     def build(self, worker_index, worker_count):
         """See ABC docstring."""
-        return _ProtonSinkPartition(self.stream, self.host)
+        return _ProtonSinkPartition(self.stream, self.host)
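
Usage note: as the docstring says, ProtonSink expects each item to already be a string, so a formatting map step comes first. A minimal sketch wiring the sink into a dataflow (bytewax.testing.TestingSource, the stream name, the items and the `proton` import path are assumptions for illustration; the op.* operators are the ones used in hackernews.py above):

# Hedged sketch: not part of this commit.
import os
import json

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource  # assumed stand-in input

from proton import ProtonSink  # assuming this module sits alongside, as in example/bytewax

flow = Dataflow("proton_sink_demo")
items = op.input("in", flow, TestingSource([{"id": 1}, {"id": 2}]))
# ProtonSink expects string items, so format them first.
json_items = op.map("to_json", items, json.dumps)
op.output(
    "proton-out",
    json_items,
    ProtonSink("demo_raw", os.environ.get("PROTON_HOST", "127.0.0.1")),
)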

example/descriptive_pipeline/server/main.py

Lines changed: 40 additions & 20 deletions
@@ -1,15 +1,21 @@
-from fastapi import FastAPI, WebSocket, HTTPException, WebSocketDisconnect, Request, BackgroundTasks
+from fastapi import (
+    FastAPI,
+    WebSocket,
+    HTTPException,
+    WebSocketDisconnect,
+    Request,
+    BackgroundTasks,
+)
 from fastapi.responses import JSONResponse, StreamingResponse
 from pydantic import BaseModel, Field
 import yaml
 import queue
 import threading
 import asyncio
 import json
-
 from proton_driver import client
-
 from .utils.logging import getLogger
+
 logger = getLogger()
 
 
@@ -58,7 +64,11 @@ def pipeline_exist(self, name):
         return False
 
     def delete_pipeline(self, name):
-        updated_pipelines = [pipeline for pipeline in self.config.pipelines if pipeline.name != name]
+        updated_pipelines = [
+            pipeline
+            for pipeline in self.config.pipelines
+            if pipeline.name != name
+        ]
         self.config.pipelines = updated_pipelines
         self.save()
 
@@ -73,11 +83,13 @@ def save(self):
             yaml.dump(self.config, yaml_file)
 
     def run_pipeline(self, name):
-        proton_client = client.Client(host=self.config.host,
-                                      port=self.config.port,
-                                      database=self.config.db,
-                                      user=self.config.user,
-                                      password=self.config.password)
+        proton_client = client.Client(
+            host=self.config.host,
+            port=self.config.port,
+            database=self.config.db,
+            user=self.config.user,
+            password=self.config.password,
+        )
         pipeline = self.get_pipeline_by_name(name)
         if pipeline is not None:
             for query in pipeline.sqls[:-1]:
@@ -93,7 +105,7 @@ def conf(self):
         return self.config
 
 
-class Query():
+class Query:
     def __init__(self, sql, client):
         self.sql = sql
         self.lock = threading.Lock()
@@ -198,7 +210,7 @@ async def query_stream(name, request, background_tasks):
     async def check_disconnect():
         while True:
             await asyncio.sleep(1)
-            disconnected = await request.is_disconnected();
+            disconnected = await request.is_disconnected()
             if disconnected:
                 query.cancel()
                 logger.info('Client disconnected')
@@ -215,28 +227,34 @@ async def check_disconnect():
                 result = {}
                 for index, (name, t) in enumerate(header):
                     if t.startswith('date'):
-                        result[name] = str(m[index]) # convert datetime type to string
+                        # convert datetime type to string
+                        result[name] = str(m[index])
                     else:
                         result[name] = m[index]
                 result_str = json.dumps(result).encode("utf-8") + b"\n"
                 yield result_str
             except Exception as e:
                 query.cancel()
-                logger.info(f'query cancelled due to {e}' )
+                logger.info(f'query cancelled due to {e}')
                 break
-
+
             if query.is_finshed():
                 break
 
             await asyncio.sleep(0.1)
-
+
 
 @app.get("/queries/{name}")
-def query_pipeline(name: str, request: Request , background_tasks: BackgroundTasks):
+def query_pipeline(
+    name: str, request: Request, background_tasks: BackgroundTasks
+):
     if not config_manager.pipeline_exist(name):
         raise HTTPException(status_code=404, detail="pipeline not found")
 
-    return StreamingResponse(query_stream(name, request, background_tasks), media_type="application/json")
+    return StreamingResponse(
+        query_stream(name, request, background_tasks),
+        media_type="application/json",
+    )
 
 
 @app.websocket("/queries/{name}")
@@ -258,10 +276,11 @@ async def websocket_endpoint(name: str, websocket: WebSocket):
                 result = {}
                 for index, (name, t) in enumerate(header):
                     if t.startswith('date'):
-                        result[name] = str(m[index]) # convert datetime type to string
+                        # convert datetime type to string
+                        result[name] = str(m[index])
                     else:
                         result[name] = m[index]
-
+
                 await websocket.send_text(f'{json.dumps(result)}')
             except Exception:
                 hasError = True
@@ -282,6 +301,7 @@ async def websocket_endpoint(name: str, websocket: WebSocket):
     except Exception as e:
         logger.exception(e)
     finally:
-        query.cancel() # Ensure query cancellation even if an exception is raised
+        # Ensure query cancellation even if an exception is raised
+        query.cancel()
         await websocket.close()
         logger.debug('session closed')
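
For reference, the GET /queries/{name} endpoint above streams newline-delimited JSON rows until the query finishes or the client disconnects. A hedged client sketch (the base URL, port and pipeline name are assumptions, not part of this commit):

# Consume the streaming endpoint row by row.
import json

import requests

with requests.get(
    "http://localhost:8000/queries/my_pipeline", stream=True, timeout=60
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line:
            row = json.loads(line)  # one JSON object per line
            print(row)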

example/pandas/dataframe.py

Lines changed: 17 additions & 10 deletions
@@ -8,24 +8,31 @@
 
 # setup the test stream
 c.execute("drop stream if exists test")
-c.execute("""create stream test (
+c.execute(
+    """create stream test (
         year int16,
         first_name string
-    )""")
+    )"""
+)
 # add some data
-df = pd.DataFrame.from_records([
-    {'year': 1994, 'first_name': 'Vova'},
-    {'year': 1995, 'first_name': 'Anja'},
-    {'year': 1996, 'first_name': 'Vasja'},
-    {'year': 1997, 'first_name': 'Petja'},
-])
+df = pd.DataFrame.from_records(
+    [
+        {'year': 1994, 'first_name': 'Vova'},
+        {'year': 1995, 'first_name': 'Anja'},
+        {'year': 1996, 'first_name': 'Vasja'},
+        {'year': 1997, 'first_name': 'Petja'},
+    ]
+)
 c.insert_dataframe(
     'INSERT INTO "test" (year, first_name) VALUES',
     df,
     settings=dict(use_numpy=True),
 )
-# or c.execute("INSERT INTO test(year, first_name) VALUES", df.to_dict('records'))
-time.sleep(3) # wait for 3 sec to make sure data available in historical store
+# or c.execute(
+#     "INSERT INTO test(year, first_name) VALUES", df.to_dict('records')
+# )
+# wait for 3 sec to make sure data available in historical store
+time.sleep(3)
 
 df = c.query_dataframe('SELECT * FROM table(test)')
 print(df)

example/streaming_query/car.py

Lines changed: 20 additions & 8 deletions
@@ -1,34 +1,46 @@
 """
 This example uses driver DB API.
-In this example, a thread writes a huge list of data of car speed into database,
-and another thread reads from the database to figure out which car is speeding.
+In this example, a thread writes a huge list of data of car speed into
+database, and another thread reads from the database to figure out which
+car is speeding.
 """
+
 import datetime
 import random
 import threading
 import time
 
 from proton_driver import connect
 
-account='default:'
+account = 'default:'
+
 
 def create_stream():
     with connect(f"proton://{account}@localhost:8463/default") as conn:
         with conn.cursor() as cursor:
             cursor.execute("drop stream if exists cars")
-            cursor.execute("create stream if not exists car(id int64, speed float64)")
+            cursor.execute(
+                "create stream if not exists car(id int64, speed float64)"
+            )
 
 
 def write_data(car_num: int):
     car_begin_date = datetime.datetime(2022, 1, 1, 1, 0, 0)
     for day in range(100):
         car_begin_date += datetime.timedelta(days=1)
-        data = [(random.randint(0, car_num - 1), random.random() * 20 + 50,
-                 car_begin_date
-                 + datetime.timedelta(milliseconds=i * 100)) for i in range(300000)]
+        data = [
+            (
+                random.randint(0, car_num - 1),
+                random.random() * 20 + 50,
+                car_begin_date + datetime.timedelta(milliseconds=i * 100),
+            )
+            for i in range(300000)
+        ]
         with connect(f"proton://{account}@localhost:8463/default") as conn:
             with conn.cursor() as cursor:
-                cursor.executemany("insert into car (id, speed, _tp_time) values", data)
+                cursor.executemany(
+                    "insert into car (id, speed, _tp_time) values", data
+                )
                 print(f"row count: {cursor.rowcount}")
         time.sleep(10)
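
The file's docstring mentions a second thread that reads from the database to find speeding cars; that reading side is outside this hunk. A hedged sketch of what it could look like with the same DB API (the speed threshold, column list and row limit are assumptions for illustration, not the file's actual code):

# Reading side sketch: streams matching rows from the `car` stream.
from proton_driver import connect

account = 'default:'


def read_speeding(limit: int = 10):
    with connect(f"proton://{account}@localhost:8463/default") as conn:
        with conn.cursor() as cursor:
            # Unbounded streaming query; rows arrive as they are written.
            cursor.execute("select id, speed, _tp_time from car where speed > 60")
            for _ in range(limit):
                row = cursor.fetchone()
                if row is None:
                    break
                print(f"car {row[0]} speeding at {row[1]:.1f}")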
