Skip to content

Commit 8f8f41e

Browse files
Raj MusukuZsailer
Raj Musuku
andcommitted
Emit events from the kernels service and gateway client
Co-authored-by: Zach Sailer <[email protected]>
1 parent ca4b062 commit 8f8f41e

File tree

9 files changed

+462
-10
lines changed

9 files changed

+462
-10
lines changed

docs/source/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@
329329
"ipython": ("https://ipython.readthedocs.io/en/stable/", None),
330330
"nbconvert": ("https://nbconvert.readthedocs.io/en/stable/", None),
331331
"nbformat": ("https://nbformat.readthedocs.io/en/stable/", None),
332-
"jupyter_core": ("https://jupyter_core.readthedocs.io/en/stable/", None),
332+
"jupyter_core": ("https://jupyter-core.readthedocs.io/en/stable/", None),
333333
"tornado": ("https://www.tornadoweb.org/en/stable/", None),
334334
"traitlets": ("https://traitlets.readthedocs.io/en/stable/", None),
335335
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"$id": https://events.jupyter.org/jupyter_server/gateway_client/v1
2+
version: 1
3+
title: Gateway Client activities.
4+
personal-data: true
5+
description: |
6+
Record events of a gateway client.
7+
type: object
8+
required:
9+
- status
10+
- msg
11+
properties:
12+
status:
13+
enum:
14+
- error
15+
- success
16+
description: |
17+
Status received by Gateway client based on the rest api operation to gateway kernel.
18+
19+
This is a required field.
20+
21+
Possible values:
22+
23+
1. error
24+
Error response from a rest api operation to gateway kernel.
25+
26+
2. success
27+
Success response from a rest api operation to gateway kernel.
28+
status_code:
29+
type: number
30+
description: |
31+
Http response codes from a rest api operation to gateway kernel.
32+
Examples: 200, 400, 502, 503, 599 etc.
33+
msg:
34+
type: string
35+
description: |
36+
Description of the event being emitted.
37+
gateway_url:
38+
type: string
39+
description: |
40+
Gateway url where the remote server exist.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"$id": https://events.jupyter.org/jupyter_server/kernel_actions/v1
2+
version: 1
3+
title: Kernel Manager activities
4+
personal-data: true
5+
description: |
6+
Record events of a kernel manager.
7+
type: object
8+
required:
9+
- action
10+
- kernel_id
11+
- msg
12+
properties:
13+
action:
14+
enum:
15+
- start
16+
- interrupt
17+
- shutdown
18+
- restart
19+
description: |
20+
Action performed by the Kernel Manager.
21+
22+
This is a required field.
23+
24+
Possible values:
25+
26+
1. start
27+
A kernel has been started with the given kernel id.
28+
29+
2. interrupt
30+
A kernel has been interrupted for the given kernel id.
31+
32+
3. shutdown
33+
A kernel has been shut down for the given kernel id.
34+
35+
4. restart
36+
A kernel has been restarted for the given kernel id.
37+
kernel_id:
38+
type: string
39+
description: |
40+
Kernel id.
41+
42+
This is a required field.
43+
kernel_name:
44+
type: string
45+
description: |
46+
Name of the kernel.
47+
status:
48+
enum:
49+
- error
50+
- success
51+
description: |
52+
Status received from a rest api operation to kernel server.
53+
54+
This is a required field.
55+
56+
Possible values:
57+
58+
1. error
59+
Error response from a rest api operation to kernel server.
60+
61+
2. success
62+
Success response from a rest api operation to kernel server.
63+
status_code:
64+
type: number
65+
description: |
66+
Http response codes from a rest api operation to kernel server.
67+
Examples: 200, 400, 502, 503, 599 etc
68+
msg:
69+
type: string
70+
description: |
71+
Description of the event specified in action.

jupyter_server/gateway/gateway_client.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,31 @@
1212
from http.cookies import SimpleCookie
1313
from socket import gaierror
1414

15+
from jupyter_events import EventLogger
1516
from tornado import web
1617
from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse
17-
from traitlets import Bool, Float, Int, TraitError, Type, Unicode, default, observe, validate
18+
from traitlets import (
19+
Bool,
20+
Float,
21+
Instance,
22+
Int,
23+
TraitError,
24+
Type,
25+
Unicode,
26+
default,
27+
observe,
28+
validate,
29+
)
1830
from traitlets.config import LoggingConfigurable, SingletonConfigurable
1931

32+
from jupyter_server import DEFAULT_EVENTS_SCHEMA_PATH, JUPYTER_SERVER_EVENTS_URI
33+
34+
ERROR_STATUS = "error"
35+
SUCCESS_STATUS = "success"
36+
STATUS_KEY = "status"
37+
STATUS_CODE_KEY = "status_code"
38+
MESSAGE_KEY = "msg"
39+
2040
if ty.TYPE_CHECKING:
2141
from http.cookies import Morsel
2242

