Skip to content

Commit 94d5438

Browse files
author
chaoqin-li1123
committed
add share queue test
Signed-off-by: chaoqin-li1123 <[email protected]>
1 parent 5a204b0 commit 94d5438

File tree

3 files changed

+59
-11
lines changed

3 files changed

+59
-11
lines changed

src/shared_queue.cc

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,19 @@ WasmResult SharedQueue::enqueue(uint32_t token, std::string_view value) {
136136
call_on_thread = target_queue->call_on_thread;
137137
target_queue->queue.push_back(std::string(value));
138138
}
139-
140-
call_on_thread([vm_key, context_id, token] {
141-
// This code may or may not execute in another thread.
142-
// Make sure that the lock is no longer held here.
143-
auto wasm = getThreadLocalWasm(vm_key);
144-
if (wasm) {
145-
auto context = wasm->wasm()->getContext(context_id);
146-
if (context) {
147-
context->onQueueReady(token);
139+
if (call_on_thread != nullptr) {
140+
call_on_thread([vm_key, context_id, token] {
141+
// This code may or may not execute in another thread.
142+
// Make sure that the lock is no longer held here.
143+
auto wasm = getThreadLocalWasm(vm_key);
144+
if (wasm) {
145+
auto context = wasm->wasm()->getContext(context_id);
146+
if (context) {
147+
context->onQueueReady(token);
148+
}
148149
}
149-
}
150-
});
150+
});
151+
}
151152
return WasmResult::Ok;
152153
}
153154

test/test_data/abi_export.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub extern "C" fn proxy_on_vm_start(_: u32, _: usize) -> bool {
2323
#[no_mangle]
2424
pub extern "C" fn proxy_on_context_create(_: u32, _: u32) {}
2525

26+
#[no_mangle]
27+
pub extern "C" fn proxy_on_queue_ready(_: u32, _: u32) {}
28+
2629
#[no_mangle]
2730
pub extern "C" fn proxy_on_memory_allocate(size: usize) -> *mut u8 {
2831
let mut vec: Vec<u8> = Vec::with_capacity(size);

test/wasm_test.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "gtest/gtest.h"
1818
#include <memory>
19+
#include <string>
1920

2021
#include "test/utility.h"
2122

@@ -147,4 +148,47 @@ TEST_P(WasmTest, DifferentRootContextsFromDifferentPluginKeys) {
147148
EXPECT_TRUE(root_context1->root_id() == root_context2->root_id());
148149
}
149150

151+
TEST_P(WasmTest, SharedQueueProducerConsumer) {
152+
const std::string plugin_name = "plugin_name";
153+
const std::string root_id = "root_id";
154+
const std::string plugin_config = "plugin_config";
155+
const bool fail_open = false;
156+
const std::shared_ptr<PluginBase> plugin1 = std::make_shared<PluginBase>(
157+
plugin_name, root_id, vm_id_, runtime_, plugin_config, fail_open, "plugin1_key");
158+
159+
const std::string vm_key = "vm_key";
160+
// Create base Wasm via createWasm.
161+
std::shared_ptr<WasmHandleBase> base_wasm_handle =
162+
createWasm(vm_key, source_, plugin1, wasm_handle_factory_, wasm_handle_clone_factory_, false);
163+
base_wasm_handle->wasm()->start(plugin1);
164+
ContextBase *root_context1 = base_wasm_handle->wasm()->getRootContext(plugin1, false);
165+
EXPECT_TRUE(root_context1 != nullptr);
166+
167+
// Create a new plugin with different key.
168+
const std::shared_ptr<PluginBase> plugin2 = std::make_shared<PluginBase>(
169+
plugin_name, root_id, vm_id_, runtime_, plugin_config, fail_open, "plugin2_key");
170+
EXPECT_TRUE(base_wasm_handle->wasm()->getRootContext(plugin2, false) == nullptr);
171+
172+
// Create context from a plugin2.
173+
base_wasm_handle->wasm()->start(plugin2);
174+
ContextBase *root_context2 = base_wasm_handle->wasm()->getRootContext(plugin2, false);
175+
EXPECT_TRUE(root_context2 != nullptr);
176+
177+
// Verify that the 2 contexts can communicate through a shared queue.
178+
std::string queue_name{"queue"};
179+
SharedQueueDequeueToken token1;
180+
EXPECT_EQ(root_context1->registerSharedQueue(queue_name, &token1), WasmResult::Ok);
181+
SharedQueueDequeueToken token2;
182+
EXPECT_EQ(root_context2->lookupSharedQueue(vm_id_, queue_name, &token2), WasmResult::Ok);
183+
EXPECT_EQ(token1, token2);
184+
for (int i = 0; i < 5; i++) {
185+
root_context1->enqueueSharedQueue(token1, std::to_string(i));
186+
}
187+
std::string data;
188+
for (int i = 0; i < 5; i++) {
189+
root_context1->dequeueSharedQueue(token1, &data);
190+
EXPECT_EQ(data, std::to_string(i));
191+
}
192+
}
193+
150194
} // namespace proxy_wasm

0 commit comments

Comments
 (0)