fix(napi): only shutdown tokio runtime once
This commit is contained in:
parent
13c9437b61
commit
915b423026
2 changed files with 48 additions and 29 deletions
|
@ -238,11 +238,13 @@ unsafe extern "C" fn napi_register_module_v1(
|
|||
});
|
||||
|
||||
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
|
||||
if let Err(e) = check_status!(
|
||||
sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()),
|
||||
"Failed to initialize module",
|
||||
) {
|
||||
JsError::from(e).throw_into(env);
|
||||
{
|
||||
let _ = crate::tokio_runtime::RT.clone();
|
||||
crate::tokio_runtime::TOKIO_RT_REF_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
assert_eq!(
|
||||
sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()),
|
||||
sys::Status::napi_ok
|
||||
);
|
||||
}
|
||||
|
||||
exports
|
||||
|
|
|
@ -1,35 +1,52 @@
|
|||
use std::{ffi::c_void, future::Future, ptr};
|
||||
use std::ffi::c_void;
|
||||
use std::future::Future;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::mpsc::{self, error::TrySendError},
|
||||
};
|
||||
|
||||
use crate::{check_status, promise, sys, Result};
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::{runtime::Handle, sync::mpsc};
|
||||
|
||||
static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| {
|
||||
let runtime = tokio::runtime::Runtime::new();
|
||||
let (sender, mut receiver) = mpsc::channel::<()>(1);
|
||||
runtime
|
||||
.map(|rt| {
|
||||
let h = rt.handle();
|
||||
let handle = h.clone();
|
||||
handle.spawn(async move {
|
||||
if receiver.recv().await.is_some() {
|
||||
rt.shutdown_background();
|
||||
}
|
||||
});
|
||||
lazy_static! {
|
||||
pub(crate) static ref RT: (Handle, mpsc::Sender<()>) = {
|
||||
let runtime = tokio::runtime::Runtime::new();
|
||||
let (sender, mut receiver) = mpsc::channel::<()>(1);
|
||||
runtime
|
||||
.map(|rt| {
|
||||
let h = rt.handle();
|
||||
let handle = h.clone();
|
||||
handle.spawn(async move {
|
||||
if receiver.recv().await.is_some() {
|
||||
rt.shutdown_background();
|
||||
}
|
||||
});
|
||||
|
||||
(handle, sender)
|
||||
})
|
||||
.expect("Create tokio runtime failed")
|
||||
});
|
||||
(handle, sender)
|
||||
})
|
||||
.expect("Create tokio runtime failed")
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) static TOKIO_RT_REF_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[doc(hidden)]
|
||||
#[inline(never)]
|
||||
pub extern "C" fn shutdown_tokio_rt(_arg: *mut c_void) {
|
||||
let sender = &RT.1;
|
||||
sender
|
||||
.clone()
|
||||
.try_send(())
|
||||
.expect("Shutdown tokio runtime failed");
|
||||
if TOKIO_RT_REF_COUNT.fetch_sub(1, Ordering::Relaxed) == 0 {
|
||||
let sender = &RT.1;
|
||||
if let Err(e) = sender.clone().try_send(()) {
|
||||
match e {
|
||||
TrySendError::Closed(_) => {}
|
||||
TrySendError::Full(_) => {
|
||||
panic!("Send shutdown signal to tokio runtime failed, queue is full");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn<F>(fut: F)
|
||||
|
|
Loading…
Reference in a new issue