feat(napi): add threadsafe deferred values (#1306)

This commit is contained in:
Devon Govett 2022-10-02 22:00:48 -07:00 committed by GitHub
parent 5561502fd5
commit 5541d650a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 187 additions and 124 deletions

View file

@ -1109,6 +1109,14 @@ impl Env {
Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) })
} }
/// Creates a deferred promise, which can be resolved or rejected from a background thread.
#[cfg(feature = "napi4")]
pub fn create_deferred<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>>(
&self,
) -> Result<(JsDeferred<Data, Resolver>, JsObject)> {
JsDeferred::new(self.raw())
}
/// This API does not observe leap seconds; they are ignored, as ECMAScript aligns with POSIX time specification. /// This API does not observe leap seconds; they are ignored, as ECMAScript aligns with POSIX time specification.
/// ///
/// This API allocates a JavaScript Date object. /// This API allocates a JavaScript Date object.

View file

@ -0,0 +1,126 @@
use std::ffi::CStr;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr;
use crate::bindgen_runtime::ToNapiValue;
use crate::{check_status, JsError, JsObject, Value};
use crate::{sys, Env, Error, Result};
pub struct JsDeferred<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>> {
tsfn: sys::napi_threadsafe_function,
_data: PhantomData<Data>,
_resolver: PhantomData<Resolver>,
}
unsafe impl<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>> Send
for JsDeferred<Data, Resolver>
{
}
impl<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>> JsDeferred<Data, Resolver> {
pub(crate) fn new(env: sys::napi_env) -> Result<(Self, JsObject)> {
let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut();
check_status! {
unsafe { sys::napi_create_promise(env, &mut raw_deferred, &mut raw_promise) }
}?;
// Create a threadsafe function so we can call back into the JS thread when we are done.
let mut async_resource_name = ptr::null_mut();
let s = unsafe { CStr::from_bytes_with_nul_unchecked(b"napi_resolve_deferred\0") };
check_status!(unsafe {
sys::napi_create_string_utf8(env, s.as_ptr(), 22, &mut async_resource_name)
})?;
let mut tsfn = ptr::null_mut();
check_status! {unsafe {
sys::napi_create_threadsafe_function(
env,
ptr::null_mut(),
ptr::null_mut(),
async_resource_name,
0,
1,
ptr::null_mut(),
None,
raw_deferred as *mut c_void,
Some(napi_resolve_deferred::<Data, Resolver>),
&mut tsfn,
)
}}?;
let deferred = Self {
tsfn,
_data: PhantomData,
_resolver: PhantomData,
};
let promise = JsObject(Value {
env,
value: raw_promise,
value_type: crate::ValueType::Object,
});
Ok((deferred, promise))
}
/// Consumes the deferred, and resolves the promise. The provided function will be called
/// from the JavaScript thread, and should return the resolved value.
pub fn resolve(self, resolver: Resolver) {
self.call_tsfn(Ok(resolver))
}
/// Consumes the deferred, and rejects the promise with the provided error.
pub fn reject(self, error: Error) {
self.call_tsfn(Err(error))
}
fn call_tsfn(self, result: Result<Resolver>) {
// Call back into the JS thread via a threadsafe function. This results in napi_resolve_deferred being called.
let status = unsafe {
sys::napi_call_threadsafe_function(
self.tsfn,
Box::into_raw(Box::from(result)) as *mut c_void,
sys::ThreadsafeFunctionCallMode::nonblocking,
)
};
debug_assert!(
status == sys::Status::napi_ok,
"Call threadsafe function failed"
);
let status = unsafe {
sys::napi_release_threadsafe_function(self.tsfn, sys::ThreadsafeFunctionReleaseMode::release)
};
debug_assert!(
status == sys::Status::napi_ok,
"Release threadsafe function failed"
);
}
}
extern "C" fn napi_resolve_deferred<Data: ToNapiValue, Resolver: FnOnce(Env) -> Result<Data>>(
env: sys::napi_env,
_js_callback: sys::napi_value,
context: *mut c_void,
data: *mut c_void,
) {
let deferred = context as sys::napi_deferred;
let resolver = unsafe { Box::from_raw(data as *mut Result<Resolver>) };
let result = resolver
.and_then(|resolver| resolver(unsafe { Env::from_raw(env) }))
.and_then(|res| unsafe { ToNapiValue::to_napi_value(env, res) });
match result {
Ok(res) => {
let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) };
debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed");
}
Err(e) => {
let status =
unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
debug_assert!(status == sys::Status::napi_ok, "Reject promise failed");
}
}
}

