Merge pull request #77 from oyyd/oyyd-impl-tsfn

feat(napi): impl threadsafe function api
This commit is contained in:
LongYinan 2020-06-28 23:00:03 +08:00 committed by GitHub
commit a16582629c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 442 additions and 3 deletions

View file

@ -19,4 +19,13 @@ bindgen = "0.54"
glob = "0.3" glob = "0.3"
napi-build = { version = "0.1", path = "../build" } napi-build = { version = "0.1", path = "../build" }
regex = "1.3" regex = "1.3"
semver = "0.10" semver = "0.10"
[features]
default = []
# See the N-API Version Matrix https://nodejs.org/api/n-api.html#n_api_n_api_version_matrix
napi2 = []
napi3 = []
napi4 = []
napi5 = []
napi6 = []

View file

@ -57,6 +57,22 @@ fn main() {
.expect("Unable to generate napi bindings") .expect("Unable to generate napi bindings")
.write_to_file(out_path.join("bindings.rs")) .write_to_file(out_path.join("bindings.rs"))
.expect("Unable to write napi bindings"); .expect("Unable to write napi bindings");
let napi_version = String::from_utf8(
Command::new("node")
.args(&[
"-e",
"console.log(process.versions.napi)",
])
.output()
.unwrap()
.stdout,
)
.unwrap();
for version in 2..napi_version.trim().parse::<u32>().unwrap() {
println!("cargo:rustc-cfg=napi{}", version);
}
} }
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]

View file

@ -16,6 +16,8 @@ mod async_work;
mod call_context; mod call_context;
pub mod sys; pub mod sys;
mod task; mod task;
#[cfg(napi4)]
pub mod threadsafe_function;
mod version; mod version;
pub use call_context::CallContext; pub use call_context::CallContext;

View file