@@ -71,10 +91,30 @@ def get_token(
7191
class GatewayClient(SingletonConfigurable):
7292
"""This class manages the configuration. It's its own singleton class so
7393
that we can share these values across all objects. It also contains some
74-
helper methods to build request arguments out of the various config
7594
options.
95+
helper methods to build request arguments out of the various config
7696
"""
7797

98+
event_schema_id = JUPYTER_SERVER_EVENTS_URI + "/gateway_client/v1"
99+
event_logger = Instance(EventLogger).tag(config=True)
100+
101+
@default("event_logger")
102+
def _default_event_logger(self):
103+
if self.parent and hasattr(self.parent, "event_logger"):
104+
# Event logger is attached from serverapp.
105+
return self.parent.event_logger
106+
else:
107+
# If parent does not have an event logger, create one.
108+
logger = EventLogger()
109+
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "gateway_client" / "v1.yaml"
110+
logger.register_event_schema(schema_path)
111+
self.log.info("Event is registered in GatewayClient.")
112+
return logger
113+
114+
def emit(self, data):
115+
"""Emit event using the core event schema from Jupyter Server's Gateway Client."""
116+
self.event_logger.emit(schema_id=self.event_schema_id, data=data)
117+
78118
url = Unicode(
79119
default_value=None,
80120
allow_none=True,
@@ -97,7 +137,9 @@ def _url_validate(self, proposal):
97137
value = proposal["value"]
98138
# Ensure value, if present, starts with 'http'
99139
if value is not None and len(value) > 0 and not str(value).lower().startswith("http"):
100-
raise TraitError("GatewayClient url must start with 'http': '%r'" % value)
140+
message = "GatewayClient url must start with 'http': '%r'" % value
141+
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
142+
raise TraitError(message)
101143
return value
102144

103145
ws_url = Unicode(
@@ -123,7 +165,9 @@ def _ws_url_validate(self, proposal):
123165
value = proposal["value"]
124166
# Ensure value, if present, starts with 'ws'
125167
if value is not None and len(value) > 0 and not str(value).lower().startswith("ws"):
126-
raise TraitError("GatewayClient ws_url must start with 'ws': '%r'" % value)
168+
message = "GatewayClient ws_url must start with 'ws': '%r'" % value
169+
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
170+
raise TraitError(message)
127171
return value
128172

129173
kernels_endpoint_default_value = "/api/kernels"
@@ -728,6 +772,9 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
728772
# NOTE: We do this here since this handler is called during the server's startup and subsequent refreshes
729773
# of the tree view.
730774
except HTTPClientError as e:
775+
GatewayClient.instance().emit(
776+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: e.code, MESSAGE_KEY: str(e.message)}
777+
)
731778
error_reason = f"Exception while attempting to connect to Gateway server url '{GatewayClient.instance().url}'"
732779
error_message = e.message
733780
if e.response:
@@ -744,12 +791,18 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
744791
"Ensure gateway url is valid and the Gateway instance is running.",
745792
) from e
746793
except ConnectionError as e:
794+
GatewayClient.instance().emit(
795+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 503, MESSAGE_KEY: str(e)}
796+
)
747797
raise web.HTTPError(
748798
503,
749799
f"ConnectionError was received from Gateway server url '{GatewayClient.instance().url}'. "
750800
"Check to be sure the Gateway instance is running.",
751801
) from e
752802
except gaierror as e:
803+
GatewayClient.instance().emit(
804+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 404, MESSAGE_KEY: str(e)}
805+
)
753806
raise web.HTTPError(
754807
404,
755808
f"The Gateway server specified in the gateway_url '{GatewayClient.instance().url}' doesn't "

jupyter_server/gateway/managers.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import datetime
66
import json
77
import os
8+
import pathlib
9+
import typing as t
810
from logging import Logger
911
from queue import Empty, Queue
1012
from threading import Thread
@@ -18,12 +20,15 @@
1820
from jupyter_client.manager import AsyncKernelManager
1921
from jupyter_client.managerabc import KernelManagerABC
2022
from jupyter_core.utils import ensure_async
23+
from jupyter_events import EventLogger
24+
from jupyter_events.schema_registry import SchemaRegistryException
2125
from tornado import web
2226
from tornado.escape import json_decode, json_encode, url_escape, utf8
23-
from traitlets import DottedObjectName, Instance, Type, default
27+
from traitlets import DottedObjectName, Instance, List, Type, default
2428

29+
from .. import DEFAULT_EVENTS_SCHEMA_PATH
2530
from .._tz import UTC
26-
from ..services.kernels.kernelmanager import AsyncMappingKernelManager
31+
from ..services.kernels.kernelmanager import AsyncMappingKernelManager, emit_kernel_action_event
2732
from ..services.sessions.sessionmanager import SessionManager
2833
from ..utils import url_path_join
2934
from .gateway_client import GatewayClient, gateway_request
@@ -79,7 +84,6 @@ async def start_kernel(self, *, kernel_id=None, path=None, **kwargs):
7984
await km.start_kernel(kernel_id=kernel_id, **kwargs)
8085
kernel_id = km.kernel_id
8186
self._kernels[kernel_id] = km
82-
8387
# Initialize culling if not already
8488
if not self._initialized_culler:
8589
self.initialize_culler()
@@ -290,6 +294,7 @@ async def list_kernel_specs(self):
290294
response = await gateway_request(kernel_spec_url, method="GET")
291295
kernel_specs = json_decode(response.body)
292296
kernel_specs = self._replace_path_kernelspec_resources(kernel_specs)
297+
self.log.debug(f"Retrieved list of kernel specs for the uri: {kernel_spec_url}")
293298
return kernel_specs
294299

295300
async def get_kernel_spec(self, kernel_name, **kwargs):
@@ -376,6 +381,49 @@ class GatewayKernelManager(AsyncKernelManager):
376381
def _default_cache_ports(self):
377382
return False # no need to cache ports here
378383

384+
# A list of pathlib objects, each pointing at an event
385+
# schema to register with this kernel manager's eventlogger.
386+
# This trait should not be overridden.
387+
388+
@property
389+
def core_event_schema_paths(self) -> t.List[pathlib.Path]:
390+
return [DEFAULT_EVENTS_SCHEMA_PATH / "kernel_actions" / "v1.yaml"]
391+
392+
# This trait is intended for subclasses to override and define
393+
# custom event schemas.
394+
extra_event_schema_paths = List(
395+
default_value=[],
396+
help="""
397+
A list of pathlib.Path objects pointing at to register with
398+
the kernel manager's eventlogger.
399+
""",
400+
).tag(config=True)
401+
402+
event_logger = Instance(EventLogger)
403+
404+
@default("event_logger")
405+
def _default_event_logger(self):
406+
"""Initialize the logger and ensure all required events are present."""
407+
if self.parent is not None and hasattr(self.parent, "event_logger"):
408+
logger = self.parent.event_logger
409+
else:
410+
# If parent does not have an event logger, create one.
411+
logger = EventLogger()
412+
# Ensure that all the expected schemas are registered. If not, register them.
413+
schemas = self.core_event_schema_paths + self.extra_event_schema_paths
414+
for schema_path in schemas:
415+
# Try registering the event.
416+
try:
417+
logger.register_event_schema(schema_path)
418+
# Pass if it already exists.
419+
except SchemaRegistryException:
420+
pass
421+
return logger
422+
423+
def emit(self, schema_id, data):
424+
"""Emit an event from the kernel manager."""
425+
self.event_logger.emit(schema_id=schema_id, data=data)
426+
379427
def __init__(self, **kwargs):
380428
"""Initialize the gateway kernel manager."""
381429
super().__init__(**kwargs)
@@ -458,6 +506,9 @@ async def refresh_model(self, model=None):
458506
# Kernel management
459507
# --------------------------------------------------------------------------
460508

509+
@emit_kernel_action_event(
510+
success_msg="Kernel {kernel_id} was started.",
511+
)
461512
async def start_kernel(self, **kwargs):
462513
"""Starts a kernel via HTTP in an asynchronous manner.
463514
@@ -509,6 +560,9 @@ async def start_kernel(self, **kwargs):
509560
self.kernel = await self.refresh_model()
510561
self.log.info(f"GatewayKernelManager using existing kernel: {self.kernel_id}")
511562

563+
@emit_kernel_action_event(
564+
success_msg="Kernel {kernel_id} was shutdown.",
565+
)
512566
async def shutdown_kernel(self, now=False, restart=False):
513567
"""Attempts to stop the kernel process cleanly via HTTP."""
514568

@@ -523,6 +577,9 @@ async def shutdown_kernel(self, now=False, restart=False):
523577
else:
524578
raise
525579

580+
@emit_kernel_action_event(
581+
success_msg="Kernel {kernel_id} was restarted.",
582+
)
526583
async def restart_kernel(self, **kw):
527584
"""Restarts a kernel via HTTP."""
528585
if self.has_kernel:
@@ -537,6 +594,9 @@ async def restart_kernel(self, **kw):
537594
)
538595
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)
539596

597+
@emit_kernel_action_event(
598+
success_msg="Kernel {kernel_id} was interrupted.",
599+
)
540600
async def interrupt_kernel(self):
541601
"""Interrupts the kernel via an HTTP request."""
542602
if self.has_kernel:
@@ -556,8 +616,10 @@ async def is_alive(self):
556616
if self.has_kernel:
557617
# Go ahead and issue a request to get the kernel
558618
self.kernel = await self.refresh_model()
619+
self.log.debug(f"The kernel: {self.kernel} is alive.")
559620
return True
560621
else: # we don't have a kernel
622+
self.log.debug(f"The kernel: {self.kernel} no longer exists.")
561623
return False
562624

563625
def cleanup_resources(self, restart=False):

jupyter_server/serverapp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,6 +1964,8 @@ def init_event_logger(self):
19641964
# events URI, `JUPYTER_SERVER_EVENTS_URI`.
19651965
schema_ids = [
19661966
"https://events.jupyter.org/jupyter_server/contents_service/v1",
1967+
"https://events.jupyter.org/jupyter_server/gateway_client/v1",
1968+
"https://events.jupyter.org/jupyter_server/kernel_actions/v1",
19671969
]
19681970
for schema_id in schema_ids:
19691971
# Get the schema path from the schema ID.

0 commit comments

Comments
 (0)