Skip to content

Commit 13922f6

Browse files
committed
basic event bus
1 parent d376780 commit 13922f6

File tree

13 files changed

+291
-0
lines changed

13 files changed

+291
-0
lines changed

jupyter_server/base/handlers.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,10 @@ def kernel_spec_manager(self):
340340
def config_manager(self):
341341
return self.settings["config_manager"]
342342

343+
@property
344+
def event_bus(self):
345+
return self.settings["event_bus"]
346+
343347
# ---------------------------------------------------------------
344348
# CORS
345349
# ---------------------------------------------------------------

jupyter_server/serverapp.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
AsyncContentsManager,
121121
ContentsManager,
122122
)
123+
from jupyter_server.services.events.bus import EventBus
123124
from jupyter_server.services.kernels.kernelmanager import (
124125
AsyncMappingKernelManager,
125126
MappingKernelManager,
@@ -164,6 +165,7 @@
164165
sessions=["jupyter_server.services.sessions.handlers"],
165166
shutdown=["jupyter_server.services.shutdown"],
166167
view=["jupyter_server.view.handlers"],
168+
events=["jupyter_server.services.events.handlers"],
167169
)
168170

169171
# Added for backwards compatibility from classic notebook server.
@@ -207,6 +209,7 @@ def __init__(
207209
session_manager,
208210
kernel_spec_manager,
209211
config_manager,
212+
event_bus,
210213
extra_services,
211214
log,
212215
base_url,
@@ -242,6 +245,7 @@ def __init__(
242245
session_manager,
243246
kernel_spec_manager,
244247
config_manager,
248+
event_bus,
245249
extra_services,
246250
log,
247251
base_url,
@@ -263,6 +267,7 @@ def init_settings(
263267
session_manager,
264268
kernel_spec_manager,
265269
config_manager,
270+
event_bus,
266271
extra_services,
267272
log,
268273
base_url,
@@ -354,6 +359,7 @@ def init_settings(
354359
config_manager=config_manager,
355360
authorizer=authorizer,
356361
identity_provider=identity_provider,
362+
event_bus=event_bus,
357363
# handlers
358364
extra_services=extra_services,
359365
# Jupyter stuff
@@ -769,6 +775,7 @@ class ServerApp(JupyterApp):
769775
GatewaySessionManager,
770776
GatewayClient,
771777
Authorizer,
778+
EventBus,
772779
]
773780

774781
subcommands = dict(
@@ -794,6 +801,7 @@ class ServerApp(JupyterApp):
794801
"sessions",
795802
"shutdown",
796803
"view",
804+
"events",
797805
)
798806

799807
_log_formatter_cls = LogFormatter
@@ -1561,6 +1569,11 @@ def _default_kernel_spec_manager_class(self):
15611569
),
15621570
)
15631571

1572+
event_bus = Instance(
1573+
EventBus,
1574+
help="An EventBus for emitting structured event data from Jupyter Server and extensions.",
1575+
)
1576+
15641577
info_file = Unicode()
15651578

15661579
@default("info_file")
@@ -1906,6 +1919,10 @@ def init_logging(self):
19061919
logger.parent = self.log
19071920
logger.setLevel(self.log.level)
19081921

1922+
def init_eventbus(self):
1923+
"""Initialize the Event Bus."""
1924+
self.event_bus = EventBus.instance(parent=self)
1925+
19091926
def init_webapp(self):
19101927
"""initialize tornado webapp"""
19111928
self.tornado_settings["allow_origin"] = self.allow_origin
@@ -1970,6 +1987,7 @@ def init_webapp(self):
19701987
self.session_manager,
19711988
self.kernel_spec_manager,
19721989
self.config_manager,
1990+
self.event_bus,
19731991
self.extra_services,
19741992
self.log,
19751993
self.base_url,
@@ -2436,6 +2454,7 @@ def initialize(
24362454
if find_extensions:
24372455
self.find_server_extensions()
24382456
self.init_logging()
2457+
self.init_eventbus()
24392458
self.init_server_extensions()
24402459

24412460
# Special case the starter extension and load

jupyter_server/services/events/__init__.py

Whitespace-only changes.

jupyter_server/services/events/bus.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from jupyter_telemetry.eventlog import EventLog
2+
from traitlets.config import SingletonConfigurable
3+
4+
5+
class EventBus(EventLog, SingletonConfigurable):
6+
"""A Jupyter EventLog as a Singleton, making it easy to
7+
access from anywhere and log events.
8+
"""
9+
10+
def record_event(self, *args, **kwargs):
11+
12+
super().record_event(*args, **kwargs)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import logging
2+
3+
import tornado.web
4+
import tornado.websocket
5+
from jupyter_telemetry.eventlog import _skip_message
6+
from pythonjsonlogger import jsonlogger
7+
8+
from jupyter_server.base.handlers import JupyterHandler
9+
10+
11+
class TornadoWebSocketLoggingHandler(logging.Handler):
12+
"""Logging handler that routes records to a Tornado websocket."""
13+
14+
def __init__(self, websocket):
15+
super().__init__()
16+
self.websocket = websocket
17+
18+
def emit(self, record):
19+
"""Emit the message across the websocket"""
20+
self.websocket.write_message(record.msg)
21+
22+
23+
class SubscribeWebsocket(
24+
JupyterHandler,
25+
tornado.websocket.WebSocketHandler,
26+
):
27+
@property
28+
def event_bus(self):
29+
return self.settings["event_bus"]
30+
31+
def open(self):
32+
self.logging_handler = TornadoWebSocketLoggingHandler(self)
33+
# Add a JSON formatter to the handler.
34+
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
35+
self.logging_handler.setFormatter(formatter)
36+
# To do: add an eventlog.add_handler method to jupyter_telemetry.
37+
self.event_bus.log.addHandler(self.logging_handler)
38+
self.event_bus.handlers.append(self.logging_handler)
39+
40+
def on_close(self):
41+
self.event_bus.log.removeHandler(self.logging_handler)
42+
43+
44+
default_handlers = [
45+
(r"/api/events/subscribe", SubscribeWebsocket),
46+
]

setup.cfg

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
[metadata]
2+
name = jupyter_server
3+
version = attr: jupyter_server.__version__
4+
description = The backend—i.e. core services, APIs, and REST endpoints—to Jupyter web applications.
5+
long_description = file: README.md
6+
long_description_content_type = text/markdown
7+
license_files = COPYING.md
8+
author = Jupyter Development Team
9+
author_email = [email protected]
10+
url = https://jupyter-server.readthedocs.io
11+
platforms = Linux, Mac OS X, Windows
12+
keywords = ipython, jupyter
13+
classifiers =
14+
Development Status :: 5 - Production/Stable
15+
Framework :: Jupyter
16+
Intended Audience :: Developers
17+
Intended Audience :: Science/Research
18+
Intended Audience :: System Administrators
19+
License :: OSI Approved :: BSD License
20+
Programming Language :: Python
21+
Programming Language :: Python :: 3
22+
Programming Language :: Python :: 3 :: Only
23+
Programming Language :: Python :: 3.7
24+
Programming Language :: Python :: 3.8
25+
Programming Language :: Python :: 3.9
26+
Programming Language :: Python :: 3.10
27+
project_urls =
28+
Documentation = https://jupyter-server.readthedocs.io
29+
Funding = https://numfocus.org/donate
30+
Source = https://github.com/jupyter-server/jupyter_server
31+
Tracker = https://github.com/jupyter-server/jupyter_server/issues
32+
33+
[options]
34+
zip_safe = False
35+
include_package_data = True
36+
packages = find:
37+
package_dir =
38+
"" = "jupyter_server"
39+
python_requires = >=3.7
40+
install_requires =
41+
anyio>=3.1.0,<4
42+
argon2-cffi
43+
jinja2
44+
jupyter_client>=6.1.12
45+
jupyter_core>=4.7.0
46+
jupyter_server_terminals
47+
nbconvert>=6.4.4
48+
nbformat>=5.2.0
49+
packaging
50+
prometheus_client
51+
pywinpty;os_name=='nt'
52+
pyzmq>=17
53+
Send2Trash
54+
terminado>=0.8.3
55+
tornado>=6.1.0
56+
traitlets>=5.1
57+
websocket-client
58+
jupyter_telemetry
59+
60+
[options.extras_require]
61+
test =
62+
coverage
63+
ipykernel
64+
pre-commit
65+
pytest-console-scripts
66+
pytest-cov
67+
pytest-timeout
68+
pytest-tornasync
69+
pytest>=6.0
70+
requests
71+
72+
[options.entry_points]
73+
console_scripts =
74+
jupyter-server = jupyter_server.serverapp:main
75+
76+
[options.packages.find]
77+
exclude =
78+
docs.*
79+
examples.*
80+
tests
81+
tests.*
82+
83+
[flake8]
84+
ignore = E501, W503, E402
85+
builtins = c, get_config
86+
exclude =
87+
.cache,
88+
.github,
89+
docs,
90+
setup.py
91+
enable-extensions = G
92+
extend-ignore =
93+
G001, G002, G004, G200, G201, G202,
94+
# black adds spaces around ':'
95+
E203,
96+
per-file-ignores =
97+
# B011: Do not call assert False since python -O removes these calls
98+
# F841 local variable 'foo' is assigned to but never used
99+
tests/*: B011, F841

tests/services/events/__init__.py

Whitespace-only changes.

tests/services/events/mock_event.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
$id: event.mock.jupyter.com/message
2+
version: 1
3+
title: Message
4+
description: |
5+
Emit a message
6+
type: object
7+
properties:
8+
event_message:
9+
title: Event Messages
10+
description: |
11+
Mock event message to read.
12+
required:
13+
- event_message
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Function that makes these extensions discoverable
2+
# by the test functions.
3+
def _jupyter_server_extension_points():
4+
return [
5+
{"module": "tests.events.mock_extension"},
6+
]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import pathlib
2+
3+
from jupyter_server.base.handlers import JupyterHandler
4+
from jupyter_server.utils import url_path_join
5+
6+
7+
class MockEventHandler(JupyterHandler):
8+
def get(self):
9+
# Emit an event.
10+
self.event_bus.record_event(
11+
schema_name="event.mockextension.jupyter.com/message",
12+
version=1,
13+
event={"message": "Hello world, from mock extension!"},
14+
)
15+
16+
17+
def _load_jupyter_server_extension(serverapp):
18+
# Register a schema with the EventBus
19+
schema_file = pathlib.Path(__file__).parent / "mock_event_schema.yaml"
20+
serverapp.event_bus.register_schema_file(schema_file)
21+
serverapp.web_app.add_handlers(
22+
".*$", [(url_path_join(serverapp.base_url, "/mock/event"), MockEventHandler)]
23+
)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
$id: event.mockextension.jupyter.com/message
2+
version: 1
3+
title: Message
4+
description: |
5+
Emit a message
6+
type: object
7+
properties:
8+
event_message:
9+
title: Event Message
10+
description: |
11+
Mock event message to read.
12+
required:
13+
- event_message

tests/services/events/test_api.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import json
2+
import pathlib
3+
4+
import pytest
5+
6+
7+
@pytest.fixture
8+
def event_bus(jp_serverapp):
9+
event_bus = jp_serverapp.event_bus
10+
# Register the event schema defined in this directory.
11+
schema_file = pathlib.Path(__file__).parent / "mock_event.yaml"
12+
event_bus.register_schema_file(schema_file)
13+
#
14+
event_bus.allowed_schemas = ["event.mock.jupyter.com/message"]
15+
return event_bus
16+
17+
18+
async def test_subscribe_websocket(jp_ws_fetch, event_bus):
19+
# Open a websocket connection.
20+
ws = await jp_ws_fetch("/api/events/subscribe")
21+
22+
event_bus.record_event(
23+
schema_name="event.mock.jupyter.com/message",
24+
version=1,
25+
event={"event_message": "Hello, world!"},
26+
)
27+
message = await ws.read_message()
28+
event_data = json.loads(message)
29+
# Close websocket
30+
ws.close()
31+
32+
assert event_data.get("event_message") == "Hello, world!"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import json
2+
import pathlib
3+
4+
import pytest
5+
6+
7+
@pytest.fixture
8+
def event_bus(jp_serverapp):
9+
event_bus = jp_serverapp.event_bus
10+
event_bus.allowed_schemas = ["event.mockextension.jupyter.com/message"]
11+
return event_bus
12+
13+
14+
async def test_subscribe_websocket(jp_ws_fetch, jp_fetch, event_bus):
15+
# Open a websocket connection.
16+
ws = await jp_ws_fetch("/api/events/subscribe")
17+
18+
await jp_fetch("/mock/event")
19+
message = await ws.read_message()
20+
event_data = json.loads(message)
21+
# Close websocket
22+
ws.close()
23+
24+
assert event_data.get("event_message") == "Hello world, from mock extension!"

0 commit comments

Comments
 (0)