fix(napi): revert Promise changes because of the flaky test

This commit is contained in:
LongYinan 2023-04-10 17:02:47 +08:00
parent 88773a7a8e
commit 752ffea1d9
No known key found for this signature in database
GPG key ID: C3666B7FC82ADAD7
3 changed files with 46 additions and 62 deletions

View file

@ -2,6 +2,7 @@ name: Linux-armv7
env: env:
DEBUG: 'napi:*' DEBUG: 'napi:*'
RUST_BACKTRACE: 1
concurrency: concurrency:
group: ${{ github.workflow }}-${{ github.ref }} group: ${{ github.workflow }}-${{ github.ref }}

View file

@ -1,33 +1,30 @@
use std::ffi::{c_void, CStr}; use std::ffi::CStr;
use std::future; use std::future;
use std::pin::Pin; use std::pin::Pin;
use std::ptr; use std::ptr;
use std::sync::{ use std::sync::{
atomic::{AtomicBool, AtomicPtr, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}; };
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll};
use crate::{check_status, sys, Error, JsUnknown, NapiValue, Result}; use tokio::sync::oneshot::{channel, Receiver, Sender};
use crate::{check_status, sys, Error, JsUnknown, NapiValue, Result, Status};
use super::{FromNapiValue, TypeName, ValidateNapiValue}; use super::{FromNapiValue, TypeName, ValidateNapiValue};
struct PromiseInner<T: FromNapiValue> { pub struct Promise<T: FromNapiValue> {
value: AtomicPtr<Result<T>>, value: Pin<Box<Receiver<*mut Result<T>>>>,
waker: AtomicPtr<Waker>, aborted: Arc<AtomicBool>,
aborted: AtomicBool,
} }
impl<T: FromNapiValue> Drop for PromiseInner<T> { impl<T: FromNapiValue> Drop for Promise<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.aborted.store(true, Ordering::SeqCst); self.aborted.store(true, Ordering::SeqCst);
} }
} }
pub struct Promise<T: FromNapiValue> {
inner: Arc<PromiseInner<T>>,
}
impl<T: FromNapiValue> TypeName for Promise<T> { impl<T: FromNapiValue> TypeName for Promise<T> {
fn type_name() -> &'static str { fn type_name() -> &'static str {
"Promise" "Promise"
@ -103,13 +100,9 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
)?; )?;
let mut promise_after_then = ptr::null_mut(); let mut promise_after_then = ptr::null_mut();
let mut then_js_cb = ptr::null_mut(); let mut then_js_cb = ptr::null_mut();
let promise_inner = PromiseInner { let (tx, rx) = channel();
value: AtomicPtr::new(ptr::null_mut()), let aborted = Arc::new(AtomicBool::new(false));
waker: AtomicPtr::new(ptr::null_mut()), let tx_ptr = Box::into_raw(Box::new((tx, aborted.clone())));
aborted: AtomicBool::new(false),
};
let shared_inner = Arc::new(promise_inner);
let context_ptr = Arc::into_raw(shared_inner.clone());
check_status!( check_status!(
unsafe { unsafe {
sys::napi_create_function( sys::napi_create_function(
@ -117,7 +110,7 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
then_c_string.as_ptr(), then_c_string.as_ptr(),
4, 4,
Some(then_callback::<T>), Some(then_callback::<T>),
context_ptr as *mut c_void, tx_ptr.cast(),
&mut then_js_cb, &mut then_js_cb,
) )
}, },
@ -152,7 +145,7 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
catch_c_string.as_ptr(), catch_c_string.as_ptr(),
5, 5,
Some(catch_callback::<T>), Some(catch_callback::<T>),
context_ptr as *mut c_void, tx_ptr.cast(),
&mut catch_js_cb, &mut catch_js_cb,
) )
}, },
@ -172,7 +165,8 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
"Failed to call catch method" "Failed to call catch method"
)?; )?;
Ok(Promise { Ok(Promise {
inner: shared_inner, value: Box::pin(rx),
aborted,
}) })
} }
} }
@ -180,19 +174,13 @@ impl<T: FromNapiValue> FromNapiValue for Promise<T> {
impl<T: FromNapiValue> future::Future for Promise<T> { impl<T: FromNapiValue> future::Future for Promise<T> {
type Output = Result<T>; type Output = Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.inner.value.load(Ordering::Relaxed).is_null() { match self.value.as_mut().poll(cx) {
if self.inner.waker.load(Ordering::Acquire).is_null() { Poll::Pending => Poll::Pending,
self.inner.waker.store( Poll::Ready(v) => Poll::Ready(
Box::into_raw(Box::new(cx.waker().clone())), v.map_err(|e| Error::new(Status::GenericFailure, format!("{}", e)))
Ordering::Release, .and_then(|v| unsafe { *Box::from_raw(v) }.map_err(Error::from)),
); ),
}
Poll::Pending
} else {
Poll::Ready(
unsafe { Box::from_raw(self.inner.value.load(Ordering::Relaxed)) }.map_err(Error::from),
)
} }
} }
} }
@ -218,20 +206,15 @@ unsafe extern "C" fn then_callback<T: FromNapiValue>(
get_cb_status == sys::Status::napi_ok, get_cb_status == sys::Status::napi_ok,
"Get callback info from Promise::then failed" "Get callback info from Promise::then failed"
); );
let PromiseInner { let (sender, aborted) =
value, *unsafe { Box::from_raw(data as *mut (Sender<*mut Result<T>>, Arc<AtomicBool>)) };
waker,
aborted,
} = &*unsafe { Arc::from_raw(data as *mut PromiseInner<T>) };
if aborted.load(Ordering::SeqCst) { if aborted.load(Ordering::SeqCst) {
return this; return this;
} }
let resolve_value_t = Box::new(unsafe { T::from_napi_value(env, resolved_value[0]) }); let resolve_value_t = Box::new(unsafe { T::from_napi_value(env, resolved_value[0]) });
value.store(Box::into_raw(resolve_value_t), Ordering::Relaxed); sender
let waker = waker.load(Ordering::Acquire); .send(Box::into_raw(resolve_value_t))
if !waker.is_null() { .expect("Send Promise resolved value error");
unsafe { Box::from_raw(waker) }.wake();
}
this this
} }
@ -258,23 +241,15 @@ unsafe extern "C" fn catch_callback<T: FromNapiValue>(
"Get callback info from Promise::catch failed" "Get callback info from Promise::catch failed"
); );
let rejected_value = rejected_value[0]; let rejected_value = rejected_value[0];
let PromiseInner { let (sender, aborted) =
value, *unsafe { Box::from_raw(data as *mut (Sender<*mut Result<T>>, Arc<AtomicBool>)) };
waker,
aborted,
} = &*unsafe { Arc::from_raw(data as *mut PromiseInner<T>) };
if aborted.load(Ordering::SeqCst) { if aborted.load(Ordering::SeqCst) {
return this; return this;
} }
value.store( sender
Box::into_raw(Box::new(Err(Error::from(unsafe { .send(Box::into_raw(Box::new(Err(Error::from(unsafe {
JsUnknown::from_raw_unchecked(env, rejected_value) JsUnknown::from_raw_unchecked(env, rejected_value)
})))), })))))
Ordering::Relaxed, .expect("Send Promise resolved value error");
);
let waker = waker.load(Ordering::Acquire);
if !waker.is_null() {
unsafe { Box::from_raw(waker) }.wake();
}
this this
} }

View file

@ -115,12 +115,20 @@ extern "C" fn napi_resolve_deferred<Data: ToNapiValue, Resolver: FnOnce(Env) ->
match result { match result {
Ok(res) => { Ok(res) => {
let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) }; let status = unsafe { sys::napi_resolve_deferred(env, deferred, res) };
debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed"); debug_assert!(
status == sys::Status::napi_ok,
"Resolve promise failed {:?}",
crate::Status::from(status)
);
} }
Err(e) => { Err(e) => {
let status = let status =
unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) }; unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
debug_assert!(status == sys::Status::napi_ok, "Reject promise failed"); debug_assert!(
status == sys::Status::napi_ok,
"Reject promise failed {:?}",
crate::Status::from(status)
);
} }
} }
} }