Skip to content

Commit fa18f00

Browse files
committed
Allow execution of multiple instances of the same plugin.
Signed-off-by: Piotr Sikora <[email protected]>
1 parent 40fd3d0 commit fa18f00

File tree

4 files changed

+86
-71
lines changed

4 files changed

+86
-71
lines changed

include/proxy-wasm/context.h

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,23 @@ struct PluginBase {
5050
std::string_view runtime, std::string_view plugin_configuration, bool fail_open)
5151
: name_(std::string(name)), root_id_(std::string(root_id)), vm_id_(std::string(vm_id)),
5252
runtime_(std::string(runtime)), plugin_configuration_(plugin_configuration),
53-
fail_open_(fail_open) {}
53+
fail_open_(fail_open), key_(root_id_ + "||" + plugin_configuration_) {}
5454

5555
const std::string name_;
5656
const std::string root_id_;
5757
const std::string vm_id_;
5858
const std::string runtime_;
59-
std::string plugin_configuration_;
59+
const std::string plugin_configuration_;
6060
const bool fail_open_;
61+
62+
const std::string &key() const { return key_; }
6163
const std::string &log_prefix() const { return log_prefix_; }
6264

6365
private:
6466
std::string makeLogPrefix() const;
6567

66-
std::string log_prefix_;
68+
const std::string key_;
69+
const std::string log_prefix_;
6770
};
6871

6972
struct BufferBase : public BufferInterface {
@@ -373,16 +376,16 @@ class ContextBase : public RootInterface,
373376
protected:
374377
friend class WasmBase;
375378

376-
void initializeRootBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin);
377379
std::string makeRootLogPrefix(std::string_view vm_id) const;
378380

379381
WasmBase *wasm_{nullptr};
380382
uint32_t id_{0};
381-
uint32_t parent_context_id_{0}; // 0 for roots and the general context.
382-
ContextBase *parent_context_{nullptr}; // set in all contexts.
383-
std::string root_id_; // set only in root context.
384-
std::string root_log_prefix_; // set only in root context.
385-
std::shared_ptr<PluginBase> plugin_;
383+
uint32_t parent_context_id_{0}; // 0 for roots and the general context.
384+
ContextBase *parent_context_{nullptr}; // set in all contexts.
385+
std::string root_id_; // set only in root context.
386+
std::string root_log_prefix_; // set only in root context.
387+
std::shared_ptr<PluginBase> plugin_; // set in root and stream contexts.
388+
std::shared_ptr<PluginBase> temp_plugin_; // Remove once ABI v0.1.0 is gone.
386389
bool in_vm_context_created_ = false;
387390
bool destroyed_ = false;
388391
};

include/proxy-wasm/wasm.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
5959
std::string_view vm_key() const { return vm_key_; }
6060
WasmVm *wasm_vm() const { return wasm_vm_.get(); }
6161
ContextBase *vm_context() const { return vm_context_.get(); }
62-
ContextBase *getRootContext(std::string_view root_id);
63-
ContextBase *getOrCreateRootContext(const std::shared_ptr<PluginBase> &plugin);
62+
ContextBase *getRootContext(const std::shared_ptr<PluginBase> &plugin, bool allow_closed);
6463
ContextBase *getContext(uint32_t id) {
6564
auto it = contexts_.find(id);
6665
if (it != contexts_.end())
@@ -78,6 +77,7 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
7877
void timerReady(uint32_t root_context_id);
7978
void queueReady(uint32_t root_context_id, uint32_t token);
8079

80+
void startShutdown(const std::shared_ptr<PluginBase> &plugin);
8181
void startShutdown();
8282
WasmResult done(ContextBase *root_context);
8383
void finishShutdown();
@@ -164,11 +164,12 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
164164
uint32_t next_context_id_ = 1; // 0 is reserved for the VM context.
165165
std::shared_ptr<ContextBase> vm_context_; // Context unrelated to any specific root or stream
166166
// (e.g. for global constructors).
167-
std::unordered_map<std::string, std::unique_ptr<ContextBase>> root_contexts_;
167+
std::unordered_map<std::string, std::unique_ptr<ContextBase>> root_contexts_; // Root contexts.
168+
std::unordered_map<std::string, std::unique_ptr<ContextBase>> pending_done_; // Root contexts.
169+
std::unordered_set<std::unique_ptr<ContextBase>> pending_delete_; // Root contexts.
168170
std::unordered_map<uint32_t, ContextBase *> contexts_; // Contains all contexts.
169171
std::unordered_map<uint32_t, std::chrono::milliseconds> timer_period_; // per root_id.
170172
std::unique_ptr<ShutdownHandle> shutdown_handle_;
171-
std::unordered_set<ContextBase *> pending_done_; // Root contexts not done during shutdown.
172173

173174
WasmCallVoid<0> _initialize_; /* Emscripten v1.39.17+ */
174175
WasmCallVoid<0> _start_; /* Emscripten v1.39.0+ */

src/context.cc

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,10 @@ ContextBase::ContextBase(WasmBase *wasm) : wasm_(wasm), parent_context_(this) {
269269
wasm_->contexts_[id_] = this;
270270
}
271271

272-
ContextBase::ContextBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin) {
273-
initializeRootBase(wasm, plugin);
272+
ContextBase::ContextBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin)
273+
: wasm_(wasm), id_(wasm->allocContextId()), parent_context_(this), root_id_(plugin->root_id_),
274+
root_log_prefix_(makeRootLogPrefix(plugin->vm_id_)), plugin_(plugin) {
275+
wasm_->contexts_[id_] = this;
274276
}
275277

