Skip to content

Commit 4d97080

Browse files
committed
optimize wasm recover
Link: https://code.alibaba-inc.com/Ingress/proxy-wasm-cpp-host/codereview/14808697 * optimize wasm recover * update UT * add some assert
1 parent b38b44f commit 4d97080

File tree

5 files changed

+152
-184
lines changed

5 files changed

+152
-184
lines changed

include/proxy-wasm/context.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ class ContextBase : public RootInterface,
151151
virtual ~ContextBase();
152152

153153
WasmBase *wasm() const { return wasm_; }
154-
void clearWasm() { wasm_ = nullptr; }
155154
uint32_t id() const { return id_; }
156155
// The VM Context used for calling "malloc" has an id_ == 0.
157156
bool isVmContext() const { return id_ == 0; }

include/proxy-wasm/wasm.h

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <string.h>
1919

20+
#include <cassert>
2021
#include <atomic>
2122
#include <deque>
2223
#include <map>
@@ -73,11 +74,6 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
7374
return it->second;
7475
return nullptr;
7576
}
76-
void clearWasmInContext() {
77-
for (auto &item : contexts_) {
78-
item.second->clearWasm();
79-
}
80-
}
8177
uint32_t allocContextId();
8278
bool isFailed() { return failed_ != FailState::Ok; }
8379
FailState fail_state() { return failed_; }
@@ -328,7 +324,7 @@ using WasmHandleCloneFactory =
328324
class WasmHandleBase : public std::enable_shared_from_this<WasmHandleBase> {
329325
public:
330326
explicit WasmHandleBase(std::shared_ptr<WasmBase> wasm_base) : wasm_base_(wasm_base) {}
331-
virtual ~WasmHandleBase() {
327+
~WasmHandleBase() {
332328
if (wasm_base_) {
333329
wasm_base_->startShutdown();
334330
}
@@ -341,29 +337,24 @@ class WasmHandleBase : public std::enable_shared_from_this<WasmHandleBase> {
341337

342338
std::shared_ptr<WasmBase> &wasm() { return wasm_base_; }
343339

344-
virtual void swap(std::shared_ptr<WasmHandleBase> &new_handle) {
345-
wasm_base_.swap(new_handle->wasm_base_);
346-
}
347-
348340
void setRecoverVmCallback(std::function<std::shared_ptr<WasmHandleBase>()> &&f) {
349341
recover_vm_callback_ = std::move(f);
350342
}
351-
void setNeedRecover() { need_recover_ = true; }
352-
bool needRecover() { return need_recover_; }
343+
344+
// Recover the wasm vm and generate a new wasm handle
353345
bool doRecover(std::shared_ptr<WasmHandleBase> &new_handle) {
354-
if (!need_recover_ || recover_vm_callback_ == nullptr) {
346+
assert(new_handle == nullptr);
347+
if (recover_vm_callback_ == nullptr) {
355348
return true;
356349
}
357350
new_handle = recover_vm_callback_();
358351
if (!new_handle) {
359352
return false;
360353
}
361-
need_recover_ = false;
362354
return true;
363355
}
364356

365357
protected:
366-
bool need_recover_ = false;
367358
std::shared_ptr<WasmBase> wasm_base_;
368359
std::function<std::shared_ptr<WasmHandleBase>()> recover_vm_callback_;
369360
};
@@ -385,7 +376,7 @@ class PluginHandleBase : public std::enable_shared_from_this<PluginHandleBase> {
385376
explicit PluginHandleBase(std::shared_ptr<WasmHandleBase> wasm_handle,
386377
std::shared_ptr<PluginBase> plugin)
387378
: plugin_(plugin), wasm_handle_(wasm_handle) {}
388-
virtual ~PluginHandleBase() {
379+
~PluginHandleBase() {
389380
if (wasm_handle_) {
390381
wasm_handle_->wasm()->startShutdown(plugin_->key());
391382
}
@@ -395,35 +386,33 @@ class PluginHandleBase : public std::enable_shared_from_this<PluginHandleBase> {
395386
std::shared_ptr<WasmBase> &wasm() { return wasm_handle_->wasm(); }
396387
std::shared_ptr<WasmHandleBase> &wasmHandle() { return wasm_handle_; }
397388

398-
void setRecoverPluginCallback(std::function<bool(std::shared_ptr<WasmHandleBase> &)> &&f) {
389+
void setRecoverPluginCallback(
390+
std::function<std::shared_ptr<PluginHandleBase>(std::shared_ptr<WasmHandleBase> &)> &&f) {
399391
recover_plugin_callback_ = std::move(f);
400392
}
401-
void setNeedRecover() { need_recover_ = true; }
402-
bool needRecover() { return need_recover_; }
403-
bool doRecover() {
404-
if (!need_recover_ || recover_plugin_callback_ == nullptr) {
393+
394+
// Recover the wasm plugin and generate a new plugin handle
395+
bool doRecover(std::shared_ptr<PluginHandleBase> &new_handle) {
396+
assert(new_handle == nullptr);
397+
if (recover_plugin_callback_ == nullptr) {
405398
return true;
406399
}
407400
std::shared_ptr<WasmHandleBase> new_wasm_handle;
408401
if (!wasm_handle_->doRecover(new_wasm_handle)) {
409402
return false;
410403
}
411-
if (!recover_plugin_callback_(new_wasm_handle)) {
404+
new_handle = recover_plugin_callback_(new_wasm_handle);
405+
if (!new_handle) {
412406
return false;
413407
}
414-
need_recover_ = false;
415408
return true;
416409
}
417410

418-
virtual void updateWasm(std::shared_ptr<WasmHandleBase> &new_handle) {
419-
wasm_handle_ = new_handle;
420-
}
421-
422411
protected:
423-
bool need_recover_ = false;
424412
std::shared_ptr<PluginBase> plugin_;
425413
std::shared_ptr<WasmHandleBase> wasm_handle_;
426-
std::function<bool(std::shared_ptr<WasmHandleBase> &)> recover_plugin_callback_;
414+
std::function<std::shared_ptr<PluginHandleBase>(std::shared_ptr<WasmHandleBase> &)>
415+
recover_plugin_callback_;
427416
};
428417

429418
using PluginHandleFactory = std::function<std::shared_ptr<PluginHandleBase>(

src/context.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
#include <cassert>
1617
#include <deque>
1718
#include <map>
1819
#include <memory>
@@ -529,7 +530,8 @@ FilterMetadataStatus ContextBase::convertVmCallResultToFilterMetadataStatus(uint
529530

530531
ContextBase::~ContextBase() {
531532
// Do not remove vm context which has the same lifetime as wasm_.
532-
if (id_ != 0U && wasm_ != nullptr) {
533+
if (id_ != 0U) {
534+
assert(wasm_ != nullptr);
533535
wasm_->contexts_.erase(id_);
534536
}
535537
}

src/wasm.cc

Lines changed: 84 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -556,67 +556,52 @@ std::shared_ptr<WasmHandleBase> getThreadLocalWasm(std::string_view vm_key) {
556556

557557
void setWasmFailCallback(const std::string &vm_key,
558558
const std::shared_ptr<WasmHandleBase> &wasm_handle) {
559-
std::weak_ptr<WasmHandleBase> wasm_handle_for_copy = wasm_handle;
560-
wasm_handle->wasm()->wasm_vm()->addFailCallback(
561-
[vm_key, wasm_handle_for_copy](proxy_wasm::FailState fail_state) {
562-
if (fail_state == proxy_wasm::FailState::RuntimeError) {
563-
// If VM failed, erase the entry so that:
564-
// 1) we can recreate the new thread local VM from the same base_wasm.
565-
// 2) we wouldn't reuse the failed VM for new plugins accidentally.
566-
local_wasms.erase(vm_key);
567-
auto wasm_handle = wasm_handle_for_copy.lock();
568-
if (!wasm_handle) {
569-
return;
570-
}
571-
wasm_handle->setNeedRecover();
572-
}
573-
});
559+
wasm_handle->wasm()->wasm_vm()->addFailCallback([vm_key](proxy_wasm::FailState fail_state) {
560+
if (fail_state == proxy_wasm::FailState::RuntimeError) {
561+
// If VM failed, erase the entry so that:
562+
// 1) we can recreate the new thread local VM from the same base_wasm.
563+
// 2) we wouldn't reuse the failed VM for new plugins accidentally.
564+
local_wasms.erase(vm_key);
565+
}
566+
});
574567
}
575568

576569
void setWasmRecoverCallback(const std::string &vm_key,
577570
const std::shared_ptr<WasmHandleBase> &wasm_handle,
578571
const std::shared_ptr<WasmHandleBase> &base_handle,
579572
const WasmHandleCloneFactory &clone_factory) {
580-
std::weak_ptr<WasmHandleBase> wasm_handle_for_copy = wasm_handle;
581-
wasm_handle->setRecoverVmCallback([vm_key, wasm_handle_for_copy, base_handle,
582-
clone_factory]() -> std::shared_ptr<WasmHandleBase> {
583-
const auto &integration = base_handle->wasm()->wasm_vm()->integration();
584-
integration->trace("Start recover wasm_handle");
585-
auto it = local_wasms.find(vm_key);
586-
if (it != local_wasms.end()) {
587-
auto wasm_handle = it->second.lock();
588-
if (wasm_handle) {
589-
integration->trace("Wasm handle already exists");
590-
return wasm_handle;
591-
}
592-
local_wasms.erase(vm_key);
593-
}
594-
// try to recover wasm vm
595-
auto wasm_handle = wasm_handle_for_copy.lock();
596-
if (!wasm_handle) {
597-
base_handle->wasm()->fail(FailState::RecoverError, "Wasm handle lock failed");
598-
return nullptr;
599-
}
600-
auto new_handle = clone_factory(base_handle);
601-
if (!new_handle) {
602-
base_handle->wasm()->fail(FailState::RecoverError,
603-
"Failed to clone Base Wasm during recover");
604-
return nullptr;
605-
}
573+
wasm_handle->setRecoverVmCallback(
574+
[vm_key, base_handle, clone_factory]() -> std::shared_ptr<WasmHandleBase> {
575+
const auto &integration = base_handle->wasm()->wasm_vm()->integration();
576+
integration->trace("Start recover wasm_handle");
577+
auto it = local_wasms.find(vm_key);
578+
if (it != local_wasms.end()) {
579+
auto wasm_handle = it->second.lock();
580+
if (wasm_handle) {
581+
integration->trace("Wasm handle already exists");
582+
return wasm_handle;
583+
}
584+
local_wasms.erase(vm_key);
585+
}
586+
// Try to recover wasm vm
587+
auto new_handle = clone_factory(base_handle);
588+
if (!new_handle) {
589+
base_handle->wasm()->fail(FailState::RecoverError,
590+
"Failed to clone Base Wasm during recover");
591+
return nullptr;
592+
}
606593

607-
if (!new_handle->wasm()->initialize()) {
608-
base_handle->wasm()->fail(FailState::RecoverError,
609-
"Failed to initialize Wasm code during recover");
610-
return nullptr;
611-
}
612-
// avoid the context use the stale wasm ptr
613-
wasm_handle->wasm()->clearWasmInContext();
614-
wasm_handle->swap(new_handle);
615-
local_wasms[vm_key] = wasm_handle;
616-
integration->trace("Wasm handle has been recovered");
617-
setWasmFailCallback(vm_key, wasm_handle);
618-
return wasm_handle;
619-
});
594+
if (!new_handle->wasm()->initialize()) {
595+
base_handle->wasm()->fail(FailState::RecoverError,
596+
"Failed to initialize Wasm code during recover");
597+
return nullptr;
598+
}
599+
local_wasms[vm_key] = new_handle;
600+
setWasmFailCallback(vm_key, new_handle);
601+
setWasmRecoverCallback(vm_key, new_handle, base_handle, clone_factory);
602+
integration->trace("Wasm handle has been recovered");
603+
return new_handle;
604+
});
620605
}
621606

622607
static std::shared_ptr<WasmHandleBase>
@@ -651,59 +636,54 @@ getOrCreateThreadLocalWasm(const std::shared_ptr<WasmHandleBase> &base_handle,
651636
}
652637

653638
void setPluginFailCallback(const std::string &key,
654-
const std::shared_ptr<WasmHandleBase> &wasm_handle,
655-
const std::shared_ptr<PluginHandleBase> &plugin_handle) {
656-
std::weak_ptr<PluginHandleBase> plugin_handle_for_copy = plugin_handle;
657-
wasm_handle->wasm()->wasm_vm()->addFailCallback(
658-
[key, plugin_handle_for_copy](proxy_wasm::FailState fail_state) {
659-
if (fail_state == proxy_wasm::FailState::RuntimeError) {
660-
// If VM failed, erase the entry so that:
661-
// 1) we can recreate the new thread local plugin from the same base_wasm.
662-
// 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
663-
local_plugins.erase(key);
664-
auto plugin_handle = plugin_handle_for_copy.lock();
665-
if (!plugin_handle) {
666-
return;
667-
}
668-
plugin_handle->setNeedRecover();
669-
}
670-
});
639+
const std::shared_ptr<WasmHandleBase> &wasm_handle) {
640+
wasm_handle->wasm()->wasm_vm()->addFailCallback([key](proxy_wasm::FailState fail_state) {
641+
if (fail_state == proxy_wasm::FailState::RuntimeError) {
642+
// If VM failed, erase the entry so that:
643+
// 1) we can recreate the new thread local plugin from the same base_wasm.
644+
// 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
645+
local_plugins.erase(key);
646+
}
647+
});
671648
}
672649

673650
void setPluginRecoverCallback(const std::string &key,
674651
const std::shared_ptr<PluginHandleBase> &plugin_handle,
675652
const std::shared_ptr<WasmHandleBase> &base_handle,
676-
const std::shared_ptr<PluginBase> &plugin) {
677-
std::weak_ptr<PluginHandleBase> plugin_handle_for_copy = plugin_handle;
678-
plugin_handle->setRecoverPluginCallback([key, plugin_handle_for_copy, base_handle,
679-
plugin](std::shared_ptr<WasmHandleBase> &wasm_handle) {
680-
const auto &integration = base_handle->wasm()->wasm_vm()->integration();
681-
integration->trace("Start recover plugin_handle");
682-
// We cannot reuse plugin that other have created because the life cycle of the plugin
683-
// handle is controlled by the upper layer.
684-
auto plugin_handle = plugin_handle_for_copy.lock();
685-
if (!plugin_handle) {
686-
base_handle->wasm()->fail(FailState::RecoverError, "Plugin handle lock failed");
687-
return false;
688-
}
689-
plugin_handle->updateWasm(wasm_handle);
690-
// Create and initialize new thread-local Plugin.
691-
auto *plugin_context = wasm_handle->wasm()->start(plugin);
692-
if (plugin_context == nullptr) {
693-
base_handle->wasm()->fail(FailState::RecoverError,
694-
"Failed to start thread-local Wasm during recover");
695-
return false;
696-
}
697-
if (!wasm_handle->wasm()->configure(plugin_context, plugin)) {
698-
base_handle->wasm()->fail(FailState::RecoverError,
699-
"Failed to configure thread-local Wasm plugin during recover");
700-
return false;
701-
}
702-
local_plugins[key] = plugin_handle;
703-
integration->trace("Plugin_handle has been recovered");
704-
setPluginFailCallback(key, wasm_handle, plugin_handle);
705-
return true;
706-
});
653+
const std::shared_ptr<PluginBase> &plugin,
654+
const PluginHandleFactory &plugin_factory) {
655+
plugin_handle->setRecoverPluginCallback(
656+
[key, base_handle, plugin, plugin_factory](
657+
std::shared_ptr<WasmHandleBase> &wasm_handle) -> std::shared_ptr<PluginHandleBase> {
658+
const auto &integration = base_handle->wasm()->wasm_vm()->integration();
659+
integration->trace("Start recover plugin_handle");
660+
auto it = local_plugins.find(key);
661+
if (it != local_plugins.end()) {
662+
auto plugin_handle = it->second.lock();
663+
if (plugin_handle) {
664+
return plugin_handle;
665+
}
666+
local_plugins.erase(key);
667+
}
668+
// Try to recover wasm plugin
669+
auto *plugin_context = wasm_handle->wasm()->start(plugin);
670+
if (plugin_context == nullptr) {
671+
base_handle->wasm()->fail(FailState::RecoverError,
672+
"Failed to start thread-local Wasm during recover");
673+
return nullptr;
674+
}
675+
if (!wasm_handle->wasm()->configure(plugin_context, plugin)) {
676+
base_handle->wasm()->fail(FailState::RecoverError,
677+
"Failed to configure thread-local Wasm plugin during recover");
678+
return nullptr;
679+
}
680+
auto new_handle = plugin_factory(wasm_handle, plugin);
681+
local_plugins[key] = new_handle;
682+
setPluginFailCallback(key, wasm_handle);
683+
setPluginRecoverCallback(key, new_handle, base_handle, plugin, plugin_factory);
684+
integration->trace("Plugin handle has been recovered");
685+
return new_handle;
686+
});
707687
}
708688

709689
std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
@@ -738,8 +718,8 @@ std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
738718
}
739719
auto plugin_handle = plugin_factory(wasm_handle, plugin);
740720
local_plugins[key] = plugin_handle;
741-
setPluginFailCallback(key, wasm_handle, plugin_handle);
742-
setPluginRecoverCallback(key, plugin_handle, base_handle, plugin);
721+
setPluginFailCallback(key, wasm_handle);
722+
setPluginRecoverCallback(key, plugin_handle, base_handle, plugin, plugin_factory);
743723
return plugin_handle;
744724
}
745725

0 commit comments

Comments
 (0)