1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "uv.h"
23#include "task.h"
24
25#include <stdio.h>
26#include <string.h>
27
28/* See test-ipc.c */
29void spawn_helper(uv_pipe_t* channel,
30                  uv_process_t* process,
31                  const char* helper);
32
33void ipc_send_recv_helper_threadproc(void* arg);
34
/* One storage slot that can be accessed as any of the handle types below.
 * In this file a slot is initialized through `pipe` or `tcp` and is then
 * passed around through the `stream` and `handle` members. */
union handles {
  uv_handle_t handle;  /* used when closing a slot with uv_close() */
  uv_stream_t stream;  /* used with uv_accept() and uv_write2() */
  uv_pipe_t pipe;      /* used when the slot is set up by uv_pipe_init() */
  uv_tcp_t tcp;        /* used when the slot is set up by uv_tcp_init() */
  uv_tty_t tty;        /* not referenced anywhere in this file */
};
42
43struct test_ctx {
44  uv_pipe_t channel;
45  uv_connect_t connect_req;
46  uv_write_t write_req;
47  uv_write_t write_req2;
48  uv_handle_type expected_type;
49  union handles send;
50  union handles send2;
51  union handles recv;
52  union handles recv2;
53};
54
55struct echo_ctx {
56  uv_pipe_t listen;
57  uv_pipe_t channel;
58  uv_write_t write_req;
59  uv_write_t write_req2;
60  uv_handle_type expected_type;
61  union handles recv;
62  union handles recv2;
63};
64
65static struct test_ctx ctx;
66static struct echo_ctx ctx2;
67
68/* Used in write2_cb to decide if we need to cleanup or not */
69static int is_child_process;
70static int is_in_process;
71static int read_cb_count;
72static int recv_cb_count;
73static int write2_cb_called;
74
75
76static void alloc_cb(uv_handle_t* handle,
77                     size_t suggested_size,
78                     uv_buf_t* buf) {
79  /* We're not actually reading anything so a small buffer is okay
80   * but it needs to be heap-allocated to appease TSan.
81   */
82  buf->len = 8;
83  buf->base = malloc(buf->len);
84  ASSERT_NOT_NULL(buf->base);
85}
86
87
88static void recv_cb(uv_stream_t* handle,
89                    ssize_t nread,
90                    const uv_buf_t* buf) {
91  uv_handle_type pending;
92  uv_pipe_t* pipe;
93  int r;
94  union handles* recv;
95
96  free(buf->base);
97
98  pipe = (uv_pipe_t*) handle;
99  ASSERT_PTR_EQ(pipe, &ctx.channel);
100
101  do {
102    if (++recv_cb_count == 1) {
103      recv = &ctx.recv;
104    } else {
105      recv = &ctx.recv2;
106    }
107
108    /* Depending on the OS, the final recv_cb can be called after
109     * the child process has terminated which can result in nread
110     * being UV_EOF instead of the number of bytes read.  Since
111     * the other end of the pipe has closed this UV_EOF is an
112     * acceptable value. */
113    if (nread == UV_EOF) {
114      /* UV_EOF is only acceptable for the final recv_cb call */
115      ASSERT_EQ(2, recv_cb_count);
116    } else {
117      ASSERT_GE(nread, 0);
118      ASSERT_GT(uv_pipe_pending_count(pipe), 0);
119
120      pending = uv_pipe_pending_type(pipe);
121      ASSERT_EQ(pending, ctx.expected_type);
122
123      if (pending == UV_NAMED_PIPE)
124        r = uv_pipe_init(ctx.channel.loop, &recv->pipe, 0);
125      else if (pending == UV_TCP)
126        r = uv_tcp_init(ctx.channel.loop, &recv->tcp);
127      else
128        abort();
129      ASSERT_OK(r);
130
131      r = uv_accept(handle, &recv->stream);
132      ASSERT_OK(r);
133    }
134  } while (uv_pipe_pending_count(pipe) > 0);
135
136  /* Close after two writes received */
137  if (recv_cb_count == 2) {
138    uv_close((uv_handle_t*)&ctx.channel, NULL);
139  }
140}
141
142static void connect_cb(uv_connect_t* req, int status) {
143  int r;
144  uv_buf_t buf;
145
146  ASSERT_PTR_EQ(req, &ctx.connect_req);
147  ASSERT_OK(status);
148
149  buf = uv_buf_init(".", 1);
150  r = uv_write2(&ctx.write_req,
151                (uv_stream_t*)&ctx.channel,
152                &buf, 1,
153                &ctx.send.stream,
154                NULL);
155  ASSERT_OK(r);
156
157  /* Perform two writes to the same pipe to make sure that on Windows we are
158   * not running into issue 505:
159   *   https://github.com/libuv/libuv/issues/505 */
160  buf = uv_buf_init(".", 1);
161  r = uv_write2(&ctx.write_req2,
162                (uv_stream_t*)&ctx.channel,
163                &buf, 1,
164                &ctx.send2.stream,
165                NULL);
166  ASSERT_OK(r);
167
168  r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
169  ASSERT_OK(r);
170}
171
172static int run_test(int inprocess) {
173  uv_process_t process;
174  uv_thread_t tid;
175  int r;
176
177  if (inprocess) {
178    r = uv_thread_create(&tid, ipc_send_recv_helper_threadproc, (void *) 42);
179    ASSERT_OK(r);
180
181    uv_sleep(1000);
182
183    r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1);
184    ASSERT_OK(r);
185
186    uv_pipe_connect(&ctx.connect_req, &ctx.channel, TEST_PIPENAME_3, connect_cb);
187  } else {
188    spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper");
189
190    connect_cb(&ctx.connect_req, 0);
191  }
192
193  r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
194  ASSERT_OK(r);
195
196  ASSERT_EQ(2, recv_cb_count);
197
198  if (inprocess) {
199    r = uv_thread_join(&tid);
200    ASSERT_OK(r);
201  }
202
203  return 0;
204}
205
206static int run_ipc_send_recv_pipe(int inprocess) {
207  int r;
208
209  ctx.expected_type = UV_NAMED_PIPE;
210
211  r = uv_pipe_init(uv_default_loop(), &ctx.send.pipe, 1);
212  ASSERT_OK(r);
213
214  r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME);
215  ASSERT_OK(r);
216
217  r = uv_pipe_init(uv_default_loop(), &ctx.send2.pipe, 1);
218  ASSERT_OK(r);
219
220  r = uv_pipe_bind(&ctx.send2.pipe, TEST_PIPENAME_2);
221  ASSERT_OK(r);
222
223  r = run_test(inprocess);
224  ASSERT_OK(r);
225
226  MAKE_VALGRIND_HAPPY(uv_default_loop());
227  return 0;
228}
229
/* Pipe handles, helper spawned through spawn_helper(). Skipped when
 * NO_SEND_HANDLE_ON_PIPE is defined. */
