Skip to content

Commit f7e33fc

Browse files
committed
use workflows specified in the request message
Signed-off-by: Victor Chang <[email protected]>
1 parent 59da18e commit f7e33fc

File tree

4 files changed

+69
-47
lines changed

4 files changed

+69
-47
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ name: ci
1414
on:
1515
# Triggers on pushes and on pull requests
1616
push:
17+
paths-ignore:
18+
- 'demos/**'
1719
pull_request:
20+
paths-ignore:
21+
- 'demos/**'
1822

1923
# Allows you to run this workflow manually from the Actions tab
2024
workflow_dispatch:

demos/app_runner/README.md

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ This demo shows how an application integrates with the MONAI Deploy Informatics
88
## Requirements
99

1010
- MONAI Deploy Informatics Gateway 0.1.0+
11-
- MONAI Deploy App SDK 0.2.1+ with [dicom_series_to_image_app](https://github.com/Project-MONAI/monai-deploy-app-sdk/tree/main/examples/apps/dicom_series_to_image_app) built as `dcm-to-img:latest`
11+
- MONAI Deploy App SDK 0.2.1+
12+
- A MAP from the [examples](https://github.com/Project-MONAI/monai-deploy-app-sdk/tree/main/examples/apps/) or BYO MAP.
1213
- RabbitMQ configured and running
1314
- MinIO configured and running
1415
- Python 3.7+
@@ -17,16 +18,18 @@ This demo shows how an application integrates with the MONAI Deploy Informatics
1718
## Running the demo
1819

1920
1. Install requirements specified above
20-
2. Install python dependencies specified in [requirements.txt](./requirements.txt)
21-
3. Edit `config.json` and change:
21+
2. Configure an AET with one or more workflows. For example, the following command would trigger the `dcm-to-img:latest` MAP.
22+
```
23+
mig-cli aet add -a DCM2PNG -w dcm-to-img:latest -v
24+
```
25+
3. Install python dependencies specified in [requirements.txt](./requirements.txt)
26+
4. Edit `config.json` and change:
2227
1. `endpoint`/`host`, `username`, and `password` for both storage and messaging services
2328
2. `bucket` where payloads are stored
24-
3. `application` to point to the correct MAP
25-
4. python app.py
29+
5. python app.py
2630
27-
**Notes**: For MONAI Deploy App SDK 0.2.1, set `ignore_json` to false in the `config.json` file so DICOM JSON files are not downloaded.
31+
**Notes**: For MONAI Deploy App SDK 0.2.1, set `ignore_json` to `false` in the `config.json` file so DICOM JSON files are not downloaded.
2832
29-
## Tips
33+
## Other Ideas
3034
31-
- Try another MAP by changing `application` in the `config.json` file.
32-
- Instead of calling App Runner, integrate with [MIS](https://github.com/Project-MONAI/monai-deploy-app-server)
35+
💡 Instead of calling App Runner, integrate with [MIS](https://github.com/Project-MONAI/monai-deploy-app-server)

demos/app_runner/app.py

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14-
1514
import json
1615
import logging
1716
import os
18-
from signal import signal, SIGINT
17+
import re
1918
import sys
2019
from pathlib import Path
20+
from signal import SIGINT, signal
2121
from types import SimpleNamespace
2222

2323
import pika
@@ -39,14 +39,15 @@ def _init_storage(self) -> None:
3939
config['endpoint'],
4040
config['username'],
4141
config['password'],
42-
secure=False # DEMO purposes only!!! Make sure to use a secure connection!!!
42+
secure=False # DEMO purposes only!!! Make sure to use a secure connection!!!
4343
)
4444

4545
if not self._storage_client.bucket_exists(config['bucket']):
4646
raise f"Bucket '{config['bucket']}' does not exist"
4747

4848
if not os.path.exists(self._working_directory):
49-
self._logger.info(f"Creating working directory {self._working_directory}")
49+
self._logger.info(
50+
f"Creating working directory {self._working_directory}")
5051
os.makedirs(self._working_directory)
5152

5253
def _init_messaging(self) -> None:
@@ -72,31 +73,34 @@ def _load_config(self) -> None:
7273
self._config = json.load(f)
7374

7475
self._working_directory = Path(self._config['working_dir'])
75-
self._application = self._config['application']
7676

7777
def _message_callback(self, ch, method, properties, body):
7878
correlation_id = properties.correlation_id
7979
self._logger.info(
8080
f"Message received from application={properties.app_id}. Correlation ID={correlation_id}. Delivery tag={method.delivery_tag}. Topic={method.routing_key}")
8181

8282
request_message = json.loads(body)
83-
print(" body\t%r" % (request_message))
84-
print(" properties\t%r" % (properties))
83+
84+
if 'workflows' not in request_message or len(request_message['workflows']) == 0:
85+
self._logger.error(
86+
f"No applications defined in the message body, skipping.")
87+
self._send_acknowledgement(method.delivery_tag)
88+
return
89+
90+
# print(" body\t%r" % (request_message))
91+
# print(" properties\t%r" % (properties))
8592

8693
job_dir = self._working_directory / correlation_id
8794
job_dir_input = job_dir / "input"
8895
job_dir_output = job_dir / "output"
8996
if not os.path.exists(job_dir_input):
90-
self._logger.info(f"Creating working directory for job {job_dir}")
97+
self._logger.info(f"Creating input directory for job {job_dir_input}")
9198
os.makedirs(job_dir_input)
92-
if not os.path.exists(job_dir_output):
93-
self._logger.info(f"Creating working directory for job {job_dir}")
94-
os.makedirs(job_dir_output)
9599

96100
# note: in IG 0.1.1 or later, the bucket name can be found inside body.payload[]
97-
bucket=self._config['storage']['bucket']
101+
bucket = self._config['storage']['bucket']
98102
file_list = self._storage_client.list_objects(bucket, prefix=request_message['payload_id'],
99-
recursive=True)
103+
recursive=True)
100104
for file in file_list:
101105
if self._config['ignore_json'] and file.object_name.endswith('json'):
102106
self._logger.info(f'Skipping JSON file {file.object_name}...')
@@ -114,31 +118,43 @@ def _message_callback(self, ch, method, properties, body):
114118
for d in data.stream(32*1024):
115119
file_data.write(d)
116120

117-
self._logger.info(f"Finished download payload {request_message['payload_id']}...")
118-
119-
argsd = {}
120-
argsd['map'] = self._application
121-
argsd['input'] = job_dir_input
122-
argsd['output'] =job_dir_output
123-
argsd['quiet'] = False
124-
125-
126-
self._logger.info(f"Launching application {self._application}...")
127-
self._logger.info(f"\tInput:\t {job_dir_input}...")
128-
self._logger.info(f"\tOutput:\t {job_dir_output}...")
129-
args = SimpleNamespace(**argsd)
130-
try:
131-
runner.main(args)
132-
except:
133-
e = sys.exc_info()[0]
134-
self._logger.error(f'{self._application} failed with {e}.')
135-
else:
121+
self._logger.info(
122+
f"Finished download payload {request_message['payload_id']}...")
123+
124+
applications = request_message['workflows']
125+
for application in applications:
126+
app_output = job_dir_output / re.sub(r'[^\w\-_\. ]', '-', application)
127+
if not os.path.exists(app_output):
128+
self._logger.info(f"Creating output directory for job {app_output}")
129+
os.makedirs(app_output)
130+
131+
argsd = {}
132+
argsd['map'] = application
133+
argsd['input'] = job_dir_input
134+
argsd['output'] = app_output
135+
argsd['quiet'] = False
136+
137+
self._logger.info(f"Launching application {application}...")
136138
self._logger.info(f"\tInput:\t {job_dir_input}...")
137-
self._logger.info(f"\tOutput:\t {job_dir_output}...")
138-
finally:
139-
self._logger.info(f"{self._application} completed. Sending acknowledgement...")
140-
self._pika_channel.basic_ack(method.delivery_tag)
141-
self._logger.info(f"{self._application} completed. Acknowledgement sent...")
139+
self._logger.info(f"\tOutput:\t {app_output}...")
140+
args = SimpleNamespace(**argsd)
141+
try:
142+
runner.main(args)
143+
except:
144+
e = sys.exc_info()[0]
145+
self._logger.error(f'{application} failed with {e}.')
146+
else:
147+
self._logger.info(
148+
f"Application {application} completed successfully.")
149+
self._logger.info(f"\tInput:\t {job_dir_input}")
150+
self._logger.info(f"\tOutput:\t {app_output}")
151+
152+
self._send_acknowledgement(method.delivery_tag)
153+
154+
def _send_acknowledgement(self, delivery_tag):
155+
self._logger.info(f"Sending acknowledgement...")
156+
self._pika_channel.basic_ack(delivery_tag)
157+
self._logger.info(f"Acknowledgement sent...")
142158

143159
def run(self):
144160
self._logger.info('[*] Waiting for logs. To exit press CTRL+C')

demos/app_runner/config.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,5 @@
1414
"topic": "md.workflow.request"
1515
},
1616
"working_dir": "jobs/",
17-
"application": "dcm-to-img:latest",
1817
"ignore_json": false
1918
}

0 commit comments

Comments
 (0)