@ -0,0 +1,233 @@
use crate::{check_status, ptr, sys, Env, Function, Result, Value};
use std::os::raw::{c_char, c_void};
use sys::napi_threadsafe_function_call_mode;
use sys::napi_threadsafe_function_release_mode;
pub trait ToJs: Copy + Clone {
type Output;
type JsValue;
fn resolve(
&self,
env: &mut Env,
output: Self::Output,
) -> Result<(u64, Value<Self::JsValue>)>;
}
/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
///
/// ## Example
/// An example of using `ThreadsafeFunction`:
///
/// ```
/// #[macro_use]
/// extern crate napi_rs_derive;
///
/// use std::thread;
/// use napi_rs::{
/// Number, Result, Value, Env, CallContext, Undefined, Function,
/// sys::{
/// napi_threadsafe_function_call_mode::{
/// napi_tsfn_blocking,
/// },
/// napi_threadsafe_function_release_mode::{
/// napi_tsfn_release,
/// }
/// }
/// };
/// use napi_rs::threadsafe_function::{
/// ToJs, ThreadsafeFunction,
/// };
///
/// // Define a struct for handling the data passed from `ThreadsafeFunction::call`
/// // and return the data to be used for the js callback.
/// #[derive(Clone, Copy)]
/// struct HandleNumber;
///
/// impl ToJs for HandleNumber {
/// type Output = u8;
/// type JsValue = Number;
///
/// fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value<Self::JsValue>)> {
/// let argv: u64 = 1;
/// let value = env.create_uint32(output as u32)?;
/// Ok((argv, value))
/// }
/// }
///
/// #[js_function(1)]
/// fn test_threadsafe_function(ctx: CallContext) -> Result<Value<Undefined>> {
/// // The callback function from js which will be called in `ThreadsafeFunction::call`.
/// let func: Value<Function> = ctx.get::<Function>(0)?;
///
/// let to_js = HandleNumber;
/// let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?;
///
/// thread::spawn(move || {
/// let output: u8 = 42;
/// // It's okay to call a threadsafe function multiple times.
/// tsfn.call(Ok(output), napi_tsfn_blocking).unwrap();
/// tsfn.call(Ok(output), napi_tsfn_blocking).unwrap();
/// // We should call `ThreadsafeFunction::release` manually when we don't
/// // need the instance anymore, or it will prevent Node.js from exiting
/// // automatically and possiblely cause memory leaks.
/// tsfn.release(napi_tsfn_release).unwrap();
/// });
///
/// Ok(Env::get_undefined(ctx.env)?)
/// }
/// ```
#[derive(Debug, Clone, Copy)]
pub struct ThreadsafeFunction<T: ToJs> {
raw_value: sys::napi_threadsafe_function,
to_js: T,
}
unsafe impl<T: ToJs> Send for ThreadsafeFunction<T> {}
unsafe impl<T: ToJs> Sync for ThreadsafeFunction<T> {}
impl<T: ToJs> ThreadsafeFunction<T> {
/// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function)
/// for more information.
pub fn create(env: &Env, func: Value<Function>, to_js: T, max_queue_size: u64) -> Result<Self> {
let mut async_resource_name = ptr::null_mut();
let s = "napi_rs_threadsafe_function";
let status = unsafe {
sys::napi_create_string_utf8(
env.0,
s.as_ptr() as *const c_char,
s.len() as u64,
&mut async_resource_name,
)
};
check_status(status)?;
let initial_thread_count: u64 = 1;
let mut result = ptr::null_mut();
let tsfn = ThreadsafeFunction {
to_js,
raw_value: result,
};
let ptr = Box::into_raw(Box::from(tsfn)) as *mut _ as *mut c_void;
let status = unsafe {
sys::napi_create_threadsafe_function(
env.0,
func.raw_value,
ptr::null_mut(),
async_resource_name,
max_queue_size,
initial_thread_count,
ptr,
Some(thread_finalize_cb::<T>),
ptr,
Some(call_js_cb::<T>),
&mut result,
)
};
check_status(status)?;
Ok(ThreadsafeFunction {
to_js,
raw_value: result,
})
}
/// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
/// for more information.
pub fn call(
&self,
value: Result<T::Output>,
mode: napi_threadsafe_function_call_mode,
) -> Result<()> {
check_status(unsafe {
sys::napi_call_threadsafe_function(
self.raw_value,
Box::into_raw(Box::from(value)) as *mut _ as *mut c_void,
mode,
)
})
}
/// See [napi_acquire_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_acquire_threadsafe_function)
/// for more information.
pub fn acquire(&self) -> Result<()> {
check_status(unsafe { sys::napi_acquire_threadsafe_function(self.raw_value) })
}
/// See [napi_release_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_release_threadsafe_function)
/// for more information.
pub fn release(&self, mode: napi_threadsafe_function_release_mode) -> Result<()> {
check_status(unsafe { sys::napi_release_threadsafe_function(self.raw_value, mode) })
}
/// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function)
/// for more information.
///
/// "ref" is a keyword so that we use "refer" here.
pub fn refer(&self, env: &Env) -> Result<()> {
check_status(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_value) })
}
/// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
/// for more information.
pub fn unref(&self, env: &Env) -> Result<()> {
check_status(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_value) })
}
}
unsafe extern "C" fn thread_finalize_cb<T: ToJs>(
_raw_env: sys::napi_env,
finalize_data: *mut c_void,
_finalize_hint: *mut c_void,
) {
// cleanup
Box::from_raw(finalize_data as *mut ThreadsafeFunction<T>);
}
unsafe extern "C" fn call_js_cb<T: ToJs>(
raw_env: sys::napi_env,
js_callback: sys::napi_value,
context: *mut c_void,
data: *mut c_void,
) {
let mut env = Env::from_raw(raw_env);
let mut recv = ptr::null_mut();
sys::napi_get_undefined(raw_env, &mut recv);
let tsfn = Box::leak(Box::from_raw(context as *mut ThreadsafeFunction<T>));
let val = Box::from_raw(data as *mut Result<T::Output>);
let ret = val.and_then(|v| tsfn.to_js.resolve(&mut env, v));
let status;
// Follow the convention of Node.js async callback.
if ret.is_ok() {
let (argv, js_value) = ret.unwrap();
let js_null = env.get_null().unwrap();
let values = [js_null.raw_value, js_value.raw_value];
status = sys::napi_call_function(
raw_env,
recv,
js_callback,
argv + 1,
values.as_ptr(),
ptr::null_mut(),
);
} else {
let mut err = env.create_error(ret.err().unwrap()).unwrap();
status = sys::napi_call_function(
raw_env,
recv,
js_callback,
1,
&mut err.raw_value,
ptr::null_mut(),
);
}
debug_assert!(status == sys::napi_status::napi_ok, "CallJsCB failed");
}

