Skip to content

Commit dd54dcb

Browse files
committed
Add IPC shared memory example using the ipc.h API
Signed-off-by: Lukasz Dorau <[email protected]>
1 parent 4865d89 commit dd54dcb

File tree

7 files changed

+589
-3
lines changed

7 files changed

+589
-3
lines changed

CMakeLists.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,13 @@ install(FILES ${CMAKE_SOURCE_DIR}/LICENSE.TXT
437437
DESTINATION "${CMAKE_INSTALL_DATAROOTDIR}/doc/${PROJECT_NAME}/")
438438

439439
install(
440-
FILES examples/basic/gpu_shared_memory.c examples/basic/utils_level_zero.h
441-
examples/basic/basic.c examples/basic/ipc_level_zero.c
440+
FILES examples/basic/gpu_shared_memory.c
441+
examples/basic/utils_level_zero.h
442+
examples/basic/basic.c
443+
examples/basic/ipc_level_zero.c
444+
examples/basic/ipc_shm_ipcapi.sh
445+
examples/basic/ipc_shm_ipcapi_consumer.c
446+
examples/basic/ipc_shm_ipcapi_producer.c
442447
DESTINATION "${CMAKE_INSTALL_DOCDIR}/examples")
443448

444449
# Add the include directory and the headers target to the install.

