Skip to content

Commit 32f7342

Browse files
committed
Add multiprocess test for L0 IPC
1 parent 1814d90 commit 32f7342

12 files changed

+1364
-753
lines changed

test/CMakeLists.txt

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -249,33 +249,71 @@ endif()
249249

250250
add_umf_test(NAME ipc SRCS ipcAPI.cpp)
251251

252-
function(build_umf_ipc_test name)
253-
set(BASE_NAME ${name})
252+
function(build_umf_ipc_test)
253+
# Parameters:
254+
#
255+
# * NAME - a name of the test
256+
# * SRC_DIR - source files directory path
257+
# * LIBS - libraries to be linked with
258+
set(oneValueArgs NAME SRC_DIR)
259+
set(multiValueArgs LIBS)
260+
cmake_parse_arguments(
261+
ARG
262+
""
263+
"${oneValueArgs}"
264+
"${multiValueArgs}"
265+
${ARGN})
266+
267+
set(BASE_NAME ${ARG_NAME})
268+
269+
if(DEFINED ARG_SRC_DIR)
270+
set(SRC_DIR ${ARG_SRC_DIR})
271+
else()
272+
set(SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR})
273+
endif()
254274

255275
foreach(loop_var IN ITEMS "producer" "consumer")
256276
set(EXEC_NAME umf_test-${BASE_NAME}_${loop_var})
257277
add_umf_executable(
258278
NAME ${EXEC_NAME}
259-
SRCS ${BASE_NAME}_${loop_var}.c
260-
LIBS umf)
279+
SRCS ${SRC_DIR}/${BASE_NAME}_${loop_var}.c
280+
LIBS umf ${ARG_LIBS})
261281

262282
target_include_directories(
263-
${EXEC_NAME} PRIVATE ${UMF_CMAKE_SOURCE_DIR}/src/utils
264-
${UMF_CMAKE_SOURCE_DIR}/include)
283+
${EXEC_NAME}
284+
PRIVATE ${UMF_CMAKE_SOURCE_DIR}/src/utils
285+
${UMF_CMAKE_SOURCE_DIR}/include ${UMF_TEST_DIR}/common)
265286

266287
target_link_directories(${EXEC_NAME} PRIVATE ${LIBHWLOC_LIBRARY_DIRS})
267288
endforeach(loop_var)
268289
endfunction()
269290

270-
function(add_umf_ipc_test script)
271-
set(TEST_NAME umf-${script})
291+
function(add_umf_ipc_test)
292+
# Parameters:
293+
#
294+
# * TEST - a name of the test
295+
# * SRC_DIR - source files directory path
296+
set(oneValueArgs TEST SRC_DIR)
297+
cmake_parse_arguments(
298+
ARG
299+
""
300+
"${oneValueArgs}"
301+
""
302+
${ARGN})
303+
304+
set(TEST_NAME umf-${ARG_TEST})
272305