View file

@ -10,6 +10,7 @@ crate-type = ["cdylib"]
[dependencies] [dependencies]
napi-rs = { path = "../napi" } napi-rs = { path = "../napi" }
napi-rs-derive = { path = "../napi-derive" } napi-rs-derive = { path = "../napi-derive" }
tokio = { version = "0.2", features = ["default", "fs"]}
[build-dependencies] [build-dependencies]
napi-build = { path = "../build" } napi-build = { path = "../build" }

View file

@ -0,0 +1 @@
Hello world!

View file

@ -0,0 +1,22 @@
const test = require('ava')
const bindings = require('../index.node')
test('should get js function called from a thread', async (t) => {
let called = 0
return new Promise((resolve, reject) => {
bindings.testThreadsafeFunction((err, ret) => {
called += 1
try {
t.is(err, null)
t.is(ret, 42)
} catch (err) {
reject(err)
}
if (called === 2) {
resolve()
}
})
})
})

View file

@ -0,0 +1,21 @@
const test = require('ava')
const fs = require('fs')
const path = require('path')
const bindings = require('../index.node')
const filepath = path.resolve(__dirname, './example.txt')
test('should read a file and return its a buffer', async (t) => {
return new Promise((resolve, reject) => {
bindings.testTokioReadfile(filepath, (err, value) => {
try {
t.is(err, null)
t.is(Buffer.isBuffer(value), true)
t.is(value.toString(), fs.readFileSync(filepath, 'utf8'))
resolve()
} catch (err) {
reject(err)
}
})
})
})

View file

@ -0,0 +1,16 @@
const test = require('ava')
const bindings = require('../index.node')
test('should call callback with the first arguments as an Error', async (t) => {
return new Promise((resolve, reject) => {
bindings.testTsfnError((err) => {
try {
t.is(err instanceof Error, true)
t.is(err.message, 'invalid')
resolve()
} catch (err) {
reject(err)
}
})
})
})

View file

