diff --git a/napi-derive/src/lib.rs b/napi-derive/src/lib.rs index fd3cc20e..85168638 100644 --- a/napi-derive/src/lib.rs +++ b/napi-derive/src/lib.rs @@ -89,6 +89,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream { use std::mem; use std::os::raw::c_char; use std::ptr; + use std::ffi::CString; use napi_rs::{JsUnknown, Env, Status, NapiValue, CallContext}; let mut argc = #arg_len_span as usize; let mut raw_args = @@ -119,7 +120,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream { Err(e) => { let message = format!("{}", e); unsafe { - napi_rs::sys::napi_throw_error(raw_env, ptr::null(), message.as_ptr() as *const c_char); + napi_rs::sys::napi_throw_error(raw_env, ptr::null(), CString::from_vec_unchecked(message.into()).as_ptr() as *const c_char); } let mut undefined = ptr::null_mut(); unsafe { napi_rs::sys::napi_get_undefined(raw_env, &mut undefined) }; diff --git a/napi/Cargo.toml b/napi/Cargo.toml index 365064a6..cd0e23b5 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -11,9 +11,21 @@ edition = "2018" [features] libuv = ["futures"] +tokio_rt = ["futures", "tokio", "once_cell"] + +[dependencies.futures] +version = "0.3" +optional = true + +[dependencies.tokio] +version = "0.2" +features = ["rt-core", "rt-threaded", "sync"] +optional = true + +[dependencies.once_cell] +version = "1.4" +optional = true -[dependencies] -futures = { version = "0.3", optional = true } [target.'cfg(windows)'.build-dependencies] flate2 = "1.0" diff --git a/napi/src/env.rs b/napi/src/env.rs index 14047573..2e46f0dc 100644 --- a/napi/src/env.rs +++ b/napi/src/env.rs @@ -1,21 +1,25 @@ -use crate::task::Task; use std::any::TypeId; use std::convert::TryInto; +use std::ffi::CString; use std::mem; -use std::os::raw::c_char; -use std::os::raw::c_void; +use std::os::raw::{c_char, c_void}; use std::ptr; use crate::error::check_status; use crate::js_values::*; +use crate::task::Task; use crate::{sys, AsyncWork, Error, NodeVersion, Result, Status}; -#[cfg(all(feature = "libuv", napi4))] +#[cfg(all(any(feature = "libuv", feature = "tokio_rt"), napi4))] use crate::promise; +#[cfg(all(feature = "tokio_rt", napi4))] +use crate::tokio_rt::{get_tokio_sender, Message}; #[cfg(all(feature = "libuv", napi4))] use crate::uv; #[cfg(all(feature = "libuv", napi4))] use std::future::Future; +#[cfg(all(feature = "tokio_rt", napi4))] +use tokio::sync::mpsc::error::TrySendError; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; @@ -277,9 +281,13 @@ impl Env { #[inline] pub fn throw_error(&self, msg: &str) -> Result<()> { - let status = unsafe { sys::napi_throw_error(self.0, ptr::null(), msg.as_ptr() as *const _) }; - check_status(status)?; - Ok(()) + check_status(unsafe { + sys::napi_throw_error( + self.0, + ptr::null(), + CString::from_vec_unchecked(msg.into()).as_ptr() as *const _, + ) + }) } #[inline] @@ -489,7 +497,7 @@ impl Env { #[cfg(all(feature = "libuv", napi4))] #[inline] pub fn execute< - T: 'static, + T: 'static + Send, V: 'static + NapiValue, F: 'static + Future>, R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result, @@ -498,8 +506,6 @@ impl Env { deferred: F, resolver: R, ) -> Result { - use futures::prelude::*; - let mut raw_promise = ptr::null_mut(); let mut raw_deferred = ptr::null_mut(); @@ -509,24 +515,49 @@ impl Env { } let event_loop = unsafe { sys::uv_default_loop() }; - let raw_env = self.0; - let future_to_execute = promise::resolve_from_future(self.0, deferred, resolver, raw_deferred) - .map(move |v| match v { - Ok(value) => value, - Err(e) => { - let cloned_error = e.clone(); - unsafe { - sys::napi_throw_error(raw_env, ptr::null(), e.reason.as_ptr() as *const _); - }; - eprintln!("{:?}", &cloned_error); - panic!(cloned_error); - } - }); + let future_promise = promise::FuturePromise::create(self.0, raw_deferred, Box::from(resolver))?; + let future_to_execute = promise::resolve_from_future(future_promise.start()?, deferred); uv::execute(event_loop, Box::pin(future_to_execute))?; Ok(JsObject::from_raw_unchecked(self.0, raw_promise)) } + #[cfg(all(feature = "tokio_rt", napi4))] + #[inline] + pub fn execute_tokio_future< + T: 'static + Send, + V: 'static + NapiValue, + F: 'static + Send + Future>, + R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result, + >( + &self, + fut: F, + resolver: R, + ) -> Result { + let mut raw_promise = ptr::null_mut(); + let mut raw_deferred = ptr::null_mut(); + check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?; + + let raw_env = self.0; + let future_promise = + promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?; + let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut); + let mut sender = get_tokio_sender().clone(); + sender + .try_send(Message::Task(Box::pin(future_to_resolve))) + .map_err(|e| match e { + TrySendError::Full(_) => Error::new( + Status::QueueFull, + format!("Failed to run future: no available capacity"), + ), + TrySendError::Closed(_) => Error::new( + Status::Closing, + format!("Failed to run future: receiver closed"), + ), + })?; + Ok(JsObject::from_raw_unchecked(self.0, raw_promise)) + } + #[inline] pub fn get_node_version(&self) -> Result { let mut result = ptr::null(); diff --git a/napi/src/error.rs b/napi/src/error.rs index c0171913..2dcabbf3 100644 --- a/napi/src/error.rs +++ b/napi/src/error.rs @@ -62,8 +62,17 @@ impl Error { impl From for Error { fn from(error: std::ffi::NulError) -> Self { Error { - status: Status::StringExpected, - reason: format!("{:?}", error), + status: Status::GenericFailure, + reason: format!("{}", error), + } + } +} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error { + status: Status::GenericFailure, + reason: format!("{}", error), } } } diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 33c296b0..6abd9085 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -11,6 +11,8 @@ pub mod sys; mod task; #[cfg(napi4)] pub mod threadsafe_function; +#[cfg(all(feature = "tokio_rt", napi4))] +mod tokio_rt; #[cfg(all(feature = "libuv", napi4))] mod uv; mod version; @@ -26,19 +28,34 @@ pub use sys::napi_valuetype; pub use task::Task; pub use version::NodeVersion; +#[cfg(all(feature = "tokio_rt", napi4))] +pub use tokio_rt::shutdown as shutdown_tokio_rt; + #[macro_export] macro_rules! register_module { ($module_name:ident, $init:ident) => { + #[inline] + fn check_status(code: $crate::sys::napi_status) -> Result<()> { + let status = Status::from(code); + match status { + Status::Ok => Ok(()), + _ => Err(Error::from_status(status)), + } + } #[no_mangle] #[cfg_attr(target_os = "linux", link_section = ".ctors")] #[cfg_attr(target_os = "macos", link_section = "__DATA,__mod_init_func")] #[cfg_attr(target_os = "windows", link_section = ".CRT$XCU")] pub static __REGISTER_MODULE: extern "C" fn() = { + use std::ffi::CString; use std::io::Write; use std::os::raw::c_char; use std::ptr; use $crate::{sys, Env, JsObject, Module, NapiValue}; + #[cfg(all(feature = "tokio_rt", napi4))] + use $crate::shutdown_tokio_rt; + extern "C" fn register_module() { static mut MODULE_DESCRIPTOR: Option = None; unsafe { @@ -64,14 +81,23 @@ macro_rules! register_module { let mut cjs_module = Module { env, exports }; let result = $init(&mut cjs_module); - match result { + #[cfg(all(feature = "tokio_rt", napi4))] + let hook_result = check_status(unsafe { + sys::napi_add_env_cleanup_hook(raw_env, Some(shutdown_tokio_rt), ptr::null_mut()) + }); + + #[cfg(not(all(feature = "tokio_rt", napi4)))] + let hook_result = Ok(()); + + match hook_result.and_then(move |_| result) { Ok(_) => exports.into_raw(), Err(e) => { unsafe { sys::napi_throw_error( raw_env, ptr::null(), - format!("Error initializing module: {:?}", e).as_ptr() as *const _, + CString::from_vec_unchecked(format!("Error initializing module: {}", e).into()) + .as_ptr() as *const _, ) }; ptr::null_mut() diff --git a/napi/src/promise.rs b/napi/src/promise.rs index 0b13c319..e222d477 100644 --- a/napi/src/promise.rs +++ b/napi/src/promise.rs @@ -5,71 +5,90 @@ use std::ptr; use crate::error::check_status; use crate::{sys, Env, NapiValue, Result}; -struct FuturePromise { +pub struct FuturePromise { deferred: sys::napi_deferred, + env: sys::napi_env, + tsfn: sys::napi_threadsafe_function, + async_resource_name: sys::napi_value, resolver: Box Result>, } -#[inline] -pub async fn resolve_from_future< - T, - V: NapiValue, - R: FnOnce(&mut Env, T) -> Result + 'static, - F: Future>, ->( - env: sys::napi_env, - fut: F, - resolver: R, - raw_deferred: sys::napi_deferred, -) -> Result<()> { - let mut async_resource_name = ptr::null_mut(); - let s = "napi_resolve_promise_from_future"; - let status = unsafe { - sys::napi_create_string_utf8( - env, - s.as_ptr() as *const c_char, - s.len() as u64, - &mut async_resource_name, - ) - }; - check_status(status)?; +unsafe impl Send for FuturePromise {} - let initial_thread_count: u64 = 1; - let mut tsfn_value = ptr::null_mut(); - let future_promise = FuturePromise { - deferred: raw_deferred, - resolver: Box::from(resolver), - }; - let status = unsafe { - sys::napi_create_threadsafe_function( +impl FuturePromise { + pub fn create( + env: sys::napi_env, + raw_deferred: sys::napi_deferred, + resolver: Box Result>, + ) -> Result { + let mut async_resource_name = ptr::null_mut(); + let s = "napi_resolve_promise_from_future"; + check_status(unsafe { + sys::napi_create_string_utf8( + env, + s.as_ptr() as *const c_char, + s.len() as u64, + &mut async_resource_name, + ) + })?; + + Ok(FuturePromise { + deferred: raw_deferred, + resolver, env, - ptr::null_mut(), - ptr::null_mut(), + tsfn: ptr::null_mut(), async_resource_name, - 0, - initial_thread_count, - ptr::null_mut(), - None, - Box::leak(Box::from(future_promise)) as *mut _ as *mut c_void, - Some(call_js_cb::), - &mut tsfn_value, - ) - }; - check_status(status)?; - let val = fut.await?; + }) + } + + pub(crate) fn start(self) -> Result { + let mut tsfn_value = ptr::null_mut(); + let async_resource_name = self.async_resource_name; + let initial_thread_count: u64 = 1; + let env = self.env; + let self_ref = Box::leak(Box::from(self)); + check_status(unsafe { + sys::napi_create_threadsafe_function( + env, + ptr::null_mut(), + ptr::null_mut(), + async_resource_name, + 0, + initial_thread_count, + ptr::null_mut(), + None, + self_ref as *mut _ as *mut c_void, + Some(call_js_cb::), + &mut tsfn_value, + ) + })?; + self_ref.tsfn = tsfn_value; + Ok(TSFNValue(tsfn_value)) + } +} + +pub(crate) struct TSFNValue(sys::napi_threadsafe_function); + +unsafe impl Send for TSFNValue {} + +unsafe impl Sync for TSFNValue {} + +#[inline] +pub(crate) async fn resolve_from_future>>( + tsfn_value: TSFNValue, + fut: F, +) { + let val = fut.await; + check_status(unsafe { sys::napi_acquire_threadsafe_function(tsfn_value.0) }) + .expect("Failed to acquire thread safe function"); check_status(unsafe { sys::napi_call_threadsafe_function( - tsfn_value, + tsfn_value.0, Box::into_raw(Box::from(val)) as *mut _ as *mut c_void, sys::napi_threadsafe_function_call_mode::napi_tsfn_nonblocking, ) - })?; - check_status(unsafe { - sys::napi_release_threadsafe_function( - tsfn_value, - sys::napi_threadsafe_function_release_mode::napi_tsfn_release, - ) }) + .expect("Failed to call thread safe function"); } unsafe extern "C" fn call_js_cb( @@ -80,17 +99,27 @@ unsafe extern "C" fn call_js_cb( ) { let mut env = Env::from_raw(raw_env); let future_promise = Box::from_raw(context as *mut FuturePromise); - let value = ptr::read(data as *const _); - let js_value_to_resolve = (future_promise.resolver)(&mut env, value); + let value: Result = ptr::read(data as *const _); + let resolver = future_promise.resolver; let deferred = future_promise.deferred; + let tsfn = future_promise.tsfn; + let js_value_to_resolve = value.and_then(move |v| (resolver)(&mut env, v)); match js_value_to_resolve { Ok(v) => { let status = sys::napi_resolve_deferred(raw_env, deferred, v.raw_value()); - debug_assert!(status == sys::napi_status::napi_ok, "Resolve promise failed"); + debug_assert!( + status == sys::napi_status::napi_ok, + "Resolve promise failed" + ); } Err(e) => { let status = sys::napi_reject_deferred(raw_env, deferred, e.into_raw(raw_env)); debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed"); } }; + check_status(sys::napi_release_threadsafe_function( + tsfn, + sys::napi_threadsafe_function_release_mode::napi_tsfn_release, + )) + .expect("Release threadsafe function failed"); } diff --git a/napi/src/tokio_rt.rs b/napi/src/tokio_rt.rs new file mode 100644 index 00000000..e7432211 --- /dev/null +++ b/napi/src/tokio_rt.rs @@ -0,0 +1,49 @@ +use std::ffi::c_void; +use std::mem; +use std::pin::Pin; +use std::thread::spawn; +use std::time::Duration; + +use futures::future::Future; +use once_cell::sync::OnceCell; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; + +use crate::Error; + +pub(crate) enum Message { + Task(Pin + Send>>), + Shutdown, +} + +#[inline] +pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender { + static SENDER: OnceCell> = OnceCell::new(); + SENDER.get_or_init(|| { + let (sender, mut receiver) = mpsc::channel(100); + spawn(move || { + let mut rt = Runtime::new().expect("Failed to create tokio runtime"); + rt.block_on(async { + loop { + match receiver.recv().await { + Some(Message::Task(fut)) => fut.await, + Some(Message::Shutdown) => break, + None => {} + } + } + }); + rt.shutdown_timeout(Duration::from_secs(5)); + mem::drop(receiver); + }); + + sender + }) +} + +pub unsafe extern "C" fn shutdown(_data: *mut c_void) { + let mut sender = get_tokio_sender().clone(); + sender + .try_send(Message::Shutdown) + .map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e))) + .unwrap() +} diff --git a/test_module/Cargo.toml b/test_module/Cargo.toml index 4ed26aeb..9d8705a5 100644 --- a/test_module/Cargo.toml +++ b/test_module/Cargo.toml @@ -9,7 +9,7 @@ crate-type = ["cdylib"] [dependencies] futures = "0.3" -napi-rs = { path = "../napi", features = ["libuv"] } +napi-rs = { path = "../napi", features = ["libuv", "tokio_rt"] } napi-rs-derive = { path = "../napi-derive" } tokio = { version = "0.2", features = ["default", "fs"]} diff --git a/test_module/__test__/napi4/tokio_rt.spec.js b/test_module/__test__/napi4/tokio_rt.spec.js new file mode 100644 index 00000000..1873e550 --- /dev/null +++ b/test_module/__test__/napi4/tokio_rt.spec.js @@ -0,0 +1,64 @@ +const test = require('ava') +const { join } = require('path') +const { readFileSync } = require('fs') + +const bindings = require('../../index.node') +const napiVersion = require('../napi-version') + +const filepath = join(__dirname, './example.txt') + +test.serial('should execute future on tokio runtime', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + const fileContent = await bindings.testExecuteTokioReadfile(filepath) + t.true(Buffer.isBuffer(fileContent)) + t.deepEqual(readFileSync(filepath), fileContent) +}) + +test.serial('should reject error from tokio future', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testTokioError, undefined) + return + } + try { + await bindings.testTokioError(filepath) + throw new TypeError('Unreachable') + } catch (e) { + t.is(e.message, 'Error from tokio future') + } +}) + +test.serial('should be able to execute future paralleled', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + const buffers = await Promise.all( + Array.from({ length: 50 }).map((_) => + bindings.testExecuteTokioReadfile(filepath), + ), + ) + for (const fileContent of buffers) { + t.true(Buffer.isBuffer(fileContent)) + t.deepEqual(readFileSync(filepath), fileContent) + } +}) + +test.serial('should reject if task queue is full', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + try { + await Promise.all( + Array.from({ length: 1000 * 1000 }).map((_) => + bindings.testExecuteTokioReadfile(filepath), + ), + ) + throw new TypeError('Unreachable') + } catch (e) { + t.is(e.message, 'QueueFull: Failed to run future: no available capacity') + } +}) diff --git a/test_module/__test__/napi4/uv.spec.js b/test_module/__test__/napi4/uv.spec.js index d31eeaae..b719a929 100644 --- a/test_module/__test__/napi4/uv.spec.js +++ b/test_module/__test__/napi4/uv.spec.js @@ -7,7 +7,7 @@ const napiVersion = require('../napi-version') const filepath = join(__dirname, './example.txt') -test('should call callback with the first arguments as an Error', async (t) => { +test('should execute future on libuv thread pool', async (t) => { if (napiVersion < 4) { t.is(bindings.uvReadFile, undefined) return diff --git a/test_module/src/lib.rs b/test_module/src/lib.rs index b01a595a..e72ec243 100644 --- a/test_module/src/lib.rs +++ b/test_module/src/lib.rs @@ -5,32 +5,36 @@ extern crate napi_rs_derive; use napi::{CallContext, Error, JsString, JsUnknown, Module, Result, Status}; -#[cfg(napi4)] -mod napi4; #[cfg(napi4)] mod libuv; +#[cfg(napi4)] +mod napi4; #[cfg(napi5)] mod napi5; +#[cfg(napi4)] +mod tokio_rt; mod buffer; -mod function; mod external; +mod function; +mod napi_version; mod symbol; mod task; -mod napi_version; use buffer::{buffer_to_string, get_buffer_length}; -use function::{call_function, call_function_with_this}; use external::{create_external, get_external_count}; +use function::{call_function, call_function_with_this}; +#[cfg(napi4)] +use libuv::read_file::uv_read_file; #[cfg(napi4)] use napi4::{test_threadsafe_function, test_tokio_readfile, test_tsfn_error}; #[cfg(napi5)] use napi5::is_date::test_object_is_date; +use napi_version::get_napi_version; use symbol::{create_named_symbol, create_symbol_from_js_string, create_unnamed_symbol}; use task::test_spawn_thread; #[cfg(napi4)] -use libuv::read_file::uv_read_file; -use napi_version::get_napi_version; +use tokio_rt::{error_from_tokio_future, test_execute_tokio_readfile}; register_module!(test_module, init); @@ -49,12 +53,16 @@ fn init(module: &mut Module) -> Result<()> { module.create_named_method("testCallFunction", call_function)?; module.create_named_method("testCallFunctionWithThis", call_function_with_this)?; #[cfg(napi4)] + module.create_named_method("testExecuteTokioReadfile", test_execute_tokio_readfile)?; + #[cfg(napi4)] module.create_named_method("testTsfnError", test_tsfn_error)?; #[cfg(napi4)] module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; #[cfg(napi4)] module.create_named_method("testTokioReadfile", test_tokio_readfile)?; #[cfg(napi4)] + module.create_named_method("testTokioError", error_from_tokio_future)?; + #[cfg(napi4)] module.create_named_method("uvReadFile", uv_read_file)?; #[cfg(napi5)] module.create_named_method("testObjectIsDate", test_object_is_date)?; diff --git a/test_module/src/tokio_rt/mod.rs b/test_module/src/tokio_rt/mod.rs new file mode 100644 index 00000000..5e6f6d4f --- /dev/null +++ b/test_module/src/tokio_rt/mod.rs @@ -0,0 +1,3 @@ +mod read_file; + +pub use read_file::*; diff --git a/test_module/src/tokio_rt/read_file.rs b/test_module/src/tokio_rt/read_file.rs new file mode 100644 index 00000000..de270e3b --- /dev/null +++ b/test_module/src/tokio_rt/read_file.rs @@ -0,0 +1,31 @@ +use futures::prelude::*; +use napi::{CallContext, Error, JsObject, JsString, Result, Status}; +use tokio; + +#[js_function(1)] +pub fn test_execute_tokio_readfile(ctx: CallContext) -> Result { + let js_filepath = ctx.get::(0)?; + let path_str = js_filepath.as_str()?; + ctx.env.execute_tokio_future( + tokio::fs::read(path_str.to_owned()) + .map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))), + |&mut env, data| env.create_buffer_with_data(data), + ) +} + +#[js_function(1)] +pub fn error_from_tokio_future(ctx: CallContext) -> Result { + let js_filepath = ctx.get::(0)?; + let path_str = js_filepath.as_str()?; + ctx.env.execute_tokio_future( + tokio::fs::read(path_str.to_owned()) + .map_err(Error::from) + .and_then(|_| async move { + Err::, Error>(Error::new( + Status::Unknown, + "Error from tokio future".to_owned(), + )) + }), + |&mut env, data| env.create_buffer_with_data(data), + ) +}