Merge pull request #848 from napi-rs/tsfn

feat(napi): create ThreadsafeFunction from JsFunction
This commit is contained in:
LongYinan 2021-11-12 17:36:12 +08:00 committed by GitHub
commit 1013052de3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 107 additions and 13 deletions

View file

@ -245,5 +245,5 @@ yarn test
| (NOT YET) | global | 1 | v8.0.0 |
| JsSymbol | Symbol | 1 | v8.0.0 |
| (NOT YET) | ArrayBuffer/TypedArray | 1 | v8.0.0 |
| (NOT YET) | threadsafe function | 4 | v10.6.0 | napi4 |
| JsFunction | threadsafe function | 4 | v10.6.0 | napi4 |
| BigInt | BigInt | 6 | v10.7.0 | napi6 |

View file

@ -74,7 +74,7 @@ static KNOWN_TYPES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| {
("symbol", "symbol"),
("external", "object"),
("AbortSignal", "AbortSignal"),
("Function", "(...args: any[]) => any"),
("JsFunction", "(...args: any[]) => any"),
]);
map

View file

@ -8,6 +8,7 @@ mod bigint;
mod boolean;
mod buffer;
mod either;
mod function;
mod map;
mod nil;
mod number;
@ -22,6 +23,8 @@ pub use array::*;
pub use bigint::*;
pub use buffer::*;
pub use either::*;
#[cfg(feature = "napi4")]
pub use function::*;
pub use nil::*;
pub use object::*;
pub use string::*;

View file

@ -0,0 +1 @@
pub use crate::JsFunction;

View file

@ -1017,7 +1017,7 @@ impl Env {
max_queue_size: usize,
callback: R,
) -> Result<ThreadsafeFunction<T>> {
ThreadsafeFunction::create(self.0, func, max_queue_size, callback)
ThreadsafeFunction::create(self.0, func.0.value, max_queue_size, callback)
}
#[cfg(all(feature = "tokio_rt", feature = "napi4"))]

View file

@ -2,6 +2,8 @@ use std::ptr;
use super::Value;
use crate::bindgen_runtime::TypeName;
#[cfg(feature = "napi4")]
use crate::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction};
use crate::{check_status, ValueType};
use crate::{sys, Env, Error, JsObject, JsUnknown, NapiRaw, NapiValue, Result, Status};
@ -95,8 +97,7 @@ impl JsFunction {
/// https://nodejs.org/api/n-api.html#n_api_napi_new_instance
///
/// This method is used to instantiate a new `JavaScript` value using a given `JsFunction` that represents the constructor for the object.
#[allow(clippy::new_ret_no_self)]
pub fn new<V>(&self, args: &[V]) -> Result<JsObject>
pub fn new_instance<V>(&self, args: &[V]) -> Result<JsObject>
where
V: NapiRaw,
{
@ -117,4 +118,18 @@ impl JsFunction {
})?;
Ok(unsafe { JsObject::from_raw_unchecked(self.0.env, js_instance) })
}
#[cfg(feature = "napi4")]
pub fn create_threadsafe_function<T, V, F>(
&self,
max_queue_size: usize,
callback: F,
) -> Result<ThreadsafeFunction<T>>
where
T: 'static,
V: NapiRaw,
F: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
{
ThreadsafeFunction::create(self.0.env, self.0.value, max_queue_size, callback)
}
}

View file

