feat(napi): refactor ThreadsafeFunction to allow get return value of it (#1427)
This commit is contained in:
parent
5492a0b9e9
commit
dc3a4c9f25
11 changed files with 354 additions and 113 deletions
|
@ -1116,6 +1116,23 @@ impl Env {
|
|||
Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) })
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
|
||||
pub fn spawn_future<
|
||||
T: 'static + Send + ToNapiValue,
|
||||
F: 'static + Send + Future<Output = Result<T>>,
|
||||
>(
|
||||
&self,
|
||||
fut: F,
|
||||
) -> Result<JsObject> {
|
||||
use crate::tokio_runtime;
|
||||
|
||||
let promise = tokio_runtime::execute_tokio_future(self.0, fut, |env, val| unsafe {
|
||||
ToNapiValue::to_napi_value(env, val)
|
||||
})?;
|
||||
|
||||
Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) })
|
||||
}
|
||||
|
||||
/// Creates a deferred promise, which can be resolved or rejected from a background thread.
|
||||
#[cfg(feature = "napi4")]
|
||||
pub fn create_deferred<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>>(
|
||||
|
|
|
@ -5,11 +5,11 @@ use std::ffi::CString;
|
|||
use std::marker::PhantomData;
|
||||
use std::os::raw::c_void;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use crate::bindgen_runtime::ToNapiValue;
|
||||
use crate::{check_status, sys, Env, Error, JsError, Result, Status};
|
||||
use crate::bindgen_runtime::{FromNapiValue, ToNapiValue};
|
||||
use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status};
|
||||
|
||||
/// ThreadSafeFunction Context object
|
||||
/// the `value` is the value passed to `call` method
|
||||
|
@ -96,6 +96,46 @@ type_level_enum! {
|
|||
}
|
||||
}
|
||||
|
||||
struct ThreadsafeFunctionHandle {
|
||||
raw: sys::napi_threadsafe_function,
|
||||
aborted: RwLock<bool>,
|
||||
referred: AtomicBool,
|
||||
}
|
||||
|
||||
unsafe impl Send for ThreadsafeFunctionHandle {}
|
||||
unsafe impl Sync for ThreadsafeFunctionHandle {}
|
||||
|
||||
impl Drop for ThreadsafeFunctionHandle {
|
||||
fn drop(&mut self) {
|
||||
let aborted_guard = self
|
||||
.aborted
|
||||
.read()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
if !*aborted_guard && self.referred.load(Ordering::Acquire) {
|
||||
let release_status = unsafe {
|
||||
sys::napi_release_threadsafe_function(self.raw, sys::ThreadsafeFunctionReleaseMode::release)
|
||||
};
|
||||
assert!(
|
||||
release_status == sys::Status::napi_ok,
|
||||
"Threadsafe Function release failed {}",
|
||||
Status::from(release_status)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
enum ThreadsafeFunctionCallVariant {
|
||||
Direct,
|
||||
WithCallback,
|
||||
}
|
||||
|
||||
struct ThreadsafeFunctionCallJsBackData<T> {
|
||||
data: T,
|
||||
call_variant: ThreadsafeFunctionCallVariant,
|
||||
callback: Box<dyn FnOnce(JsUnknown) -> Result<()>>,
|
||||
}
|
||||
|
||||
/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
|
||||
///
|
||||
/// ## Example
|
||||
|
@ -146,41 +186,28 @@ type_level_enum! {
|
|||
/// }
|
||||
/// ```
|
||||
pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> {
|
||||
raw_tsfn: sys::napi_threadsafe_function,
|
||||
aborted: Arc<Mutex<bool>>,
|
||||
ref_count: Arc<AtomicUsize>,
|
||||
handle: Arc<ThreadsafeFunctionHandle>,
|
||||
_phantom: PhantomData<(T, ES)>,
|
||||
}
|
||||
|
||||
impl<T: 'static, ES: ErrorStrategy::T> Clone for ThreadsafeFunction<T, ES> {
|
||||
fn clone(&self) -> Self {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
if !*is_aborted {
|
||||
let acquire_status = unsafe { sys::napi_acquire_threadsafe_function(self.raw_tsfn) };
|
||||
debug_assert!(
|
||||
acquire_status == sys::Status::napi_ok,
|
||||
"Acquire threadsafe function failed in clone"
|
||||
);
|
||||
} else {
|
||||
let aborted_guard = self
|
||||
.handle
|
||||
.aborted
|
||||
.read()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
if *aborted_guard {
|
||||
panic!("ThreadsafeFunction was aborted, can not clone it");
|
||||
}
|
||||
|
||||
self.ref_count.fetch_add(1, Ordering::AcqRel);
|
||||
|
||||
drop(is_aborted);
|
||||
|
||||
Self {
|
||||
raw_tsfn: self.raw_tsfn,
|
||||
aborted: Arc::clone(&self.aborted),
|
||||
ref_count: Arc::clone(&self.ref_count),
|
||||
handle: self.handle.clone(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<T, ES: ErrorStrategy::T> Send for ThreadsafeFunction<T, ES> {}
|
||||
unsafe impl<T, ES: ErrorStrategy::T> Sync for ThreadsafeFunction<T, ES> {}
|
||||
|
||||
impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
|
||||
/// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function)
|
||||
/// for more information.
|
||||
|
@ -201,11 +228,8 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
|
|||
sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name)
|
||||
})?;
|
||||
|
||||
let initial_thread_count = 1usize;
|
||||
let mut raw_tsfn = ptr::null_mut();
|
||||
let ptr = Box::into_raw(Box::new(callback)) as *mut c_void;
|
||||
let aborted = Arc::new(Mutex::new(false));
|
||||
let aborted_ptr = Arc::into_raw(aborted.clone());
|
||||
let callback_ptr = Box::into_raw(Box::new(callback));
|
||||
check_status!(unsafe {
|
||||
sys::napi_create_threadsafe_function(
|
||||
env,
|
||||
|
@ -213,19 +237,23 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
|
|||
ptr::null_mut(),
|
||||
async_resource_name,
|
||||
max_queue_size,
|
||||
initial_thread_count,
|
||||
aborted_ptr as *mut c_void,
|
||||
1,
|
||||
ptr::null_mut(),
|
||||
Some(thread_finalize_cb::<T, V, R>),
|
||||
ptr,
|
||||
callback_ptr.cast(),
|
||||
Some(call_js_cb::<T, V, R, ES>),
|
||||
&mut raw_tsfn,
|
||||
)
|
||||
})?;
|
||||
|
||||
check_status!(unsafe { sys::napi_acquire_threadsafe_function(raw_tsfn) })?;
|
||||
|
||||
Ok(ThreadsafeFunction {
|
||||
raw_tsfn,
|
||||
aborted,
|
||||
ref_count: Arc::new(AtomicUsize::new(initial_thread_count)),
|
||||
handle: Arc::new(ThreadsafeFunctionHandle {
|
||||
raw: raw_tsfn,
|
||||
aborted: RwLock::new(false),
|
||||
referred: AtomicBool::new(true),
|
||||
}),
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
@ -235,54 +263,63 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
|
|||
///
|
||||
/// "ref" is a keyword so that we use "refer" here.
|
||||
pub fn refer(&mut self, env: &Env) -> Result<()> {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
if *is_aborted {
|
||||
return Err(Error::new(
|
||||
Status::Closing,
|
||||
"Can not ref, Thread safe function already aborted".to_string(),
|
||||
));
|
||||
let aborted_guard = self
|
||||
.handle
|
||||
.aborted
|
||||
.read()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
if !*aborted_guard && !self.handle.referred.load(Ordering::Acquire) {
|
||||
check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.raw) })?;
|
||||
self.handle.referred.store(true, Ordering::Release);
|
||||
}
|
||||
drop(is_aborted);
|
||||
self.ref_count.fetch_add(1, Ordering::AcqRel);
|
||||
check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) })
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
|
||||
/// for more information.
|
||||
pub fn unref(&mut self, env: &Env) -> Result<()> {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
if *is_aborted {
|
||||
return Err(Error::new(
|
||||
Status::Closing,
|
||||
"Can not unref, Thread safe function already aborted".to_string(),
|
||||
));
|
||||
let aborted_guard = self
|
||||
.handle
|
||||
.aborted
|
||||
.read()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
if !*aborted_guard && self.handle.referred.load(Ordering::Acquire) {
|
||||
check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.handle.raw) })?;
|
||||
self.handle.referred.store(false, Ordering::Release);
|
||||
}
|
||||
self.ref_count.fetch_sub(1, Ordering::AcqRel);
|
||||
check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) })
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn aborted(&self) -> bool {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
*is_aborted
|
||||
let aborted_guard = self
|
||||
.handle
|
||||
.aborted
|
||||
.read()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
*aborted_guard
|
||||
}
|
||||
|
||||
pub fn abort(self) -> Result<()> {
|
||||
let mut is_aborted = self.aborted.lock().unwrap();
|
||||
if !*is_aborted {
|
||||
let mut aborted_guard = self
|
||||
.handle
|
||||
.aborted
|
||||
.write()
|
||||
.expect("Threadsafe Function aborted lock failed");
|
||||
if !*aborted_guard {
|
||||
check_status!(unsafe {
|
||||
sys::napi_release_threadsafe_function(
|
||||
self.raw_tsfn,
|
||||
self.handle.raw,
|
||||
sys::ThreadsafeFunctionReleaseMode::abort,
|
||||
)
|
||||
})?;
|
||||
*aborted_guard = true;
|
||||
}
|
||||
*is_aborted = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the raw `ThreadSafeFunction` pointer
|
||||
pub fn raw(&self) -> sys::napi_threadsafe_function {
|
||||
self.raw_tsfn
|
||||
self.handle.raw
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -290,65 +327,149 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
|||
/// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
|
||||
/// for more information.
|
||||
pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) -> Status {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
if *is_aborted {
|
||||
return Status::Closing;
|
||||
}
|
||||
unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.raw_tsfn,
|
||||
Box::into_raw(Box::new(value)) as *mut c_void,
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(value.map(|data| {
|
||||
ThreadsafeFunctionCallJsBackData {
|
||||
data,
|
||||
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
||||
callback: Box::new(|_d: JsUnknown| Ok(())),
|
||||
}
|
||||
})))
|
||||
.cast(),
|
||||
mode.into(),
|
||||
)
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
|
||||
&self,
|
||||
value: Result<T>,
|
||||
mode: ThreadsafeFunctionCallMode,
|
||||
cb: F,
|
||||
) -> Status {
|
||||
unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(value.map(|data| {
|
||||
ThreadsafeFunctionCallJsBackData {
|
||||
data,
|
||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||
callback: Box::new(move |d: JsUnknown| {
|
||||
D::from_napi_value(d.0.env, d.0.value).and_then(cb)
|
||||
}),
|
||||
}
|
||||
})))
|
||||
.cast(),
|
||||
mode.into(),
|
||||
)
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio_rt")]
|
||||
pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel::<D>();
|
||||
check_status!(unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(value.map(|data| {
|
||||
ThreadsafeFunctionCallJsBackData {
|
||||
data,
|
||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||
callback: Box::new(move |d: JsUnknown| {
|
||||
D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
|
||||
sender.send(d).map_err(|_| {
|
||||
crate::Error::from_reason("Failed to send return value to tokio sender")
|
||||
})
|
||||
})
|
||||
}),
|
||||
}
|
||||
})))
|
||||
.cast(),
|
||||
ThreadsafeFunctionCallMode::NonBlocking.into(),
|
||||
)
|
||||
})?;
|
||||
receiver
|
||||
.await
|
||||
.map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
|
||||
/// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
|
||||
/// for more information.
|
||||
pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status {
|
||||
let is_aborted = self.aborted.lock().unwrap();
|
||||
if *is_aborted {
|
||||
return Status::Closing;
|
||||
}
|
||||
unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.raw_tsfn,
|
||||
Box::into_raw(Box::new(value)) as *mut c_void,
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||
data: value,
|
||||
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
||||
callback: Box::new(|_d: JsUnknown| Ok(())),
|
||||
}))
|
||||
.cast(),
|
||||
mode.into(),
|
||||
)
|
||||
}
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static, ES: ErrorStrategy::T> Drop for ThreadsafeFunction<T, ES> {
|
||||
fn drop(&mut self) {
|
||||
let mut is_aborted = self.aborted.lock().unwrap();
|
||||
if !*is_aborted && self.ref_count.load(Ordering::Acquire) <= 1 {
|
||||
let release_status = unsafe {
|
||||
sys::napi_release_threadsafe_function(
|
||||
self.raw_tsfn,
|
||||
sys::ThreadsafeFunctionReleaseMode::release,
|
||||
)
|
||||
};
|
||||
assert!(
|
||||
release_status == sys::Status::napi_ok,
|
||||
"Threadsafe Function release failed {:?}",
|
||||
Status::from(release_status)
|
||||
);
|
||||
*is_aborted = true;
|
||||
} else {
|
||||
self.ref_count.fetch_sub(1, Ordering::Release);
|
||||
pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
|
||||
&self,
|
||||
value: T,
|
||||
mode: ThreadsafeFunctionCallMode,
|
||||
cb: F,
|
||||
) -> Status {
|
||||
unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||
data: value,
|
||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||
callback: Box::new(move |d: JsUnknown| {
|
||||
D::from_napi_value(d.0.env, d.0.value).and_then(cb)
|
||||
}),
|
||||
}))
|
||||
.cast(),
|
||||
mode.into(),
|
||||
)
|
||||
}
|
||||
drop(is_aborted);
|
||||
.into()
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio_rt")]
|
||||
pub async fn call_async<D: 'static + FromNapiValue>(&self, value: T) -> Result<D> {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel::<D>();
|
||||
check_status!(unsafe {
|
||||
sys::napi_call_threadsafe_function(
|
||||
self.handle.raw,
|
||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||
data: value,
|
||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||
callback: Box::new(move |d: JsUnknown| {
|
||||
D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
|
||||
sender.send(d).map_err(|_| {
|
||||
crate::Error::from_reason("Failed to send return value to tokio sender")
|
||||
})
|
||||
})
|
||||
}),
|
||||
}))
|
||||
.cast(),
|
||||
ThreadsafeFunctionCallMode::NonBlocking.into(),
|
||||
)
|
||||
})?;
|
||||
receiver
|
||||
.await
|
||||
.map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err)))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
unsafe extern "C" fn thread_finalize_cb<T: 'static, V: ToNapiValue, R>(
|
||||
_raw_env: sys::napi_env,
|
||||
env: sys::napi_env,
|
||||
finalize_data: *mut c_void,
|
||||
finalize_hint: *mut c_void,
|
||||
) where
|
||||
|
@ -356,9 +477,6 @@ unsafe extern "C" fn thread_finalize_cb<T: 'static, V: ToNapiValue, R>(
|
|||
{
|
||||
// cleanup
|
||||
drop(unsafe { Box::<R>::from_raw(finalize_hint.cast()) });
|
||||
let aborted = unsafe { Arc::<Mutex<bool>>::from_raw(finalize_data.cast()) };
|
||||
let mut is_aborted = aborted.lock().unwrap();
|
||||
*is_aborted = true;
|
||||
}
|
||||
|
||||
unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
||||
|
@ -375,11 +493,15 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
|||
return;
|
||||
}
|
||||
|
||||
let ctx: &mut R = unsafe { &mut *context.cast::<R>() };
|
||||
let val: Result<T> = unsafe {
|
||||
let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) };
|
||||
let val = unsafe {
|
||||
match ES::VALUE {
|
||||
ErrorStrategy::CalleeHandled::VALUE => *Box::<Result<T>>::from_raw(data.cast()),
|
||||
ErrorStrategy::Fatal::VALUE => Ok(*Box::<T>::from_raw(data.cast())),
|
||||
ErrorStrategy::CalleeHandled::VALUE => {
|
||||
*Box::<Result<ThreadsafeFunctionCallJsBackData<T>>>::from_raw(data.cast())
|
||||
}
|
||||
ErrorStrategy::Fatal::VALUE => Ok(*Box::<ThreadsafeFunctionCallJsBackData<T>>::from_raw(
|
||||
data.cast(),
|
||||
)),
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -389,15 +511,16 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
|||
let ret = val.and_then(|v| {
|
||||
(ctx)(ThreadSafeCallContext {
|
||||
env: unsafe { Env::from_raw(raw_env) },
|
||||
value: v,
|
||||
value: v.data,
|
||||
})
|
||||
.map(|ret| (ret, v.call_variant, v.callback))
|
||||
});
|
||||
|
||||
// Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/
|
||||
// Check if the Result is okay, if so, pass a null as the first (error) argument automatically.
|
||||
// If the Result is an error, pass that as the first argument.
|
||||
let status = match ret {
|
||||
Ok(values) => {
|
||||
Ok((values, call_variant, callback)) => {
|
||||
let values = values
|
||||
.into_iter()
|
||||
.map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) });
|
||||
|
@ -408,7 +531,8 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
|||
} else {
|
||||
values.collect()
|
||||
};
|
||||
match args {
|
||||
let mut return_value = ptr::null_mut();
|
||||
let status = match args {
|
||||
Ok(args) => unsafe {
|
||||
sys::napi_call_function(
|
||||
raw_env,
|
||||
|
@ -416,7 +540,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
|||
js_callback,
|
||||
args.len(),
|
||||
args.as_ptr(),
|
||||
ptr::null_mut(),
|
||||
&mut return_value,
|
||||
)
|
||||
},
|
||||
Err(e) => match ES::VALUE {
|
||||
|
@ -430,11 +554,21 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
|||
js_callback,
|
||||
1,
|
||||
[JsError::from(e).into_value(raw_env)].as_mut_ptr(),
|
||||
ptr::null_mut(),
|
||||
&mut return_value,
|
||||
)
|
||||
},
|
||||
},
|
||||
};
|
||||
if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
|
||||
if let Err(err) = callback(JsUnknown(crate::Value {
|
||||
env: raw_env,
|
||||
value: return_value,
|
||||
value_type: crate::ValueType::Unknown,
|
||||
})) {
|
||||
unsafe { sys::napi_throw(raw_env, JsError::from(err).into_value(raw_env)) };
|
||||
}
|
||||
}
|
||||
status
|
||||
}
|
||||
Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe {
|
||||
sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
|
||||
|
|
|
@ -2,8 +2,16 @@ const bindings = require('../../index.node')
|
|||
|
||||
async function main() {
|
||||
await Promise.resolve()
|
||||
new bindings.A((s) => console.info(s))
|
||||
new bindings.A((s) => console.info(s))
|
||||
const a1 = new bindings.A((err, s) => {
|
||||
console.info(s)
|
||||
a1.unref()
|
||||
})
|
||||
const a2 = new bindings.A((err, s) => {
|
||||
console.info(s)
|
||||
a2.unref()
|
||||
})
|
||||
a1.call()
|
||||
a2.call()
|
||||
}
|
||||
|
||||
main().catch((e) => {
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use napi::{Env, JsObject, Result};
|
||||
use napi::{Env, JsObject, Property, Result};
|
||||
|
||||
mod deferred;
|
||||
mod tsfn;
|
||||
mod tsfn_dua_instance;
|
||||
|
||||
use tsfn::*;
|
||||
use tsfn_dua_instance::constructor;
|
||||
use tsfn_dua_instance::*;
|
||||
|
||||
pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> {
|
||||
exports.create_named_method("testThreadsafeFunction", test_threadsafe_function)?;
|
||||
|
@ -26,7 +26,14 @@ pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> {
|
|||
exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?;
|
||||
exports.create_named_method("testDeferred", deferred::test_deferred)?;
|
||||
|
||||
let obj = env.define_class("A", constructor, &[])?;
|
||||
let obj = env.define_class(
|
||||
"A",
|
||||
constructor,
|
||||
&[
|
||||
Property::new("call")?.with_method(call),
|
||||
Property::new("unref")?.with_method(unref),
|
||||
],
|
||||
)?;
|
||||
|
||||
exports.set_named_property("A", obj)?;
|
||||
Ok(())
|
||||
|
|
|
@ -95,7 +95,7 @@ pub fn test_call_aborted_threadsafe_function(ctx: CallContext) -> Result<JsUndef
|
|||
tsfn_clone.abort()?;
|
||||
|
||||
let call_status = tsfn.call(Ok(1), ThreadsafeFunctionCallMode::NonBlocking);
|
||||
assert!(call_status == Status::Closing);
|
||||
assert!(call_status != Status::Ok);
|
||||
ctx.env.get_undefined()
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ pub struct A {
|
|||
pub fn constructor(ctx: CallContext) -> napi::Result<JsUndefined> {
|
||||
let callback = ctx.get::<JsFunction>(0)?;
|
||||
|
||||
let mut cb =
|
||||
let cb =
|
||||
ctx
|
||||
.env
|
||||
.create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext<String>| {
|
||||
|
@ -23,11 +23,28 @@ pub fn constructor(ctx: CallContext) -> napi::Result<JsUndefined> {
|
|||
.map(|js_string| vec![js_string])
|
||||
})?;
|
||||
|
||||
cb.unref(ctx.env)?;
|
||||
|
||||
let mut this: JsObject = ctx.this_unchecked();
|
||||
let obj = A { cb };
|
||||
|
||||
ctx.env.wrap(&mut this, obj)?;
|
||||
ctx.env.get_undefined()
|
||||
}
|
||||
|
||||
#[js_function]
|
||||
pub fn call(ctx: CallContext) -> napi::Result<JsUndefined> {
|
||||
let this = ctx.this_unchecked();
|
||||
let obj = ctx.env.unwrap::<A>(&this)?;
|
||||
obj.cb.call(
|
||||
Ok("ThreadsafeFunction NonBlocking Call".to_owned()),
|
||||
napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking,
|
||||
);
|
||||
ctx.env.get_undefined()
|
||||
}
|
||||
|
||||
#[js_function]
|
||||
pub fn unref(ctx: CallContext) -> napi::Result<JsUndefined> {
|
||||
let this = ctx.this_unchecked();
|
||||
let obj = ctx.env.unwrap::<A>(&this)?;
|
||||
obj.cb.unref(ctx.env)?;
|
||||
ctx.env.get_undefined()
|
||||
}
|
||||
|
|
|
@ -195,6 +195,8 @@ Generated by [AVA](https://avajs.dev).
|
|||
export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void␊
|
||||
export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void␊
|
||||
export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void␊
|
||||
export function tsfnCallWithCallback(func: (...args: any[]) => any): void␊
|
||||
export function tsfnAsyncCall(func: (...args: any[]) => any): Promise<void>␊
|
||||
export function getBuffer(): Buffer␊
|
||||
export function appendBuffer(buf: Buffer): Buffer␊
|
||||
export function getEmptyBuffer(): Buffer␊
|
||||
|
|
Binary file not shown.
|
@ -51,6 +51,8 @@ import {
|
|||
callThreadsafeFunction,
|
||||
threadsafeFunctionThrowError,
|
||||
threadsafeFunctionClosureCapture,
|
||||
tsfnCallWithCallback,
|
||||
tsfnAsyncCall,
|
||||
asyncPlus100,
|
||||
getGlobal,
|
||||
getUndefined,
|
||||
|
@ -759,6 +761,30 @@ Napi4Test('Promise should reject raw error in rust', async (t) => {
|
|||
t.is(err, fxError)
|
||||
})
|
||||
|
||||
Napi4Test('call ThreadsafeFunction with callback', async (t) => {
|
||||
await t.notThrowsAsync(
|
||||
() =>
|
||||
new Promise<void>((resolve) => {
|
||||
tsfnCallWithCallback(() => {
|
||||
resolve()
|
||||
return 'ReturnFromJavaScriptRawCallback'
|
||||
})
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
Napi4Test('async call ThreadsafeFunction', async (t) => {
|
||||
await t.notThrowsAsync(() =>
|
||||
tsfnAsyncCall((err, arg1, arg2, arg3) => {
|
||||
t.is(err, null)
|
||||
t.is(arg1, 0)
|
||||
t.is(arg2, 1)
|
||||
t.is(arg3, 2)
|
||||
return 'ReturnFromJavaScriptRawCallback'
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
const Napi5Test = Number(process.versions.napi) >= 5 ? test : test.skip
|
||||
|
||||
Napi5Test('Date test', (t) => {
|
||||
|
|
2
examples/napi/index.d.ts
vendored
2
examples/napi/index.d.ts
vendored
|
@ -185,6 +185,8 @@ export function threadsafeFunctionThrowError(cb: (...args: any[]) => any): void
|
|||
export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void
|
||||
export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void
|
||||
export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void
|
||||
export function tsfnCallWithCallback(func: (...args: any[]) => any): void
|
||||
export function tsfnAsyncCall(func: (...args: any[]) => any): Promise<void>
|
||||
export function getBuffer(): Buffer
|
||||
export function appendBuffer(buf: Buffer): Buffer
|
||||
export function getEmptyBuffer(): Buffer
|
||||
|
|
|
@ -76,3 +76,31 @@ fn threadsafe_function_closure_capture(func: JsFunction) -> napi::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn tsfn_call_with_callback(func: JsFunction) -> napi::Result<()> {
|
||||
let tsfn: ThreadsafeFunction<()> =
|
||||
func.create_threadsafe_function(0, move |_| Ok(Vec::<JsString>::new()))?;
|
||||
tsfn.call_with_return_value(
|
||||
Ok(()),
|
||||
ThreadsafeFunctionCallMode::NonBlocking,
|
||||
|value: String| {
|
||||
println!("{}", value);
|
||||
assert_eq!(value, "ReturnFromJavaScriptRawCallback".to_owned());
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi(ts_return_type = "Promise<void>")]
|
||||
pub fn tsfn_async_call(env: Env, func: JsFunction) -> napi::Result<Object> {
|
||||
let tsfn: ThreadsafeFunction<()> =
|
||||
func.create_threadsafe_function(0, move |_| Ok(vec![0u32, 1u32, 2u32]))?;
|
||||
|
||||
env.spawn_future(async move {
|
||||
let msg: String = tsfn.call_async(Ok(())).await?;
|
||||
assert_eq!(msg, "ReturnFromJavaScriptRawCallback".to_owned());
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue