Merge pull request #960 from napi-rs/fix/undefined-module-in-worker

Fix undefined module in worker_threads
This commit is contained in:
LongYinan 2021-12-22 00:07:18 +08:00 committed by GitHub
commit de27470552
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 234 additions and 128 deletions

View file

@ -25,10 +25,11 @@ napi6 = ["napi5", "napi-sys/napi6"]
napi7 = ["napi6", "napi-sys/napi7"] napi7 = ["napi6", "napi-sys/napi7"]
napi8 = ["napi7", "napi-sys/napi8"] napi8 = ["napi7", "napi-sys/napi8"]
serde-json = ["serde", "serde_json"] serde-json = ["serde", "serde_json"]
tokio_rt = ["tokio", "once_cell", "napi4"] tokio_rt = ["tokio", "napi4"]
[dependencies] [dependencies]
ctor = "0.1" ctor = "0.1"
lazy_static = "1"
napi-sys = {version = "2.1.0", path = "../sys"} napi-sys = {version = "2.1.0", path = "../sys"}
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
@ -43,10 +44,6 @@ features = ["rt", "rt-multi-thread", "sync"]
optional = true optional = true
version = "1" version = "1"
[dependencies.once_cell]
optional = true
version = "1"
[dependencies.serde] [dependencies.serde]
optional = true optional = true
version = "1" version = "1"

View file

