From 9118e9e62d47792f23aa72285f16ffcb090405fc Mon Sep 17 00:00:00 2001 From: LongYinan Date: Wed, 8 Jul 2020 00:59:09 +0800 Subject: [PATCH 1/2] feat(napi): implment tokio_rt feature --- napi-derive/src/lib.rs | 3 +- napi/Cargo.toml | 16 ++- napi/src/env.rs | 77 +++++++---- napi/src/error.rs | 13 +- napi/src/lib.rs | 30 ++++- napi/src/promise.rs | 139 ++++++++++++-------- napi/src/tokio_rt.rs | 49 +++++++ test_module/Cargo.toml | 2 +- test_module/__test__/napi4/tokio_rt.spec.js | 64 +++++++++ test_module/__test__/napi4/uv.spec.js | 2 +- test_module/src/lib.rs | 22 +++- test_module/src/tokio_rt/mod.rs | 3 + test_module/src/tokio_rt/read_file.rs | 31 +++++ 13 files changed, 357 insertions(+), 94 deletions(-) create mode 100644 napi/src/tokio_rt.rs create mode 100644 test_module/__test__/napi4/tokio_rt.spec.js create mode 100644 test_module/src/tokio_rt/mod.rs create mode 100644 test_module/src/tokio_rt/read_file.rs diff --git a/napi-derive/src/lib.rs b/napi-derive/src/lib.rs index fd3cc20e..85168638 100644 --- a/napi-derive/src/lib.rs +++ b/napi-derive/src/lib.rs @@ -89,6 +89,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream { use std::mem; use std::os::raw::c_char; use std::ptr; + use std::ffi::CString; use napi_rs::{JsUnknown, Env, Status, NapiValue, CallContext}; let mut argc = #arg_len_span as usize; let mut raw_args = @@ -119,7 +120,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream { Err(e) => { let message = format!("{}", e); unsafe { - napi_rs::sys::napi_throw_error(raw_env, ptr::null(), message.as_ptr() as *const c_char); + napi_rs::sys::napi_throw_error(raw_env, ptr::null(), CString::from_vec_unchecked(message.into()).as_ptr() as *const c_char); } let mut undefined = ptr::null_mut(); unsafe { napi_rs::sys::napi_get_undefined(raw_env, &mut undefined) }; diff --git a/napi/Cargo.toml b/napi/Cargo.toml index 365064a6..cd0e23b5 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -11,9 +11,21 @@ edition = "2018" [features] libuv = ["futures"] +tokio_rt = ["futures", "tokio", "once_cell"] + +[dependencies.futures] +version = "0.3" +optional = true + +[dependencies.tokio] +version = "0.2" +features = ["rt-core", "rt-threaded", "sync"] +optional = true + +[dependencies.once_cell] +version = "1.4" +optional = true -[dependencies] -futures = { version = "0.3", optional = true } [target.'cfg(windows)'.build-dependencies] flate2 = "1.0" diff --git a/napi/src/env.rs b/napi/src/env.rs index 14047573..2e46f0dc 100644 --- a/napi/src/env.rs +++ b/napi/src/env.rs @@ -1,21 +1,25 @@ -use crate::task::Task; use std::any::TypeId; use std::convert::TryInto; +use std::ffi::CString; use std::mem; -use std::os::raw::c_char; -use std::os::raw::c_void; +use std::os::raw::{c_char, c_void}; use std::ptr; use crate::error::check_status; use crate::js_values::*; +use crate::task::Task; use crate::{sys, AsyncWork, Error, NodeVersion, Result, Status}; -#[cfg(all(feature = "libuv", napi4))] +#[cfg(all(any(feature = "libuv", feature = "tokio_rt"), napi4))] use crate::promise; +#[cfg(all(feature = "tokio_rt", napi4))] +use crate::tokio_rt::{get_tokio_sender, Message}; #[cfg(all(feature = "libuv", napi4))] use crate::uv; #[cfg(all(feature = "libuv", napi4))] use std::future::Future; +#[cfg(all(feature = "tokio_rt", napi4))] +use tokio::sync::mpsc::error::TrySendError; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; @@ -277,9 +281,13 @@ impl Env { #[inline] pub fn throw_error(&self, msg: &str) -> Result<()> { - let status = unsafe { sys::napi_throw_error(self.0, ptr::null(), msg.as_ptr() as *const _) }; - check_status(status)?; - Ok(()) + check_status(unsafe { + sys::napi_throw_error( + self.0, + ptr::null(), + CString::from_vec_unchecked(msg.into()).as_ptr() as *const _, + ) + }) } #[inline] @@ -489,7 +497,7 @@ impl Env { #[cfg(all(feature = "libuv", napi4))] #[inline] pub fn execute< - T: 'static, + T: 'static + Send, V: 'static + NapiValue, F: 'static + Future>, R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result, @@ -498,8 +506,6 @@ impl Env { deferred: F, resolver: R, ) -> Result { - use futures::prelude::*; - let mut raw_promise = ptr::null_mut(); let mut raw_deferred = ptr::null_mut(); @@ -509,24 +515,49 @@ impl Env { } let event_loop = unsafe { sys::uv_default_loop() }; - let raw_env = self.0; - let future_to_execute = promise::resolve_from_future(self.0, deferred, resolver, raw_deferred) - .map(move |v| match v { - Ok(value) => value, - Err(e) => { - let cloned_error = e.clone(); - unsafe { - sys::napi_throw_error(raw_env, ptr::null(), e.reason.as_ptr() as *const _); - }; - eprintln!("{:?}", &cloned_error); - panic!(cloned_error); - } - }); + let future_promise = promise::FuturePromise::create(self.0, raw_deferred, Box::from(resolver))?; + let future_to_execute = promise::resolve_from_future(future_promise.start()?, deferred); uv::execute(event_loop, Box::pin(future_to_execute))?; Ok(JsObject::from_raw_unchecked(self.0, raw_promise)) } + #[cfg(all(feature = "tokio_rt", napi4))] + #[inline] + pub fn execute_tokio_future< + T: 'static + Send, + V: 'static + NapiValue, + F: 'static + Send + Future>, + R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result, + >( + &self, + fut: F, + resolver: R, + ) -> Result { + let mut raw_promise = ptr::null_mut(); + let mut raw_deferred = ptr::null_mut(); + check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?; + + let raw_env = self.0; + let future_promise = + promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?; + let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut); + let mut sender = get_tokio_sender().clone(); + sender + .try_send(Message::Task(Box::pin(future_to_resolve))) + .map_err(|e| match e { + TrySendError::Full(_) => Error::new( + Status::QueueFull, + format!("Failed to run future: no available capacity"), + ), + TrySendError::Closed(_) => Error::new( + Status::Closing, + format!("Failed to run future: receiver closed"), + ), + })?; + Ok(JsObject::from_raw_unchecked(self.0, raw_promise)) + } + #[inline] pub fn get_node_version(&self) -> Result { let mut result = ptr::null(); diff --git a/napi/src/error.rs b/napi/src/error.rs index c0171913..2dcabbf3 100644 --- a/napi/src/error.rs +++ b/napi/src/error.rs @@ -62,8 +62,17 @@ impl Error { impl From for Error { fn from(error: std::ffi::NulError) -> Self { Error { - status: Status::StringExpected, - reason: format!("{:?}", error), + status: Status::GenericFailure, + reason: format!("{}", error), + } + } +} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error { + status: Status::GenericFailure, + reason: format!("{}", error), } } } diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 33c296b0..6abd9085 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -11,6 +11,8 @@ pub mod sys; mod task; #[cfg(napi4)] pub mod threadsafe_function; +#[cfg(all(feature = "tokio_rt", napi4))] +mod tokio_rt; #[cfg(all(feature = "libuv", napi4))] mod uv; mod version; @@ -26,19 +28,34 @@ pub use sys::napi_valuetype; pub use task::Task; pub use version::NodeVersion; +#[cfg(all(feature = "tokio_rt", napi4))] +pub use tokio_rt::shutdown as shutdown_tokio_rt; + #[macro_export] macro_rules! register_module { ($module_name:ident, $init:ident) => { + #[inline] + fn check_status(code: $crate::sys::napi_status) -> Result<()> { + let status = Status::from(code); + match status { + Status::Ok => Ok(()), + _ => Err(Error::from_status(status)), + } + } #[no_mangle] #[cfg_attr(target_os = "linux", link_section = ".ctors")] #[cfg_attr(target_os = "macos", link_section = "__DATA,__mod_init_func")] #[cfg_attr(target_os = "windows", link_section = ".CRT$XCU")] pub static __REGISTER_MODULE: extern "C" fn() = { + use std::ffi::CString; use std::io::Write; use std::os::raw::c_char; use std::ptr; use $crate::{sys, Env, JsObject, Module, NapiValue}; + #[cfg(all(feature = "tokio_rt", napi4))] + use $crate::shutdown_tokio_rt; + extern "C" fn register_module() { static mut MODULE_DESCRIPTOR: Option = None; unsafe { @@ -64,14 +81,23 @@ macro_rules! register_module { let mut cjs_module = Module { env, exports }; let result = $init(&mut cjs_module); - match result { + #[cfg(all(feature = "tokio_rt", napi4))] + let hook_result = check_status(unsafe { + sys::napi_add_env_cleanup_hook(raw_env, Some(shutdown_tokio_rt), ptr::null_mut()) + }); + + #[cfg(not(all(feature = "tokio_rt", napi4)))] + let hook_result = Ok(()); + + match hook_result.and_then(move |_| result) { Ok(_) => exports.into_raw(), Err(e) => { unsafe { sys::napi_throw_error( raw_env, ptr::null(), - format!("Error initializing module: {:?}", e).as_ptr() as *const _, + CString::from_vec_unchecked(format!("Error initializing module: {}", e).into()) + .as_ptr() as *const _, ) }; ptr::null_mut() diff --git a/napi/src/promise.rs b/napi/src/promise.rs index 0b13c319..e222d477 100644 --- a/napi/src/promise.rs +++ b/napi/src/promise.rs @@ -5,71 +5,90 @@ use std::ptr; use crate::error::check_status; use crate::{sys, Env, NapiValue, Result}; -struct FuturePromise { +pub struct FuturePromise { deferred: sys::napi_deferred, + env: sys::napi_env, + tsfn: sys::napi_threadsafe_function, + async_resource_name: sys::napi_value, resolver: Box Result>, } -#[inline] -pub async fn resolve_from_future< - T, - V: NapiValue, - R: FnOnce(&mut Env, T) -> Result + 'static, - F: Future>, ->( - env: sys::napi_env, - fut: F, - resolver: R, - raw_deferred: sys::napi_deferred, -) -> Result<()> { - let mut async_resource_name = ptr::null_mut(); - let s = "napi_resolve_promise_from_future"; - let status = unsafe { - sys::napi_create_string_utf8( - env, - s.as_ptr() as *const c_char, - s.len() as u64, - &mut async_resource_name, - ) - }; - check_status(status)?; +unsafe impl Send for FuturePromise {} - let initial_thread_count: u64 = 1; - let mut tsfn_value = ptr::null_mut(); - let future_promise = FuturePromise { - deferred: raw_deferred, - resolver: Box::from(resolver), - }; - let status = unsafe { - sys::napi_create_threadsafe_function( +impl FuturePromise { + pub fn create( + env: sys::napi_env, + raw_deferred: sys::napi_deferred, + resolver: Box Result>, + ) -> Result { + let mut async_resource_name = ptr::null_mut(); + let s = "napi_resolve_promise_from_future"; + check_status(unsafe { + sys::napi_create_string_utf8( + env, + s.as_ptr() as *const c_char, + s.len() as u64, + &mut async_resource_name, + ) + })?; + + Ok(FuturePromise { + deferred: raw_deferred, + resolver, env, - ptr::null_mut(), - ptr::null_mut(), + tsfn: ptr::null_mut(), async_resource_name, - 0, - initial_thread_count, - ptr::null_mut(), - None, - Box::leak(Box::from(future_promise)) as *mut _ as *mut c_void, - Some(call_js_cb::), - &mut tsfn_value, - ) - }; - check_status(status)?; - let val = fut.await?; + }) + } + + pub(crate) fn start(self) -> Result { + let mut tsfn_value = ptr::null_mut(); + let async_resource_name = self.async_resource_name; + let initial_thread_count: u64 = 1; + let env = self.env; + let self_ref = Box::leak(Box::from(self)); + check_status(unsafe { + sys::napi_create_threadsafe_function( + env, + ptr::null_mut(), + ptr::null_mut(), + async_resource_name, + 0, + initial_thread_count, + ptr::null_mut(), + None, + self_ref as *mut _ as *mut c_void, + Some(call_js_cb::), + &mut tsfn_value, + ) + })?; + self_ref.tsfn = tsfn_value; + Ok(TSFNValue(tsfn_value)) + } +} + +pub(crate) struct TSFNValue(sys::napi_threadsafe_function); + +unsafe impl Send for TSFNValue {} + +unsafe impl Sync for TSFNValue {} + +#[inline] +pub(crate) async fn resolve_from_future>>( + tsfn_value: TSFNValue, + fut: F, +) { + let val = fut.await; + check_status(unsafe { sys::napi_acquire_threadsafe_function(tsfn_value.0) }) + .expect("Failed to acquire thread safe function"); check_status(unsafe { sys::napi_call_threadsafe_function( - tsfn_value, + tsfn_value.0, Box::into_raw(Box::from(val)) as *mut _ as *mut c_void, sys::napi_threadsafe_function_call_mode::napi_tsfn_nonblocking, ) - })?; - check_status(unsafe { - sys::napi_release_threadsafe_function( - tsfn_value, - sys::napi_threadsafe_function_release_mode::napi_tsfn_release, - ) }) + .expect("Failed to call thread safe function"); } unsafe extern "C" fn call_js_cb( @@ -80,17 +99,27 @@ unsafe extern "C" fn call_js_cb( ) { let mut env = Env::from_raw(raw_env); let future_promise = Box::from_raw(context as *mut FuturePromise); - let value = ptr::read(data as *const _); - let js_value_to_resolve = (future_promise.resolver)(&mut env, value); + let value: Result = ptr::read(data as *const _); + let resolver = future_promise.resolver; let deferred = future_promise.deferred; + let tsfn = future_promise.tsfn; + let js_value_to_resolve = value.and_then(move |v| (resolver)(&mut env, v)); match js_value_to_resolve { Ok(v) => { let status = sys::napi_resolve_deferred(raw_env, deferred, v.raw_value()); - debug_assert!(status == sys::napi_status::napi_ok, "Resolve promise failed"); + debug_assert!( + status == sys::napi_status::napi_ok, + "Resolve promise failed" + ); } Err(e) => { let status = sys::napi_reject_deferred(raw_env, deferred, e.into_raw(raw_env)); debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed"); } }; + check_status(sys::napi_release_threadsafe_function( + tsfn, + sys::napi_threadsafe_function_release_mode::napi_tsfn_release, + )) + .expect("Release threadsafe function failed"); } diff --git a/napi/src/tokio_rt.rs b/napi/src/tokio_rt.rs new file mode 100644 index 00000000..e7432211 --- /dev/null +++ b/napi/src/tokio_rt.rs @@ -0,0 +1,49 @@ +use std::ffi::c_void; +use std::mem; +use std::pin::Pin; +use std::thread::spawn; +use std::time::Duration; + +use futures::future::Future; +use once_cell::sync::OnceCell; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; + +use crate::Error; + +pub(crate) enum Message { + Task(Pin + Send>>), + Shutdown, +} + +#[inline] +pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender { + static SENDER: OnceCell> = OnceCell::new(); + SENDER.get_or_init(|| { + let (sender, mut receiver) = mpsc::channel(100); + spawn(move || { + let mut rt = Runtime::new().expect("Failed to create tokio runtime"); + rt.block_on(async { + loop { + match receiver.recv().await { + Some(Message::Task(fut)) => fut.await, + Some(Message::Shutdown) => break, + None => {} + } + } + }); + rt.shutdown_timeout(Duration::from_secs(5)); + mem::drop(receiver); + }); + + sender + }) +} + +pub unsafe extern "C" fn shutdown(_data: *mut c_void) { + let mut sender = get_tokio_sender().clone(); + sender + .try_send(Message::Shutdown) + .map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e))) + .unwrap() +} diff --git a/test_module/Cargo.toml b/test_module/Cargo.toml index 4ed26aeb..9d8705a5 100644 --- a/test_module/Cargo.toml +++ b/test_module/Cargo.toml @@ -9,7 +9,7 @@ crate-type = ["cdylib"] [dependencies] futures = "0.3" -napi-rs = { path = "../napi", features = ["libuv"] } +napi-rs = { path = "../napi", features = ["libuv", "tokio_rt"] } napi-rs-derive = { path = "../napi-derive" } tokio = { version = "0.2", features = ["default", "fs"]} diff --git a/test_module/__test__/napi4/tokio_rt.spec.js b/test_module/__test__/napi4/tokio_rt.spec.js new file mode 100644 index 00000000..1873e550 --- /dev/null +++ b/test_module/__test__/napi4/tokio_rt.spec.js @@ -0,0 +1,64 @@ +const test = require('ava') +const { join } = require('path') +const { readFileSync } = require('fs') + +const bindings = require('../../index.node') +const napiVersion = require('../napi-version') + +const filepath = join(__dirname, './example.txt') + +test.serial('should execute future on tokio runtime', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + const fileContent = await bindings.testExecuteTokioReadfile(filepath) + t.true(Buffer.isBuffer(fileContent)) + t.deepEqual(readFileSync(filepath), fileContent) +}) + +test.serial('should reject error from tokio future', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testTokioError, undefined) + return + } + try { + await bindings.testTokioError(filepath) + throw new TypeError('Unreachable') + } catch (e) { + t.is(e.message, 'Error from tokio future') + } +}) + +test.serial('should be able to execute future paralleled', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + const buffers = await Promise.all( + Array.from({ length: 50 }).map((_) => + bindings.testExecuteTokioReadfile(filepath), + ), + ) + for (const fileContent of buffers) { + t.true(Buffer.isBuffer(fileContent)) + t.deepEqual(readFileSync(filepath), fileContent) + } +}) + +test.serial('should reject if task queue is full', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + try { + await Promise.all( + Array.from({ length: 1000 * 1000 }).map((_) => + bindings.testExecuteTokioReadfile(filepath), + ), + ) + throw new TypeError('Unreachable') + } catch (e) { + t.is(e.message, 'QueueFull: Failed to run future: no available capacity') + } +}) diff --git a/test_module/__test__/napi4/uv.spec.js b/test_module/__test__/napi4/uv.spec.js index d31eeaae..b719a929 100644 --- a/test_module/__test__/napi4/uv.spec.js +++ b/test_module/__test__/napi4/uv.spec.js @@ -7,7 +7,7 @@ const napiVersion = require('../napi-version') const filepath = join(__dirname, './example.txt') -test('should call callback with the first arguments as an Error', async (t) => { +test('should execute future on libuv thread pool', async (t) => { if (napiVersion < 4) { t.is(bindings.uvReadFile, undefined) return diff --git a/test_module/src/lib.rs b/test_module/src/lib.rs index b01a595a..e72ec243 100644 --- a/test_module/src/lib.rs +++ b/test_module/src/lib.rs @@ -5,32 +5,36 @@ extern crate napi_rs_derive; use napi::{CallContext, Error, JsString, JsUnknown, Module, Result, Status}; -#[cfg(napi4)] -mod napi4; #[cfg(napi4)] mod libuv; +#[cfg(napi4)] +mod napi4; #[cfg(napi5)] mod napi5; +#[cfg(napi4)] +mod tokio_rt; mod buffer; -mod function; mod external; +mod function; +mod napi_version; mod symbol; mod task; -mod napi_version; use buffer::{buffer_to_string, get_buffer_length}; -use function::{call_function, call_function_with_this}; use external::{create_external, get_external_count}; +use function::{call_function, call_function_with_this}; +#[cfg(napi4)] +use libuv::read_file::uv_read_file; #[cfg(napi4)] use napi4::{test_threadsafe_function, test_tokio_readfile, test_tsfn_error}; #[cfg(napi5)] use napi5::is_date::test_object_is_date; +use napi_version::get_napi_version; use symbol::{create_named_symbol, create_symbol_from_js_string, create_unnamed_symbol}; use task::test_spawn_thread; #[cfg(napi4)] -use libuv::read_file::uv_read_file; -use napi_version::get_napi_version; +use tokio_rt::{error_from_tokio_future, test_execute_tokio_readfile}; register_module!(test_module, init); @@ -49,12 +53,16 @@ fn init(module: &mut Module) -> Result<()> { module.create_named_method("testCallFunction", call_function)?; module.create_named_method("testCallFunctionWithThis", call_function_with_this)?; #[cfg(napi4)] + module.create_named_method("testExecuteTokioReadfile", test_execute_tokio_readfile)?; + #[cfg(napi4)] module.create_named_method("testTsfnError", test_tsfn_error)?; #[cfg(napi4)] module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; #[cfg(napi4)] module.create_named_method("testTokioReadfile", test_tokio_readfile)?; #[cfg(napi4)] + module.create_named_method("testTokioError", error_from_tokio_future)?; + #[cfg(napi4)] module.create_named_method("uvReadFile", uv_read_file)?; #[cfg(napi5)] module.create_named_method("testObjectIsDate", test_object_is_date)?; diff --git a/test_module/src/tokio_rt/mod.rs b/test_module/src/tokio_rt/mod.rs new file mode 100644 index 00000000..5e6f6d4f --- /dev/null +++ b/test_module/src/tokio_rt/mod.rs @@ -0,0 +1,3 @@ +mod read_file; + +pub use read_file::*; diff --git a/test_module/src/tokio_rt/read_file.rs b/test_module/src/tokio_rt/read_file.rs new file mode 100644 index 00000000..de270e3b --- /dev/null +++ b/test_module/src/tokio_rt/read_file.rs @@ -0,0 +1,31 @@ +use futures::prelude::*; +use napi::{CallContext, Error, JsObject, JsString, Result, Status}; +use tokio; + +#[js_function(1)] +pub fn test_execute_tokio_readfile(ctx: CallContext) -> Result { + let js_filepath = ctx.get::(0)?; + let path_str = js_filepath.as_str()?; + ctx.env.execute_tokio_future( + tokio::fs::read(path_str.to_owned()) + .map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))), + |&mut env, data| env.create_buffer_with_data(data), + ) +} + +#[js_function(1)] +pub fn error_from_tokio_future(ctx: CallContext) -> Result { + let js_filepath = ctx.get::(0)?; + let path_str = js_filepath.as_str()?; + ctx.env.execute_tokio_future( + tokio::fs::read(path_str.to_owned()) + .map_err(Error::from) + .and_then(|_| async move { + Err::, Error>(Error::new( + Status::Unknown, + "Error from tokio future".to_owned(), + )) + }), + |&mut env, data| env.create_buffer_with_data(data), + ) +} From 62482ab2e65d35094973fdd31f15e306b406c454 Mon Sep 17 00:00:00 2001 From: LongYinan Date: Tue, 14 Jul 2020 22:57:11 +0800 Subject: [PATCH 2/2] doc: add some documents --- napi/src/env.rs | 3 +- napi/src/error.rs | 2 +- napi/src/js_values/mod.rs | 8 +-- napi/src/lib.rs | 70 ++++++++++++++++++- napi/src/tokio_rt.rs | 9 ++- .../__test__/napi4/tokio_rt-isolate.spec.js | 27 +++++++ 6 files changed, 108 insertions(+), 11 deletions(-) create mode 100644 test_module/__test__/napi4/tokio_rt-isolate.spec.js diff --git a/napi/src/env.rs b/napi/src/env.rs index 2e46f0dc..2b4d2d98 100644 --- a/napi/src/env.rs +++ b/napi/src/env.rs @@ -5,10 +5,11 @@ use std::mem; use std::os::raw::{c_char, c_void}; use std::ptr; +use crate::async_work::AsyncWork; use crate::error::check_status; use crate::js_values::*; use crate::task::Task; -use crate::{sys, AsyncWork, Error, NodeVersion, Result, Status}; +use crate::{sys, Error, NodeVersion, Result, Status}; #[cfg(all(any(feature = "libuv", feature = "tokio_rt"), napi4))] use crate::promise; diff --git a/napi/src/error.rs b/napi/src/error.rs index 2dcabbf3..6c44603d 100644 --- a/napi/src/error.rs +++ b/napi/src/error.rs @@ -37,7 +37,7 @@ impl Error { } } - pub fn into_raw(self, env: sys::napi_env) -> sys::napi_value { + pub(crate) fn into_raw(self, env: sys::napi_env) -> sys::napi_value { let mut err = ptr::null_mut(); let s = self.reason; unsafe { diff --git a/napi/src/js_values/mod.rs b/napi/src/js_values/mod.rs index 3d3472d0..37eb34ae 100644 --- a/napi/src/js_values/mod.rs +++ b/napi/src/js_values/mod.rs @@ -25,9 +25,9 @@ pub use function::JsFunction; pub use number::JsNumber; pub use object::JsObject; pub use string::JsString; -pub use tagged_object::TaggedObject; -pub use value::Value; -pub use value_ref::Ref; +pub(crate) use tagged_object::TaggedObject; +pub(crate) use value::Value; +pub(crate) use value_ref::Ref; pub use value_type::ValueType; // Value types @@ -51,7 +51,7 @@ pub struct JsSymbol(pub(crate) Value); pub struct JsExternal(pub(crate) Value); #[inline] -pub fn type_of(env: sys::napi_env, raw_value: sys::napi_value) -> Result { +pub(crate) fn type_of(env: sys::napi_env, raw_value: sys::napi_value) -> Result { unsafe { let mut value_type = sys::napi_valuetype::napi_undefined; check_status(sys::napi_typeof(env, raw_value, &mut value_type))?; diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 6abd9085..3a274512 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -1,3 +1,61 @@ +//! High level NodeJS [N-API](https://nodejs.org/api/n-api.html) binding +//! +//! **napi-rs** provides minimal overhead to write N-API modules in `Rust`. +//! ## Feature flags +//! ### libuv +//! With `libuv` feature, you can execute a rust future in `libuv` in NodeJS, and return a `promise` object. +//! ``` +//! use std::thread; +//! use std::fs; +//! +//! use futures::prelude::*; +//! use futures::channel::oneshot; +//! use napi::{CallContext, Result, JsString, JsObject, Status, Error}; +//! +//! #[js_function(1)] +//! pub fn uv_read_file(ctx: CallContext) -> Result { +//! let path = ctx.get::(0)?; +//! let (sender, receiver) = oneshot::channel(); +//! let p = path.as_str()?.to_owned(); +//! thread::spawn(|| { +//! let res = fs::read(p).map_err(|e| Error::new(Status::Unknown, format!("{}", e))); +//! sender.send(res).expect("Send data failed"); +//! }); +//! ctx.env.execute(receiver.map_err(|e| Error::new(Status::Unknown, format!("{}", e))).map(|x| x.and_then(|x| x)), |&mut env, data| { +//! env.create_buffer_with_data(data) +//! }) +//! } +//! ``` +//! ### tokio_rt +//! With `tokio_rt` feature, `napi-rs` provides a ***tokio runtime*** in an additional thread. +//! And you can easily run tokio `future` in it and return `promise`. +//! +//! ``` +//! use futures::prelude::*; +//! use napi::{CallContext, Error, JsObject, JsString, Result, Status}; +//! use tokio; +//! +//! #[js_function(1)] +//! pub fn tokio_readfile(ctx: CallContext) -> Result { +//! let js_filepath = ctx.get::(0)?; +//! let path_str = js_filepath.as_str()?; +//! ctx.env.execute_tokio_future( +//! tokio::fs::read(path_str.to_owned()) +//! .map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))), +//! |&mut env, data| env.create_buffer_with_data(data), +//! ) +//! } +//! ``` +//! +//! ***Tokio channel in `napi-rs` buffer size is default `100`.*** +//! +//! ***You can adjust it via `NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE` environment variable*** +//! +//! ``` +//! NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE=1000 node ./app.js +//! ``` +//! + mod async_work; mod call_context; mod env; @@ -17,20 +75,28 @@ mod tokio_rt; mod uv; mod version; -pub use async_work::AsyncWork; pub use call_context::CallContext; pub use env::*; pub use error::{Error, Result}; pub use js_values::*; pub use module::Module; pub use status::Status; -pub use sys::napi_valuetype; pub use task::Task; pub use version::NodeVersion; #[cfg(all(feature = "tokio_rt", napi4))] pub use tokio_rt::shutdown as shutdown_tokio_rt; +/// register nodejs module +/// +/// ## Example +/// ``` +/// register_module!(test_module, init); +/// +/// fn init(module: &mut Module) -> Result<()> { +/// module.create_named_method("nativeFunction", native_function)?; +/// } +/// ``` #[macro_export] macro_rules! register_module { ($module_name:ident, $init:ident) => { diff --git a/napi/src/tokio_rt.rs b/napi/src/tokio_rt.rs index e7432211..387cee6e 100644 --- a/napi/src/tokio_rt.rs +++ b/napi/src/tokio_rt.rs @@ -1,5 +1,5 @@ +use std::env::var; use std::ffi::c_void; -use std::mem; use std::pin::Pin; use std::thread::spawn; use std::time::Duration; @@ -20,7 +20,11 @@ pub(crate) enum Message { pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender { static SENDER: OnceCell> = OnceCell::new(); SENDER.get_or_init(|| { - let (sender, mut receiver) = mpsc::channel(100); + let buffer_size = var("NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE") + .map_err(|_| ()) + .and_then(|s| s.parse().map_err(|_| ())) + .unwrap_or(100); + let (sender, mut receiver) = mpsc::channel(buffer_size); spawn(move || { let mut rt = Runtime::new().expect("Failed to create tokio runtime"); rt.block_on(async { @@ -33,7 +37,6 @@ pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender { } }); rt.shutdown_timeout(Duration::from_secs(5)); - mem::drop(receiver); }); sender diff --git a/test_module/__test__/napi4/tokio_rt-isolate.spec.js b/test_module/__test__/napi4/tokio_rt-isolate.spec.js new file mode 100644 index 00000000..ea01323c --- /dev/null +++ b/test_module/__test__/napi4/tokio_rt-isolate.spec.js @@ -0,0 +1,27 @@ +const test = require('ava') +const { join } = require('path') + +const napiVersion = require('../napi-version') + +const filepath = join(__dirname, './example.txt') + +process.env.NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE = '1' + +const bindings = require('../../index.node') + +test('should be able adjust queue size via process.env', async (t) => { + if (napiVersion < 4) { + t.is(bindings.testExecuteTokioReadfile, undefined) + return + } + try { + await Promise.all( + Array.from({ length: 50 }).map((_) => + bindings.testExecuteTokioReadfile(filepath), + ), + ) + throw new TypeError('Unreachable') + } catch (e) { + t.is(e.message, 'QueueFull: Failed to run future: no available capacity') + } +})