Signed-off-by: Matheus Sampaio Queiroga <srherobrine20@gmail.com>
330 lines
13 KiB
Go
330 lines
13 KiB
Go
package napi
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"runtime"
|
|
"runtime/cgo"
|
|
"sync"
|
|
"unsafe"
|
|
|
|
"sirherobrine23.com.br/Sirherobrine23/napi-go/internal/napi"
|
|
)
|
|
|
|
/*
|
|
#include <node/node_api.h>
|
|
|
|
// Forward declaration of the C callback functions
|
|
extern void executeThreadsafeFunctionCallJSCallback(napi_env env, napi_value js_callback, void* context, void* data);
|
|
extern void finalizeThreadsafeFunctionCallback(napi_env env, void* finalize_data, void* finalize_hint);
|
|
*/
|
|
import "C"
|
|
|
|
// ThreadsafeFunction represents a N-API thread-safe function.
|
|
type ThreadsafeFunction struct {
|
|
value
|
|
tsfn napi.ThreadsafeFunction
|
|
callJSCallback ThreadsafeFunctionCallJSCallback // Callback to execute on the main thread
|
|
goCallbackHandle cgo.Handle // Handle to the Go callback data
|
|
}
|
|
|
|
// ThreadsafeFunctionReleaseMode is an alias for napi.ThreadsafeFunctionReleaseMode,
|
|
// representing the release mode for a N-API threadsafe function.
|
|
// It determines how the threadsafe function is released, such as whether it is
|
|
// released automatically or manually by the user.
|
|
type ThreadsafeFunctionReleaseMode = napi.ThreadsafeFunctionReleaseMode
|
|
|
|
// ThreadsafeFunctionCallMode is an alias for napi.ThreadsafeFunctionCallMode,
|
|
// representing the mode in which a threadsafe function call is executed.
|
|
// It is used to specify how calls to a threadsafe function are handled,
|
|
// such as whether they are blocking or non-blocking.
|
|
type ThreadsafeFunctionCallMode = napi.ThreadsafeFunctionCallMode
|
|
|
|
// ThreadsafeFunctionFinalizeCallback defines a function type that is called when a threadsafe function is being finalized.
|
|
// It receives the N-API environment (env) and a user-provided context value (context) as arguments.
|
|
// This callback can be used to perform cleanup or resource release operations associated with the threadsafe function.
|
|
type ThreadsafeFunctionFinalizeCallback func(env EnvType, context any)
|
|
|
|
// ThreadsafeFunctionCallJSCallback is the signature for the callback function that
|
|
// will be invoked on the main Node.js thread when data is sent from another thread.
|
|
// The `jsCallback` is the original JavaScript function passed to CreateThreadsafeFunction.
|
|
// The `data` is the value passed to the Call method from the background thread.
|
|
type ThreadsafeFunctionCallJSCallback func(env EnvType, jsCallback *Function, data any)
|
|
|
|
// Release indicates that the threadsafe function should be released after all queued calls have been completed.
|
|
// This mode is used to signal that no further calls will be made and resources can be cleaned up.
|
|
const (
|
|
Release ThreadsafeFunctionReleaseMode = napi.Release
|
|
Abort ThreadsafeFunctionReleaseMode = napi.Abort
|
|
NonBlocking ThreadsafeFunctionCallMode = napi.NonBlocking
|
|
Blocking ThreadsafeFunctionCallMode = napi.Blocking
|
|
)
|
|
|
|
// Global map to store callbacks associated with thread-safe functions
|
|
var (
|
|
tsfnCallbacks = make(map[napi.ThreadsafeFunction]*ThreadsafeFunction)
|
|
tsfnCallbacksMutex sync.RWMutex
|
|
)
|
|
|
|
type threadsafeCallbackData struct {
|
|
tsfn *ThreadsafeFunction
|
|
goData any // Data received from the background thread
|
|
}
|
|
|
|
//export executeThreadsafeFunctionCallJSCallback
|
|
func executeThreadsafeFunctionCallJSCallback(cEnv C.napi_env, cJsCallback C.napi_value, _ unsafe.Pointer, data unsafe.Pointer) {
|
|
// This function runs on the main Node.js thread
|
|
env := N_APIEnv(napi.Env(cEnv))
|
|
|
|
// Retrieve the ThreadsafeFunction instance and the data from Go handles
|
|
callbackDataHandle := cgo.Handle(data)
|
|
callbackData := callbackDataHandle.Value().(*threadsafeCallbackData)
|
|
tsfn := callbackData.tsfn
|
|
goData := callbackData.goData
|
|
callbackDataHandle.Delete() // Clean up the handle for the data
|
|
|
|
// It's crucial to handle potential panics in the callback
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
errStr := fmt.Sprintf("panic recovered in threadsafe function callback: %v", r)
|
|
napi.ThrowError(env.NapiValue(), "", errStr) // Optionally throw an error back to the main loop
|
|
}
|
|
}()
|
|
|
|
if tsfn == nil || tsfn.callJSCallback == nil {
|
|
napi.ThrowError(env.NapiValue(), "", "Error: ThreadsafeFunction or its callback is nil in executeThreadsafeFunctionCallJSCallback")
|
|
return
|
|
}
|
|
|
|
// Wrap the JS callback N-API value
|
|
jsCallbackFunc := ToFunction(N_APIValue(env, napi.Value(cJsCallback)))
|
|
|
|
// Execute the user-provided Go callback
|
|
tsfn.callJSCallback(env, jsCallbackFunc, goData)
|
|
}
|
|
|
|
//export finalizeThreadsafeFunctionCallback
|
|
func finalizeThreadsafeFunctionCallback(cEnv C.napi_env, finalizeData unsafe.Pointer, _ unsafe.Pointer) {
|
|
// This function is called when the threadsafe function is being destroyed
|
|
// Retrieve the Go handle for the callback data
|
|
handle := cgo.Handle(finalizeData)
|
|
callbackData := handle.Value().(*struct {
|
|
Context any
|
|
FinalizeCallback ThreadsafeFunctionFinalizeCallback
|
|
TsfnWrapper *ThreadsafeFunction
|
|
})
|
|
|
|
// Call the user-provided finalize callback if present
|
|
if callbackData.FinalizeCallback != nil {
|
|
env := N_APIEnv(napi.Env(cEnv))
|
|
// It's crucial to handle potential panics in the callback
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
errStr := fmt.Sprintf("panic recovered in threadsafe function finalize callback: %v", r)
|
|
napi.ThrowError(env.NapiValue(), "", errStr)
|
|
}
|
|
}()
|
|
|
|
callbackData.FinalizeCallback(env, callbackData.Context)
|
|
}
|
|
|
|
// Clean up the Go handle
|
|
handle.Delete()
|
|
}
|
|
|
|
// CreateThreadsafeFunction creates a new N-API thread-safe function.
|
|
//
|
|
// Parameters:
|
|
// - env: The N-API environment.
|
|
// - jsFunc: The JavaScript function to be called from other threads (optional, can be nil if callJSCallback handles everything).
|
|
// - resourceName: A string identifying the resource associated with the async work.
|
|
// - maxQueueSize: The maximum size of the queue. 0 for no limit.
|
|
// - initialThreadCount: The initial number of threads that will use this function (used for reference counting). Must be >= 1.
|
|
// - callJSCallback: The Go function to be executed on the main thread when data arrives.
|
|
// - context: Optional Go data accessible within callJSCallback via GetContext.
|
|
// - finalizeCallback: Optional Go function called when the thread-safe function is being destroyed.
|
|
func CreateThreadsafeFunction(env EnvType, jsFunc Callback, finalizeCallback ThreadsafeFunctionFinalizeCallback, callJSCallback ThreadsafeFunctionCallJSCallback, resourceName string, maxQueueSize, initialThreadCount int, context any) (*ThreadsafeFunction, error) {
|
|
if initialThreadCount < 1 {
|
|
return nil, fmt.Errorf("initialThreadCount must be at least 1")
|
|
} else if callJSCallback == nil {
|
|
return nil, fmt.Errorf("callJSCallback cannot be nil")
|
|
}
|
|
|
|
resourceNameVal, err := CreateString(env, resourceName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create resource name string: %w", err)
|
|
}
|
|
|
|
var jsFuncVal napi.Value
|
|
if jsFunc != nil {
|
|
jsFn, err := CreateFunction(env, runtime.FuncForPC(reflect.ValueOf(jsFunc).Pointer()).Name(), jsFunc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create JavaScript function: %w", err)
|
|
}
|
|
jsFuncVal = jsFn.NapiValue()
|
|
}
|
|
|
|
tsfnWrapper := &ThreadsafeFunction{
|
|
callJSCallback: callJSCallback,
|
|
}
|
|
|
|
// Store context and finalize callback if provided
|
|
callbackData := &struct {
|
|
Context any
|
|
FinalizeCallback ThreadsafeFunctionFinalizeCallback
|
|
TsfnWrapper *ThreadsafeFunction // Reference back to the wrapper
|
|
}{
|
|
Context: context,
|
|
FinalizeCallback: finalizeCallback,
|
|
TsfnWrapper: tsfnWrapper,
|
|
}
|
|
tsfnWrapper.goCallbackHandle = cgo.NewHandle(callbackData)
|
|
|
|
var finalizeCbPtr C.napi_finalize
|
|
if finalizeCallback != nil {
|
|
finalizeCbPtr = C.napi_finalize(C.finalizeThreadsafeFunctionCallback)
|
|
}
|
|
|
|
var cTsfn napi.ThreadsafeFunction
|
|
status := napi.Status(C.napi_create_threadsafe_function(
|
|
C.napi_env(env.NapiValue()),
|
|
C.napi_value(jsFuncVal),
|
|
nil, // async_resource (optional)
|
|
C.napi_value(resourceNameVal.NapiValue()),
|
|
C.size_t(maxQueueSize),
|
|
C.size_t(initialThreadCount),
|
|
unsafe.Pointer(tsfnWrapper.goCallbackHandle), // thread_finalize_data
|
|
finalizeCbPtr, // thread_finalize_cb (optional)
|
|
unsafe.Pointer(tsfnWrapper.goCallbackHandle), // context
|
|
C.napi_threadsafe_function_call_js(C.executeThreadsafeFunctionCallJSCallback), // call_js_cb
|
|
(*C.napi_threadsafe_function)(unsafe.Pointer(&cTsfn)),
|
|
))
|
|
|
|
if err := status.ToError(); err != nil {
|
|
tsfnWrapper.goCallbackHandle.Delete() // Clean up handle on failure
|
|
return nil, fmt.Errorf("failed to create threadsafe function: %w", err)
|
|
}
|
|
|
|
tsfnWrapper.tsfn = cTsfn
|
|
tsfnWrapper.value = N_APIValue(env, jsFuncVal) // Use jsFunc as the underlying value if provided
|
|
|
|
// Store the mapping from N-API handle to Go wrapper
|
|
tsfnCallbacksMutex.Lock()
|
|
tsfnCallbacks[cTsfn] = tsfnWrapper
|
|
tsfnCallbacksMutex.Unlock()
|
|
|
|
return tsfnWrapper, nil
|
|
}
|
|
|
|
// GetContext retrieves the context data provided during creation.
|
|
func (tsfn *ThreadsafeFunction) GetContext() (any, error) {
|
|
if tsfn.goCallbackHandle == 0 {
|
|
return nil, fmt.Errorf("threadsafe function has no associated context handle (possibly already finalized)")
|
|
}
|
|
// We stored the handle in the wrapper itself during creation
|
|
callbackData := tsfn.goCallbackHandle.Value().(*struct {
|
|
Context any
|
|
FinalizeCallback ThreadsafeFunctionFinalizeCallback
|
|
TsfnWrapper *ThreadsafeFunction
|
|
})
|
|
return callbackData.Context, nil
|
|
}
|
|
|
|
// Call sends data from a background thread to the main Node.js thread.
|
|
// The data will be received by the callJSCallback provided during creation.
|
|
// This method is safe to call from any thread.
|
|
func (tsfn *ThreadsafeFunction) Call(data any, mode ThreadsafeFunctionCallMode) error {
|
|
if tsfn.tsfn == nil {
|
|
return fmt.Errorf("threadsafe function is not initialized or already released")
|
|
}
|
|
|
|
// Create a handle for the Go data to pass it safely through C
|
|
// The handle will be deleted by the executeThreadsafeFunctionCallJSCallback on the main thread.
|
|
dataHandle := cgo.NewHandle(&threadsafeCallbackData{
|
|
tsfn: tsfn,
|
|
goData: data,
|
|
})
|
|
|
|
status := napi.CallThreadsafeFunction(
|
|
tsfn.tsfn,
|
|
// unsafe.Pointer(dataHandle), // Pass the handle as data
|
|
mode,
|
|
)
|
|
if err := status.ToError(); err != nil {
|
|
// If the call fails, we need to delete the handle ourselves
|
|
dataHandle.Delete()
|
|
// Specific error handling for queue full might be needed
|
|
if status == napi.StatusQueueFull && mode == NonBlocking {
|
|
return fmt.Errorf("threadsafe function queue is full: %w", err)
|
|
}
|
|
return fmt.Errorf("failed to call threadsafe function: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Acquire increments the reference count for the thread-safe function,
|
|
// indicating that a new thread is about to start using it.
|
|
// This method is safe to call from any thread.
|
|
func (tsfn *ThreadsafeFunction) Acquire() error {
|
|
if tsfn.tsfn == nil {
|
|
return fmt.Errorf("threadsafe function is not initialized or already released")
|
|
}
|
|
return singleMustValueErr(napi.AcquireThreadsafeFunction(tsfn.tsfn))
|
|
}
|
|
|
|
// Release decrements the reference count for the thread-safe function.
|
|
// It should be called when a thread finishes using the function.
|
|
// The mode specifies whether to close the queue immediately (Abort) or
|
|
// wait for pending items to be processed (Release).
|
|
// This method is safe to call from any thread.
|
|
func (tsfn *ThreadsafeFunction) Release(mode napi.ThreadsafeFunctionReleaseMode) error {
|
|
if tsfn.tsfn == nil {
|
|
return fmt.Errorf("threadsafe function is not initialized or already released")
|
|
}
|
|
|
|
// Remove from the global map *before* the final N-API release
|
|
// This prevents potential race conditions if the finalizer runs quickly.
|
|
// Only remove if the reference count will drop to zero or below (though N-API handles the actual destruction).
|
|
// Predicting the exact count is tricky, so removing might be premature if Release fails.
|
|
// A safer approach might involve coordination within the finalizer callback.
|
|
// For now, we remove it pessimistically.
|
|
tsfnCallbacksMutex.Lock()
|
|
delete(tsfnCallbacks, tsfn.tsfn)
|
|
tsfnCallbacksMutex.Unlock()
|
|
|
|
err := singleMustValueErr(napi.ReleaseThreadsafeFunction(tsfn.tsfn, mode))
|
|
if err == nil {
|
|
// If release was successful, mark the Go wrapper as invalid
|
|
// tsfn.tsfn = nil // Be careful with concurrent access if doing this
|
|
// Deleting the handle should ideally happen in the C finalizer callback
|
|
// tsfn.goCallbackHandle.Delete() // Potential double delete if finalizer runs
|
|
} else {
|
|
// If release failed, potentially re-add to map? Complex error recovery needed.
|
|
fmt.Printf("Warning: Failed to release threadsafe function: %v. State might be inconsistent.\n", err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Ref increments the N-API reference count, preventing the JS function object
|
|
// from being garbage collected while the thread-safe function is active.
|
|
// Must be called from the main Node.js thread.
|
|
func (tsfn *ThreadsafeFunction) Ref(env EnvType) error {
|
|
if tsfn.tsfn == nil {
|
|
return fmt.Errorf("threadsafe function is not initialized or already released")
|
|
}
|
|
// Ensure this is called from the main thread (N-API doesn't enforce this, but it's best practice)
|
|
// Checking the thread ID might be complex. Rely on user discipline for now.
|
|
return singleMustValueErr(napi.RefThreadsafeFunction(env.NapiValue(), tsfn.tsfn))
|
|
}
|
|
|
|
// Unref decrements the N-API reference count.
|
|
// Must be called from the main Node.js thread.
|
|
func (tsfn *ThreadsafeFunction) Unref(env EnvType) error {
|
|
if tsfn.tsfn == nil {
|
|
return fmt.Errorf("threadsafe function is not initialized or already released")
|
|
}
|
|
// Ensure this is called from the main thread
|
|
return singleMustValueErr(napi.UnrefThreadsafeFunction(env.NapiValue(), tsfn.tsfn))
|
|
}
|