From 5541d650a9f4b55a6e07984652ac72505068184a Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Sun, 2 Oct 2022 22:00:48 -0700 Subject: [PATCH] feat(napi): add threadsafe deferred values (#1306) --- crates/napi/src/env.rs | 8 ++ crates/napi/src/js_values/deferred.rs | 126 ++++++++++++++++++ crates/napi/src/js_values/mod.rs | 4 + crates/napi/src/lib.rs | 2 - crates/napi/src/promise.rs | 117 ---------------- crates/napi/src/tokio_runtime.rs | 17 ++- .../__test__/napi4/deferred.spec.ts | 15 +++ .../napi-compat-mode/src/napi4/deferred.rs | 20 +++ examples/napi-compat-mode/src/napi4/mod.rs | 2 + 9 files changed, 187 insertions(+), 124 deletions(-) create mode 100644 crates/napi/src/js_values/deferred.rs delete mode 100644 crates/napi/src/promise.rs create mode 100644 examples/napi-compat-mode/__test__/napi4/deferred.spec.ts create mode 100644 examples/napi-compat-mode/src/napi4/deferred.rs diff --git a/crates/napi/src/env.rs b/crates/napi/src/env.rs index e5ab7b6c..20849d77 100644 --- a/crates/napi/src/env.rs +++ b/crates/napi/src/env.rs @@ -1109,6 +1109,14 @@ impl Env { Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) } + /// Creates a deferred promise, which can be resolved or rejected from a background thread. + #[cfg(feature = "napi4")] + pub fn create_deferred Result>( + &self, + ) -> Result<(JsDeferred, JsObject)> { + JsDeferred::new(self.raw()) + } + /// This API does not observe leap seconds; they are ignored, as ECMAScript aligns with POSIX time specification. /// /// This API allocates a JavaScript Date object. diff --git a/crates/napi/src/js_values/deferred.rs b/crates/napi/src/js_values/deferred.rs new file mode 100644 index 00000000..2660e283 --- /dev/null +++ b/crates/napi/src/js_values/deferred.rs @@ -0,0 +1,126 @@ +use std::ffi::CStr; +use std::marker::PhantomData; +use std::os::raw::c_void; +use std::ptr; + +use crate::bindgen_runtime::ToNapiValue; +use crate::{check_status, JsError, JsObject, Value}; +use crate::{sys, Env, Error, Result}; + +pub struct JsDeferred Result> { + tsfn: sys::napi_threadsafe_function, + _data: PhantomData, + _resolver: PhantomData, +} + +unsafe impl Result> Send + for JsDeferred +{ +} + +impl Result> JsDeferred { + pub(crate) fn new(env: sys::napi_env) -> Result<(Self, JsObject)> { + let mut raw_promise = ptr::null_mut(); + let mut raw_deferred = ptr::null_mut(); + check_status! { + unsafe { sys::napi_create_promise(env, &mut raw_deferred, &mut raw_promise) } + }?; + + // Create a threadsafe function so we can call back into the JS thread when we are done. + let mut async_resource_name = ptr::null_mut(); + let s = unsafe { CStr::from_bytes_with_nul_unchecked(b"napi_resolve_deferred\0") }; + check_status!(unsafe { + sys::napi_create_string_utf8(env, s.as_ptr(), 22, &mut async_resource_name) + })?; + + let mut tsfn = ptr::null_mut(); + check_status! {unsafe { + sys::napi_create_threadsafe_function( + env, + ptr::null_mut(), + ptr::null_mut(), + async_resource_name, + 0, + 1, + ptr::null_mut(), + None, + raw_deferred as *mut c_void, + Some(napi_resolve_deferred::), + &mut tsfn, + ) + }}?; + + let deferred = Self { + tsfn, + _data: PhantomData, + _resolver: PhantomData, + }; + + let promise = JsObject(Value { + env, + value: raw_promise, + value_type: crate::ValueType::Object, + }); + + Ok((deferred, promise)) + } + + /// Consumes the deferred, and resolves the promise. The provided function will be called + /// from the JavaScript thread, and should return the resolved value. + pub fn resolve(self, resolver: Resolver) { + self.call_tsfn(Ok(resolver)) + } + + /// Consumes the deferred, and rejects the promise with the provided error. + pub fn reject(self, error: Error) { + self.call_tsfn(Err(error)) + } + + fn call_tsfn(self, result: Result) { + // Call back into the JS thread via a threadsafe function. This results in napi_resolve_deferred being called. + let status = unsafe { + sys::napi_call_threadsafe_function( + self.tsfn, + Box::into_raw(Box::from(result)) as *mut c_void, + sys::ThreadsafeFunctionCallMode::nonblocking, + ) + }; + debug_assert!( + status == sys::Status::napi_ok, + "Call threadsafe function failed" + ); + + let status = unsafe { + sys::napi_release_threadsafe_function(self.tsfn, sys::ThreadsafeFunctionReleaseMode::release) + }; + debug_assert!( + status == sys::Status::napi_ok, + "Release threadsafe function failed" + ); + } +} + +extern "C" fn napi_resolve_deferred Result>( + env: sys::napi_env, + _js_callback: sys::napi_value, + context: *mut c_void, + data: *mut c_void, +) { + let deferred = context as sys::napi_deferred; + let resolver = unsafe { Box::from_raw(data as *mut Result) }; + let result = resolver + .and_then(|resolver| resolver(unsafe { Env::from_raw(env) })) + .and_then(|res| unsafe { ToNapiValue::to_napi_value(env, res) }); + + match result { + Ok(res) => { + let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) }; + debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed"); + } + Err(e) => { + let status = + unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) }; + debug_assert!(status == sys::Status::napi_ok, "Reject promise failed"); + } + } +} diff --git a/crates/napi/src/js_values/mod.rs b/crates/napi/src/js_values/mod.rs index 0c56a7b2..9c016972 100644 --- a/crates/napi/src/js_values/mod.rs +++ b/crates/napi/src/js_values/mod.rs @@ -19,6 +19,8 @@ mod boolean; mod buffer; #[cfg(feature = "napi5")] mod date; +#[cfg(feature = "napi4")] +mod deferred; mod either; mod escapable_handle_scope; mod function; @@ -41,6 +43,8 @@ pub use buffer::*; pub use date::*; #[cfg(feature = "serde-json")] pub(crate) use de::De; +#[cfg(feature = "napi4")] +pub use deferred::*; pub use either::Either; pub use escapable_handle_scope::EscapableHandleScope; pub use function::JsFunction; diff --git a/crates/napi/src/lib.rs b/crates/napi/src/lib.rs index 002e6d5a..9d5de8a6 100644 --- a/crates/napi/src/lib.rs +++ b/crates/napi/src/lib.rs @@ -86,8 +86,6 @@ mod cleanup_env; mod env; mod error; mod js_values; -#[cfg(all(feature = "tokio_rt", feature = "napi4"))] -mod promise; mod status; mod task; #[cfg(all(feature = "tokio_rt", feature = "napi4"))] diff --git a/crates/napi/src/promise.rs b/crates/napi/src/promise.rs deleted file mode 100644 index ae8dd5cf..00000000 --- a/crates/napi/src/promise.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::ffi::CStr; -use std::future::Future; -use std::marker::PhantomData; -use std::os::raw::c_void; -use std::ptr; - -use crate::{check_status, sys, JsError, Result}; - -pub struct FuturePromise Result> { - deferred: sys::napi_deferred, - env: sys::napi_env, - tsfn: sys::napi_threadsafe_function, - async_resource_name: sys::napi_value, - resolver: Resolver, - _data: PhantomData, -} - -#[allow(clippy::non_send_fields_in_send_ty)] -unsafe impl Result> Send - for FuturePromise -{ -} - -impl Result> - FuturePromise -{ - pub fn new(env: sys::napi_env, deferred: sys::napi_deferred, resolver: Resolver) -> Result { - let mut async_resource_name = ptr::null_mut(); - let s = unsafe { CStr::from_bytes_with_nul_unchecked(b"napi_resolve_promise_from_future\0") }; - check_status!(unsafe { - sys::napi_create_string_utf8(env, s.as_ptr(), 32, &mut async_resource_name) - })?; - - Ok(FuturePromise { - deferred, - resolver, - env, - tsfn: ptr::null_mut(), - async_resource_name, - _data: PhantomData, - }) - } - - pub(crate) fn start(self) -> Result { - let mut tsfn_value = ptr::null_mut(); - let async_resource_name = self.async_resource_name; - let env = self.env; - let self_ref = Box::leak(Box::from(self)); - check_status!(unsafe { - sys::napi_create_threadsafe_function( - env, - ptr::null_mut(), - ptr::null_mut(), - async_resource_name, - 0, - 1, - ptr::null_mut(), - None, - self_ref as *mut FuturePromise as *mut c_void, - Some(call_js_cb::), - &mut tsfn_value, - ) - })?; - self_ref.tsfn = tsfn_value; - Ok(TSFNValue(tsfn_value)) - } -} - -pub(crate) struct TSFNValue(sys::napi_threadsafe_function); - -unsafe impl Send for TSFNValue {} - -pub(crate) async fn resolve_from_future>>( - tsfn_value: TSFNValue, - fut: Fut, -) { - let val = fut.await; - check_status!(unsafe { - sys::napi_call_threadsafe_function( - tsfn_value.0, - Box::into_raw(Box::from(val)) as *mut c_void, - sys::ThreadsafeFunctionCallMode::nonblocking, - ) - }) - .expect("Failed to call thread safe function"); - check_status!(unsafe { - sys::napi_release_threadsafe_function(tsfn_value.0, sys::ThreadsafeFunctionReleaseMode::release) - }) - .expect("Failed to release thread safe function"); -} - -unsafe extern "C" fn call_js_cb< - Data, - Resolver: FnOnce(sys::napi_env, Data) -> Result, ->( - env: sys::napi_env, - _js_callback: sys::napi_value, - context: *mut c_void, - data: *mut c_void, -) { - let future_promise = unsafe { Box::from_raw(context as *mut FuturePromise) }; - let value = unsafe { Box::from_raw(data as *mut Result) }; - let resolver = future_promise.resolver; - let deferred = future_promise.deferred; - let js_value_to_resolve = value.and_then(move |v| (resolver)(env, v)); - match js_value_to_resolve { - Ok(v) => { - let status = unsafe { sys::napi_resolve_deferred(env, deferred, v) }; - debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed"); - } - Err(e) => { - let status = - unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) }; - debug_assert!(status == sys::Status::napi_ok, "Reject promise failed"); - } - }; -} diff --git a/crates/napi/src/tokio_runtime.rs b/crates/napi/src/tokio_runtime.rs index 2706f7c4..6073b1f5 100644 --- a/crates/napi/src/tokio_runtime.rs +++ b/crates/napi/src/tokio_runtime.rs @@ -9,7 +9,7 @@ use tokio::{ sync::mpsc::{self, error::TrySendError}, }; -use crate::{check_status, promise, sys, Result}; +use crate::{check_status, sys, JsDeferred, JsUnknown, NapiValue, Result}; pub(crate) static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| { let runtime = tokio::runtime::Runtime::new(); @@ -84,9 +84,16 @@ pub fn execute_tokio_future< check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut promise) })?; - let future_promise = promise::FuturePromise::new(env, deferred, resolver)?; - let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut); - spawn(future_to_resolve); + let (deferred, promise) = JsDeferred::new(env)?; - Ok(promise) + spawn(async move { + match fut.await { + Ok(v) => deferred.resolve(|env| { + resolver(env.raw(), v).map(|v| unsafe { JsUnknown::from_raw_unchecked(env.raw(), v) }) + }), + Err(e) => deferred.reject(e), + } + }); + + Ok(promise.0.value) } diff --git a/examples/napi-compat-mode/__test__/napi4/deferred.spec.ts b/examples/napi-compat-mode/__test__/napi4/deferred.spec.ts new file mode 100644 index 00000000..b67e2896 --- /dev/null +++ b/examples/napi-compat-mode/__test__/napi4/deferred.spec.ts @@ -0,0 +1,15 @@ +import test from 'ava' + +const bindings = require('../../index.node') + +test('should resolve deferred from background thread', async (t) => { + const promise = bindings.testDeferred(false) + t.assert(promise instanceof Promise) + + const result = await promise + t.is(result, 15) +}) + +test('should reject deferred from background thread', async (t) => { + await t.throwsAsync(() => bindings.testDeferred(true), { message: 'Fail' }) +}) diff --git a/examples/napi-compat-mode/src/napi4/deferred.rs b/examples/napi-compat-mode/src/napi4/deferred.rs new file mode 100644 index 00000000..7f33ffa6 --- /dev/null +++ b/examples/napi-compat-mode/src/napi4/deferred.rs @@ -0,0 +1,20 @@ +use std::thread; + +use napi::{CallContext, Error, JsObject, Result}; + +#[js_function(1)] +pub fn test_deferred(ctx: CallContext) -> Result { + let reject: bool = ctx.get(0)?; + let (deferred, promise) = ctx.env.create_deferred()?; + + thread::spawn(move || { + thread::sleep(std::time::Duration::from_millis(10)); + if reject { + deferred.reject(Error::from_reason("Fail")); + } else { + deferred.resolve(|_| Ok(15)); + } + }); + + Ok(promise) +} diff --git a/examples/napi-compat-mode/src/napi4/mod.rs b/examples/napi-compat-mode/src/napi4/mod.rs index 55cd071a..844ef940 100644 --- a/examples/napi-compat-mode/src/napi4/mod.rs +++ b/examples/napi-compat-mode/src/napi4/mod.rs @@ -1,5 +1,6 @@ use napi::{Env, JsObject, Result}; +mod deferred; mod tsfn; mod tsfn_dua_instance; @@ -23,6 +24,7 @@ pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> { test_call_aborted_threadsafe_function, )?; exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?; + exports.create_named_method("testDeferred", deferred::test_deferred)?; let obj = env.define_class("A", constructor, &[])?;