@ -5,8 +5,25 @@ extern crate napi_rs_derive;
use napi::{ use napi::{
Any, Boolean, CallContext, Env, Error, JsString, Number, Object, Result, Status, Task, Value, Any, Boolean, CallContext, Env, Error, JsString, Number, Object, Result, Status, Task, Value,
Undefined, Function, Buffer,
threadsafe_function::{
ToJs,
ThreadsafeFunction,
}
};
use napi::sys::{
napi_threadsafe_function_call_mode:: {
napi_tsfn_blocking,
},
napi_threadsafe_function_release_mode:: {
napi_tsfn_release,
}
}; };
use std::convert::TryInto; use std::convert::TryInto;
use std::thread;
use std::path::Path;
use std::ops::Deref;
use tokio;
register_module!(test_module, init); register_module!(test_module, init);
@ -24,16 +41,26 @@ fn init(env: &Env, exports: &mut Value<Object>) -> Result<()> {
"testObjectIsDate", "testObjectIsDate",
env.create_function("testObjectIsDate", test_object_is_date)?, env.create_function("testObjectIsDate", test_object_is_date)?,
)?; )?;
exports.set_named_property( exports.set_named_property(
"createExternal", "createExternal",
env.create_function("createExternal", create_external)?, env.create_function("createExternal", create_external)?,
)?; )?;
exports.set_named_property( exports.set_named_property(
"getExternalCount", "getExternalCount",
env.create_function("getExternalCount", get_external_count)?, env.create_function("getExternalCount", get_external_count)?,
)?; )?;
exports.set_named_property(
"testTsfnError",
env.create_function("testTsfnError", test_tsfn_error)?,
)?;
exports.set_named_property(
"testThreadsafeFunction",
env.create_function("testThreadsafeFunction", test_threadsafe_function)?
)?;
exports.set_named_property(
"testTokioReadfile",
env.create_function("testTokioReadfile", test_tokio_readfile)?
)?;
Ok(()) Ok(())
} }
@ -112,3 +139,94 @@ fn get_external_count(ctx: CallContext) -> Result<Value<Number>> {
let native_object = ctx.env.get_value_external::<NativeObject>(&attached_obj)?; let native_object = ctx.env.get_value_external::<NativeObject>(&attached_obj)?;
ctx.env.create_int32(native_object.count) ctx.env.create_int32(native_object.count)
} }
#[derive(Clone, Copy)]
struct HandleNumber;
impl ToJs for HandleNumber {
type Output = u8;
type JsValue = Number;
fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value<Self::JsValue>)> {
let argv: u64 = 1;
let value = env.create_uint32(output as u32)?;
Ok((argv, value))
}
}
#[js_function(1)]
fn test_threadsafe_function(ctx: CallContext) -> Result<Value<Undefined>> {
let func: Value<Function> = ctx.get::<Function>(0)?;
let to_js = HandleNumber;
let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?;
thread::spawn(move || {
let output: u8 = 42;
// It's okay to call a threadsafe function multiple times.
tsfn.call(Ok(output), napi_tsfn_blocking).unwrap();
tsfn.call(Ok(output), napi_tsfn_blocking).unwrap();
tsfn.release(napi_tsfn_release).unwrap();
});
Ok(Env::get_undefined(ctx.env)?)
}
#[js_function(1)]
fn test_tsfn_error(ctx: CallContext) -> Result<Value<Undefined>> {
let func = ctx.get::<Function>(0)?;
let to_js = HandleNumber;
let tsfn = ThreadsafeFunction::create(ctx.env, func, to_js, 0)?;
thread::spawn(move || {
tsfn.call(Err(Error {
status: napi::sys::Status::Unknown,
reason: Some(String::from("invalid")),
}), napi_tsfn_blocking).unwrap();
tsfn.release(napi_tsfn_release).unwrap();
});
Ok(Env::get_undefined(ctx.env)?)
}
#[derive(Copy, Clone)]
struct HandleBuffer;
impl ToJs for HandleBuffer {
type Output = Vec<u8>;
type JsValue = Buffer;
fn resolve(&self, env: &mut Env, output: Self::Output) -> Result<(u64, Value<Self::JsValue>)> {
let value = env.create_buffer_with_data(output.to_vec())?;
Ok((1u64, value))
}
}
async fn read_file_content(filepath: &Path) -> Result<Vec<u8>> {
tokio::fs::read(filepath).await.map_err(|_| Error {
status: Status::Unknown,
reason: Some(String::from("failed to read file")),
})
}
#[js_function(2)]
fn test_tokio_readfile(ctx: CallContext) -> Result<Value<Undefined>> {
let js_filepath: Value<JsString> = ctx.get::<JsString>(0)?;
let js_func: Value<Function> = ctx.get::<Function>(1)?;
let path_str = String::from(js_filepath.as_str()?);
let to_js = HandleBuffer;
let tsfn = ThreadsafeFunction::create(ctx.env, js_func, to_js, 0)?;
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut filepath = Path::new(path_str.deref());
let ret = read_file_content(&mut filepath).await;
let _ = tsfn.call(ret, napi_tsfn_blocking);
tsfn.release(napi_tsfn_release).unwrap();
});
Ok(Env::get_undefined(ctx.env)?)
}