Merge pull request #597 from napi-rs/tokio-tweak

perf(napi): reducer tokio future execution overhead
This commit is contained in:
LongYinan 2021-06-02 23:34:09 +08:00 committed by GitHub
commit 712f7909f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 169 additions and 181 deletions

View file

@ -8,7 +8,7 @@ version = "0.1.0"
crate-type = ["cdylib"] crate-type = ["cdylib"]
[dependencies] [dependencies]
napi = {path = "../napi", features = ["napi4", "serde-json"]} napi = {path = "../napi", features = ["tokio_rt", "serde-json"]}
napi-derive = {path = "../napi-derive"} napi-derive = {path = "../napi-derive"}
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"

View file

@ -1,6 +1,10 @@
import b from 'benny' import b from 'benny'
const { benchAsyncTask, benchThreadsafeFunction } = require('./index.node') const {
benchAsyncTask,
benchThreadsafeFunction,
benchTokioFuture,
} = require('./index.node')
const buffer = Buffer.from('hello 🚀 rust!') const buffer = Buffer.from('hello 🚀 rust!')
@ -10,7 +14,7 @@ export const benchAsync = () =>
b.add('spawn task', async () => { b.add('spawn task', async () => {
await benchAsyncTask(buffer) await benchAsyncTask(buffer)
}), }),
b.add('thread safe function', async () => { b.add('ThreadSafeFunction', async () => {
await new Promise<number | undefined>((resolve, reject) => { await new Promise<number | undefined>((resolve, reject) => {
benchThreadsafeFunction(buffer, (err?: Error, value?: number) => { benchThreadsafeFunction(buffer, (err?: Error, value?: number) => {
if (err) { if (err) {
@ -21,6 +25,9 @@ export const benchAsync = () =>
}) })
}) })
}), }),
b.add('Tokio future to Promise', async () => {
await benchTokioFuture(buffer)
}),
b.cycle(), b.cycle(),
b.complete(), b.complete(),
) )

View file

