Merge pull request #1332 from napi-rs/array-buffer-send
fix(napi): make Buffer/ArrayBuffer truely Send/Sync safe
This commit is contained in:
commit
5561502fd5
4 changed files with 176 additions and 5 deletions
|
@ -26,7 +26,7 @@ full = ["latin1", "napi8", "async", "serde-json", "experimental", "chrono_date"]
|
|||
latin1 = ["encoding_rs"]
|
||||
error_anyhow = ["anyhow"]
|
||||
napi1 = []
|
||||
napi2 = ["napi1"]
|
||||
napi2 = ["napi1", "napi-sys/napi2"]
|
||||
napi3 = ["napi2", "napi-sys/napi3"]
|
||||
napi4 = ["napi3", "napi-sys/napi4"]
|
||||
napi5 = ["napi4", "napi-sys/napi5"]
|
||||
|
|
|
@ -7,6 +7,8 @@ use std::sync::{
|
|||
Arc,
|
||||
};
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
use crate::bindgen_prelude::{CUSTOM_GC_TSFN, CUSTOM_GC_TSFN_CLOSED, MAIN_THREAD_ID};
|
||||
pub use crate::js_values::TypedArrayType;
|
||||
use crate::{check_status, sys, Error, Result, Status};
|
||||
|
||||
|
@ -64,15 +66,40 @@ macro_rules! impl_typed_array {
|
|||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.drop_in_vm) == 1 {
|
||||
if let Some((ref_, env)) = self.raw {
|
||||
#[cfg(feature = "napi4")]
|
||||
{
|
||||
if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
if !MAIN_THREAD_ID
|
||||
.get()
|
||||
.map(|id| &std::thread::current().id() == id)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let status = unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst),
|
||||
ref_ as *mut c_void,
|
||||
1,
|
||||
)
|
||||
};
|
||||
assert!(
|
||||
status == sys::Status::napi_ok,
|
||||
"Call custom GC in ArrayBuffer::drop failed {:?}",
|
||||
Status::from(status)
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
crate::check_status_or_throw!(
|
||||
env,
|
||||
unsafe { sys::napi_reference_unref(env, ref_, &mut 0) },
|
||||
"Failed to unref Buffer reference in drop"
|
||||
"Failed to unref ArrayBuffer reference in drop"
|
||||
);
|
||||
crate::check_status_or_throw!(
|
||||
env,
|
||||
unsafe { sys::napi_delete_reference(env, ref_) },
|
||||
"Failed to delete Buffer reference in drop"
|
||||
"Failed to delete ArrayBuffer reference in drop"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,31 @@ impl Drop for Buffer {
|
|||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.ref_count) == 1 {
|
||||
if let Some((ref_, env)) = self.raw {
|
||||
#[cfg(feature = "napi4")]
|
||||
{
|
||||
if CUSTOM_GC_TSFN_CLOSED.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
if !MAIN_THREAD_ID
|
||||
.get()
|
||||
.map(|id| &std::thread::current().id() == id)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let status = unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
CUSTOM_GC_TSFN.load(std::sync::atomic::Ordering::SeqCst),
|
||||
ref_ as *mut c_void,
|
||||
1,
|
||||
)
|
||||
};
|
||||
assert!(
|
||||
status == sys::Status::napi_ok,
|
||||
"Call custom GC in Buffer::drop failed {:?}",
|
||||
Status::from(status)
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
let mut ref_count = 0;
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
|
|
|
@ -1,11 +1,21 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
#[cfg(feature = "napi4")]
|
||||
use std::ffi::c_void;
|
||||
use std::ffi::CStr;
|
||||
use std::mem;
|
||||
#[cfg(feature = "napi4")]
|
||||
use std::os::raw::c_char;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicBool, AtomicPtr};
|
||||
use std::sync::{atomic::Ordering, Mutex};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, AtomicPtr, Ordering},
|
||||
Mutex,
|
||||
};
|
||||
#[cfg(feature = "napi4")]
|
||||
use std::thread::ThreadId;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
#[cfg(feature = "napi4")]
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use crate::{
|
||||
check_status, check_status_or_throw, sys, Env, JsError, JsFunction, Property, Result, Value,
|
||||
|
@ -16,6 +26,16 @@ pub type ExportRegisterCallback = unsafe fn(sys::napi_env) -> Result<sys::napi_v
|
|||
pub type ModuleExportsCallback =
|
||||
unsafe fn(env: sys::napi_env, exports: sys::napi_value) -> Result<()>;
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
pub(crate) static CUSTOM_GC_TSFN: AtomicPtr<sys::napi_threadsafe_function__> =
|
||||
AtomicPtr::new(ptr::null_mut());
|
||||
#[cfg(feature = "napi4")]
|
||||
// CustomGC ThreadsafeFunction may be deleted during the process exit.
|
||||
// And there may still some Buffer alive after that.
|
||||
pub(crate) static CUSTOM_GC_TSFN_CLOSED: AtomicBool = AtomicBool::new(false);
|
||||
#[cfg(feature = "napi4")]
|
||||
pub(crate) static MAIN_THREAD_ID: OnceCell<ThreadId> = OnceCell::new();
|
||||
|
||||
struct PersistedSingleThreadVec<T> {
|
||||
inner: Mutex<Vec<T>>,
|
||||
}
|
||||
|
@ -458,6 +478,9 @@ unsafe extern "C" fn napi_register_module_v1(
|
|||
);
|
||||
}
|
||||
mem::drop(lock);
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
create_custom_gc(env);
|
||||
REGISTERED.store(true, Ordering::SeqCst);
|
||||
exports
|
||||
}
|
||||
|
@ -479,3 +502,99 @@ pub(crate) unsafe extern "C" fn noop(
|
|||
}
|
||||
ptr::null_mut()
|
||||
}
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
fn create_custom_gc(env: sys::napi_env) {
|
||||
let mut custom_gc_fn = ptr::null_mut();
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe {
|
||||
sys::napi_create_function(
|
||||
env,
|
||||
"custom_gc".as_ptr() as *const c_char,
|
||||
9,
|
||||
Some(empty),
|
||||
ptr::null_mut(),
|
||||
&mut custom_gc_fn,
|
||||
)
|
||||
},
|
||||
"Create Custom GC Function in napi_register_module_v1 failed"
|
||||
);
|
||||
let mut async_resource_name = ptr::null_mut();
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe {
|
||||
sys::napi_create_string_utf8(
|
||||
env,
|
||||
"CustomGC".as_ptr() as *const c_char,
|
||||
8,
|
||||
&mut async_resource_name,
|
||||
)
|
||||
},
|
||||
"Create async resource string in napi_register_module_v1 napi_register_module_v1"
|
||||
);
|
||||
let mut custom_gc_tsfn = ptr::null_mut();
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe {
|
||||
sys::napi_create_threadsafe_function(
|
||||
env,
|
||||
custom_gc_fn,
|
||||
ptr::null_mut(),
|
||||
async_resource_name,
|
||||
0,
|
||||
1,
|
||||
ptr::null_mut(),
|
||||
Some(custom_gc_finalize),
|
||||
ptr::null_mut(),
|
||||
Some(custom_gc),
|
||||
&mut custom_gc_tsfn,
|
||||
)
|
||||
},
|
||||
"Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
|
||||
);
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) },
|
||||
"Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
|
||||
);
|
||||
CUSTOM_GC_TSFN.store(custom_gc_tsfn, Ordering::SeqCst);
|
||||
MAIN_THREAD_ID.get_or_init(|| std::thread::current().id());
|
||||
}
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
#[allow(unused)]
|
||||
unsafe extern "C" fn empty(env: sys::napi_env, info: sys::napi_callback_info) -> sys::napi_value {
|
||||
ptr::null_mut()
|
||||
}
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
#[allow(unused)]
|
||||
unsafe extern "C" fn custom_gc_finalize(
|
||||
env: sys::napi_env,
|
||||
finalize_data: *mut c_void,
|
||||
finalize_hint: *mut c_void,
|
||||
) {
|
||||
CUSTOM_GC_TSFN_CLOSED.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
#[cfg(feature = "napi4")]
|
||||
// recycle the ArrayBuffer/Buffer Reference if the ArrayBuffer/Buffer is not dropped on the main thread
|
||||
extern "C" fn custom_gc(
|
||||
env: sys::napi_env,
|
||||
_js_callback: sys::napi_value,
|
||||
_context: *mut c_void,
|
||||
data: *mut c_void,
|
||||
) {
|
||||
let mut ref_count = 0;
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe { sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count) },
|
||||
"Failed to unref Buffer reference in Custom GC"
|
||||
);
|
||||
check_status_or_throw!(
|
||||
env,
|
||||
unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) },
|
||||
"Failed to delete Buffer reference in Custom GC"
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue