refactor(spawn): napi_async_worker implementation

This commit is contained in:
LongYinan 2020-05-12 13:59:20 +08:00
parent 208bcac996
commit ec235d284d
No known key found for this signature in database
GPG key ID: A3FFE134A3E20881
13 changed files with 303 additions and 176 deletions

View file

@ -50,25 +50,27 @@ jobs:
path: target
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
- name: Check build
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --tests -vvv
- name: tests
- name: Tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
timeout-minutes: 5
with:
command: test
args: -p napi-rs --lib -- --nocapture
- name: test scripts
- name: Fuzzy tests
run: |
yarn
cd test_module
yarn build
yarn test
node fuzzy.js
env:
RUST_BACKTRACE: 1
- name: Clear the cargo caches
run: |

View file

@ -50,25 +50,27 @@ jobs:
path: target
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
- name: Check build
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --tests -vvv
- name: tests
- name: Tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
timeout-minutes: 5
with:
command: test
args: -p napi-rs --lib -- --nocapture
- name: test scripts
- name: Fuzzy tests
run: |
yarn
cd test_module
yarn build
yarn test
node fuzzy.js
env:
RUST_BACKTRACE: 1
- name: Clear the cargo caches
run: |

View file

@ -56,25 +56,25 @@ jobs:
path: target
key: ${{ matrix.version }}-${{ matrix.target }}-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
- name: Check build
uses: actions-rs/cargo@v1
with:
command: check
args: -p napi-rs -vvv
- name: tests
- name: Tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
timeout-minutes: 5
with:
command: test
args: -p napi-rs --lib -- --nocapture
- name: test scripts
- name: Fuzzy tests
run: |
yarn
cd test_module
yarn build
yarn test
node fuzzy.js
env:
RUST_BACKTRACE: 1

View file

@ -11,9 +11,6 @@ edition = "2018"
[dependencies]
futures = { version = "0.3", features = ["default", "thread-pool"] }
num_cpus = "1.13"
once_cell = "1.3"
threadpool = "1.8"
[target.'cfg(windows)'.build-dependencies]
flate2 = "1.0"

114
napi/src/async_work.rs Normal file
View file

@ -0,0 +1,114 @@
use std::mem;
use std::os::raw::{c_char, c_void};
use std::ptr;
use crate::{check_status, sys, Env, Result, Task};
pub struct AsyncWork<T: Task> {
inner_task: T,
deferred: sys::napi_deferred,
value: Result<*mut T::Output>,
}
impl<T: Task> AsyncWork<T> {
pub fn run(env: sys::napi_env, task: T, deferred: sys::napi_deferred) -> Result<()> {
let mut raw_resource = ptr::null_mut();
let status = unsafe { sys::napi_create_object(env, &mut raw_resource) };
check_status(status)?;
let mut raw_name = ptr::null_mut();
let s = "napi_rs_async";
let status = unsafe {
sys::napi_create_string_utf8(
env,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut raw_name,
)
};
check_status(status)?;
let mut raw_context = ptr::null_mut();
unsafe {
let status = sys::napi_async_init(env, raw_resource, raw_name, &mut raw_context);
check_status(status)?;
};
let result = AsyncWork {
inner_task: task,
deferred,
value: Ok(ptr::null_mut()),
};
let mut async_work = ptr::null_mut();
check_status(unsafe {
sys::napi_create_async_work(
env,
raw_resource,
raw_name,
Some(execute::<T> as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)),
Some(
complete::<T>
as unsafe extern "C" fn(
env: sys::napi_env,
status: sys::napi_status,
data: *mut c_void,
),
),
Box::leak(Box::new(result)) as *mut _ as *mut c_void,
&mut async_work,
)
})?;
check_status(unsafe { sys::napi_queue_async_work(env, async_work) })
}
}
unsafe impl<T: Task> Send for AsyncWork<T> {}
unsafe impl<T: Task> Sync for AsyncWork<T> {}
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
work.value = work
.inner_task
.compute()
.map(|v| Box::into_raw(Box::from(v)));
Box::leak(work);
}
unsafe extern "C" fn complete<T: Task>(
env: sys::napi_env,
status: sys::napi_status,
data: *mut c_void,
) {
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
let value_ptr = mem::replace(&mut work.value, Ok(ptr::null_mut()));
let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
let value = value_ptr.and_then(move |v| {
let mut env = Env::from_raw(env);
let output = ptr::read(v as *const _);
work.inner_task.resolve(&mut env, output)
});
let mut handle_scope = ptr::null_mut();
match check_status(status).and_then(move |_| value) {
Ok(v) => {
let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope);
debug_assert!(
open_handle_status == sys::napi_status::napi_ok,
"OpenHandleScope failed"
);
let status = sys::napi_resolve_deferred(env, deferred, v.raw_value);
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
}
Err(e) => {
let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope);
debug_assert!(
open_handle_status == sys::napi_status::napi_ok,
"OpenHandleScope failed"
);
let status = sys::napi_reject_deferred(env, deferred, e.into_raw(env));
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
}
};
let close_handle_scope_status = sys::napi_close_handle_scope(env, handle_scope);
debug_assert!(
close_handle_scope_status == sys::napi_status::napi_ok,
"Close handle scope failed"
);
}

