fix(napi): free buffer in current thread if env is available (#1549)

This commit is contained in:
LongYinan 2023-03-30 12:55:54 +08:00 committed by GitHub
parent a0b6e2b263
commit 0a6b214505
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 23 deletions

View file

@ -8,7 +8,7 @@ use std::sync::{
};
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, MAIN_THREAD_ID};
use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, THREADS_CAN_ACCESS_ENV};
pub use crate::js_values::TypedArrayType;
use crate::{check_status, sys, Error, Result, Status};
@ -68,18 +68,19 @@ macro_rules! impl_typed_array {
if let Some((ref_, env)) = self.raw {
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
{
if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) {
if CUSTOM_GC_TSFN_CLOSED
.with(|closed| closed.load(std::sync::atomic::Ordering::Relaxed))
{
return;
}
if !MAIN_THREAD_ID
.get()
.map(|id| &std::thread::current().id() == id)
.unwrap_or(false)
if !THREADS_CAN_ACCESS_ENV
.get_or_init(Default::default)
.contains(&std::thread::current().id())
{
let status = unsafe {
sys::napi_call_threadsafe_function(
CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst),
ref_ as *mut c_void,
ref_.cast(),
1,
)
};

View file

@ -10,7 +10,7 @@ use std::sync::Arc;
use std::sync::Mutex;
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, MAIN_THREAD_ID};
use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, THREADS_CAN_ACCESS_ENV};
use crate::{bindgen_prelude::*, check_status, sys, Result, ValueType};
#[cfg(all(debug_assertions, not(windows)))]
@ -36,18 +36,18 @@ impl Drop for Buffer {
if let Some((ref_, env)) = self.raw {
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
{
if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) {
if CUSTOM_GC_TSFN_CLOSED.with(|closed| closed.load(std::sync::atomic::Ordering::Relaxed))
{
return;
}
if !MAIN_THREAD_ID
.get()
.map(|id| &std::thread::current().id() == id)
.unwrap_or(false)
if !THREADS_CAN_ACCESS_ENV
.get_or_init(Default::default)
.contains(&std::thread::current().id())
{
let status = unsafe {
sys::napi_call_threadsafe_function(
CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst),
ref_ as *mut c_void,
ref_.cast(),
1,
)
};

View file

@ -1,5 +1,9 @@
use std::collections::{HashMap, HashSet};
use std::ffi::CStr;
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
use std::hash::Hash;
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
use std::ops::Deref;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
@ -73,6 +77,45 @@ impl<T> PersistedPerInstanceVec<T> {
unsafe impl<T: Send> Send for PersistedPerInstanceVec<T> {}
unsafe impl<T: Sync> Sync for PersistedPerInstanceVec<T> {}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
pub(crate) struct PersistedPerInstanceHashSet<T: 'static> {
inner: *mut HashSet<T>,
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
impl<T: 'static + PartialEq + Eq + Hash> PersistedPerInstanceHashSet<T> {
pub(crate) fn insert(&self, item: T) {
Box::leak(unsafe { Box::from_raw(self.inner) }).insert(item);
}
fn remove(&self, item: &T) {
Box::leak(unsafe { Box::from_raw(self.inner) }).remove(item);
}
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
impl<T: 'static> Deref for PersistedPerInstanceHashSet<T> {
type Target = HashSet<T>;
fn deref(&self) -> &Self::Target {
Box::leak(unsafe { Box::from_raw(self.inner) })
}
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
impl<T: 'static> Default for PersistedPerInstanceHashSet<T> {
fn default() -> Self {
Self {
inner: Box::leak(Box::default()),
}
}
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
unsafe impl<T: Send> Send for PersistedPerInstanceHashSet<T> {}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
unsafe impl<T: Sync> Sync for PersistedPerInstanceHashSet<T> {}
pub(crate) struct PersistedPerInstanceHashMap<K, V>(*mut HashMap<K, V>);
impl<K, V> PersistedPerInstanceHashMap<K, V> {
@ -121,12 +164,16 @@ static FN_REGISTER_MAP: Lazy<FnRegisterMap> = Lazy::new(Default::default);
pub(crate) static CUSTOM_GC_TSFN: AtomicPtr<sys::napi_threadsafe_function__> =
AtomicPtr::new(ptr::null_mut());
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
// CustomGC ThreadsafeFunction may be deleted during the process exit.
// And there may still some Buffer alive after that.
pub(crate) static CUSTOM_GC_TSFN_CLOSED: AtomicBool = AtomicBool::new(false);
thread_local! {
// CustomGC ThreadsafeFunction may be deleted during the process exit.
// And there may still some Buffer alive after that.
pub(crate) static CUSTOM_GC_TSFN_CLOSED: AtomicBool = AtomicBool::new(false);
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
pub(crate) static MAIN_THREAD_ID: once_cell::sync::OnceCell<ThreadId> =
once_cell::sync::OnceCell::new();
// Store thread id of the thread that created the CustomGC ThreadsafeFunction.
pub(crate) static THREADS_CAN_ACCESS_ENV: once_cell::sync::OnceCell<
PersistedPerInstanceHashSet<ThreadId>,
> = once_cell::sync::OnceCell::new();
type RegisteredClasses =
PersistedPerInstanceHashMap</* export name */ String, /* constructor */ sys::napi_ref>;
@ -552,7 +599,7 @@ fn create_custom_gc(env: sys::napi_env) {
&mut async_resource_name,
)
},
"Create async resource string in napi_register_module_v1 napi_register_module_v1"
"Create async resource string in napi_register_module_v1"
);
let mut custom_gc_tsfn = ptr::null_mut();
check_status_or_throw!(
@ -579,8 +626,29 @@ fn create_custom_gc(env: sys::napi_env) {
unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) },
"Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
);
CUSTOM_GC_TSFN.store(custom_gc_tsfn, Ordering::SeqCst);
MAIN_THREAD_ID.get_or_init(|| std::thread::current().id());
CUSTOM_GC_TSFN.store(custom_gc_tsfn, Ordering::Relaxed);
let threads = THREADS_CAN_ACCESS_ENV.get_or_init(Default::default);
let current_thread_id = std::thread::current().id();
threads.insert(current_thread_id);
check_status_or_throw!(
env,
unsafe {
sys::napi_add_env_cleanup_hook(
env,
Some(remove_thread_id),
Box::into_raw(Box::new(current_thread_id)).cast(),
)
},
"Failed to add remove thread id cleanup hook"
);
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
unsafe extern "C" fn remove_thread_id(id: *mut std::ffi::c_void) {
let thread_id = unsafe { Box::from_raw(id.cast::<ThreadId>()) };
THREADS_CAN_ACCESS_ENV
.get_or_init(Default::default)
.remove(&*thread_id);
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]
@ -596,7 +664,9 @@ unsafe extern "C" fn custom_gc_finalize(
finalize_data: *mut std::ffi::c_void,
finalize_hint: *mut std::ffi::c_void,
) {
CUSTOM_GC_TSFN_CLOSED.store(true, Ordering::SeqCst);
CUSTOM_GC_TSFN_CLOSED.with(|closed| {
closed.store(true, Ordering::Relaxed);
});
}
#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]