TEST_IMPL(ipc_send_recv_pipe) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_pipe(0);
}
236
/* Pipe handles, helper running on a second thread of this process.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
TEST_IMPL(ipc_send_recv_pipe_inprocess) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_pipe(1);
}
243
244static int run_ipc_send_recv_tcp(int inprocess) {
245  struct sockaddr_in addr;
246  int r;
247
248  ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
249
250  ctx.expected_type = UV_TCP;
251
252  r = uv_tcp_init(uv_default_loop(), &ctx.send.tcp);
253  ASSERT_OK(r);
254
255  r = uv_tcp_init(uv_default_loop(), &ctx.send2.tcp);
256  ASSERT_OK(r);
257
258  r = uv_tcp_bind(&ctx.send.tcp, (const struct sockaddr*) &addr, 0);
259  ASSERT_OK(r);
260
261  r = uv_tcp_bind(&ctx.send2.tcp, (const struct sockaddr*) &addr, 0);
262  ASSERT_OK(r);
263
264  r = run_test(inprocess);
265  ASSERT_OK(r);
266
267  MAKE_VALGRIND_HAPPY(uv_default_loop());
268  return 0;
269}
270
/* TCP handles, helper spawned through spawn_helper(). Skipped when
 * NO_SEND_HANDLE_ON_PIPE is defined. */