273-
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/${script}.sh
274-
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
306+
if(DEFINED ARG_SRC_DIR)
307+
set(SRC_DIR ${ARG_SRC_DIR})
308+
else()
309+
set(SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR})
310+
endif()
311+
312+
file(COPY ${SRC_DIR}/${ARG_TEST}.sh DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
275313

276314
add_test(
277315
NAME ${TEST_NAME}
278-
COMMAND ${script}.sh
316+
COMMAND ${ARG_TEST}.sh
279317
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
280318

281319
set_tests_properties(${TEST_NAME} PROPERTIES LABELS "umf")
@@ -285,9 +323,25 @@ function(add_umf_ipc_test script)
285323
endfunction()
286324

287325
if(LINUX)
288-
build_umf_ipc_test(ipc_os_prov)
289-
add_umf_ipc_test(ipc_os_prov_anon_fd)
290-
add_umf_ipc_test(ipc_os_prov_shm)
326+
build_umf_ipc_test(NAME ipc_os_prov)
327+
add_umf_ipc_test(TEST ipc_os_prov_anon_fd)
328+
add_umf_ipc_test(TEST ipc_os_prov_shm)
329+
330+
if(UMF_BUILD_GPU_TESTS AND UMF_BUILD_LEVEL_ZERO_PROVIDER)
331+
build_umf_ipc_test(
332+
NAME
333+
ipc_level_zero_prov
334+
SRC_DIR
335+
providers
336+
LIBS
337+
ze_loader
338+
${UMF_UTILS_FOR_TEST})
339+
target_include_directories(umf_test-ipc_level_zero_prov_producer
340+
PRIVATE ${LEVEL_ZERO_INCLUDE_DIRS})
341+
target_include_directories(umf_test-ipc_level_zero_prov_consumer
342+
PRIVATE ${LEVEL_ZERO_INCLUDE_DIRS})
343+
add_umf_ipc_test(TEST ipc_level_zero_prov SRC_DIR providers)
344+
endif()
291345
else()
292346
message(STATUS "IPC tests are supported on Linux only - skipping")
293347
endif()

test/common/ipc_prov_consumer.h

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Copyright (C) 2024 Intel Corporation
3+
*
4+
* Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
*/
7+
8+
#include <arpa/inet.h>
9+
#include <stdio.h>
10+
#include <stdlib.h>
11+
#include <string.h>
12+
#include <sys/socket.h>
13+
#include <unistd.h>
14+
15+
#include <umf/memory_provider.h>
16+
17+
#define INET_ADDR "127.0.0.1"
18+
#define MSG_SIZE 256
19+
#define RECV_BUFF_SIZE 1024
20+
21+
// consumer's response message
22+
#define CONSUMER_MSG \
23+
"This is the consumer. I just wrote a new number directly into your " \
24+
"shared memory!"
25+
26+
/*
27+
Generally communication between the producer and the consumer looks like:
28+
- Consumer starts
29+
- Consumer creates a socket
30+
- Consumer listens for incoming connections
31+
- Producer starts
32+
- Producer's shared memory contains a number: N
33+
- Producer gets the IPC handle
34+
- Producer creates a socket
35+
- Producer connects to the consumer
36+
- Consumer connects at IP 127.0.0.1 and a port to the producer
37+
- Producer sends the IPC handle to the consumer
38+
- Consumer receives the IPC handle from the producer
39+
- Consumer opens the IPC handle received from the producer
40+
- Consumer reads the number from the producer's shared memory: N
41+
- Consumer writes a new number directly to the producer's shared memory: N/2
42+
- Consumer sends a response message to the producer
43+
- Consumer closes the IPC handle received from the producer
44+
- Producer receives the response from the consumer: "This is the consumer. I just wrote a new number directly into your shared memory!"
45+
- Producer verifies the consumer wrote the correct value (the old one / 2) to the producer's shared memory: N/2
46+
- Producer puts the IPC handle
47+
- Consumer shuts down
48+
- Producer shuts down
49+
*/
50+
51+
int consumer_connect(int port) {
52+
struct sockaddr_in consumer_addr;
53+
struct sockaddr_in producer_addr;
54+
int producer_addr_len;
55+
int producer_socket = -1;
56+
int consumer_socket = -1;
57+
int ret = -1;
58+
59+
// create a socket
60+
consumer_socket = socket(AF_INET, SOCK_STREAM, 0);
61+
if (consumer_socket < 0) {
62+
fprintf(stderr, "[consumer] ERROR: creating socket failed\n");
63+
return -1;
64+
}
65+
66+
fprintf(stderr, "[consumer] Socket created\n");
67+
68+
// set the IP address and the port
69+
consumer_addr.sin_family = AF_INET;
70+
consumer_addr.sin_port = htons(port);
71+
consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR);
72+
73+
// bind to the IP address and the port
74+
if (bind(consumer_socket, (struct sockaddr *)&consumer_addr,
75+
sizeof(consumer_addr)) < 0) {
76+
fprintf(stderr, "[consumer] ERROR: cannot bind to the port\n");
77+
goto err_close_consumer_socket;
78+
}
79+
80+
fprintf(stderr, "[consumer] Binding done\n");
81+
82+
// listen for the producer
83+
if (listen(consumer_socket, 1) < 0) {
84+
fprintf(stderr, "[consumer] ERROR: listen() failed\n");
85+
goto err_close_consumer_socket;
86+
}
87+
88+
fprintf(stderr, "[consumer] Listening for incoming connections ...\n");
89+
90+
// accept an incoming connection
91+
producer_addr_len = sizeof(producer_addr);
92+
producer_socket = accept(consumer_socket, (struct sockaddr *)&producer_addr,
93+
(socklen_t *)&producer_addr_len);
94+
if (producer_socket < 0) {
95+
fprintf(stderr, "[consumer] ERROR: accept() failed\n");
96+
goto err_close_consumer_socket;
97+
}
98+
99+
fprintf(stderr, "[consumer] Producer connected at IP %s and port %i\n",
100+
inet_ntoa(producer_addr.sin_addr), ntohs(producer_addr.sin_port));
101+
102+
ret = producer_socket; // success
103+
104+
err_close_consumer_socket:
105+
close(consumer_socket);
106+
107+
return ret;
108+
}
109+
110+
//pointer to the function that returns void and accept two int values
111+
typedef void (*memcopy_callback_t)(void *dst, const void *src, size_t size,
112+
void *context);
113+
114+
int run_consumer(int port, umf_memory_provider_ops_t *provider_ops,
115+
void *provider_params, memcopy_callback_t memcopy_callback,
116+
void *memcopy_ctx) {
117+
char consumer_message[MSG_SIZE];
118+
char recv_buffer[RECV_BUFF_SIZE];
119+
int producer_socket = -1;
120+
int ret = -1;
121+
umf_memory_provider_handle_t provider = NULL;
122+
umf_result_t umf_result = UMF_RESULT_ERROR_UNKNOWN;
123+
124+
// zero the consumer_message buffer
125+
memset(consumer_message, 0, sizeof(consumer_message));
126+
127+
// create OS memory provider
128+
umf_result =
129+
umfMemoryProviderCreate(provider_ops, provider_params, &provider);
130+
if (umf_result != UMF_RESULT_SUCCESS) {
131+
fprintf(stderr,
132+
"[consumer] ERROR: creating OS memory provider failed\n");
133+
return -1;
134+
}
135+
136+
// get the size of the IPC handle
137+
size_t IPC_handle_size;
138+
umf_result = umfMemoryProviderGetIPCHandleSize(provider, &IPC_handle_size);
139+
if (umf_result != UMF_RESULT_SUCCESS) {
140+
fprintf(stderr,
141+
"[consumer] ERROR: getting size of the IPC handle failed\n");
142+
goto err_umfMemoryProviderDestroy;
143+
}
144+
145+
producer_socket = consumer_connect(port);
146+
if (producer_socket < 0) {
147+
goto err_umfMemoryProviderDestroy;
148+
}
149+
150+
// zero the receive buffer
151+
memset(recv_buffer, 0, RECV_BUFF_SIZE);
152+
153+
// receive a producer's message
154+
ssize_t len = recv(producer_socket, recv_buffer, RECV_BUFF_SIZE, 0);
155+
if (len < 0) {
156+
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
157+
goto err_close_producer_socket;
158+
}
159+
if (len != IPC_handle_size) {
160+
fprintf(stderr,
161+
"[consumer] ERROR: recv() received a wrong number of bytes "
162+
"(%zi != %zu expected)\n",
163+
len, IPC_handle_size);
164+
goto err_close_producer_socket;
165+
}
166+
167+
void *IPC_handle = recv_buffer;
168+
169+
fprintf(
170+
stderr,
171+
"[consumer] Received the IPC handle from the producer (%zi bytes)\n",
172+
len);
173+
174+
void *SHM_ptr;
175+
umf_result = umfMemoryProviderOpenIPCHandle(provider, IPC_handle, &SHM_ptr);
176+
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
177+
fprintf(stderr,
178+
"[consumer] SKIP: opening the IPC handle is not supported\n");
179+
ret = 1; // SKIP
180+
181+
// write the SKIP response to the consumer_message buffer
182+
strcpy(consumer_message, "SKIP");
183+
184+
// send the SKIP response to the producer
185+
send(producer_socket, consumer_message, strlen(consumer_message) + 1,
186+
0);
187+
188+
goto err_close_producer_socket;
189+
}
190+
if (umf_result != UMF_RESULT_SUCCESS) {
191+
fprintf(stderr, "[consumer] ERROR: opening the IPC handle failed\n");
192+
goto err_close_producer_socket;
193+
}
194+
195+
fprintf(stderr,
196+
"[consumer] Opened the IPC handle received from the producer\n");
197+
198+
// read the current value from the shared memory
199+
unsigned long long SHM_number_1 = 0;
200+
memcopy_callback(&SHM_number_1, SHM_ptr, sizeof(SHM_number_1), memcopy_ctx);
201+
fprintf(
202+
stderr,
203+
"[consumer] Read the number from the producer's shared memory: %llu\n",
204+
SHM_number_1);
205+
206+
// calculate the new value
207+
unsigned long long SHM_number_2 = SHM_number_1 / 2;
208+
209+
// write the new number directly to the producer's shared memory
210+
memcopy_callback(SHM_ptr, &SHM_number_2, sizeof(SHM_number_2), memcopy_ctx);
211+
fprintf(stderr,
212+
"[consumer] Wrote a new number directly to the producer's shared "
213+
"memory: %llu\n",
214+
SHM_number_2);
215+
216+
// write the response to the consumer_message buffer
217+
strcpy(consumer_message, CONSUMER_MSG);
218+
219+
// send response to the producer
220+
if (send(producer_socket, consumer_message, strlen(consumer_message) + 1,
221+
0) < 0) {
222+
fprintf(stderr, "[consumer] ERROR: send() failed\n");
223+
goto err_closeIPCHandle;
224+
}
225+
226+
fprintf(stderr, "[consumer] Sent a response message to the producer\n");
227+
228+
ret = 0; // SUCCESS
229+
230+
err_closeIPCHandle:
231+
// we do not know the exact size of the remote shared memory
232+
umf_result = umfMemoryProviderCloseIPCHandle(provider, SHM_ptr,
233+
sizeof(unsigned long long));
234+
if (umf_result != UMF_RESULT_SUCCESS) {
235+
fprintf(stderr, "[consumer] ERROR: closing the IPC handle failed\n");
236+
}
237+
238+
fprintf(stderr,
239+
"[consumer] Closed the IPC handle received from the producer\n");
240+
241+
err_close_producer_socket:
242+
close(producer_socket);
243+
244+
err_umfMemoryProviderDestroy:
245+
umfMemoryProviderDestroy(provider);
246+
247+
if (ret == 0) {
248+
fprintf(stderr, "[consumer] Shutting down (status OK) ...\n");
249+
} else if (ret == 1) {
250+
fprintf(stderr, "[consumer] Shutting down (status SKIP) ...\n");
251+
ret = 0;
252+
} else {
253+
fprintf(stderr, "[consumer] Shutting down (status ERROR) ...\n");
254+
}
255+
256+
return ret;
257+
}

0 commit comments

Comments
 (0)