refactor(napi): implement async task cancel
This commit is contained in:
parent
f1281af260
commit
aadca83d2e
3 changed files with 82 additions and 59 deletions
|
@ -4,54 +4,76 @@ use std::ptr;
|
||||||
|
|
||||||
use crate::error::check_status;
|
use crate::error::check_status;
|
||||||
use crate::js_values::NapiValue;
|
use crate::js_values::NapiValue;
|
||||||
use crate::{sys, Env, Result, Task};
|
use crate::{sys, Env, JsObject, Result, Task};
|
||||||
|
|
||||||
pub struct AsyncWork<T: Task> {
|
struct AsyncWork<T: Task> {
|
||||||
inner_task: T,
|
inner_task: T,
|
||||||
deferred: sys::napi_deferred,
|
deferred: sys::napi_deferred,
|
||||||
value: Result<*mut T::Output>,
|
value: Result<mem::MaybeUninit<T::Output>>,
|
||||||
|
napi_async_work: sys::napi_async_work,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Task> AsyncWork<T> {
|
#[derive(Debug)]
|
||||||
pub fn run(env: sys::napi_env, task: T, deferred: sys::napi_deferred) -> Result<()> {
|
pub struct AsyncWorkPromise<'env> {
|
||||||
|
napi_async_work: sys::napi_async_work,
|
||||||
|
raw_promise: sys::napi_value,
|
||||||
|
env: &'env Env,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'env> AsyncWorkPromise<'env> {
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn promise_object(&self) -> JsObject {
|
||||||
|
JsObject::from_raw_unchecked(self.env.0, self.raw_promise)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cancel(self) -> Result<()> {
|
||||||
|
check_status(unsafe { sys::napi_cancel_async_work(self.env.0, self.napi_async_work) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn run<'env, T: Task>(env: &'env Env, task: T) -> Result<AsyncWorkPromise<'env>> {
|
||||||
let mut raw_resource = ptr::null_mut();
|
let mut raw_resource = ptr::null_mut();
|
||||||
check_status(unsafe { sys::napi_create_object(env, &mut raw_resource) })?;
|
check_status(unsafe { sys::napi_create_object(env.0, &mut raw_resource) })?;
|
||||||
|
let mut raw_promise = ptr::null_mut();
|
||||||
|
let mut deferred = ptr::null_mut();
|
||||||
|
check_status(unsafe { sys::napi_create_promise(env.0, &mut deferred, &mut raw_promise) })?;
|
||||||
let mut raw_name = ptr::null_mut();
|
let mut raw_name = ptr::null_mut();
|
||||||
let s = "napi_rs_async";
|
let s = "napi_rs_async_work";
|
||||||
check_status(unsafe {
|
check_status(unsafe {
|
||||||
sys::napi_create_string_utf8(
|
sys::napi_create_string_utf8(
|
||||||
env,
|
env.0,
|
||||||
s.as_ptr() as *const c_char,
|
s.as_ptr() as *const c_char,
|
||||||
s.len() as u64,
|
s.len() as u64,
|
||||||
&mut raw_name,
|
&mut raw_name,
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
let result = AsyncWork {
|
let result = Box::leak(Box::new(AsyncWork {
|
||||||
inner_task: task,
|
inner_task: task,
|
||||||
deferred,
|
deferred,
|
||||||
value: Ok(ptr::null_mut()),
|
value: Ok(mem::MaybeUninit::zeroed()),
|
||||||
};
|
napi_async_work: ptr::null_mut(),
|
||||||
let mut async_work = ptr::null_mut();
|
}));
|
||||||
check_status(unsafe {
|
check_status(unsafe {
|
||||||
sys::napi_create_async_work(
|
sys::napi_create_async_work(
|
||||||
env,
|
env.0,
|
||||||
raw_resource,
|
raw_resource,
|
||||||
raw_name,
|
raw_name,
|
||||||
Some(execute::<T> as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)),
|
Some(execute::<T> as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)),
|
||||||
Some(
|
Some(
|
||||||
complete::<T>
|
complete::<T>
|
||||||
as unsafe extern "C" fn(
|
as unsafe extern "C" fn(env: sys::napi_env, status: sys::napi_status, data: *mut c_void),
|
||||||
env: sys::napi_env,
|
|
||||||
status: sys::napi_status,
|
|
||||||
data: *mut c_void,
|
|
||||||
),
|
),
|
||||||
),
|
result as *mut _ as *mut c_void,
|
||||||
Box::leak(Box::new(result)) as *mut _ as *mut c_void,
|
&mut result.napi_async_work,
|
||||||
&mut async_work,
|
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
check_status(unsafe { sys::napi_queue_async_work(env, async_work) })
|
check_status(unsafe { sys::napi_queue_async_work(env.0, result.napi_async_work) })?;
|
||||||
}
|
Ok(AsyncWorkPromise {
|
||||||
|
napi_async_work: result.napi_async_work,
|
||||||
|
raw_promise,
|
||||||
|
env,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl<T: Task> Send for AsyncWork<T> {}
|
unsafe impl<T: Task> Send for AsyncWork<T> {}
|
||||||
|
@ -60,10 +82,10 @@ unsafe impl<T: Task> Sync for AsyncWork<T> {}
|
||||||
|
|
||||||
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
|
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
|
||||||
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
||||||
work.value = work
|
let _ = mem::replace(
|
||||||
.inner_task
|
&mut work.value,
|
||||||
.compute()
|
work.inner_task.compute().map(|v| mem::MaybeUninit::new(v)),
|
||||||
.map(|v| Box::into_raw(Box::from(v)));
|
);
|
||||||
Box::leak(work);
|
Box::leak(work);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,12 +95,12 @@ unsafe extern "C" fn complete<T: Task>(
|
||||||
data: *mut c_void,
|
data: *mut c_void,
|
||||||
) {
|
) {
|
||||||
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
|
||||||
let value_ptr = mem::replace(&mut work.value, Ok(ptr::null_mut()));
|
let value_ptr = mem::replace(&mut work.value, Ok(mem::MaybeUninit::zeroed()));
|
||||||
let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
|
let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
|
||||||
|
let napi_async_work = mem::replace(&mut work.napi_async_work, ptr::null_mut());
|
||||||
let value = value_ptr.and_then(move |v| {
|
let value = value_ptr.and_then(move |v| {
|
||||||
let mut env = Env::from_raw(env);
|
let output = v.assume_init();
|
||||||
let output = ptr::read(v as *const _);
|
work.inner_task.resolve(&mut Env::from_raw(env), output)
|
||||||
work.inner_task.resolve(&mut env, output)
|
|
||||||
});
|
});
|
||||||
match check_status(status).and_then(move |_| value) {
|
match check_status(status).and_then(move |_| value) {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
|
@ -90,4 +112,9 @@ unsafe extern "C" fn complete<T: Task>(
|
||||||
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
|
debug_assert!(status == sys::napi_status::napi_ok, "Reject promise failed");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let delete_status = sys::napi_delete_async_work(env, napi_async_work);
|
||||||
|
debug_assert!(
|
||||||
|
delete_status == sys::napi_status::napi_ok,
|
||||||
|
"Delete async work failed"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ use std::mem;
|
||||||
use std::os::raw::{c_char, c_void};
|
use std::os::raw::{c_char, c_void};
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
|
|
||||||
use crate::async_work::AsyncWork;
|
use crate::async_work::{self, AsyncWorkPromise};
|
||||||
use crate::error::check_status;
|
use crate::error::check_status;
|
||||||
use crate::js_values::*;
|
use crate::js_values::*;
|
||||||
use crate::task::Task;
|
use crate::task::Task;
|
||||||
|
@ -487,13 +487,8 @@ impl Env {
|
||||||
Ok(JsObject::from_raw_unchecked(self.0, result))
|
Ok(JsObject::from_raw_unchecked(self.0, result))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn<T: 'static + Task>(&self, task: T) -> Result<JsObject> {
|
pub fn spawn<T: 'static + Task>(&self, task: T) -> Result<AsyncWorkPromise> {
|
||||||
let mut raw_promise = ptr::null_mut();
|
async_work::run(self, task)
|
||||||
let mut raw_deferred = ptr::null_mut();
|
|
||||||
|
|
||||||
check_status(unsafe { sys::napi_create_promise(self.0, &mut raw_deferred, &mut raw_promise) })?;
|
|
||||||
AsyncWork::run(self.0, task, raw_deferred)?;
|
|
||||||
Ok(JsObject::from_raw_unchecked(self.0, raw_promise))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_global(&self) -> Result<JsObject> {
|
pub fn get_global(&self) -> Result<JsObject> {
|
||||||
|
|
|
@ -37,7 +37,8 @@ fn fibonacci_native(n: u32) -> u32 {
|
||||||
fn test_spawn_thread(ctx: CallContext) -> Result<JsObject> {
|
fn test_spawn_thread(ctx: CallContext) -> Result<JsObject> {
|
||||||
let n = ctx.get::<JsNumber>(0)?;
|
let n = ctx.get::<JsNumber>(0)?;
|
||||||
let task = ComputeFib::new(n.try_into()?);
|
let task = ComputeFib::new(n.try_into()?);
|
||||||
ctx.env.spawn(task)
|
let async_promise = ctx.env.spawn(task)?;
|
||||||
|
Ok(async_promise.promise_object())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_js(module: &mut Module) -> Result<()> {
|
pub fn register_js(module: &mut Module) -> Result<()> {
|
||||||
|
|
Loading…
Reference in a new issue