1// For the purpose of this test we use libuv's threading library. When deciding
2// on a threading library for a new project it bears remembering that in the
3// future libuv may introduce API changes which may render it non-ABI-stable,
4// which, in turn, may affect the ABI stability of the project despite its use
5// of N-API.
6#include <uv.h>
7#include <node_api.h>
8#include "../../js-native-api/common.h"
9
10#define ARRAY_LENGTH 10000
11#define MAX_QUEUE_SIZE 2
12
13static uv_thread_t uv_threads[2];
14static napi_threadsafe_function ts_fn;
15
16typedef struct {
17  napi_threadsafe_function_call_mode block_on_full;
18  napi_threadsafe_function_release_mode abort;
19  bool start_secondary;
20  napi_ref js_finalize_cb;
21  uint32_t max_queue_size;
22} ts_fn_hint;
23
24static ts_fn_hint ts_info;
25
26// Thread data to transmit to JS
27static int ints[ARRAY_LENGTH];
28
29static void secondary_thread(void* data) {
30  napi_threadsafe_function ts_fn = data;
31
32  if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
33    napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
34        "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
35  }
36}
37
38// Source thread producing the data
39static void data_source_thread(void* data) {
40  napi_threadsafe_function ts_fn = data;
41  int index;
42  void* hint;
43  ts_fn_hint *ts_fn_info;
44  napi_status status;
45  bool queue_was_full = false;
46  bool queue_was_closing = false;
47
48  if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
49    napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
50        "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
51  }
52
53  ts_fn_info = (ts_fn_hint *)hint;
54
55  if (ts_fn_info != &ts_info) {
56    napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
57      "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
58  }
59
60  if (ts_fn_info->start_secondary) {
61    if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
62      napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
63        "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
64    }
65
66    if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
67      napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
68        "failed to start secondary thread", NAPI_AUTO_LENGTH);
69    }
70  }
71
72  for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
73    status = napi_call_threadsafe_function(ts_fn, &ints[index],
74        ts_fn_info->block_on_full);
75    if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
76      // Let's make this thread really busy for 200 ms to give the main thread a
77      // chance to abort.
78      uint64_t start = uv_hrtime();
79      for (; uv_hrtime() - start < 200000000;);
80    }
81    switch (status) {
82      case napi_queue_full:
83        queue_was_full = true;
84        index++;
85        // fall through
86
87      case napi_ok:
88        continue;
89
90      case napi_closing:
91        queue_was_closing = true;
92        break;
93
94      default:
95        napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
96            "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
97    }
98  }
99
100  // Assert that the enqueuing of a value was refused at least once, if this is
101  // a non-blocking test run.
102  if (!ts_fn_info->block_on_full && !queue_was_full) {
103    napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
104        "queue was never full", NAPI_AUTO_LENGTH);
105  }
106
107  // Assert that the queue was marked as closing at least once, if this is an
108  // aborting test run.
109  if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
110    napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
111      "queue was never closing", NAPI_AUTO_LENGTH);
112  }
113
114  if (!queue_was_closing &&
115      napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
116    napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
117        "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
118  }
119}
120
121// Getting the data into JS
122static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
123  if (!(env == NULL || cb == NULL)) {
124    napi_value argv, undefined;
125    NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
126    NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
127    NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
128        NULL));
129  }
130}
131
132static napi_ref alt_ref;
133// Getting the data into JS with the alternative reference
134static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
135  if (!(env == NULL || alt_ref == NULL)) {
136    napi_value fn, argv, undefined;
137    NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn));
138    NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
139    NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
140    NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, fn, 1, &argv,
141        NULL));
142  }
143}
144
145// Cleanup
146static napi_value StopThread(napi_env env, napi_callback_info info) {
147  size_t argc = 2;
148  napi_value argv[2];
149  NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
150  napi_valuetype value_type;
151  NODE_API_CALL(env, napi_typeof(env, argv[0], &value_type));
152  NODE_API_ASSERT(env, value_type == napi_function,
153      "StopThread argument is a function");
154  NODE_API_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
155  NODE_API_CALL(env,
156      napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
157  bool abort;
158  NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
159  NODE_API_CALL(env,
160      napi_release_threadsafe_function(ts_fn,
161          abort ? napi_tsfn_abort : napi_tsfn_release));
162  ts_fn = NULL;
163  return NULL;
164}
165
166// Join the thread and inform JS that we're done.
167static void join_the_threads(napi_env env, void *data, void *hint) {
168  uv_thread_t *the_threads = data;
169  ts_fn_hint *the_hint = hint;
170  napi_value js_cb, undefined;
171
172  uv_thread_join(&the_threads[0]);
173  if (the_hint->start_secondary) {
174    uv_thread_join(&the_threads[1]);
175  }
176
177  NODE_API_CALL_RETURN_VOID(env,
178      napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
179  NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
180  NODE_API_CALL_RETURN_VOID(env,
181      napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
182  NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env,
183      the_hint->js_finalize_cb));
184  if (alt_ref != NULL) {
185    NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
186    alt_ref = NULL;
187  }
188}
189
190static napi_value StartThreadInternal(napi_env env,
191                                      napi_callback_info info,
192                                      napi_threadsafe_function_call_js cb,
193                                      bool block_on_full,
194                                      bool alt_ref_js_cb) {
195
196  size_t argc = 4;
197  napi_value argv[4];
198
199  NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
200  if (alt_ref_js_cb) {
201    NODE_API_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
202    argv[0] = NULL;
203  }
204
205  ts_info.block_on_full =
206      (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
207
208  NODE_API_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
209  napi_value async_name;
210  NODE_API_CALL(env, napi_create_string_utf8(env,
211      "N-API Thread-safe Function Test", NAPI_AUTO_LENGTH, &async_name));
212  NODE_API_CALL(env,
213      napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
214  NODE_API_CALL(env, napi_create_threadsafe_function(env,
215                                                     argv[0],
216                                                     NULL,
217                                                     async_name,
218                                                     ts_info.max_queue_size,
219                                                     2,
220                                                     uv_threads,
221                                                     join_the_threads,
222                                                     &ts_info,
223                                                     cb,
224                                                     &ts_fn));
225  bool abort;
226  NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
227  ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
228  NODE_API_CALL(env,
229      napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
230
231  NODE_API_ASSERT(env,
232      (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
233      "Thread creation");
234
235  return NULL;
236}
237
238static napi_value Unref(napi_env env, napi_callback_info info) {
239  NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
240  NODE_API_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
241  return NULL;
242}
243
244static napi_value Release(napi_env env, napi_callback_info info) {
245  NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
246  NODE_API_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
247  return NULL;
248}
249
250// Startup
251static napi_value StartThread(napi_env env, napi_callback_info info) {
252  return StartThreadInternal(env, info, call_js,
253    /** block_on_full */true, /** alt_ref_js_cb */false);
254}
255
256static napi_value StartThreadNonblocking(napi_env env,
257                                         napi_callback_info info) {
258  return StartThreadInternal(env, info, call_js,
259    /** block_on_full */false, /** alt_ref_js_cb */false);
260}
261
262static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
263  return StartThreadInternal(env, info, NULL,
264    /** block_on_full */true, /** alt_ref_js_cb */false);
265}
266
267static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
268  return StartThreadInternal(env, info, call_ref,
269    /** block_on_full */true, /** alt_ref_js_cb */true);
270}
271
272// Testing calling into JavaScript
273static void ThreadSafeFunctionFinalize(napi_env env,
274                              void* finalize_data,
275                              void* finalize_hint) {
276  napi_ref js_func_ref = (napi_ref) finalize_data;
277  napi_value js_func;
278  napi_value recv;
279  NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, js_func_ref, &js_func));
280  NODE_API_CALL_RETURN_VOID(env, napi_get_global(env, &recv));
281  NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, recv, js_func, 0, NULL, NULL));
282  NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, js_func_ref));
283}
284
285// Testing calling into JavaScript
286static napi_value CallIntoModule(napi_env env, napi_callback_info info) {
287  size_t argc = 4;
288  napi_value argv[4];
289  NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
290
291  napi_ref finalize_func;
292  NODE_API_CALL(env, napi_create_reference(env, argv[3], 1, &finalize_func));
293
294  napi_threadsafe_function tsfn;
295  NODE_API_CALL(env, napi_create_threadsafe_function(env, argv[0], argv[1], argv[2], 0, 1, finalize_func, ThreadSafeFunctionFinalize, NULL, NULL, &tsfn));
296  NODE_API_CALL(env, napi_call_threadsafe_function(tsfn, NULL, napi_tsfn_blocking));
297  NODE_API_CALL(env, napi_release_threadsafe_function(tsfn, napi_tsfn_release));
298  return NULL;
299}
300
301// Module init
302static napi_value Init(napi_env env, napi_value exports) {
303  size_t index;
304  for (index = 0; index < ARRAY_LENGTH; index++) {
305    ints[index] = index;
306  }
307  napi_value js_array_length, js_max_queue_size;
308  napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
309  napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
310
311  napi_property_descriptor properties[] = {
312    {
313      "ARRAY_LENGTH",
314      NULL,
315      NULL,
316      NULL,
317      NULL,
318      js_array_length,
319      napi_enumerable,
320      NULL
321    },
322    {
323      "MAX_QUEUE_SIZE",
324      NULL,
325      NULL,
326      NULL,
327      NULL,
328      js_max_queue_size,
329      napi_enumerable,
330      NULL
331    },
332    DECLARE_NODE_API_PROPERTY("StartThread", StartThread),
333    DECLARE_NODE_API_PROPERTY("StartThreadNoNative", StartThreadNoNative),
334    DECLARE_NODE_API_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
335    DECLARE_NODE_API_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
336    DECLARE_NODE_API_PROPERTY("StopThread", StopThread),
337    DECLARE_NODE_API_PROPERTY("Unref", Unref),
338    DECLARE_NODE_API_PROPERTY("Release", Release),
339    DECLARE_NODE_API_PROPERTY("CallIntoModule", CallIntoModule),
340  };
341
342  NODE_API_CALL(env, napi_define_properties(env, exports,
343    sizeof(properties)/sizeof(properties[0]), properties));
344
345  return exports;
346}
347NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
348