examples/CMakeLists.txt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,37 @@ else()
108108
"UMF_BUILD_LEVEL_ZERO_PROVIDER, UMF_BUILD_LIBUMF_POOL_DISJOINT and "
109109
"UMF_ENABLE_POOL_TRACKING to be turned ON - skipping")
110110
endif()
111+
112+
if(LINUX AND UMF_BUILD_LIBUMF_POOL_SCALABLE)
113+
set(BASE_NAME ipc_shm_ipcapi)
114+
set(EXAMPLE_NAME umf_example_${BASE_NAME})
115+
116+
foreach(loop_var IN ITEMS "producer" "consumer")
117+
set(EX_NAME ${EXAMPLE_NAME}_${loop_var})
118+
add_umf_executable(
119+
NAME ${EX_NAME}
120+
SRCS basic/${BASE_NAME}_${loop_var}.c
121+
LIBS umf scalable_pool)
122+
123+
target_include_directories(
124+
${EX_NAME} PRIVATE ${UMF_CMAKE_SOURCE_DIR}/src/utils
125+
${UMF_CMAKE_SOURCE_DIR}/include)
126+
127+
target_link_directories(${EX_NAME} PRIVATE ${LIBHWLOC_LIBRARY_DIRS})
128+
endforeach(loop_var)
129+
130+
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/basic/${BASE_NAME}.sh
131+
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
132+
133+
add_test(
134+
NAME ${EXAMPLE_NAME}
135+
COMMAND ${BASE_NAME}.sh
136+
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
137+
138+
set_tests_properties(${EXAMPLE_NAME} PROPERTIES LABELS "example")
139+
else()
140+
message(
141+
STATUS
142+
"IPC shared memory example with UMF pool API is supported on Linux only - skipping"
143+
)
144+
endif()

examples/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,32 @@ and build this example Level Zero development package should be installed.
3838
* Level Zero headers and libraries
3939
* compatible GPU with installed driver
4040
* set UMF_BUILD_GPU_EXAMPLES, UMF_BUILD_LIBUMF_POOL_DISJOINT, UMF_BUILD_LEVEL_ZERO_PROVIDER and UMF_ENABLE_POOL_TRACKING CMake configuration flags to ON
41+
42+
## IPC example with shared memory
43+
This example also demonstrates how to use UMF IPC API. The example creates two
44+
processes: a producer and a consumer that communicate in the following way
45+
(the initial value 140722582213392 in the shared memory is quasi-random):
46+
- Consumer starts
47+
- Consumer creates a socket
48+
- Consumer listens for incoming connections
49+
- Producer starts
50+
- Producer's shared memory contains a number: 140722582213392
51+
- Producer gets the IPC handle
52+
- Producer creates a socket
53+
- Producer connects to the consumer
54+
- Consumer connects at IP 127.0.0.1 and port 47770 to the producer
55+
- Producer sents the IPC handle to the consumer (24 bytes)
56+
- Consumer receives the IPC handle from the producer (24 bytes)
57+
- Consumer opens the IPC handle received from the producer
58+
- Consumer reads the number from the producer's shared memory: 140722582213392
59+
- Consumer writes a new number directly to the producer's shared memory: 70361291106696
60+
- Consumer sents a response message to the producer
61+
- Consumer closes the IPC handle received from the producer
62+
- Producer receives the response from the consumer: "This is the consumer. I just wrote a new number directly into your shared memory!"
63+
- Producer verifies the consumer wrote the correct value (the old one / 2) to the producer's shared memory: 70361291106696
64+
- Producer puts the IPC handle
65+
- Consumer shuts down
66+
- Producer shuts down
67+
68+
### Requirements
69+
* set UMF_BUILD_LIBUMF_POOL_SCALABLE CMake configuration flag to ON

examples/basic/ipc_shm_ipcapi.sh

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#
2+
# Copyright (C) 2024 Intel Corporation
3+
#
4+
# Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
#
7+
8+
#!/bin/bash
9+
10+
# port should be a number from the range <1024, 65535>
11+
PORT=$(( 1024 + ( $$ % ( 65535 - 1024 ))))
12+
13+
# The ipc_shm_ipcapi example requires using pidfd_getfd(2)
14+
# to obtain a duplicate of another process's file descriptor.
15+
# Permission to duplicate another process's file descriptor
16+
# is governed by a ptrace access mode PTRACE_MODE_ATTACH_REALCREDS check (see ptrace(2))
17+
# that can be changed using the /proc/sys/kernel/yama/ptrace_scope interface.
18+
PTRACE_SCOPE_FILE="/proc/sys/kernel/yama/ptrace_scope"
19+
VAL=0
20+
if [ -f $PTRACE_SCOPE_FILE ]; then
21+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
22+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
23+
# check if sudo requires password
24+
if ! timeout --kill-after=5s 3s sudo date; then
25+
echo "ERROR: sudo requires password - cannot set ptrace_scope to 0"
26+
exit 1
27+
fi
28+
echo "Setting ptrace_scope to 0 (classic ptrace permissions) ..."
29+
echo "$ sudo bash -c \"echo $VAL > $PTRACE_SCOPE_FILE\""
30+
sudo bash -c "echo $VAL > $PTRACE_SCOPE_FILE"
31+
fi
32+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
33+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
34+
echo "SKIP: setting ptrace_scope to 0 (classic ptrace permissions) FAILED - skipping the test"
35+
exit 0
36+
fi
37+
fi
38+
39+
UMF_LOG_VAL="level:debug;flush:debug;output:stderr;pid:yes"
40+
41+
echo "Starting ipc_shm_ipcapi CONSUMER on port $PORT ..."
42+
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_shm_ipcapi_consumer $PORT &
43+
44+
echo "Waiting 1 sec ..."
45+
sleep 1
46+
47+
echo "Starting ipc_shm_ipcapi PRODUCER on port $PORT ..."
48+
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_shm_ipcapi_producer $PORT
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (C) 2024 Intel Corporation
3+
*
4+
* Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
*/
7+
8+
#include <arpa/inet.h>
9+
#include <stdio.h>
10+
#include <stdlib.h>
11+
#include <string.h>
12+
#include <sys/socket.h>
13+
#include <unistd.h>
14+
15+
#include <umf/ipc.h>
16+
#include <umf/memory_pool.h>
17+
#include <umf/pools/pool_scalable.h>
18+
#include <umf/providers/provider_os_memory.h>
19+
20+
#define INET_ADDR "127.0.0.1"
21+
#define CONSUMER_MSG_SIZE 256
22+
#define RECV_BUFF_SIZE 1024
23+
24+
// consumer's response message
25+
#define CONSUMER_MSG \
26+
"This is the consumer. I just wrote a new number directly into your " \
27+
"shared memory!"
28+
29+
int consumer_connect(int port) {
30+
struct sockaddr_in consumer_addr;
31+
struct sockaddr_in producer_addr;
32+
int producer_addr_len;
33+
int producer_socket = -1;
34+
int consumer_socket = -1;
35+
int ret = -1;
36+
37+
// create a socket
38+
consumer_socket = socket(AF_INET, SOCK_STREAM, 0);
39+
if (consumer_socket < 0) {
40+
fprintf(stderr, "[consumer] ERROR: creating socket failed\n");
41+
return -1;
42+
}
43+
44+
fprintf(stderr, "[consumer] Socket created\n");
45+
46+
// set the IP address and the port
47+
consumer_addr.sin_family = AF_INET;
48+
consumer_addr.sin_port = htons(port);
49+
consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR);
50+
51+
// bind to the IP address and the port
52+
if (bind(consumer_socket, (struct sockaddr *)&consumer_addr,
53+
sizeof(consumer_addr)) < 0) {
54+
fprintf(stderr, "[consumer] ERROR: cannot bind to the port\n");
55+
goto err_close_consumer_socket;
56+
}
57+
58+
fprintf(stderr, "[consumer] Binding done\n");
59+
60+
// listen for the producer
61+
if (listen(consumer_socket, 1) < 0) {
62+
fprintf(stderr, "[consumer] ERROR: listen() failed\n");
63+
goto err_close_consumer_socket;
64+
}
65+
66+
fprintf(stderr, "[consumer] Listening for incoming connections ...\n");
67+
68+
// accept an incoming connection
69+
producer_addr_len = sizeof(producer_addr);
70+
producer_socket = accept(consumer_socket, (struct sockaddr *)&producer_addr,
71+
(socklen_t *)&producer_addr_len);
72+
if (producer_socket < 0) {
73+
fprintf(stderr, "[consumer] ERROR: accept() failed\n");
74+
goto err_close_consumer_socket;
75+
}
76+
77+
fprintf(stderr, "[consumer] Producer connected at IP %s and port %i\n",
78+
inet_ntoa(producer_addr.sin_addr), ntohs(producer_addr.sin_port));
79+
80+
ret = producer_socket; // success
81+
82+
err_close_consumer_socket:
83+
close(consumer_socket);
84+
85+
return ret;
86+
}
87+
88+
int main(int argc, char *argv[]) {
89+
char consumer_message[CONSUMER_MSG_SIZE];
90+
char recv_buffer[RECV_BUFF_SIZE];
91+
int producer_socket;
92+
int ret = -1;
93+
94+
if (argc < 2) {
95+
fprintf(stderr, "usage: %s port\n", argv[0]);
96+
return -1;
97+
}
98+
99+
int port = atoi(argv[1]);
100+
101+
umf_memory_provider_handle_t OS_memory_provider = NULL;
102+
umf_os_memory_provider_params_t os_params;
103+
enum umf_result_t umf_result;
104+
105+
os_params = umfOsMemoryProviderParamsDefault();
106+
os_params.visibility = UMF_MEM_MAP_SHARED;
107+
108+
// create OS memory provider
109+
umf_result = umfMemoryProviderCreate(umfOsMemoryProviderOps(), &os_params,
110+
&OS_memory_provider);
111+
if (umf_result != UMF_RESULT_SUCCESS) {
112+
fprintf(stderr,
113+
"[consumer] ERROR: creating OS memory provider failed\n");
114+
return -1;
115+
}
116+
117+
umf_memory_pool_handle_t scalable_pool;
118+
umf_result = umfPoolCreate(umfScalablePoolOps(), OS_memory_provider, NULL,
119+
0, &scalable_pool);
120+
if (umf_result != UMF_RESULT_SUCCESS) {
121+
fprintf(stderr,
122+
"[producer] ERROR: creating jemalloc UMF pool failed\n");
123+
goto err_destroy_OS_memory_provider;
124+
}
125+
126+
producer_socket = consumer_connect(port);
127+
if (producer_socket < 0) {
128+
goto err_destroy_scalable_pool;
129+
}
130+
131+
// receive the IPC handle from the producer's
132+
ssize_t len = recv(producer_socket, recv_buffer, RECV_BUFF_SIZE, 0);
133+
if (len < 0) {
134+
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
135+
goto err_close_producer_socket;
136+
}
137+
138+
umf_ipc_handle_t IPC_handle = (umf_ipc_handle_t)recv_buffer;
139+
140+
fprintf(
141+
stderr,
142+
"[consumer] Received the IPC handle from the producer (%zi bytes)\n",
143+
len);
144+
145+
void *SHM_ptr;
146+
umf_result = umfOpenIPCHandle(scalable_pool, IPC_handle, &SHM_ptr);
147+
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
148+
fprintf(stderr,
149+
"[consumer] SKIP: opening the IPC handle is not supported\n");
150+
ret = 1; // SKIP
151+
152+
// write the SKIP response to the consumer_message buffer
153+
strcpy(consumer_message, "SKIP");
154+
155+
// send the SKIP response to the producer
156+
send(producer_socket, consumer_message, strlen(consumer_message) + 1,
157+
0);
158+
159+
goto err_close_producer_socket;
160+
}
161+
if (umf_result != UMF_RESULT_SUCCESS) {
162+
fprintf(stderr, "[consumer] ERROR: opening the IPC handle failed\n");
163+
goto err_close_producer_socket;
164+
}
165+
166+
fprintf(stderr,
167+
"[consumer] Opened the IPC handle received from the producer\n");
168+
169+
// read the current value from the shared memory
170+
unsigned long long SHM_number_1 = *(unsigned long long *)SHM_ptr;
171+
fprintf(
172+
stderr,
173+
"[consumer] Read the number from the producer's shared memory: %llu\n",
174+
SHM_number_1);
175+
176+
// calculate the new value
177+
unsigned long long SHM_number_2 = SHM_number_1 / 2;
178+
179+
// write the new number directly to the producer's shared memory
180+
*(unsigned long long *)SHM_ptr = SHM_number_2;
181+
fprintf(stderr,
182+
"[consumer] Wrote a new number directly to the producer's shared "
183+
"memory: %llu\n",
184+
SHM_number_2);
185+
186+
// write the response to the consumer_message buffer
187+
memset(consumer_message, 0, sizeof(consumer_message));
188+
strcpy(consumer_message, CONSUMER_MSG);
189+
190+
// send response to the producer
191+
if (send(producer_socket, consumer_message, strlen(consumer_message) + 1,
192+
0) < 0) {
193+
fprintf(stderr, "[consumer] ERROR: send() failed\n");
194+
goto err_CloseIPCHandle;
195+
}
196+
197+
fprintf(stderr, "[consumer] Sent a response message to the producer\n");
198+
199+
ret = 0; // SUCCESS
200+
201+
err_CloseIPCHandle:
202+
umf_result = umfCloseIPCHandle(SHM_ptr);
203+
if (umf_result != UMF_RESULT_SUCCESS) {
204+
fprintf(stderr, "[consumer] ERROR: closing the IPC handle failed\n");
205+
}
206+
207+
fprintf(stderr,
208+
"[consumer] Closed the IPC handle received from the producer\n");
209+
210+
err_close_producer_socket:
211+
close(producer_socket);
212+
213+
err_destroy_scalable_pool:
214+
umfPoolDestroy(scalable_pool);
215+
216+
err_destroy_OS_memory_provider:
217+
umfMemoryProviderDestroy(OS_memory_provider);
218+
219+
if (ret == 0) {
220+
fprintf(stderr, "[consumer] Shutting down (status OK) ...\n");
221+
} else if (ret == 1) {
222+
fprintf(stderr, "[consumer] Shutting down (status SKIP) ...\n");
223+
ret = 0;
224+
} else {
225+
fprintf(stderr, "[consumer] Shutting down (status ERROR) ...\n");
226+
}
227+
228+
return ret;
229+
}

0 commit comments

Comments
 (0)