feat(napi): implment tokio_rt feature

This commit is contained in:
LongYinan 2020-07-08 00:59:09 +08:00
parent f43b6a77ea
commit 9118e9e62d
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
13 changed files with 357 additions and 94 deletions

View file

@ -89,6 +89,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream {
use std::mem;
use std::os::raw::c_char;
use std::ptr;
use std::ffi::CString;
use napi_rs::{JsUnknown, Env, Status, NapiValue, CallContext};
let mut argc = #arg_len_span as usize;
let mut raw_args =
@ -119,7 +120,7 @@ pub fn js_function(attr: TokenStream, input: TokenStream) -> TokenStream {
Err(e) => {
let message = format!("{}", e);
unsafe {
napi_rs::sys::napi_throw_error(raw_env, ptr::null(), message.as_ptr() as *const c_char);
napi_rs::sys::napi_throw_error(raw_env, ptr::null(), CString::from_vec_unchecked(message.into()).as_ptr() as *const c_char);
}
let mut undefined = ptr::null_mut();
unsafe { napi_rs::sys::napi_get_undefined(raw_env, &mut undefined) };

View file

@ -11,9 +11,21 @@ edition = "2018"
[features]
libuv = ["futures"]
tokio_rt = ["futures", "tokio", "once_cell"]
[dependencies.futures]
version = "0.3"
optional = true
[dependencies.tokio]
version = "0.2"
features = ["rt-core", "rt-threaded", "sync"]
optional = true
[dependencies.once_cell]
version = "1.4"
optional = true
[dependencies]
futures = { version = "0.3", optional = true }
[target.'cfg(windows)'.build-dependencies]
flate2 = "1.0"

View file

@ -1,21 +1,25 @@
use crate::task::Task;
use std::any::TypeId;
use std::convert::TryInto;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_char;
use std::os::raw::c_void;
use std::os::raw::{c_char, c_void};
use std::ptr;
use crate::error::check_status;
use crate::js_values::*;
use crate::task::Task;
use crate::{sys, AsyncWork, Error, NodeVersion, Result, Status};
#[cfg(all(feature = "libuv", napi4))]
#[cfg(all(any(feature = "libuv", feature = "tokio_rt"), napi4))]
use crate::promise;
#[cfg(all(feature = "tokio_rt", napi4))]
use crate::tokio_rt::{get_tokio_sender, Message};
#[cfg(all(feature = "libuv", napi4))]
use crate::uv;
#[cfg(all(feature = "libuv", napi4))]
use std::future::Future;
#[cfg(all(feature = "tokio_rt", napi4))]
use tokio::sync::mpsc::error::TrySendError;
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
@ -277,9 +281,13 @@ impl Env {
#[inline]
pub fn throw_error(&self, msg: &str) -> Result<()> {
let status = unsafe { sys::napi_throw_error(self.0, ptr::null(), msg.as_ptr() as *const _) };
check_status(status)?;
Ok(())
check_status(unsafe {
sys::napi_throw_error(
self.0,
ptr::null(),
CString::from_vec_unchecked(msg.into()).as_ptr() as *const _,
)
})
}
#[inline]
@ -489,7 +497,7 @@ impl Env {
#[cfg(all(feature = "libuv", napi4))]
#[inline]
pub fn execute<
T: 'static,
T: 'static + Send,
V: 'static + NapiValue,
F: 'static + Future<Output = Result<T>>,
R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result<V>,
@ -498,8 +506,6 @@ impl Env {
deferred: F,
resolver: R,
) -> Result<JsObject> {
use futures::prelude::*;
let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut();
@ -509,24 +515,49 @@ impl Env {
}
let event_loop = unsafe { sys::uv_default_loop() };
let raw_env = self.0;
let future_to_execute = promise::resolve_from_future(self.0, deferred, resolver, raw_deferred)
.map(move |v| match v {
Ok(value) => value,
Err(e) => {
let cloned_error = e.clone();
unsafe {
sys::napi_throw_error(raw_env, ptr::null(), e.reason.as_ptr() as *const _);
};
eprintln!("{:?}", &cloned_error);
panic!(cloned_error);
}
});
let future_promise = promise::FuturePromise::create(self.0, raw_deferred, Box::from(resolver))?;
let future_to_execute = promise::resolve_from_future(future_promise.start()?, deferred);
uv::execute(event_loop, Box::pin(future_to_execute))?;
Ok(JsObject::from_raw_unchecked(self.0, raw_promise))
}
#[cfg(all(feature = "tokio_rt", napi4))]
#[inline]
pub fn execute_tokio_future<
T: 'static + Send,
V: 'static + NapiValue,
F: 'static + Send + Future<Output = Result<T>>,
R: 'static + Send + Sync + FnOnce(&mut Env, T) -> Result<V>,
>(
&self,
fut: F,
resolver: R,
) -> Result<JsObject> {
let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut();
check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?;
let raw_env = self.0;
let future_promise =
promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?;
let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut);
let mut sender = get_tokio_sender().clone();
sender
.try_send(Message::Task(Box::pin(future_to_resolve)))
.map_err(|e| match e {
TrySendError::Full(_) => Error::new(
Status::QueueFull,
format!("Failed to run future: no available capacity"),
),
TrySendError::Closed(_) => Error::new(
Status::Closing,
format!("Failed to run future: receiver closed"),
),
})?;
Ok(JsObject::from_raw_unchecked(self.0, raw_promise))
}
#[inline]
pub fn get_node_version(&self) -> Result<NodeVersion> {
let mut result = ptr::null();

View file

@ -62,8 +62,17 @@ impl Error {
impl From<std::ffi::NulError> for Error {
fn from(error: std::ffi::NulError) -> Self {
Error {
status: Status::StringExpected,
reason: format!("{:?}", error),
status: Status::GenericFailure,
reason: format!("{}", error),
}
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error {
status: Status::GenericFailure,
reason: format!("{}", error),
}
}
}

View file

@ -11,6 +11,8 @@ pub mod sys;
mod task;
#[cfg(napi4)]
pub mod threadsafe_function;
#[cfg(all(feature = "tokio_rt", napi4))]
mod tokio_rt;
#[cfg(all(feature = "libuv", napi4))]
mod uv;
mod version;
@ -26,19 +28,34 @@ pub use sys::napi_valuetype;
pub use task::Task;
pub use version::NodeVersion;
#[cfg(all(feature = "tokio_rt", napi4))]
pub use tokio_rt::shutdown as shutdown_tokio_rt;
#[macro_export]
macro_rules! register_module {
($module_name:ident, $init:ident) => {
#[inline]
fn check_status(code: $crate::sys::napi_status) -> Result<()> {
let status = Status::from(code);
match status {
Status::Ok => Ok(()),
_ => Err(Error::from_status(status)),
}
}
#[no_mangle]
#[cfg_attr(target_os = "linux", link_section = ".ctors")]
#[cfg_attr(target_os = "macos", link_section = "__DATA,__mod_init_func")]
#[cfg_attr(target_os = "windows", link_section = ".CRT$XCU")]
pub static __REGISTER_MODULE: extern "C" fn() = {
use std::ffi::CString;
use std::io::Write;
use std::os::raw::c_char;
use std::ptr;
use $crate::{sys, Env, JsObject, Module, NapiValue};
#[cfg(all(feature = "tokio_rt", napi4))]
use $crate::shutdown_tokio_rt;
extern "C" fn register_module() {
static mut MODULE_DESCRIPTOR: Option<sys::napi_module> = None;
unsafe {
@ -64,14 +81,23 @@ macro_rules! register_module {
let mut cjs_module = Module { env, exports };
let result = $init(&mut cjs_module);
match result {
#[cfg(all(feature = "tokio_rt", napi4))]
let hook_result = check_status(unsafe {
sys::napi_add_env_cleanup_hook(raw_env, Some(shutdown_tokio_rt), ptr::null_mut())
});
#[cfg(not(all(feature = "tokio_rt", napi4)))]
let hook_result = Ok(());
match hook_result.and_then(move |_| result) {
Ok(_) => exports.into_raw(),
Err(e) => {
unsafe {
sys::napi_throw_error(
raw_env,
ptr::null(),
format!("Error initializing module: {:?}", e).as_ptr() as *const _,
CString::from_vec_unchecked(format!("Error initializing module: {}", e).into())
.as_ptr() as *const _,
)
};
ptr::null_mut()

View file

@ -5,42 +5,49 @@ use std::ptr;
use crate::error::check_status;
use crate::{sys, Env, NapiValue, Result};
struct FuturePromise<T, V: NapiValue> {
pub struct FuturePromise<T, V: NapiValue> {
deferred: sys::napi_deferred,
env: sys::napi_env,
tsfn: sys::napi_threadsafe_function,
async_resource_name: sys::napi_value,
resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
}
#[inline]
pub async fn resolve_from_future<
T,
V: NapiValue,
R: FnOnce(&mut Env, T) -> Result<V> + 'static,
F: Future<Output = Result<T>>,
>(
unsafe impl<T, V: NapiValue> Send for FuturePromise<T, V> {}
impl<T, V: NapiValue> FuturePromise<T, V> {
pub fn create(
env: sys::napi_env,
fut: F,
resolver: R,
raw_deferred: sys::napi_deferred,
) -> Result<()> {
resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
) -> Result<Self> {
let mut async_resource_name = ptr::null_mut();
let s = "napi_resolve_promise_from_future";
let status = unsafe {
check_status(unsafe {
sys::napi_create_string_utf8(
env,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut async_resource_name,
)
};
check_status(status)?;
})?;
let initial_thread_count: u64 = 1;
let mut tsfn_value = ptr::null_mut();
let future_promise = FuturePromise {
Ok(FuturePromise {
deferred: raw_deferred,
resolver: Box::from(resolver),
};
let status = unsafe {
resolver,
env,
tsfn: ptr::null_mut(),
async_resource_name,
})
}
pub(crate) fn start(self) -> Result<TSFNValue> {
let mut tsfn_value = ptr::null_mut();
let async_resource_name = self.async_resource_name;
let initial_thread_count: u64 = 1;
let env = self.env;
let self_ref = Box::leak(Box::from(self));
check_status(unsafe {
sys::napi_create_threadsafe_function(
env,
ptr::null_mut(),
@ -50,26 +57,38 @@ pub async fn resolve_from_future<
initial_thread_count,
ptr::null_mut(),
None,
Box::leak(Box::from(future_promise)) as *mut _ as *mut c_void,
self_ref as *mut _ as *mut c_void,
Some(call_js_cb::<T, V>),
&mut tsfn_value,
)
};
check_status(status)?;
let val = fut.await?;
})?;
self_ref.tsfn = tsfn_value;
Ok(TSFNValue(tsfn_value))
}
}
pub(crate) struct TSFNValue(sys::napi_threadsafe_function);
unsafe impl Send for TSFNValue {}
unsafe impl Sync for TSFNValue {}
#[inline]
pub(crate) async fn resolve_from_future<T: Send, F: Future<Output = Result<T>>>(
tsfn_value: TSFNValue,
fut: F,
) {
let val = fut.await;
check_status(unsafe { sys::napi_acquire_threadsafe_function(tsfn_value.0) })
.expect("Failed to acquire thread safe function");
check_status(unsafe {
sys::napi_call_threadsafe_function(
tsfn_value,
tsfn_value.0,
Box::into_raw(Box::from(val)) as *mut _ as *mut c_void,
sys::napi_threadsafe_function_call_mode::napi_tsfn_nonblocking,
)
})?;
check_status(unsafe {
sys::napi_release_threadsafe_function(
tsfn_value,
sys::napi_threadsafe_function_release_mode::napi_tsfn_release,
)
})
.expect("Failed to call thread safe function");
}
unsafe extern "C" fn call_js_cb<T, V: NapiValue>(
@ -80,17 +99,27 @@ unsafe extern "C" fn call_js_cb<T, V: NapiValue>(
) {
let mut env = Env::from_raw(raw_env);
let future_promise = Box::from_raw(context as *mut FuturePromise<T, V>);
let value = ptr::read(data as *const _);
let js_value_to_resolve = (future_promise.resolver)(&mut env, value);
let value: Result<T> = ptr::read(data as *const _);
let resolver = future_promise.resolver;
let deferred = future_promise.deferred;
let tsfn = future_promise.tsfn;
let js_value_to_resolve = value.and_then(move |v| (resolver)(&mut env, v));
match js_value_to_resolve {
Ok(v) => {
let status = sys::napi_resolve_deferred(raw_env, deferred, v.raw_value());
debug_assert!(status == sys::napi_status::napi_ok, "Resolve promise failed");
debug_assert!(
status == sys::napi_status::napi_ok,
"Resolve promise failed"
);
}
Err(e) => {
let status = sys::napi_reject_deferred(raw_env, deferred, e.into_raw(raw_env));
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
}
};
check_status(sys::napi_release_threadsafe_function(
tsfn,
sys::napi_threadsafe_function_release_mode::napi_tsfn_release,
))
.expect("Release threadsafe function failed");
}

49
napi/src/tokio_rt.rs Normal file
View file

@ -0,0 +1,49 @@
use std::ffi::c_void;
use std::mem;
use std::pin::Pin;
use std::thread::spawn;
use std::time::Duration;
use futures::future::Future;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use crate::Error;
pub(crate) enum Message {
Task(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
}
#[inline]
pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender<Message> {
static SENDER: OnceCell<mpsc::Sender<Message>> = OnceCell::new();
SENDER.get_or_init(|| {
let (sender, mut receiver) = mpsc::channel(100);
spawn(move || {
let mut rt = Runtime::new().expect("Failed to create tokio runtime");
rt.block_on(async {
loop {
match receiver.recv().await {
Some(Message::Task(fut)) => fut.await,
Some(Message::Shutdown) => break,
None => {}
}
}
});
rt.shutdown_timeout(Duration::from_secs(5));
mem::drop(receiver);
});
sender
})
}
pub unsafe extern "C" fn shutdown(_data: *mut c_void) {
let mut sender = get_tokio_sender().clone();
sender
.try_send(Message::Shutdown)
.map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e)))
.unwrap()
}

View file

@ -9,7 +9,7 @@ crate-type = ["cdylib"]
[dependencies]
futures = "0.3"
napi-rs = { path = "../napi", features = ["libuv"] }
napi-rs = { path = "../napi", features = ["libuv", "tokio_rt"] }
napi-rs-derive = { path = "../napi-derive" }
tokio = { version = "0.2", features = ["default", "fs"]}

View file

@ -0,0 +1,64 @@
const test = require('ava')
const { join } = require('path')
const { readFileSync } = require('fs')
const bindings = require('../../index.node')
const napiVersion = require('../napi-version')
const filepath = join(__dirname, './example.txt')
test.serial('should execute future on tokio runtime', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
const fileContent = await bindings.testExecuteTokioReadfile(filepath)
t.true(Buffer.isBuffer(fileContent))
t.deepEqual(readFileSync(filepath), fileContent)
})
test.serial('should reject error from tokio future', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testTokioError, undefined)
return
}
try {
await bindings.testTokioError(filepath)
throw new TypeError('Unreachable')
} catch (e) {
t.is(e.message, 'Error from tokio future')
}
})
test.serial('should be able to execute future paralleled', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
const buffers = await Promise.all(
Array.from({ length: 50 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
for (const fileContent of buffers) {
t.true(Buffer.isBuffer(fileContent))
t.deepEqual(readFileSync(filepath), fileContent)
}
})
test.serial('should reject if task queue is full', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
try {
await Promise.all(
Array.from({ length: 1000 * 1000 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
throw new TypeError('Unreachable')
} catch (e) {
t.is(e.message, 'QueueFull: Failed to run future: no available capacity')
}
})

View file

@ -7,7 +7,7 @@ const napiVersion = require('../napi-version')
const filepath = join(__dirname, './example.txt')
test('should call callback with the first arguments as an Error', async (t) => {
test('should execute future on libuv thread pool', async (t) => {
if (napiVersion < 4) {
t.is(bindings.uvReadFile, undefined)
return

View file

@ -5,32 +5,36 @@ extern crate napi_rs_derive;
use napi::{CallContext, Error, JsString, JsUnknown, Module, Result, Status};
#[cfg(napi4)]
mod napi4;
#[cfg(napi4)]
mod libuv;
#[cfg(napi4)]
mod napi4;
#[cfg(napi5)]
mod napi5;
#[cfg(napi4)]
mod tokio_rt;
mod buffer;
mod function;
mod external;
mod function;
mod napi_version;
mod symbol;
mod task;
mod napi_version;
use buffer::{buffer_to_string, get_buffer_length};
use function::{call_function, call_function_with_this};
use external::{create_external, get_external_count};
use function::{call_function, call_function_with_this};
#[cfg(napi4)]
use libuv::read_file::uv_read_file;
#[cfg(napi4)]
use napi4::{test_threadsafe_function, test_tokio_readfile, test_tsfn_error};
#[cfg(napi5)]
use napi5::is_date::test_object_is_date;
use napi_version::get_napi_version;
use symbol::{create_named_symbol, create_symbol_from_js_string, create_unnamed_symbol};
use task::test_spawn_thread;
#[cfg(napi4)]
use libuv::read_file::uv_read_file;
use napi_version::get_napi_version;
use tokio_rt::{error_from_tokio_future, test_execute_tokio_readfile};
register_module!(test_module, init);
@ -49,12 +53,16 @@ fn init(module: &mut Module) -> Result<()> {
module.create_named_method("testCallFunction", call_function)?;
module.create_named_method("testCallFunctionWithThis", call_function_with_this)?;
#[cfg(napi4)]
module.create_named_method("testExecuteTokioReadfile", test_execute_tokio_readfile)?;
#[cfg(napi4)]
module.create_named_method("testTsfnError", test_tsfn_error)?;
#[cfg(napi4)]
module.create_named_method("testThreadsafeFunction", test_threadsafe_function)?;
#[cfg(napi4)]
module.create_named_method("testTokioReadfile", test_tokio_readfile)?;
#[cfg(napi4)]
module.create_named_method("testTokioError", error_from_tokio_future)?;
#[cfg(napi4)]
module.create_named_method("uvReadFile", uv_read_file)?;
#[cfg(napi5)]
module.create_named_method("testObjectIsDate", test_object_is_date)?;

View file

@ -0,0 +1,3 @@
mod read_file;
pub use read_file::*;

View file

@ -0,0 +1,31 @@
use futures::prelude::*;
use napi::{CallContext, Error, JsObject, JsString, Result, Status};
use tokio;
#[js_function(1)]
pub fn test_execute_tokio_readfile(ctx: CallContext) -> Result<JsObject> {
let js_filepath = ctx.get::<JsString>(0)?;
let path_str = js_filepath.as_str()?;
ctx.env.execute_tokio_future(
tokio::fs::read(path_str.to_owned())
.map(|v| v.map_err(|e| Error::new(Status::Unknown, format!("failed to read file, {}", e)))),
|&mut env, data| env.create_buffer_with_data(data),
)
}
#[js_function(1)]
pub fn error_from_tokio_future(ctx: CallContext) -> Result<JsObject> {
let js_filepath = ctx.get::<JsString>(0)?;
let path_str = js_filepath.as_str()?;
ctx.env.execute_tokio_future(
tokio::fs::read(path_str.to_owned())
.map_err(Error::from)
.and_then(|_| async move {
Err::<Vec<u8>, Error>(Error::new(
Status::Unknown,
"Error from tokio future".to_owned(),
))
}),
|&mut env, data| env.create_buffer_with_data(data),
)
}