@ -2,6 +2,9 @@ use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::CStr; use std::ffi::CStr;
use std::ptr; use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use lazy_static::lazy_static;
use crate::{check_status, check_status_or_throw, sys, JsError, Property, Result}; use crate::{check_status, check_status_or_throw, sys, JsError, Property, Result};
@ -9,22 +12,92 @@ pub type ExportRegisterCallback = unsafe fn(sys::napi_env) -> Result<sys::napi_v
pub type ModuleExportsCallback = pub type ModuleExportsCallback =
unsafe fn(env: sys::napi_env, exports: sys::napi_value) -> Result<()>; unsafe fn(env: sys::napi_env, exports: sys::napi_value) -> Result<()>;
type ModuleRegisterCallback = struct PersistedSingleThreadVec<T> {
RefCell<Vec<(Option<&'static str>, (&'static str, ExportRegisterCallback))>>; inner: AtomicPtr<T>,
length: AtomicUsize,
}
type ModuleClassProperty = impl<T> Default for PersistedSingleThreadVec<T> {
RefCell<HashMap<&'static str, HashMap<Option<&'static str>, (&'static str, Vec<Property>)>>>; fn default() -> Self {
let mut vec: Vec<T> = Vec::with_capacity(1);
let ret = PersistedSingleThreadVec {
inner: AtomicPtr::new(vec.as_mut_ptr()),
length: AtomicUsize::new(0),
};
std::mem::forget(vec);
ret
}
}
impl<T> PersistedSingleThreadVec<T> {
#[allow(clippy::mut_from_ref)]
fn borrow_mut(&self) -> &mut [T] {
let length = self.length.load(Ordering::Relaxed);
if length == 0 {
return &mut [];
}
let inner = self.inner.load(Ordering::Relaxed);
unsafe { std::slice::from_raw_parts_mut(inner, length) }
}
fn push(&self, item: T) {
let length = self.length.load(Ordering::Relaxed);
let inner = self.inner.load(Ordering::Relaxed);
let mut temp = unsafe { Vec::from_raw_parts(inner, length, length) };
temp.push(item);
// Inner Vec has been reallocated, so we need to update the pointer
if temp.as_mut_ptr() != inner {
self.inner.store(temp.as_mut_ptr(), Ordering::Relaxed);
}
std::mem::forget(temp);
self.length.fetch_add(1, Ordering::Relaxed);
}
}
unsafe impl<T> Send for PersistedSingleThreadVec<T> {}
unsafe impl<T> Sync for PersistedSingleThreadVec<T> {}
struct PersistedSingleThreadHashMap<K, V>(*mut HashMap<K, V>);
impl<K, V> PersistedSingleThreadHashMap<K, V> {
#[allow(clippy::mut_from_ref)]
fn borrow_mut(&self) -> &mut HashMap<K, V> {
unsafe { Box::leak(Box::from_raw(self.0)) }
}
}
impl<K, V> Default for PersistedSingleThreadHashMap<K, V> {
fn default() -> Self {
let map = Default::default();
PersistedSingleThreadHashMap(Box::into_raw(Box::new(map)))
}
}
type ModuleRegisterCallback =
PersistedSingleThreadVec<(Option<&'static str>, (&'static str, ExportRegisterCallback))>;
type ModuleClassProperty = PersistedSingleThreadHashMap<
&'static str,
HashMap<Option<&'static str>, (&'static str, Vec<Property>)>,
>;
unsafe impl<K, V> Send for PersistedSingleThreadHashMap<K, V> {}
unsafe impl<K, V> Sync for PersistedSingleThreadHashMap<K, V> {}
lazy_static! {
static ref MODULE_REGISTER_CALLBACK: ModuleRegisterCallback = Default::default();
static ref MODULE_CLASS_PROPERTIES: ModuleClassProperty = Default::default();
// compatibility for #[module_exports]
#[cfg(feature = "compat-mode")]
static ref MODULE_EXPORTS: PersistedSingleThreadVec<ModuleExportsCallback> = Default::default();
}
thread_local! { thread_local! {
static MODULE_REGISTER_CALLBACK: ModuleRegisterCallback = Default::default();
static MODULE_CLASS_PROPERTIES: ModuleClassProperty = Default::default();
static REGISTERED_CLASSES: RefCell<HashMap< static REGISTERED_CLASSES: RefCell<HashMap<
/* export name */ &'static str, /* export name */ &'static str,
/* constructor */ sys::napi_ref, /* constructor */ sys::napi_ref,
>> = Default::default(); >> = Default::default();
// compatibility for #[module_exports]
#[cfg(feature = "compat-mode")]
static MODULE_EXPORTS: std::cell::Cell<Vec<ModuleExportsCallback>> = Default::default();
} }
pub fn get_class_constructor(js_name: &'static str) -> Option<sys::napi_ref> { pub fn get_class_constructor(js_name: &'static str) -> Option<sys::napi_ref> {
@ -37,7 +110,7 @@ pub fn get_class_constructor(js_name: &'static str) -> Option<sys::napi_ref> {
#[cfg(feature = "compat-mode")] #[cfg(feature = "compat-mode")]
// compatibility for #[module_exports] // compatibility for #[module_exports]
pub fn register_module_exports(callback: ModuleExportsCallback) { pub fn register_module_exports(callback: ModuleExportsCallback) {
MODULE_EXPORTS.with(|cell| cell.set(vec![callback])); MODULE_EXPORTS.push(callback);
} }
pub fn register_module_export( pub fn register_module_export(
@ -45,10 +118,7 @@ pub fn register_module_export(
name: &'static str, name: &'static str,
cb: ExportRegisterCallback, cb: ExportRegisterCallback,
) { ) {
MODULE_REGISTER_CALLBACK.with(|exports| { MODULE_REGISTER_CALLBACK.push((js_mod, (name, cb)));
let mut list = exports.borrow_mut();
list.push((js_mod, (name, cb)));
});
} }
pub fn register_class( pub fn register_class(
@ -57,14 +127,12 @@ pub fn register_class(
js_name: &'static str, js_name: &'static str,
props: Vec<Property>, props: Vec<Property>,
) { ) {
MODULE_CLASS_PROPERTIES.with(|map| { let map = MODULE_CLASS_PROPERTIES.borrow_mut();
let mut map = map.borrow_mut();
let val = map.entry(rust_name).or_default(); let val = map.entry(rust_name).or_default();
let val = val.entry(js_mod).or_default(); let val = val.entry(js_mod).or_default();
val.0 = js_name; val.0 = js_name;
val.1.extend(props.into_iter()); val.1.extend(props.into_iter());
});
} }
#[no_mangle] #[no_mangle]
@ -73,9 +141,8 @@ unsafe extern "C" fn napi_register_module_v1(
exports: sys::napi_value, exports: sys::napi_value,
) -> sys::napi_value { ) -> sys::napi_value {
let mut exports_objects: HashMap<Option<&'static str>, sys::napi_value> = HashMap::default(); let mut exports_objects: HashMap<Option<&'static str>, sys::napi_value> = HashMap::default();
MODULE_REGISTER_CALLBACK.with(|to_register_exports| { MODULE_REGISTER_CALLBACK
to_register_exports .borrow_mut()
.take()
.iter_mut() .iter_mut()
.fold( .fold(
HashMap::<Option<&'static str>, Vec<(&'static str, ExportRegisterCallback)>>::new(), HashMap::<Option<&'static str>, Vec<(&'static str, ExportRegisterCallback)>>::new(),
@ -138,11 +205,12 @@ unsafe extern "C" fn napi_register_module_v1(
} }
} }
} }
})
}); });
MODULE_CLASS_PROPERTIES.with(|to_register_classes| { MODULE_CLASS_PROPERTIES
for (rust_name, js_mods) in to_register_classes.take().iter() { .borrow_mut()
.iter()
.for_each(|(rust_name, js_mods)| {
for (js_mod, (js_name, props)) in js_mods { for (js_mod, (js_name, props)) in js_mods {
let mut exports_js_mod = ptr::null_mut(); let mut exports_js_mod = ptr::null_mut();
unsafe { unsafe {
@ -225,24 +293,23 @@ unsafe extern "C" fn napi_register_module_v1(
); );
} }
} }
}
}); });
#[cfg(feature = "compat-mode")] #[cfg(feature = "compat-mode")]
MODULE_EXPORTS.with(|callbacks| { MODULE_EXPORTS.borrow_mut().iter().for_each(|callback| {
for callback in callbacks.take().into_iter() {
if let Err(e) = callback(env, exports) { if let Err(e) = callback(env, exports) {
JsError::from(e).throw_into(env); JsError::from(e).throw_into(env);
} }
}
}); });
#[cfg(all(feature = "tokio_rt", feature = "napi4"))] #[cfg(all(feature = "tokio_rt", feature = "napi4"))]
if let Err(e) = check_status!( {
let _ = crate::tokio_runtime::RT.clone();
crate::tokio_runtime::TOKIO_RT_REF_COUNT.fetch_add(1, Ordering::Relaxed);
assert_eq!(
sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()), sys::napi_add_env_cleanup_hook(env, Some(crate::shutdown_tokio_rt), ptr::null_mut()),
"Failed to initialize module", sys::Status::napi_ok
) { );
JsError::from(e).throw_into(env);
} }
exports exports