@ -10,6 +10,7 @@ import { benchGetArray } from './get-array-from-js'
import { benchGetSetProperty } from './get-set-property' import { benchGetSetProperty } from './get-set-property'
import { benchNoop } from './noop' import { benchNoop } from './noop'
import { benchPlus } from './plus' import { benchPlus } from './plus'
import { benchQuery } from './query'
async function run() { async function run() {
const output = [ const output = [
@ -20,6 +21,7 @@ async function run() {
await benchGetArray(), await benchGetArray(),
await benchGetSetProperty(), await benchGetSetProperty(),
await benchAsync(), await benchAsync(),
await benchQuery(),
] ]
.map(formatSummary) .map(formatSummary)
.join('\n') .join('\n')

19
bench/query.ts Normal file
View file

@ -0,0 +1,19 @@
import b from 'benny'
const { query, engine } = require('./index.node')
const e = engine('model A {}')
export const benchQuery = () =>
b.suite(
'Query',
b.add('napi-rs', async () => {
await Promise.all(Array.from({ length: 100 }).map(() => query(e)))
}),
b.add('neon', async () => {
await query(e)
}),
b.cycle(),
b.complete(),
)

View file

@ -51,8 +51,19 @@ fn bench_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
ctx.env.get_undefined() ctx.env.get_undefined()
} }
#[js_function(1)]
fn bench_tokio_future(ctx: CallContext) -> Result<JsObject> {
let buffer_ref = ctx.get::<JsBuffer>(0)?.into_ref()?;
ctx
.env
.execute_tokio_future(async move { Ok(buffer_ref.len()) }, |env, v: usize| {
env.create_uint32(v as u32 + 1)
})
}
pub fn register_js(exports: &mut JsObject) -> Result<()> { pub fn register_js(exports: &mut JsObject) -> Result<()> {
exports.create_named_method("benchAsyncTask", bench_async_task)?; exports.create_named_method("benchAsyncTask", bench_async_task)?;
exports.create_named_method("benchThreadsafeFunction", bench_threadsafe_function)?; exports.create_named_method("benchThreadsafeFunction", bench_threadsafe_function)?;
exports.create_named_method("benchTokioFuture", bench_tokio_future)?;
Ok(()) Ok(())
} }

View file

@ -18,6 +18,7 @@ mod get_set_property;
mod get_value_from_js; mod get_value_from_js;
mod noop; mod noop;
mod plus; mod plus;
mod query;
#[module_exports] #[module_exports]
fn init(mut exports: JsObject, env: Env) -> Result<()> { fn init(mut exports: JsObject, env: Env) -> Result<()> {
@ -29,6 +30,7 @@ fn init(mut exports: JsObject, env: Env) -> Result<()> {
get_set_property::register_js(&mut exports, &env)?; get_set_property::register_js(&mut exports, &env)?;
create_array::register_js(&mut exports)?; create_array::register_js(&mut exports)?;
get_value_from_js::register_js(&mut exports)?; get_value_from_js::register_js(&mut exports)?;
query::register_js(&mut exports)?;
Ok(()) Ok(())
} }

63
bench/src/query.rs Normal file
View file

@ -0,0 +1,63 @@
use napi::{CallContext, JsExternal, JsObject, JsString};
#[derive(Clone)]
pub struct QueryEngine {
datamodel: String,
}
unsafe impl Sync for QueryEngine {}
impl QueryEngine {
pub async fn query(&self) -> String {
let data = serde_json::json!({
"findFirstBooking": {
"id": "ckovh15xa104945sj64rdk8oas",
"name": "1883da9ff9152",
"forename": "221c99bedc6a4",
"description": "8bf86b62ce6a",
"email": "9d57a869661cc",
"phone": "7e0c58d147215",
"arrivalDate": -92229669,
"departureDate": 202138795,
"price": -1592700387,
"advance": -369294193,
"advanceDueDate": 925000428,
"kids": 520124290,
"adults": 1160258464,
"status": "NO_PAYMENT",
"nourishment": "BB",
"createdAt": "2021-05-19T12:58:37.246Z",
"room": { "id": "ckovh15xa104955sj6r2tqaw1c", "name": "38683b87f2664" }
}
});
serde_json::to_string(&data).unwrap()
}
}
#[js_function(1)]
fn new_engine(ctx: CallContext) -> napi::Result<napi::JsExternal> {
let a = ctx.get::<JsString>(0)?.into_utf8()?;
let model = a.into_owned()?;
let model_len = model.len();
let qe = QueryEngine { datamodel: model };
ctx.env.create_external(qe, Some(model_len as i64))
}
#[js_function(1)]
fn query(ctx: CallContext) -> napi::Result<JsObject> {
let ext = ctx.get::<JsExternal>(0)?;
let qe = ctx.env.get_value_external::<QueryEngine>(&ext)?;
let qe = qe.clone();
ctx
.env
.execute_tokio_future(async move { Ok(qe.query().await) }, |env, v| {
env.create_string_from_std(v)
})
}
pub fn register_js(exports: &mut JsObject) -> napi::Result<()> {
exports.create_named_method("engine", new_engine)?;
exports.create_named_method("query", query)?;
Ok(())
}

View file

@ -21,7 +21,7 @@ napi6 = ["napi5", "napi-sys/napi6"]
napi7 = ["napi6", "napi-sys/napi7"] napi7 = ["napi6", "napi-sys/napi7"]
napi8 = ["napi7", "napi-sys/napi8"] napi8 = ["napi7", "napi-sys/napi8"]
serde-json = ["serde", "serde_json"] serde-json = ["serde", "serde_json"]
tokio_rt = ["futures", "tokio", "once_cell", "napi4"] tokio_rt = ["tokio", "once_cell", "napi4"]
[dependencies] [dependencies]
napi-sys = {version = "1", path = "../sys"} napi-sys = {version = "1", path = "../sys"}
@ -33,10 +33,6 @@ winapi = {version = "0.3.9", features = ["winuser", "minwindef", "ntdef", "liblo
optional = true optional = true
version = "0.8" version = "0.8"
[dependencies.futures]
optional = true
version = "0.3"
[dependencies.tokio] [dependencies.tokio]
features = ["rt", "rt-multi-thread", "sync"] features = ["rt", "rt-multi-thread", "sync"]
optional = true optional = true

View file

@ -5,6 +5,11 @@ use std::mem;
use std::os::raw::{c_char, c_void}; use std::os::raw::{c_char, c_void};
use std::ptr; use std::ptr;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
use once_cell::sync::Lazy;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
use tokio::{runtime::Handle, sync::mpsc};
use crate::{ use crate::{
async_work::{self, AsyncWorkPromise}, async_work::{self, AsyncWorkPromise},
check_status, check_status,
@ -24,19 +29,44 @@ use crate::js_values::{De, Ser};
use crate::promise; use crate::promise;
#[cfg(feature = "napi4")] #[cfg(feature = "napi4")]
use crate::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction}; use crate::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction};
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
use crate::tokio_rt::{get_tokio_sender, Message};
#[cfg(all(feature = "serde-json"))] #[cfg(all(feature = "serde-json"))]
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
#[cfg(all(feature = "serde-json"))] #[cfg(all(feature = "serde-json"))]
use serde::Serialize; use serde::Serialize;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))] #[cfg(all(feature = "tokio_rt", feature = "napi4"))]
use std::future::Future; use std::future::Future;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
use tokio::sync::mpsc::error::TrySendError;
pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| {
let rt = tokio::runtime::Runtime::new();
let (tx, mut rx) = mpsc::channel::<()>(1);
rt.map(|rt| {
let h = rt.handle();
let handle = h.clone();
handle.spawn(async move {
if rx.recv().await.is_some() {
rt.shutdown_background();
}
});
(handle, tx)
})
.expect("Create tokio runtime failed")
});
#[doc(hidden)]
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
#[inline(never)]
pub fn shutdown_tokio_rt() {
let sender = &RT.1;
sender
.clone()
.try_send(())
.expect("Shutdown tokio runtime failed");
}
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
/// `Env` is used to represent a context that the underlying N-API implementation can use to persist VM-specific state. /// `Env` is used to represent a context that the underlying N-API implementation can use to persist VM-specific state.
/// ///
@ -1093,6 +1123,8 @@ impl Env {
fut: F, fut: F,
resolver: R, resolver: R,
) -> Result<JsObject> { ) -> Result<JsObject> {
let handle = &RT.0;
let mut raw_promise = ptr::null_mut(); let mut raw_promise = ptr::null_mut();
let mut raw_deferred = ptr::null_mut(); let mut raw_deferred = ptr::null_mut();
check_status!(unsafe { check_status!(unsafe {
@ -1103,19 +1135,7 @@ impl Env {
let future_promise = let future_promise =
promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?; promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?;
let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut); let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut);
let sender = get_tokio_sender().clone(); handle.spawn(future_to_resolve);
sender
.try_send(Message::Task(Box::pin(future_to_resolve)))
.map_err(|e| match e {
TrySendError::Full(_) => Error::new(
Status::QueueFull,
"Failed to run future: no available capacity".to_owned(),
),
TrySendError::Closed(_) => Error::new(
Status::Closing,
"Failed to run future: receiver closed".to_string(),
),
})?;
Ok(unsafe { JsObject::from_raw_unchecked(self.0, raw_promise) }) Ok(unsafe { JsObject::from_raw_unchecked(self.0, raw_promise) })
} }

View file

@ -95,8 +95,7 @@ mod task;
pub use cleanup_env::CleanupEnvHook; pub use cleanup_env::CleanupEnvHook;
#[cfg(feature = "napi4")] #[cfg(feature = "napi4")]
pub mod threadsafe_function; pub mod threadsafe_function;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
mod tokio_rt;
mod version; mod version;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod win_delay_load_hook; mod win_delay_load_hook;
@ -113,9 +112,6 @@ pub use status::Status;
pub use task::Task; pub use task::Task;
pub use version::NodeVersion; pub use version::NodeVersion;
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]
pub use tokio_rt::shutdown as shutdown_tokio_rt;
#[cfg(feature = "serde-json")] #[cfg(feature = "serde-json")]
#[macro_use] #[macro_use]
extern crate serde; extern crate serde;

View file

@ -1,8 +1,7 @@
use std::future::Future;
use std::os::raw::{c_char, c_void}; use std::os::raw::{c_char, c_void};
use std::ptr; use std::ptr;
use futures::prelude::*;
use crate::{check_status, sys, Env, JsError, NapiRaw, Result}; use crate::{check_status, sys, Env, JsError, NapiRaw, Result};
pub struct FuturePromise<T, V: NapiRaw> { pub struct FuturePromise<T, V: NapiRaw> {

View file

@ -1,54 +0,0 @@
use std::env::var;
use std::ffi::c_void;
use std::pin::Pin;
use std::thread::spawn;
use std::time::Duration;
use futures::future::Future;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use crate::Error;
pub(crate) enum Message {
Task(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
}
static SENDER: OnceCell<mpsc::Sender<Message>> = OnceCell::new();
#[inline]
pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender<Message> {
SENDER.get_or_init(|| {
let buffer_size = var("NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE")
.map_err(|_| ())
.and_then(|s| s.parse().map_err(|_| ()))
.unwrap_or(100);
let (sender, mut receiver) = mpsc::channel(buffer_size);
spawn(move || {
let rt = Runtime::new().expect("Failed to create tokio runtime");
rt.block_on(async {
loop {
match receiver.recv().await {
Some(Message::Task(fut)) => fut.await,
Some(Message::Shutdown) => break,
None => {}
}
}
});
rt.shutdown_timeout(Duration::from_secs(5));
});
sender
})
}
#[doc(hidden)]
pub unsafe extern "C" fn shutdown(_data: *mut c_void) {
let sender = get_tokio_sender().clone();
sender
.try_send(Message::Shutdown)
.map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e)))
.unwrap()
}

View file

@ -1,28 +0,0 @@
import { join } from 'path'
import test from 'ava'
import { napiVersion } from '../napi-version'
const filepath = join(__dirname, './example.txt')
process.env.NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE = '1'
const bindings = require('../../index.node')
test('should be able adjust queue size via process.env', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
try {
await Promise.all(
Array.from({ length: 50 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
throw new TypeError('Unreachable')
} catch (e) {
t.snapshot({ code: e.code, message: e.message })
}
})

View file

@ -1,14 +0,0 @@
# Snapshot report for `test_module/__test__/napi4/tokio_rt-isolate.spec.ts`
The actual snapshot is saved in `tokio_rt-isolate.spec.ts.snap`.
Generated by [AVA](https://avajs.dev).
## should be able adjust queue size via process.env
> Snapshot 1
{
code: 'QueueFull',
message: 'Failed to run future: no available capacity',
}

View file

@ -47,20 +47,3 @@ test.serial('should be able to execute future paralleled', async (t) => {
t.deepEqual(readFileSync(filepath), fileContent) t.deepEqual(readFileSync(filepath), fileContent)
} }
}) })
test.serial('should reject if task queue is full', async (t) => {
if (napiVersion < 4) {
t.is(bindings.testExecuteTokioReadfile, undefined)
return
}
try {
await Promise.all(
Array.from({ length: 1000 * 1000 }).map((_) =>
bindings.testExecuteTokioReadfile(filepath),
),
)
throw new TypeError('Unreachable')
} catch (e) {
t.snapshot({ code: e.code, message: e.message })
}
})

