Merge pull request #621 from napi-rs/optimize-execute-tokio

perf(napi): make FuturePromise static dispatch
This commit is contained in:
LongYinan 2021-06-21 23:50:53 +08:00 committed by GitHub
commit b4158e5e76
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 55 additions and 42 deletions

View file

@ -9,3 +9,6 @@ members = [
"./bench",
"./memory-testing",
]
[profile.release]
lto = true

View file

@ -13,10 +13,7 @@ napi-derive = {path = "../napi-derive"}
serde = "1"
serde_json = "1"
[target.'cfg(all(unix, not(target_env = "musl"), not(target_arch = "aarch64"), not(target_arch = "arm")))'.dependencies]
jemallocator = {version = "0.3", features = ["disable_initial_exec_tls"]}
[target.'cfg(all(windows, not(target_arch = "aarch64")))'.dependencies]
[target.'cfg(all(target_arch = "x86_64", not(target_env = "musl")))'.dependencies]
mimalloc = {version = "0.1"}
[build-dependencies]

View file

@ -1,3 +1,5 @@
import { cpus } from 'os'
import b from 'benny'
const {
@ -8,25 +10,32 @@ const {
const buffer = Buffer.from('hello 🚀 rust!')
const ALL_THREADS = Array.from({ length: cpus().length })
export const benchAsync = () =>
b.suite(
'Async task',
b.add('spawn task', async () => {
await benchAsyncTask(buffer)
await Promise.all(ALL_THREADS.map(() => benchAsyncTask(buffer)))
}),
b.add('ThreadSafeFunction', async () => {
await new Promise<number | undefined>((resolve, reject) => {
benchThreadsafeFunction(buffer, (err?: Error, value?: number) => {
if (err) {
reject(err)
} else {
resolve(value)
}
})
})
await Promise.all(
ALL_THREADS.map(
() =>
new Promise<number | undefined>((resolve, reject) => {
benchThreadsafeFunction(buffer, (err?: Error, value?: number) => {
if (err) {
reject(err)
} else {
resolve(value)
}
})
}),
),
)
}),
b.add('Tokio future to Promise', async () => {
await benchTokioFuture(buffer)
await Promise.all(ALL_THREADS.map(() => benchTokioFuture(buffer)))
}),
b.cycle(),
b.complete(),

View file

@ -3,7 +3,7 @@ import b from 'benny'
const { benchCreateBuffer } = require('./index.node')
function createBuffer() {
const buf = Buffer.alloc(100000)
const buf = Buffer.allocUnsafe(1024)
buf[0] = 1
buf[1] = 2
return buf

View file

@ -7,10 +7,10 @@ const e = engine('model A {}')
export const benchQuery = () =>
b.suite(
'Query',
b.add('napi-rs', async () => {
b.add('query * 100', async () => {
await Promise.all(Array.from({ length: 100 }).map(() => query(e)))
}),
b.add('neon', async () => {
b.add('query * 1', async () => {
await query(e)
}),

View file

@ -2,7 +2,7 @@ use napi::{ContextlessResult, Env, JsBuffer, JsObject, Result};
#[contextless_function]
pub fn bench_create_buffer(env: Env) -> ContextlessResult<JsBuffer> {
let mut output = Vec::with_capacity(100000);
let mut output = Vec::with_capacity(1024);
output.push(1);
output.push(2);
env

View file

@ -3,11 +3,11 @@ extern crate napi_derive;
use napi::{Env, JsObject, Result};
#[cfg(all(unix, not(target_env = "musl"), not(target_arch = "aarch64")))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[cfg(all(windows, not(target_arch = "aarch64")))]
#[cfg(all(
target_arch = "x86_64",
not(target_env = "musl"),
not(debug_assertions)
))]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

View file

@ -1132,8 +1132,7 @@ impl Env {
})?;
let raw_env = self.0;
let future_promise =
promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?;
let future_promise = promise::FuturePromise::create(raw_env, raw_deferred, resolver)?;
let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut);
handle.spawn(future_to_resolve);
Ok(unsafe { JsObject::from_raw_unchecked(self.0, raw_promise) })
@ -1286,6 +1285,7 @@ impl Env {
/// }
/// ```
#[cfg(feature = "serde-json")]
#[allow(clippy::wrong_self_convention)]
#[inline]
pub fn to_js_value<T>(&self, node: &T) -> Result<JsUnknown>
where

View file

@ -1,26 +1,25 @@
use std::future::Future;
use std::marker::PhantomData;
use std::os::raw::{c_char, c_void};
use std::ptr;
use crate::{check_status, sys, Env, JsError, NapiRaw, Result};
pub struct FuturePromise<T, V: NapiRaw> {
pub struct FuturePromise<T, V: NapiRaw, F: FnOnce(&mut Env, T) -> Result<V>> {
deferred: sys::napi_deferred,
env: sys::napi_env,
tsfn: sys::napi_threadsafe_function,
async_resource_name: sys::napi_value,
resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
resolver: F,
_data: PhantomData<T>,
_value: PhantomData<V>,
}
unsafe impl<T, V: NapiRaw> Send for FuturePromise<T, V> {}
unsafe impl<T, V: NapiRaw, F: FnOnce(&mut Env, T) -> Result<V>> Send for FuturePromise<T, V, F> {}
impl<T, V: NapiRaw> FuturePromise<T, V> {
impl<T, V: NapiRaw, F: FnOnce(&mut Env, T) -> Result<V>> FuturePromise<T, V, F> {
#[inline]
pub fn create(
env: sys::napi_env,
raw_deferred: sys::napi_deferred,
resolver: Box<dyn FnOnce(&mut Env, T) -> Result<V>>,
) -> Result<Self> {
pub fn create(env: sys::napi_env, raw_deferred: sys::napi_deferred, resolver: F) -> Result<Self> {
let mut async_resource_name = ptr::null_mut();
let s = "napi_resolve_promise_from_future";
check_status!(unsafe {
@ -38,6 +37,8 @@ impl<T, V: NapiRaw> FuturePromise<T, V> {
env,
tsfn: ptr::null_mut(),
async_resource_name,
_data: PhantomData,
_value: PhantomData,
})
}
@ -57,8 +58,8 @@ impl<T, V: NapiRaw> FuturePromise<T, V> {
1,
ptr::null_mut(),
None,
self_ref as *mut FuturePromise<T, V> as *mut c_void,
Some(call_js_cb::<T, V>),
self_ref as *mut FuturePromise<T, V, F> as *mut c_void,
Some(call_js_cb::<T, V, F>),
&mut tsfn_value,
)
})?;
@ -94,14 +95,14 @@ pub(crate) async fn resolve_from_future<T: Send, F: Future<Output = Result<T>>>(
.expect("Failed to release thread safe function");
}
unsafe extern "C" fn call_js_cb<T, V: NapiRaw>(
unsafe extern "C" fn call_js_cb<T, V: NapiRaw, F: FnOnce(&mut Env, T) -> Result<V>>(
raw_env: sys::napi_env,
_js_callback: sys::napi_value,
context: *mut c_void,
data: *mut c_void,
) {
let mut env = Env::from_raw(raw_env);
let future_promise = Box::from_raw(context as *mut FuturePromise<T, V>);
let future_promise = Box::from_raw(context as *mut FuturePromise<T, V, F>);
let value = Box::from_raw(data as *mut Result<T>);
let resolver = future_promise.resolver;
let deferred = future_promise.deferred;

View file

@ -201,7 +201,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
let initial_thread_count = 1usize;
let mut raw_tsfn = ptr::null_mut();
let ptr = Box::into_raw(Box::new(callback)) as *mut _;
let ptr = Box::into_raw(Box::new(callback)) as *mut c_void;
check_status!(unsafe {
sys::napi_create_threadsafe_function(
env,
@ -389,7 +389,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: NapiRaw, R, ES>(
);
}
Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => {
panic!("{:?}", e);
panic!("{}", e);
}
Err(e) => {
status = sys::napi_call_function(
@ -402,6 +402,9 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: NapiRaw, R, ES>(
);
}
}
if status == sys::Status::napi_ok {
return;
}
if status == sys::Status::napi_pending_exception {
let mut error_result = ptr::null_mut();
assert_eq!(
@ -412,7 +415,7 @@ unsafe extern "C" fn call_js_cb<T: 'static, V: NapiRaw, R, ES>(
sys::napi_fatal_exception(raw_env, error_result),
sys::Status::napi_ok
);
} else if status != sys::Status::napi_ok {
} else {
let error_code: Status = status.into();
let error_code_string = format!("{:?}", error_code);
let mut error_code_value = ptr::null_mut();