fix(napi): do not call release tsfn in Drop when ref count is 0

This commit is contained in:
LongYinan 2021-03-31 21:31:37 +08:00
parent 3e239f69b2
commit f0b8c5da07
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
7 changed files with 73 additions and 8 deletions

View file

@ -3,7 +3,7 @@ use std::ffi::CString;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::ptr; use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use crate::{check_status, sys, Env, Error, JsError, JsFunction, NapiValue, Result, Status}; use crate::{check_status, sys, Env, Error, JsError, JsFunction, NapiValue, Result, Status};
@ -150,6 +150,7 @@ type_level_enum! {
pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> { pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> {
raw_tsfn: sys::napi_threadsafe_function, raw_tsfn: sys::napi_threadsafe_function,
aborted: Arc<AtomicBool>, aborted: Arc<AtomicBool>,
ref_count: Arc<AtomicUsize>,
_phantom: PhantomData<(T, ES)>, _phantom: PhantomData<(T, ES)>,
} }
@ -166,6 +167,7 @@ impl<T: 'static, ES: ErrorStrategy::T> Clone for ThreadsafeFunction<T, ES> {
Self { Self {
raw_tsfn: self.raw_tsfn, raw_tsfn: self.raw_tsfn,
aborted: Arc::clone(&self.aborted), aborted: Arc::clone(&self.aborted),
ref_count: Arc::clone(&self.ref_count),
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -217,6 +219,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
Ok(ThreadsafeFunction { Ok(ThreadsafeFunction {
raw_tsfn, raw_tsfn,
aborted: Arc::new(AtomicBool::new(false)), aborted: Arc::new(AtomicBool::new(false)),
ref_count: Arc::new(AtomicUsize::new(initial_thread_count)),
_phantom: PhantomData, _phantom: PhantomData,
}) })
} }
@ -232,6 +235,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
format!("Can not ref, Thread safe function already aborted"), format!("Can not ref, Thread safe function already aborted"),
)); ));
} }
self.ref_count.fetch_add(1, Ordering::Acquire);
check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) }) check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) })
} }
@ -244,6 +248,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
format!("Can not unref, Thread safe function already aborted"), format!("Can not unref, Thread safe function already aborted"),
)); ));
} }
self.ref_count.fetch_sub(1, Ordering::Acquire);
check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) }) check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) })
} }
@ -306,7 +311,7 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
impl<T: 'static, ES: ErrorStrategy::T> Drop for ThreadsafeFunction<T, ES> { impl<T: 'static, ES: ErrorStrategy::T> Drop for ThreadsafeFunction<T, ES> {
fn drop(&mut self) { fn drop(&mut self) {
if !self.aborted.load(Ordering::Acquire) { if !self.aborted.load(Ordering::Acquire) && self.ref_count.load(Ordering::Relaxed) > 0usize {
let release_status = unsafe { let release_status = unsafe {
sys::napi_release_threadsafe_function( sys::napi_release_threadsafe_function(
self.raw_tsfn, self.raw_tsfn,

View file

@ -86,3 +86,13 @@ test('should be able to throw error in tsfn', (t) => {
execSync(`node ${join(__dirname, 'tsfn-throw.js')}`) execSync(`node ${join(__dirname, 'tsfn-throw.js')}`)
}) })
}) })
test('tsfn dua instance', (t) => {
if (napiVersion < 4) {
t.is(bindings.A, undefined)
return
}
t.notThrows(() => {
execSync(`node ${join(__dirname, 'tsfn-dua-instance.js')}`)
})
})

View file

@ -0,0 +1,11 @@
const bindings = require('../../index.node')
async function main() {
await Promise.resolve()
new bindings.A((s) => console.info(s))
new bindings.A((s) => console.info(s))
}
main().catch((e) => {
console.error(e)
})

View file

@ -9,7 +9,7 @@ test('should call callback with the first arguments as an Error', async (t) => {
t.is(bindings.testTsfnError, undefined) t.is(bindings.testTsfnError, undefined)
return return
} }
await new Promise((resolve, reject) => { await new Promise<void>((resolve, reject) => {
bindings.testTsfnError((err: Error) => { bindings.testTsfnError((err: Error) => {
try { try {
t.is(err instanceof Error, true) t.is(err instanceof Error, true)

View file

@ -3,7 +3,7 @@ extern crate napi_derive;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
use napi::{JsObject, Result}; use napi::{Env, JsObject, Result};
mod cleanup_env; mod cleanup_env;
#[cfg(feature = "latest")] #[cfg(feature = "latest")]
@ -35,7 +35,7 @@ mod task;
use napi_version::get_napi_version; use napi_version::get_napi_version;
#[module_exports] #[module_exports]
fn init(mut exports: JsObject) -> Result<()> { fn init(mut exports: JsObject, env: Env) -> Result<()> {
exports.create_named_method("getNapiVersion", get_napi_version)?; exports.create_named_method("getNapiVersion", get_napi_version)?;
array::register_js(&mut exports)?; array::register_js(&mut exports)?;
error::register_js(&mut exports)?; error::register_js(&mut exports)?;
@ -54,7 +54,7 @@ fn init(mut exports: JsObject) -> Result<()> {
global::register_js(&mut exports)?; global::register_js(&mut exports)?;
cleanup_env::register_js(&mut exports)?; cleanup_env::register_js(&mut exports)?;
#[cfg(feature = "latest")] #[cfg(feature = "latest")]
napi4::register_js(&mut exports)?; napi4::register_js(&mut exports, &env)?;
#[cfg(feature = "latest")] #[cfg(feature = "latest")]
tokio_rt::register_js(&mut exports)?; tokio_rt::register_js(&mut exports)?;
#[cfg(feature = "latest")] #[cfg(feature = "latest")]

View file

@ -1,10 +1,12 @@
use napi::{JsObject, Result}; use napi::{Env, JsObject, Result};
mod tsfn; mod tsfn;
mod tsfn_dua_instance;
use tsfn::*; use tsfn::*;
use tsfn_dua_instance::constructor;
pub fn register_js(exports: &mut JsObject) -> Result<()> { pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> {
exports.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; exports.create_named_method("testThreadsafeFunction", test_threadsafe_function)?;
exports.create_named_method("testTsfnError", test_tsfn_error)?; exports.create_named_method("testTsfnError", test_tsfn_error)?;
exports.create_named_method("testTokioReadfile", test_tokio_readfile)?; exports.create_named_method("testTokioReadfile", test_tokio_readfile)?;
@ -21,5 +23,9 @@ pub fn register_js(exports: &mut JsObject) -> Result<()> {
test_call_aborted_threadsafe_function, test_call_aborted_threadsafe_function,
)?; )?;
exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?; exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?;
let obj = env.define_class("A", constructor, &[])?;
exports.set_named_property("A", obj)?;
Ok(()) Ok(())
} }

View file

@ -0,0 +1,33 @@
use napi::{
threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction},
CallContext, JsFunction, JsObject, JsUndefined,
};
use napi_derive::js_function;
#[derive(Clone)]
struct A {
cb: ThreadsafeFunction<String>,
}
#[js_function(1)]
pub fn constructor(ctx: CallContext) -> napi::Result<JsUndefined> {
let callback = ctx.get::<JsFunction>(0)?;
let mut cb =
ctx
.env
.create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext<String>| {
ctx
.env
.create_string_from_std(ctx.value)
.map(|js_string| vec![js_string])
})?;
cb.unref(&ctx.env)?;
let mut this: JsObject = ctx.this_unchecked();
let obj = A { cb };
ctx.env.wrap(&mut this, obj)?;
ctx.env.get_undefined()
}