From ec235d284d5ffc595b9a94e229356ad13c896f2d Mon Sep 17 00:00:00 2001 From: LongYinan Date: Tue, 12 May 2020 13:59:20 +0800 Subject: [PATCH] refactor(spawn): napi_async_worker implementation --- .github/workflows/linux.yaml | 12 ++- .github/workflows/macos.yaml | 12 ++- .github/workflows/windows.yaml | 10 +- napi/Cargo.toml | 3 - napi/src/async_work.rs | 114 ++++++++++++++++++++++ napi/src/executor.rs | 142 ++++++++++++++++------------ napi/src/lib.rs | 78 +++++++-------- napi/src/task.rs | 34 +------ package.json | 3 +- test_module/{index.js => future.js} | 19 +--- test_module/fuzzy.js | 34 +++++++ test_module/spawn.js | 16 ++++ test_module/src/lib.rs | 2 +- 13 files changed, 303 insertions(+), 176 deletions(-) create mode 100644 napi/src/async_work.rs rename test_module/{index.js => future.js} (54%) create mode 100644 test_module/fuzzy.js create mode 100644 test_module/spawn.js diff --git a/.github/workflows/linux.yaml b/.github/workflows/linux.yaml index d4042c75..65639baf 100644 --- a/.github/workflows/linux.yaml +++ b/.github/workflows/linux.yaml @@ -50,25 +50,27 @@ jobs: path: target key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} - - name: check build + - name: Check build uses: actions-rs/cargo@v1 with: command: check args: --all --bins --examples --tests -vvv - - name: tests + - name: Tests uses: actions-rs/cargo@v1 - timeout-minutes: 40 + timeout-minutes: 5 with: command: test args: -p napi-rs --lib -- --nocapture - - name: test scripts + - name: Fuzzy tests run: | yarn cd test_module yarn build - yarn test + node fuzzy.js + env: + RUST_BACKTRACE: 1 - name: Clear the cargo caches run: | diff --git a/.github/workflows/macos.yaml b/.github/workflows/macos.yaml index b0765a34..c6bdb536 100644 --- a/.github/workflows/macos.yaml +++ b/.github/workflows/macos.yaml @@ -50,25 +50,27 @@ jobs: path: target key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} - - name: check build + - name: Check build uses: actions-rs/cargo@v1 with: command: check args: --all --bins --examples --tests -vvv - - name: tests + - name: Tests uses: actions-rs/cargo@v1 - timeout-minutes: 40 + timeout-minutes: 5 with: command: test args: -p napi-rs --lib -- --nocapture - - name: test scripts + - name: Fuzzy tests run: | yarn cd test_module yarn build - yarn test + node fuzzy.js + env: + RUST_BACKTRACE: 1 - name: Clear the cargo caches run: | diff --git a/.github/workflows/windows.yaml b/.github/workflows/windows.yaml index b46b26ef..e75f6686 100644 --- a/.github/workflows/windows.yaml +++ b/.github/workflows/windows.yaml @@ -56,25 +56,25 @@ jobs: path: target key: ${{ matrix.version }}-${{ matrix.target }}-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} - - name: check build + - name: Check build uses: actions-rs/cargo@v1 with: command: check args: -p napi-rs -vvv - - name: tests + - name: Tests uses: actions-rs/cargo@v1 - timeout-minutes: 40 + timeout-minutes: 5 with: command: test args: -p napi-rs --lib -- --nocapture - - name: test scripts + - name: Fuzzy tests run: | yarn cd test_module yarn build - yarn test + node fuzzy.js env: RUST_BACKTRACE: 1 diff --git a/napi/Cargo.toml b/napi/Cargo.toml index 5fe0f188..b2c56310 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -11,9 +11,6 @@ edition = "2018" [dependencies] futures = { version = "0.3", features = ["default", "thread-pool"] } -num_cpus = "1.13" -once_cell = "1.3" -threadpool = "1.8" [target.'cfg(windows)'.build-dependencies] flate2 = "1.0" diff --git a/napi/src/async_work.rs b/napi/src/async_work.rs new file mode 100644 index 00000000..e6f57b3c --- /dev/null +++ b/napi/src/async_work.rs @@ -0,0 +1,114 @@ +use std::mem; +use std::os::raw::{c_char, c_void}; +use std::ptr; + +use crate::{check_status, sys, Env, Result, Task}; + +pub struct AsyncWork { + inner_task: T, + deferred: sys::napi_deferred, + value: Result<*mut T::Output>, +} + +impl AsyncWork { + pub fn run(env: sys::napi_env, task: T, deferred: sys::napi_deferred) -> Result<()> { + let mut raw_resource = ptr::null_mut(); + let status = unsafe { sys::napi_create_object(env, &mut raw_resource) }; + check_status(status)?; + let mut raw_name = ptr::null_mut(); + let s = "napi_rs_async"; + let status = unsafe { + sys::napi_create_string_utf8( + env, + s.as_ptr() as *const c_char, + s.len() as u64, + &mut raw_name, + ) + }; + check_status(status)?; + let mut raw_context = ptr::null_mut(); + unsafe { + let status = sys::napi_async_init(env, raw_resource, raw_name, &mut raw_context); + check_status(status)?; + }; + let result = AsyncWork { + inner_task: task, + deferred, + value: Ok(ptr::null_mut()), + }; + let mut async_work = ptr::null_mut(); + check_status(unsafe { + sys::napi_create_async_work( + env, + raw_resource, + raw_name, + Some(execute:: as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)), + Some( + complete:: + as unsafe extern "C" fn( + env: sys::napi_env, + status: sys::napi_status, + data: *mut c_void, + ), + ), + Box::leak(Box::new(result)) as *mut _ as *mut c_void, + &mut async_work, + ) + })?; + check_status(unsafe { sys::napi_queue_async_work(env, async_work) }) + } +} + +unsafe impl Send for AsyncWork {} + +unsafe impl Sync for AsyncWork {} + +unsafe extern "C" fn execute(_env: sys::napi_env, data: *mut c_void) { + let mut work = Box::from_raw(data as *mut AsyncWork); + work.value = work + .inner_task + .compute() + .map(|v| Box::into_raw(Box::from(v))); + Box::leak(work); +} + +unsafe extern "C" fn complete( + env: sys::napi_env, + status: sys::napi_status, + data: *mut c_void, +) { + let mut work = Box::from_raw(data as *mut AsyncWork); + let value_ptr = mem::replace(&mut work.value, Ok(ptr::null_mut())); + let deferred = mem::replace(&mut work.deferred, ptr::null_mut()); + let value = value_ptr.and_then(move |v| { + let mut env = Env::from_raw(env); + let output = ptr::read(v as *const _); + work.inner_task.resolve(&mut env, output) + }); + let mut handle_scope = ptr::null_mut(); + match check_status(status).and_then(move |_| value) { + Ok(v) => { + let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope); + debug_assert!( + open_handle_status == sys::napi_status::napi_ok, + "OpenHandleScope failed" + ); + let status = sys::napi_resolve_deferred(env, deferred, v.raw_value); + debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed"); + } + Err(e) => { + let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope); + debug_assert!( + open_handle_status == sys::napi_status::napi_ok, + "OpenHandleScope failed" + ); + let status = sys::napi_reject_deferred(env, deferred, e.into_raw(env)); + debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed"); + } + }; + let close_handle_scope_status = sys::napi_close_handle_scope(env, handle_scope); + debug_assert!( + close_handle_scope_status == sys::napi_status::napi_ok, + "Close handle scope failed" + ); +} diff --git a/napi/src/executor.rs b/napi/src/executor.rs index ef3da6f6..b2ce3064 100644 --- a/napi/src/executor.rs +++ b/napi/src/executor.rs @@ -1,82 +1,99 @@ -use futures::task::Poll; -use std::alloc::{alloc, Layout}; +extern crate alloc; + +use alloc::alloc::{alloc, alloc_zeroed, Layout}; +use futures::future::LocalBoxFuture; +use futures::task::{waker, ArcWake, Context, Poll}; use std::future::Future; use std::os::raw::c_void; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::task::{Context, RawWaker, RawWakerVTable, Waker}; use crate::{sys, Error, Result, Status}; -const UV_ASYNC_V_TABLE: RawWakerVTable = RawWakerVTable::new( - clone_executor, - wake_uv_async, - wake_uv_async_by_ref, - drop_uv_async, -); - -unsafe fn clone_executor(uv_async_t: *const ()) -> RawWaker { - RawWaker::new(uv_async_t, &UV_ASYNC_V_TABLE) -} - -unsafe fn wake_uv_async(uv_async_t: *const ()) { - let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t); - assert!(status == 0, "wake_uv_async failed"); -} - -unsafe fn wake_uv_async_by_ref(uv_async_t: *const ()) { - let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t); - assert!(status == 0, "wake_uv_async_by_ref failed"); -} - -unsafe fn drop_uv_async(_uv_async_t_ptr: *const ()) {} - struct Task<'a> { - future: Pin>>, + future: LocalBoxFuture<'a, ()>, context: Context<'a>, + is_polling: AtomicBool, +} + +struct UvWaker(*mut sys::uv_async_t); + +unsafe impl Send for UvWaker {} +unsafe impl Sync for UvWaker {} + +impl UvWaker { + fn new(event_loop: *mut sys::uv_loop_s) -> Result { + let uv_async_t = unsafe { + let layout = Layout::new::(); + debug_assert!(layout.size() != 0, "uv_async_t alloc size should not be 0"); + if cfg!(windows) { + alloc_zeroed(layout) as *mut sys::uv_async_t + } else { + alloc(layout) as *mut sys::uv_async_t + } + }; + unsafe { + let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future)); + if status != 0 { + return Err(Error { + status: Status::Unknown, + reason: Some("Non-zero status returned from uv_async_init".to_owned()), + }); + } + }; + Ok(UvWaker(uv_async_t)) + } + + #[inline] + fn assign_task(&self, mut task: Task) { + if !task.poll_future() { + task.is_polling.store(false, Ordering::Relaxed); + let arc_task = Arc::new(task); + unsafe { + sys::uv_handle_set_data( + self.0 as *mut sys::uv_handle_t, + Arc::into_raw(arc_task) as *mut c_void, + ) + }; + } else { + unsafe { sys::uv_close(self.0 as *mut sys::uv_handle_t, None) }; + }; + } +} + +impl ArcWake for UvWaker { + fn wake_by_ref(arc_self: &Arc) { + let status = unsafe { sys::uv_async_send(arc_self.0) }; + assert!(status == 0, "wake_uv_async_by_ref failed"); + } } #[inline] -pub fn execute>( - event_loop: *mut sys::uv_loop_s, - future: F, -) -> Result<()> { - let uv_async_t = unsafe { alloc(Layout::new::()) as *mut sys::uv_async_t }; - unsafe { - let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future)); - if status != 0 { - return Err(Error { - status: Status::Unknown, - reason: Some("Non-zero status returned from uv_async_init".to_owned()), - }); - } +pub fn execute(event_loop: *mut sys::uv_loop_s, future: LocalBoxFuture<()>) -> Result<()> { + let uv_waker = UvWaker::new(event_loop)?; + let arc_waker = Arc::new(uv_waker); + let waker_to_poll = Arc::clone(&arc_waker); + let waker = waker(arc_waker); + let context = Context::from_waker(&waker); + let task = Task { + future, + context, + is_polling: AtomicBool::from(false), }; - unsafe { - let waker = Waker::from_raw(RawWaker::new( - uv_async_t as *const _ as *const (), - &UV_ASYNC_V_TABLE, - )); - let context = Context::from_waker(&waker); - let mut task = Task { - future: Box::pin(future), - context, - }; - if !task.poll_future() { - let arc_task = Arc::new(task); - sys::uv_handle_set_data( - uv_async_t as *mut _ as *mut sys::uv_handle_t, - Arc::into_raw(arc_task) as *mut c_void, - ); - } else { - sys::uv_close(uv_async_t as *mut _ as *mut sys::uv_handle_t, None); - }; - Ok(()) - } + waker_to_poll.assign_task(task); + Ok(()) } impl<'a> Task<'a> { fn poll_future(&mut self) -> bool { - match self.future.as_mut().poll(&mut self.context) { + if self.is_polling.load(Ordering::Relaxed) { + return false; + } + self.is_polling.store(true, Ordering::Relaxed); + let mut pinned = Pin::new(&mut self.future); + let fut_mut = pinned.as_mut(); + match fut_mut.poll(&mut self.context) { Poll::Ready(_) => true, Poll::Pending => false, } @@ -90,6 +107,7 @@ unsafe extern "C" fn poll_future(handle: *mut sys::uv_async_t) { if mut_task.poll_future() { sys::uv_close(handle as *mut sys::uv_handle_t, None); } else { + mut_task.is_polling.store(false, Ordering::Relaxed); Arc::into_raw(task); }; } else { diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 6964fbff..67bc73f8 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -1,8 +1,6 @@ +use async_work::AsyncWork; use core::fmt::Debug; -use futures::channel::oneshot::channel; use futures::prelude::*; -use num_cpus::get_physical; -use once_cell::sync::OnceCell; use std::any::TypeId; use std::convert::{TryFrom, TryInto}; use std::ffi::CString; @@ -14,9 +12,8 @@ use std::ptr; use std::slice; use std::str; use std::string::String as RustString; -use std::sync::Arc; -use threadpool::ThreadPool; +mod async_work; mod call_context; mod executor; mod promise; @@ -27,14 +24,11 @@ mod version; pub use call_context::CallContext; pub use sys::{napi_valuetype, Status}; pub use task::Task; -use task::{NapiRSThreadPool, ThreadSafeTask}; pub use version::NodeVersion; pub type Result = std::result::Result; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; -static THREAD_POOL: OnceCell = OnceCell::new(); - #[derive(Debug, Clone)] pub struct Error { pub status: Status, @@ -179,6 +173,27 @@ impl Error { reason: None, } } + + pub fn into_raw(self, env: sys::napi_env) -> sys::napi_value { + let mut err = ptr::null_mut(); + let s = self.reason.unwrap_or("NAPI error".to_owned()); + unsafe { + let mut err_reason = ptr::null_mut(); + let status = sys::napi_create_string_utf8( + env, + s.as_ptr() as *const c_char, + s.len() as u64, + &mut err_reason, + ); + debug_assert!( + status == sys::napi_status::napi_ok, + "Create error reason failed" + ); + let status = sys::napi_create_error(env, ptr::null_mut(), err_reason, &mut err); + debug_assert!(status == sys::napi_status::napi_ok, "Create error failed"); + }; + err + } } impl From for Error { @@ -555,8 +570,7 @@ impl Env { let event_loop = unsafe { sys::uv_default_loop() }; let raw_env = self.0; - executor::execute( - event_loop, + let future_to_execute = promise::resolve(self.0, deferred, resolver, raw_deferred).map(move |v| match v { Ok(value) => value, Err(e) => { @@ -571,47 +585,19 @@ impl Env { eprintln!("{:?}", &cloned_error); panic!(cloned_error); } - }), - )?; + }); + executor::execute(event_loop, Box::pin(future_to_execute))?; Ok(Value::from_raw_value(self, raw_promise, Object)) } - #[inline] - pub fn spawn< - V: 'static + ValueType, - O: Send + 'static, - T: 'static + Send + Task, - >( - &self, - task: T, - ) -> Result> { - let (sender, receiver) = channel::>(); - let threadpool = - THREAD_POOL.get_or_init(|| NapiRSThreadPool(ThreadPool::new(get_physical() * 2))); - let inner_task = Arc::new(ThreadSafeTask::new(task)); - let thread_task = Arc::clone(&inner_task); - let promise_task = Arc::clone(&inner_task); - threadpool.0.execute(move || { - let value = thread_task.borrow().compute(); - match sender.send(value) { - Err(e) => panic!(e), - _ => {} - }; - }); - let rev_value = async { - let result = receiver.await; - result - .map_err(|_| Error { - status: Status::Cancelled, - reason: Some("Receiver cancelled".to_owned()), - }) - .and_then(|v| v) - }; + pub fn spawn(&self, task: T) -> Result> { + let mut raw_promise = ptr::null_mut(); + let mut raw_deferred = ptr::null_mut(); - self.execute(rev_value, move |env, v| { - promise_task.borrow().resolve(env, v) - }) + check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?; + AsyncWork::run(self.0, task, raw_deferred)?; + Ok(Value::from_raw_value(self, raw_promise, Object)) } pub fn get_node_version(&self) -> Result { diff --git a/napi/src/task.rs b/napi/src/task.rs index 0b636de5..3951a403 100644 --- a/napi/src/task.rs +++ b/napi/src/task.rs @@ -1,40 +1,10 @@ -use threadpool::ThreadPool; - use crate::{Env, Result, Value, ValueType}; -pub struct NapiRSThreadPool(pub ThreadPool); - -unsafe impl Send for NapiRSThreadPool {} - -unsafe impl Sync for NapiRSThreadPool {} - pub trait Task { - type Output: Send + 'static; + type Output: Send + Sized + 'static; type JsValue: ValueType; - fn compute(&mut self) -> Result; + fn compute(&self) -> Result; fn resolve(&self, env: &mut Env, output: Self::Output) -> Result>; } - -pub struct ThreadSafeTask(pub *mut T); - -impl ThreadSafeTask { - pub fn new(task: T) -> ThreadSafeTask { - ThreadSafeTask(Box::into_raw(Box::new(task))) - } - - #[inline] - pub fn borrow(&self) -> &'static mut T { - Box::leak(unsafe { Box::from_raw(self.0) }) - } -} - -impl Drop for ThreadSafeTask { - fn drop(&mut self) { - unsafe { Box::from_raw(self.0) }; - } -} - -unsafe impl Send for ThreadSafeTask {} -unsafe impl Sync for ThreadSafeTask {} diff --git a/package.json b/package.json index 8dafa3d9..174e66c9 100644 --- a/package.json +++ b/package.json @@ -18,9 +18,10 @@ ], "license": "MIT", "scripts": { - "format": "run-p format:md format:json format:yaml format:source", + "format": "run-p format:md format:json format:yaml format:source format:rs", "format:md": "prettier --parser markdown --write './**/*.md'", "format:json": "prettier --parser json --write './**/*.json'", + "format:rs": "cargo fmt", "format:source": "prettier --config ./package.json --write './**/*.js'", "format:yaml": "prettier --parser yaml --write './**/*.{yml,yaml}'" }, diff --git a/test_module/index.js b/test_module/future.js similarity index 54% rename from test_module/index.js rename to test_module/future.js index aa8a95cf..f96b98c7 100644 --- a/test_module/index.js +++ b/test_module/future.js @@ -9,30 +9,17 @@ function testThrow() { console.log('=== Test throwing from Rust') try { testModule.testThrow() - console.error('Expected function to throw an error') + console.log('Expected function to throw an error') process.exit(1) } catch (e) { - console.error(e) + console.log(e) } } - -function testSpawnThread(n) { - console.info('=== Test spawn task to threadpool') - return testModule.testSpawnThread(n) -} - -const future = testSpawn() - -future +testSpawn() .then((value) => { console.info(`${value} from napi`) testThrow() }) - .then(() => testSpawnThread(20)) - .then((value) => { - console.assert(value === 6765) - console.info('=== fibonacci result', value) - }) .catch((e) => { console.error(e) process.exit(1) diff --git a/test_module/fuzzy.js b/test_module/fuzzy.js new file mode 100644 index 00000000..7a676778 --- /dev/null +++ b/test_module/fuzzy.js @@ -0,0 +1,34 @@ +const { exec } = require('child_process') + +Array.from({ length: 500 }) + .reduce(async (acc) => { + await acc + await run() + }, null) + .then(() => { + console.info(`Fuzzy test success, passed ${500} tests.`) + }) + .catch((e) => { + console.error(e) + process.exit(1) + }) + +const run = () => { + return new Promise((resolve, reject) => { + const testProcess = exec('node ./spawn.js', { + env: process.env, + }) + testProcess.stdout.pipe(process.stdout) + testProcess.stderr.pipe(process.stderr) + testProcess.on('error', (err) => { + reject(err) + }) + testProcess.on('exit', (code) => { + if (code) { + reject(new TypeError(`Child process exit code ${code}`)) + } else { + resolve() + } + }) + }) +} diff --git a/test_module/spawn.js b/test_module/spawn.js new file mode 100644 index 00000000..56a6417f --- /dev/null +++ b/test_module/spawn.js @@ -0,0 +1,16 @@ +const testModule = require('./index.node') + +function testSpawnThread(n) { + console.info('=== Test spawn task to threadpool') + return testModule.testSpawnThread(n) +} + +testSpawnThread(20) + .then((value) => { + console.assert(value === 6765) + console.info('=== fibonacci result', value) + }) + .catch((e) => { + console.error(e) + process.exit(1) + }) diff --git a/test_module/src/lib.rs b/test_module/src/lib.rs index 9d0f249b..de7adc3a 100644 --- a/test_module/src/lib.rs +++ b/test_module/src/lib.rs @@ -57,7 +57,7 @@ impl Task for ComputeFib { type Output = u32; type JsValue = Number; - fn compute(&mut self) -> Result { + fn compute(&self) -> Result { Ok(fibonacci_native(self.n)) }