View file

@ -1,82 +1,99 @@
use futures::task::Poll;
use std::alloc::{alloc, Layout};
extern crate alloc;
use alloc::alloc::{alloc, alloc_zeroed, Layout};
use futures::future::LocalBoxFuture;
use futures::task::{waker, ArcWake, Context, Poll};
use std::future::Future;
use std::os::raw::c_void;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
use crate::{sys, Error, Result, Status};
const UV_ASYNC_V_TABLE: RawWakerVTable = RawWakerVTable::new(
clone_executor,
wake_uv_async,
wake_uv_async_by_ref,
drop_uv_async,
);
unsafe fn clone_executor(uv_async_t: *const ()) -> RawWaker {
RawWaker::new(uv_async_t, &UV_ASYNC_V_TABLE)
}
unsafe fn wake_uv_async(uv_async_t: *const ()) {
let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t);
assert!(status == 0, "wake_uv_async failed");
}
unsafe fn wake_uv_async_by_ref(uv_async_t: *const ()) {
let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t);
assert!(status == 0, "wake_uv_async_by_ref failed");
}
unsafe fn drop_uv_async(_uv_async_t_ptr: *const ()) {}
struct Task<'a> {
future: Pin<Box<dyn Future<Output = ()>>>,
future: LocalBoxFuture<'a, ()>,
context: Context<'a>,
is_polling: AtomicBool,
}
struct UvWaker(*mut sys::uv_async_t);
unsafe impl Send for UvWaker {}
unsafe impl Sync for UvWaker {}
impl UvWaker {
fn new(event_loop: *mut sys::uv_loop_s) -> Result<UvWaker> {
let uv_async_t = unsafe {
let layout = Layout::new::<sys::uv_async_t>();
debug_assert!(layout.size() != 0, "uv_async_t alloc size should not be 0");
if cfg!(windows) {
alloc_zeroed(layout) as *mut sys::uv_async_t
} else {
alloc(layout) as *mut sys::uv_async_t
}
};
unsafe {
let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future));
if status != 0 {
return Err(Error {
status: Status::Unknown,
reason: Some("Non-zero status returned from uv_async_init".to_owned()),
});
}
};
Ok(UvWaker(uv_async_t))
}
#[inline]
fn assign_task(&self, mut task: Task) {
if !task.poll_future() {
task.is_polling.store(false, Ordering::Relaxed);
let arc_task = Arc::new(task);
unsafe {
sys::uv_handle_set_data(
self.0 as *mut sys::uv_handle_t,
Arc::into_raw(arc_task) as *mut c_void,
)
};
} else {
unsafe { sys::uv_close(self.0 as *mut sys::uv_handle_t, None) };
};
}
}
impl ArcWake for UvWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
let status = unsafe { sys::uv_async_send(arc_self.0) };
assert!(status == 0, "wake_uv_async_by_ref failed");
}
}
#[inline]
pub fn execute<F: 'static + Future<Output = ()>>(
event_loop: *mut sys::uv_loop_s,
future: F,
) -> Result<()> {
let uv_async_t = unsafe { alloc(Layout::new::<sys::uv_async_t>()) as *mut sys::uv_async_t };
unsafe {
let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future));
if status != 0 {
return Err(Error {
status: Status::Unknown,
reason: Some("Non-zero status returned from uv_async_init".to_owned()),
});
}
pub fn execute(event_loop: *mut sys::uv_loop_s, future: LocalBoxFuture<()>) -> Result<()> {
let uv_waker = UvWaker::new(event_loop)?;
let arc_waker = Arc::new(uv_waker);
let waker_to_poll = Arc::clone(&arc_waker);
let waker = waker(arc_waker);
let context = Context::from_waker(&waker);
let task = Task {
future,
context,
is_polling: AtomicBool::from(false),
};
unsafe {
let waker = Waker::from_raw(RawWaker::new(
uv_async_t as *const _ as *const (),
&UV_ASYNC_V_TABLE,
));
let context = Context::from_waker(&waker);
let mut task = Task {
future: Box::pin(future),
context,
};
if !task.poll_future() {
let arc_task = Arc::new(task);
sys::uv_handle_set_data(
uv_async_t as *mut _ as *mut sys::uv_handle_t,
Arc::into_raw(arc_task) as *mut c_void,
);
} else {
sys::uv_close(uv_async_t as *mut _ as *mut sys::uv_handle_t, None);
};
Ok(())
}
waker_to_poll.assign_task(task);
Ok(())
}
impl<'a> Task<'a> {
fn poll_future(&mut self) -> bool {
match self.future.as_mut().poll(&mut self.context) {
if self.is_polling.load(Ordering::Relaxed) {
return false;
}
self.is_polling.store(true, Ordering::Relaxed);
let mut pinned = Pin::new(&mut self.future);
let fut_mut = pinned.as_mut();
match fut_mut.poll(&mut self.context) {
Poll::Ready(_) => true,
Poll::Pending => false,
}
@ -90,6 +107,7 @@ unsafe extern "C" fn poll_future(handle: *mut sys::uv_async_t) {
if mut_task.poll_future() {
sys::uv_close(handle as *mut sys::uv_handle_t, None);
} else {
mut_task.is_polling.store(false, Ordering::Relaxed);
Arc::into_raw(task);
};
} else {

View file

@ -1,8 +1,6 @@
use async_work::AsyncWork;
use core::fmt::Debug;
use futures::channel::oneshot::channel;
use futures::prelude::*;
use num_cpus::get_physical;
use once_cell::sync::OnceCell;
use std::any::TypeId;
use std::convert::{TryFrom, TryInto};
use std::ffi::CString;
@ -14,9 +12,8 @@ use std::ptr;
use std::slice;
use std::str;
use std::string::String as RustString;
use std::sync::Arc;
use threadpool::ThreadPool;
mod async_work;
mod call_context;
mod executor;
mod promise;
@ -27,14 +24,11 @@ mod version;
pub use call_context::CallContext;
pub use sys::{napi_valuetype, Status};
pub use task::Task;
use task::{NapiRSThreadPool, ThreadSafeTask};
pub use version::NodeVersion;
pub type Result<T> = std::result::Result<T, Error>;
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
static THREAD_POOL: OnceCell<NapiRSThreadPool> = OnceCell::new();
#[derive(Debug, Clone)]
pub struct Error {
pub status: Status,
@ -179,6 +173,27 @@ impl Error {
reason: None,
}
}
pub fn into_raw(self, env: sys::napi_env) -> sys::napi_value {
let mut err = ptr::null_mut();
let s = self.reason.unwrap_or("NAPI error".to_owned());
unsafe {
let mut err_reason = ptr::null_mut();
let status = sys::napi_create_string_utf8(
env,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut err_reason,
);
debug_assert!(
status == sys::napi_status::napi_ok,
"Create error reason failed"
);
let status = sys::napi_create_error(env, ptr::null_mut(), err_reason, &mut err);
debug_assert!(status == sys::napi_status::napi_ok, "Create error failed");
};
err
}
}
impl From<std::ffi::NulError> for Error {
@ -555,8 +570,7 @@ impl Env {
let event_loop = unsafe { sys::uv_default_loop() };
let raw_env = self.0;
executor::execute(
event_loop,
let future_to_execute =
promise::resolve(self.0, deferred, resolver, raw_deferred).map(move |v| match v {
Ok(value) => value,
Err(e) => {
@ -571,47 +585,19 @@ impl Env {
eprintln!("{:?}", &cloned_error);
panic!(cloned_error);
}
}),
)?;
});
executor::execute(event_loop, Box::pin(future_to_execute))?;
Ok(Value::from_raw_value(self, raw_promise, Object))
}
#[inline]
pub fn spawn<
V: 'static + ValueType,
O: Send + 'static,
T: 'static + Send + Task<Output = O, JsValue = V>,
>(
&self,
task: T,
) -> Result<Value<Object>> {
let (sender, receiver) = channel::<Result<O>>();
let threadpool =
THREAD_POOL.get_or_init(|| NapiRSThreadPool(ThreadPool::new(get_physical() * 2)));
let inner_task = Arc::new(ThreadSafeTask::new(task));
let thread_task = Arc::clone(&inner_task);
let promise_task = Arc::clone(&inner_task);
threadpool.0.execute(move || {
let value = thread_task.borrow().compute();
match sender.send(value) {
Err(e) => panic!(e),
_ => {}
};
});
let rev_value = async {
let result = receiver.await;
result
.map_err(|_| Error {
status: Status::Cancelled,
reason: Some("Receiver cancelled".to_owned()),
})
.and_then(|v| v)
};
pub fn spawn<T: 'static + Task>(&self, task: T) -> Result<Value<Object>> {
let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut();
self.execute(rev_value, move |env, v| {
promise_task.borrow().resolve(env, v)
})
check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?;
AsyncWork::run(self.0, task, raw_deferred)?;
Ok(Value::from_raw_value(self, raw_promise, Object))
}
pub fn get_node_version(&self) -> Result<NodeVersion> {

View file

@ -1,40 +1,10 @@
use threadpool::ThreadPool;
use crate::{Env, Result, Value, ValueType};
pub struct NapiRSThreadPool(pub ThreadPool);
unsafe impl Send for NapiRSThreadPool {}
unsafe impl Sync for NapiRSThreadPool {}
pub trait Task {
type Output: Send + 'static;
type Output: Send + Sized + 'static;
type JsValue: ValueType;
fn compute(&mut self) -> Result<Self::Output>;
fn compute(&self) -> Result<Self::Output>;
fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<Value<Self::JsValue>>;
}
pub struct ThreadSafeTask<T: Task>(pub *mut T);
impl<T: Task> ThreadSafeTask<T> {
pub fn new(task: T) -> ThreadSafeTask<T> {
ThreadSafeTask(Box::into_raw(Box::new(task)))
}
#[inline]
pub fn borrow(&self) -> &'static mut T {
Box::leak(unsafe { Box::from_raw(self.0) })
}
}
impl<T: Task> Drop for ThreadSafeTask<T> {
fn drop(&mut self) {
unsafe { Box::from_raw(self.0) };
}
}
unsafe impl<T: Task> Send for ThreadSafeTask<T> {}
unsafe impl<T: Task> Sync for ThreadSafeTask<T> {}

View file

@ -18,9 +18,10 @@
],
"license": "MIT",
"scripts": {
"format": "run-p format:md format:json format:yaml format:source",
"format": "run-p format:md format:json format:yaml format:source format:rs",
"format:md": "prettier --parser markdown --write './**/*.md'",
"format:json": "prettier --parser json --write './**/*.json'",
"format:rs": "cargo fmt",
"format:source": "prettier --config ./package.json --write './**/*.js'",
"format:yaml": "prettier --parser yaml --write './**/*.{yml,yaml}'"
},

View file

@ -9,30 +9,17 @@ function testThrow() {
console.log('=== Test throwing from Rust')
try {
testModule.testThrow()
console.error('Expected function to throw an error')
console.log('Expected function to throw an error')
process.exit(1)
} catch (e) {
console.error(e)
console.log(e)
}
}
function testSpawnThread(n) {
console.info('=== Test spawn task to threadpool')
return testModule.testSpawnThread(n)
}
const future = testSpawn()
future
testSpawn()
.then((value) => {
console.info(`${value} from napi`)
testThrow()
})
.then(() => testSpawnThread(20))
.then((value) => {
console.assert(value === 6765)
console.info('=== fibonacci result', value)
})
.catch((e) => {
console.error(e)
process.exit(1)

34
test_module/fuzzy.js Normal file
View file

@ -0,0 +1,34 @@
const { exec } = require('child_process')
Array.from({ length: 500 })
.reduce(async (acc) => {
await acc
await run()
}, null)
.then(() => {
console.info(`Fuzzy test success, passed ${500} tests.`)
})
.catch((e) => {
console.error(e)
process.exit(1)
})
const run = () => {
return new Promise((resolve, reject) => {
const testProcess = exec('node ./spawn.js', {
env: process.env,
})
testProcess.stdout.pipe(process.stdout)
testProcess.stderr.pipe(process.stderr)
testProcess.on('error', (err) => {
reject(err)
})
testProcess.on('exit', (code) => {
if (code) {
reject(new TypeError(`Child process exit code ${code}`))
} else {
resolve()
}
})
})
}

16
test_module/spawn.js Normal file
View file

@ -0,0 +1,16 @@
const testModule = require('./index.node')
function testSpawnThread(n) {
console.info('=== Test spawn task to threadpool')
return testModule.testSpawnThread(n)
}
testSpawnThread(20)
.then((value) => {
console.assert(value === 6765)
console.info('=== fibonacci result', value)
})
.catch((e) => {
console.error(e)
process.exit(1)
})

View file

@ -57,7 +57,7 @@ impl Task for ComputeFib {
type Output = u32;
type JsValue = Number;
fn compute(&mut self) -> Result<Self::Output> {
fn compute(&self) -> Result<Self::Output> {
Ok(fibonacci_native(self.n))
}