From dc3a4c9f255d7f586d8a76c886ac94c5be0fa36e Mon Sep 17 00:00:00 2001 From: LongYinan Date: Wed, 11 Jan 2023 18:54:45 +0800 Subject: [PATCH] feat(napi): refactor ThreadsafeFunction to allow get return value of it (#1427) --- crates/napi/src/env.rs | 17 + crates/napi/src/threadsafe_function.rs | 342 ++++++++++++------ .../__test__/napi4/tsfn-dua-instance.js | 12 +- examples/napi-compat-mode/src/napi4/mod.rs | 13 +- examples/napi-compat-mode/src/napi4/tsfn.rs | 2 +- .../src/napi4/tsfn_dua_instance.rs | 23 +- examples/napi/__test__/typegen.spec.ts.md | 2 + examples/napi/__test__/typegen.spec.ts.snap | Bin 3602 -> 3635 bytes examples/napi/__test__/values.spec.ts | 26 ++ examples/napi/index.d.ts | 2 + examples/napi/src/threadsafe_function.rs | 28 ++ 11 files changed, 354 insertions(+), 113 deletions(-) diff --git a/crates/napi/src/env.rs b/crates/napi/src/env.rs index b5ec8f75..6f0587d8 100644 --- a/crates/napi/src/env.rs +++ b/crates/napi/src/env.rs @@ -1116,6 +1116,23 @@ impl Env { Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) } + #[cfg(all(feature = "tokio_rt", feature = "napi4"))] + pub fn spawn_future< + T: 'static + Send + ToNapiValue, + F: 'static + Send + Future>, + >( + &self, + fut: F, + ) -> Result { + use crate::tokio_runtime; + + let promise = tokio_runtime::execute_tokio_future(self.0, fut, |env, val| unsafe { + ToNapiValue::to_napi_value(env, val) + })?; + + Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) + } + /// Creates a deferred promise, which can be resolved or rejected from a background thread. #[cfg(feature = "napi4")] pub fn create_deferred Result>( diff --git a/crates/napi/src/threadsafe_function.rs b/crates/napi/src/threadsafe_function.rs index 4912d6e6..369b69a0 100644 --- a/crates/napi/src/threadsafe_function.rs +++ b/crates/napi/src/threadsafe_function.rs @@ -5,11 +5,11 @@ use std::ffi::CString; use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; -use crate::bindgen_runtime::ToNapiValue; -use crate::{check_status, sys, Env, Error, JsError, Result, Status}; +use crate::bindgen_runtime::{FromNapiValue, ToNapiValue}; +use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status}; /// ThreadSafeFunction Context object /// the `value` is the value passed to `call` method @@ -96,6 +96,46 @@ type_level_enum! { } } +struct ThreadsafeFunctionHandle { + raw: sys::napi_threadsafe_function, + aborted: RwLock, + referred: AtomicBool, +} + +unsafe impl Send for ThreadsafeFunctionHandle {} +unsafe impl Sync for ThreadsafeFunctionHandle {} + +impl Drop for ThreadsafeFunctionHandle { + fn drop(&mut self) { + let aborted_guard = self + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && self.referred.load(Ordering::Acquire) { + let release_status = unsafe { + sys::napi_release_threadsafe_function(self.raw, sys::ThreadsafeFunctionReleaseMode::release) + }; + assert!( + release_status == sys::Status::napi_ok, + "Threadsafe Function release failed {}", + Status::from(release_status) + ); + } + } +} + +#[repr(u8)] +enum ThreadsafeFunctionCallVariant { + Direct, + WithCallback, +} + +struct ThreadsafeFunctionCallJsBackData { + data: T, + call_variant: ThreadsafeFunctionCallVariant, + callback: Box Result<()>>, +} + /// Communicate with the addon's main thread by invoking a JavaScript function from other threads. /// /// ## Example @@ -146,41 +186,28 @@ type_level_enum! { /// } /// ``` pub struct ThreadsafeFunction { - raw_tsfn: sys::napi_threadsafe_function, - aborted: Arc>, - ref_count: Arc, + handle: Arc, _phantom: PhantomData<(T, ES)>, } impl Clone for ThreadsafeFunction { fn clone(&self) -> Self { - let is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted { - let acquire_status = unsafe { sys::napi_acquire_threadsafe_function(self.raw_tsfn) }; - debug_assert!( - acquire_status == sys::Status::napi_ok, - "Acquire threadsafe function failed in clone" - ); - } else { + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if *aborted_guard { panic!("ThreadsafeFunction was aborted, can not clone it"); } - self.ref_count.fetch_add(1, Ordering::AcqRel); - - drop(is_aborted); - Self { - raw_tsfn: self.raw_tsfn, - aborted: Arc::clone(&self.aborted), - ref_count: Arc::clone(&self.ref_count), + handle: self.handle.clone(), _phantom: PhantomData, } } } -unsafe impl Send for ThreadsafeFunction {} -unsafe impl Sync for ThreadsafeFunction {} - impl ThreadsafeFunction { /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function) /// for more information. @@ -201,11 +228,8 @@ impl ThreadsafeFunction { sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name) })?; - let initial_thread_count = 1usize; let mut raw_tsfn = ptr::null_mut(); - let ptr = Box::into_raw(Box::new(callback)) as *mut c_void; - let aborted = Arc::new(Mutex::new(false)); - let aborted_ptr = Arc::into_raw(aborted.clone()); + let callback_ptr = Box::into_raw(Box::new(callback)); check_status!(unsafe { sys::napi_create_threadsafe_function( env, @@ -213,19 +237,23 @@ impl ThreadsafeFunction { ptr::null_mut(), async_resource_name, max_queue_size, - initial_thread_count, - aborted_ptr as *mut c_void, + 1, + ptr::null_mut(), Some(thread_finalize_cb::), - ptr, + callback_ptr.cast(), Some(call_js_cb::), &mut raw_tsfn, ) })?; + check_status!(unsafe { sys::napi_acquire_threadsafe_function(raw_tsfn) })?; + Ok(ThreadsafeFunction { - raw_tsfn, - aborted, - ref_count: Arc::new(AtomicUsize::new(initial_thread_count)), + handle: Arc::new(ThreadsafeFunctionHandle { + raw: raw_tsfn, + aborted: RwLock::new(false), + referred: AtomicBool::new(true), + }), _phantom: PhantomData, }) } @@ -235,54 +263,63 @@ impl ThreadsafeFunction { /// /// "ref" is a keyword so that we use "refer" here. pub fn refer(&mut self, env: &Env) -> Result<()> { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Err(Error::new( - Status::Closing, - "Can not ref, Thread safe function already aborted".to_string(), - )); + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && !self.handle.referred.load(Ordering::Acquire) { + check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.raw) })?; + self.handle.referred.store(true, Ordering::Release); } - drop(is_aborted); - self.ref_count.fetch_add(1, Ordering::AcqRel); - check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) }) + Ok(()) } /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function) /// for more information. pub fn unref(&mut self, env: &Env) -> Result<()> { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Err(Error::new( - Status::Closing, - "Can not unref, Thread safe function already aborted".to_string(), - )); + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && self.handle.referred.load(Ordering::Acquire) { + check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.handle.raw) })?; + self.handle.referred.store(false, Ordering::Release); } - self.ref_count.fetch_sub(1, Ordering::AcqRel); - check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) }) + Ok(()) } pub fn aborted(&self) -> bool { - let is_aborted = self.aborted.lock().unwrap(); - *is_aborted + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + *aborted_guard } pub fn abort(self) -> Result<()> { - let mut is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted { + let mut aborted_guard = self + .handle + .aborted + .write() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard { check_status!(unsafe { sys::napi_release_threadsafe_function( - self.raw_tsfn, + self.handle.raw, sys::ThreadsafeFunctionReleaseMode::abort, ) })?; + *aborted_guard = true; } - *is_aborted = true; Ok(()) } /// Get the raw `ThreadSafeFunction` pointer pub fn raw(&self) -> sys::napi_threadsafe_function { - self.raw_tsfn + self.handle.raw } } @@ -290,65 +327,149 @@ impl ThreadsafeFunction { /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) /// for more information. pub fn call(&self, value: Result, mode: ThreadsafeFunctionCallMode) -> Status { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Status::Closing; - } unsafe { sys::napi_call_threadsafe_function( - self.raw_tsfn, - Box::into_raw(Box::new(value)) as *mut c_void, + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::Direct, + callback: Box::new(|_d: JsUnknown| Ok(())), + } + }))) + .cast(), mode.into(), ) } .into() } + + pub fn call_with_return_value Result<()>>( + &self, + value: Result, + mode: ThreadsafeFunctionCallMode, + cb: F, + ) -> Status { + unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(cb) + }), + } + }))) + .cast(), + mode.into(), + ) + } + .into() + } + + #[cfg(feature = "tokio_rt")] + pub async fn call_async(&self, value: Result) -> Result { + let (sender, receiver) = tokio::sync::oneshot::channel::(); + check_status!(unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(move |d| { + sender.send(d).map_err(|_| { + crate::Error::from_reason("Failed to send return value to tokio sender") + }) + }) + }), + } + }))) + .cast(), + ThreadsafeFunctionCallMode::NonBlocking.into(), + ) + })?; + receiver + .await + .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err))) + } } impl ThreadsafeFunction { /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) /// for more information. pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Status::Closing; - } unsafe { sys::napi_call_threadsafe_function( - self.raw_tsfn, - Box::into_raw(Box::new(value)) as *mut c_void, + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::Direct, + callback: Box::new(|_d: JsUnknown| Ok(())), + })) + .cast(), mode.into(), ) } .into() } -} -impl Drop for ThreadsafeFunction { - fn drop(&mut self) { - let mut is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted && self.ref_count.load(Ordering::Acquire) <= 1 { - let release_status = unsafe { - sys::napi_release_threadsafe_function( - self.raw_tsfn, - sys::ThreadsafeFunctionReleaseMode::release, - ) - }; - assert!( - release_status == sys::Status::napi_ok, - "Threadsafe Function release failed {:?}", - Status::from(release_status) - ); - *is_aborted = true; - } else { - self.ref_count.fetch_sub(1, Ordering::Release); + pub fn call_with_return_value Result<()>>( + &self, + value: T, + mode: ThreadsafeFunctionCallMode, + cb: F, + ) -> Status { + unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(cb) + }), + })) + .cast(), + mode.into(), + ) } - drop(is_aborted); + .into() + } + + #[cfg(feature = "tokio_rt")] + pub async fn call_async(&self, value: T) -> Result { + let (sender, receiver) = tokio::sync::oneshot::channel::(); + check_status!(unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(move |d| { + sender.send(d).map_err(|_| { + crate::Error::from_reason("Failed to send return value to tokio sender") + }) + }) + }), + })) + .cast(), + ThreadsafeFunctionCallMode::NonBlocking.into(), + ) + })?; + receiver + .await + .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err))) } } +#[allow(unused_variables)] unsafe extern "C" fn thread_finalize_cb( - _raw_env: sys::napi_env, + env: sys::napi_env, finalize_data: *mut c_void, finalize_hint: *mut c_void, ) where @@ -356,9 +477,6 @@ unsafe extern "C" fn thread_finalize_cb( { // cleanup drop(unsafe { Box::::from_raw(finalize_hint.cast()) }); - let aborted = unsafe { Arc::>::from_raw(finalize_data.cast()) }; - let mut is_aborted = aborted.lock().unwrap(); - *is_aborted = true; } unsafe extern "C" fn call_js_cb( @@ -375,11 +493,15 @@ unsafe extern "C" fn call_js_cb( return; } - let ctx: &mut R = unsafe { &mut *context.cast::() }; - let val: Result = unsafe { + let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) }; + let val = unsafe { match ES::VALUE { - ErrorStrategy::CalleeHandled::VALUE => *Box::>::from_raw(data.cast()), - ErrorStrategy::Fatal::VALUE => Ok(*Box::::from_raw(data.cast())), + ErrorStrategy::CalleeHandled::VALUE => { + *Box::>>::from_raw(data.cast()) + } + ErrorStrategy::Fatal::VALUE => Ok(*Box::>::from_raw( + data.cast(), + )), } }; @@ -389,15 +511,16 @@ unsafe extern "C" fn call_js_cb( let ret = val.and_then(|v| { (ctx)(ThreadSafeCallContext { env: unsafe { Env::from_raw(raw_env) }, - value: v, + value: v.data, }) + .map(|ret| (ret, v.call_variant, v.callback)) }); // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/ // Check if the Result is okay, if so, pass a null as the first (error) argument automatically. // If the Result is an error, pass that as the first argument. let status = match ret { - Ok(values) => { + Ok((values, call_variant, callback)) => { let values = values .into_iter() .map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) }); @@ -408,7 +531,8 @@ unsafe extern "C" fn call_js_cb( } else { values.collect() }; - match args { + let mut return_value = ptr::null_mut(); + let status = match args { Ok(args) => unsafe { sys::napi_call_function( raw_env, @@ -416,7 +540,7 @@ unsafe extern "C" fn call_js_cb( js_callback, args.len(), args.as_ptr(), - ptr::null_mut(), + &mut return_value, ) }, Err(e) => match ES::VALUE { @@ -430,11 +554,21 @@ unsafe extern "C" fn call_js_cb( js_callback, 1, [JsError::from(e).into_value(raw_env)].as_mut_ptr(), - ptr::null_mut(), + &mut return_value, ) }, }, + }; + if let ThreadsafeFunctionCallVariant::WithCallback = call_variant { + if let Err(err) = callback(JsUnknown(crate::Value { + env: raw_env, + value: return_value, + value_type: crate::ValueType::Unknown, + })) { + unsafe { sys::napi_throw(raw_env, JsError::from(err).into_value(raw_env)) }; + } } + status } Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe { sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env)) diff --git a/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js b/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js index 8950b019..abd7283f 100644 --- a/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js +++ b/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js @@ -2,8 +2,16 @@ const bindings = require('../../index.node') async function main() { await Promise.resolve() - new bindings.A((s) => console.info(s)) - new bindings.A((s) => console.info(s)) + const a1 = new bindings.A((err, s) => { + console.info(s) + a1.unref() + }) + const a2 = new bindings.A((err, s) => { + console.info(s) + a2.unref() + }) + a1.call() + a2.call() } main().catch((e) => { diff --git a/examples/napi-compat-mode/src/napi4/mod.rs b/examples/napi-compat-mode/src/napi4/mod.rs index 844ef940..27b375d1 100644 --- a/examples/napi-compat-mode/src/napi4/mod.rs +++ b/examples/napi-compat-mode/src/napi4/mod.rs @@ -1,11 +1,11 @@ -use napi::{Env, JsObject, Result}; +use napi::{Env, JsObject, Property, Result}; mod deferred; mod tsfn; mod tsfn_dua_instance; use tsfn::*; -use tsfn_dua_instance::constructor; +use tsfn_dua_instance::*; pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> { exports.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; @@ -26,7 +26,14 @@ pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> { exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?; exports.create_named_method("testDeferred", deferred::test_deferred)?; - let obj = env.define_class("A", constructor, &[])?; + let obj = env.define_class( + "A", + constructor, + &[ + Property::new("call")?.with_method(call), + Property::new("unref")?.with_method(unref), + ], + )?; exports.set_named_property("A", obj)?; Ok(()) diff --git a/examples/napi-compat-mode/src/napi4/tsfn.rs b/examples/napi-compat-mode/src/napi4/tsfn.rs index dd464e38..5b845561 100644 --- a/examples/napi-compat-mode/src/napi4/tsfn.rs +++ b/examples/napi-compat-mode/src/napi4/tsfn.rs @@ -95,7 +95,7 @@ pub fn test_call_aborted_threadsafe_function(ctx: CallContext) -> Result napi::Result { let callback = ctx.get::(0)?; - let mut cb = + let cb = ctx .env .create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext| { @@ -23,11 +23,28 @@ pub fn constructor(ctx: CallContext) -> napi::Result { .map(|js_string| vec![js_string]) })?; - cb.unref(ctx.env)?; - let mut this: JsObject = ctx.this_unchecked(); let obj = A { cb }; ctx.env.wrap(&mut this, obj)?; ctx.env.get_undefined() } + +#[js_function] +pub fn call(ctx: CallContext) -> napi::Result { + let this = ctx.this_unchecked(); + let obj = ctx.env.unwrap::(&this)?; + obj.cb.call( + Ok("ThreadsafeFunction NonBlocking Call".to_owned()), + napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking, + ); + ctx.env.get_undefined() +} + +#[js_function] +pub fn unref(ctx: CallContext) -> napi::Result { + let this = ctx.this_unchecked(); + let obj = ctx.env.unwrap::(&this)?; + obj.cb.unref(ctx.env)?; + ctx.env.get_undefined() +} diff --git a/examples/napi/__test__/typegen.spec.ts.md b/examples/napi/__test__/typegen.spec.ts.md index 1e758c2b..2c99d8f3 100644 --- a/examples/napi/__test__/typegen.spec.ts.md +++ b/examples/napi/__test__/typegen.spec.ts.md @@ -195,6 +195,8 @@ Generated by [AVA](https://avajs.dev). export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void␊ export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void␊ export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void␊ + export function tsfnCallWithCallback(func: (...args: any[]) => any): void␊ + export function tsfnAsyncCall(func: (...args: any[]) => any): Promise␊ export function getBuffer(): Buffer␊ export function appendBuffer(buf: Buffer): Buffer␊ export function getEmptyBuffer(): Buffer␊ diff --git a/examples/napi/__test__/typegen.spec.ts.snap b/examples/napi/__test__/typegen.spec.ts.snap index d6b3df0b0717a738faafc5b064e52ad4a9c88e12..031a546009b36f651b186e6a25de0d6681f84193 100644 GIT binary patch literal 3635 zcmV-34$SdERzV4B*;!3g}q%5ChHnAk* z{s|0sHXn-!00000000B6TJLTgw-HZU6b1Y(Z@{`JvS-MSB^gF~@E@E`QX<=u4C!1a zaDs^!NYS{`?+MGUkcuM_kfz%*egZ??5BdjrW~KY^Avp_wASw z8867u;Pt`%lT%H=u2PXH62??YvUjCemeBZk`~moSGI~a??+}tP8i^!ckk3Br>xM>C zp78H;M&x2X7P0>P?q$DtJ?oPZG~L{}0aosj(Tqz1|5KQIA{BCA8PM;$d;5bouSVYu zk55N^lH~I-%PhqtPeR2-0`gODMHb1R*C&H4ql+g}WjvWa(?fj8lq6EjSF3;p78-LC{f;>by6`2i`J&aKP`G(m3>g5 zfm|eEkWw{+WzeF5p06`8=aN0y$tRPpnNsX@n8%8bE=54IsU6Je>yy%$PeJhpQ54XA z@v=e2_OWLlTKz)cSg{>GJxElr9@)X;M@uM8GpN9u$Bzc`RL?yabB7Znk1kOUFQXu& zaXhBsd7q#J9+u)GPoI$s!6N{`pH_E~;?G{(?!)8yh-u;>!Np>pz+T$2lM2Z;TYY;BMU zC{>;@32*Q2N|r_J{s$?NyJzI4%Vk*OYRhCV%S7h#@(~t0W58STLhct0sPn;t2M+)k z{18nR@6I#^JhjcGVDLm^Slb;v3oOM!lI9BVG{Oj2s%SQdGd-4N)$Gh7gS@~+fKzv8 zP>1N?Cq+SPpo3&s;0NJ&)AE2gF!Fe-E>oYBz>--up&=t?quS>zj)u5{9l$0+6ZnAJ z)R<>cpX|WDu={lT6x07LS2NvR1EHvWxE<>hGJesGV|>3KtXDAC#7yH{9w#hNuvzxW zTOO&If3JZEfsMqG{upqxu)EOa?-9;2k%(R0?~unam$&T)j(+oyC%Iycp7_&VBy5#> zMI`!_f{YNBM9+(t_WCf8CCiKlKhEm&E6u9hpgb%aE85dgwsYC1f#Ts{+cPLUjD?69 zOzY|>gKwW^hpGvNIS?1eO%Dl#LUm3W7_`q+@W977S7 z7~m4Hi38AsMBvl<_|>`_?B?Qonf5HpKDf8z)Oz9$4oeT;)`Ef7fr$|P*5X%0JH}@%r|b zG^R@?081XW!gk&FB~K#FAENHi>|96QgzSnbY_ShFiZ3t0K$)mwl0E@cam$Px%!*KE}sAKAqttzrdFx zu5tdRJ2MPMHXnAaBMdtd2isc#>(`lCl6ZiNzSLW1;0r8Ln()y3)$EEk4GbjzfjLIg zNDqD4KpmtA!cppA=}tn@K(jeqM`T{|!-@wVD}Kq$bLCEK70?E1%HQ#OIr&a`fdX;d zfpuaCp-Z-Qj%a(DbS!{m$vmy@F!?6uUR|OTe>sM6H4ryvddBQ@GMNe%0 zvo;4u5LcsIby~1msBi2(gMG?PXN`_FglcVq-X29rAEd1Uz#1h13sRWBbPV5QyU~XE)Q?2yOKcopQ%2ha+93+~H(xO(HhY!Juy8yBR zE?``l+orbvmnSTtU_-PI*{&0DkegEO9d#oCd2z-gc92B;f=4-x2if#EDQyFSuVX_J zv7}1zC)Q$0opUWKPo}G6gky?u51-T3vp_caLUjht%sYBdMH_ zZ)2pU7HG!PwZK(gXTrx|!ePjRWHm7)tqNTIOSX`hX4^Gv6@ncF*^0boq!Vy z4_+M%PBm8p8_Mrs+}f3+Xp3PH`2D{Q3?|8mnL|`S;1*Cr+aRof@)m?} zy9rf8jHML4VspC%(s}9E5SJ`5kX1QXQx_`!MiRrf;fy9zX0j#Ju)-~|QYY)pcIBRz z-QsTWQ4L8kvA}PS`taa&oXgFJ57&zq zYNtVoBO%PxQtsZ>ttsY(Q8BS0>8IbxyU0S{j>F6uwcvv3s{p}7T=s+5lb1E zL=ti)pSS!lO%~5I(Z%j+x-Lbdk;660(s}I?yz}`8_w=-Sul4osuZl@^iiPbH0k#e_ zX@Cccx_;d2ch`;~O`|-KfrLypWYta!=|Aw~L~*|G1`tAu=CDr8XNcdZ$>!tr$zM^( z-#6Ex&+NR{WJiJ>>zMN(L47yzrxS;Ib_`>CMcmr6^ewVWuuPGw!5CVHAm>cPG1#AH zn_LxOBicGp8$|M{PIWc)seV}lA7d`8ZO};4iIXQ)p;-5EgwIE+7S^b$R_N^Ox8m)C zZpVirVjmgqBeFP*h0HTHG$$hZK<4(Sa-t7Wba-qJQ+_p)J}X9;l@L~85UR6GQD6#NrKNUN zW(^mS=N8_-36HgCUQ;R2vhwMyA>)-d0IC3?1yU<9 zS)i)XbDiLXMR~|N7rY}LxI^xd_d4ptU{7T4i!)#y61+F9?_neTKIeFB3pxu)1B)X` zlptf+-t%$H!wrPdrv;aQ5Bt`X@OcWP$P;nkk?t?=@N776T6<@A+V+}dT!L>nO*~k5(t5y!-C8iY$%aI zlAwe<)Vl&gIB2N&np2>{uum5VdQRLUZDVs(q{cWDsf!s&+vMOI{_x_D!=!ng<9(9* zF;=`oF`u(UnRa9o!#*|_A2|8nzyI|Q-HsEvoC!MTH3r?;n|Xqhs!ba>-4#tB@S(g%1*u&Cyd;QJN_0 z8`yeXMz=S;=0Uj8Y&@Ikh@V}xtSd#MKVsevKx@?y;xgr=6S@(DTyt~DaSt|;FUQq) z<#u9v&bG!_fn|8!V%~SmyG|`m=i(eoLKp&T|DpR8x8Qg!t0-pTLfFIV(xTWRIAn?|RTWK4 zwDs_=-)^i*uRgV&Q{uWbqywQ)<11>bbJQ&jz3bN1-yewQ*CS5vBr zPq#K|ge_G);>ia($}<(Su0e_^^73}Z@gyh`=R7%2#HCx(H7;qW0|v6dGrt7UiPG?Y z28Z^~Uuym-VV5T~ouNz|K8)TUQh7cUv9bG-pTZz&ZN__`CoJxU;J-cd(@?P&av&w% zgnUZOLt_cd$LkKIfPn)ZIf+OCWs8QLE%dgV;hS!(pZYPwc(FpZ6;|i}6 z>;3AszFrnuTx)Oo#KNcPZE&9htNaJn(wa-W236&>Gloa%USrpnrp(nFW5b=^qxP+Z zQEt%tMY!en8t53i=GlbsOQdeXhJn^CleQ(bJZr}_nbqC)`B)RZGbM}%T;VP$vNJTz zgW~0h6}`EvXXEzO&|AL&yO6I*%>v1o>zEL`W4vuM;XKRqeS>o46WEJYlJ5Ek?vM*I+V;h!rz;j861(7Y??U>e#i>YTrCOJw3@|_KHjN&qFEgTSL#Q z4;mAEGro6RuQz@>r6{XYwTRj2j4>U@!BAC0XP(i$ebo9HCLNq6n+|DHjP*buL;f4w z+ifSab8`hkAQdpCpY{`vaGS8;|U^T-mu3+QQ?E6=#^;GW~kx!ST(V{{xiK#!MYF F000&06r%tD literal 3602 zcmV+t4(;(lRzVk0qdg3o*_GyWEkneA2^+)M7AXv(z#Bc z2r+M$cSmb)xmj{0T@(S$8}xVo6v-3z5jsOIcex+DJ2j4D-;z5V&J5?9`^#h;i||7J z{3}yj#q9cTawhUPB2$*Ij4DP#k!37YL@iQAA~qot9y2ZzngXPfKmQU%RMBs@pZ(#p zKYfP({qB#S{rUF~x4(Eml#F?z`Vp6O95eFZi+eE1^yEWl5@%_y#6!DhM8*qpG9aAc_LoF9sWAY#)2} zp|vgqiWS@8(}P3>>wz6SeX<17OoIx%d-`M`Pc`qsm^+jhd31?-cohX9jpH#5FZu)} z@VFEodH#Z23LXIh{An#0DgNvwUJM%qMw>6~ko*Kz0~XH{u*jc4&~d!x9;b?C%C}1g zTC?#T8PS_8;b!GNdbITk*gXa()1=}~XwfBrL*?W-xgnR}4iNWqvb8}bpj3IrB#h4P zOO{3K;YTTw`{(4g%Vk*OYRhCV%S7hV@(B(*V?bN-O70gEsPn<2M~?t8_#v7s&dxOs zcxszVLEwqTVQqKxBCrq#Nt!Fbr!kCxqzYz(IP9@3tGF|V4DtdO0Z!fJppK%0o)iVG z5gjDM0)G&WH!Tf_2X-EB)n)3F5?C_JCNyNk1gd?`;%JBw>;QrYP2dHFsWH!@KG}hP zA^LRt6tn(4S2Mj_BSKO8@OGS2$oOS99^?D{$a)2FO`K_*%j1Lv3W8;yyyuac`C|=a z2y7&d^vi&ogWZKW|B7&yiA3z`b%#8TxxDK-aP*spJjoSnti+%8A>pjlCnC|W6kvp~ zBzjp4+ULVSmMk+K{5Y%gSDIBBpgbfSE85ej?Ba4w1B-`(ZO^9gFcuqCZR^mI%)u%?e%P|#Ui9uW~iRhnUOk|?lo0r~p&mNceICV-Ya zZl&#d?Q5Pynm)w3L$eE=c@wfLrVwHuZxnA{gUgZHHy7`RRKfdA`+h{FS`;hu@B(9X z#ig^|-aG|3+^?uDz7;vV#{z2%zWsuL7=UV)LX3<+`WgI^gk6=4<(_}{Qh0!izS5yH@Ff;0O?c>iYoekp0|Uu_WRB4^%0pi@P)AZE z;V5;mbf+L`V6!z`M`m8q!-@wVDSpY!bLBF&3eg5?%0KXZ$$Y20K!G?$U|kqO>XNNp zBifaw91EaWGEb{lj20=2%!Y+!zXK2PrJh-1KV1mfm8%N0a+L`^0aj9F)GMC6i!ibW z(Weof`jYg2Fpc{R#a zmj$bZ`p(7~TvG;}H5S?wsucviD~gmpNLvknHCh4|Nn!rdF?>%6UaruMBfUaDt-+g# z%%BPSES6SY8$`okZCd41t@6yjrAaNyQ#hC$B$|rSqE?!R4}%qVfyfHDK;z2XPPP5N zJYfk18=`%vcAboa+?I0h=o<;Bi!&avgCydYJj!W2$fn0hX&aDy9UG8{B~^((u@+P6 zybHJ0O11C0uP&#?Fy)rqN?bW4AhRp$>`w9=>O4KV>+9LmCxe*NGJ`5Tw8vXQr~-$O zqQ3Wh4xy{VXcw6#<6RxeoTf!)3TpJyNo_l{x_pOYk7jCz)ax$;shp56FjCVCH2djZ z;3}^(;bSo2Fyuk98W@UJ1+M-zTSzRk?HRTj!rrQXd2jZsSd6I_0=DMGgiQ5B7wBqH z=w5GIq)vSh$JT+F@TIfOnrg$CPT9*PX@5~twQ!+$U|oBUPW^BhaB|_nn}flrrfOhQ z`5nw#yJ{3|F)RYVpBJ2f9%uwux-8*flAIVGq5=hmfEu<9!U|O0h7g9EP&LF@O3^De zcUd5vLBED^$q@rsRdY3Up`vdjF?}1(XfkD{T0#vg+!8BwvOaB3?q&IYNk0p{CIpI< zWom(R+?Xn-ungnEHjrZc3uINd3Z#wIfE0`cetXoH2XEtCZa#jzUcOMf4N4pdVWpOW zuLpg9g#B2jI;()*8h}^$zIDn)9SP;1?e~HOV>MWyHN|D|&RG?)lz~YkA!qVs%Ma6J z@j?S#?yiRGQZyPlRHH1NVVB^Y*GCxB(`sB>>yNLBNp*^i?GpjE4lrqe7mB)n-0OGu zj-gDWJduHfN;YIwq=oVyXmYYRUwQ)=LW<_FPR?h5->J#w)AhmMP^jNG*MZM0-)oX1 z!H#v#d61yKoBY$sLp>40*j|yh_AGo0>8v0ZZ*1(TB7xp%2Bg%`S+Xvl_4@Ja2F}hDk z;xHC6&)Cr1i0BKMJ7WbiESd&Ao07Kuaf&X=_7?Jqle>CWV4DRyYAaEL4T9@b@3yyE zJ&0mz<)wPPLFDu^dNn+rCQiHmu^$WSY~Crgh-GY2HA>J_s*yGqJsie#ZqfV0-B^p} zEtL{=IG@fMGG2MtUWMWol3FvzBB~}m7xyPD%0t$9;2n|o9(h1M===`TDv^CCZd-Nm z{a|K4K*;(b=XgO1CJ*U3iz7*tAY%xm`8ej`2BOh@`76K=5n@XCJOxmsi8$yH?yqNf z3mZ7Iz4Lo+=T(1zGD!HG#$^{-x0E4xQ!}mOcKPYa1-$HQkTT+%bOJ-YvyWNkOa#@+ zjauf=YapTok_}j}Aae$;J2IFgC?OBE|3wrIHdK5~DL`R@rAzUilVM0Z+1wSOG2=t% zf+K0C9Q=kp{KUo~(hTSL<;KGpd&{Ai&sn0(Jd%kC7MqKYoc#Ac|Nf_*M@Ft@g3fu3 zK$m;NC-|ls+d%2Am}Z)a3DH`h$=iLx!tr|iG zq@2WEPhvW0z7cZVgDsiMef3vqJ2AarTVw2XGQ8O^;~g{BN&M+tTwwDBlTGbE^t$4E zGhWNz3r<`V_FA~KD7FYL7)Z3(7{1*G}66goy?YwPW$~%@&QXlI}ADrLZBPc z5IZJU?Anx)bGjI_BgUetcZeme-rn`wjZ^8>Iot)J3(CuMwAiH%=sNRSP^h4dTXHP; zw#%KO>hGFcK1cVvl*zc7QdNAqwNV3Xsp=6=KGIR1shD+bQn1L&?;wsRK~bXS$weZr z+~%oqNkg45kOkhZC5W9Ujs7p-(Ejyn%|9jV>V&2UecmUoQ(SuFW@nV&T(t7~JPTD*utSwB`ywII610 z8NFG%xvo~C#e;!I{M-4r%UTB=)n{nLnA-nP0DMeYGszuCBXN>7Q4z{W$I_r$? z@KIZ5h;;BY$#g)QVyq_u8PeaN-iDpb^5zPJKq+8M$K6Fspkuzlgav8v(z&wJTw9!Z z=`1%!er1YH=j-d4(fHa0$>7bwS4W3?Myh1-D?68=Exb5b Yaa-6e({EN@(ca$sADwL>REIMF0K8k{KmY&$ diff --git a/examples/napi/__test__/values.spec.ts b/examples/napi/__test__/values.spec.ts index af98615a..b6f38072 100644 --- a/examples/napi/__test__/values.spec.ts +++ b/examples/napi/__test__/values.spec.ts @@ -51,6 +51,8 @@ import { callThreadsafeFunction, threadsafeFunctionThrowError, threadsafeFunctionClosureCapture, + tsfnCallWithCallback, + tsfnAsyncCall, asyncPlus100, getGlobal, getUndefined, @@ -759,6 +761,30 @@ Napi4Test('Promise should reject raw error in rust', async (t) => { t.is(err, fxError) }) +Napi4Test('call ThreadsafeFunction with callback', async (t) => { + await t.notThrowsAsync( + () => + new Promise((resolve) => { + tsfnCallWithCallback(() => { + resolve() + return 'ReturnFromJavaScriptRawCallback' + }) + }), + ) +}) + +Napi4Test('async call ThreadsafeFunction', async (t) => { + await t.notThrowsAsync(() => + tsfnAsyncCall((err, arg1, arg2, arg3) => { + t.is(err, null) + t.is(arg1, 0) + t.is(arg2, 1) + t.is(arg3, 2) + return 'ReturnFromJavaScriptRawCallback' + }), + ) +}) + const Napi5Test = Number(process.versions.napi) >= 5 ? test : test.skip Napi5Test('Date test', (t) => { diff --git a/examples/napi/index.d.ts b/examples/napi/index.d.ts index 9cadb8a7..a9c5ca4a 100644 --- a/examples/napi/index.d.ts +++ b/examples/napi/index.d.ts @@ -185,6 +185,8 @@ export function threadsafeFunctionThrowError(cb: (...args: any[]) => any): void export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void +export function tsfnCallWithCallback(func: (...args: any[]) => any): void +export function tsfnAsyncCall(func: (...args: any[]) => any): Promise export function getBuffer(): Buffer export function appendBuffer(buf: Buffer): Buffer export function getEmptyBuffer(): Buffer diff --git a/examples/napi/src/threadsafe_function.rs b/examples/napi/src/threadsafe_function.rs index 089fa858..546e870e 100644 --- a/examples/napi/src/threadsafe_function.rs +++ b/examples/napi/src/threadsafe_function.rs @@ -76,3 +76,31 @@ fn threadsafe_function_closure_capture(func: JsFunction) -> napi::Result<()> { Ok(()) } + +#[napi] +pub fn tsfn_call_with_callback(func: JsFunction) -> napi::Result<()> { + let tsfn: ThreadsafeFunction<()> = + func.create_threadsafe_function(0, move |_| Ok(Vec::::new()))?; + tsfn.call_with_return_value( + Ok(()), + ThreadsafeFunctionCallMode::NonBlocking, + |value: String| { + println!("{}", value); + assert_eq!(value, "ReturnFromJavaScriptRawCallback".to_owned()); + Ok(()) + }, + ); + Ok(()) +} + +#[napi(ts_return_type = "Promise")] +pub fn tsfn_async_call(env: Env, func: JsFunction) -> napi::Result { + let tsfn: ThreadsafeFunction<()> = + func.create_threadsafe_function(0, move |_| Ok(vec![0u32, 1u32, 2u32]))?; + + env.spawn_future(async move { + let msg: String = tsfn.call_async(Ok(())).await?; + assert_eq!(msg, "ReturnFromJavaScriptRawCallback".to_owned()); + Ok(()) + }) +}