fix(napi): make buffer Send & Sync safe

This commit is contained in:
LongYinan 2022-04-06 21:08:00 +08:00
parent 6eec0f93c1
commit 6a252c70d2
7 changed files with 79 additions and 13 deletions

View file

@ -1,13 +1,17 @@
use std::mem;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::{mem, ptr}; use std::ptr;
use std::slice;
use crate::{bindgen_prelude::*, check_status, sys, Result, ValueType}; use crate::{bindgen_prelude::*, check_status, sys, Result, ValueType};
/// Zero copy u8 vector shared between rust and napi. /// Zero copy u8 vector shared between rust and napi.
/// Auto reference the raw JavaScript value, and release it when dropped. /// Auto reference the raw JavaScript value, and release it when dropped.
/// So it is safe to use it in `async fn`, the `&[u8]` under the hood will not be dropped until the `drop` called. /// So it is safe to use it in `async fn`, the `&[u8]` under the hood will not be dropped until the `drop` called.
/// Clone will create a new `Reference` to the same underlying `JavaScript Buffer`.
pub struct Buffer { pub struct Buffer {
inner: mem::ManuallyDrop<Vec<u8>>, inner: &'static mut [u8],
capacity: usize,
raw: Option<(sys::napi_ref, sys::napi_env)>, raw: Option<(sys::napi_ref, sys::napi_env)>,
} }
@ -16,7 +20,7 @@ impl Drop for Buffer {
if let Some((ref_, env)) = self.raw { if let Some((ref_, env)) = self.raw {
check_status_or_throw!( check_status_or_throw!(
env, env,
unsafe { sys::napi_delete_reference(env, ref_) }, unsafe { sys::napi_reference_unref(env, ref_, &mut 0) },
"Failed to delete Buffer reference in drop" "Failed to delete Buffer reference in drop"
); );
} }
@ -25,10 +29,36 @@ impl Drop for Buffer {
unsafe impl Send for Buffer {} unsafe impl Send for Buffer {}
impl Buffer {
pub fn clone(&mut self, env: &Env) -> Result<Self> {
if let Some((ref_, _)) = self.raw {
check_status!(
unsafe { sys::napi_reference_ref(env.0, ref_, &mut 0) },
"Failed to ref Buffer reference in Buffer::clone"
)?;
Ok(Self {
inner: unsafe { slice::from_raw_parts_mut(self.inner.as_mut_ptr(), self.inner.len()) },
capacity: self.capacity,
raw: Some((ref_, env.0)),
})
} else {
Err(Error::new(
Status::InvalidArg,
"clone only available when the buffer is created from a JavaScript Buffer".to_owned(),
))
}
}
}
impl From<Vec<u8>> for Buffer { impl From<Vec<u8>> for Buffer {
fn from(data: Vec<u8>) -> Self { fn from(mut data: Vec<u8>) -> Self {
let inner_ptr = data.as_mut_ptr();
let len = data.len();
let capacity = data.capacity();
mem::forget(data);
Buffer { Buffer {
inner: mem::ManuallyDrop::new(data), inner: unsafe { slice::from_raw_parts_mut(inner_ptr, len) },
capacity,
raw: None, raw: None,
} }
} }
@ -48,13 +78,13 @@ impl From<&[u8]> for Buffer {
impl AsRef<[u8]> for Buffer { impl AsRef<[u8]> for Buffer {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
self.inner.as_slice() self.inner
} }
} }
impl AsMut<[u8]> for Buffer { impl AsMut<[u8]> for Buffer {
fn as_mut(&mut self) -> &mut [u8] { fn as_mut(&mut self) -> &mut [u8] {
self.inner.as_mut_slice() self.inner
} }
} }
@ -62,13 +92,13 @@ impl Deref for Buffer {
type Target = [u8]; type Target = [u8];
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
self.inner.as_slice() self.inner
} }
} }
impl DerefMut for Buffer { impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut_slice() self.inner
} }
} }
@ -97,7 +127,8 @@ impl FromNapiValue for Buffer {
)?; )?;
Ok(Self { Ok(Self {
inner: mem::ManuallyDrop::new(unsafe { Vec::from_raw_parts(buf as *mut _, len, len) }), inner: unsafe { slice::from_raw_parts_mut(buf as *mut _, len) },
capacity: len,
raw: Some((ref_, env)), raw: Some((ref_, env)),
}) })
} }
@ -128,7 +159,7 @@ impl ToNapiValue for Buffer {
len, len,
val.inner.as_mut_ptr() as *mut _, val.inner.as_mut_ptr() as *mut _,
Some(drop_buffer), Some(drop_buffer),
Box::into_raw(Box::new((len, val.inner.capacity()))) as *mut _, Box::into_raw(Box::new((len, val.capacity))) as *mut _,
&mut ret, &mut ret,
) )
}, },