@ -8,7 +8,7 @@ use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use crate::{check_status, sys, Env, Error, JsError, JsFunction, NapiRaw, Result, Status};
use crate::{check_status, sys, Env, Error, JsError, NapiRaw, Result, Status};
use sys::napi_threadsafe_function_call_mode;
@ -186,7 +186,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
>(
env: sys::napi_env,
func: &JsFunction,
func: sys::napi_value,
max_queue_size: usize,
callback: R,
) -> Result<Self> {
@ -204,7 +204,7 @@ impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
check_status!(unsafe {
sys::napi_create_threadsafe_function(
env,
func.0.value,
func,
ptr::null_mut(),
async_resource_name,
max_queue_size,

View file

@ -65,7 +65,7 @@ fn new_test_class(ctx: CallContext) -> Result<JsObject> {
.env
.define_class("TestClass", test_class_constructor, properties.as_slice())?;
test_class.new(&[ctx.env.create_int32(42)?])
test_class.new_instance(&[ctx.env.create_int32(42)?])
}
pub fn register_js(exports: &mut JsObject) -> Result<()> {

View file

@ -49,7 +49,7 @@ pub fn throw_syntax_error(ctx: CallContext) -> Result<JsUndefined> {
.env
.get_global()?
.get_named_property::<JsFunction>("SyntaxError")?;
ctx.env.throw(syntax_error.new(&[message])?)?;
ctx.env.throw(syntax_error.new_instance(&[message])?)?;
ctx.env.get_undefined()
}

View file

@ -48,6 +48,8 @@ Generated by [AVA](https://avajs.dev).
export function concatLatin1(s: string): string␊
export function withoutAbortController(a: number, b: number): Promise<number>
export function withAbortController(a: number, b: number, signal: AbortSignal): Promise<number>
export function callThreadsafeFunction(callback: (...args: any[]) => any): void␊
export function threadsafeFunctionThrowError(cb: (...args: any[]) => any): void␊
export function getBuffer(): Buffer␊
export class Animal {␊
readonly kind: Kind␊

View file

@ -38,6 +38,8 @@ import {
bigintAdd,
createBigInt,
createBigIntI64,
callThreadsafeFunction,
threadsafeFunctionThrowError,
} from '../'
test('number', (t) => {
@ -201,9 +203,10 @@ test('async task without abort controller', async (t) => {
t.is(await withoutAbortController(1, 2), 3)
})
const MaybeTest = typeof AbortController !== 'undefined' ? test : test.skip
const AbortSignalTest =
typeof AbortController !== 'undefined' ? test : test.skip
MaybeTest('async task with abort controller', async (t) => {
AbortSignalTest('async task with abort controller', async (t) => {
const ctrl = new AbortController()
const promise = withAbortController(1, 2, ctrl.signal)
try {
@ -215,7 +218,7 @@ MaybeTest('async task with abort controller', async (t) => {
}
})
MaybeTest('abort resolved task', async (t) => {
AbortSignalTest('abort resolved task', async (t) => {
const ctrl = new AbortController()
await withAbortController(1, 2, ctrl.signal).then(() => ctrl.abort())
t.pass('should not throw')
@ -234,3 +237,33 @@ BigIntTest('create BigInt', (t) => {
BigIntTest('create BigInt i64', (t) => {
t.is(createBigIntI64(), BigInt(100))
})
const ThreadsafeFunctionTest =
Number(process.versions.napi) >= 4 ? test : test.skip
ThreadsafeFunctionTest('call thread safe function', (t) => {
let i = 0
let value = 0
return new Promise((resolve) => {
callThreadsafeFunction((err, v) => {
t.is(err, null)
i++
value += v
if (i === 100) {
resolve()
t.is(
value,
Array.from({ length: 100 }, (_, i) => i + 1).reduce((a, b) => a + b),
)
}
})
})
})
ThreadsafeFunctionTest('throw error from thread safe function', async (t) => {
const throwPromise = new Promise((_, reject) => {
threadsafeFunctionThrowError(reject)
})
const err = await t.throwsAsync(throwPromise)
t.is(err.message, 'ThrowFromNative')
})

View file

@ -38,6 +38,8 @@ export function concatUtf16(s: string): string
export function concatLatin1(s: string): string
export function withoutAbortController(a: number, b: number): Promise<number>
export function withAbortController(a: number, b: number, signal: AbortSignal): Promise<number>
export function callThreadsafeFunction(callback: (...args: any[]) => any): void
export function threadsafeFunctionThrowError(cb: (...args: any[]) => any): void
export function getBuffer(): Buffer
export class Animal {
readonly kind: Kind

View file

@ -18,4 +18,5 @@ mod object;
mod serde;
mod string;
mod task;
mod threadsafe_function;
mod typed_array;

View file

@ -0,0 +1,37 @@
use std::thread;
use napi::{
bindgen_prelude::*,
threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunctionCallMode},
};
#[napi]
pub fn call_threadsafe_function(callback: JsFunction) -> Result<()> {
let tsfn = callback.create_threadsafe_function(0, |ctx: ThreadSafeCallContext<u32>| {
ctx.env.create_uint32(ctx.value + 1).map(|v| vec![v])
})?;
for n in 0..100 {
let tsfn = tsfn.clone();
thread::spawn(move || {
tsfn.call(Ok(n), ThreadsafeFunctionCallMode::Blocking);
});
}
Ok(())
}
#[napi]
pub fn threadsafe_function_throw_error(cb: JsFunction) -> Result<()> {
let tsfn = cb.create_threadsafe_function(0, |ctx: ThreadSafeCallContext<bool>| {
ctx.env.get_boolean(ctx.value).map(|v| vec![v])
})?;
thread::spawn(move || {
tsfn.call(
Err(Error::new(
Status::GenericFailure,
"ThrowFromNative".to_owned(),
)),
ThreadsafeFunctionCallMode::Blocking,
);
});
Ok(())
}