Skip to content

Commit 142ac64

Browse files
committed
improve connection handling
1 parent f7e33fc commit 142ac64

File tree

1 file changed

+49
-26
lines changed

1 file changed

+49
-26
lines changed

demos/app_runner/app.py

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2021 MONAI Consortium
3+
# Copyright 2022 MONAI Consortium
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
@@ -94,38 +94,33 @@ def _message_callback(self, ch, method, properties, body):
9494
job_dir_input = job_dir / "input"
9595
job_dir_output = job_dir / "output"
9696
if not os.path.exists(job_dir_input):
97-
self._logger.info(f"Creating input directory for job {job_dir_input}")
97+
self._logger.info(
98+
f"Creating input directory for job {job_dir_input}")
9899
os.makedirs(job_dir_input)
99100

100101
# note: in IG 0.1.1 or later, the bucket name can be found inside body.payload[]
101102
bucket = self._config['storage']['bucket']
102-
file_list = self._storage_client.list_objects(bucket, prefix=request_message['payload_id'],
103-
recursive=True)
104-
for file in file_list:
105-
if self._config['ignore_json'] and file.object_name.endswith('json'):
106-
self._logger.info(f'Skipping JSON file {file.object_name}...')
107-
continue
108-
109-
self._logger.info(f'Downloading file {file.object_name}...')
110-
data = self._storage_client.get_object(bucket, file.object_name)
111-
file_path = job_dir_input / os.path.dirname(file.object_name)
112-
if not os.path.exists(file_path):
113-
self._logger.info(f"Creating directory {file_path}")
114-
os.makedirs(file_path)
115-
116-
file_path = job_dir_input / file.object_name
117-
with open(file_path, 'wb') as file_data:
118-
for d in data.stream(32*1024):
119-
file_data.write(d)
103+
try:
104+
self._download_payload(request_message, job_dir_input, bucket)
105+
except:
106+
e = sys.exc_info()[0]
107+
self._logger.error(
108+
f'Failed to download payload for request. Correlation ID={correlation_id}: {e}.')
109+
self._send_acknowledgement(method.delivery_tag)
110+
return
120111

121-
self._logger.info(
122-
f"Finished download payload {request_message['payload_id']}...")
112+
self._launch_applications(
113+
request_message, job_dir_input, job_dir_output)
114+
self._send_acknowledgement(method.delivery_tag)
123115

116+
def _launch_applications(self, request_message, job_dir_input, job_dir_output):
124117
applications = request_message['workflows']
125118
for application in applications:
126-
app_output = job_dir_output / re.sub(r'[^\w\-_\. ]', '-', application)
119+
app_output = job_dir_output / \
120+
re.sub(r'[^\w\-_\. ]', '-', application)
127121
if not os.path.exists(app_output):
128-
self._logger.info(f"Creating output directory for job {app_output}")
122+
self._logger.info(
123+
f"Creating output directory for job {app_output}")
129124
os.makedirs(app_output)
130125

131126
argsd = {}
@@ -149,7 +144,36 @@ def _message_callback(self, ch, method, properties, body):
149144
self._logger.info(f"\tInput:\t {job_dir_input}")
150145
self._logger.info(f"\tOutput:\t {app_output}")
151146

152-
self._send_acknowledgement(method.delivery_tag)
147+
def _download_payload(self, request_message, job_dir_input, bucket):
148+
file_list = self._storage_client.list_objects(bucket, prefix=request_message['payload_id'],
149+
recursive=True)
150+
for file in file_list:
151+
if self._config['ignore_json'] and file.object_name.endswith('json'):
152+
self._logger.info(f'Skipping JSON file {file.object_name}...')
153+
continue
154+
155+
self._logger.info(f'Downloading file {file.object_name}...')
156+
157+
try:
158+
data = self._storage_client.get_object(
159+
bucket, file.object_name)
160+
file_path = job_dir_input / os.path.dirname(file.object_name)
161+
if not os.path.exists(file_path):
162+
self._logger.info(f"Creating directory {file_path}")
163+
os.makedirs(file_path)
164+
165+
file_path = job_dir_input / file.object_name
166+
with open(file_path, 'wb') as file_data:
167+
for d in data.stream(32*1024):
168+
file_data.write(d)
169+
except:
170+
raise
171+
finally:
172+
data.close()
173+
data.release_conn()
174+
175+
self._logger.info(
176+
f"Finished download payload {request_message['payload_id']}...")
153177

154178
def _send_acknowledgement(self, delivery_tag):
155179
self._logger.info(f"Sending acknowledgement...")
@@ -168,7 +192,6 @@ def handler(signal_received, frame):
168192

169193

170194
if __name__ == "__main__":
171-
signal(SIGINT, handler)
172195
logging.basicConfig(
173196
level=logging.INFO,
174197
format="%(asctime)s [%(levelname)s] %(message)s",

0 commit comments

Comments
 (0)