Merge pull request #1556 from napi-rs/fix-tsfn-callback-throw
fix(napi): re-throw error in ThreadsafeFunction callback if we could
This commit is contained in:
commit
5624c82926
11 changed files with 137 additions and 116 deletions
1
.github/workflows/linux-armv7.yaml
vendored
1
.github/workflows/linux-armv7.yaml
vendored
|
@ -2,6 +2,7 @@ name: Linux-armv7
|
||||||
|
|
||||||
env:
|
env:
|
||||||
DEBUG: 'napi:*'
|
DEBUG: 'napi:*'
|
||||||
|
RUST_BACKTRACE: 1
|
||||||
|
|
||||||
concurrency:
|
concurrency:
|
||||||
group: ${{ github.workflow }}-${{ github.ref }}
|
group: ${{ github.workflow }}-${{ github.ref }}
|
||||||
|
|
|
@ -50,12 +50,7 @@ pub fn create_iterator<T: Generator>(
|
||||||
check_status_or_throw!(
|
check_status_or_throw!(
|
||||||
env,
|
env,
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_get_named_property(
|
sys::napi_get_named_property(env, global, "Symbol\0".as_ptr().cast(), &mut symbol_object)
|
||||||
env,
|
|
||||||
global,
|
|
||||||
"Symbol\0".as_ptr() as *const c_char,
|
|
||||||
&mut symbol_object,
|
|
||||||
)
|
|
||||||
},
|
},
|
||||||
"Get global object failed",
|
"Get global object failed",
|
||||||
);
|
);
|
||||||
|
@ -66,7 +61,7 @@ pub fn create_iterator<T: Generator>(
|
||||||
sys::napi_get_named_property(
|
sys::napi_get_named_property(
|
||||||
env,
|
env,
|
||||||
symbol_object,
|
symbol_object,
|
||||||
"iterator\0".as_ptr() as *const c_char,
|
"iterator\0".as_ptr().cast(),
|
||||||
&mut iterator_symbol,
|
&mut iterator_symbol,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -78,7 +73,7 @@ pub fn create_iterator<T: Generator>(
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_create_function(
|
sys::napi_create_function(
|
||||||
env,
|
env,
|
||||||
"Iterator\0".as_ptr() as *const c_char,
|
"Iterator\0".as_ptr().cast(),
|
||||||
8,
|
8,
|
||||||
Some(symbol_generator::<T>),
|
Some(symbol_generator::<T>),
|
||||||
generator_ptr as *mut c_void,
|
generator_ptr as *mut c_void,
|
||||||
|
@ -129,7 +124,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_create_function(
|
sys::napi_create_function(
|
||||||
env,
|
env,
|
||||||
"next\0".as_ptr() as *const c_char,
|
"next\0".as_ptr().cast(),
|
||||||
4,
|
4,
|
||||||
Some(generator_next::<T>),
|
Some(generator_next::<T>),
|
||||||
generator_ptr,
|
generator_ptr,
|
||||||
|
@ -144,7 +139,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_create_function(
|
sys::napi_create_function(
|
||||||
env,
|
env,
|
||||||
"return\0".as_ptr() as *const c_char,
|
"return\0".as_ptr().cast(),
|
||||||
6,
|
6,
|
||||||
Some(generator_return::<T>),
|
Some(generator_return::<T>),
|
||||||
generator_ptr,
|
generator_ptr,
|
||||||
|
@ -159,7 +154,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_create_function(
|
sys::napi_create_function(
|
||||||
env,
|
env,
|
||||||
"throw\0".as_ptr() as *const c_char,
|
"throw\0".as_ptr().cast(),
|
||||||
5,
|
5,
|
||||||
Some(generator_throw::<T>),
|
Some(generator_throw::<T>),
|
||||||
generator_ptr,
|
generator_ptr,
|
||||||
|
@ -175,7 +170,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
generator_object,
|
generator_object,
|
||||||
"next\0".as_ptr() as *const c_char,
|
"next\0".as_ptr().cast(),
|
||||||
next_function,
|
next_function,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -188,7 +183,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
generator_object,
|
generator_object,
|
||||||
"return\0".as_ptr() as *const c_char,
|
"return\0".as_ptr().cast(),
|
||||||
return_function,
|
return_function,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -201,7 +196,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
generator_object,
|
generator_object,
|
||||||
"throw\0".as_ptr() as *const c_char,
|
"throw\0".as_ptr().cast(),
|
||||||
throw_function,
|
throw_function,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -216,7 +211,7 @@ pub unsafe extern "C" fn symbol_generator<T: Generator>(
|
||||||
);
|
);
|
||||||
|
|
||||||
let properties = vec![sys::napi_property_descriptor {
|
let properties = vec![sys::napi_property_descriptor {
|
||||||
utf8name: GENERATOR_STATE_KEY.as_ptr() as *const c_char,
|
utf8name: GENERATOR_STATE_KEY.as_ptr().cast(),
|
||||||
name: ptr::null_mut(),
|
name: ptr::null_mut(),
|
||||||
method: None,
|
method: None,
|
||||||
getter: None,
|
getter: None,
|
||||||
|
@ -264,7 +259,7 @@ extern "C" fn generator_next<T: Generator>(
|
||||||
sys::napi_get_named_property(
|
sys::napi_get_named_property(
|
||||||
env,
|
env,
|
||||||
this,
|
this,
|
||||||
GENERATOR_STATE_KEY.as_ptr() as *const c_char,
|
GENERATOR_STATE_KEY.as_ptr().cast(),
|
||||||
&mut generator_state,
|
&mut generator_state,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -385,7 +380,7 @@ extern "C" fn generator_return<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
this,
|
this,
|
||||||
GENERATOR_STATE_KEY.as_ptr() as *const c_char,
|
GENERATOR_STATE_KEY.as_ptr().cast(),
|
||||||
generator_state,
|
generator_state,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -498,7 +493,7 @@ extern "C" fn generator_throw<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
this,
|
this,
|
||||||
GENERATOR_STATE_KEY.as_ptr() as *const c_char,
|
GENERATOR_STATE_KEY.as_ptr().cast(),
|
||||||
generator_state,
|
generator_state,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -530,7 +525,7 @@ extern "C" fn generator_throw<T: Generator>(
|
||||||
sys::napi_set_named_property(
|
sys::napi_set_named_property(
|
||||||
env,
|
env,
|
||||||
this,
|
this,
|
||||||
GENERATOR_STATE_KEY.as_ptr() as *const c_char,
|
GENERATOR_STATE_KEY.as_ptr().cast(),
|
||||||
generator_state,
|
generator_state,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -538,14 +533,7 @@ extern "C" fn generator_throw<T: Generator>(
|
||||||
);
|
);
|
||||||
check_status_or_throw!(
|
check_status_or_throw!(
|
||||||
env,
|
env,
|
||||||
unsafe {
|
unsafe { sys::napi_set_named_property(env, result, "done\0".as_ptr().cast(), generator_state) },
|
||||||
sys::napi_set_named_property(
|
|
||||||
env,
|
|
||||||
result,
|
|
||||||
"done\0".as_ptr() as *const c_char,
|
|
||||||
generator_state,
|
|
||||||
)
|
|
||||||
},
|
|
||||||
"Get generator state failed"
|
"Get generator state failed"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -1,33 +1,30 @@
|
||||||
use std::ffi::{c_void, CStr};
|
use std::ffi::CStr;
|
||||||
use std::future;
|
use std::future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
atomic::{AtomicBool, AtomicPtr, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
use std::task::{Context, Poll, Waker};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use crate::{check_status, sys, Error, JsUnknown, NapiValue, Result};
|
use tokio::sync::oneshot::{channel, Receiver, Sender};
|
||||||
|
|
||||||
|
use crate::{check_status, sys, Error, JsUnknown, NapiValue, Result, Status};
|
||||||
|
|
||||||
use super::{FromNapiValue, TypeName, ValidateNapiValue};
|
use super::{FromNapiValue, TypeName, ValidateNapiValue};
|
||||||
|
|
||||||
struct PromiseInner<T: FromNapiValue> {
|
pub struct Promise<T: FromNapiValue> {
|
||||||
value: AtomicPtr<Result<T>>,
|
value: Pin<Box<Receiver<*mut Result<T>>>>,
|
||||||
waker: AtomicPtr<Waker>,
|
aborted: Arc<AtomicBool>,
|
||||||
aborted: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: FromNapiValue> Drop for PromiseInner<T> {
|
impl<T: FromNapiValue> Drop for Promise<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.aborted.store(true, Ordering::SeqCst);
|
self.aborted.store(true, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Promise<T: FromNapiValue> {
|
|
||||||
inner: Arc<PromiseInner<T>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: FromNapiValue> TypeName for Promise<T> {
|
impl<T: FromNapiValue> TypeName for Promise<T> {
|
||||||
fn type_name() -> &'static str {
|
fn type_name() -> &'static str {
|
||||||
"Promise"
|
"Promise"
|
||||||
|
@ -103,13 +100,9 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
|
||||||
)?;
|
)?;
|
||||||
let mut promise_after_then = ptr::null_mut();
|
let mut promise_after_then = ptr::null_mut();
|
||||||
let mut then_js_cb = ptr::null_mut();
|
let mut then_js_cb = ptr::null_mut();
|
||||||
let promise_inner = PromiseInner {
|
let (tx, rx) = channel();
|
||||||
value: AtomicPtr::new(ptr::null_mut()),
|
let aborted = Arc::new(AtomicBool::new(false));
|
||||||
waker: AtomicPtr::new(ptr::null_mut()),
|
let tx_ptr = Box::into_raw(Box::new((tx, aborted.clone())));
|
||||||
aborted: AtomicBool::new(false),
|
|
||||||
};
|
|
||||||
let shared_inner = Arc::new(promise_inner);
|
|
||||||
let context_ptr = Arc::into_raw(shared_inner.clone());
|
|
||||||
check_status!(
|
check_status!(
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_create_function(
|
sys::napi_create_function(
|
||||||
|
@ -117,7 +110,7 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
|
||||||
then_c_string.as_ptr(),
|
then_c_string.as_ptr(),
|
||||||
4,
|
4,
|
||||||
Some(then_callback::<T>),
|
Some(then_callback::<T>),
|
||||||
context_ptr as *mut c_void,
|
tx_ptr.cast(),
|
||||||
&mut then_js_cb,
|
&mut then_js_cb,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -152,7 +145,7 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
|
||||||
catch_c_string.as_ptr(),
|
catch_c_string.as_ptr(),
|
||||||
5,
|
5,
|
||||||
Some(catch_callback::<T>),
|
Some(catch_callback::<T>),
|
||||||
context_ptr as *mut c_void,
|
tx_ptr.cast(),
|
||||||
&mut catch_js_cb,
|
&mut catch_js_cb,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -172,7 +165,8 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
|
||||||
"Failed to call catch method"
|
"Failed to call catch method"
|
||||||
)?;
|
)?;
|
||||||
Ok(Promise {
|
Ok(Promise {
|
||||||
inner: shared_inner,
|
value: Box::pin(rx),
|
||||||
|
aborted,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,19 +174,13 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
|
||||||
impl<T: FromNapiValue> future::Future for Promise<T> {
|
impl<T: FromNapiValue> future::Future for Promise<T> {
|
||||||
type Output = Result<T>;
|
type Output = Result<T>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
if self.inner.value.load(Ordering::Relaxed).is_null() {
|
match self.value.as_mut().poll(cx) {
|
||||||
if self.inner.waker.load(Ordering::Acquire).is_null() {
|
Poll::Pending => Poll::Pending,
|
||||||
self.inner.waker.store(
|
Poll::Ready(v) => Poll::Ready(
|
||||||
Box::into_raw(Box::new(cx.waker().clone())),
|
v.map_err(|e| Error::new(Status::GenericFailure, format!("{}", e)))
|
||||||
Ordering::Release,
|
.and_then(|v| unsafe { *Box::from_raw(v) }.map_err(Error::from)),
|
||||||
);
|
),
|
||||||
}
|
|
||||||
Poll::Pending
|
|
||||||
} else {
|
|
||||||
Poll::Ready(
|
|
||||||
unsafe { Box::from_raw(self.inner.value.load(Ordering::Relaxed)) }.map_err(Error::from),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,20 +206,15 @@ unsafe extern "C" fn then_callback<T: FromNapiValue>(
|
||||||
get_cb_status == sys::Status::napi_ok,
|
get_cb_status == sys::Status::napi_ok,
|
||||||
"Get callback info from Promise::then failed"
|
"Get callback info from Promise::then failed"
|
||||||
);
|
);
|
||||||
let PromiseInner {
|
let (sender, aborted) =
|
||||||
value,
|
*unsafe { Box::from_raw(data as *mut (Sender<*mut Result<T>>, Arc<AtomicBool>)) };
|
||||||
waker,
|
|
||||||
aborted,
|
|
||||||
} = &*unsafe { Arc::from_raw(data as *mut PromiseInner<T>) };
|
|
||||||
if aborted.load(Ordering::SeqCst) {
|
if aborted.load(Ordering::SeqCst) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
let resolve_value_t = Box::new(unsafe { T::from_napi_value(env, resolved_value[0]) });
|
let resolve_value_t = Box::new(unsafe { T::from_napi_value(env, resolved_value[0]) });
|
||||||
value.store(Box::into_raw(resolve_value_t), Ordering::Relaxed);
|
sender
|
||||||
let waker = waker.load(Ordering::Acquire);
|
.send(Box::into_raw(resolve_value_t))
|
||||||
if !waker.is_null() {
|
.expect("Send Promise resolved value error");
|
||||||
unsafe { Box::from_raw(waker) }.wake();
|
|
||||||
}
|
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,23 +241,15 @@ unsafe extern "C" fn catch_callback<T: FromNapiValue>(
|
||||||
"Get callback info from Promise::catch failed"
|
"Get callback info from Promise::catch failed"
|
||||||
);
|
);
|
||||||
let rejected_value = rejected_value[0];
|
let rejected_value = rejected_value[0];
|
||||||
let PromiseInner {
|
let (sender, aborted) =
|
||||||
value,
|
*unsafe { Box::from_raw(data as *mut (Sender<*mut Result<T>>, Arc<AtomicBool>)) };
|
||||||
waker,
|
|
||||||
aborted,
|
|
||||||
} = &*unsafe { Arc::from_raw(data as *mut PromiseInner<T>) };
|
|
||||||
if aborted.load(Ordering::SeqCst) {
|
if aborted.load(Ordering::SeqCst) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
value.store(
|
sender
|
||||||
Box::into_raw(Box::new(Err(Error::from(unsafe {
|
.send(Box::into_raw(Box::new(Err(Error::from(unsafe {
|
||||||
JsUnknown::from_raw_unchecked(env, rejected_value)
|
JsUnknown::from_raw_unchecked(env, rejected_value)
|
||||||
})))),
|
})))))
|
||||||
Ordering::Relaxed,
|
.expect("Send Promise resolved value error");
|
||||||
);
|
|
||||||
let waker = waker.load(Ordering::Acquire);
|
|
||||||
if !waker.is_null() {
|
|
||||||
unsafe { Box::from_raw(waker) }.wake();
|
|
||||||
}
|
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,10 @@ impl From<JsUnknown> for Error {
|
||||||
let mut result = std::ptr::null_mut();
|
let mut result = std::ptr::null_mut();
|
||||||
let status = unsafe { sys::napi_create_reference(value.0.env, value.0.value, 0, &mut result) };
|
let status = unsafe { sys::napi_create_reference(value.0.env, value.0.value, 0, &mut result) };
|
||||||
if status != sys::Status::napi_ok {
|
if status != sys::Status::napi_ok {
|
||||||
return Error::new(Status::from(status), "".to_owned());
|
return Error::new(
|
||||||
|
Status::from(status),
|
||||||
|
"Create Error reference failed".to_owned(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Self {
|
Self {
|
||||||
status: Status::GenericFailure,
|
status: Status::GenericFailure,
|
||||||
|
|
|
@ -115,12 +115,20 @@ extern "C" fn napi_resolve_deferred<Data: ToNapiValue, Resolver: FnOnce(Env) ->
|
||||||
match result {
|
match result {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) };
|
let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) };
|
||||||
debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed");
|
debug_assert!(
|
||||||
|
status == sys::Status::napi_ok,
|
||||||
|
"Resolve promise failed {:?}",
|
||||||
|
crate::Status::from(status)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let status =
|
let status =
|
||||||
unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
|
unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
|
||||||
debug_assert!(status == sys::Status::napi_ok, "Reject promise failed");
|
debug_assert!(
|
||||||
|
status == sys::Status::napi_ok,
|
||||||
|
"Reject promise failed {:?}",
|
||||||
|
crate::Status::from(status)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,7 +178,7 @@ enum ThreadsafeFunctionCallVariant {
|
||||||
struct ThreadsafeFunctionCallJsBackData<T> {
|
struct ThreadsafeFunctionCallJsBackData<T> {
|
||||||
data: T,
|
data: T,
|
||||||
call_variant: ThreadsafeFunctionCallVariant,
|
call_variant: ThreadsafeFunctionCallVariant,
|
||||||
callback: Box<dyn FnOnce(JsUnknown) -> Result<()>>,
|
callback: Box<dyn FnOnce(Result<JsUnknown>) -> Result<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
|
/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
|
||||||
|
@ -434,7 +434,7 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
||||||
ThreadsafeFunctionCallJsBackData {
|
ThreadsafeFunctionCallJsBackData {
|
||||||
data,
|
data,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
||||||
callback: Box::new(|_d: JsUnknown| Ok(())),
|
callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
|
||||||
}
|
}
|
||||||
})))
|
})))
|
||||||
.cast(),
|
.cast(),
|
||||||
|
@ -463,8 +463,8 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
||||||
ThreadsafeFunctionCallJsBackData {
|
ThreadsafeFunctionCallJsBackData {
|
||||||
data,
|
data,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||||
callback: Box::new(move |d: JsUnknown| {
|
callback: Box::new(move |d: Result<JsUnknown>| {
|
||||||
D::from_napi_value(d.0.env, d.0.value).and_then(cb)
|
d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
})))
|
})))
|
||||||
|
@ -478,7 +478,7 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
||||||
|
|
||||||
#[cfg(feature = "tokio_rt")]
|
#[cfg(feature = "tokio_rt")]
|
||||||
pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> {
|
pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> {
|
||||||
let (sender, receiver) = tokio::sync::oneshot::channel::<D>();
|
let (sender, receiver) = tokio::sync::oneshot::channel::<Result<D>>();
|
||||||
|
|
||||||
self.handle.with_read_aborted(|aborted| {
|
self.handle.with_read_aborted(|aborted| {
|
||||||
if aborted {
|
if aborted {
|
||||||
|
@ -492,12 +492,12 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
||||||
ThreadsafeFunctionCallJsBackData {
|
ThreadsafeFunctionCallJsBackData {
|
||||||
data,
|
data,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||||
callback: Box::new(move |d: JsUnknown| {
|
callback: Box::new(move |d: Result<JsUnknown>| {
|
||||||
D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
|
sender
|
||||||
sender.send(d).map_err(|_| {
|
.send(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value)))
|
||||||
|
.map_err(|_| {
|
||||||
crate::Error::from_reason("Failed to send return value to tokio sender")
|
crate::Error::from_reason("Failed to send return value to tokio sender")
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
})))
|
})))
|
||||||
|
@ -508,7 +508,13 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
|
||||||
})?;
|
})?;
|
||||||
receiver
|
receiver
|
||||||
.await
|
.await
|
||||||
.map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err)))
|
.map_err(|_| {
|
||||||
|
crate::Error::new(
|
||||||
|
Status::GenericFailure,
|
||||||
|
"Receive value from threadsafe function sender failed",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.and_then(|ret| ret)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,7 +533,7 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
|
||||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||||
data: value,
|
data: value,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
call_variant: ThreadsafeFunctionCallVariant::Direct,
|
||||||
callback: Box::new(|_d: JsUnknown| Ok(())),
|
callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
|
||||||
}))
|
}))
|
||||||
.cast(),
|
.cast(),
|
||||||
mode.into(),
|
mode.into(),
|
||||||
|
@ -554,8 +560,8 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
|
||||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||||
data: value,
|
data: value,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||||
callback: Box::new(move |d: JsUnknown| {
|
callback: Box::new(move |d: Result<JsUnknown>| {
|
||||||
D::from_napi_value(d.0.env, d.0.value).and_then(cb)
|
d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
|
||||||
}),
|
}),
|
||||||
}))
|
}))
|
||||||
.cast(),
|
.cast(),
|
||||||
|
@ -581,10 +587,12 @@ impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
|
||||||
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
|
||||||
data: value,
|
data: value,
|
||||||
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
call_variant: ThreadsafeFunctionCallVariant::WithCallback,
|
||||||
callback: Box::new(move |d: JsUnknown| {
|
callback: Box::new(move |d: Result<JsUnknown>| {
|
||||||
D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
|
d.and_then(|d| {
|
||||||
sender.send(d).map_err(|_| {
|
D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
|
||||||
crate::Error::from_reason("Failed to send return value to tokio sender")
|
sender.send(d).map_err(|_| {
|
||||||
|
crate::Error::from_reason("Failed to send return value to tokio sender")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}),
|
}),
|
||||||
|
@ -677,7 +685,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
||||||
values.collect()
|
values.collect()
|
||||||
};
|
};
|
||||||
let mut return_value = ptr::null_mut();
|
let mut return_value = ptr::null_mut();
|
||||||
let status = match args {
|
let mut status = match args {
|
||||||
Ok(args) => unsafe {
|
Ok(args) => unsafe {
|
||||||
sys::napi_call_function(
|
sys::napi_call_function(
|
||||||
raw_env,
|
raw_env,
|
||||||
|
@ -705,11 +713,26 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
|
if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
|
||||||
if let Err(err) = callback(JsUnknown(crate::Value {
|
// throw Error in JavaScript callback
|
||||||
env: raw_env,
|
let callback_arg = if status == sys::Status::napi_pending_exception {
|
||||||
value: return_value,
|
let mut exception = ptr::null_mut();
|
||||||
value_type: crate::ValueType::Unknown,
|
status = unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut exception) };
|
||||||
})) {
|
Err(
|
||||||
|
JsUnknown(crate::Value {
|
||||||
|
env: raw_env,
|
||||||
|
value: exception,
|
||||||
|
value_type: crate::ValueType::Unknown,
|
||||||
|
})
|
||||||
|
.into(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
Ok(JsUnknown(crate::Value {
|
||||||
|
env: raw_env,
|
||||||
|
value: return_value,
|
||||||
|
value_type: crate::ValueType::Unknown,
|
||||||
|
}))
|
||||||
|
};
|
||||||
|
if let Err(err) = callback(callback_arg) {
|
||||||
let message = format!(
|
let message = format!(
|
||||||
"Failed to convert return value in ThreadsafeFunction callback into Rust value: {}",
|
"Failed to convert return value in ThreadsafeFunction callback into Rust value: {}",
|
||||||
err
|
err
|
||||||
|
@ -717,7 +740,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
||||||
let message_length = message.len();
|
let message_length = message.len();
|
||||||
unsafe {
|
unsafe {
|
||||||
sys::napi_fatal_error(
|
sys::napi_fatal_error(
|
||||||
"threadsafe_function.rs:642\0".as_ptr().cast(),
|
"threadsafe_function.rs:720\0".as_ptr().cast(),
|
||||||
26,
|
26,
|
||||||
CString::new(message).unwrap().into_raw(),
|
CString::new(message).unwrap().into_raw(),
|
||||||
message_length,
|
message_length,
|
||||||
|
@ -769,7 +792,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
|
||||||
},
|
},
|
||||||
sys::Status::napi_ok,
|
sys::Status::napi_ok,
|
||||||
);
|
);
|
||||||
let error_msg = "Call JavaScript callback failed in thread safe function";
|
let error_msg = "Call JavaScript callback failed in threadsafe function";
|
||||||
let mut error_msg_value = ptr::null_mut();
|
let mut error_msg_value = ptr::null_mut();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
unsafe {
|
unsafe {
|
||||||
|
|
|
@ -518,6 +518,8 @@ Generated by [AVA](https://avajs.dev).
|
||||||
␊
|
␊
|
||||||
export function tsfnReturnPromiseTimeout(func: (err: Error | null, value: number) => any): Promise<number>␊
|
export function tsfnReturnPromiseTimeout(func: (err: Error | null, value: number) => any): Promise<number>␊
|
||||||
␊
|
␊
|
||||||
|
export function tsfnThrowFromJs(tsfn: (err: Error | null, value: number) => any): Promise<number>␊
|
||||||
|
␊
|
||||||
export function tsRename(a: { foo: number }): string[]␊
|
export function tsRename(a: { foo: number }): string[]␊
|
||||||
␊
|
␊
|
||||||
export interface TsTypeChanged {␊
|
export interface TsTypeChanged {␊
|
||||||
|
|
Binary file not shown.
|
@ -55,6 +55,7 @@ import {
|
||||||
threadsafeFunctionClosureCapture,
|
threadsafeFunctionClosureCapture,
|
||||||
tsfnCallWithCallback,
|
tsfnCallWithCallback,
|
||||||
tsfnAsyncCall,
|
tsfnAsyncCall,
|
||||||
|
tsfnThrowFromJs,
|
||||||
asyncPlus100,
|
asyncPlus100,
|
||||||
getGlobal,
|
getGlobal,
|
||||||
getUndefined,
|
getUndefined,
|
||||||
|
@ -843,6 +844,19 @@ Napi4Test('async call ThreadsafeFunction', async (t) => {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('Throw from ThreadsafeFunction JavaScript callback', async (t) => {
|
||||||
|
const errMsg = 'ThrowFromJavaScriptRawCallback'
|
||||||
|
await t.throwsAsync(
|
||||||
|
() =>
|
||||||
|
tsfnThrowFromJs(() => {
|
||||||
|
throw new Error(errMsg)
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
message: errMsg,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
Napi4Test('accept ThreadsafeFunction', async (t) => {
|
Napi4Test('accept ThreadsafeFunction', async (t) => {
|
||||||
await new Promise<void>((resolve, reject) => {
|
await new Promise<void>((resolve, reject) => {
|
||||||
acceptThreadsafeFunction((err, value) => {
|
acceptThreadsafeFunction((err, value) => {
|
||||||
|
@ -923,7 +937,7 @@ Napi4Test('object only from js', (t) => {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Napi4Test('promise in either', async (t) => {
|
test('promise in either', async (t) => {
|
||||||
t.is(await promiseInEither(1), false)
|
t.is(await promiseInEither(1), false)
|
||||||
t.is(await promiseInEither(20), true)
|
t.is(await promiseInEither(20), true)
|
||||||
t.is(await promiseInEither(Promise.resolve(1)), false)
|
t.is(await promiseInEither(Promise.resolve(1)), false)
|
||||||
|
|
2
examples/napi/index.d.ts
vendored
2
examples/napi/index.d.ts
vendored
|
@ -508,6 +508,8 @@ export function tsfnReturnPromise(func: (err: Error | null, value: number) => an
|
||||||
|
|
||||||
export function tsfnReturnPromiseTimeout(func: (err: Error | null, value: number) => any): Promise<number>
|
export function tsfnReturnPromiseTimeout(func: (err: Error | null, value: number) => any): Promise<number>
|
||||||
|
|
||||||
|
export function tsfnThrowFromJs(tsfn: (err: Error | null, value: number) => any): Promise<number>
|
||||||
|
|
||||||
export function tsRename(a: { foo: number }): string[]
|
export function tsRename(a: { foo: number }): string[]
|
||||||
|
|
||||||
export interface TsTypeChanged {
|
export interface TsTypeChanged {
|
||||||
|
|
|
@ -160,3 +160,8 @@ pub async fn tsfn_return_promise_timeout(func: ThreadsafeFunction<u32>) -> Resul
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[napi]
|
||||||
|
pub async fn tsfn_throw_from_js(tsfn: ThreadsafeFunction<u32>) -> napi::Result<u32> {
|
||||||
|
tsfn.call_async::<Promise<u32>>(Ok(42)).await?.await
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue