diff --git a/bench/Cargo.toml b/bench/Cargo.toml index d77b69ae..8f6f289c 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -8,7 +8,7 @@ version = "0.1.0" crate-type = ["cdylib"] [dependencies] -napi = {path = "../napi", features = ["napi4", "serde-json"]} +napi = {path = "../napi", features = ["tokio_rt", "serde-json"]} napi-derive = {path = "../napi-derive"} serde = "1" serde_json = "1" diff --git a/bench/async.ts b/bench/async.ts index e41ec044..464ca6c1 100644 --- a/bench/async.ts +++ b/bench/async.ts @@ -1,6 +1,10 @@ import b from 'benny' -const { benchAsyncTask, benchThreadsafeFunction } = require('./index.node') +const { + benchAsyncTask, + benchThreadsafeFunction, + benchTokioFuture, +} = require('./index.node') const buffer = Buffer.from('hello 🚀 rust!') @@ -10,7 +14,7 @@ export const benchAsync = () => b.add('spawn task', async () => { await benchAsyncTask(buffer) }), - b.add('thread safe function', async () => { + b.add('ThreadSafeFunction', async () => { await new Promise((resolve, reject) => { benchThreadsafeFunction(buffer, (err?: Error, value?: number) => { if (err) { @@ -21,6 +25,9 @@ export const benchAsync = () => }) }) }), + b.add('Tokio future to Promise', async () => { + await benchTokioFuture(buffer) + }), b.cycle(), b.complete(), ) diff --git a/bench/bench.ts b/bench/bench.ts index 717f3181..e1a54e94 100644 --- a/bench/bench.ts +++ b/bench/bench.ts @@ -10,6 +10,7 @@ import { benchGetArray } from './get-array-from-js' import { benchGetSetProperty } from './get-set-property' import { benchNoop } from './noop' import { benchPlus } from './plus' +import { benchQuery } from './query' async function run() { const output = [ @@ -20,6 +21,7 @@ async function run() { await benchGetArray(), await benchGetSetProperty(), await benchAsync(), + await benchQuery(), ] .map(formatSummary) .join('\n') diff --git a/bench/query.ts b/bench/query.ts new file mode 100644 index 00000000..48f1307c --- /dev/null +++ b/bench/query.ts @@ -0,0 +1,19 @@ +import b from 'benny' + +const { query, engine } = require('./index.node') + +const e = engine('model A {}') + +export const benchQuery = () => + b.suite( + 'Query', + b.add('napi-rs', async () => { + await Promise.all(Array.from({ length: 100 }).map(() => query(e))) + }), + b.add('neon', async () => { + await query(e) + }), + + b.cycle(), + b.complete(), + ) diff --git a/bench/src/async_compute.rs b/bench/src/async_compute.rs index 41d8c82f..d2630a76 100644 --- a/bench/src/async_compute.rs +++ b/bench/src/async_compute.rs @@ -51,8 +51,19 @@ fn bench_threadsafe_function(ctx: CallContext) -> Result { ctx.env.get_undefined() } +#[js_function(1)] +fn bench_tokio_future(ctx: CallContext) -> Result { + let buffer_ref = ctx.get::(0)?.into_ref()?; + ctx + .env + .execute_tokio_future(async move { Ok(buffer_ref.len()) }, |env, v: usize| { + env.create_uint32(v as u32 + 1) + }) +} + pub fn register_js(exports: &mut JsObject) -> Result<()> { exports.create_named_method("benchAsyncTask", bench_async_task)?; exports.create_named_method("benchThreadsafeFunction", bench_threadsafe_function)?; + exports.create_named_method("benchTokioFuture", bench_tokio_future)?; Ok(()) } diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 501b0640..dbf146ec 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -18,6 +18,7 @@ mod get_set_property; mod get_value_from_js; mod noop; mod plus; +mod query; #[module_exports] fn init(mut exports: JsObject, env: Env) -> Result<()> { @@ -29,6 +30,7 @@ fn init(mut exports: JsObject, env: Env) -> Result<()> { get_set_property::register_js(&mut exports, &env)?; create_array::register_js(&mut exports)?; get_value_from_js::register_js(&mut exports)?; + query::register_js(&mut exports)?; Ok(()) } diff --git a/bench/src/query.rs b/bench/src/query.rs new file mode 100644 index 00000000..0ccf056b --- /dev/null +++ b/bench/src/query.rs @@ -0,0 +1,63 @@ +use napi::{CallContext, JsExternal, JsObject, JsString}; + +#[derive(Clone)] +pub struct QueryEngine { + datamodel: String, +} + +unsafe impl Sync for QueryEngine {} + +impl QueryEngine { + pub async fn query(&self) -> String { + let data = serde_json::json!({ + "findFirstBooking": { + "id": "ckovh15xa104945sj64rdk8oas", + "name": "1883da9ff9152", + "forename": "221c99bedc6a4", + "description": "8bf86b62ce6a", + "email": "9d57a869661cc", + "phone": "7e0c58d147215", + "arrivalDate": -92229669, + "departureDate": 202138795, + "price": -1592700387, + "advance": -369294193, + "advanceDueDate": 925000428, + "kids": 520124290, + "adults": 1160258464, + "status": "NO_PAYMENT", + "nourishment": "BB", + "createdAt": "2021-05-19T12:58:37.246Z", + "room": { "id": "ckovh15xa104955sj6r2tqaw1c", "name": "38683b87f2664" } + } + }); + + serde_json::to_string(&data).unwrap() + } +} + +#[js_function(1)] +fn new_engine(ctx: CallContext) -> napi::Result { + let a = ctx.get::(0)?.into_utf8()?; + let model = a.into_owned()?; + let model_len = model.len(); + let qe = QueryEngine { datamodel: model }; + ctx.env.create_external(qe, Some(model_len as i64)) +} + +#[js_function(1)] +fn query(ctx: CallContext) -> napi::Result { + let ext = ctx.get::(0)?; + let qe = ctx.env.get_value_external::(&ext)?; + let qe = qe.clone(); + ctx + .env + .execute_tokio_future(async move { Ok(qe.query().await) }, |env, v| { + env.create_string_from_std(v) + }) +} + +pub fn register_js(exports: &mut JsObject) -> napi::Result<()> { + exports.create_named_method("engine", new_engine)?; + exports.create_named_method("query", query)?; + Ok(()) +} diff --git a/napi/Cargo.toml b/napi/Cargo.toml index 745d0077..d5ed6bdb 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -21,7 +21,7 @@ napi6 = ["napi5", "napi-sys/napi6"] napi7 = ["napi6", "napi-sys/napi7"] napi8 = ["napi7", "napi-sys/napi8"] serde-json = ["serde", "serde_json"] -tokio_rt = ["futures", "tokio", "once_cell", "napi4"] +tokio_rt = ["tokio", "once_cell", "napi4"] [dependencies] napi-sys = {version = "1", path = "../sys"} @@ -33,10 +33,6 @@ winapi = {version = "0.3.9", features = ["winuser", "minwindef", "ntdef", "liblo optional = true version = "0.8" -[dependencies.futures] -optional = true -version = "0.3" - [dependencies.tokio] features = ["rt", "rt-multi-thread", "sync"] optional = true diff --git a/napi/src/env.rs b/napi/src/env.rs index 319ea073..d2c16fdc 100644 --- a/napi/src/env.rs +++ b/napi/src/env.rs @@ -5,6 +5,11 @@ use std::mem; use std::os::raw::{c_char, c_void}; use std::ptr; +#[cfg(all(feature = "tokio_rt", feature = "napi4"))] +use once_cell::sync::Lazy; +#[cfg(all(feature = "tokio_rt", feature = "napi4"))] +use tokio::{runtime::Handle, sync::mpsc}; + use crate::{ async_work::{self, AsyncWorkPromise}, check_status, @@ -24,19 +29,44 @@ use crate::js_values::{De, Ser}; use crate::promise; #[cfg(feature = "napi4")] use crate::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction}; -#[cfg(all(feature = "tokio_rt", feature = "napi4"))] -use crate::tokio_rt::{get_tokio_sender, Message}; #[cfg(all(feature = "serde-json"))] use serde::de::DeserializeOwned; #[cfg(all(feature = "serde-json"))] use serde::Serialize; #[cfg(all(feature = "tokio_rt", feature = "napi4"))] use std::future::Future; -#[cfg(all(feature = "tokio_rt", feature = "napi4"))] -use tokio::sync::mpsc::error::TrySendError; pub type Callback = extern "C" fn(sys::napi_env, sys::napi_callback_info) -> sys::napi_value; +#[cfg(all(feature = "tokio_rt", feature = "napi4"))] +static RT: Lazy<(Handle, mpsc::Sender<()>)> = Lazy::new(|| { + let rt = tokio::runtime::Runtime::new(); + let (tx, mut rx) = mpsc::channel::<()>(1); + rt.map(|rt| { + let h = rt.handle(); + let handle = h.clone(); + handle.spawn(async move { + if rx.recv().await.is_some() { + rt.shutdown_background(); + } + }); + + (handle, tx) + }) + .expect("Create tokio runtime failed") +}); + +#[doc(hidden)] +#[cfg(all(feature = "tokio_rt", feature = "napi4"))] +#[inline(never)] +pub fn shutdown_tokio_rt() { + let sender = &RT.1; + sender + .clone() + .try_send(()) + .expect("Shutdown tokio runtime failed"); +} + #[derive(Clone, Copy)] /// `Env` is used to represent a context that the underlying N-API implementation can use to persist VM-specific state. /// @@ -1093,6 +1123,8 @@ impl Env { fut: F, resolver: R, ) -> Result { + let handle = &RT.0; + let mut raw_promise = ptr::null_mut(); let mut raw_deferred = ptr::null_mut(); check_status!(unsafe { @@ -1103,19 +1135,7 @@ impl Env { let future_promise = promise::FuturePromise::create(raw_env, raw_deferred, Box::from(resolver))?; let future_to_resolve = promise::resolve_from_future(future_promise.start()?, fut); - let sender = get_tokio_sender().clone(); - sender - .try_send(Message::Task(Box::pin(future_to_resolve))) - .map_err(|e| match e { - TrySendError::Full(_) => Error::new( - Status::QueueFull, - "Failed to run future: no available capacity".to_owned(), - ), - TrySendError::Closed(_) => Error::new( - Status::Closing, - "Failed to run future: receiver closed".to_string(), - ), - })?; + handle.spawn(future_to_resolve); Ok(unsafe { JsObject::from_raw_unchecked(self.0, raw_promise) }) } diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 654859f5..0e8dc012 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -95,8 +95,7 @@ mod task; pub use cleanup_env::CleanupEnvHook; #[cfg(feature = "napi4")] pub mod threadsafe_function; -#[cfg(all(feature = "tokio_rt", feature = "napi4"))] -mod tokio_rt; + mod version; #[cfg(target_os = "windows")] mod win_delay_load_hook; @@ -113,9 +112,6 @@ pub use status::Status; pub use task::Task; pub use version::NodeVersion; -#[cfg(all(feature = "tokio_rt", feature = "napi4"))] -pub use tokio_rt::shutdown as shutdown_tokio_rt; - #[cfg(feature = "serde-json")] #[macro_use] extern crate serde; diff --git a/napi/src/promise.rs b/napi/src/promise.rs index e7f71439..45aea7e5 100644 --- a/napi/src/promise.rs +++ b/napi/src/promise.rs @@ -1,8 +1,7 @@ +use std::future::Future; use std::os::raw::{c_char, c_void}; use std::ptr; -use futures::prelude::*; - use crate::{check_status, sys, Env, JsError, NapiRaw, Result}; pub struct FuturePromise { diff --git a/napi/src/tokio_rt.rs b/napi/src/tokio_rt.rs deleted file mode 100644 index 38bdc583..00000000 --- a/napi/src/tokio_rt.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::env::var; -use std::ffi::c_void; -use std::pin::Pin; -use std::thread::spawn; -use std::time::Duration; - -use futures::future::Future; -use once_cell::sync::OnceCell; -use tokio::runtime::Runtime; -use tokio::sync::mpsc; - -use crate::Error; - -pub(crate) enum Message { - Task(Pin + Send>>), - Shutdown, -} - -static SENDER: OnceCell> = OnceCell::new(); - -#[inline] -pub(crate) fn get_tokio_sender() -> &'static mpsc::Sender { - SENDER.get_or_init(|| { - let buffer_size = var("NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE") - .map_err(|_| ()) - .and_then(|s| s.parse().map_err(|_| ())) - .unwrap_or(100); - let (sender, mut receiver) = mpsc::channel(buffer_size); - spawn(move || { - let rt = Runtime::new().expect("Failed to create tokio runtime"); - rt.block_on(async { - loop { - match receiver.recv().await { - Some(Message::Task(fut)) => fut.await, - Some(Message::Shutdown) => break, - None => {} - } - } - }); - rt.shutdown_timeout(Duration::from_secs(5)); - }); - - sender - }) -} - -#[doc(hidden)] -pub unsafe extern "C" fn shutdown(_data: *mut c_void) { - let sender = get_tokio_sender().clone(); - sender - .try_send(Message::Shutdown) - .map_err(|e| Error::from_reason(format!("Shutdown tokio runtime failed: {}", e))) - .unwrap() -} diff --git a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts b/test_module/__test__/napi4/tokio_rt-isolate.spec.ts deleted file mode 100644 index 7dc63c2e..00000000 --- a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { join } from 'path' - -import test from 'ava' - -import { napiVersion } from '../napi-version' - -const filepath = join(__dirname, './example.txt') - -process.env.NAPI_RS_TOKIO_CHANNEL_BUFFER_SIZE = '1' - -const bindings = require('../../index.node') - -test('should be able adjust queue size via process.env', async (t) => { - if (napiVersion < 4) { - t.is(bindings.testExecuteTokioReadfile, undefined) - return - } - try { - await Promise.all( - Array.from({ length: 50 }).map((_) => - bindings.testExecuteTokioReadfile(filepath), - ), - ) - throw new TypeError('Unreachable') - } catch (e) { - t.snapshot({ code: e.code, message: e.message }) - } -}) diff --git a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.md b/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.md deleted file mode 100644 index 3c6eb0f0..00000000 --- a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.md +++ /dev/null @@ -1,14 +0,0 @@ -# Snapshot report for `test_module/__test__/napi4/tokio_rt-isolate.spec.ts` - -The actual snapshot is saved in `tokio_rt-isolate.spec.ts.snap`. - -Generated by [AVA](https://avajs.dev). - -## should be able adjust queue size via process.env - -> Snapshot 1 - - { - code: 'QueueFull', - message: 'Failed to run future: no available capacity', - } diff --git a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.snap b/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.snap deleted file mode 100644 index 78ee14ca..00000000 Binary files a/test_module/__test__/napi4/tokio_rt-isolate.spec.ts.snap and /dev/null differ diff --git a/test_module/__test__/napi4/tokio_rt.spec.ts b/test_module/__test__/napi4/tokio_rt.spec.ts index 5b740097..751a942f 100644 --- a/test_module/__test__/napi4/tokio_rt.spec.ts +++ b/test_module/__test__/napi4/tokio_rt.spec.ts @@ -47,20 +47,3 @@ test.serial('should be able to execute future paralleled', async (t) => { t.deepEqual(readFileSync(filepath), fileContent) } }) - -test.serial('should reject if task queue is full', async (t) => { - if (napiVersion < 4) { - t.is(bindings.testExecuteTokioReadfile, undefined) - return - } - try { - await Promise.all( - Array.from({ length: 1000 * 1000 }).map((_) => - bindings.testExecuteTokioReadfile(filepath), - ), - ) - throw new TypeError('Unreachable') - } catch (e) { - t.snapshot({ code: e.code, message: e.message }) - } -}) diff --git a/test_module/__test__/napi4/tokio_rt.spec.ts.md b/test_module/__test__/napi4/tokio_rt.spec.ts.md deleted file mode 100644 index a16cfa66..00000000 --- a/test_module/__test__/napi4/tokio_rt.spec.ts.md +++ /dev/null @@ -1,14 +0,0 @@ -# Snapshot report for `test_module/__test__/napi4/tokio_rt.spec.ts` - -The actual snapshot is saved in `tokio_rt.spec.ts.snap`. - -Generated by [AVA](https://avajs.dev). - -## should reject if task queue is full - -> Snapshot 1 - - { - code: 'QueueFull', - message: 'Failed to run future: no available capacity', - } diff --git a/test_module/__test__/napi4/tokio_rt.spec.ts.snap b/test_module/__test__/napi4/tokio_rt.spec.ts.snap deleted file mode 100644 index 6409d3a2..00000000 Binary files a/test_module/__test__/napi4/tokio_rt.spec.ts.snap and /dev/null differ diff --git a/test_module/__test__/serde/ser.spec.ts.md b/test_module/__test__/serde/ser.spec.ts.md index 00620da3..4c389086 100644 --- a/test_module/__test__/serde/ser.spec.ts.md +++ b/test_module/__test__/serde/ser.spec.ts.md @@ -4,23 +4,11 @@ The actual snapshot is saved in `ser.spec.ts.snap`. Generated by [AVA](https://avajs.dev). -## serialize make_buff from bindings +## serialize make_num_77 from bindings > Snapshot 1 - Buffer @Uint8Array [ - fffefd - ] - -## serialize make_map from bindings - -> Snapshot 1 - - { - a: 1, - b: 2, - c: 3, - } + 77 ## serialize make_num_32 from bindings @@ -28,11 +16,11 @@ Generated by [AVA](https://avajs.dev). 32 -## serialize make_num_77 from bindings +## serialize make_str_hello from bindings > Snapshot 1 - 77 + 'Hello World' ## serialize make_num_array from bindings @@ -51,6 +39,14 @@ Generated by [AVA](https://avajs.dev). 9, ] +## serialize make_buff from bindings + +> Snapshot 1 + + Buffer @Uint8Array [ + fffefd + ] + ## serialize make_obj from bindings > Snapshot 1 @@ -66,6 +62,16 @@ Generated by [AVA](https://avajs.dev). c: 'Hi', } +## serialize make_map from bindings + +> Snapshot 1 + + { + a: 1, + b: 2, + c: 3, + } + ## serialize make_object from bindings > Snapshot 1 @@ -122,9 +128,3 @@ Generated by [AVA](https://avajs.dev). q: 9998881288248882845242411222333n, r: -340282363588614574563373375108745990111n, } - -## serialize make_str_hello from bindings - -> Snapshot 1 - - 'Hello World' diff --git a/test_module/__test__/serde/ser.spec.ts.snap b/test_module/__test__/serde/ser.spec.ts.snap index da74db6c..4bba6d53 100644 Binary files a/test_module/__test__/serde/ser.spec.ts.snap and b/test_module/__test__/serde/ser.spec.ts.snap differ