View file

@ -1,14 +0,0 @@
# Snapshot report for `test_module/__test__/napi4/tokio_rt.spec.ts`
The actual snapshot is saved in `tokio_rt.spec.ts.snap`.
Generated by [AVA](https://avajs.dev).
## should reject if task queue is full
> Snapshot 1
{
code: 'QueueFull',
message: 'Failed to run future: no available capacity',
}

View file

@ -4,23 +4,11 @@ The actual snapshot is saved in `ser.spec.ts.snap`.
Generated by [AVA](https://avajs.dev). Generated by [AVA](https://avajs.dev).
## serialize make_buff from bindings ## serialize make_num_77 from bindings
> Snapshot 1 > Snapshot 1
Buffer @Uint8Array [ 77
fffefd
]
## serialize make_map from bindings
> Snapshot 1
{
a: 1,
b: 2,
c: 3,
}
## serialize make_num_32 from bindings ## serialize make_num_32 from bindings
@ -28,11 +16,11 @@ Generated by [AVA](https://avajs.dev).
32 32
## serialize make_num_77 from bindings ## serialize make_str_hello from bindings
> Snapshot 1 > Snapshot 1
77 'Hello World'
## serialize make_num_array from bindings ## serialize make_num_array from bindings
@ -51,6 +39,14 @@ Generated by [AVA](https://avajs.dev).
9, 9,
] ]
## serialize make_buff from bindings
> Snapshot 1
Buffer @Uint8Array [
fffefd
]
## serialize make_obj from bindings ## serialize make_obj from bindings
> Snapshot 1 > Snapshot 1
@ -66,6 +62,16 @@ Generated by [AVA](https://avajs.dev).
c: 'Hi', c: 'Hi',
} }
## serialize make_map from bindings
> Snapshot 1
{
a: 1,
b: 2,
c: 3,
}
## serialize make_object from bindings ## serialize make_object from bindings
> Snapshot 1 > Snapshot 1
@ -122,9 +128,3 @@ Generated by [AVA](https://avajs.dev).
q: 9998881288248882845242411222333n, q: 9998881288248882845242411222333n,
r: -340282363588614574563373375108745990111n, r: -340282363588614574563373375108745990111n,
} }
## serialize make_str_hello from bindings
> Snapshot 1
'Hello World'