diff --git a/crates/napi/src/threadsafe_function.rs b/crates/napi/src/threadsafe_function.rs index 7f4953be..063aa555 100644 --- a/crates/napi/src/threadsafe_function.rs +++ b/crates/napi/src/threadsafe_function.rs @@ -4,10 +4,9 @@ use std::convert::Into; use std::ffi::CString; use std::marker::PhantomData; use std::os::raw::c_void; -use std::pin::Pin; use std::ptr::{self, null_mut}; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, Weak}; use crate::bindgen_runtime::{FromNapiValue, ToNapiValue, TypeName, ValidateNapiValue}; use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status}; @@ -103,20 +102,17 @@ struct ThreadsafeFunctionHandle { referred: AtomicBool, } -unsafe impl Send for ThreadsafeFunctionHandle {} -unsafe impl Sync for ThreadsafeFunctionHandle {} - impl ThreadsafeFunctionHandle { - /// create a pinned Arc to hold the `ThreadsafeFunctionHandle` - fn new(raw: sys::napi_threadsafe_function) -> Pin> { - Arc::pin(Self { + /// create a Arc to hold the `ThreadsafeFunctionHandle` + fn new(raw: sys::napi_threadsafe_function) -> Arc { + Arc::new(Self { raw: AtomicPtr::new(raw), aborted: RwLock::new(false), referred: AtomicBool::new(true), }) } - fn null() -> Pin> { + fn null() -> Arc { Self::new(null_mut()) } @@ -215,7 +211,7 @@ struct ThreadsafeFunctionCallJsBackData { /// } /// ``` pub struct ThreadsafeFunction { - handle: Pin>, + handle: Arc, _phantom: PhantomData<(T, ES)>, } @@ -337,7 +333,7 @@ impl ThreadsafeFunction { async_resource_name, max_queue_size, 1, - Arc::into_raw(Pin::into_inner(handle.clone())) as *mut c_void, // pass handler to thread_finalize_cb + Arc::downgrade(&handle).into_raw() as *mut c_void, // pass handler to thread_finalize_cb Some(thread_finalize_cb::), callback_ptr.cast(), Some(call_js_cb::), @@ -569,13 +565,18 @@ unsafe extern "C" fn thread_finalize_cb( ) where R: 'static + Send + FnMut(ThreadSafeCallContext) -> Result>, { - let handle = unsafe { Arc::from_raw(finalize_data.cast::()) }; - let mut aborted_guard = handle - .aborted - .write() - .expect("Threadsafe Function Handle aborted lock failed"); - if !*aborted_guard { - *aborted_guard = true; + let handle_option = + unsafe { Weak::from_raw(finalize_data.cast::()).upgrade() }; + + if let Some(handle) = handle_option { + let mut aborted_guard = handle + .aborted + .write() + .expect("Threadsafe Function Handle aborted lock failed"); + + if !*aborted_guard { + *aborted_guard = true; + } } // cleanup diff --git a/examples/napi/src/threadsafe_function.rs b/examples/napi/src/threadsafe_function.rs index a8573ec1..41a692b7 100644 --- a/examples/napi/src/threadsafe_function.rs +++ b/examples/napi/src/threadsafe_function.rs @@ -137,7 +137,7 @@ pub async fn tsfn_return_promise(func: ThreadsafeFunction) -> Result { pub async fn tsfn_return_promise_timeout(func: ThreadsafeFunction) -> Result { use tokio::time::{self, Duration}; let promise = func.call_async::>(Ok(1)).await?; - let sleep = time::sleep(Duration::from_millis(200)); + let sleep = time::sleep(Duration::from_millis(100)); tokio::select! { _ = sleep => { return Err(Error::new(Status::GenericFailure, "Timeout".to_owned()));