View file

@ -1,10 +1,18 @@
use std::{ffi::c_void, future::Future, ptr}; use std::ffi::c_void;
use std::future::Future;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use lazy_static::lazy_static;
use tokio::{
runtime::Handle,
sync::mpsc::{self, error::TrySendError},
};
use crate::{check_status, promise, sys, Result}; use crate::{check_status, promise, sys, Result};
use once_cell::sync::Lazy;
use tokio::{runtime::Handle, sync::mpsc};
static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| { lazy_static! {
pub(crate) static ref RT: (Handle, mpsc::Sender<()>) = {
let runtime = tokio::runtime::Runtime::new(); let runtime = tokio::runtime::Runtime::new();
let (sender, mut receiver) = mpsc::channel::<()>(1); let (sender, mut receiver) = mpsc::channel::<()>(1);
runtime runtime
@ -20,16 +28,25 @@ static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| {
(handle, sender) (handle, sender)
}) })
.expect("Create tokio runtime failed") .expect("Create tokio runtime failed")
}); };
}
pub(crate) static TOKIO_RT_REF_COUNT: AtomicUsize = AtomicUsize::new(0);
#[doc(hidden)] #[doc(hidden)]
#[inline(never)] #[inline(never)]
pub extern "C" fn shutdown_tokio_rt(_arg: *mut c_void) { pub extern "C" fn shutdown_tokio_rt(_arg: *mut c_void) {
if TOKIO_RT_REF_COUNT.fetch_sub(1, Ordering::Relaxed) == 0 {
let sender = &RT.1; let sender = &RT.1;
sender if let Err(e) = sender.clone().try_send(()) {
.clone() match e {
.try_send(()) TrySendError::Closed(_) => {}
.expect("Shutdown tokio runtime failed"); TrySendError::Full(_) => {
panic!("Send shutdown signal to tokio runtime failed, queue is full");
}
}
}
}
} }
pub fn spawn<F>(fut: F) pub fn spawn<F>(fut: F)

View file

@ -0,0 +1,20 @@
import { join } from 'path'
import { Worker } from 'worker_threads'
import test from 'ava'
import { DEFAULT_COST } from '../index'
test('should be able to require in worker thread', (t) => {
const w = new Worker(join(__dirname, 'worker.js'))
return new Promise<void>((resolve) => {
w.on('message', (msg) => {
t.is(msg, DEFAULT_COST)
resolve()
})
})
.then(() => w.terminate())
.then(() => {
t.pass()
})
})

View file

@ -0,0 +1,5 @@
const { parentPort } = require('worker_threads')
const native = require('../index')
parentPort.postMessage(native.DEFAULT_COST)

View file

@ -14,6 +14,6 @@
}, },
"dependencies": { "dependencies": {
"@types/lodash": "^4.14.178", "@types/lodash": "^4.14.178",
"lodash": "4.17.21" "lodash": "^4.17.21"
} }
} }

View file

@ -8,7 +8,7 @@ fn get_buffer() -> Buffer {
#[napi] #[napi]
fn append_buffer(buf: Buffer) -> Buffer { fn append_buffer(buf: Buffer) -> Buffer {
let mut buf = Vec::<u8>::from(buf); let mut buf = Vec::<u8>::from(buf);
buf.push(b'!' as u8); buf.push(b'!');
buf.into() buf.into()
} }

View file

@ -4392,7 +4392,7 @@ lodash.truncate@^4.4.2:
resolved "https://registry.npmjs.org/lodash.truncate/-/lodash.truncate-4.4.2.tgz#5a350da0b1113b837ecfffd5812cbe58d6eae193" resolved "https://registry.npmjs.org/lodash.truncate/-/lodash.truncate-4.4.2.tgz#5a350da0b1113b837ecfffd5812cbe58d6eae193"
integrity sha1-WjUNoLERO4N+z//VgSy+WNbq4ZM= integrity sha1-WjUNoLERO4N+z//VgSy+WNbq4ZM=
lodash@4.17.21, lodash@^4.17.15, lodash@^4.17.19, lodash@^4.17.20, lodash@^4.17.21, lodash@^4.17.4, lodash@^4.7.0: lodash@^4.17.15, lodash@^4.17.19, lodash@^4.17.20, lodash@^4.17.21, lodash@^4.17.4, lodash@^4.7.0:
version "4.17.21" version "4.17.21"
resolved "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c" resolved "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c"
integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg== integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==