feat(napi): impl Clone and Drop for ThreadSafeFunction, return Status from ThreadSafeFunction::call

This commit is contained in:
LongYinan 2020-11-11 12:16:19 +08:00
parent f3bb57abfb
commit eb5f4931bd
No known key found for this signature in database
GPG key ID: A3FFE134A3E20881
5 changed files with 205 additions and 68 deletions

View file

@ -597,7 +597,7 @@ impl Env {
R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>, R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
>( >(
&self, &self,
func: JsFunction, func: &JsFunction,
max_queue_size: usize, max_queue_size: usize,
callback: R, callback: R,
) -> Result<ThreadsafeFunction<T>> { ) -> Result<ThreadsafeFunction<T>> {

View file

@ -2,12 +2,16 @@ use std::convert::Into;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::os::raw::{c_char, c_void}; use std::os::raw::{c_char, c_void};
use std::ptr; use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::error::check_status; use crate::error::check_status;
use crate::{sys, Env, JsFunction, NapiValue, Result}; use crate::{sys, Env, Error, JsFunction, NapiValue, Result, Status};
use sys::napi_threadsafe_function_call_mode; use sys::napi_threadsafe_function_call_mode;
/// ThreadSafeFunction Context object
/// the `value` is the value passed to `call` method
pub struct ThreadSafeCallContext<T: 'static> { pub struct ThreadSafeCallContext<T: 'static> {
pub env: Env, pub env: Env,
pub value: T, pub value: T,
@ -44,40 +48,38 @@ impl Into<napi_threadsafe_function_call_mode> for ThreadsafeFunctionCallMode {
/// use std::thread; /// use std::thread;
/// ///
/// use napi::{ /// use napi::{
/// threadsafe_function::{ /// threadsafe_function::{
/// ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode, /// ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode,
/// }, /// },
/// CallContext, Error, JsFunction, JsNumber, JsUndefined, Result, Status, /// CallContext, Error, JsFunction, JsNumber, JsUndefined, Result, Status,
/// }; /// };
/// #[js_function(1)] /// #[js_function(1)]
/// pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> { /// pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
/// let func = ctx.get::<JsFunction>(0)?; /// let func = ctx.get::<JsFunction>(0)?;
/// let tsfn = /// let tsfn = ctx.env
/// ctx /// .create_threadsafe_function(func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
/// .env /// ctx.value
/// .create_threadsafe_function(func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| { /// .iter()
/// ctx /// .map(|v| ctx.env.create_uint32(*v))]
/// .value /// .collect::<Result<Vec<JsNumber>>>()
/// .iter() /// })?;
/// .map(|v| ctx.env.create_uint32(*v))
/// .collect::<Result<Vec<JsNumber>>>()
/// })?;
/// thread::spawn(move || { /// thread::spawn(move || {
/// let output: Vec<u32> = vec![42, 1, 2, 3]; /// let output: Vec<u32> = vec![42, 1, 2, 3];
/// /// It's okay to call a threadsafe function multiple times. /// // It's okay to call a threadsafe function multiple times.
/// tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking); /// tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking);
/// tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking); /// tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
/// tsfn.release(ThreadsafeFunctionReleaseMode::Release); /// tsfn.release(ThreadsafeFunctionReleaseMode::Release);
/// }); /// });
/// ctx.env.get_undefined() /// ctx.env.get_undefined()
/// } /// }
/// ``` /// ```
pub struct ThreadsafeFunction<T: 'static> { pub struct ThreadsafeFunction<T: 'static> {
raw_tsfn: sys::napi_threadsafe_function, raw_tsfn: sys::napi_threadsafe_function,
aborted: Arc<AtomicBool>,
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
} }
@ -98,7 +100,7 @@ impl<T: 'static> ThreadsafeFunction<T> {
R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>, R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
>( >(
env: sys::napi_env, env: sys::napi_env,
func: JsFunction, func: &JsFunction,
max_queue_size: usize, max_queue_size: usize,
callback: R, callback: R,
) -> Result<Self> { ) -> Result<Self> {
@ -135,59 +137,92 @@ impl<T: 'static> ThreadsafeFunction<T> {
Ok(ThreadsafeFunction { Ok(ThreadsafeFunction {
raw_tsfn, raw_tsfn,
aborted: Arc::new(AtomicBool::new(false)),
_phantom: PhantomData, _phantom: PhantomData,
}) })
} }
/// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
/// for more information. /// for more information.
pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) { pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) -> Status {
let status = unsafe { if self.aborted.load(Ordering::Acquire) {
return Status::Closing;
}
unsafe {
sys::napi_call_threadsafe_function( sys::napi_call_threadsafe_function(
self.raw_tsfn, self.raw_tsfn,
Box::into_raw(Box::new(value)) as *mut _, Box::into_raw(Box::new(value)) as *mut _,
mode.into(), mode.into(),
) )
}; }
debug_assert!( .into()
status == sys::napi_status::napi_ok,
"Threadsafe Function call failed"
);
}
/// See [napi_acquire_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_acquire_threadsafe_function)
/// for more information.
pub fn acquire(&self) {
let status = unsafe { sys::napi_acquire_threadsafe_function(self.raw_tsfn) };
debug_assert!(
status == sys::napi_status::napi_ok,
"Threadsafe Function acquire failed"
);
}
/// See [napi_release_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_release_threadsafe_function)
/// for more information.
pub fn release(self, mode: ThreadsafeFunctionReleaseMode) {
let status = unsafe { sys::napi_release_threadsafe_function(self.raw_tsfn, mode.into()) };
debug_assert!(
status == sys::napi_status::napi_ok,
"Threadsafe Function call failed"
);
} }
/// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function) /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function)
/// for more information. /// for more information.
/// ///
/// "ref" is a keyword so that we use "refer" here. /// "ref" is a keyword so that we use "refer" here.
pub fn refer(&self, env: &Env) -> Result<()> { pub fn refer(&mut self, env: &Env) -> Result<()> {
check_status(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) }) check_status(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) })
} }
/// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function) /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
/// for more information. /// for more information.
pub fn unref(&self, env: &Env) -> Result<()> { pub fn unref(&mut self, env: &Env) -> Result<()> {
check_status(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) }) check_status(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) })
} }
pub fn aborted(&self) -> bool {
self.aborted.load(Ordering::Acquire)
}
pub fn abort(self) -> Result<()> {
check_status(unsafe {
sys::napi_release_threadsafe_function(
self.raw_tsfn,
sys::napi_threadsafe_function_release_mode::napi_tsfn_abort,
)
})?;
self.aborted.store(true, Ordering::Release);
Ok(())
}
pub fn try_clone(&self) -> Result<Self> {
if self.aborted.load(Ordering::Acquire) {
return Err(Error::new(
Status::Closing,
format!("Thread safe function already aborted"),
));
}
check_status(unsafe { sys::napi_acquire_threadsafe_function(self.raw_tsfn) })?;
Ok(Self {
raw_tsfn: self.raw_tsfn,
aborted: Arc::clone(&self.aborted),
_phantom: PhantomData,
})
}
/// Get the raw `ThreadSafeFunction` pointer
pub fn raw(&self) -> sys::napi_threadsafe_function {
self.raw_tsfn
}
}
impl<T: 'static> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
if !self.aborted.load(Ordering::Acquire) {
let release_status = unsafe {
sys::napi_release_threadsafe_function(
self.raw_tsfn,
sys::napi_threadsafe_function_release_mode::napi_tsfn_release,
)
};
assert!(
release_status == sys::Status::napi_ok,
"Threadsafe Function release failed"
);
}
}
} }
unsafe extern "C" fn thread_finalize_cb<T: 'static, V: NapiValue>( unsafe extern "C" fn thread_finalize_cb<T: 'static, V: NapiValue>(

View file

@ -16,7 +16,11 @@ test('should get js function called from a thread', async (t) => {
bindings.testThreadsafeFunction((...args: any[]) => { bindings.testThreadsafeFunction((...args: any[]) => {
called += 1 called += 1
try { try {
t.deepEqual(args, [null, 42, 1, 2, 3]) if (args[1] === 0) {
t.deepEqual(args, [null, 0, 1, 2, 3])
} else {
t.deepEqual(args, [null, 3, 2, 1, 0])
}
} catch (err) { } catch (err) {
reject(err) reject(err)
} }
@ -27,3 +31,27 @@ test('should get js function called from a thread', async (t) => {
}) })
}) })
}) })
test('should be able to abort tsfn', (t) => {
if (napiVersion < 4) {
t.is(bindings.testAbortThreadsafeFunction, undefined)
return
}
t.true(bindings.testAbortThreadsafeFunction(() => {}))
})
test('should be able to abort independent tsfn', (t) => {
if (napiVersion < 4) {
t.is(bindings.testAbortIndependentThreadsafeFunction, undefined)
return
}
t.false(bindings.testAbortIndependentThreadsafeFunction(() => {}))
})
test('should return Closing while calling aborted tsfn', (t) => {
if (napiVersion < 4) {
t.is(bindings.testCallAbortedThreadsafeFunction, undefined)
return
}
t.notThrows(() => bindings.testCallAbortedThreadsafeFunction(() => {}))
})

