diff --git a/napi/Cargo.toml b/napi/Cargo.toml index b2c56310..5fe0f188 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -11,6 +11,9 @@ edition = "2018" [dependencies] futures = { version = "0.3", features = ["default", "thread-pool"] } +num_cpus = "1.13" +once_cell = "1.3" +threadpool = "1.8" [target.'cfg(windows)'.build-dependencies] flate2 = "1.0" diff --git a/napi/src/lib.rs b/napi/src/lib.rs index a7dbb547..6964fbff 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -1,7 +1,8 @@ -extern crate futures; - use core::fmt::Debug; +use futures::channel::oneshot::channel; use futures::prelude::*; +use num_cpus::get_physical; +use once_cell::sync::OnceCell; use std::any::TypeId; use std::convert::{TryFrom, TryInto}; use std::ffi::CString; @@ -13,20 +14,27 @@ use std::ptr; use std::slice; use std::str; use std::string::String as RustString; +use std::sync::Arc; +use threadpool::ThreadPool; mod call_context; mod executor; mod promise; pub mod sys; +mod task; mod version; pub use call_context::CallContext; pub use sys::{napi_valuetype, Status}; +pub use task::Task; +use task::{NapiRSThreadPool, ThreadSafeTask}; pub use version::NodeVersion; pub type Result = std::result::Result; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; +static THREAD_POOL: OnceCell = OnceCell::new(); + #[derive(Debug, Clone)] pub struct Error { pub status: Status, @@ -526,7 +534,8 @@ impl Env { Ok(Value::from_raw_value(self, result, Object)) } - pub fn perform_async_operation< + #[inline] + pub fn execute< T: 'static, V: 'static + ValueType, F: 'static + Future>, @@ -568,6 +577,43 @@ impl Env { Ok(Value::from_raw_value(self, raw_promise, Object)) } + #[inline] + pub fn spawn< + V: 'static + ValueType, + O: Send + 'static, + T: 'static + Send + Task, + >( + &self, + task: T, + ) -> Result> { + let (sender, receiver) = channel::>(); + let threadpool = + THREAD_POOL.get_or_init(|| NapiRSThreadPool(ThreadPool::new(get_physical() * 2))); + let inner_task = Arc::new(ThreadSafeTask::new(task)); + let thread_task = Arc::clone(&inner_task); + let promise_task = Arc::clone(&inner_task); + threadpool.0.execute(move || { + let value = thread_task.borrow().compute(); + match sender.send(value) { + Err(e) => panic!(e), + _ => {} + }; + }); + let rev_value = async { + let result = receiver.await; + result + .map_err(|_| Error { + status: Status::Cancelled, + reason: Some("Receiver cancelled".to_owned()), + }) + .and_then(|v| v) + }; + + self.execute(rev_value, move |env, v| { + promise_task.borrow().resolve(env, v) + }) + } + pub fn get_node_version(&self) -> Result { let mut result = ptr::null(); check_status(unsafe { sys::napi_get_node_version(self.0, &mut result) })?; diff --git a/napi/src/task.rs b/napi/src/task.rs new file mode 100644 index 00000000..0b636de5 --- /dev/null +++ b/napi/src/task.rs @@ -0,0 +1,40 @@ +use threadpool::ThreadPool; + +use crate::{Env, Result, Value, ValueType}; + +pub struct NapiRSThreadPool(pub ThreadPool); + +unsafe impl Send for NapiRSThreadPool {} + +unsafe impl Sync for NapiRSThreadPool {} + +pub trait Task { + type Output: Send + 'static; + type JsValue: ValueType; + + fn compute(&mut self) -> Result; + + fn resolve(&self, env: &mut Env, output: Self::Output) -> Result>; +} + +pub struct ThreadSafeTask(pub *mut T); + +impl ThreadSafeTask { + pub fn new(task: T) -> ThreadSafeTask { + ThreadSafeTask(Box::into_raw(Box::new(task))) + } + + #[inline] + pub fn borrow(&self) -> &'static mut T { + Box::leak(unsafe { Box::from_raw(self.0) }) + } +} + +impl Drop for ThreadSafeTask { + fn drop(&mut self) { + unsafe { Box::from_raw(self.0) }; + } +} + +unsafe impl Send for ThreadSafeTask {} +unsafe impl Sync for ThreadSafeTask {} diff --git a/test_module/index.js b/test_module/index.js index 33a813d0..aa8a95cf 100644 --- a/test_module/index.js +++ b/test_module/index.js @@ -16,6 +16,11 @@ function testThrow() { } } +function testSpawnThread(n) { + console.info('=== Test spawn task to threadpool') + return testModule.testSpawnThread(n) +} + const future = testSpawn() future @@ -23,6 +28,11 @@ future console.info(`${value} from napi`) testThrow() }) + .then(() => testSpawnThread(20)) + .then((value) => { + console.assert(value === 6765) + console.info('=== fibonacci result', value) + }) .catch((e) => { console.error(e) process.exit(1) diff --git a/test_module/src/lib.rs b/test_module/src/lib.rs index 7131f50e..9d0f249b 100644 --- a/test_module/src/lib.rs +++ b/test_module/src/lib.rs @@ -5,13 +5,18 @@ extern crate napi_rs_derive; extern crate futures; -use napi::{Any, CallContext, Env, Error, Object, Result, Status, Value}; +use napi::{Any, CallContext, Env, Error, Number, Object, Result, Status, Task, Value}; +use std::convert::TryInto; register_module!(test_module, init); fn init(env: &Env, exports: &mut Value) -> Result<()> { exports.set_named_property("testSpawn", env.create_function("testSpawn", test_spawn)?)?; exports.set_named_property("testThrow", env.create_function("testThrow", test_throw)?)?; + exports.set_named_property( + "testSpawnThread", + env.create_function("testSpawnThread", test_spawn_thread)?, + )?; Ok(()) } @@ -35,7 +40,45 @@ fn test_spawn(ctx: CallContext) -> Result> { Ok(results.len() as u32) }; - env.perform_async_operation(fut_values, |&mut env, len| env.create_uint32(len)) + env.execute(fut_values, |&mut env, len| env.create_uint32(len)) +} + +struct ComputeFib { + n: u32, +} + +impl ComputeFib { + pub fn new(n: u32) -> ComputeFib { + ComputeFib { n } + } +} + +impl Task for ComputeFib { + type Output = u32; + type JsValue = Number; + + fn compute(&mut self) -> Result { + Ok(fibonacci_native(self.n)) + } + + fn resolve(&self, env: &mut Env, output: Self::Output) -> Result> { + env.create_uint32(output) + } +} + +#[inline] +fn fibonacci_native(n: u32) -> u32 { + match n { + 1 | 2 => 1, + _ => fibonacci_native(n - 1) + fibonacci_native(n - 2), + } +} + +#[js_function(1)] +fn test_spawn_thread(ctx: CallContext) -> Result> { + let n = ctx.get::(0)?; + let task = ComputeFib::new(n.try_into()?); + ctx.env.spawn(task) } #[js_function]