Merge pull request #94 from napi-rs/tokio-rt

feat(napi): implement tokio_rt feature to execute future on tokio runtime
This commit is contained in:
LongYinan 2020-07-14 23:26:53 +08:00 committed by GitHub
commit 3f2b6c230e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 462 additions and 102 deletions

View file

@ -89,6 +89,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream {
use std::mem; use std::mem;
use std::os::raw::c_char; use std::os::raw::c_char;
use std::ptr; use std::ptr;
use std::ffi::CString;
use napi_rs::{JsUnknown, Env, Status, NapiValue, CallContext}; use napi_rs::{JsUnknown, Env, Status, NapiValue, CallContext};
let mut argc = #arg_len_span as usize; let mut argc = #arg_len_span as usize;
let mut raw_args = let mut raw_args =
@ -119,7 +120,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream {
Err(e) => { Err(e) => {
let message = format!("{}", e); let message = format!("{}", e);
unsafe { unsafe {
napi_rs::sys::napi_throw_error(raw_env, ptr::null(), message.as_ptr() as *const c_char); napi_rs::sys::napi_throw_error(raw_env, ptr::null(), CString::from_vec_unchecked(message.into()).as_ptr() as *const c_char);
} }
let mut undefined = ptr::null_mut(); let mut undefined = ptr::null_mut();
unsafe { napi_rs::sys::napi_get_undefined(raw_env, &mut undefined) }; unsafe { napi_rs::sys::napi_get_undefined(raw_env, &mut undefined) };

View file

@ -11,9 +11,21 @@ edition = "2018"
[features] [features]
libuv = ["futures"] libuv = ["futures"]
tokio_rt = ["futures", "tokio", "once_cell"]
[dependencies.futures]
version = "0.3"
optional = true
[dependencies.tokio]
version = "0.2"
features = ["rt-core", "rt-threaded", "sync"]
optional = true
[dependencies.once_cell]
version = "1.4"
optional = true
[dependencies]
futures = { version = "0.3", optional = true }
[target.'cfg(windows)'.build-dependencies] [target.'cfg(windows)'.build-dependencies]
flate2 = "1.0" flate2 = "1.0"

View file

@ -1,21 +1,26 @@
use crate::task::Task;
use std::any::TypeId; use std::any::TypeId;
use std::convert::TryInto; use std::convert::TryInto;
use std::ffi::CString;
use std::mem; use std::mem;
use std::os::raw::c_char; use std::os::raw::{c_char, c_void};
use std::os::raw::c_void;
use std::ptr; use std::ptr;
use crate::async_work::AsyncWork;
use crate::error::check_status; use crate::error::check_status;
use crate::js_values::*; use crate::js_values::*;
use crate::{sys, AsyncWork, Error, NodeVersion, Result, Status}; use crate::task::Task;
use crate::{sys, Error, NodeVersion, Result, Status};
#[cfg(all(feature = "libuv", napi4))] #[cfg(all(any(feature = "libuv", feature = "tokio_rt"), napi4))]
use crate::promise; use crate::promise;
#[cfg(all(feature = "tokio_rt", napi4))]
use crate::tokio_rt::{get_tokio_sender, Message};
#[cfg(all(feature = "libuv", napi4))] #[cfg(all(feature = "libuv", napi4))]
use crate::uv; use crate::uv;
#[cfg(all(feature = "libuv", napi4))] #[cfg(all(feature = "libuv", napi4))]
use std::future::Future; use std::future::Future;
#[cfg(all(feature = "tokio_rt", napi4))]
use tokio::sync::mpsc::error::TrySendError;
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
@ -277,9 +282,13 @@ impl Env {
#[inline] #[inline]
pub fn throw_error(&self, msg: &str) -> Result<()> { pub fn throw_error(&self, msg: &str) -> Result<()> {
let status = unsafe { sys::napi_throw_error(self.0, ptr::null(), msg.as_ptr() as *const _) }; check_status(unsafe {
check_status(status)?; sys::napi_throw_error(
Ok(()) self.0,
ptr::null(),
CString::from_vec_unchecked(msg.into()).as_ptr() as *const _,
)
})
} }
#[inline] #[inline]
@ -489,7 +498,7 @@ impl Env {
#[cfg(all(feature = "libuv", napi4))] #[cfg(all(feature = "libuv", napi4))]
#[inline] #[inline]
pub fn execute< pub fn execute<
T: 'static, T: 'static + Send,
V: 'static + NapiValue, V: 'static + NapiValue,
F: 'static + Future<Output = Result<T>>, F: 'static + Future<Output = Result<T>>,
R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result<V>, R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result<V>,
@ -498,8 +507,6 @@ impl Env {
deferred: F, deferred: F,
resolver: R, resolver: R,
) -> Result<JsObject> { ) -> Result<JsObject> {
use futures::prelude::*;
let mut raw_promise = ptr::null_mut(); let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut(); let mut raw_deferred = ptr::null_mut();
@ -509,24 +516,49 @@ impl Env {
} }
let event_loop = unsafe { sys::uv_default_loop() }; let event_loop = unsafe { sys::uv_default_loop() };
let raw_env = self.0; let future_promise = promise::FuturePromise::create(self.0, raw_deferred, Box::from(resolver))?;
let future_to_execute = promise::resolve_from_future(self.0, deferred, resolver, raw_deferred) let future_to_execute = promise::resolve_from_future(future_promise.start()?, deferred);
.map(move |v| match v {
Ok(value) => value,
Err(e) => {
let cloned_error = e.clone();
unsafe {
sys::napi_throw_error(raw_env, ptr::null(), e.reason.as_ptr() as *const _);
};
eprintln!("{:?}", &cloned_error);
panic!(cloned_error);
}
});
uv::execute(event_loop, Box::pin(future_to_execute))?; uv::execute(event_loop, Box::pin(future_to_execute))?;
Ok(JsObject::from_raw_unchecked(self.0, raw_promise)) Ok(JsObject::from_raw_unchecked(self.0, raw_promise))
} }
#[cfg(all(feature = "tokio_rt", napi4))]
#[inline]
pub fn execute_tokio_future<
T: 'static + Send,
V: 'static + NapiValue,
F: 'static + Send + Future<Output = Result<T>>,
R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result<V>,
>(
&self,
fut: F,
resolver: R,
) -> Result<JsObject> {
let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut();
check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?;
let raw_env = self.0;
let future_promise =
promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?;
let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut);
let mut sender = get_tokio_sender().clone();
sender
.try_send(Message::Task(Box::pin(future_to_resolve)))
.map_err(|e| match e {
TrySendError::Full(_) => Error::new(
Status::QueueFull,
format!("Failed to run future: no available capacity"),
),
TrySendError::Closed(_) => Error::new(
Status::Closing,
format!("Failed to run future: receiver closed"),
),
})?;
Ok(JsObject::from_raw_unchecked(self.0, raw_promise))
}
#[inline] #[inline]
pub fn get_node_version(&self) -> Result<NodeVersion> { pub fn get_node_version(&self) -> Result<NodeVersion> {
let mut result = ptr::null(); let mut result = ptr::null();

View file

@ -37,7 +37,7 @@ impl Error {
} }
} }
pub fn into_raw(self, env: sys::napi_env) -> sys::napi_value { pub(crate) fn into_raw(self, env: sys::napi_env) -> sys::napi_value {
let mut err = ptr::null_mut(); let mut err = ptr::null_mut();
let s = self.reason; let s = self.reason;
unsafe { unsafe {
@ -62,8 +62,17 @@ impl Error {
impl From<std::ffi::NulError> for Error { impl From<std::ffi::NulError> for Error {
fn from(error: std::ffi::NulError) -> Self { fn from(error: std::ffi::NulError) -> Self {
Error { Error {
status: Status::StringExpected, status: Status::GenericFailure,
reason: format!("{:?}", error), reason: format!("{}", error),
}
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error {
status: Status::GenericFailure,
reason: format!("{}", error),
} }
} }
} }

View file

@ -25,9 +25,9 @@ pub use function::JsFunction;
pub use number::JsNumber; pub use number::JsNumber;
pub use object::JsObject; pub use object::JsObject;
pub use string::JsString; pub use string::JsString;
pub use tagged_object::TaggedObject; pub(crate) use tagged_object::TaggedObject;
pub use value::Value; pub(crate) use value::Value;
pub use value_ref::Ref; pub(crate) use value_ref::Ref;
pub use value_type::ValueType; pub use value_type::ValueType;
// Value types // Value types
@ -51,7 +51,7 @@ pub struct JsSymbol(pub(crate) Value);
pub struct JsExternal(pub(crate) Value); pub struct JsExternal(pub(crate) Value);
#[inline] #[inline]
pub fn type_of(env: sys::napi_env, raw_value: sys::napi_value) -> Result<ValueType> { pub(crate) fn type_of(env: sys::napi_env, raw_value: sys::napi_value) -> Result<ValueType> {
unsafe { unsafe {
let mut value_type = sys::napi_valuetype::napi_undefined; let mut value_type = sys::napi_valuetype::napi_undefined;
check_status(sys::napi_typeof(env, raw_value, &mut value_type))?; check_status(sys::napi_typeof(env, raw_value, &mut value_type))?;

View file

@ -1,3 +1,61 @@
//! High level NodeJS [N-API](https://nodejs.org/api/n-api.html) binding
//!
//! **napi-rs** provides minimal overhead to write N-API modules in `Rust`.
//! ## Feature flags
//! ### libuv
//! With `libuv` feature, you can execute a rust future in `libuv` in NodeJS, and return a `promise` object.
//! ```
//! use std::thread;
//! use std::fs;
//!
//! use futures::prelude::*;
//! use futures::channel::oneshot;
//! use napi::{CallContext, Result, JsString, JsObject, Status, Error};
//!
//! #[js_function(1)]
//! pub fn uv_read_file(ctx: CallContext) -> Result<JsObject> {
//! let path = ctx.get::<JsString>(0)?;
//! let (sender, receiver) = oneshot::channel();
//! let p = path.as_str()?.to_owned();
//! thread::spawn(|| {
//! let res = fs::read(p).map_err(|e| Error::new(Status::Unknown, format!("{}", e)));
//! sender.send(res).expect("Send data failed");
//! });
//! ctx.env.execute(receiver.map_err(|e| Error::new(Status::Unknown, format!("{}", e))).map(|x| x.and_then(|x| x)), |&mut env, data| {
//! env.create_buffer_with_data(data)
//! })
//! }
//! ```
//! ### tokio_rt
//! With `tokio_rt` feature, `napi-rs` provides a ***tokio runtime*** in an additional thread.
//! And you can easily run tokio `future` in it and return `promise`.
//!
//! ```
//! use futures::prelude::*;
//! use napi::{CallContext, Error, JsObject, JsString, Result, Status};
//! use tokio;
//!
//! #[js_function(1)]
//! pub fn tokio_readfile(ctx: CallContext) -> Result<JsObject> {
//! let js_filepath = ctx.get::<JsString>(0)?;
//! let path_str = js_filepath.as_str()?;
//! ctx.env.execute_tokio_future(
//! tokio::fs::read(path_str.to_owned())
//! .map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))),
//! |&mut env, data| env.create_buffer_with_data(data),
//! )
//! }
//! ```
//!
//! ***Tokio channel in `napi-rs` buffer size is default `100`.***
//!
//! ***You can adjust it via `NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE` environment variable***
//!
//! ```
//! NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE=1000 node ./app.js
//! ```
//!
mod async_work; mod async_work;
mod call_context; mod call_context;
mod env; mod env;
@ -11,34 +69,59 @@ pub mod sys;
mod task; mod task;
#[cfg(napi4)] #[cfg(napi4)]
pub mod threadsafe_function; pub mod threadsafe_function;
#[cfg(all(feature = "tokio_rt", napi4))]
mod tokio_rt;
#[cfg(all(feature = "libuv", napi4))] #[cfg(all(feature = "libuv", napi4))]
mod uv; mod uv;
mod version; mod version;
pub use async_work::AsyncWork;
pub use call_context::CallContext; pub use call_context::CallContext;
pub use env::*; pub use env::*;
pub use error::{Error, Result}; pub use error::{Error, Result};
pub use js_values::*; pub use js_values::*;
pub use module::Module; pub use module::Module;
pub use status::Status; pub use status::Status;
pub use sys::napi_valuetype;
pub use task::Task; pub use task::Task;
pub use version::NodeVersion; pub use version::NodeVersion;
#[cfg(all(feature = "tokio_rt", napi4))]
pub use tokio_rt::shutdown as shutdown_tokio_rt;
/// register nodejs module
///
/// ## Example
/// ```
/// register_module!(test_module, init);
///
/// fn init(module: &mut Module) -> Result<()> {
/// module.create_named_method("nativeFunction", native_function)?;
/// }
/// ```
#[macro_export] #[macro_export]
macro_rules! register_module { macro_rules! register_module {
($module_name:ident, $init:ident) => { ($module_name:ident, $init:ident) => {
#[inline]
fn check_status(code: $crate::sys::napi_status) -> Result<()> {
let status = Status::from(code);
match status {
Status::Ok => Ok(()),
_ => Err(Error::from_status(status)),
}
}
#[no_mangle] #[no_mangle]
#[cfg_attr(target_os = "linux", link_section = ".ctors")] #[cfg_attr(target_os = "linux", link_section = ".ctors")]
#[cfg_attr(target_os = "macos", link_section = "__DATA,__mod_init_func")] #[cfg_attr(target_os = "macos", link_section = "__DATA,__mod_init_func")]
#[cfg_attr(target_os = "windows", link_section = ".CRT$XCU")] #[cfg_attr(target_os = "windows", link_section = ".CRT$XCU")]
pub static __REGISTER_MODULE: extern "C" fn() = { pub static __REGISTER_MODULE: extern "C" fn() = {
use std::ffi::CString;
use std::io::Write; use std::io::Write;
use std::os::raw::c_char; use std::os::raw::c_char;
use std::ptr; use std::ptr;
use $crate::{sys, Env, JsObject, Module, NapiValue}; use $crate::{sys, Env, JsObject, Module, NapiValue};
#[cfg(all(feature = "tokio_rt", napi4))]
use $crate::shutdown_tokio_rt;
extern "C" fn register_module() { extern "C" fn register_module() {
static mut MODULE_DESCRIPTOR: Option<sys::napi_module> = None; static mut MODULE_DESCRIPTOR: Option<sys::napi_module> = None;
unsafe { unsafe {
@ -64,14 +147,23 @@ macro_rules! register_module {
let mut cjs_module = Module { env, exports }; let mut cjs_module = Module { env, exports };
let result = $init(&mut cjs_module); let result = $init(&mut cjs_module);
match result { #[cfg(all(feature = "tokio_rt", napi4))]
let hook_result = check_status(unsafe {
sys::napi_add_env_cleanup_hook(raw_env, Some(shutdown_tokio_rt), ptr::null_mut())
});
#[cfg(not(all(feature = "tokio_rt", napi4)))]
let hook_result = Ok(());
match hook_result.and_then(move |_| result) {
Ok(_) => exports.into_raw(), Ok(_) => exports.into_raw(),
Err(e) => { Err(e) => {
unsafe { unsafe {
sys::napi_throw_error( sys::napi_throw_error(
raw_env, raw_env,
ptr::null(), ptr::null(),
format!("Error initializing module: {:?}", e).as_ptr() as *const _, CString::from_vec_unchecked(format!("Error initializing module: {}", e).into())
.as_ptr() as *const _,
) )
}; };
ptr::null_mut() ptr::null_mut()

View file

@ -5,71 +5,90 @@ use std::ptr;
use crate::error::check_status; use crate::error::check_status;
use crate::{sys, Env, NapiValue, Result}; use crate::{sys, Env, NapiValue, Result};
struct FuturePromise<T, V: NapiValue> { pub struct FuturePromise<T, V: NapiValue> {
deferred: sys::napi_deferred, deferred: sys::napi_deferred,
env: sys::napi_env,
tsfn: sys::napi_threadsafe_function,
async_resource_name: sys::napi_value,
resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>, resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
} }
#[inline] unsafe impl<T, V: NapiValue> Send for FuturePromise<T, V> {}
pub async fn resolve_from_future<
T,
V: NapiValue,
R: FnOnce(&mut Env, T) -> Result<V> + 'static,
F: Future<Output = Result<T>>,
>(
env: sys::napi_env,
fut: F,
resolver: R,
raw_deferred: sys::napi_deferred,
) -> Result<()> {
let mut async_resource_name = ptr::null_mut();
let s = "napi_resolve_promise_from_future";
let status = unsafe {
sys::napi_create_string_utf8(
env,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut async_resource_name,
)
};
check_status(status)?;
let initial_thread_count: u64 = 1; impl<T, V: NapiValue> FuturePromise<T, V> {
let mut tsfn_value = ptr::null_mut(); pub fn create(
let future_promise = FuturePromise { env: sys::napi_env,
deferred: raw_deferred, raw_deferred: sys::napi_deferred,
resolver: Box::from(resolver), resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
}; ) -> Result<Self> {
let status = unsafe { let mut async_resource_name = ptr::null_mut();
sys::napi_create_threadsafe_function( let s = "napi_resolve_promise_from_future";
check_status(unsafe {
sys::napi_create_string_utf8(
env,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut async_resource_name,
)
})?;
Ok(FuturePromise {
deferred: raw_deferred,
resolver,
env, env,
ptr::null_mut(), tsfn: ptr::null_mut(),
ptr::null_mut(),
async_resource_name, async_resource_name,
0, })
initial_thread_count, }
ptr::null_mut(),
None, pub(crate) fn start(self) -> Result<TSFNValue> {
Box::leak(Box::from(future_promise)) as *mut _ as *mut c_void, let mut tsfn_value = ptr::null_mut();
Some(call_js_cb::<T, V>), let async_resource_name = self.async_resource_name;
&mut tsfn_value, let initial_thread_count: u64 = 1;
) let env = self.env;
}; let self_ref = Box::leak(Box::from(self));
check_status(status)?; check_status(unsafe {
let val = fut.await?; sys::napi_create_threadsafe_function(
env,
ptr::null_mut(),
ptr::null_mut(),
async_resource_name,
0,
initial_thread_count,
ptr::null_mut(),
None,
self_ref as *mut _ as *mut c_void,
Some(call_js_cb::<T, V>),
&mut tsfn_value,
)
})?;
self_ref.tsfn = tsfn_value;
Ok(TSFNValue(tsfn_value))
}
}
pub(crate) struct TSFNValue(sys::napi_threadsafe_function);
unsafe impl Send for TSFNValue {}
unsafe impl Sync for TSFNValue {}
#[inline]
pub(crate) async fn resolve_from_future<T: Send, F: Future<Output = Result<T>>>(
tsfn_value: TSFNValue,
fut: F,
) {
let val = fut.await;
check_status(unsafe { sys::napi_acquire_threadsafe_function(tsfn_value.0) })
.expect("Failed to acquire thread safe function");
check_status(unsafe { check_status(unsafe {
sys::napi_call_threadsafe_function( sys::napi_call_threadsafe_function(
tsfn_value, tsfn_value.0,
Box::into_raw(Box::from(val)) as *mut _ as *mut c_void, Box::into_raw(Box::from(val)) as *mut _ as *mut c_void,
sys::napi_threadsafe_function_call_mode::napi_tsfn_nonblocking, sys::napi_threadsafe_function_call_mode::napi_tsfn_nonblocking,
) )
})?;
check_status(unsafe {
sys::napi_release_threadsafe_function(
tsfn_value,
sys::napi_threadsafe_function_release_mode::napi_tsfn_release,
)
}) })
.expect("Failed to call thread safe function");
} }
unsafe extern "C" fn call_js_cb<T, V: NapiValue>( unsafe extern "C" fn call_js_cb<T, V: NapiValue>(
@ -80,17 +99,27 @@ unsafe extern "C" fn call_js_cb<T, V: NapiValue>(
) { ) {
let mut env = Env::from_raw(raw_env); let mut env = Env::from_raw(raw_env);
let future_promise = Box::from_raw(context as *mut FuturePromise<T, V>); let future_promise = Box::from_raw(context as *mut FuturePromise<T, V>);
let value = ptr::read(data as *const _); let value: Result<T> = ptr::read(data as *const _);
let js_value_to_resolve = (future_promise.resolver)(&mut env, value); let resolver = future_promise.resolver;
let deferred = future_promise.deferred; let deferred = future_promise.deferred;
let tsfn = future_promise.tsfn;
let js_value_to_resolve = value.and_then(move |v| (resolver)(&mut env, v));
match js_value_to_resolve { match js_value_to_resolve {
Ok(v) => { Ok(v) => {
let status = sys::napi_resolve_deferred(raw_env, deferred, v.raw_value()); let status = sys::napi_resolve_deferred(raw_env, deferred, v.raw_value());
debug_assert!(status == sys::napi_status::napi_ok, "Resolve promise failed"); debug_assert!(
status == sys::napi_status::napi_ok,
"Resolve promise failed"
);
} }
Err(e) => { Err(e) => {
let status = sys::napi_reject_deferred(raw_env, deferred, e.into_raw(raw_env)); let status = sys::napi_reject_deferred(raw_env, deferred, e.into_raw(raw_env));
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed"); debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
} }
}; };
check_status(sys::napi_release_threadsafe_function(
tsfn,
sys::napi_threadsafe_function_release_mode::napi_tsfn_release,
))
.expect("Release threadsafe function failed");
} }

52
napi/src/tokio_rt.rs Normal file
View file

@ -0,0 +1,52 @@
use std::env::var;
use std::ffi::c_void;
use std::pin::Pin;
use std::thread::spawn;
use std::time::Duration;
use futures::future::Future;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use crate::Error;
pub(crate) enum Message {
Task(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
}
#[inline]
pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender<Message> {
static SENDER: OnceCell<mpsc::Sender<Message>> = OnceCell::new();
SENDER.get_or_init(|| {
let buffer_size = var("NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE")
.map_err(|_| ())
.and_then(|s| s.parse().map_err(|_| ()))
.unwrap_or(100);
let (sender, mut receiver) = mpsc::channel(buffer_size);
spawn(move || {
let mut rt = Runtime::new().expect("Failed to create tokio runtime");
rt.block_on(async {
loop {
match receiver.recv().await {
Some(Message::Task(fut)) => fut.await,
Some(Message::Shutdown) => break,
None => {}
}
}
});
rt.shutdown_timeout(Duration::from_secs(5));
});
sender
})
}
pub unsafe extern "C" fn shutdown(_data: *mut c_void) {
let mut sender = get_tokio_sender().clone();
sender
.try_send(Message::Shutdown)
.map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e)))
.unwrap()
}

View file

@ -9,7 +9,7 @@ crate-type = ["cdylib"]
[dependencies] [dependencies]
futures = "0.3" futures = "0.3"
napi-rs = { path = "../napi", features = ["libuv"] } napi-rs = { path = "../napi", features = ["libuv", "tokio_rt"] }
napi-rs-derive = { path = "../napi-derive" } napi-rs-derive = { path = "../napi-derive" }
tokio = { version = "0.2", features = ["default", "fs"]} tokio = { version = "0.2", features = ["default", "fs"]}

View file

@ -0,0 +1,27 @@
const test = require('ava')
const { join } = require('path')
const napiVersion = require('../napi-version')
const filepath = join(__dirname, './example.txt')
process.env.NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE = '1'
const bindings = require('../../index.node')
test('should be able adjust queue size via process.env', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
try {
await Promise.all(
Array.from({ length: 50 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
throw new TypeError('Unreachable')
} catch (e) {
t.is(e.message, 'QueueFull: Failed to run future: no available capacity')
}
})

View file

@ -0,0 +1,64 @@
const test = require('ava')
const { join } = require('path')
const { readFileSync } = require('fs')
const bindings = require('../../index.node')
const napiVersion = require('../napi-version')
const filepath = join(__dirname, './example.txt')
test.serial('should execute future on tokio runtime', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
const fileContent = await bindings.testExecuteTokioReadfile(filepath)
t.true(Buffer.isBuffer(fileContent))
t.deepEqual(readFileSync(filepath), fileContent)
})
test.serial('should reject error from tokio future', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testTokioError, undefined)
return
}
try {
await bindings.testTokioError(filepath)
throw new TypeError('Unreachable')
} catch (e) {
t.is(e.message, 'Error from tokio future')
}
})
test.serial('should be able to execute future paralleled', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
const buffers = await Promise.all(
Array.from({ length: 50 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
for (const fileContent of buffers) {
t.true(Buffer.isBuffer(fileContent))
t.deepEqual(readFileSync(filepath), fileContent)
}
})
test.serial('should reject if task queue is full', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
try {
await Promise.all(
Array.from({ length: 1000 * 1000 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
throw new TypeError('Unreachable')
} catch (e) {
t.is(e.message, 'QueueFull: Failed to run future: no available capacity')
}
})

View file

@ -7,7 +7,7 @@ const napiVersion = require('../napi-version')
const filepath = join(__dirname, './example.txt') const filepath = join(__dirname, './example.txt')
test('should call callback with the first arguments as an Error', async (t) => { test('should execute future on libuv thread pool', async (t) => {
if (napiVersion < 4) { if (napiVersion < 4) {
t.is(bindings.uvReadFile, undefined) t.is(bindings.uvReadFile, undefined)
return return

View file

@ -5,32 +5,36 @@ extern crate napi_rs_derive;
use napi::{CallContext, Error, JsString, JsUnknown, Module, Result, Status}; use napi::{CallContext, Error, JsString, JsUnknown, Module, Result, Status};
#[cfg(napi4)]
mod napi4;
#[cfg(napi4)] #[cfg(napi4)]
mod libuv; mod libuv;
#[cfg(napi4)]
mod napi4;
#[cfg(napi5)] #[cfg(napi5)]
mod napi5; mod napi5;
#[cfg(napi4)]
mod tokio_rt;
mod buffer; mod buffer;
mod function;
mod external; mod external;
mod function;
mod napi_version;
mod symbol; mod symbol;
mod task; mod task;
mod napi_version;
use buffer::{buffer_to_string, get_buffer_length}; use buffer::{buffer_to_string, get_buffer_length};
use function::{call_function, call_function_with_this};
use external::{create_external, get_external_count}; use external::{create_external, get_external_count};
use function::{call_function, call_function_with_this};
#[cfg(napi4)]
use libuv::read_file::uv_read_file;
#[cfg(napi4)] #[cfg(napi4)]
use napi4::{test_threadsafe_function, test_tokio_readfile, test_tsfn_error}; use napi4::{test_threadsafe_function, test_tokio_readfile, test_tsfn_error};
#[cfg(napi5)] #[cfg(napi5)]
use napi5::is_date::test_object_is_date; use napi5::is_date::test_object_is_date;
use napi_version::get_napi_version;
use symbol::{create_named_symbol, create_symbol_from_js_string, create_unnamed_symbol}; use symbol::{create_named_symbol, create_symbol_from_js_string, create_unnamed_symbol};
use task::test_spawn_thread; use task::test_spawn_thread;
#[cfg(napi4)] #[cfg(napi4)]
use libuv::read_file::uv_read_file; use tokio_rt::{error_from_tokio_future, test_execute_tokio_readfile};
use napi_version::get_napi_version;
register_module!(test_module, init); register_module!(test_module, init);
@ -49,12 +53,16 @@ fn init(module: &mut Module) -> Result<()> {
module.create_named_method("testCallFunction", call_function)?; module.create_named_method("testCallFunction", call_function)?;
module.create_named_method("testCallFunctionWithThis", call_function_with_this)?; module.create_named_method("testCallFunctionWithThis", call_function_with_this)?;
#[cfg(napi4)] #[cfg(napi4)]
module.create_named_method("testExecuteTokioReadfile", test_execute_tokio_readfile)?;
#[cfg(napi4)]
module.create_named_method("testTsfnError", test_tsfn_error)?; module.create_named_method("testTsfnError", test_tsfn_error)?;
#[cfg(napi4)] #[cfg(napi4)]
module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?;
#[cfg(napi4)] #[cfg(napi4)]
module.create_named_method("testTokioReadfile", test_tokio_readfile)?; module.create_named_method("testTokioReadfile", test_tokio_readfile)?;
#[cfg(napi4)] #[cfg(napi4)]
module.create_named_method("testTokioError", error_from_tokio_future)?;
#[cfg(napi4)]
module.create_named_method("uvReadFile", uv_read_file)?; module.create_named_method("uvReadFile", uv_read_file)?;
#[cfg(napi5)] #[cfg(napi5)]
module.create_named_method("testObjectIsDate", test_object_is_date)?; module.create_named_method("testObjectIsDate", test_object_is_date)?;

View file

@ -0,0 +1,3 @@
mod read_file;
pub use read_file::*;

View file

@ -0,0 +1,31 @@
use futures::prelude::*;
use napi::{CallContext, Error, JsObject, JsString, Result, Status};
use tokio;
#[js_function(1)]
pub fn test_execute_tokio_readfile(ctx: CallContext) -> Result<JsObject> {
let js_filepath = ctx.get::<JsString>(0)?;
let path_str = js_filepath.as_str()?;
ctx.env.execute_tokio_future(
tokio::fs::read(path_str.to_owned())
.map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))),
|&mut env, data| env.create_buffer_with_data(data),
)
}
#[js_function(1)]
pub fn error_from_tokio_future(ctx: CallContext) -> Result<JsObject> {
let js_filepath = ctx.get::<JsString>(0)?;
let path_str = js_filepath.as_str()?;
ctx.env.execute_tokio_future(
tokio::fs::read(path_str.to_owned())
.map_err(Error::from)
.and_then(|_| async move {
Err::<Vec<u8>, Error>(Error::new(
Status::Unknown,
"Error from tokio future".to_owned(),
))
}),
|&mut env, data| env.create_buffer_with_data(data),
)
}