From f126a0581fa8ed0b2c385be77985b71d3abf1623 Mon Sep 17 00:00:00 2001 From: Ouyang Yadong Date: Fri, 19 Jun 2020 20:42:18 +0800 Subject: [PATCH] feat(napi): impl threadsafe function api --- napi/Cargo.toml | 11 +- napi/build.rs | 16 ++ napi/src/lib.rs | 2 + napi/src/threadsafe_function.rs | 233 ++++++++++++++++++ test_module/Cargo.toml | 1 + test_module/__test__/example.txt | 1 + .../__test__/threadsafe_function.spec.js | 22 ++ test_module/__test__/tokio_readfile.spec.js | 21 ++ test_module/__test__/tsfn_error.spec.js | 16 ++ test_module/src/lib.rs | 122 ++++++++- 10 files changed, 442 insertions(+), 3 deletions(-) create mode 100644 napi/src/threadsafe_function.rs create mode 100644 test_module/__test__/example.txt create mode 100644 test_module/__test__/threadsafe_function.spec.js create mode 100644 test_module/__test__/tokio_readfile.spec.js create mode 100644 test_module/__test__/tsfn_error.spec.js diff --git a/napi/Cargo.toml b/napi/Cargo.toml index 951f9862..c7a10721 100644 --- a/napi/Cargo.toml +++ b/napi/Cargo.toml @@ -19,4 +19,13 @@ bindgen = "0.54" glob = "0.3" napi-build = { version = "0.1", path = "../build" } regex = "1.3" -semver = "0.10" \ No newline at end of file +semver = "0.10" + +[features] +default = [] +# See the N-API Version Matrix https://nodejs.org/api/n-api.html#n_api_n_api_version_matrix +napi2 = [] +napi3 = [] +napi4 = [] +napi5 = [] +napi6 = [] diff --git a/napi/build.rs b/napi/build.rs index f80d29ad..179683d4 100644 --- a/napi/build.rs +++ b/napi/build.rs @@ -57,6 +57,22 @@ fn main() { .expect("Unable to generate napi bindings") .write_to_file(out_path.join("bindings.rs")) .expect("Unable to write napi bindings"); + + let napi_version = String::from_utf8( + Command::new("node") + .args(&[ + "-e", + "console.log(process.versions.napi)", + ]) + .output() + .unwrap() + .stdout, + ) + .unwrap(); + + for version in 2..napi_version.trim().parse::().unwrap() { + println!("cargo:rustc-cfg=napi{}", version); + } } #[cfg(target_os = "windows")] diff --git a/napi/src/lib.rs b/napi/src/lib.rs index 113a2dfa..1b9ed7c0 100644 --- a/napi/src/lib.rs +++ b/napi/src/lib.rs @@ -16,6 +16,8 @@ mod async_work; mod call_context; pub mod sys; mod task; +#[cfg(napi4)] +pub mod threadsafe_function; mod version; pub use call_context::CallContext; diff --git a/napi/src/threadsafe_function.rs b/napi/src/threadsafe_function.rs new file mode 100644 index 00000000..2d14829b --- /dev/null +++ b/napi/src/threadsafe_function.rs @@ -0,0 +1,233 @@ +use crate::{check_status, ptr, sys, Env, Function, Result, Value}; +use std::os::raw::{c_char, c_void}; + +use sys::napi_threadsafe_function_call_mode; +use sys::napi_threadsafe_function_release_mode; + +pub trait ToJs: Copy + Clone { + type Output; + type JsValue; + + fn resolve( + &self, + env: &mut Env, + output: Self::Output, + ) -> Result<(u64, Value)>; +} + +/// Communicate with the addon's main thread by invoking a JavaScript function from other threads. +/// +/// ## Example +/// An example of using `ThreadsafeFunction`: +/// +/// ``` +/// #[macro_use] +/// extern crate napi_rs_derive; +/// +/// use std::thread; +/// use napi_rs::{ +/// Number, Result, Value, Env, CallContext, Undefined, Function, +/// sys::{ +/// napi_threadsafe_function_call_mode::{ +/// napi_tsfn_blocking, +/// }, +/// napi_threadsafe_function_release_mode::{ +/// napi_tsfn_release, +/// } +/// } +/// }; +/// use napi_rs::threadsafe_function::{ +/// ToJs, ThreadsafeFunction, +/// }; +/// +/// // Define a struct for handling the data passed from `ThreadsafeFunction::call` +/// // and return the data to be used for the js callback. +/// #[derive(Clone, Copy)] +/// struct HandleNumber; +/// +/// impl ToJs for HandleNumber { +/// type Output = u8; +/// type JsValue = Number; +/// +/// fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value)> { +/// let argv: u64 = 1; +/// let value = env.create_uint32(output as u32)?; +/// Ok((argv, value)) +/// } +/// } +/// +/// #[js_function(1)] +/// fn test_threadsafe_function(ctx: CallContext) -> Result> { +/// // The callback function from js which will be called in `ThreadsafeFunction::call`. +/// let func: Value = ctx.get::(0)?; +/// +/// let to_js = HandleNumber; +/// let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?; +/// +/// thread::spawn(move || { +/// let output: u8 = 42; +/// // It's okay to call a threadsafe function multiple times. +/// tsfn.call(Ok(output), napi_tsfn_blocking).unwrap(); +/// tsfn.call(Ok(output), napi_tsfn_blocking).unwrap(); +/// // We should call `ThreadsafeFunction::release` manually when we don't +/// // need the instance anymore, or it will prevent Node.js from exiting +/// // automatically and possiblely cause memory leaks. +/// tsfn.release(napi_tsfn_release).unwrap(); +/// }); +/// +/// Ok(Env::get_undefined(ctx.env)?) +/// } +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct ThreadsafeFunction { + raw_value: sys::napi_threadsafe_function, + to_js: T, +} + +unsafe impl Send for ThreadsafeFunction {} +unsafe impl Sync for ThreadsafeFunction {} + +impl ThreadsafeFunction { + /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function) + /// for more information. + pub fn create(env: &Env, func: Value, to_js: T, max_queue_size: u64) -> Result { + let mut async_resource_name = ptr::null_mut(); + let s = "napi_rs_threadsafe_function"; + let status = unsafe { + sys::napi_create_string_utf8( + env.0, + s.as_ptr() as *const c_char, + s.len() as u64, + &mut async_resource_name, + ) + }; + check_status(status)?; + + let initial_thread_count: u64 = 1; + let mut result = ptr::null_mut(); + let tsfn = ThreadsafeFunction { + to_js, + raw_value: result, + }; + + let ptr = Box::into_raw(Box::from(tsfn)) as *mut _ as *mut c_void; + + let status = unsafe { + sys::napi_create_threadsafe_function( + env.0, + func.raw_value, + ptr::null_mut(), + async_resource_name, + max_queue_size, + initial_thread_count, + ptr, + Some(thread_finalize_cb::), + ptr, + Some(call_js_cb::), + &mut result, + ) + }; + check_status(status)?; + + Ok(ThreadsafeFunction { + to_js, + raw_value: result, + }) + } + + /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) + /// for more information. + pub fn call( + &self, + value: Result, + mode: napi_threadsafe_function_call_mode, + ) -> Result<()> { + check_status(unsafe { + sys::napi_call_threadsafe_function( + self.raw_value, + Box::into_raw(Box::from(value)) as *mut _ as *mut c_void, + mode, + ) + }) + } + + /// See [napi_acquire_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_acquire_threadsafe_function) + /// for more information. + pub fn acquire(&self) -> Result<()> { + check_status(unsafe { sys::napi_acquire_threadsafe_function(self.raw_value) }) + } + + /// See [napi_release_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_release_threadsafe_function) + /// for more information. + pub fn release(&self, mode: napi_threadsafe_function_release_mode) -> Result<()> { + check_status(unsafe { sys::napi_release_threadsafe_function(self.raw_value, mode) }) + } + + /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function) + /// for more information. + /// + /// "ref" is a keyword so that we use "refer" here. + pub fn refer(&self, env: &Env) -> Result<()> { + check_status(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_value) }) + } + + /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function) + /// for more information. + pub fn unref(&self, env: &Env) -> Result<()> { + check_status(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_value) }) + } +} + +unsafe extern "C" fn thread_finalize_cb( + _raw_env: sys::napi_env, + finalize_data: *mut c_void, + _finalize_hint: *mut c_void, +) { + // cleanup + Box::from_raw(finalize_data as *mut ThreadsafeFunction); +} + +unsafe extern "C" fn call_js_cb( + raw_env: sys::napi_env, + js_callback: sys::napi_value, + context: *mut c_void, + data: *mut c_void, +) { + let mut env = Env::from_raw(raw_env); + let mut recv = ptr::null_mut(); + sys::napi_get_undefined(raw_env, &mut recv); + + let tsfn = Box::leak(Box::from_raw(context as *mut ThreadsafeFunction)); + let val = Box::from_raw(data as *mut Result); + + let ret = val.and_then(|v| tsfn.to_js.resolve(&mut env, v)); + + let status; + + // Follow the convention of Node.js async callback. + if ret.is_ok() { + let (argv, js_value) = ret.unwrap(); + let js_null = env.get_null().unwrap(); + let values = [js_null.raw_value, js_value.raw_value]; + status = sys::napi_call_function( + raw_env, + recv, + js_callback, + argv + 1, + values.as_ptr(), + ptr::null_mut(), + ); + } else { + let mut err = env.create_error(ret.err().unwrap()).unwrap(); + status = sys::napi_call_function( + raw_env, + recv, + js_callback, + 1, + &mut err.raw_value, + ptr::null_mut(), + ); + } + + debug_assert!(status == sys::napi_status::napi_ok, "CallJsCB failed"); +} diff --git a/test_module/Cargo.toml b/test_module/Cargo.toml index 81a7e16f..d0dfd8c1 100644 --- a/test_module/Cargo.toml +++ b/test_module/Cargo.toml @@ -10,6 +10,7 @@ crate-type = ["cdylib"] [dependencies] napi-rs = { path = "../napi" } napi-rs-derive = { path = "../napi-derive" } +tokio = { version = "0.2", features = ["default", "fs"]} [build-dependencies] napi-build = { path = "../build" } diff --git a/test_module/__test__/example.txt b/test_module/__test__/example.txt new file mode 100644 index 00000000..cd087558 --- /dev/null +++ b/test_module/__test__/example.txt @@ -0,0 +1 @@ +Hello world! diff --git a/test_module/__test__/threadsafe_function.spec.js b/test_module/__test__/threadsafe_function.spec.js new file mode 100644 index 00000000..627b2952 --- /dev/null +++ b/test_module/__test__/threadsafe_function.spec.js @@ -0,0 +1,22 @@ +const test = require('ava') +const bindings = require('../index.node') + +test('should get js function called from a thread', async (t) => { + let called = 0 + + return new Promise((resolve, reject) => { + bindings.testThreadsafeFunction((err, ret) => { + called += 1 + try { + t.is(err, null) + t.is(ret, 42) + } catch (err) { + reject(err) + } + + if (called === 2) { + resolve() + } + }) + }) +}) diff --git a/test_module/__test__/tokio_readfile.spec.js b/test_module/__test__/tokio_readfile.spec.js new file mode 100644 index 00000000..695caa03 --- /dev/null +++ b/test_module/__test__/tokio_readfile.spec.js @@ -0,0 +1,21 @@ +const test = require('ava') +const fs = require('fs') +const path = require('path') +const bindings = require('../index.node') + +const filepath = path.resolve(__dirname, './example.txt') + +test('should read a file and return its a buffer', async (t) => { + return new Promise((resolve, reject) => { + bindings.testTokioReadfile(filepath, (err, value) => { + try { + t.is(err, null) + t.is(Buffer.isBuffer(value), true) + t.is(value.toString(), fs.readFileSync(filepath, 'utf8')) + resolve() + } catch (err) { + reject(err) + } + }) + }) +}) diff --git a/test_module/__test__/tsfn_error.spec.js b/test_module/__test__/tsfn_error.spec.js new file mode 100644 index 00000000..d692a920 --- /dev/null +++ b/test_module/__test__/tsfn_error.spec.js @@ -0,0 +1,16 @@ +const test = require('ava') +const bindings = require('../index.node') + +test('should call callback with the first arguments as an Error', async (t) => { + return new Promise((resolve, reject) => { + bindings.testTsfnError((err) => { + try { + t.is(err instanceof Error, true) + t.is(err.message, 'invalid') + resolve() + } catch (err) { + reject(err) + } + }) + }) +}) diff --git a/test_module/src/lib.rs b/test_module/src/lib.rs index 8b690273..17304501 100644 --- a/test_module/src/lib.rs +++ b/test_module/src/lib.rs @@ -5,8 +5,25 @@ extern crate napi_rs_derive; use napi::{ Any, Boolean, CallContext, Env, Error, JsString, Number, Object, Result, Status, Task, Value, + Undefined, Function, Buffer, + threadsafe_function::{ + ToJs, + ThreadsafeFunction, + } +}; +use napi::sys::{ + napi_threadsafe_function_call_mode:: { + napi_tsfn_blocking, + }, + napi_threadsafe_function_release_mode:: { + napi_tsfn_release, + } }; use std::convert::TryInto; +use std::thread; +use std::path::Path; +use std::ops::Deref; +use tokio; register_module!(test_module, init); @@ -24,16 +41,26 @@ fn init(env: &Env, exports: &mut Value) -> Result<()> { "testObjectIsDate", env.create_function("testObjectIsDate", test_object_is_date)?, )?; - exports.set_named_property( "createExternal", env.create_function("createExternal", create_external)?, )?; - exports.set_named_property( "getExternalCount", env.create_function("getExternalCount", get_external_count)?, )?; + exports.set_named_property( + "testTsfnError", + env.create_function("testTsfnError", test_tsfn_error)?, + )?; + exports.set_named_property( + "testThreadsafeFunction", + env.create_function("testThreadsafeFunction", test_threadsafe_function)? + )?; + exports.set_named_property( + "testTokioReadfile", + env.create_function("testTokioReadfile", test_tokio_readfile)? + )?; Ok(()) } @@ -112,3 +139,94 @@ fn get_external_count(ctx: CallContext) -> Result> { let native_object = ctx.env.get_value_external::(&attached_obj)?; ctx.env.create_int32(native_object.count) } + +#[derive(Clone, Copy)] +struct HandleNumber; + +impl ToJs for HandleNumber { + type Output = u8; + type JsValue = Number; + + fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value)> { + let argv: u64 = 1; + + let value = env.create_uint32(output as u32)?; + + Ok((argv, value)) + } +} + +#[js_function(1)] +fn test_threadsafe_function(ctx: CallContext) -> Result> { + let func: Value = ctx.get::(0)?; + + let to_js = HandleNumber; + let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?; + + thread::spawn(move || { + let output: u8 = 42; + // It's okay to call a threadsafe function multiple times. + tsfn.call(Ok(output), napi_tsfn_blocking).unwrap(); + tsfn.call(Ok(output), napi_tsfn_blocking).unwrap(); + tsfn.release(napi_tsfn_release).unwrap(); + }); + + Ok(Env::get_undefined(ctx.env)?) +} + +#[js_function(1)] +fn test_tsfn_error(ctx: CallContext) -> Result> { + let func = ctx.get::(0)?; + let to_js = HandleNumber; + let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?; + + thread::spawn(move || { + tsfn.call(Err(Error { + status: napi::sys::Status::Unknown, + reason: Some(String::from("invalid")), + }), napi_tsfn_blocking).unwrap(); + tsfn.release(napi_tsfn_release).unwrap(); + }); + + Ok(Env::get_undefined(ctx.env)?) +} + +#[derive(Copy, Clone)] +struct HandleBuffer; + +impl ToJs for HandleBuffer { + type Output = Vec; + type JsValue = Buffer; + + fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value)> { + let value = env.create_buffer_with_data(output.to_vec())?; + Ok((1u64, value)) + } +} + +async fn read_file_content(filepath: &Path) -> Result> { + tokio::fs::read(filepath).await.map_err(|_| Error { + status: Status::Unknown, + reason: Some(String::from("failed to read file")), + }) +} + +#[js_function(2)] +fn test_tokio_readfile(ctx: CallContext) -> Result> { + let js_filepath: Value = ctx.get::(0)?; + let js_func: Value = ctx.get::(1)?; + let path_str = String::from(js_filepath.as_str()?); + + let to_js = HandleBuffer; + let tsfn = ThreadsafeFunction::create(ctx.env, js_func, to_js, 0)?; + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async move { + let mut filepath = Path::new(path_str.deref()); + let ret = read_file_content(&mut filepath).await; + let _ = tsfn.call(ret, napi_tsfn_blocking); + tsfn.release(napi_tsfn_release).unwrap(); + }); + + Ok(Env::get_undefined(ctx.env)?) +}