Merge pull request #48 from Brooooooklyn/fix-windows-alloc
refactor(spawn): napi_async_worker implementation
This commit is contained in:
commit
96b8bc104d
13 changed files with 303 additions and 176 deletions
12
.github/workflows/linux.yaml
vendored
12
.github/workflows/linux.yaml
vendored
|
@ -50,25 +50,27 @@ jobs:
|
||||||
path: target
|
path: target
|
||||||
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
||||||
|
|
||||||
- name: check build
|
- name: Check build
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: check
|
command: check
|
||||||
args: --all --bins --examples --tests -vvv
|
args: --all --bins --examples --tests -vvv
|
||||||
|
|
||||||
- name: tests
|
- name: Tests
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
timeout-minutes: 40
|
timeout-minutes: 5
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: -p napi-rs --lib -- --nocapture
|
args: -p napi-rs --lib -- --nocapture
|
||||||
|
|
||||||
- name: test scripts
|
- name: Fuzzy tests
|
||||||
run: |
|
run: |
|
||||||
yarn
|
yarn
|
||||||
cd test_module
|
cd test_module
|
||||||
yarn build
|
yarn build
|
||||||
yarn test
|
node fuzzy.js
|
||||||
|
env:
|
||||||
|
RUST_BACKTRACE: 1
|
||||||
|
|
||||||
- name: Clear the cargo caches
|
- name: Clear the cargo caches
|
||||||
run: |
|
run: |
|
||||||
|
|
12
.github/workflows/macos.yaml
vendored
12
.github/workflows/macos.yaml
vendored
|
@ -50,25 +50,27 @@ jobs:
|
||||||
path: target
|
path: target
|
||||||
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
key: ${{ matrix.version }}-x86_64-apple-darwin-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
||||||
|
|
||||||
- name: check build
|
- name: Check build
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: check
|
command: check
|
||||||
args: --all --bins --examples --tests -vvv
|
args: --all --bins --examples --tests -vvv
|
||||||
|
|
||||||
- name: tests
|
- name: Tests
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
timeout-minutes: 40
|
timeout-minutes: 5
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: -p napi-rs --lib -- --nocapture
|
args: -p napi-rs --lib -- --nocapture
|
||||||
|
|
||||||
- name: test scripts
|
- name: Fuzzy tests
|
||||||
run: |
|
run: |
|
||||||
yarn
|
yarn
|
||||||
cd test_module
|
cd test_module
|
||||||
yarn build
|
yarn build
|
||||||
yarn test
|
node fuzzy.js
|
||||||
|
env:
|
||||||
|
RUST_BACKTRACE: 1
|
||||||
|
|
||||||
- name: Clear the cargo caches
|
- name: Clear the cargo caches
|
||||||
run: |
|
run: |
|
||||||
|
|
10
.github/workflows/windows.yaml
vendored
10
.github/workflows/windows.yaml
vendored
|
@ -56,25 +56,25 @@ jobs:
|
||||||
path: target
|
path: target
|
||||||
key: ${{ matrix.version }}-${{ matrix.target }}-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
key: ${{ matrix.version }}-${{ matrix.target }}-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
||||||
|
|
||||||
- name: check build
|
- name: Check build
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: check
|
command: check
|
||||||
args: -p napi-rs -vvv
|
args: -p napi-rs -vvv
|
||||||
|
|
||||||
- name: tests
|
- name: Tests
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
timeout-minutes: 40
|
timeout-minutes: 5
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: -p napi-rs --lib -- --nocapture
|
args: -p napi-rs --lib -- --nocapture
|
||||||
|
|
||||||
- name: test scripts
|
- name: Fuzzy tests
|
||||||
run: |
|
run: |
|
||||||
yarn
|
yarn
|
||||||
cd test_module
|
cd test_module
|
||||||
yarn build
|
yarn build
|
||||||
yarn test
|
node fuzzy.js
|
||||||
env:
|
env:
|
||||||
RUST_BACKTRACE: 1
|
RUST_BACKTRACE: 1
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,6 @@ edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = { version = "0.3", features = ["default", "thread-pool"] }
|
futures = { version = "0.3", features = ["default", "thread-pool"] }
|
||||||
num_cpus = "1.13"
|
|
||||||
once_cell = "1.3"
|
|
||||||
threadpool = "1.8"
|
|
||||||
|
|
||||||
[target.'cfg(windows)'.build-dependencies]
|
[target.'cfg(windows)'.build-dependencies]
|
||||||
flate2 = "1.0"
|
flate2 = "1.0"
|
||||||
|
|
114
napi/src/async_work.rs
Normal file
114
napi/src/async_work.rs
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
use std::mem;
|
||||||
|
use std::os::raw::{c_char, c_void};
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
use crate::{check_status, sys, Env, Result, Task};
|
||||||
|
|
||||||
|
pub struct AsyncWork<T: Task> {
|
||||||
|
inner_task: T,
|
||||||
|
deferred: sys::napi_deferred,
|
||||||
|
value: Result<*mut T::Output>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Task> AsyncWork<T> {
|
||||||
|
pub fn run(env: sys::napi_env, task: T, deferred: sys::napi_deferred) -> Result<()> {
|
||||||
|
let mut raw_resource = ptr::null_mut();
|
||||||
|
let status = unsafe { sys::napi_create_object(env, &mut raw_resource) };
|
||||||
|
check_status(status)?;
|
||||||
|
let mut raw_name = ptr::null_mut();
|
||||||
|
let s = "napi_rs_async";
|
||||||
|
let status = unsafe {
|
||||||
|
sys::napi_create_string_utf8(
|
||||||
|
env,
|
||||||
|
s.as_ptr() as *const c_char,
|
||||||
|
s.len() as u64,
|
||||||
|
&mut raw_name,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
check_status(status)?;
|
||||||
|
let mut raw_context = ptr::null_mut();
|
||||||
|
unsafe {
|
||||||
|
let status = sys::napi_async_init(env, raw_resource, raw_name, &mut raw_context);
|
||||||
|
check_status(status)?;
|
||||||
|
};
|
||||||
|
let result = AsyncWork {
|
||||||
|
inner_task: task,
|
||||||
|
deferred,
|
||||||
|
value: Ok(ptr::null_mut()),
|
||||||
|
};
|
||||||
|
let mut async_work = ptr::null_mut();
|
||||||
|
check_status(unsafe {
|
||||||
|
sys::napi_create_async_work(
|
||||||
|
env,
|
||||||
|
raw_resource,
|
||||||
|
raw_name,
|
||||||
|
Some(execute::<T> as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)),
|
||||||
|
Some(
|
||||||
|
complete::<T>
|
||||||
|
as unsafe extern "C" fn(
|
||||||
|
env: sys::napi_env,
|
||||||
|
status: sys::napi_status,
|
||||||
|
data: *mut c_void,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
Box::leak(Box::new(result)) as *mut _ as *mut c_void,
|
||||||
|
&mut async_work,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
check_status(unsafe { sys::napi_queue_async_work(env, async_work) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<T: Task> Send for AsyncWork<T> {}
|
||||||
|
|
||||||
|
unsafe impl<T: Task> Sync for AsyncWork<T> {}
|
||||||
|
|
||||||
|
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
|
||||||
|
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
||||||
|
work.value = work
|
||||||
|
.inner_task
|
||||||
|
.compute()
|
||||||
|
.map(|v| Box::into_raw(Box::from(v)));
|
||||||
|
Box::leak(work);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe extern "C" fn complete<T: Task>(
|
||||||
|
env: sys::napi_env,
|
||||||
|
status: sys::napi_status,
|
||||||
|
data: *mut c_void,
|
||||||
|
) {
|
||||||
|
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
||||||
|
let value_ptr = mem::replace(&mut work.value, Ok(ptr::null_mut()));
|
||||||
|
let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
|
||||||
|
let value = value_ptr.and_then(move |v| {
|
||||||
|
let mut env = Env::from_raw(env);
|
||||||
|
let output = ptr::read(v as *const _);
|
||||||
|
work.inner_task.resolve(&mut env, output)
|
||||||
|
});
|
||||||
|
let mut handle_scope = ptr::null_mut();
|
||||||
|
match check_status(status).and_then(move |_| value) {
|
||||||
|
Ok(v) => {
|
||||||
|
let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope);
|
||||||
|
debug_assert!(
|
||||||
|
open_handle_status == sys::napi_status::napi_ok,
|
||||||
|
"OpenHandleScope failed"
|
||||||
|
);
|
||||||
|
let status = sys::napi_resolve_deferred(env, deferred, v.raw_value);
|
||||||
|
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let open_handle_status = sys::napi_open_handle_scope(env, &mut handle_scope);
|
||||||
|
debug_assert!(
|
||||||
|
open_handle_status == sys::napi_status::napi_ok,
|
||||||
|
"OpenHandleScope failed"
|
||||||
|
);
|
||||||
|
let status = sys::napi_reject_deferred(env, deferred, e.into_raw(env));
|
||||||
|
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let close_handle_scope_status = sys::napi_close_handle_scope(env, handle_scope);
|
||||||
|
debug_assert!(
|
||||||
|
close_handle_scope_status == sys::napi_status::napi_ok,
|
||||||
|
"Close handle scope failed"
|
||||||
|
);
|
||||||
|
}
|
|
@ -1,47 +1,38 @@
|
||||||
use futures::task::Poll;
|
extern crate alloc;
|
||||||
use std::alloc::{alloc, Layout};
|
|
||||||
|
use alloc::alloc::{alloc, alloc_zeroed, Layout};
|
||||||
|
use futures::future::LocalBoxFuture;
|
||||||
|
use futures::task::{waker, ArcWake, Context, Poll};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::os::raw::c_void;
|
use std::os::raw::c_void;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
|
|
||||||
|
|
||||||
use crate::{sys, Error, Result, Status};
|
use crate::{sys, Error, Result, Status};
|
||||||
|
|
||||||
const UV_ASYNC_V_TABLE: RawWakerVTable = RawWakerVTable::new(
|
|
||||||
clone_executor,
|
|
||||||
wake_uv_async,
|
|
||||||
wake_uv_async_by_ref,
|
|
||||||
drop_uv_async,
|
|
||||||
);
|
|
||||||
|
|
||||||
unsafe fn clone_executor(uv_async_t: *const ()) -> RawWaker {
|
|
||||||
RawWaker::new(uv_async_t, &UV_ASYNC_V_TABLE)
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn wake_uv_async(uv_async_t: *const ()) {
|
|
||||||
let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t);
|
|
||||||
assert!(status == 0, "wake_uv_async failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn wake_uv_async_by_ref(uv_async_t: *const ()) {
|
|
||||||
let status = sys::uv_async_send(uv_async_t as *mut sys::uv_async_t);
|
|
||||||
assert!(status == 0, "wake_uv_async_by_ref failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn drop_uv_async(_uv_async_t_ptr: *const ()) {}
|
|
||||||
|
|
||||||
struct Task<'a> {
|
struct Task<'a> {
|
||||||
future: Pin<Box<dyn Future<Output = ()>>>,
|
future: LocalBoxFuture<'a, ()>,
|
||||||
context: Context<'a>,
|
context: Context<'a>,
|
||||||
|
is_polling: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
struct UvWaker(*mut sys::uv_async_t);
|
||||||
pub fn execute<F: 'static + Future<Output = ()>>(
|
|
||||||
event_loop: *mut sys::uv_loop_s,
|
unsafe impl Send for UvWaker {}
|
||||||
future: F,
|
unsafe impl Sync for UvWaker {}
|
||||||
) -> Result<()> {
|
|
||||||
let uv_async_t = unsafe { alloc(Layout::new::<sys::uv_async_t>()) as *mut sys::uv_async_t };
|
impl UvWaker {
|
||||||
|
fn new(event_loop: *mut sys::uv_loop_s) -> Result<UvWaker> {
|
||||||
|
let uv_async_t = unsafe {
|
||||||
|
let layout = Layout::new::<sys::uv_async_t>();
|
||||||
|
debug_assert!(layout.size() != 0, "uv_async_t alloc size should not be 0");
|
||||||
|
if cfg!(windows) {
|
||||||
|
alloc_zeroed(layout) as *mut sys::uv_async_t
|
||||||
|
} else {
|
||||||
|
alloc(layout) as *mut sys::uv_async_t
|
||||||
|
}
|
||||||
|
};
|
||||||
unsafe {
|
unsafe {
|
||||||
let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future));
|
let status = sys::uv_async_init(event_loop, uv_async_t, Some(poll_future));
|
||||||
if status != 0 {
|
if status != 0 {
|
||||||
|
@ -51,32 +42,58 @@ pub fn execute<F: 'static + Future<Output = ()>>(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
unsafe {
|
Ok(UvWaker(uv_async_t))
|
||||||
let waker = Waker::from_raw(RawWaker::new(
|
|
||||||
uv_async_t as *const _ as *const (),
|
|
||||||
&UV_ASYNC_V_TABLE,
|
|
||||||
));
|
|
||||||
let context = Context::from_waker(&waker);
|
|
||||||
let mut task = Task {
|
|
||||||
future: Box::pin(future),
|
|
||||||
context,
|
|
||||||
};
|
|
||||||
if !task.poll_future() {
|
|
||||||
let arc_task = Arc::new(task);
|
|
||||||
sys::uv_handle_set_data(
|
|
||||||
uv_async_t as *mut _ as *mut sys::uv_handle_t,
|
|
||||||
Arc::into_raw(arc_task) as *mut c_void,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
sys::uv_close(uv_async_t as *mut _ as *mut sys::uv_handle_t, None);
|
|
||||||
};
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn assign_task(&self, mut task: Task) {
|
||||||
|
if !task.poll_future() {
|
||||||
|
task.is_polling.store(false, Ordering::Relaxed);
|
||||||
|
let arc_task = Arc::new(task);
|
||||||
|
unsafe {
|
||||||
|
sys::uv_handle_set_data(
|
||||||
|
self.0 as *mut sys::uv_handle_t,
|
||||||
|
Arc::into_raw(arc_task) as *mut c_void,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
unsafe { sys::uv_close(self.0 as *mut sys::uv_handle_t, None) };
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ArcWake for UvWaker {
|
||||||
|
fn wake_by_ref(arc_self: &Arc<Self>) {
|
||||||
|
let status = unsafe { sys::uv_async_send(arc_self.0) };
|
||||||
|
assert!(status == 0, "wake_uv_async_by_ref failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn execute(event_loop: *mut sys::uv_loop_s, future: LocalBoxFuture<()>) -> Result<()> {
|
||||||
|
let uv_waker = UvWaker::new(event_loop)?;
|
||||||
|
let arc_waker = Arc::new(uv_waker);
|
||||||
|
let waker_to_poll = Arc::clone(&arc_waker);
|
||||||
|
let waker = waker(arc_waker);
|
||||||
|
let context = Context::from_waker(&waker);
|
||||||
|
let task = Task {
|
||||||
|
future,
|
||||||
|
context,
|
||||||
|
is_polling: AtomicBool::from(false),
|
||||||
|
};
|
||||||
|
waker_to_poll.assign_task(task);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Task<'a> {
|
impl<'a> Task<'a> {
|
||||||
fn poll_future(&mut self) -> bool {
|
fn poll_future(&mut self) -> bool {
|
||||||
match self.future.as_mut().poll(&mut self.context) {
|
if self.is_polling.load(Ordering::Relaxed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.is_polling.store(true, Ordering::Relaxed);
|
||||||
|
let mut pinned = Pin::new(&mut self.future);
|
||||||
|
let fut_mut = pinned.as_mut();
|
||||||
|
match fut_mut.poll(&mut self.context) {
|
||||||
Poll::Ready(_) => true,
|
Poll::Ready(_) => true,
|
||||||
Poll::Pending => false,
|
Poll::Pending => false,
|
||||||
}
|
}
|
||||||
|
@ -90,6 +107,7 @@ unsafe extern "C" fn poll_future(handle: *mut sys::uv_async_t) {
|
||||||
if mut_task.poll_future() {
|
if mut_task.poll_future() {
|
||||||
sys::uv_close(handle as *mut sys::uv_handle_t, None);
|
sys::uv_close(handle as *mut sys::uv_handle_t, None);
|
||||||
} else {
|
} else {
|
||||||
|
mut_task.is_polling.store(false, Ordering::Relaxed);
|
||||||
Arc::into_raw(task);
|
Arc::into_raw(task);
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
|
use async_work::AsyncWork;
|
||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
use futures::channel::oneshot::channel;
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use num_cpus::get_physical;
|
|
||||||
use once_cell::sync::OnceCell;
|
|
||||||
use std::any::TypeId;
|
use std::any::TypeId;
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::ffi::CString;
|
use std::ffi::CString;
|
||||||
|
@ -14,9 +12,8 @@ use std::ptr;
|
||||||
use std::slice;
|
use std::slice;
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::string::String as RustString;
|
use std::string::String as RustString;
|
||||||
use std::sync::Arc;
|
|
||||||
use threadpool::ThreadPool;
|
|
||||||
|
|
||||||
|
mod async_work;
|
||||||
mod call_context;
|
mod call_context;
|
||||||
mod executor;
|
mod executor;
|
||||||
mod promise;
|
mod promise;
|
||||||
|
@ -27,14 +24,11 @@ mod version;
|
||||||
pub use call_context::CallContext;
|
pub use call_context::CallContext;
|
||||||
pub use sys::{napi_valuetype, Status};
|
pub use sys::{napi_valuetype, Status};
|
||||||
pub use task::Task;
|
pub use task::Task;
|
||||||
use task::{NapiRSThreadPool, ThreadSafeTask};
|
|
||||||
pub use version::NodeVersion;
|
pub use version::NodeVersion;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
|
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
|
||||||
|
|
||||||
static THREAD_POOL: OnceCell<NapiRSThreadPool> = OnceCell::new();
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Error {
|
pub struct Error {
|
||||||
pub status: Status,
|
pub status: Status,
|
||||||
|
@ -179,6 +173,27 @@ impl Error {
|
||||||
reason: None,
|
reason: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_raw(self, env: sys::napi_env) -> sys::napi_value {
|
||||||
|
let mut err = ptr::null_mut();
|
||||||
|
let s = self.reason.unwrap_or("NAPI error".to_owned());
|
||||||
|
unsafe {
|
||||||
|
let mut err_reason = ptr::null_mut();
|
||||||
|
let status = sys::napi_create_string_utf8(
|
||||||
|
env,
|
||||||
|
s.as_ptr() as *const c_char,
|
||||||
|
s.len() as u64,
|
||||||
|
&mut err_reason,
|
||||||
|
);
|
||||||
|
debug_assert!(
|
||||||
|
status == sys::napi_status::napi_ok,
|
||||||
|
"Create error reason failed"
|
||||||
|
);
|
||||||
|
let status = sys::napi_create_error(env, ptr::null_mut(), err_reason, &mut err);
|
||||||
|
debug_assert!(status == sys::napi_status::napi_ok, "Create error failed");
|
||||||
|
};
|
||||||
|
err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<std::ffi::NulError> for Error {
|
impl From<std::ffi::NulError> for Error {
|
||||||
|
@ -555,8 +570,7 @@ impl Env {
|
||||||
|
|
||||||
let event_loop = unsafe { sys::uv_default_loop() };
|
let event_loop = unsafe { sys::uv_default_loop() };
|
||||||
let raw_env = self.0;
|
let raw_env = self.0;
|
||||||
executor::execute(
|
let future_to_execute =
|
||||||
event_loop,
|
|
||||||
promise::resolve(self.0, deferred, resolver, raw_deferred).map(move |v| match v {
|
promise::resolve(self.0, deferred, resolver, raw_deferred).map(move |v| match v {
|
||||||
Ok(value) => value,
|
Ok(value) => value,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -571,47 +585,19 @@ impl Env {
|
||||||
eprintln!("{:?}", &cloned_error);
|
eprintln!("{:?}", &cloned_error);
|
||||||
panic!(cloned_error);
|
panic!(cloned_error);
|
||||||
}
|
}
|
||||||
}),
|
});
|
||||||
)?;
|
executor::execute(event_loop, Box::pin(future_to_execute))?;
|
||||||
|
|
||||||
Ok(Value::from_raw_value(self, raw_promise, Object))
|
Ok(Value::from_raw_value(self, raw_promise, Object))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
pub fn spawn<T: 'static + Task>(&self, task: T) -> Result<Value<Object>> {
|
||||||
pub fn spawn<
|
let mut raw_promise = ptr::null_mut();
|
||||||
V: 'static + ValueType,
|
let mut raw_deferred = ptr::null_mut();
|
||||||
O: Send + 'static,
|
|
||||||
T: 'static + Send + Task<Output = O, JsValue = V>,
|
|
||||||
>(
|
|
||||||
&self,
|
|
||||||
task: T,
|
|
||||||
) -> Result<Value<Object>> {
|
|
||||||
let (sender, receiver) = channel::<Result<O>>();
|
|
||||||
let threadpool =
|
|
||||||
THREAD_POOL.get_or_init(|| NapiRSThreadPool(ThreadPool::new(get_physical() * 2)));
|
|
||||||
let inner_task = Arc::new(ThreadSafeTask::new(task));
|
|
||||||
let thread_task = Arc::clone(&inner_task);
|
|
||||||
let promise_task = Arc::clone(&inner_task);
|
|
||||||
threadpool.0.execute(move || {
|
|
||||||
let value = thread_task.borrow().compute();
|
|
||||||
match sender.send(value) {
|
|
||||||
Err(e) => panic!(e),
|
|
||||||
_ => {}
|
|
||||||
};
|
|
||||||
});
|
|
||||||
let rev_value = async {
|
|
||||||
let result = receiver.await;
|
|
||||||
result
|
|
||||||
.map_err(|_| Error {
|
|
||||||
status: Status::Cancelled,
|
|
||||||
reason: Some("Receiver cancelled".to_owned()),
|
|
||||||
})
|
|
||||||
.and_then(|v| v)
|
|
||||||
};
|
|
||||||
|
|
||||||
self.execute(rev_value, move |env, v| {
|
check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?;
|
||||||
promise_task.borrow().resolve(env, v)
|
AsyncWork::run(self.0, task, raw_deferred)?;
|
||||||
})
|
Ok(Value::from_raw_value(self, raw_promise, Object))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_node_version(&self) -> Result<NodeVersion> {
|
pub fn get_node_version(&self) -> Result<NodeVersion> {
|
||||||
|
|
|
@ -1,40 +1,10 @@
|
||||||
use threadpool::ThreadPool;
|
|
||||||
|
|
||||||
use crate::{Env, Result, Value, ValueType};
|
use crate::{Env, Result, Value, ValueType};
|
||||||
|
|
||||||
pub struct NapiRSThreadPool(pub ThreadPool);
|
|
||||||
|
|
||||||
unsafe impl Send for NapiRSThreadPool {}
|
|
||||||
|
|
||||||
unsafe impl Sync for NapiRSThreadPool {}
|
|
||||||
|
|
||||||
pub trait Task {
|
pub trait Task {
|
||||||
type Output: Send + 'static;
|
type Output: Send + Sized + 'static;
|
||||||
type JsValue: ValueType;
|
type JsValue: ValueType;
|
||||||
|
|
||||||
fn compute(&mut self) -> Result<Self::Output>;
|
fn compute(&self) -> Result<Self::Output>;
|
||||||
|
|
||||||
fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<Value<Self::JsValue>>;
|
fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<Value<Self::JsValue>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ThreadSafeTask<T: Task>(pub *mut T);
|
|
||||||
|
|
||||||
impl<T: Task> ThreadSafeTask<T> {
|
|
||||||
pub fn new(task: T) -> ThreadSafeTask<T> {
|
|
||||||
ThreadSafeTask(Box::into_raw(Box::new(task)))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn borrow(&self) -> &'static mut T {
|
|
||||||
Box::leak(unsafe { Box::from_raw(self.0) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Task> Drop for ThreadSafeTask<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
unsafe { Box::from_raw(self.0) };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<T: Task> Send for ThreadSafeTask<T> {}
|
|
||||||
unsafe impl<T: Task> Sync for ThreadSafeTask<T> {}
|
|
||||||
|
|
|
@ -18,9 +18,10 @@
|
||||||
],
|
],
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"format": "run-p format:md format:json format:yaml format:source",
|
"format": "run-p format:md format:json format:yaml format:source format:rs",
|
||||||
"format:md": "prettier --parser markdown --write './**/*.md'",
|
"format:md": "prettier --parser markdown --write './**/*.md'",
|
||||||
"format:json": "prettier --parser json --write './**/*.json'",
|
"format:json": "prettier --parser json --write './**/*.json'",
|
||||||
|
"format:rs": "cargo fmt",
|
||||||
"format:source": "prettier --config ./package.json --write './**/*.js'",
|
"format:source": "prettier --config ./package.json --write './**/*.js'",
|
||||||
"format:yaml": "prettier --parser yaml --write './**/*.{yml,yaml}'"
|
"format:yaml": "prettier --parser yaml --write './**/*.{yml,yaml}'"
|
||||||
},
|
},
|
||||||
|
|
|
@ -9,30 +9,17 @@ function testThrow() {
|
||||||
console.log('=== Test throwing from Rust')
|
console.log('=== Test throwing from Rust')
|
||||||
try {
|
try {
|
||||||
testModule.testThrow()
|
testModule.testThrow()
|
||||||
console.error('Expected function to throw an error')
|
console.log('Expected function to throw an error')
|
||||||
process.exit(1)
|
process.exit(1)
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e)
|
console.log(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
testSpawn()
|
||||||
function testSpawnThread(n) {
|
|
||||||
console.info('=== Test spawn task to threadpool')
|
|
||||||
return testModule.testSpawnThread(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
const future = testSpawn()
|
|
||||||
|
|
||||||
future
|
|
||||||
.then((value) => {
|
.then((value) => {
|
||||||
console.info(`${value} from napi`)
|
console.info(`${value} from napi`)
|
||||||
testThrow()
|
testThrow()
|
||||||
})
|
})
|
||||||
.then(() => testSpawnThread(20))
|
|
||||||
.then((value) => {
|
|
||||||
console.assert(value === 6765)
|
|
||||||
console.info('=== fibonacci result', value)
|
|
||||||
})
|
|
||||||
.catch((e) => {
|
.catch((e) => {
|
||||||
console.error(e)
|
console.error(e)
|
||||||
process.exit(1)
|
process.exit(1)
|
34
test_module/fuzzy.js
Normal file
34
test_module/fuzzy.js
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
const { exec } = require('child_process')
|
||||||
|
|
||||||
|
Array.from({ length: 500 })
|
||||||
|
.reduce(async (acc) => {
|
||||||
|
await acc
|
||||||
|
await run()
|
||||||
|
}, null)
|
||||||
|
.then(() => {
|
||||||
|
console.info(`Fuzzy test success, passed ${500} tests.`)
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
console.error(e)
|
||||||
|
process.exit(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
const run = () => {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const testProcess = exec('node ./spawn.js', {
|
||||||
|
env: process.env,
|
||||||
|
})
|
||||||
|
testProcess.stdout.pipe(process.stdout)
|
||||||
|
testProcess.stderr.pipe(process.stderr)
|
||||||
|
testProcess.on('error', (err) => {
|
||||||
|
reject(err)
|
||||||
|
})
|
||||||
|
testProcess.on('exit', (code) => {
|
||||||
|
if (code) {
|
||||||
|
reject(new TypeError(`Child process exit code ${code}`))
|
||||||
|
} else {
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
16
test_module/spawn.js
Normal file
16
test_module/spawn.js
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
const testModule = require('./index.node')
|
||||||
|
|
||||||
|
function testSpawnThread(n) {
|
||||||
|
console.info('=== Test spawn task to threadpool')
|
||||||
|
return testModule.testSpawnThread(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
testSpawnThread(20)
|
||||||
|
.then((value) => {
|
||||||
|
console.assert(value === 6765)
|
||||||
|
console.info('=== fibonacci result', value)
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
console.error(e)
|
||||||
|
process.exit(1)
|
||||||
|
})
|
|
@ -57,7 +57,7 @@ impl Task for ComputeFib {
|
||||||
type Output = u32;
|
type Output = u32;
|
||||||
type JsValue = Number;
|
type JsValue = Number;
|
||||||
|
|
||||||
fn compute(&mut self) -> Result<Self::Output> {
|
fn compute(&self) -> Result<Self::Output> {
|
||||||
Ok(fibonacci_native(self.n))
|
Ok(fibonacci_native(self.n))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue