Skip to content

Commit cda179a

Browse files
committed
Add IPC shared memory example using the ipc.h API
Signed-off-by: Lukasz Dorau <[email protected]>
1 parent a27b777 commit cda179a

File tree

7 files changed

+554
-3
lines changed

7 files changed

+554
-3
lines changed

CMakeLists.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,13 @@ install(FILES ${CMAKE_SOURCE_DIR}/LICENSE.TXT
437437
DESTINATION "${CMAKE_INSTALL_DATAROOTDIR}/doc/${PROJECT_NAME}/")
438438

439439
install(
440-
FILES examples/basic/gpu_shared_memory.c examples/basic/utils_level_zero.h
441-
examples/basic/basic.c examples/basic/ipc_level_zero.c
440+
FILES examples/basic/gpu_shared_memory.c
441+
examples/basic/utils_level_zero.h
442+
examples/basic/basic.c
443+
examples/basic/ipc_level_zero.c
444+
examples/basic/ipc_shm_ipcapi.sh
445+
examples/basic/ipc_shm_ipcapi_consumer.c
446+
examples/basic/ipc_shm_ipcapi_producer.c
442447
DESTINATION "${CMAKE_INSTALL_DOCDIR}/examples")
443448

444449
# Add the include directory and the headers target to the install.