View file

@ -19,6 +19,8 @@ mod boolean;
mod buffer; mod buffer;
#[cfg(feature = "napi5")] #[cfg(feature = "napi5")]
mod date; mod date;
#[cfg(feature = "napi4")]
mod deferred;
mod either; mod either;
mod escapable_handle_scope; mod escapable_handle_scope;
mod function; mod function;
@ -41,6 +43,8 @@ pub use buffer::*;
pub use date::*; pub use date::*;
#[cfg(feature = "serde-json")] #[cfg(feature = "serde-json")]
pub(crate) use de::De; pub(crate) use de::De;
#[cfg(feature = "napi4")]
pub use deferred::*;
pub use either::Either; pub use either::Either;
pub use escapable_handle_scope::EscapableHandleScope; pub use escapable_handle_scope::EscapableHandleScope;
pub use function::JsFunction; pub use function::JsFunction;

View file

@ -86,8 +86,6 @@ mod cleanup_env;
mod env; mod env;
mod error; mod error;
mod js_values; mod js_values;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
mod promise;
mod status; mod status;
mod task; mod task;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))] #[cfg(all(feature = "tokio_rt", feature = "napi4"))]

View file

@ -1,117 +0,0 @@
use std::ffi::CStr;
use std::future::Future;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr;
use crate::{check_status, sys, JsError, Result};
pub struct FuturePromise<Data, Resolver: FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>> {
deferred: sys::napi_deferred,
env: sys::napi_env,
tsfn: sys::napi_threadsafe_function,
async_resource_name: sys::napi_value,
resolver: Resolver,
_data: PhantomData<Data>,
}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T, F: FnOnce(sys::napi_env, T) -> Result<sys::napi_value>> Send
for FuturePromise<T, F>
{
}
impl<Data, Resolver: FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>>
FuturePromise<Data, Resolver>
{
pub fn new(env: sys::napi_env, deferred: sys::napi_deferred, resolver: Resolver) -> Result<Self> {
let mut async_resource_name = ptr::null_mut();
let s = unsafe { CStr::from_bytes_with_nul_unchecked(b"napi_resolve_promise_from_future\0") };
check_status!(unsafe {
sys::napi_create_string_utf8(env, s.as_ptr(), 32, &mut async_resource_name)
})?;
Ok(FuturePromise {
deferred,
resolver,
env,
tsfn: ptr::null_mut(),
async_resource_name,
_data: PhantomData,
})
}
pub(crate) fn start(self) -> Result<TSFNValue> {
let mut tsfn_value = ptr::null_mut();
let async_resource_name = self.async_resource_name;
let env = self.env;
let self_ref = Box::leak(Box::from(self));
check_status!(unsafe {
sys::napi_create_threadsafe_function(
env,
ptr::null_mut(),
ptr::null_mut(),
async_resource_name,
0,
1,
ptr::null_mut(),
None,
self_ref as *mut FuturePromise<Data, Resolver> as *mut c_void,
Some(call_js_cb::<Data, Resolver>),
&mut tsfn_value,
)
})?;
self_ref.tsfn = tsfn_value;
Ok(TSFNValue(tsfn_value))
}
}
pub(crate) struct TSFNValue(sys::napi_threadsafe_function);
unsafe impl Send for TSFNValue {}
pub(crate) async fn resolve_from_future<Data: Send, Fut: Future<Output = Result<Data>>>(
tsfn_value: TSFNValue,
fut: Fut,
) {
let val = fut.await;
check_status!(unsafe {
sys::napi_call_threadsafe_function(
tsfn_value.0,
Box::into_raw(Box::from(val)) as *mut c_void,
sys::ThreadsafeFunctionCallMode::nonblocking,
)
})
.expect("Failed to call thread safe function");
check_status!(unsafe {
sys::napi_release_threadsafe_function(tsfn_value.0, sys::ThreadsafeFunctionReleaseMode::release)
})
.expect("Failed to release thread safe function");
}
unsafe extern "C" fn call_js_cb<
Data,
Resolver: FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
env: sys::napi_env,
_js_callback: sys::napi_value,
context: *mut c_void,
data: *mut c_void,
) {
let future_promise = unsafe { Box::from_raw(context as *mut FuturePromise<Data, Resolver>) };
let value = unsafe { Box::from_raw(data as *mut Result<Data>) };
let resolver = future_promise.resolver;
let deferred = future_promise.deferred;
let js_value_to_resolve = value.and_then(move |v| (resolver)(env, v));
match js_value_to_resolve {
Ok(v) => {
let status = unsafe { sys::napi_resolve_deferred(env, deferred, v) };
debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed");
}
Err(e) => {
let status =
unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
debug_assert!(status == sys::Status::napi_ok, "Reject promise failed");
}
};
}