276278
// NB: wasm can be nullptr if it failed to be created successfully.
@@ -288,15 +290,6 @@ WasmVm *ContextBase::wasmVm() const { return wasm_->wasm_vm(); }
288290

289291
bool ContextBase::isFailed() { return !wasm_ || wasm_->isFailed(); }
290292

291-
void ContextBase::initializeRootBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin) {
292-
wasm_ = wasm;
293-
id_ = wasm->allocContextId();
294-
root_id_ = plugin->root_id_;
295-
root_log_prefix_ = makeRootLogPrefix(plugin->vm_id_);
296-
parent_context_ = this;
297-
wasm_->contexts_[id_] = this;
298-
}
299-
300293
std::string ContextBase::makeRootLogPrefix(std::string_view vm_id) const {
301294
std::string prefix;
302295
if (!root_id_.empty()) {
@@ -315,10 +308,10 @@ bool ContextBase::onStart(std::shared_ptr<PluginBase> plugin) {
315308
DeferAfterCallActions actions(this);
316309
bool result = true;
317310
if (wasm_->on_context_create_) {
318-
plugin_ = plugin;
311+
temp_plugin_ = plugin;
319312
wasm_->on_context_create_(this, id_, 0);
320313
in_vm_context_created_ = true;
321-
plugin_.reset();
314+
temp_plugin_.reset();
322315
}
323316
if (wasm_->on_vm_start_) {
324317
// Do not set plugin_ as the on_vm_start handler should be independent of the plugin since the
@@ -350,11 +343,11 @@ bool ContextBase::onConfigure(std::shared_ptr<PluginBase> plugin) {
350343
}
351344

352345
DeferAfterCallActions actions(this);
353-
plugin_ = plugin;
346+
temp_plugin_ = plugin;
354347
auto result =
355348
wasm_->on_configure_(this, id_, static_cast<uint32_t>(plugin->plugin_configuration_.size()))
356349
.u64_ != 0;
357-
plugin_.reset();
350+
temp_plugin_.reset();
358351
return result;
359352
}
360353

@@ -644,8 +637,8 @@ WasmResult ContextBase::setTimerPeriod(std::chrono::milliseconds period,
644637
}
645638

646639
ContextBase::~ContextBase() {
647-
// Do not remove vm or root contexts which have the same lifetime as wasm_.
648-
if (parent_context_id_) {
640+
// Do not remove vm context which has the same lifetime as wasm_.
641+
if (id_) {
649642
wasm_->contexts_.erase(id_);
650643
}
651644
}

src/wasm.cc

Lines changed: 59 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,11 @@ WasmBase::WasmBase(std::unique_ptr<WasmVm> wasm_vm, std::string_view vm_id,
280280
}
281281
}
282282

283-
WasmBase::~WasmBase() {}
283+
WasmBase::~WasmBase() {
284+
root_contexts_.clear();
285+
pending_done_.clear();
286+
pending_delete_.clear();
287+
}
284288

285289
bool WasmBase::initialize(const std::string &code, bool allow_precompiled) {
286290
if (!wasm_vm_) {
@@ -319,22 +323,19 @@ bool WasmBase::initialize(const std::string &code, bool allow_precompiled) {
319323
return !isFailed();
320324
}
321325

322-
ContextBase *WasmBase::getRootContext(std::string_view root_id) {
323-
auto it = root_contexts_.find(std::string(root_id));
324-
if (it == root_contexts_.end()) {
325-
return nullptr;
326+
ContextBase *WasmBase::getRootContext(const std::shared_ptr<PluginBase> &plugin,
327+
bool allow_closed) {
328+
auto it = root_contexts_.find(plugin->key());
329+
if (it != root_contexts_.end()) {
330+
return it->second.get();
326331
}
327-
return it->second.get();
328-
}
329-
330-
ContextBase *WasmBase::getOrCreateRootContext(const std::shared_ptr<PluginBase> &plugin) {
331-
auto root_context = getRootContext(plugin->root_id_);
332-
if (!root_context) {
333-
auto context = std::unique_ptr<ContextBase>(createRootContext(plugin));
334-
root_context = context.get();
335-
root_contexts_[plugin->root_id_] = std::move(context);
332+
if (allow_closed) {
333+
it = pending_done_.find(plugin->key());
334+
if (it != pending_done_.end()) {
335+
return it->second.get();
336+
}
336337
}
337-
return root_context;
338+
return nullptr;
338339
}
339340

340341
void WasmBase::startVm(ContextBase *root_context) {
@@ -352,15 +353,14 @@ bool WasmBase::configure(ContextBase *root_context, std::shared_ptr<PluginBase>
352353
}
353354

354355
ContextBase *WasmBase::start(std::shared_ptr<PluginBase> plugin) {
355-
auto root_id = plugin->root_id_;
356-
auto it = root_contexts_.find(root_id);
356+
auto it = root_contexts_.find(plugin->key());
357357
if (it != root_contexts_.end()) {
358358
it->second->onStart(plugin);
359359
return it->second.get();
360360
}
361361
auto context = std::unique_ptr<ContextBase>(createRootContext(plugin));
362362
auto context_ptr = context.get();
363-
root_contexts_[root_id] = std::move(context);
363+
root_contexts_[plugin->key()] = std::move(context);
364364
if (!context_ptr->onStart(plugin)) {
365365
return nullptr;
366366
}
@@ -377,38 +377,49 @@ uint32_t WasmBase::allocContextId() {
377377
}
378378
}
379379

380-
void WasmBase::startShutdown() {
381-
bool all_done = true;
382-
for (auto &p : root_contexts_) {
383-
if (!p.second->onDone()) {
384-
all_done = false;
385-
pending_done_.insert(p.second.get());
380+
void WasmBase::startShutdown(const std::shared_ptr<PluginBase> &plugin) {
381+
auto it = root_contexts_.find(plugin->key());
382+
if (it != root_contexts_.end()) {
383+
if (it->second->onDone()) {
384+
it->second->onDelete();
385+
} else {
386+
pending_done_[it->first] = std::move(it->second);
386387
}
388+
root_contexts_.erase(it);
387389
}
388-
if (!all_done) {
389-
shutdown_handle_ = std::make_unique<ShutdownHandle>(shared_from_this());
390-
} else {
391-
finishShutdown();
390+
}
391+
392+
void WasmBase::startShutdown() {
393+
auto it = root_contexts_.begin();
394+
while (it != root_contexts_.end()) {
395+
if (it->second->onDone()) {
396+
it->second->onDelete();
397+
} else {
398+
pending_done_[it->first] = std::move(it->second);
399+
}
400+
it = root_contexts_.erase(it);
392401
}
393402
}
394403

395404
WasmResult WasmBase::done(ContextBase *root_context) {
396-
auto it = pending_done_.find(root_context);
405+
auto it = pending_done_.find(root_context->plugin_->key());
397406
if (it == pending_done_.end()) {
398407
return WasmResult::NotFound;
399408
}
409+
pending_delete_.insert(std::move(it->second));
400410
pending_done_.erase(it);
401-
if (pending_done_.empty() && shutdown_handle_) {
402-
// Defer the delete so that onDelete is not called from within the done() handler.
403-
addAfterVmCallAction(
404-
[shutdown_handle = shutdown_handle_.release()]() { delete shutdown_handle; });
405-
}
411+
// Defer the delete so that onDelete is not called from within the done() handler.
412+
shutdown_handle_ = std::make_unique<ShutdownHandle>(shared_from_this());
413+
addAfterVmCallAction(
414+
[shutdown_handle = shutdown_handle_.release()]() { delete shutdown_handle; });
406415
return WasmResult::Ok;
407416
}
408417

409418
void WasmBase::finishShutdown() {
410-
for (auto &p : root_contexts_) {
411-
p.second->onDelete();
419+
auto it = pending_delete_.begin();
420+
while (it != pending_delete_.end()) {
421+
(*it)->onDelete();
422+
it = pending_delete_.erase(it);
412423
}
413424
}
414425

@@ -520,11 +531,18 @@ getOrCreateThreadLocalWasm(std::shared_ptr<WasmHandleBase> base_wasm,
520531
WasmHandleCloneFactory clone_factory) {
521532
auto wasm_handle = getThreadLocalWasm(base_wasm->wasm()->vm_key());
522533
if (wasm_handle) {
523-
auto root_context = wasm_handle->wasm()->getOrCreateRootContext(plugin);
524-
if (!wasm_handle->wasm()->configure(root_context, plugin)) {
525-
base_wasm->wasm()->fail(FailState::ConfigureFailed,
526-
"Failed to configure thread-local Wasm code");
527-
return nullptr;
534+
auto root_context = wasm_handle->wasm()->getRootContext(plugin, false);
535+
if (!root_context) {
536+
root_context = wasm_handle->wasm()->start(plugin);
537+
if (!root_context) {
538+
base_wasm->wasm()->fail(FailState::StartFailed, "Failed to start thread-local Wasm");
539+
return nullptr;
540+
}
541+
if (!wasm_handle->wasm()->configure(root_context, plugin)) {
542+
base_wasm->wasm()->fail(FailState::ConfigureFailed,
543+
"Failed to configure thread-local Wasm plugin");
544+
return nullptr;
545+
}
528546
}
529547
return wasm_handle;
530548
}

0 commit comments

Comments
 (0)