View file

@ -172,6 +172,7 @@ Generated by [AVA](https://avajs.dev).
export function mutateTypedArray(input: Float32Array): void␊ export function mutateTypedArray(input: Float32Array): void␊
export function derefUint8Array(a: Uint8Array, b: Uint8ClampedArray): number␊ export function derefUint8Array(a: Uint8Array, b: Uint8ClampedArray): number␊
export function bufferPassThrough(buf: Buffer): Promise<Buffer> export function bufferPassThrough(buf: Buffer): Promise<Buffer>
export function asyncReduceBuffer(buf: Buffer): Promise<number>
/**␊ /**␊
* \`constructor\` option for \`struct\` requires all fields to be public,␊ * \`constructor\` option for \`struct\` requires all fields to be public,␊
* otherwise tag impl fn as constructor␊ * otherwise tag impl fn as constructor␊

View file

@ -88,6 +88,7 @@ import {
chronoDateAdd1Minute, chronoDateAdd1Minute,
bufferPassThrough, bufferPassThrough,
JsRepo, JsRepo,
asyncReduceBuffer,
} from '../' } from '../'
test('export const', (t) => { test('export const', (t) => {
@ -395,6 +396,15 @@ test('buffer passthrough', async (t) => {
t.deepEqual(ret, fixture) t.deepEqual(ret, fixture)
}) })
test('async reduce buffer', async (t) => {
const input = [1, 2, 3, 4, 5, 6]
const fixture = Buffer.from(input)
t.is(
await asyncReduceBuffer(fixture),
input.reduce((acc, cur) => acc + cur),
)
})
test('either', (t) => { test('either', (t) => {
t.is(eitherStringOrNumber(2), 2) t.is(eitherStringOrNumber(2), 2)
t.is(eitherStringOrNumber('hello'), 'hello'.length) t.is(eitherStringOrNumber('hello'), 'hello'.length)

View file

@ -162,6 +162,7 @@ export function createExternalTypedArray(): Uint32Array
export function mutateTypedArray(input: Float32Array): void export function mutateTypedArray(input: Float32Array): void
export function derefUint8Array(a: Uint8Array, b: Uint8ClampedArray): number export function derefUint8Array(a: Uint8Array, b: Uint8ClampedArray): number
export function bufferPassThrough(buf: Buffer): Promise<Buffer> export function bufferPassThrough(buf: Buffer): Promise<Buffer>
export function asyncReduceBuffer(buf: Buffer): Promise<number>
/** /**
* `constructor` option for `struct` requires all fields to be public, * `constructor` option for `struct` requires all fields to be public,
* otherwise tag impl fn as constructor * otherwise tag impl fn as constructor

View file

@ -38,3 +38,28 @@ fn deref_uint8_array(a: Uint8Array, b: Uint8ClampedArray) -> u32 {
async fn buffer_pass_through(buf: Buffer) -> Result<Buffer> { async fn buffer_pass_through(buf: Buffer) -> Result<Buffer> {
Ok(buf) Ok(buf)
} }
struct AsyncBuffer {
buf: Buffer,
}
#[napi]
impl Task for AsyncBuffer {
type Output = u32;
type JsValue = u32;
fn compute(&mut self) -> Result<Self::Output> {
Ok(self.buf.iter().fold(0u32, |a, b| a + *b as u32))
}
fn resolve(&mut self, _env: Env, output: Self::Output) -> Result<Self::JsValue> {
Ok(output)
}
}
#[napi]
fn async_reduce_buffer(mut buf: Buffer, env: Env) -> Result<AsyncTask<AsyncBuffer>> {
Ok(AsyncTask::new(AsyncBuffer {
buf: buf.clone(&env)?,
}))
}

View file

@ -2,5 +2,3 @@ import { createSuite } from './test-util.mjs'
await createSuite('tokio-future') await createSuite('tokio-future')
await createSuite('serde') await createSuite('serde')
process.exit(0)