From 915b4230266a50b001eee464543d7c2db8ae9520 Mon Sep 17 00:00:00 2001 From: LongYinan Date: Tue, 21 Dec 2021 23:22:23 +0800 Subject: [PATCH 1/2] fix(napi): only shutdown tokio runtime once --- .../src/bindgen_runtime/module_register.rs | 12 ++-- crates/napi/src/tokio_runtime.rs | 65 ++++++++++++------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/crates/napi/src/bindgen_runtime/module_register.rs b/crates/napi/src/bindgen_runtime/module_register.rs index a2cbf83e..69055294 100644 --- a/crates/napi/src/bindgen_runtime/module_register.rs +++ b/crates/napi/src/bindgen_runtime/module_register.rs @@ -238,11 +238,13 @@ unsafe extern "C" fn napi_register_module_v1( }); #[cfg(all(feature = "tokio_rt", feature = "napi4"))] - if let Err(e) = check_status!( - sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()), - "Failed to initialize module", - ) { - JsError::from(e).throw_into(env); + { + let _ = crate::tokio_runtime::RT.clone(); + crate::tokio_runtime::TOKIO_RT_REF_COUNT.fetch_add(1, Ordering::Relaxed); + assert_eq!( + sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()), + sys::Status::napi_ok + ); } exports diff --git a/crates/napi/src/tokio_runtime.rs b/crates/napi/src/tokio_runtime.rs index 118196ae..8828e331 100644 --- a/crates/napi/src/tokio_runtime.rs +++ b/crates/napi/src/tokio_runtime.rs @@ -1,35 +1,52 @@ -use std::{ffi::c_void, future::Future, ptr}; +use std::ffi::c_void; +use std::future::Future; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use lazy_static::lazy_static; +use tokio::{ + runtime::Handle, + sync::mpsc::{self, error::TrySendError}, +}; use crate::{check_status, promise, sys, Result}; -use once_cell::sync::Lazy; -use tokio::{runtime::Handle, sync::mpsc}; -static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| { - let runtime = tokio::runtime::Runtime::new(); - let (sender, mut receiver) = mpsc::channel::<()>(1); - runtime - .map(|rt| { - let h = rt.handle(); - let handle = h.clone(); - handle.spawn(async move { - if receiver.recv().await.is_some() { - rt.shutdown_background(); - } - }); +lazy_static! { + pub(crate) static ref RT: (Handle, mpsc::Sender<()>) = { + let runtime = tokio::runtime::Runtime::new(); + let (sender, mut receiver) = mpsc::channel::<()>(1); + runtime + .map(|rt| { + let h = rt.handle(); + let handle = h.clone(); + handle.spawn(async move { + if receiver.recv().await.is_some() { + rt.shutdown_background(); + } + }); - (handle, sender) - }) - .expect("Create tokio runtime failed") -}); + (handle, sender) + }) + .expect("Create tokio runtime failed") + }; +} + +pub(crate) static TOKIO_RT_REF_COUNT: AtomicUsize = AtomicUsize::new(0); #[doc(hidden)] #[inline(never)] pub extern "C" fn shutdown_tokio_rt(_arg: *mut c_void) { - let sender = &RT.1; - sender - .clone() - .try_send(()) - .expect("Shutdown tokio runtime failed"); + if TOKIO_RT_REF_COUNT.fetch_sub(1, Ordering::Relaxed) == 0 { + let sender = &RT.1; + if let Err(e) = sender.clone().try_send(()) { + match e { + TrySendError::Closed(_) => {} + TrySendError::Full(_) => { + panic!("Send shutdown signal to tokio runtime failed, queue is full"); + } + } + } + } } pub fn spawn(fut: F) From 4406059de1a6c197334dc813896b54ee314a7056 Mon Sep 17 00:00:00 2001 From: LongYinan Date: Tue, 21 Dec 2021 23:24:07 +0800 Subject: [PATCH 2/2] fix(napi): addon packages become undefined in worker_threads --- crates/napi/Cargo.toml | 7 +- .../src/bindgen_runtime/module_register.rs | 247 +++++++++++------- examples/napi/__test__/worker-thread.spec.ts | 20 ++ examples/napi/__test__/worker.js | 5 + examples/napi/package.json | 2 +- examples/napi/src/typed_array.rs | 2 +- yarn.lock | 2 +- 7 files changed, 186 insertions(+), 99 deletions(-) create mode 100644 examples/napi/__test__/worker-thread.spec.ts create mode 100644 examples/napi/__test__/worker.js diff --git a/crates/napi/Cargo.toml b/crates/napi/Cargo.toml index 1f83e105..a5f94db7 100644 --- a/crates/napi/Cargo.toml +++ b/crates/napi/Cargo.toml @@ -25,10 +25,11 @@ napi6 = ["napi5", "napi-sys/napi6"] napi7 = ["napi6", "napi-sys/napi7"] napi8 = ["napi7", "napi-sys/napi8"] serde-json = ["serde", "serde_json"] -tokio_rt = ["tokio", "once_cell", "napi4"] +tokio_rt = ["tokio", "napi4"] [dependencies] ctor = "0.1" +lazy_static = "1" napi-sys = {version = "2.1.0", path = "../sys"} [target.'cfg(windows)'.dependencies] @@ -43,10 +44,6 @@ features = ["rt", "rt-multi-thread", "sync"] optional = true version = "1" -[dependencies.once_cell] -optional = true -version = "1" - [dependencies.serde] optional = true version = "1" diff --git a/crates/napi/src/bindgen_runtime/module_register.rs b/crates/napi/src/bindgen_runtime/module_register.rs index 69055294..fc0c273a 100644 --- a/crates/napi/src/bindgen_runtime/module_register.rs +++ b/crates/napi/src/bindgen_runtime/module_register.rs @@ -2,6 +2,9 @@ use std::cell::RefCell; use std::collections::HashMap; use std::ffi::CStr; use std::ptr; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +use lazy_static::lazy_static; use crate::{check_status, check_status_or_throw, sys, JsError, Property, Result}; @@ -9,22 +12,92 @@ pub type ExportRegisterCallback = unsafe fn(sys::napi_env) -> Result Result<()>; -type ModuleRegisterCallback = - RefCell, (&'static str, ExportRegisterCallback))>>; +struct PersistedSingleThreadVec { + inner: AtomicPtr, + length: AtomicUsize, +} -type ModuleClassProperty = - RefCell, (&'static str, Vec)>>>; +impl Default for PersistedSingleThreadVec { + fn default() -> Self { + let mut vec: Vec = Vec::with_capacity(1); + let ret = PersistedSingleThreadVec { + inner: AtomicPtr::new(vec.as_mut_ptr()), + length: AtomicUsize::new(0), + }; + std::mem::forget(vec); + ret + } +} + +impl PersistedSingleThreadVec { + #[allow(clippy::mut_from_ref)] + fn borrow_mut(&self) -> &mut [T] { + let length = self.length.load(Ordering::Relaxed); + if length == 0 { + return &mut []; + } + let inner = self.inner.load(Ordering::Relaxed); + unsafe { std::slice::from_raw_parts_mut(inner, length) } + } + + fn push(&self, item: T) { + let length = self.length.load(Ordering::Relaxed); + let inner = self.inner.load(Ordering::Relaxed); + let mut temp = unsafe { Vec::from_raw_parts(inner, length, length) }; + temp.push(item); + // Inner Vec has been reallocated, so we need to update the pointer + if temp.as_mut_ptr() != inner { + self.inner.store(temp.as_mut_ptr(), Ordering::Relaxed); + } + std::mem::forget(temp); + + self.length.fetch_add(1, Ordering::Relaxed); + } +} + +unsafe impl Send for PersistedSingleThreadVec {} +unsafe impl Sync for PersistedSingleThreadVec {} + +struct PersistedSingleThreadHashMap(*mut HashMap); + +impl PersistedSingleThreadHashMap { + #[allow(clippy::mut_from_ref)] + fn borrow_mut(&self) -> &mut HashMap { + unsafe { Box::leak(Box::from_raw(self.0)) } + } +} + +impl Default for PersistedSingleThreadHashMap { + fn default() -> Self { + let map = Default::default(); + PersistedSingleThreadHashMap(Box::into_raw(Box::new(map))) + } +} + +type ModuleRegisterCallback = + PersistedSingleThreadVec<(Option<&'static str>, (&'static str, ExportRegisterCallback))>; + +type ModuleClassProperty = PersistedSingleThreadHashMap< + &'static str, + HashMap, (&'static str, Vec)>, +>; + +unsafe impl Send for PersistedSingleThreadHashMap {} +unsafe impl Sync for PersistedSingleThreadHashMap {} + +lazy_static! { + static ref MODULE_REGISTER_CALLBACK: ModuleRegisterCallback = Default::default(); + static ref MODULE_CLASS_PROPERTIES: ModuleClassProperty = Default::default(); + // compatibility for #[module_exports] + #[cfg(feature = "compat-mode")] + static ref MODULE_EXPORTS: PersistedSingleThreadVec = Default::default(); +} thread_local! { - static MODULE_REGISTER_CALLBACK: ModuleRegisterCallback = Default::default(); - static MODULE_CLASS_PROPERTIES: ModuleClassProperty = Default::default(); static REGISTERED_CLASSES: RefCell> = Default::default(); - // compatibility for #[module_exports] - #[cfg(feature = "compat-mode")] - static MODULE_EXPORTS: std::cell::Cell> = Default::default(); } pub fn get_class_constructor(js_name: &'static str) -> Option { @@ -37,7 +110,7 @@ pub fn get_class_constructor(js_name: &'static str) -> Option { #[cfg(feature = "compat-mode")] // compatibility for #[module_exports] pub fn register_module_exports(callback: ModuleExportsCallback) { - MODULE_EXPORTS.with(|cell| cell.set(vec![callback])); + MODULE_EXPORTS.push(callback); } pub fn register_module_export( @@ -45,10 +118,7 @@ pub fn register_module_export( name: &'static str, cb: ExportRegisterCallback, ) { - MODULE_REGISTER_CALLBACK.with(|exports| { - let mut list = exports.borrow_mut(); - list.push((js_mod, (name, cb))); - }); + MODULE_REGISTER_CALLBACK.push((js_mod, (name, cb))); } pub fn register_class( @@ -57,14 +127,12 @@ pub fn register_class( js_name: &'static str, props: Vec, ) { - MODULE_CLASS_PROPERTIES.with(|map| { - let mut map = map.borrow_mut(); - let val = map.entry(rust_name).or_default(); - let val = val.entry(js_mod).or_default(); + let map = MODULE_CLASS_PROPERTIES.borrow_mut(); + let val = map.entry(rust_name).or_default(); + let val = val.entry(js_mod).or_default(); - val.0 = js_name; - val.1.extend(props.into_iter()); - }); + val.0 = js_name; + val.1.extend(props.into_iter()); } #[no_mangle] @@ -73,76 +141,76 @@ unsafe extern "C" fn napi_register_module_v1( exports: sys::napi_value, ) -> sys::napi_value { let mut exports_objects: HashMap, sys::napi_value> = HashMap::default(); - MODULE_REGISTER_CALLBACK.with(|to_register_exports| { - to_register_exports - .take() - .iter_mut() - .fold( - HashMap::, Vec<(&'static str, ExportRegisterCallback)>>::new(), - |mut acc, (js_mod, item)| { - if let Some(k) = acc.get_mut(js_mod) { - k.push(*item); - } else { - acc.insert(*js_mod, vec![*item]); - } - acc - }, - ) - .iter() - .for_each(|(js_mod, items)| { - let mut exports_js_mod = ptr::null_mut(); - if let Some(js_mod_str) = js_mod { - if let Some(exports_object) = exports_objects.get(js_mod) { - exports_js_mod = *exports_object; - } else { - check_status_or_throw!( - env, - sys::napi_create_object(env, &mut exports_js_mod), - "Create export JavaScript Object [{}] failed", - js_mod_str - ); - check_status_or_throw!( + MODULE_REGISTER_CALLBACK + .borrow_mut() + .iter_mut() + .fold( + HashMap::, Vec<(&'static str, ExportRegisterCallback)>>::new(), + |mut acc, (js_mod, item)| { + if let Some(k) = acc.get_mut(js_mod) { + k.push(*item); + } else { + acc.insert(*js_mod, vec![*item]); + } + acc + }, + ) + .iter() + .for_each(|(js_mod, items)| { + let mut exports_js_mod = ptr::null_mut(); + if let Some(js_mod_str) = js_mod { + if let Some(exports_object) = exports_objects.get(js_mod) { + exports_js_mod = *exports_object; + } else { + check_status_or_throw!( + env, + sys::napi_create_object(env, &mut exports_js_mod), + "Create export JavaScript Object [{}] failed", + js_mod_str + ); + check_status_or_throw!( + env, + sys::napi_set_named_property( env, + exports, + js_mod_str.as_ptr() as *const _, + exports_js_mod + ), + "Set exports Object [{}] into exports object failed", + js_mod_str + ); + exports_objects.insert(*js_mod, exports_js_mod); + } + } + for (name, callback) in items { + let js_name = CStr::from_bytes_with_nul_unchecked(name.as_bytes()); + unsafe { + if let Err(e) = callback(env).and_then(|v| { + check_status!( sys::napi_set_named_property( env, - exports, - js_mod_str.as_ptr() as *const _, - exports_js_mod + if exports_js_mod.is_null() { + exports + } else { + exports_js_mod + }, + js_name.as_ptr(), + v ), - "Set exports Object [{}] into exports object failed", - js_mod_str - ); - exports_objects.insert(*js_mod, exports_js_mod); + "Failed to register export `{}`", + name, + ) + }) { + JsError::from(e).throw_into(env) } } - for (name, callback) in items { - let js_name = CStr::from_bytes_with_nul_unchecked(name.as_bytes()); - unsafe { - if let Err(e) = callback(env).and_then(|v| { - check_status!( - sys::napi_set_named_property( - env, - if exports_js_mod.is_null() { - exports - } else { - exports_js_mod - }, - js_name.as_ptr(), - v - ), - "Failed to register export `{}`", - name, - ) - }) { - JsError::from(e).throw_into(env) - } - } - } - }) - }); + } + }); - MODULE_CLASS_PROPERTIES.with(|to_register_classes| { - for (rust_name, js_mods) in to_register_classes.take().iter() { + MODULE_CLASS_PROPERTIES + .borrow_mut() + .iter() + .for_each(|(rust_name, js_mods)| { for (js_mod, (js_name, props)) in js_mods { let mut exports_js_mod = ptr::null_mut(); unsafe { @@ -225,15 +293,12 @@ unsafe extern "C" fn napi_register_module_v1( ); } } - } - }); + }); #[cfg(feature = "compat-mode")] - MODULE_EXPORTS.with(|callbacks| { - for callback in callbacks.take().into_iter() { - if let Err(e) = callback(env, exports) { - JsError::from(e).throw_into(env); - } + MODULE_EXPORTS.borrow_mut().iter().for_each(|callback| { + if let Err(e) = callback(env, exports) { + JsError::from(e).throw_into(env); } }); diff --git a/examples/napi/__test__/worker-thread.spec.ts b/examples/napi/__test__/worker-thread.spec.ts new file mode 100644 index 00000000..1f2251f3 --- /dev/null +++ b/examples/napi/__test__/worker-thread.spec.ts @@ -0,0 +1,20 @@ +import { join } from 'path' +import { Worker } from 'worker_threads' + +import test from 'ava' + +import { DEFAULT_COST } from '../index' + +test('should be able to require in worker thread', (t) => { + const w = new Worker(join(__dirname, 'worker.js')) + return new Promise((resolve) => { + w.on('message', (msg) => { + t.is(msg, DEFAULT_COST) + resolve() + }) + }) + .then(() => w.terminate()) + .then(() => { + t.pass() + }) +}) diff --git a/examples/napi/__test__/worker.js b/examples/napi/__test__/worker.js new file mode 100644 index 00000000..84239acc --- /dev/null +++ b/examples/napi/__test__/worker.js @@ -0,0 +1,5 @@ +const { parentPort } = require('worker_threads') + +const native = require('../index') + +parentPort.postMessage(native.DEFAULT_COST) diff --git a/examples/napi/package.json b/examples/napi/package.json index b306dc9e..fbdfc263 100644 --- a/examples/napi/package.json +++ b/examples/napi/package.json @@ -14,6 +14,6 @@ }, "dependencies": { "@types/lodash": "^4.14.178", - "lodash": "4.17.21" + "lodash": "^4.17.21" } } diff --git a/examples/napi/src/typed_array.rs b/examples/napi/src/typed_array.rs index 297c597e..68a92500 100644 --- a/examples/napi/src/typed_array.rs +++ b/examples/napi/src/typed_array.rs @@ -8,7 +8,7 @@ fn get_buffer() -> Buffer { #[napi] fn append_buffer(buf: Buffer) -> Buffer { let mut buf = Vec::::from(buf); - buf.push(b'!' as u8); + buf.push(b'!'); buf.into() } diff --git a/yarn.lock b/yarn.lock index 379d760e..9913379d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4392,7 +4392,7 @@ lodash.truncate@^4.4.2: resolved "https://registry.npmjs.org/lodash.truncate/-/lodash.truncate-4.4.2.tgz#5a350da0b1113b837ecfffd5812cbe58d6eae193" integrity sha1-WjUNoLERO4N+z//VgSy+WNbq4ZM= -lodash@4.17.21, lodash@^4.17.15, lodash@^4.17.19, lodash@^4.17.20, lodash@^4.17.21, lodash@^4.17.4, lodash@^4.7.0: +lodash@^4.17.15, lodash@^4.17.19, lodash@^4.17.20, lodash@^4.17.21, lodash@^4.17.4, lodash@^4.7.0: version "4.17.21" resolved "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c" integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==