@@ -554,6 +554,71 @@ std::shared_ptr<WasmHandleBase> getThreadLocalWasm(std::string_view vm_key) {
554
554
return wasm;
555
555
}
556
556
557
+ void setWasmFailCallback (const std::string &vm_key,
558
+ const std::shared_ptr<WasmHandleBase> &wasm_handle) {
559
+ std::weak_ptr<WasmHandleBase> wasm_handle_for_copy = wasm_handle;
560
+ wasm_handle->wasm ()->wasm_vm ()->addFailCallback (
561
+ [vm_key, wasm_handle_for_copy](proxy_wasm::FailState fail_state) {
562
+ if (fail_state == proxy_wasm::FailState::RuntimeError) {
563
+ // If VM failed, erase the entry so that:
564
+ // 1) we can recreate the new thread local VM from the same base_wasm.
565
+ // 2) we wouldn't reuse the failed VM for new plugins accidentally.
566
+ local_wasms.erase (vm_key);
567
+ auto wasm_handle = wasm_handle_for_copy.lock ();
568
+ if (!wasm_handle) {
569
+ return ;
570
+ }
571
+ wasm_handle->setNeedRecover ();
572
+ }
573
+ });
574
+ }
575
+
576
+ void setWasmRecoverCallback (const std::string &vm_key,
577
+ const std::shared_ptr<WasmHandleBase> &wasm_handle,
578
+ const std::shared_ptr<WasmHandleBase> &base_handle,
579
+ const WasmHandleCloneFactory &clone_factory) {
580
+ std::weak_ptr<WasmHandleBase> wasm_handle_for_copy = wasm_handle;
581
+ wasm_handle->setRecoverVmCallback ([vm_key, wasm_handle_for_copy, base_handle,
582
+ clone_factory]() -> std::shared_ptr<WasmHandleBase> {
583
+ const auto &integration = base_handle->wasm ()->wasm_vm ()->integration ();
584
+ integration->trace (" Start recover wasm_handle" );
585
+ auto it = local_wasms.find (vm_key);
586
+ if (it != local_wasms.end ()) {
587
+ auto wasm_handle = it->second .lock ();
588
+ if (wasm_handle) {
589
+ integration->trace (" Wasm handle already exists" );
590
+ return wasm_handle;
591
+ }
592
+ local_wasms.erase (vm_key);
593
+ }
594
+ // try to recover wasm vm
595
+ auto wasm_handle = wasm_handle_for_copy.lock ();
596
+ if (!wasm_handle) {
597
+ base_handle->wasm ()->fail (FailState::RecoverError, " Wasm handle lock failed" );
598
+ return nullptr ;
599
+ }
600
+ auto new_handle = clone_factory (base_handle);
601
+ if (!new_handle) {
602
+ base_handle->wasm ()->fail (FailState::RecoverError,
603
+ " Failed to clone Base Wasm during recover" );
604
+ return nullptr ;
605
+ }
606
+
607
+ if (!new_handle->wasm ()->initialize ()) {
608
+ base_handle->wasm ()->fail (FailState::RecoverError,
609
+ " Failed to initialize Wasm code during recover" );
610
+ return nullptr ;
611
+ }
612
+ // avoid the context use the stale wasm ptr
613
+ wasm_handle->wasm ()->clearWasmInContext ();
614
+ wasm_handle->swap (new_handle);
615
+ local_wasms[vm_key] = wasm_handle;
616
+ integration->trace (" Wasm handle has been recovered" );
617
+ setWasmFailCallback (vm_key, wasm_handle);
618
+ return wasm_handle;
619
+ });
620
+ }
621
+
557
622
static std::shared_ptr<WasmHandleBase>
558
623
getOrCreateThreadLocalWasm (const std::shared_ptr<WasmHandleBase> &base_handle,
559
624
const WasmHandleCloneFactory &clone_factory) {
@@ -580,17 +645,75 @@ getOrCreateThreadLocalWasm(const std::shared_ptr<WasmHandleBase> &base_handle,
580
645
return nullptr ;
581
646
}
582
647
local_wasms[vm_key] = wasm_handle;
583
- wasm_handle->wasm ()->wasm_vm ()->addFailCallback ([vm_key](proxy_wasm::FailState fail_state) {
584
- if (fail_state == proxy_wasm::FailState::RuntimeError) {
585
- // If VM failed, erase the entry so that:
586
- // 1) we can recreate the new thread local VM from the same base_wasm.
587
- // 2) we wouldn't reuse the failed VM for new plugins accidentally.
588
- local_wasms.erase (vm_key);
589
- };
590
- });
648
+ setWasmFailCallback (vm_key, wasm_handle);
649
+ setWasmRecoverCallback (vm_key, wasm_handle, base_handle, clone_factory);
591
650
return wasm_handle;
592
651
}
593
652
653
+ void setPluginFailCallback (const std::string &key,
654
+ const std::shared_ptr<WasmHandleBase> &wasm_handle,
655
+ const std::shared_ptr<PluginHandleBase> &plugin_handle) {
656
+ std::weak_ptr<PluginHandleBase> plugin_handle_for_copy = plugin_handle;
657
+ wasm_handle->wasm ()->wasm_vm ()->addFailCallback (
658
+ [key, plugin_handle_for_copy](proxy_wasm::FailState fail_state) {
659
+ if (fail_state == proxy_wasm::FailState::RuntimeError) {
660
+ // If VM failed, erase the entry so that:
661
+ // 1) we can recreate the new thread local plugin from the same base_wasm.
662
+ // 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
663
+ local_plugins.erase (key);
664
+ auto plugin_handle = plugin_handle_for_copy.lock ();
665
+ if (!plugin_handle) {
666
+ return ;
667
+ }
668
+ plugin_handle->setNeedRecover ();
669
+ }
670
+ });
671
+ }
672
+
673
+ void setPluginRecoverCallback (const std::string &key,
674
+ const std::shared_ptr<PluginHandleBase> &plugin_handle,
675
+ const std::shared_ptr<WasmHandleBase> &base_handle,
676
+ const std::shared_ptr<PluginBase> &plugin) {
677
+ std::weak_ptr<PluginHandleBase> plugin_handle_for_copy = plugin_handle;
678
+ plugin_handle->setRecoverPluginCallback (
679
+ [key, plugin_handle_for_copy, base_handle,
680
+ plugin](std::shared_ptr<WasmHandleBase> &wasm_handle) -> std::shared_ptr<PluginHandleBase> {
681
+ const auto &integration = base_handle->wasm ()->wasm_vm ()->integration ();
682
+ integration->trace (" Start recover plugin_handle" );
683
+ auto it = local_plugins.find (key);
684
+ if (it != local_plugins.end ()) {
685
+ auto plugin_handle = it->second .lock ();
686
+ if (plugin_handle) {
687
+ integration->trace (" Plugin handle already exists" );
688
+ return plugin_handle;
689
+ }
690
+ local_plugins.erase (key);
691
+ }
692
+ auto plugin_handle = plugin_handle_for_copy.lock ();
693
+ if (!plugin_handle) {
694
+ base_handle->wasm ()->fail (FailState::RecoverError, " Plugin handle lock failed" );
695
+ return nullptr ;
696
+ }
697
+ plugin_handle->updateWasm (wasm_handle);
698
+ // Create and initialize new thread-local Plugin.
699
+ auto *plugin_context = wasm_handle->wasm ()->start (plugin);
700
+ if (plugin_context == nullptr ) {
701
+ base_handle->wasm ()->fail (FailState::RecoverError,
702
+ " Failed to start thread-local Wasm during recover" );
703
+ return nullptr ;
704
+ }
705
+ if (!wasm_handle->wasm ()->configure (plugin_context, plugin)) {
706
+ base_handle->wasm ()->fail (FailState::RecoverError,
707
+ " Failed to configure thread-local Wasm plugin during recover" );
708
+ return nullptr ;
709
+ }
710
+ local_plugins[key] = plugin_handle;
711
+ integration->trace (" Plugin_handle has been recovered" );
712
+ setPluginFailCallback (key, wasm_handle, plugin_handle);
713
+ return plugin_handle;
714
+ });
715
+ }
716
+
594
717
std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin (
595
718
const std::shared_ptr<WasmHandleBase> &base_handle, const std::shared_ptr<PluginBase> &plugin,
596
719
const WasmHandleCloneFactory &clone_factory, const PluginHandleFactory &plugin_factory) {
@@ -623,14 +746,8 @@ std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
623
746
}
624
747
auto plugin_handle = plugin_factory (wasm_handle, plugin);
625
748
local_plugins[key] = plugin_handle;
626
- wasm_handle->wasm ()->wasm_vm ()->addFailCallback ([key](proxy_wasm::FailState fail_state) {
627
- if (fail_state == proxy_wasm::FailState::RuntimeError) {
628
- // If VM failed, erase the entry so that:
629
- // 1) we can recreate the new thread local plugin from the same base_wasm.
630
- // 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
631
- local_plugins.erase (key);
632
- };
633
- });
749
+ setPluginFailCallback (key, wasm_handle, plugin_handle);
750
+ setPluginRecoverCallback (key, plugin_handle, base_handle, plugin);
634
751
return plugin_handle;
635
752
}
636
753
0 commit comments