fix(executor): use Arc to ensure thread safe

This commit is contained in:
LongYinan 2020-03-12 22:17:39 +08:00
parent 5b93dd187b
commit ef99000955
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
6 changed files with 28 additions and 44 deletions

View file

@ -68,7 +68,7 @@ jobs:
yarn yarn
cd test_module cd test_module
yarn build yarn build
node tests.js yarn test
- name: Clear the cargo caches - name: Clear the cargo caches
run: | run: |

View file

@ -68,7 +68,7 @@ jobs:
yarn yarn
cd test_module cd test_module
yarn build yarn build
node tests.js yarn test
- name: Clear the cargo caches - name: Clear the cargo caches
run: | run: |

View file

@ -74,7 +74,7 @@ jobs:
yarn yarn
cd test_module cd test_module
yarn build yarn build
node tests.js yarn test
env: env:
RUST_BACKTRACE: 1 RUST_BACKTRACE: 1

View file

@ -4,7 +4,7 @@ use std::future::Future;
use std::mem; use std::mem;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker}; use std::task::{Context, RawWaker, RawWakerVTable, Waker};
pub struct LibuvExecutor { pub struct LibuvExecutor {
@ -46,7 +46,6 @@ unsafe fn drop_uv_async(uv_async_t_ptr: *const ()) {
struct Task<'a> { struct Task<'a> {
future: Pin<Box<dyn Future<Output = ()>>>, future: Pin<Box<dyn Future<Output = ()>>>,
context: Context<'a>, context: Context<'a>,
resolved: AtomicBool,
} }
impl LibuvExecutor { impl LibuvExecutor {
@ -68,46 +67,45 @@ impl LibuvExecutor {
&UV_ASYNC_V_TABLE, &UV_ASYNC_V_TABLE,
)); ));
let context = Context::from_waker(&waker); let context = Context::from_waker(&waker);
let task = Box::leak(Box::new(Task { let mut task = Box::new(Task {
future: Box::pin(future), future: Box::pin(future),
context, context,
resolved: AtomicBool::new(false), });
})); if !task.as_mut().poll_future() {
sys::uv_handle_set_data( let arc_task = Arc::new(task);
uv_async_t_ref as *mut _ as *mut sys::uv_handle_t, sys::uv_handle_set_data(
task as *mut _ as *mut c_void, uv_async_t_ref as *mut _ as *mut sys::uv_handle_t,
); Arc::into_raw(arc_task) as *mut c_void,
task.poll_future(); );
}
} }
} }
} }
impl<'a> Task<'a> { impl<'a> Task<'a> {
fn poll_future(&mut self) -> bool { fn poll_future(&mut self) -> bool {
if self.resolved.load(Ordering::Relaxed) {
return true;
}
match self.future.as_mut().poll(&mut self.context) { match self.future.as_mut().poll(&mut self.context) {
Poll::Ready(_) => { Poll::Ready(_) => true,
while !self.resolved.swap(true, Ordering::Relaxed) {}
true
}
Poll::Pending => false, Poll::Pending => false,
} }
} }
} }
unsafe extern "C" fn poll_future(handle: *mut sys::uv_async_t) { unsafe extern "C" fn poll_future(handle: *mut sys::uv_async_t) {
let data_ptr = sys::uv_handle_get_data(handle as *mut sys::uv_handle_t) as *mut Task; let data_ptr = sys::uv_handle_get_data(handle as *mut sys::uv_handle_t) as *mut Box<Task>;
let mut task = Box::from_raw(data_ptr); let mut task = Arc::from_raw(data_ptr);
if !task.as_mut().poll_future() { if let Some(mut_task) = Arc::get_mut(&mut task) {
Box::leak(task); if mut_task.poll_future() {
Arc::into_raw(task);
} else {
sys::uv_close(
handle as *mut sys::uv_handle_t,
Some(drop_handle_after_close),
);
};
} else { } else {
sys::uv_close( Arc::into_raw(task);
handle as *mut sys::uv_handle_t, }
Some(drop_handle_after_close),
);
};
} }
unsafe extern "C" fn drop_handle_after_close(handle: *mut sys::uv_handle_t) { unsafe extern "C" fn drop_handle_after_close(handle: *mut sys::uv_handle_t) {

View file

@ -4,6 +4,6 @@
"scripts": { "scripts": {
"build": "cargo build && node ../scripts/napi.js", "build": "cargo build && node ../scripts/napi.js",
"build-release": "cargo build --release && node ../scripts/napi.js --release", "build-release": "cargo build --release && node ../scripts/napi.js --release",
"test": "node ./tests.js" "test": "node ./index.js"
} }
} }

View file

@ -1,14 +0,0 @@
const { platform } = require('os')
const { fork } = require('child_process')
fork('./index.js', {
stdio: 'inherit',
}).on('exit', (code) => {
if (code !== 0) {
if (code === 3221225477 && platform() === 'win32') {
console.error(code)
process.exit(0)
}
process.exit(code)
}
})