chore(napi): fix tokio destroy logic

This commit is contained in:
LongYinan 2022-12-16 20:07:22 +08:00
parent e88fbcc404
commit 968d9e10b1
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
2 changed files with 49 additions and 69 deletions

View file

@ -1,5 +1,5 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ffi::{c_void, CStr}; use std::ffi::CStr;
use std::ptr; use std::ptr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
@ -21,17 +21,6 @@ struct PersistedPerInstanceVec<T> {
length: AtomicUsize, length: AtomicUsize,
} }
impl<T> Drop for PersistedPerInstanceVec<T> {
fn drop(&mut self) {
let length = self.length.load(Ordering::Relaxed);
if length == 0 {
return;
}
let inner = self.inner.load(Ordering::Relaxed);
unsafe { Vec::from_raw_parts(inner, length, length) };
}
}
impl<T> Default for PersistedPerInstanceVec<T> { impl<T> Default for PersistedPerInstanceVec<T> {
fn default() -> Self { fn default() -> Self {
let mut vec: Vec<T> = Vec::with_capacity(1); let mut vec: Vec<T> = Vec::with_capacity(1);
@ -86,12 +75,6 @@ unsafe impl<T: Sync> Sync for PersistedPerInstanceVec<T> {}
pub(crate) struct PersistedPerInstanceHashMap<K, V>(*mut HashMap<K, V>); pub(crate) struct PersistedPerInstanceHashMap<K, V>(*mut HashMap<K, V>);
impl<K, V> Drop for PersistedPerInstanceHashMap<K, V> {
fn drop(&mut self) {
unsafe { Box::from_raw(self.0) };
}
}
impl<K, V> PersistedPerInstanceHashMap<K, V> { impl<K, V> PersistedPerInstanceHashMap<K, V> {
pub(crate) fn from_hashmap(hashmap: HashMap<K, V>) -> Self { pub(crate) fn from_hashmap(hashmap: HashMap<K, V>) -> Self {
Self(Box::into_raw(Box::new(hashmap))) Self(Box::into_raw(Box::new(hashmap)))
@ -130,32 +113,11 @@ type RegisteredClassesMap = PersistedPerInstanceHashMap<ThreadId, RegisteredClas
static MODULE_REGISTER_CALLBACK: Lazy<ModuleRegisterCallback> = Lazy::new(Default::default); static MODULE_REGISTER_CALLBACK: Lazy<ModuleRegisterCallback> = Lazy::new(Default::default);
static MODULE_CLASS_PROPERTIES: Lazy<ModuleClassProperty> = Lazy::new(Default::default); static MODULE_CLASS_PROPERTIES: Lazy<ModuleClassProperty> = Lazy::new(Default::default);
static REGISTERED: AtomicBool = AtomicBool::new(false); static IS_FIRST_MODULE: AtomicBool = AtomicBool::new(true);
static FIRST_MODULE_REGISTERED: AtomicBool = AtomicBool::new(false);
static REGISTERED_CLASSES: Lazy<RegisteredClassesMap> = Lazy::new(Default::default); static REGISTERED_CLASSES: Lazy<RegisteredClassesMap> = Lazy::new(Default::default);
static FN_REGISTER_MAP: Lazy<FnRegisterMap> = Lazy::new(Default::default); static FN_REGISTER_MAP: Lazy<FnRegisterMap> = Lazy::new(Default::default);
#[ctor::dtor]
fn destroy() {
{
let ptr = MODULE_REGISTER_CALLBACK.inner.load(Ordering::Relaxed);
let len = MODULE_REGISTER_CALLBACK.length.load(Ordering::Relaxed);
unsafe { Vec::from_raw_parts(ptr, len, len) };
}
{
unsafe { Box::from_raw(MODULE_CLASS_PROPERTIES.0) };
}
{
unsafe { Box::from_raw(FN_REGISTER_MAP.0) };
}
}
#[inline]
fn wait_first_thread_registered() {
while !REGISTERED.load(Ordering::SeqCst) {
std::hint::spin_loop();
}
}
type RegisteredClasses = type RegisteredClasses =
PersistedPerInstanceHashMap</* export name */ String, /* constructor */ sys::napi_ref>; PersistedPerInstanceHashMap</* export name */ String, /* constructor */ sys::napi_ref>;
@ -164,9 +126,15 @@ type RegisteredClasses =
static MODULE_EXPORTS: Lazy<PersistedPerInstanceVec<ModuleExportsCallback>> = static MODULE_EXPORTS: Lazy<PersistedPerInstanceVec<ModuleExportsCallback>> =
Lazy::new(Default::default); Lazy::new(Default::default);
#[inline]
fn wait_first_thread_registered() {
while !FIRST_MODULE_REGISTERED.load(Ordering::SeqCst) {
std::hint::spin_loop();
}
}
#[doc(hidden)] #[doc(hidden)]
pub fn get_class_constructor(js_name: &'static str) -> Option<sys::napi_ref> { pub fn get_class_constructor(js_name: &'static str) -> Option<sys::napi_ref> {
wait_first_thread_registered();
let current_id = std::thread::current().id(); let current_id = std::thread::current().id();
REGISTERED_CLASSES.borrow_mut(|map| { REGISTERED_CLASSES.borrow_mut(|map| {
map map
@ -236,7 +204,6 @@ pub fn register_class(
/// ``` /// ```
/// ///
pub fn get_js_function(env: &Env, raw_fn: ExportRegisterCallback) -> Result<JsFunction> { pub fn get_js_function(env: &Env, raw_fn: ExportRegisterCallback) -> Result<JsFunction> {
wait_first_thread_registered();
FN_REGISTER_MAP.borrow_mut(|inner| { FN_REGISTER_MAP.borrow_mut(|inner| {
inner inner
.get(&raw_fn) .get(&raw_fn)
@ -290,7 +257,6 @@ pub fn get_js_function(env: &Env, raw_fn: ExportRegisterCallback) -> Result<JsFu
/// ``` /// ```
/// ///
pub fn get_c_callback(raw_fn: ExportRegisterCallback) -> Result<crate::Callback> { pub fn get_c_callback(raw_fn: ExportRegisterCallback) -> Result<crate::Callback> {
wait_first_thread_registered();
FN_REGISTER_MAP.borrow_mut(|inner| { FN_REGISTER_MAP.borrow_mut(|inner| {
inner inner
.get(&raw_fn) .get(&raw_fn)
@ -317,6 +283,11 @@ unsafe extern "C" fn napi_register_module_v1(
env: sys::napi_env, env: sys::napi_env,
exports: sys::napi_value, exports: sys::napi_value,
) -> sys::napi_value { ) -> sys::napi_value {
if IS_FIRST_MODULE.load(Ordering::SeqCst) {
IS_FIRST_MODULE.store(false, Ordering::SeqCst);
} else {
wait_first_thread_registered();
}
crate::__private::___CALL_FROM_FACTORY.get_or_default(); crate::__private::___CALL_FROM_FACTORY.get_or_default();
let mut exports_objects: HashSet<String> = HashSet::default(); let mut exports_objects: HashSet<String> = HashSet::default();
MODULE_REGISTER_CALLBACK.borrow_mut(|inner| { MODULE_REGISTER_CALLBACK.borrow_mut(|inner| {
@ -393,8 +364,7 @@ unsafe extern "C" fn napi_register_module_v1(
}) })
}); });
let mut registered_classes = let mut registered_classes = HashMap::new();
HashMap::with_capacity(MODULE_CLASS_PROPERTIES.borrow_mut(|inner| inner.len()));
MODULE_CLASS_PROPERTIES.borrow_mut(|inner| { MODULE_CLASS_PROPERTIES.borrow_mut(|inner| {
inner.iter().for_each(|(rust_name, js_mods)| { inner.iter().for_each(|(rust_name, js_mods)| {
@ -498,29 +468,21 @@ unsafe extern "C" fn napi_register_module_v1(
}) })
}); });
#[cfg(feature = "napi3")] #[cfg(all(windows, feature = "napi4", feature = "tokio_rt"))]
{ {
crate::tokio_runtime::RT_REFERENCE_COUNT.fetch_add(1, Ordering::SeqCst);
unsafe { unsafe {
sys::napi_add_env_cleanup_hook(env, Some(remove_registered_classes), env as *mut c_void) sys::napi_add_env_cleanup_hook(
env,
Some(crate::tokio_runtime::drop_runtime),
ptr::null_mut(),
)
}; };
} }
REGISTERED.store(true, Ordering::SeqCst); FIRST_MODULE_REGISTERED.store(true, Ordering::SeqCst);
exports exports
} }
unsafe extern "C" fn remove_registered_classes(env: *mut c_void) {
let env = env as sys::napi_env;
if let Some(registered_classes) =
REGISTERED_CLASSES.borrow_mut(|map| map.remove(&std::thread::current().id()))
{
registered_classes.borrow_mut(|map| {
map.iter().for_each(|(_, v)| {
unsafe { sys::napi_delete_reference(env, *v) };
})
});
}
}
pub(crate) unsafe extern "C" fn noop( pub(crate) unsafe extern "C" fn noop(
env: sys::napi_env, env: sys::napi_env,
_info: sys::napi_callback_info, _info: sys::napi_callback_info,

View file

@ -6,11 +6,29 @@ use tokio::runtime::Runtime;
use crate::{check_status, sys, JsDeferred, JsUnknown, NapiValue, Result}; use crate::{check_status, sys, JsDeferred, JsUnknown, NapiValue, Result};
pub(crate) static RT: Lazy<Runtime> = pub(crate) static mut RT: Lazy<Option<Runtime>> = Lazy::new(|| {
Lazy::new(|| tokio::runtime::Runtime::new().expect("Create tokio runtime failed")); let runtime = tokio::runtime::Runtime::new().expect("Create tokio runtime failed");
Some(runtime)
});
pub fn runtime() -> &'static Runtime { #[cfg(windows)]
&RT pub(crate) static RT_REFERENCE_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[cfg(windows)]
pub(crate) unsafe extern "C" fn drop_runtime(arg: *mut std::ffi::c_void) {
use std::sync::atomic::Ordering;
if RT_REFERENCE_COUNT.fetch_sub(1, Ordering::SeqCst) == 1 {
if let Some(rt) = Lazy::get_mut(unsafe { &mut RT }) {
rt.take();
}
}
unsafe {
let env: sys::napi_env = arg as *mut sys::napi_env__;
sys::napi_remove_env_cleanup_hook(env, Some(drop_runtime), arg);
}
} }
/// Spawns a future onto the Tokio runtime. /// Spawns a future onto the Tokio runtime.
@ -21,7 +39,7 @@ pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
where where
F: 'static + Send + Future<Output = ()>, F: 'static + Send + Future<Output = ()>,
{ {
RT.spawn(fut) unsafe { RT.as_ref() }.unwrap().spawn(fut)
} }
/// Runs a future to completion /// Runs a future to completion
@ -31,7 +49,7 @@ pub fn block_on<F>(fut: F) -> F::Output
where where
F: 'static + Send + Future<Output = ()>, F: 'static + Send + Future<Output = ()>,
{ {
RT.block_on(fut) unsafe { RT.as_ref() }.unwrap().block_on(fut)
} }
// This function's signature must be kept in sync with the one in lib.rs, otherwise napi // This function's signature must be kept in sync with the one in lib.rs, otherwise napi
@ -41,7 +59,7 @@ where
/// then call the provided closure. Otherwise it will just call the provided closure. /// then call the provided closure. Otherwise it will just call the provided closure.
#[inline] #[inline]
pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T { pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
let _rt_guard = RT.enter(); let _rt_guard = unsafe { RT.as_ref() }.unwrap().enter();
f() f()
} }