From 0a6b214505281993c5ff6f095dae631605e3235d Mon Sep 17 00:00:00 2001 From: LongYinan Date: Thu, 30 Mar 2023 12:55:54 +0800 Subject: [PATCH] fix(napi): free buffer in current thread if env is available (#1549) --- .../bindgen_runtime/js_values/arraybuffer.rs | 15 ++-- .../src/bindgen_runtime/js_values/buffer.rs | 14 +-- .../src/bindgen_runtime/module_register.rs | 88 +++++++++++++++++-- 3 files changed, 94 insertions(+), 23 deletions(-) diff --git a/crates/napi/src/bindgen_runtime/js_values/arraybuffer.rs b/crates/napi/src/bindgen_runtime/js_values/arraybuffer.rs index bec0e251..04beba6f 100644 --- a/crates/napi/src/bindgen_runtime/js_values/arraybuffer.rs +++ b/crates/napi/src/bindgen_runtime/js_values/arraybuffer.rs @@ -8,7 +8,7 @@ use std::sync::{ }; #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] -use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, MAIN_THREAD_ID}; +use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, THREADS_CAN_ACCESS_ENV}; pub use crate::js_values::TypedArrayType; use crate::{check_status, sys, Error, Result, Status}; @@ -68,18 +68,19 @@ macro_rules! impl_typed_array { if let Some((ref_, env)) = self.raw { #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] { - if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) { + if CUSTOM_GC_TSFN_CLOSED + .with(|closed| closed.load(std::sync::atomic::Ordering::Relaxed)) + { return; } - if !MAIN_THREAD_ID - .get() - .map(|id| &std::thread::current().id() == id) - .unwrap_or(false) + if !THREADS_CAN_ACCESS_ENV + .get_or_init(Default::default) + .contains(&std::thread::current().id()) { let status = unsafe { sys::napi_call_threadsafe_function( CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst), - ref_ as *mut c_void, + ref_.cast(), 1, ) }; diff --git a/crates/napi/src/bindgen_runtime/js_values/buffer.rs b/crates/napi/src/bindgen_runtime/js_values/buffer.rs index ff249e42..aab0fcef 100644 --- a/crates/napi/src/bindgen_runtime/js_values/buffer.rs +++ b/crates/napi/src/bindgen_runtime/js_values/buffer.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::sync::Mutex; #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] -use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, MAIN_THREAD_ID}; +use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, THREADS_CAN_ACCESS_ENV}; use crate::{bindgen_prelude::*, check_status, sys, Result, ValueType}; #[cfg(all(debug_assertions, not(windows)))] @@ -36,18 +36,18 @@ impl Drop for Buffer { if let Some((ref_, env)) = self.raw { #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] { - if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) { + if CUSTOM_GC_TSFN_CLOSED.with(|closed| closed.load(std::sync::atomic::Ordering::Relaxed)) + { return; } - if !MAIN_THREAD_ID - .get() - .map(|id| &std::thread::current().id() == id) - .unwrap_or(false) + if !THREADS_CAN_ACCESS_ENV + .get_or_init(Default::default) + .contains(&std::thread::current().id()) { let status = unsafe { sys::napi_call_threadsafe_function( CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst), - ref_ as *mut c_void, + ref_.cast(), 1, ) }; diff --git a/crates/napi/src/bindgen_runtime/module_register.rs b/crates/napi/src/bindgen_runtime/module_register.rs index b9d142a7..4788611a 100644 --- a/crates/napi/src/bindgen_runtime/module_register.rs +++ b/crates/napi/src/bindgen_runtime/module_register.rs @@ -1,5 +1,9 @@ use std::collections::{HashMap, HashSet}; use std::ffi::CStr; +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +use std::hash::Hash; +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +use std::ops::Deref; use std::ptr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; @@ -73,6 +77,45 @@ impl PersistedPerInstanceVec { unsafe impl Send for PersistedPerInstanceVec {} unsafe impl Sync for PersistedPerInstanceVec {} +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +pub(crate) struct PersistedPerInstanceHashSet { + inner: *mut HashSet, +} + +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +impl PersistedPerInstanceHashSet { + pub(crate) fn insert(&self, item: T) { + Box::leak(unsafe { Box::from_raw(self.inner) }).insert(item); + } + + fn remove(&self, item: &T) { + Box::leak(unsafe { Box::from_raw(self.inner) }).remove(item); + } +} + +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +impl Deref for PersistedPerInstanceHashSet { + type Target = HashSet; + + fn deref(&self) -> &Self::Target { + Box::leak(unsafe { Box::from_raw(self.inner) }) + } +} + +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +impl Default for PersistedPerInstanceHashSet { + fn default() -> Self { + Self { + inner: Box::leak(Box::default()), + } + } +} + +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +unsafe impl Send for PersistedPerInstanceHashSet {} +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +unsafe impl Sync for PersistedPerInstanceHashSet {} + pub(crate) struct PersistedPerInstanceHashMap(*mut HashMap); impl PersistedPerInstanceHashMap { @@ -121,12 +164,16 @@ static FN_REGISTER_MAP: Lazy = Lazy::new(Default::default); pub(crate) static CUSTOM_GC_TSFN: AtomicPtr = AtomicPtr::new(ptr::null_mut()); #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] -// CustomGC ThreadsafeFunction may be deleted during the process exit. -// And there may still some Buffer alive after that. -pub(crate) static CUSTOM_GC_TSFN_CLOSED: AtomicBool = AtomicBool::new(false); +thread_local! { + // CustomGC ThreadsafeFunction may be deleted during the process exit. + // And there may still some Buffer alive after that. + pub(crate) static CUSTOM_GC_TSFN_CLOSED: AtomicBool = AtomicBool::new(false); +} #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] -pub(crate) static MAIN_THREAD_ID: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); +// Store thread id of the thread that created the CustomGC ThreadsafeFunction. +pub(crate) static THREADS_CAN_ACCESS_ENV: once_cell::sync::OnceCell< + PersistedPerInstanceHashSet, +> = once_cell::sync::OnceCell::new(); type RegisteredClasses = PersistedPerInstanceHashMap; @@ -552,7 +599,7 @@ fn create_custom_gc(env: sys::napi_env) { &mut async_resource_name, ) }, - "Create async resource string in napi_register_module_v1 napi_register_module_v1" + "Create async resource string in napi_register_module_v1" ); let mut custom_gc_tsfn = ptr::null_mut(); check_status_or_throw!( @@ -579,8 +626,29 @@ fn create_custom_gc(env: sys::napi_env) { unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) }, "Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed" ); - CUSTOM_GC_TSFN.store(custom_gc_tsfn, Ordering::SeqCst); - MAIN_THREAD_ID.get_or_init(|| std::thread::current().id()); + CUSTOM_GC_TSFN.store(custom_gc_tsfn, Ordering::Relaxed); + let threads = THREADS_CAN_ACCESS_ENV.get_or_init(Default::default); + let current_thread_id = std::thread::current().id(); + threads.insert(current_thread_id); + check_status_or_throw!( + env, + unsafe { + sys::napi_add_env_cleanup_hook( + env, + Some(remove_thread_id), + Box::into_raw(Box::new(current_thread_id)).cast(), + ) + }, + "Failed to add remove thread id cleanup hook" + ); +} + +#[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] +unsafe extern "C" fn remove_thread_id(id: *mut std::ffi::c_void) { + let thread_id = unsafe { Box::from_raw(id.cast::()) }; + THREADS_CAN_ACCESS_ENV + .get_or_init(Default::default) + .remove(&*thread_id); } #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))] @@ -596,7 +664,9 @@ unsafe extern "C" fn custom_gc_finalize( finalize_data: *mut std::ffi::c_void, finalize_hint: *mut std::ffi::c_void, ) { - CUSTOM_GC_TSFN_CLOSED.store(true, Ordering::SeqCst); + CUSTOM_GC_TSFN_CLOSED.with(|closed| { + closed.store(true, Ordering::Relaxed); + }); } #[cfg(all(feature = "napi4", not(target_arch = "wasm32")))]