View file

@ -9,7 +9,7 @@ use tokio::{
sync::mpsc::{self, error::TrySendError}, sync::mpsc::{self, error::TrySendError},
}; };
use crate::{check_status, promise, sys, Result}; use crate::{check_status, sys, JsDeferred, JsUnknown, NapiValue, Result};
pub(crate) static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| { pub(crate) static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| {
let runtime = tokio::runtime::Runtime::new(); let runtime = tokio::runtime::Runtime::new();
@ -84,9 +84,16 @@ pub fn execute_tokio_future<
check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut promise) })?; check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut promise) })?;
let future_promise = promise::FuturePromise::new(env, deferred, resolver)?; let (deferred, promise) = JsDeferred::new(env)?;
let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut);
spawn(future_to_resolve);
Ok(promise) spawn(async move {
match fut.await {
Ok(v) => deferred.resolve(|env| {
resolver(env.raw(), v).map(|v| unsafe { JsUnknown::from_raw_unchecked(env.raw(), v) })
}),
Err(e) => deferred.reject(e),
}
});
Ok(promise.0.value)
} }

View file

@ -0,0 +1,15 @@
import test from 'ava'
const bindings = require('../../index.node')
test('should resolve deferred from background thread', async (t) => {
const promise = bindings.testDeferred(false)
t.assert(promise instanceof Promise)
const result = await promise
t.is(result, 15)
})
test('should reject deferred from background thread', async (t) => {
await t.throwsAsync(() => bindings.testDeferred(true), { message: 'Fail' })
})

View file

@ -0,0 +1,20 @@
use std::thread;
use napi::{CallContext, Error, JsObject, Result};
#[js_function(1)]
pub fn test_deferred(ctx: CallContext) -> Result<JsObject> {
let reject: bool = ctx.get(0)?;
let (deferred, promise) = ctx.env.create_deferred()?;
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(10));
if reject {
deferred.reject(Error::from_reason("Fail"));
} else {
deferred.resolve(|_| Ok(15));
}
});
Ok(promise)
}

View file

@ -1,5 +1,6 @@
use napi::{Env, JsObject, Result}; use napi::{Env, JsObject, Result};
mod deferred;
mod tsfn; mod tsfn;
mod tsfn_dua_instance; mod tsfn_dua_instance;
@ -23,6 +24,7 @@ pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> {
test_call_aborted_threadsafe_function, test_call_aborted_threadsafe_function,
)?; )?;
exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?; exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?;
exports.create_named_method("testDeferred", deferred::test_deferred)?;
let obj = env.define_class("A", constructor, &[])?; let obj = env.define_class("A", constructor, &[])?;