diff --git a/bench/src/async_compute.rs b/bench/src/async_compute.rs index d2630a76..df0158ea 100644 --- a/bench/src/async_compute.rs +++ b/bench/src/async_compute.rs @@ -11,10 +11,14 @@ impl Task for BufferLength { Ok(self.0.len() + 1) } - fn resolve(self, env: Env, output: Self::Output) -> Result { - self.0.unref(env)?; + fn resolve(&mut self, env: Env, output: Self::Output) -> Result { env.create_uint32(output as u32) } + + fn finally(&mut self, env: Env) -> Result<()> { + self.0.unref(env)?; + Ok(()) + } } #[js_function(1)] @@ -33,7 +37,7 @@ fn bench_threadsafe_function(ctx: CallContext) -> Result { let tsfn = ctx.env.create_threadsafe_function( &callback, 0, - |ctx: ThreadSafeCallContext<(usize, Ref)>| { + |mut ctx: ThreadSafeCallContext<(usize, Ref)>| { ctx .env .create_uint32(ctx.value.0 as u32) diff --git a/crates/backend/src/codegen/struct.rs b/crates/backend/src/codegen/struct.rs index 323b3039..c85d9dc7 100644 --- a/crates/backend/src/codegen/struct.rs +++ b/crates/backend/src/codegen/struct.rs @@ -76,13 +76,6 @@ impl TryToTokens for NapiStruct { impl NapiStruct { fn gen_helper_mod(&self) -> TokenStream { - if crate::typegen::r#struct::TASK_STRUCTS.with(|t| { - println!("{:?}", t); - t.borrow().get(&self.name.to_string()).is_some() - }) { - return quote! {}; - } - let mod_name = Ident::new( &format!("__napi_helper__{}", self.name.to_string()), Span::call_site(), diff --git a/crates/backend/src/typegen.rs b/crates/backend/src/typegen.rs index 3b561faf..7b3b9da8 100644 --- a/crates/backend/src/typegen.rs +++ b/crates/backend/src/typegen.rs @@ -71,7 +71,7 @@ static KNOWN_TYPES: Lazy> = Lazy::new(|| { ("null", "null"), ("symbol", "symbol"), ("external", "object"), - ("AsyncTaskAbortController", "AbortController"), + ("AbortSignal", "AbortSignal"), ("Function", "(...args: any[]) => any"), ]); diff --git a/crates/macro/src/parser/attrs.rs b/crates/macro/src/parser/attrs.rs index 5d215d09..d3a98d1b 100644 --- a/crates/macro/src/parser/attrs.rs +++ b/crates/macro/src/parser/attrs.rs @@ -50,6 +50,7 @@ macro_rules! attrgen { (skip, Skip(Span)), (strict, Strict(Span)), (object, Object(Span)), + (task, Task(Span)), // impl later // (inspectable, Inspectable(Span)), diff --git a/crates/napi/src/async_work.rs b/crates/napi/src/async_work.rs index 4684d3a0..231a7f21 100644 --- a/crates/napi/src/async_work.rs +++ b/crates/napi/src/async_work.rs @@ -1,7 +1,7 @@ use std::mem; use std::os::raw::c_void; use std::ptr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicU8, Ordering}; use std::{ffi::CString, rc::Rc}; use crate::{ @@ -14,7 +14,7 @@ struct AsyncWork { deferred: sys::napi_deferred, value: Result>, napi_async_work: sys::napi_async_work, - abort: Rc, + status: Rc, } pub struct AsyncWorkPromise { @@ -22,8 +22,11 @@ pub struct AsyncWorkPromise { raw_promise: sys::napi_value, pub(crate) deferred: sys::napi_deferred, env: sys::napi_env, - // share with AsyncWork - pub(crate) abort: Rc, + /// share with AsyncWork + /// 0: not started + /// 1: completed + /// 2: canceled + pub(crate) status: Rc, } impl AsyncWorkPromise { @@ -33,7 +36,7 @@ impl AsyncWorkPromise { pub fn cancel(&self) -> Result<()> { // must be happened in the main thread, relaxed is enough - self.abort.store(true, Ordering::Relaxed); + self.status.store(2, Ordering::Relaxed); check_status!(unsafe { sys::napi_cancel_async_work(self.env, self.napi_async_work) }) } } @@ -41,20 +44,20 @@ impl AsyncWorkPromise { pub fn run( env: sys::napi_env, task: T, - abort_status: Option>, + abort_status: Option>, ) -> Result { let mut raw_resource = ptr::null_mut(); check_status!(unsafe { sys::napi_create_object(env, &mut raw_resource) })?; let mut raw_promise = ptr::null_mut(); let mut deferred = ptr::null_mut(); check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut raw_promise) })?; - let task_abort = abort_status.unwrap_or_else(|| Rc::new(AtomicBool::new(false))); + let task_status = abort_status.unwrap_or_else(|| Rc::new(AtomicU8::new(0))); let result = Box::leak(Box::new(AsyncWork { inner_task: task, deferred, value: Ok(mem::MaybeUninit::zeroed()), napi_async_work: ptr::null_mut(), - abort: task_abort.clone(), + status: task_status.clone(), })); check_status!(unsafe { sys::napi_create_async_work( @@ -76,7 +79,7 @@ pub fn run( raw_promise, deferred, env, - abort: task_abort, + status: task_status, }) } @@ -88,16 +91,11 @@ unsafe impl Sync for AsyncWork {} /// So it actually could do nothing here, because `execute` function is called in the other thread mostly. unsafe extern "C" fn execute(_env: sys::napi_env, data: *mut c_void) { let mut work = Box::from_raw(data as *mut AsyncWork); - if work.abort.load(Ordering::Relaxed) { - return; - } let _ = mem::replace( &mut work.value, work.inner_task.compute().map(mem::MaybeUninit::new), ); - if !work.abort.load(Ordering::Relaxed) { - Box::leak(work); - } + Box::leak(work); } unsafe extern "C" fn complete( @@ -116,22 +114,28 @@ unsafe extern "C" fn complete( } Err(e) => work.inner_task.reject(Env::from_raw(env), e), }; - match check_status!(status) - .and_then(move |_| value) - .and_then(|v| ToNapiValue::to_napi_value(env, v)) - { - Ok(v) => { - let status = sys::napi_resolve_deferred(env, deferred, v); - debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed"); - } - Err(e) => { - let status = sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)); - debug_assert!(status == sys::Status::napi_ok, "Reject promise failed"); - } - }; + if status != sys::Status::napi_cancelled && work.status.load(Ordering::Relaxed) != 2 { + match check_status!(status) + .and_then(move |_| value) + .and_then(|v| ToNapiValue::to_napi_value(env, v)) + { + Ok(v) => { + let status = sys::napi_resolve_deferred(env, deferred, v); + debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed"); + } + Err(e) => { + let status = sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)); + debug_assert!(status == sys::Status::napi_ok, "Reject promise failed"); + } + }; + } + if let Err(e) = work.inner_task.finally(Env::from_raw(env)) { + debug_assert!(false, "Panic in Task finally fn: {:?}", e); + } let delete_status = sys::napi_delete_async_work(env, napi_async_work); debug_assert!( delete_status == sys::Status::napi_ok, "Delete async work failed" ); + work.status.store(1, Ordering::Relaxed); } diff --git a/crates/napi/src/bindgen_runtime/js_values/task.rs b/crates/napi/src/bindgen_runtime/js_values/task.rs index 717072d3..5c91ad4f 100644 --- a/crates/napi/src/bindgen_runtime/js_values/task.rs +++ b/crates/napi/src/bindgen_runtime/js_values/task.rs @@ -1,14 +1,14 @@ use std::ffi::c_void; use std::ptr; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use std::sync::atomic::{AtomicPtr, AtomicU8, Ordering}; use super::{FromNapiValue, ToNapiValue, TypeName}; use crate::{async_work, check_status, Env, Error, JsError, JsObject, NapiValue, Status, Task}; pub struct AsyncTask { inner: T, - abort_controller: Option, + abort_signal: Option, } impl TypeName for T { @@ -25,41 +25,40 @@ impl AsyncTask { pub fn new(task: T) -> Self { Self { inner: task, - abort_controller: None, + abort_signal: None, } } - pub fn with_abort_controller(task: T, abort_controller: AsyncTaskAbortController) -> Self { + pub fn with_signal(task: T, signal: AbortSignal) -> Self { Self { inner: task, - abort_controller: Some(abort_controller), + abort_signal: Some(signal), } } } /// https://developer.mozilla.org/zh-CN/docs/Web/API/AbortController -pub struct AsyncTaskAbortController { +pub struct AbortSignal { raw_work: Rc>, raw_deferred: Rc>, - abort: Rc, + status: Rc, } -impl FromNapiValue for AsyncTaskAbortController { +impl FromNapiValue for AbortSignal { unsafe fn from_napi_value( env: napi_sys::napi_env, napi_val: napi_sys::napi_value, ) -> crate::Result { - let raw_abort_controller = JsObject::from_raw_unchecked(env, napi_val); - let mut signal = raw_abort_controller.get_named_property::("signal")?; + let mut signal = JsObject::from_raw_unchecked(env, napi_val); let async_work_inner: Rc> = Rc::new(AtomicPtr::new(ptr::null_mut())); let raw_promise: Rc> = Rc::new(AtomicPtr::new(ptr::null_mut())); - let abort_status = Rc::new(AtomicBool::new(false)); - let abort_controller = AsyncTaskAbortController { + let task_status = Rc::new(AtomicU8::new(0)); + let abort_controller = AbortSignal { raw_work: async_work_inner.clone(), raw_deferred: raw_promise.clone(), - abort: abort_status.clone(), + status: task_status.clone(), }; let js_env = Env::from_raw(env); check_status!(napi_sys::napi_wrap( @@ -71,10 +70,10 @@ impl FromNapiValue for AsyncTaskAbortController { ptr::null_mut(), ))?; signal.set_named_property("onabort", js_env.create_function("onabort", on_abort)?)?; - Ok(AsyncTaskAbortController { + Ok(AbortSignal { raw_work: async_work_inner, raw_deferred: raw_promise, - abort: abort_status, + status: task_status, }) } } @@ -107,13 +106,16 @@ extern "C" fn on_abort( "{}", "Unwrap async_task from AbortSignal failed" ); - let abort_controller = Box::leak(Box::from_raw(async_task as *mut AsyncTaskAbortController)); + let abort_controller = Box::leak(Box::from_raw(async_task as *mut AbortSignal)); + // Task Completed, return now + if abort_controller.status.load(Ordering::Relaxed) == 1 { + return ptr::null_mut(); + } let raw_async_work = abort_controller.raw_work.load(Ordering::Relaxed); let deferred = abort_controller.raw_deferred.load(Ordering::Relaxed); - // abort function must be called from JavaScript main thread, so Relaxed Ordering is ok. - abort_controller.abort.store(true, Ordering::Relaxed); napi_sys::napi_cancel_async_work(env, raw_async_work); - // napi_sys::napi_delete_async_work(env, raw_async_work); + // abort function must be called from JavaScript main thread, so Relaxed Ordering is ok. + abort_controller.status.store(2, Ordering::Relaxed); let abort_error = Error::new(Status::Cancelled, "AbortError".to_owned()); let reject_status = napi_sys::napi_reject_deferred(env, deferred, JsError::from(abort_error).into_value(env)); @@ -123,10 +125,8 @@ extern "C" fn on_abort( "{}", "Reject AbortError failed" ); - let mut undefined = ptr::null_mut(); - napi_sys::napi_get_undefined(env, &mut undefined); - undefined } + ptr::null_mut() } impl ToNapiValue for AsyncTask { @@ -134,8 +134,8 @@ impl ToNapiValue for AsyncTask { env: napi_sys::napi_env, val: Self, ) -> crate::Result { - if let Some(abort_controller) = val.abort_controller { - let async_promise = async_work::run(env, val.inner, Some(abort_controller.abort.clone()))?; + if let Some(abort_controller) = val.abort_signal { + let async_promise = async_work::run(env, val.inner, Some(abort_controller.status.clone()))?; abort_controller .raw_work .store(async_promise.napi_async_work, Ordering::Relaxed); @@ -155,5 +155,5 @@ unsafe extern "C" fn async_task_abort_controller_finalize( finalize_data: *mut c_void, _finalize_hint: *mut c_void, ) { - Box::from_raw(finalize_data as *mut AsyncTaskAbortController); + Box::from_raw(finalize_data as *mut AbortSignal); } diff --git a/crates/napi/src/bindgen_runtime/module_register.rs b/crates/napi/src/bindgen_runtime/module_register.rs index 5637e46e..3632a0d6 100644 --- a/crates/napi/src/bindgen_runtime/module_register.rs +++ b/crates/napi/src/bindgen_runtime/module_register.rs @@ -77,7 +77,11 @@ unsafe extern "C" fn napi_register_module_v1( for (rust_name, (js_name, props)) in to_register_classes.take().into_iter() { unsafe { let (ctor, props): (Vec<_>, Vec<_>) = props.into_iter().partition(|prop| prop.is_ctor); - // one or more? + // one or more or zero? + // zero is for `#[napi(task)]` + if ctor.is_empty() { + continue; + } let ctor = ctor[0].raw().method.unwrap(); let raw_props: Vec<_> = props.iter().map(|prop| prop.raw()).collect(); diff --git a/crates/napi/src/js_values/buffer.rs b/crates/napi/src/js_values/buffer.rs index 2ca4c92c..a0a7fcb2 100644 --- a/crates/napi/src/js_values/buffer.rs +++ b/crates/napi/src/js_values/buffer.rs @@ -2,12 +2,8 @@ use std::mem; use std::ops::{Deref, DerefMut}; use std::ptr; -use super::Value; -#[cfg(feature = "serde-json")] -use super::ValueType; -use crate::bindgen_runtime::TypeName; -use crate::check_status; -use crate::{sys, JsUnknown, NapiValue, Ref, Result}; +use super::{Value, ValueType}; +use crate::{bindgen_runtime::TypeName, check_status, sys, JsUnknown, NapiValue, Ref, Result}; pub struct JsBuffer(pub(crate) Value); diff --git a/crates/napi/src/js_values/value_ref.rs b/crates/napi/src/js_values/value_ref.rs index 198e0293..41aeb6bf 100644 --- a/crates/napi/src/js_values/value_ref.rs +++ b/crates/napi/src/js_values/value_ref.rs @@ -34,7 +34,7 @@ impl Ref { Ok(self.count) } - pub fn unref(mut self, env: Env) -> Result { + pub fn unref(&mut self, env: Env) -> Result { check_status!(unsafe { sys::napi_reference_unref(env.0, self.raw_ref, &mut self.count) })?; if self.count == 0 { diff --git a/crates/napi/src/task.rs b/crates/napi/src/task.rs index c85c8002..96ba80c9 100644 --- a/crates/napi/src/task.rs +++ b/crates/napi/src/task.rs @@ -11,13 +11,15 @@ pub trait Task: Send + Sized { fn compute(&mut self) -> Result; /// Into this method if `compute` return `Ok` - fn resolve(self, env: Env, output: Self::Output) -> Result; + fn resolve(&mut self, env: Env, output: Self::Output) -> Result; /// Into this method if `compute` return `Err` - fn reject(self, _env: Env, err: Error) -> Result { + fn reject(&mut self, _env: Env, err: Error) -> Result { Err(err) } // after resolve or reject - fn finally() {} + fn finally(&mut self, _env: Env) -> Result<()> { + Ok(()) + } } diff --git a/examples/napi-compat-mode/src/napi4/tsfn.rs b/examples/napi-compat-mode/src/napi4/tsfn.rs index 70f18e18..dd464e38 100644 --- a/examples/napi-compat-mode/src/napi4/tsfn.rs +++ b/examples/napi-compat-mode/src/napi4/tsfn.rs @@ -154,15 +154,16 @@ pub fn test_tsfn_with_ref(ctx: CallContext) -> Result { let callback = ctx.get::(0)?; let options = ctx.get::(1)?; let options_ref = ctx.env.create_reference(options)?; - let tsfn = - ctx - .env - .create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext>| { - ctx - .env - .get_reference_value_unchecked::(&ctx.value) - .and_then(|obj| ctx.value.unref(ctx.env).map(|_| vec![obj])) - })?; + let tsfn = ctx.env.create_threadsafe_function( + &callback, + 0, + |mut ctx: ThreadSafeCallContext>| { + ctx + .env + .get_reference_value_unchecked::(&ctx.value) + .and_then(|obj| ctx.value.unref(ctx.env).map(|_| vec![obj])) + }, + )?; thread::spawn(move || { tsfn.call(Ok(options_ref), ThreadsafeFunctionCallMode::Blocking); diff --git a/examples/napi-compat-mode/src/task.rs b/examples/napi-compat-mode/src/task.rs index 485befd2..52a688de 100644 --- a/examples/napi-compat-mode/src/task.rs +++ b/examples/napi-compat-mode/src/task.rs @@ -22,7 +22,7 @@ impl Task for ComputeFib { Ok(fibonacci_native(self.n)) } - fn resolve(self, env: Env, output: Self::Output) -> Result { + fn resolve(&mut self, env: Env, output: Self::Output) -> Result { env.create_uint32(output) } } @@ -63,15 +63,18 @@ impl Task for CountBufferLength { Ok((&self.data).len()) } - fn resolve(self, env: Env, output: Self::Output) -> Result { - self.data.unref(env)?; + fn resolve(&mut self, env: Env, output: Self::Output) -> Result { env.create_uint32(output as _) } - fn reject(self, env: Env, err: Error) -> Result { - self.data.unref(env)?; + fn reject(&mut self, env: Env, err: Error) -> Result { Err(err) } + + fn finally(&mut self, env: Env) -> Result<()> { + self.data.unref(env)?; + Ok(()) + } } #[js_function(1)] diff --git a/examples/napi/__test__/typegen.spec.ts.md b/examples/napi/__test__/typegen.spec.ts.md index f3f295de..f2b5d698 100644 --- a/examples/napi/__test__/typegen.spec.ts.md +++ b/examples/napi/__test__/typegen.spec.ts.md @@ -43,7 +43,7 @@ Generated by [AVA](https://avajs.dev). export function concatUtf16(s: string): string␊ export function concatLatin1(s: string): string␊ export function withoutAbortController(a: number, b: number): Promise␊ - export function withAbortController(a: number, b: number, ctrl: AbortController): Promise␊ + export function withAbortController(a: number, b: number, signal: AbortSignal): Promise␊ export function getBuffer(): Buffer␊ export class Animal {␊ readonly kind: Kind␊ diff --git a/examples/napi/__test__/typegen.spec.ts.snap b/examples/napi/__test__/typegen.spec.ts.snap index ba59cedd..6203a4e4 100644 Binary files a/examples/napi/__test__/typegen.spec.ts.snap and b/examples/napi/__test__/typegen.spec.ts.snap differ diff --git a/examples/napi/__test__/values.spec.ts b/examples/napi/__test__/values.spec.ts index eb8d9ad1..359b1b51 100644 --- a/examples/napi/__test__/values.spec.ts +++ b/examples/napi/__test__/values.spec.ts @@ -167,9 +167,11 @@ test('async task without abort controller', async (t) => { t.is(await withoutAbortController(1, 2), 3) }) -test('async task with abort controller', async (t) => { +const MaybeTest = typeof AbortController !== 'undefined' ? test : test.skip + +MaybeTest('async task with abort controller', async (t) => { const ctrl = new AbortController() - const promise = withAbortController(1, 2, ctrl) + const promise = withAbortController(1, 2, ctrl.signal) try { ctrl.abort() await promise @@ -178,3 +180,9 @@ test('async task with abort controller', async (t) => { t.is((err as Error).message, 'AbortError') } }) + +MaybeTest('abort resolved task', async (t) => { + const ctrl = new AbortController() + await withAbortController(1, 2, ctrl.signal).then(() => ctrl.abort()) + t.pass('should not throw') +}) diff --git a/examples/napi/index.d.ts b/examples/napi/index.d.ts index a2b7d8bc..eb922314 100644 --- a/examples/napi/index.d.ts +++ b/examples/napi/index.d.ts @@ -33,7 +33,7 @@ export function concatStr(mutS: string): string export function concatUtf16(s: string): string export function concatLatin1(s: string): string export function withoutAbortController(a: number, b: number): Promise -export function withAbortController(a: number, b: number, ctrl: AbortController): Promise +export function withAbortController(a: number, b: number, signal: AbortSignal): Promise export function getBuffer(): Buffer export class Animal { readonly kind: Kind diff --git a/examples/napi/src/task.rs b/examples/napi/src/task.rs index c1b6bad0..0292c1fc 100644 --- a/examples/napi/src/task.rs +++ b/examples/napi/src/task.rs @@ -5,17 +5,17 @@ use napi::Task; struct DelaySum(u32, u32); -#[napi] +#[napi(task)] impl Task for DelaySum { type Output = u32; type JsValue = u32; fn compute(&mut self) -> Result { - sleep(std::time::Duration::from_secs(1)); + sleep(std::time::Duration::from_millis(100)); Ok(self.0 + self.1) } - fn resolve(self, _env: napi::Env, output: Self::Output) -> Result { + fn resolve(&mut self, _env: napi::Env, output: Self::Output) -> Result { Ok(output) } } @@ -26,6 +26,6 @@ fn without_abort_controller(a: u32, b: u32) -> AsyncTask { } #[napi] -fn with_abort_controller(a: u32, b: u32, ctrl: AsyncTaskAbortController) -> AsyncTask { - AsyncTask::with_abort_controller(DelaySum(a, b), ctrl) +fn with_abort_controller(a: u32, b: u32, signal: AbortSignal) -> AsyncTask { + AsyncTask::with_signal(DelaySum(a, b), signal) }