examples/CMakeLists.txt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,37 @@ else()
108108
"UMF_BUILD_LEVEL_ZERO_PROVIDER, UMF_BUILD_LIBUMF_POOL_DISJOINT and "
109109
"UMF_ENABLE_POOL_TRACKING to be turned ON - skipping")
110110
endif()
111+
112+
if(LINUX AND UMF_BUILD_LIBUMF_POOL_SCALABLE)
113+
set(BASE_NAME ipc_shm_ipcapi)
114+
set(EXAMPLE_NAME umf_example_${BASE_NAME})
115+
116+
foreach(loop_var IN ITEMS "producer" "consumer")
117+
set(EX_NAME ${EXAMPLE_NAME}_${loop_var})
118+
add_umf_executable(
119+
NAME ${EX_NAME}
120+
SRCS basic/${BASE_NAME}_${loop_var}.c
121+
LIBS umf scalable_pool)
122+
123+
target_include_directories(
124+
${EX_NAME} PRIVATE ${UMF_CMAKE_SOURCE_DIR}/src/utils
125+
${UMF_CMAKE_SOURCE_DIR}/include)
126+
127+
target_link_directories(${EX_NAME} PRIVATE ${LIBHWLOC_LIBRARY_DIRS})
128+
endforeach(loop_var)
129+
130+
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/basic/${BASE_NAME}.sh
131+
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
132+
133+
add_test(
134+
NAME ${EXAMPLE_NAME}
135+
COMMAND ${BASE_NAME}.sh
136+
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
137+
138+
set_tests_properties(${EXAMPLE_NAME} PROPERTIES LABELS "example")
139+
else()
140+
message(
141+
STATUS
142+
"IPC shared memory example with UMF pool API is supported on Linux only - skipping"
143+
)
144+
endif()

examples/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,32 @@ and build this example Level Zero development package should be installed.
3838
* Level Zero headers and libraries
3939
* compatible GPU with installed driver
4040
* set UMF_BUILD_GPU_EXAMPLES, UMF_BUILD_LIBUMF_POOL_DISJOINT, UMF_BUILD_LEVEL_ZERO_PROVIDER and UMF_ENABLE_POOL_TRACKING CMake configuration flags to ON
41+
42+
## IPC example with shared memory
43+
This example also demonstrates how to use UMF IPC API. The example creates two
44+
processes: a producer and a consumer that communicate in the following way
45+
(the initial value 140722582213392 in the shared memory is quasi-random):
46+
- Consumer starts
47+
- Consumer creates a socket
48+
- Consumer listens for incoming connections
49+
- Producer starts
50+
- Producer's shared memory contains a number: 140722582213392
51+
- Producer gets the IPC handle
52+
- Producer creates a socket
53+
- Producer connects to the consumer
54+
- Consumer connects at IP 127.0.0.1 and port 47770 to the producer
55+
- Producer sents the IPC handle to the consumer (24 bytes)
56+
- Consumer receives the IPC handle from the producer (24 bytes)
57+
- Consumer opens the IPC handle received from the producer
58+
- Consumer reads the number from the producer's shared memory: 140722582213392
59+
- Consumer writes a new number directly to the producer's shared memory: 70361291106696
60+
- Consumer sents a response message to the producer
61+
- Consumer closes the IPC handle received from the producer
62+
- Producer receives the response from the consumer: "This is the consumer. I just wrote a new number directly into your shared memory!"
63+
- Producer verifies the consumer wrote the correct value (the old one / 2) to the producer's shared memory: 70361291106696
64+
- Producer puts the IPC handle
65+
- Consumer shuts down
66+
- Producer shuts down
67+
68+
### Requirements
69+
* set UMF_BUILD_LIBUMF_POOL_SCALABLE CMake configuration flag to ON

examples/basic/ipc_shm_ipcapi.sh

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#
2+
# Copyright (C) 2024 Intel Corporation
3+
#
4+
# Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
#
7+
8+
#!/bin/bash
9+
10+
# port should be a number from the range <1024, 65535>
11+
PORT=$(( 1024 + ( $$ % ( 65535 - 1024 ))))
12+
13+
# The ipc_shm_ipcapi example requires using pidfd_getfd(2)
14+
# to obtain a duplicate of another process's file descriptor.
15+
# Permission to duplicate another process's file descriptor
16+
# is governed by a ptrace access mode PTRACE_MODE_ATTACH_REALCREDS check (see ptrace(2))
17+
# that can be changed using the /proc/sys/kernel/yama/ptrace_scope interface.
18+
PTRACE_SCOPE_FILE="/proc/sys/kernel/yama/ptrace_scope"
19+
VAL=0
20+
if [ -f $PTRACE_SCOPE_FILE ]; then
21+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
22+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
23+
echo "Setting ptrace_scope to 0 (classic ptrace permissions) ..."
24+
echo "$ sudo bash -c \"echo $VAL > $PTRACE_SCOPE_FILE\""
25+
sudo bash -c "echo $VAL > $PTRACE_SCOPE_FILE"
26+
fi
27+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
28+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
29+
echo "SKIP: setting ptrace_scope to 0 (classic ptrace permissions) FAILED - skipping the test"
30+
exit 0
31+
fi
32+
fi
33+
34+
UMF_LOG_VAL="level:debug;flush:debug;output:stderr;pid:yes"
35+
36+
echo "Starting ipc_shm_ipcapi CONSUMER on port $PORT ..."
37+
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_shm_ipcapi_consumer $PORT &
38+
39+
echo "Waiting 1 sec ..."
40+
sleep 1
41+
42+
echo "Starting ipc_shm_ipcapi PRODUCER on port $PORT ..."
43+
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_shm_ipcapi_producer $PORT
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright (C) 2024 Intel Corporation
3+
*
4+
* Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
*/
7+
8+
#include <arpa/inet.h>
9+
#include <stdio.h>
10+
#include <stdlib.h>
11+
#include <string.h>
12+
#include <sys/socket.h>
13+
#include <unistd.h>
14+
15+
#include <umf/ipc.h>
16+
#include <umf/memory_pool.h>
17+
#include <umf/pools/pool_scalable.h>
18+
#include <umf/providers/provider_os_memory.h>
19+
20+
#define INET_ADDR "127.0.0.1"
21+
#define CONSUMER_MSG_SIZE 256
22+
#define RECV_BUFF_SIZE 1024
23+
24+
// consumer's response message
25+
#define CONSUMER_MSG \
26+
"This is the consumer. I just wrote a new number directly into your " \
27+
"shared memory!"
28+
29+
int main(int argc, char *argv[]) {
30+
struct sockaddr_in consumer_addr;
31+
struct sockaddr_in producer_addr;
32+
char consumer_message[CONSUMER_MSG_SIZE];
33+
char recv_buffer[RECV_BUFF_SIZE];
34+
int producer_socket;
35+
int producer_addr_len;
36+
int consumer_socket;
37+
int ret = -1;
38+
39+
if (argc < 2) {
40+
fprintf(stderr, "usage: %s port\n", argv[0]);
41+
return -1;
42+
}
43+
44+
int port = atoi(argv[1]);
45+
46+
umf_memory_provider_handle_t OS_memory_provider = NULL;
47+
umf_os_memory_provider_params_t os_params;
48+
enum umf_result_t umf_result;
49+
50+
os_params = umfOsMemoryProviderParamsDefault();
51+
os_params.visibility = UMF_MEM_MAP_SHARED;
52+
53+
// create OS memory provider
54+
umf_result = umfMemoryProviderCreate(umfOsMemoryProviderOps(), &os_params,
55+
&OS_memory_provider);
56+
if (umf_result != UMF_RESULT_SUCCESS) {
57+
fprintf(stderr,
58+
"[consumer] ERROR: creating OS memory provider failed\n");
59+
return -1;
60+
}
61+
62+
umf_memory_pool_handle_t scalable_pool;
63+
umf_result = umfPoolCreate(umfScalablePoolOps(), OS_memory_provider, NULL,
64+
0, &scalable_pool);
65+
if (umf_result != UMF_RESULT_SUCCESS) {
66+
fprintf(stderr,
67+
"[producer] ERROR: creating jemalloc UMF pool failed\n");
68+
goto err_destroy_OS_memory_provider;
69+
}
70+
71+
// create socket
72+
consumer_socket = socket(AF_INET, SOCK_STREAM, 0);
73+
if (consumer_socket < 0) {
74+
fprintf(stderr, "[consumer] ERROR: creating socket failed\n");
75+
goto err_destroy_scalable_pool;
76+
}
77+
78+
fprintf(stderr, "[consumer] Socket created\n");
79+
80+
// set port and IP address
81+
consumer_addr.sin_family = AF_INET;
82+
consumer_addr.sin_port = htons(port);
83+
consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR);
84+
85+
// bind to the IP address and port
86+
if (bind(consumer_socket, (struct sockaddr *)&consumer_addr,
87+
sizeof(consumer_addr)) < 0) {
88+
fprintf(stderr, "[consumer] ERROR: cannot bind to the port\n");
89+
goto err_close_consumer_socket;
90+
}
91+
92+
fprintf(stderr, "[consumer] Binding done\n");
93+
94+
// listen for the producer
95+
if (listen(consumer_socket, 1) < 0) {
96+
fprintf(stderr, "[consumer] ERROR: listen() failed\n");
97+
goto err_close_consumer_socket;
98+
}
99+
100+
fprintf(stderr, "[consumer] Listening for incoming connections ...\n");
101+
102+
// accept an incoming connection
103+
producer_addr_len = sizeof(producer_addr);
104+
producer_socket = accept(consumer_socket, (struct sockaddr *)&producer_addr,
105+
(socklen_t *)&producer_addr_len);
106+
if (producer_socket < 0) {
107+
fprintf(stderr, "[consumer] ERROR: accept() failed\n");
108+
goto err_close_consumer_socket;
109+
}
110+
111+
fprintf(stderr, "[consumer] Producer connected at IP %s and port %i\n",
112+
inet_ntoa(producer_addr.sin_addr), ntohs(producer_addr.sin_port));
113+
114+
// receive the IPC handle from the producer's
115+
ssize_t len = recv(producer_socket, recv_buffer, RECV_BUFF_SIZE, 0);
116+
if (len < 0) {
117+
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
118+
goto err_close_producer_socket;
119+
}
120+
121+
umf_ipc_handle_t IPC_handle = (umf_ipc_handle_t)recv_buffer;
122+
123+
fprintf(
124+
stderr,
125+
"[consumer] Received the IPC handle from the producer (%zi bytes)\n",
126+
len);
127+
128+
void *SHM_ptr;
129+
umf_result = umfOpenIPCHandle(scalable_pool, IPC_handle, &SHM_ptr);
130+
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
131+
fprintf(stderr,
132+
"[consumer] SKIP: opening the IPC handle is not supported\n");
133+
ret = 1; // SKIP
134+
135+
// write the SKIP response to the consumer_message buffer
136+
strcpy(consumer_message, "SKIP");
137+
138+
// send the SKIP response to the producer
139+
send(producer_socket, consumer_message, strlen(consumer_message) + 1,
140+
0);
141+
142+
goto err_close_producer_socket;
143+
}
144+
if (umf_result != UMF_RESULT_SUCCESS) {
145+
fprintf(stderr, "[consumer] ERROR: opening the IPC handle failed\n");
146+
goto err_close_producer_socket;
147+
}
148+
149+
fprintf(stderr,
150+
"[consumer] Opened the IPC handle received from the producer\n");
151+
152+
// read the current value from the shared memory
153+
unsigned long long SHM_number_1 = *(unsigned long long *)SHM_ptr;
154+
fprintf(
155+
stderr,
156+
"[consumer] Read the number from the producer's shared memory: %llu\n",
157+
SHM_number_1);
158+
159+
// calculate the new value
160+
unsigned long long SHM_number_2 = SHM_number_1 / 2;
161+
162+
// write the new number directly to the producer's shared memory
163+
*(unsigned long long *)SHM_ptr = SHM_number_2;
164+
fprintf(stderr,
165+
"[consumer] Wrote a new number directly to the producer's shared "
166+
"memory: %llu\n",
167+
SHM_number_2);
168+
169+
// write the response to the consumer_message buffer
170+
memset(consumer_message, 0, sizeof(consumer_message));
171+
strcpy(consumer_message, CONSUMER_MSG);
172+
173+
// send response to the producer
174+
if (send(producer_socket, consumer_message, strlen(consumer_message) + 1,
175+
0) < 0) {
176+
fprintf(stderr, "[consumer] ERROR: send() failed\n");
177+
goto err_CloseIPCHandle;
178+
}
179+
180+
fprintf(stderr, "[consumer] Sent a response message to the producer\n");
181+
182+
ret = 0; // SUCCESS
183+
184+
err_CloseIPCHandle:
185+
umf_result = umfCloseIPCHandle(SHM_ptr);
186+
if (umf_result != UMF_RESULT_SUCCESS) {
187+
fprintf(stderr, "[consumer] ERROR: closing the IPC handle failed\n");
188+
}
189+
190+
fprintf(stderr,
191+
"[consumer] Closed the IPC handle received from the producer\n");
192+
193+
err_close_producer_socket:
194+
close(producer_socket);
195+
196+
err_close_consumer_socket:
197+
close(consumer_socket);
198+
199+
err_destroy_scalable_pool:
200+
umfPoolDestroy(scalable_pool);
201+
202+
err_destroy_OS_memory_provider:
203+
umfMemoryProviderDestroy(OS_memory_provider);
204+
205+
if (ret == 0) {
206+
fprintf(stderr, "[consumer] Shutting down (status OK) ...\n");
207+
} else if (ret == 1) {
208+
fprintf(stderr, "[consumer] Shutting down (status SKIP) ...\n");
209+
ret = 0;
210+
} else {
211+
fprintf(stderr, "[consumer] Shutting down (status ERROR) ...\n");
212+
}
213+
214+
return ret;
215+
}

0 commit comments

Comments
 (0)