From a4448d3e242a7793c9133917db2a5dbae232c0d8 Mon Sep 17 00:00:00 2001 From: LongYinan Date: Sat, 9 Jul 2022 15:48:46 +0800 Subject: [PATCH] Revert "fix(napi): memory leak in ThreadsafeFunction" This reverts commit 4dfc770c2ae9aa4e27463570f062ce84ed5147f9. --- crates/napi/src/threadsafe_function.rs | 44 ++++++++++---------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/crates/napi/src/threadsafe_function.rs b/crates/napi/src/threadsafe_function.rs index 0ae37f3c..3f0dbb9a 100644 --- a/crates/napi/src/threadsafe_function.rs +++ b/crates/napi/src/threadsafe_function.rs @@ -5,7 +5,7 @@ use std::ffi::CString; use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use crate::bindgen_runtime::ToNapiValue; @@ -147,6 +147,7 @@ type_level_enum! { pub struct ThreadsafeFunction { raw_tsfn: sys::napi_threadsafe_function, aborted: Arc, + ref_count: Arc, _phantom: PhantomData<(T, ES)>, } @@ -163,6 +164,7 @@ impl Clone for ThreadsafeFunction { Self { raw_tsfn: self.raw_tsfn, aborted: Arc::clone(&self.aborted), + ref_count: Arc::clone(&self.ref_count), _phantom: PhantomData, } } @@ -194,13 +196,6 @@ impl ThreadsafeFunction { let initial_thread_count = 1usize; let mut raw_tsfn = ptr::null_mut(); let ptr = Box::into_raw(Box::new(callback)) as *mut c_void; - let aborted = Arc::new(AtomicBool::new(false)); - let aborted_ptr = Arc::into_raw(aborted.clone()) as *mut c_void; - // `aborted_ptr` is passed into both `finalize_callback` and `env_cleanup_callback`. - // So increase strong count here to prevent it from being dropped twice. - unsafe { - Arc::increment_strong_count(aborted_ptr); - } check_status!(unsafe { sys::napi_create_threadsafe_function( env, @@ -211,17 +206,20 @@ impl ThreadsafeFunction { initial_thread_count, ptr, Some(thread_finalize_cb::), - aborted_ptr, + ptr, Some(call_js_cb::), &mut raw_tsfn, ) })?; + let aborted = Arc::new(AtomicBool::new(false)); + let aborted_ptr = Arc::into_raw(aborted.clone()) as *mut c_void; check_status!(unsafe { sys::napi_add_env_cleanup_hook(env, Some(cleanup_cb), aborted_ptr) })?; Ok(ThreadsafeFunction { raw_tsfn, aborted, + ref_count: Arc::new(AtomicUsize::new(initial_thread_count)), _phantom: PhantomData, }) } @@ -237,6 +235,7 @@ impl ThreadsafeFunction { "Can not ref, Thread safe function already aborted".to_string(), )); } + self.ref_count.fetch_add(1, Ordering::AcqRel); check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) }) } @@ -249,11 +248,12 @@ impl ThreadsafeFunction { "Can not unref, Thread safe function already aborted".to_string(), )); } + self.ref_count.fetch_sub(1, Ordering::AcqRel); check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) }) } pub fn aborted(&self) -> bool { - self.aborted.load(Ordering::Acquire) + self.aborted.load(Ordering::Relaxed) } pub fn abort(self) -> Result<()> { @@ -280,18 +280,14 @@ impl ThreadsafeFunction { if self.aborted.load(Ordering::Acquire) { return Status::Closing; } - let status = unsafe { + unsafe { sys::napi_call_threadsafe_function( self.raw_tsfn, Box::into_raw(Box::new(value)) as *mut _, mode.into(), ) } - .into(); - if status == Status::Closing { - self.aborted.store(true, Ordering::Release); - } - status + .into() } } @@ -302,24 +298,20 @@ impl ThreadsafeFunction { if self.aborted.load(Ordering::Acquire) { return Status::Closing; } - let status = unsafe { + unsafe { sys::napi_call_threadsafe_function( self.raw_tsfn, Box::into_raw(Box::new(value)) as *mut _, mode.into(), ) } - .into(); - if status == Status::Closing { - self.aborted.store(true, Ordering::Release); - } - status + .into() } } impl Drop for ThreadsafeFunction { fn drop(&mut self) { - if !self.aborted.load(Ordering::Acquire) { + if !self.aborted.load(Ordering::Acquire) && self.ref_count.load(Ordering::Acquire) > 0usize { let release_status = unsafe { sys::napi_release_threadsafe_function( self.raw_tsfn, @@ -336,20 +328,18 @@ impl Drop for ThreadsafeFunction { unsafe extern "C" fn cleanup_cb(cleanup_data: *mut c_void) { let aborted = unsafe { Arc::::from_raw(cleanup_data.cast()) }; - aborted.store(true, Ordering::Release); + aborted.store(true, Ordering::SeqCst); } unsafe extern "C" fn thread_finalize_cb( _raw_env: sys::napi_env, finalize_data: *mut c_void, - finalize_hint: *mut c_void, + _finalize_hint: *mut c_void, ) where R: 'static + Send + FnMut(ThreadSafeCallContext) -> Result>, { // cleanup drop(unsafe { Box::::from_raw(finalize_data.cast()) }); - let aborted = unsafe { Arc::::from_raw(finalize_hint.cast()) }; - aborted.store(true, Ordering::Release); } unsafe extern "C" fn call_js_cb(