feat(napi): implement AsyncTask with AbortSignal support

This commit is contained in:
LongYinan 2021-11-04 18:44:06 +08:00
parent d36c303dec
commit e74fe2fb94
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
17 changed files with 114 additions and 98 deletions

View file

@ -11,10 +11,14 @@ impl Task for BufferLength {
Ok(self.0.len() + 1)
}
fn resolve(self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
self.0.unref(env)?;
fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
env.create_uint32(output as u32)
}
fn finally(&mut self, env: Env) -> Result<()> {
self.0.unref(env)?;
Ok(())
}
}
#[js_function(1)]
@ -33,7 +37,7 @@ fn bench_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
let tsfn = ctx.env.create_threadsafe_function(
&callback,
0,
|ctx: ThreadSafeCallContext<(usize, Ref<JsBufferValue>)>| {
|mut ctx: ThreadSafeCallContext<(usize, Ref<JsBufferValue>)>| {
ctx
.env
.create_uint32(ctx.value.0 as u32)

View file

@ -76,13 +76,6 @@ impl TryToTokens for NapiStruct {
impl NapiStruct {
fn gen_helper_mod(&self) -> TokenStream {
if crate::typegen::r#struct::TASK_STRUCTS.with(|t| {
println!("{:?}", t);
t.borrow().get(&self.name.to_string()).is_some()
}) {
return quote! {};
}
let mod_name = Ident::new(
&format!("__napi_helper__{}", self.name.to_string()),
Span::call_site(),

View file

@ -71,7 +71,7 @@ static KNOWN_TYPES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| {
("null", "null"),
("symbol", "symbol"),
("external", "object"),
("AsyncTaskAbortController", "AbortController"),
("AbortSignal", "AbortSignal"),
("Function", "(...args: any[]) => any"),
]);

View file

@ -50,6 +50,7 @@ macro_rules! attrgen {
(skip, Skip(Span)),
(strict, Strict(Span)),
(object, Object(Span)),
(task, Task(Span)),
// impl later
// (inspectable, Inspectable(Span)),

View file

@ -1,7 +1,7 @@
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicU8, Ordering};
use std::{ffi::CString, rc::Rc};
use crate::{
@ -14,7 +14,7 @@ struct AsyncWork<T: Task> {
deferred: sys::napi_deferred,
value: Result<mem::MaybeUninit<T::Output>>,
napi_async_work: sys::napi_async_work,
abort: Rc<AtomicBool>,
status: Rc<AtomicU8>,
}
pub struct AsyncWorkPromise {
@ -22,8 +22,11 @@ pub struct AsyncWorkPromise {
raw_promise: sys::napi_value,
pub(crate) deferred: sys::napi_deferred,
env: sys::napi_env,
// share with AsyncWork
pub(crate) abort: Rc<AtomicBool>,
/// share with AsyncWork
/// 0: not started
/// 1: completed
/// 2: canceled
pub(crate) status: Rc<AtomicU8>,
}
impl AsyncWorkPromise {
@ -33,7 +36,7 @@ impl AsyncWorkPromise {
pub fn cancel(&self) -> Result<()> {
// must be happened in the main thread, relaxed is enough
self.abort.store(true, Ordering::Relaxed);
self.status.store(2, Ordering::Relaxed);
check_status!(unsafe { sys::napi_cancel_async_work(self.env, self.napi_async_work) })
}
}
@ -41,20 +44,20 @@ impl AsyncWorkPromise {
pub fn run<T: Task>(
env: sys::napi_env,
task: T,
abort_status: Option<Rc<AtomicBool>>,
abort_status: Option<Rc<AtomicU8>>,
) -> Result<AsyncWorkPromise> {
let mut raw_resource = ptr::null_mut();
check_status!(unsafe { sys::napi_create_object(env, &mut raw_resource) })?;
let mut raw_promise = ptr::null_mut();
let mut deferred = ptr::null_mut();
check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut raw_promise) })?;
let task_abort = abort_status.unwrap_or_else(|| Rc::new(AtomicBool::new(false)));
let task_status = abort_status.unwrap_or_else(|| Rc::new(AtomicU8::new(0)));
let result = Box::leak(Box::new(AsyncWork {
inner_task: task,
deferred,
value: Ok(mem::MaybeUninit::zeroed()),
napi_async_work: ptr::null_mut(),
abort: task_abort.clone(),
status: task_status.clone(),
}));
check_status!(unsafe {
sys::napi_create_async_work(
@ -76,7 +79,7 @@ pub fn run<T: Task>(
raw_promise,
deferred,
env,
abort: task_abort,
status: task_status,
})
}
@ -88,16 +91,11 @@ unsafe impl<T: Task> Sync for AsyncWork<T> {}
/// So it actually could do nothing here, because `execute` function is called in the other thread mostly.
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
let mut work = Box::from_raw(data as *mut AsyncWork<T>);
if work.abort.load(Ordering::Relaxed) {
return;
}
let _ = mem::replace(
&mut work.value,
work.inner_task.compute().map(mem::MaybeUninit::new),
);
if !work.abort.load(Ordering::Relaxed) {
Box::leak(work);
}
}
unsafe extern "C" fn complete<T: Task>(
@ -116,6 +114,7 @@ unsafe extern "C" fn complete<T: Task>(
}
Err(e) => work.inner_task.reject(Env::from_raw(env), e),
};
if status != sys::Status::napi_cancelled && work.status.load(Ordering::Relaxed) != 2 {
match check_status!(status)
.and_then(move |_| value)
.and_then(|v| ToNapiValue::to_napi_value(env, v))
@ -129,9 +128,14 @@ unsafe extern "C" fn complete<T: Task>(
debug_assert!(status == sys::Status::napi_ok, "Reject promise failed");
}
};
}
if let Err(e) = work.inner_task.finally(Env::from_raw(env)) {
debug_assert!(false, "Panic in Task finally fn: {:?}", e);
}
let delete_status = sys::napi_delete_async_work(env, napi_async_work);
debug_assert!(
delete_status == sys::Status::napi_ok,
"Delete async work failed"
);
work.status.store(1, Ordering::Relaxed);
}

View file

@ -1,14 +1,14 @@
use std::ffi::c_void;
use std::ptr;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::atomic::{AtomicPtr, AtomicU8, Ordering};
use super::{FromNapiValue, ToNapiValue, TypeName};
use crate::{async_work, check_status, Env, Error, JsError, JsObject, NapiValue, Status, Task};
pub struct AsyncTask<T: Task> {
inner: T,
abort_controller: Option<AsyncTaskAbortController>,
abort_signal: Option<AbortSignal>,
}
impl<T: Task> TypeName for T {
@ -25,41 +25,40 @@ impl<T: Task> AsyncTask<T> {
pub fn new(task: T) -> Self {
Self {
inner: task,
abort_controller: None,
abort_signal: None,
}
}
pub fn with_abort_controller(task: T, abort_controller: AsyncTaskAbortController) -> Self {
pub fn with_signal(task: T, signal: AbortSignal) -> Self {
Self {
inner: task,
abort_controller: Some(abort_controller),
abort_signal: Some(signal),
}
}
}
/// https://developer.mozilla.org/zh-CN/docs/Web/API/AbortController
pub struct AsyncTaskAbortController {
pub struct AbortSignal {
raw_work: Rc<AtomicPtr<napi_sys::napi_async_work__>>,
raw_deferred: Rc<AtomicPtr<napi_sys::napi_deferred__>>,
abort: Rc<AtomicBool>,
status: Rc<AtomicU8>,
}
impl FromNapiValue for AsyncTaskAbortController {
impl FromNapiValue for AbortSignal {
unsafe fn from_napi_value(
env: napi_sys::napi_env,
napi_val: napi_sys::napi_value,
) -> crate::Result<Self> {
let raw_abort_controller = JsObject::from_raw_unchecked(env, napi_val);
let mut signal = raw_abort_controller.get_named_property::<JsObject>("signal")?;
let mut signal = JsObject::from_raw_unchecked(env, napi_val);
let async_work_inner: Rc<AtomicPtr<napi_sys::napi_async_work__>> =
Rc::new(AtomicPtr::new(ptr::null_mut()));
let raw_promise: Rc<AtomicPtr<napi_sys::napi_deferred__>> =
Rc::new(AtomicPtr::new(ptr::null_mut()));
let abort_status = Rc::new(AtomicBool::new(false));
let abort_controller = AsyncTaskAbortController {
let task_status = Rc::new(AtomicU8::new(0));
let abort_controller = AbortSignal {
raw_work: async_work_inner.clone(),
raw_deferred: raw_promise.clone(),
abort: abort_status.clone(),
status: task_status.clone(),
};
let js_env = Env::from_raw(env);
check_status!(napi_sys::napi_wrap(
@ -71,10 +70,10 @@ impl FromNapiValue for AsyncTaskAbortController {
ptr::null_mut(),
))?;
signal.set_named_property("onabort", js_env.create_function("onabort", on_abort)?)?;
Ok(AsyncTaskAbortController {
Ok(AbortSignal {
raw_work: async_work_inner,
raw_deferred: raw_promise,
abort: abort_status,
status: task_status,
})
}
}
@ -107,13 +106,16 @@ extern "C" fn on_abort(
"{}",
"Unwrap async_task from AbortSignal failed"
);
let abort_controller = Box::leak(Box::from_raw(async_task as *mut AsyncTaskAbortController));
let abort_controller = Box::leak(Box::from_raw(async_task as *mut AbortSignal));
// Task Completed, return now
if abort_controller.status.load(Ordering::Relaxed) == 1 {
return ptr::null_mut();
}
let raw_async_work = abort_controller.raw_work.load(Ordering::Relaxed);
let deferred = abort_controller.raw_deferred.load(Ordering::Relaxed);
// abort function must be called from JavaScript main thread, so Relaxed Ordering is ok.
abort_controller.abort.store(true, Ordering::Relaxed);
napi_sys::napi_cancel_async_work(env, raw_async_work);
// napi_sys::napi_delete_async_work(env, raw_async_work);
// abort function must be called from JavaScript main thread, so Relaxed Ordering is ok.
abort_controller.status.store(2, Ordering::Relaxed);
let abort_error = Error::new(Status::Cancelled, "AbortError".to_owned());
let reject_status =
napi_sys::napi_reject_deferred(env, deferred, JsError::from(abort_error).into_value(env));
@ -123,10 +125,8 @@ extern "C" fn on_abort(
"{}",
"Reject AbortError failed"
);
let mut undefined = ptr::null_mut();
napi_sys::napi_get_undefined(env, &mut undefined);
undefined
}
ptr::null_mut()
}
impl<T: Task> ToNapiValue for AsyncTask<T> {
@ -134,8 +134,8 @@ impl<T: Task> ToNapiValue for AsyncTask<T> {
env: napi_sys::napi_env,
val: Self,
) -> crate::Result<napi_sys::napi_value> {
if let Some(abort_controller) = val.abort_controller {
let async_promise = async_work::run(env, val.inner, Some(abort_controller.abort.clone()))?;
if let Some(abort_controller) = val.abort_signal {
let async_promise = async_work::run(env, val.inner, Some(abort_controller.status.clone()))?;
abort_controller
.raw_work
.store(async_promise.napi_async_work, Ordering::Relaxed);
@ -155,5 +155,5 @@ unsafe extern "C" fn async_task_abort_controller_finalize(
finalize_data: *mut c_void,
_finalize_hint: *mut c_void,
) {
Box::from_raw(finalize_data as *mut AsyncTaskAbortController);
Box::from_raw(finalize_data as *mut AbortSignal);
}

View file

@ -77,7 +77,11 @@ unsafe extern "C" fn napi_register_module_v1(
for (rust_name, (js_name, props)) in to_register_classes.take().into_iter() {
unsafe {
let (ctor, props): (Vec<_>, Vec<_>) = props.into_iter().partition(|prop| prop.is_ctor);
// one or more?
// one or more or zero?
// zero is for `#[napi(task)]`
if ctor.is_empty() {
continue;
}
let ctor = ctor[0].raw().method.unwrap();
let raw_props: Vec<_> = props.iter().map(|prop| prop.raw()).collect();

View file

@ -2,12 +2,8 @@ use std::mem;
use std::ops::{Deref, DerefMut};
use std::ptr;
use super::Value;
#[cfg(feature = "serde-json")]
use super::ValueType;
use crate::bindgen_runtime::TypeName;
use crate::check_status;
use crate::{sys, JsUnknown, NapiValue, Ref, Result};
use super::{Value, ValueType};
use crate::{bindgen_runtime::TypeName, check_status, sys, JsUnknown, NapiValue, Ref, Result};
pub struct JsBuffer(pub(crate) Value);

View file

@ -34,7 +34,7 @@ impl<T> Ref<T> {
Ok(self.count)
}
pub fn unref(mut self, env: Env) -> Result<u32> {
pub fn unref(&mut self, env: Env) -> Result<u32> {
check_status!(unsafe { sys::napi_reference_unref(env.0, self.raw_ref, &mut self.count) })?;
if self.count == 0 {

View file

@ -11,13 +11,15 @@ pub trait Task: Send + Sized {
fn compute(&mut self) -> Result<Self::Output>;
/// Into this method if `compute` return `Ok`
fn resolve(self, env: Env, output: Self::Output) -> Result<Self::JsValue>;
fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue>;
/// Into this method if `compute` return `Err`
fn reject(self, _env: Env, err: Error) -> Result<Self::JsValue> {
fn reject(&mut self, _env: Env, err: Error) -> Result<Self::JsValue> {
Err(err)
}
// after resolve or reject
fn finally() {}
fn finally(&mut self, _env: Env) -> Result<()> {
Ok(())
}
}

View file

@ -154,15 +154,16 @@ pub fn test_tsfn_with_ref(ctx: CallContext) -> Result<JsUndefined> {
let callback = ctx.get::<JsFunction>(0)?;
let options = ctx.get::<JsObject>(1)?;
let options_ref = ctx.env.create_reference(options)?;
let tsfn =
ctx
.env
.create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext<Ref<()>>| {
let tsfn = ctx.env.create_threadsafe_function(
&callback,
0,
|mut ctx: ThreadSafeCallContext<Ref<()>>| {
ctx
.env
.get_reference_value_unchecked::<JsObject>(&ctx.value)
.and_then(|obj| ctx.value.unref(ctx.env).map(|_| vec![obj]))
})?;
},
)?;
thread::spawn(move || {
tsfn.call(Ok(options_ref), ThreadsafeFunctionCallMode::Blocking);

View file

@ -22,7 +22,7 @@ impl Task for ComputeFib {
Ok(fibonacci_native(self.n))
}
fn resolve(self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
env.create_uint32(output)
}
}
@ -63,15 +63,18 @@ impl Task for CountBufferLength {
Ok((&self.data).len())
}
fn resolve(self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
self.data.unref(env)?;
fn resolve(&mut self, env: Env, output: Self::Output) -> Result<Self::JsValue> {
env.create_uint32(output as _)
}
fn reject(self, env: Env, err: Error) -> Result<Self::JsValue> {
self.data.unref(env)?;
fn reject(&mut self, env: Env, err: Error) -> Result<Self::JsValue> {
Err(err)
}
fn finally(&mut self, env: Env) -> Result<()> {
self.data.unref(env)?;
Ok(())
}
}
#[js_function(1)]

View file

@ -43,7 +43,7 @@ Generated by [AVA](https://avajs.dev).
export function concatUtf16(s: string): string␊
export function concatLatin1(s: string): string␊
export function withoutAbortController(a: number, b: number): Promise<number>
export function withAbortController(a: number, b: number, ctrl: AbortController): Promise<number>
export function withAbortController(a: number, b: number, signal: AbortSignal): Promise<number>
export function getBuffer(): Buffer␊
export class Animal {␊
readonly kind: Kind␊

View file

@ -167,9 +167,11 @@ test('async task without abort controller', async (t) => {
t.is(await withoutAbortController(1, 2), 3)
})
test('async task with abort controller', async (t) => {
const MaybeTest = typeof AbortController !== 'undefined' ? test : test.skip
MaybeTest('async task with abort controller', async (t) => {
const ctrl = new AbortController()
const promise = withAbortController(1, 2, ctrl)
const promise = withAbortController(1, 2, ctrl.signal)
try {
ctrl.abort()
await promise
@ -178,3 +180,9 @@ test('async task with abort controller', async (t) => {
t.is((err as Error).message, 'AbortError')
}
})
MaybeTest('abort resolved task', async (t) => {
const ctrl = new AbortController()
await withAbortController(1, 2, ctrl.signal).then(() => ctrl.abort())
t.pass('should not throw')
})

View file

@ -33,7 +33,7 @@ export function concatStr(mutS: string): string
export function concatUtf16(s: string): string
export function concatLatin1(s: string): string
export function withoutAbortController(a: number, b: number): Promise<number>
export function withAbortController(a: number, b: number, ctrl: AbortController): Promise<number>
export function withAbortController(a: number, b: number, signal: AbortSignal): Promise<number>
export function getBuffer(): Buffer
export class Animal {
readonly kind: Kind

View file

@ -5,17 +5,17 @@ use napi::Task;
struct DelaySum(u32, u32);
#[napi]
#[napi(task)]
impl Task for DelaySum {
type Output = u32;
type JsValue = u32;
fn compute(&mut self) -> Result<Self::Output> {
sleep(std::time::Duration::from_secs(1));
sleep(std::time::Duration::from_millis(100));
Ok(self.0 + self.1)
}
fn resolve(self, _env: napi::Env, output: Self::Output) -> Result<Self::JsValue> {
fn resolve(&mut self, _env: napi::Env, output: Self::Output) -> Result<Self::JsValue> {
Ok(output)
}
}
@ -26,6 +26,6 @@ fn without_abort_controller(a: u32, b: u32) -> AsyncTask<DelaySum> {
}
#[napi]
fn with_abort_controller(a: u32, b: u32, ctrl: AsyncTaskAbortController) -> AsyncTask<DelaySum> {
AsyncTask::with_abort_controller(DelaySum(a, b), ctrl)
fn with_abort_controller(a: u32, b: u32, signal: AbortSignal) -> AsyncTask<DelaySum> {
AsyncTask::with_signal(DelaySum(a, b), signal)
}