View file

@ -8,5 +8,17 @@ pub fn register_js(module: &mut Module) -> Result<()> {
module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?;
module.create_named_method("testTsfnError", test_tsfn_error)?; module.create_named_method("testTsfnError", test_tsfn_error)?;
module.create_named_method("testTokioReadfile", test_tokio_readfile)?; module.create_named_method("testTokioReadfile", test_tokio_readfile)?;
module.create_named_method(
"testAbortThreadsafeFunction",
test_abort_threadsafe_function,
)?;
module.create_named_method(
"testAbortIndependentThreadsafeFunction",
test_abort_independent_threadsafe_function,
)?;
module.create_named_method(
"testCallAbortedThreadsafeFunction",
test_call_aborted_threadsafe_function,
)?;
Ok(()) Ok(())
} }

View file

@ -2,10 +2,8 @@ use std::path::Path;
use std::thread; use std::thread;
use napi::{ use napi::{
threadsafe_function::{ threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunctionCallMode},
ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode, CallContext, Error, JsBoolean, JsFunction, JsNumber, JsString, JsUndefined, Result, Status,
},
CallContext, Error, JsFunction, JsNumber, JsString, JsUndefined, Result, Status,
}; };
use tokio; use tokio;
@ -16,7 +14,7 @@ pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
let tsfn = let tsfn =
ctx ctx
.env .env
.create_threadsafe_function(func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| { .create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
ctx ctx
.value .value
.iter() .iter()
@ -24,14 +22,80 @@ pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
.collect::<Result<Vec<JsNumber>>>() .collect::<Result<Vec<JsNumber>>>()
})?; })?;
let tsfn_cloned = tsfn.try_clone()?;
thread::spawn(move || { thread::spawn(move || {
let output: Vec<u32> = vec![42, 1, 2, 3]; let output: Vec<u32> = vec![0, 1, 2, 3];
// It's okay to call a threadsafe function multiple times. // It's okay to call a threadsafe function multiple times.
tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking); tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking);
tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
tsfn.release(ThreadsafeFunctionReleaseMode::Release);
}); });
thread::spawn(move || {
let output: Vec<u32> = vec![3, 2, 1, 0];
// It's okay to call a threadsafe function multiple times.
tsfn_cloned.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
});
ctx.env.get_undefined()
}
#[js_function(1)]
pub fn test_abort_threadsafe_function(ctx: CallContext) -> Result<JsBoolean> {
let func = ctx.get::<JsFunction>(0)?;
let tsfn =
ctx
.env
.create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
ctx
.value
.iter()
.map(|v| ctx.env.create_uint32(*v))
.collect::<Result<Vec<JsNumber>>>()
})?;
let tsfn_cloned = tsfn.try_clone()?;
tsfn_cloned.abort()?;
ctx.env.get_boolean(tsfn.aborted())
}
#[js_function(1)]
pub fn test_abort_independent_threadsafe_function(ctx: CallContext) -> Result<JsBoolean> {
let func = ctx.get::<JsFunction>(0)?;
let tsfn = ctx
.env
.create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<u32>| {
ctx.env.create_uint32(ctx.value).map(|v| vec![v])
})?;
let tsfn_other =
ctx
.env
.create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<u32>| {
ctx.env.create_uint32(ctx.value).map(|v| vec![v])
})?;
tsfn_other.abort()?;
ctx.env.get_boolean(tsfn.aborted())
}
#[js_function(1)]
pub fn test_call_aborted_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
let func = ctx.get::<JsFunction>(0)?;
let tsfn = ctx
.env
.create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<u32>| {
ctx.env.create_uint32(ctx.value).map(|v| vec![v])
})?;
let tsfn_clone = tsfn.try_clone()?;
tsfn_clone.abort()?;
let call_status = tsfn.call(Ok(1), ThreadsafeFunctionCallMode::NonBlocking);
assert!(call_status == Status::Closing);
ctx.env.get_undefined() ctx.env.get_undefined()
} }
@ -40,7 +104,7 @@ pub fn test_tsfn_error(ctx: CallContext) -> Result<JsUndefined> {
let func = ctx.get::<JsFunction>(0)?; let func = ctx.get::<JsFunction>(0)?;
let tsfn = ctx let tsfn = ctx
.env .env
.create_threadsafe_function(func, 0, |ctx: ThreadSafeCallContext<()>| { .create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<()>| {
ctx.env.get_undefined().map(|v| vec![v]) ctx.env.get_undefined().map(|v| vec![v])
})?; })?;
thread::spawn(move || { thread::spawn(move || {
@ -48,7 +112,6 @@ pub fn test_tsfn_error(ctx: CallContext) -> Result<JsUndefined> {
Err(Error::new(Status::GenericFailure, "invalid".to_owned())), Err(Error::new(Status::GenericFailure, "invalid".to_owned())),
ThreadsafeFunctionCallMode::Blocking, ThreadsafeFunctionCallMode::Blocking,
); );
tsfn.release(ThreadsafeFunctionReleaseMode::Release);
}); });
ctx.env.get_undefined() ctx.env.get_undefined()
@ -69,7 +132,7 @@ pub fn test_tokio_readfile(ctx: CallContext) -> Result<JsUndefined> {
let tsfn = let tsfn =
ctx ctx
.env .env
.create_threadsafe_function(js_func, 0, |ctx: ThreadSafeCallContext<Vec<u8>>| { .create_threadsafe_function(&js_func, 0, |ctx: ThreadSafeCallContext<Vec<u8>>| {
ctx ctx
.env .env
.create_buffer_with_data(ctx.value) .create_buffer_with_data(ctx.value)
@ -81,7 +144,6 @@ pub fn test_tokio_readfile(ctx: CallContext) -> Result<JsUndefined> {
rt.block_on(async move { rt.block_on(async move {
let ret = read_file_content(&Path::new(&path_str)).await; let ret = read_file_content(&Path::new(&path_str)).await;
tsfn.call(ret, ThreadsafeFunctionCallMode::Blocking); tsfn.call(ret, ThreadsafeFunctionCallMode::Blocking);
tsfn.release(ThreadsafeFunctionReleaseMode::Release);
}); });
ctx.env.get_undefined() ctx.env.get_undefined()