TEST_IMPL(ipc_send_recv_tcp) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_tcp(0);
}
277
/* TCP handles, helper running on a second thread of this process.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined. */
TEST_IMPL(ipc_send_recv_tcp_inprocess) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_tcp(1);
}
284
285
286/* Everything here runs in a child process or second thread. */
287
288static void write2_cb(uv_write_t* req, int status) {
289  ASSERT_OK(status);
290
291  /* After two successful writes in the child process, allow the child
292   * process to be closed. */
293  if (++write2_cb_called == 2 && (is_child_process || is_in_process)) {
294    uv_close(&ctx2.recv.handle, NULL);
295    uv_close(&ctx2.recv2.handle, NULL);
296    uv_close((uv_handle_t*)&ctx2.channel, NULL);
297    uv_close((uv_handle_t*)&ctx2.listen, NULL);
298  }
299}
300
301static void read_cb(uv_stream_t* handle,
302                    ssize_t nread,
303                    const uv_buf_t* rdbuf) {
304  uv_buf_t wrbuf;
305  uv_pipe_t* pipe;
306  uv_handle_type pending;
307  int r;
308  union handles* recv;
309  uv_write_t* write_req;
310
311  free(rdbuf->base);
312
313  if (nread == UV_EOF || nread == UV_ECONNABORTED) {
314    return;
315  }
316
317  ASSERT_GE(nread, 0);
318
319  pipe = (uv_pipe_t*) handle;
320  ASSERT_PTR_EQ(pipe, &ctx2.channel);
321
322  while (uv_pipe_pending_count(pipe) > 0) {
323    if (++read_cb_count == 2) {
324      recv = &ctx2.recv;
325      write_req = &ctx2.write_req;
326    } else {
327      recv = &ctx2.recv2;
328      write_req = &ctx2.write_req2;
329    }
330
331    pending = uv_pipe_pending_type(pipe);
332    ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
333
334    if (pending == UV_NAMED_PIPE)
335      r = uv_pipe_init(ctx2.channel.loop, &recv->pipe, 0);
336    else if (pending == UV_TCP)
337      r = uv_tcp_init(ctx2.channel.loop, &recv->tcp);
338    else
339      abort();
340    ASSERT_OK(r);
341
342    r = uv_accept(handle, &recv->stream);
343    ASSERT_OK(r);
344
345    wrbuf = uv_buf_init(".", 1);
346    r = uv_write2(write_req,
347                  (uv_stream_t*)&ctx2.channel,
348                  &wrbuf,
349                  1,
350                  &recv->stream,
351                  write2_cb);
352    ASSERT_OK(r);
353  }
354}
355
356static void send_recv_start(void) {
357  int r;
358  ASSERT_EQ(1, uv_is_readable((uv_stream_t*)&ctx2.channel));
359  ASSERT_EQ(1, uv_is_writable((uv_stream_t*)&ctx2.channel));
360  ASSERT_OK(uv_is_closing((uv_handle_t*)&ctx2.channel));
361
362  r = uv_read_start((uv_stream_t*)&ctx2.channel, alloc_cb, read_cb);
363  ASSERT_OK(r);
364}
365
366static void listen_cb(uv_stream_t* handle, int status) {
367  int r;
368  ASSERT_PTR_EQ(handle, (uv_stream_t*)&ctx2.listen);
369  ASSERT_OK(status);
370
371  r = uv_accept((uv_stream_t*)&ctx2.listen, (uv_stream_t*)&ctx2.channel);
372  ASSERT_OK(r);
373
374  send_recv_start();
375}
376
377int run_ipc_send_recv_helper(uv_loop_t* loop, int inprocess) {
378  int r;
379
380  is_in_process = inprocess;
381
382  memset(&ctx2, 0, sizeof(ctx2));
383
384  r = uv_pipe_init(loop, &ctx2.listen, 0);
385  ASSERT_OK(r);
386
387  r = uv_pipe_init(loop, &ctx2.channel, 1);
388  ASSERT_OK(r);
389
390  if (inprocess) {
391    r = uv_pipe_bind(&ctx2.listen, TEST_PIPENAME_3);
392    ASSERT_OK(r);
393
394    r = uv_listen((uv_stream_t*)&ctx2.listen, SOMAXCONN, listen_cb);
395    ASSERT_OK(r);
396  } else {
397    r = uv_pipe_open(&ctx2.channel, 0);
398    ASSERT_OK(r);
399
400    send_recv_start();
401  }
402
403  notify_parent_process();
404  r = uv_run(loop, UV_RUN_DEFAULT);
405  ASSERT_OK(r);
406
407  return 0;
408}
409
/* Entry point of the helper when it runs as a separate process.
 *
 * stdin is a duplex channel over which a handle is sent; the helper
 * receives it and sends it back where it came from. Runs on the default
 * loop and returns 0.
 */
int ipc_send_recv_helper(void) {
  int rc = run_ipc_send_recv_helper(uv_default_loop(), 0);
  ASSERT_OK(rc);

  MAKE_VALGRIND_HAPPY(uv_default_loop());
  return 0;
}
422
423void ipc_send_recv_helper_threadproc(void* arg) {
424  int r;
425  uv_loop_t loop;
426
427  r = uv_loop_init(&loop);
428  ASSERT_OK(r);
429
430  r = run_ipc_send_recv_helper(&loop, 1);
431  ASSERT_OK(r);
432
433  r = uv_loop_close(&loop);
434  ASSERT_OK(r);
435}
436