xref: /third_party/node/deps/uv/src/win/pipe.c (revision 1cb0ef41)
1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include <assert.h>
23#include <io.h>
24#include <stdio.h>
25#include <stdlib.h>
26#include <string.h>
27
28#include "handle-inl.h"
29#include "internal.h"
30#include "req-inl.h"
31#include "stream-inl.h"
32#include "uv-common.h"
33#include "uv.h"
34
35#include <aclapi.h>
36#include <accctrl.h>
37
38/* A zero-size buffer for use by uv_pipe_read */
39static char uv_zero_[] = "";
40
41/* Null uv_buf_t */
42static const uv_buf_t uv_null_buf_ = { 0, NULL };
43
/* The timeout that the pipe will wait for the remote end to write data when
 * the local end wants to shut it down. */
46static const int64_t eof_timeout = 50; /* ms */
47
48static const int default_pending_pipe_instances = 4;
49
50/* Pipe prefix */
51static char pipe_prefix[] = "\\\\?\\pipe";
52static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
53
/* IPC incoming xfer queue item.
 *
 * Items are linked through `member` into a pipe's
 * `pipe.conn.ipc_xfer_queue`. uv__pipe_endgame() drains that queue, uses
 * `xfer_info.socket_info` to materialize each pending socket so it can be
 * closed, and releases the item with uv__free(). */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type; /* Kind of transferred socket. */
  uv__ipc_socket_xfer_info_t xfer_info; /* Data needed to re-create it. */
  QUEUE member;                         /* Link in the pipe's xfer queue. */
} uv__ipc_xfer_queue_item_t;
60
/* IPC frame header flags.
 *
 * The first three enumerators are independent bits; the last two are masks
 * built from them. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 1 << 0,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 1 << 1,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 1 << 2,
  /* These are combinations of the flags above. */
  UV__IPC_FRAME_XFER_FLAGS              = UV__IPC_FRAME_HAS_SOCKET_XFER |
                                          UV__IPC_FRAME_XFER_IS_TCP_CONNECTION,
  UV__IPC_FRAME_VALID_FLAGS             = UV__IPC_FRAME_HAS_DATA |
                                          UV__IPC_FRAME_XFER_FLAGS
};
/* clang-format on */
72
/* IPC frame header.
 *
 * Four 32-bit fields; a static assertion elsewhere in this file requires the
 * structure to be exactly 16 bytes, so fields must not be added, removed or
 * resized. `flags` holds a combination of the UV__IPC_FRAME_* values. */
typedef struct {
  uint32_t flags;
  uint32_t reserved1;   /* Ignored. */
  uint32_t data_length; /* Must be zero if there is no data. */
  uint32_t reserved2;   /* Must be zero. */
} uv__ipc_frame_header_t;
80
81/* To implement the IPC protocol correctly, these structures must have exactly
82 * the right size. */
83STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85
/* Coalesced write request.
 *
 * Pairs an internally owned write request with the request the caller
 * originally supplied.
 * NOTE(review): presumably `user_req` is what gets reported back to the
 * caller once `req` completes -- confirm against the write path. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;
91
92
93static void eof_timer_init(uv_pipe_t* pipe);
94static void eof_timer_start(uv_pipe_t* pipe);
95static void eof_timer_stop(uv_pipe_t* pipe);
96static void eof_timer_cb(uv_timer_t* timer);
97static void eof_timer_destroy(uv_pipe_t* pipe);
98static void eof_timer_close_cb(uv_handle_t* handle);
99
100
101static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
102  snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
103}
104
105
106int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
107  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
108
109  handle->reqs_pending = 0;
110  handle->handle = INVALID_HANDLE_VALUE;
111  handle->name = NULL;
112  handle->pipe.conn.ipc_remote_pid = 0;
113  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
114  QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
115  handle->pipe.conn.ipc_xfer_queue_length = 0;
116  handle->ipc = ipc;
117  handle->pipe.conn.non_overlapped_writes_tail = NULL;
118
119  return 0;
120}
121
122
123static void uv__pipe_connection_init(uv_pipe_t* handle) {
124  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
125  uv__connection_init((uv_stream_t*) handle);
126  handle->read_req.data = handle;
127  handle->pipe.conn.eof_timer = NULL;
128}
129
130
131static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
132  HANDLE pipeHandle;
133
134  /*
135   * Assume that we have a duplex pipe first, so attempt to
136   * connect with GENERIC_READ | GENERIC_WRITE.
137   */
138  pipeHandle = CreateFileW(name,
139                           GENERIC_READ | GENERIC_WRITE,
140                           0,
141                           NULL,
142                           OPEN_EXISTING,
143                           FILE_FLAG_OVERLAPPED,
144                           NULL);
145  if (pipeHandle != INVALID_HANDLE_VALUE) {
146    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
147    return pipeHandle;
148  }
149
150  /*
151   * If the pipe is not duplex CreateFileW fails with
152   * ERROR_ACCESS_DENIED.  In that case try to connect
153   * as a read-only or write-only.
154   */
155  if (GetLastError() == ERROR_ACCESS_DENIED) {
156    pipeHandle = CreateFileW(name,
157                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
158                             0,
159                             NULL,
160                             OPEN_EXISTING,
161                             FILE_FLAG_OVERLAPPED,
162                             NULL);
163
164    if (pipeHandle != INVALID_HANDLE_VALUE) {
165      *duplex_flags = UV_HANDLE_READABLE;
166      return pipeHandle;
167    }
168  }
169
170  if (GetLastError() == ERROR_ACCESS_DENIED) {
171    pipeHandle = CreateFileW(name,
172                             GENERIC_WRITE | FILE_READ_ATTRIBUTES,
173                             0,
174                             NULL,
175                             OPEN_EXISTING,
176                             FILE_FLAG_OVERLAPPED,
177                             NULL);
178
179    if (pipeHandle != INVALID_HANDLE_VALUE) {
180      *duplex_flags = UV_HANDLE_WRITABLE;
181      return pipeHandle;
182    }
183  }
184
185  return INVALID_HANDLE_VALUE;
186}
187
188
189static void close_pipe(uv_pipe_t* pipe) {
190  assert(pipe->u.fd == -1 || pipe->u.fd > 2);
191  if (pipe->u.fd == -1)
192    CloseHandle(pipe->handle);
193  else
194    close(pipe->u.fd);
195
196  pipe->u.fd = -1;
197  pipe->handle = INVALID_HANDLE_VALUE;
198}
199
200
201static int uv__pipe_server(
202    HANDLE* pipeHandle_ptr, DWORD access,
203    char* name, size_t nameSize, char* random) {
204  HANDLE pipeHandle;
205  int err;
206
207  for (;;) {
208    uv__unique_pipe_name(random, name, nameSize);
209
210    pipeHandle = CreateNamedPipeA(name,
211      access | FILE_FLAG_FIRST_PIPE_INSTANCE,
212      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
213      NULL);
214
215    if (pipeHandle != INVALID_HANDLE_VALUE) {
216      /* No name collisions.  We're done. */
217      break;
218    }
219
220    err = GetLastError();
221    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
222      goto error;
223    }
224
225    /* Pipe name collision.  Increment the random number and try again. */
226    random++;
227  }
228
229  *pipeHandle_ptr = pipeHandle;
230
231  return 0;
232
233 error:
234  if (pipeHandle != INVALID_HANDLE_VALUE)
235    CloseHandle(pipeHandle);
236
237  return err;
238}
239
240
241static int uv__create_pipe_pair(
242    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
243    unsigned int server_flags, unsigned int client_flags,
244    int inherit_client, char* random) {
245  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
246  char pipe_name[64];
247  SECURITY_ATTRIBUTES sa;
248  DWORD server_access;
249  DWORD client_access;
250  HANDLE server_pipe;
251  HANDLE client_pipe;
252  int err;
253
254  server_pipe = INVALID_HANDLE_VALUE;
255  client_pipe = INVALID_HANDLE_VALUE;
256
257  server_access = 0;
258  if (server_flags & UV_READABLE_PIPE)
259    server_access |= PIPE_ACCESS_INBOUND;
260  if (server_flags & UV_WRITABLE_PIPE)
261    server_access |= PIPE_ACCESS_OUTBOUND;
262  if (server_flags & UV_NONBLOCK_PIPE)
263    server_access |= FILE_FLAG_OVERLAPPED;
264  server_access |= WRITE_DAC;
265
266  client_access = 0;
267  if (client_flags & UV_READABLE_PIPE)
268    client_access |= GENERIC_READ;
269  else
270    client_access |= FILE_READ_ATTRIBUTES;
271  if (client_flags & UV_WRITABLE_PIPE)
272    client_access |= GENERIC_WRITE;
273  else
274    client_access |= FILE_WRITE_ATTRIBUTES;
275  client_access |= WRITE_DAC;
276
277  /* Create server pipe handle. */
278  err = uv__pipe_server(&server_pipe,
279                        server_access,
280                        pipe_name,
281                        sizeof(pipe_name),
282                        random);
283  if (err)
284    goto error;
285
286  /* Create client pipe handle. */
287  sa.nLength = sizeof sa;
288  sa.lpSecurityDescriptor = NULL;
289  sa.bInheritHandle = inherit_client;
290
291  client_pipe = CreateFileA(pipe_name,
292                            client_access,
293                            0,
294                            &sa,
295                            OPEN_EXISTING,
296                            (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
297                            NULL);
298  if (client_pipe == INVALID_HANDLE_VALUE) {
299    err = GetLastError();
300    goto error;
301  }
302
303#ifndef NDEBUG
304  /* Validate that the pipe was opened in the right mode. */
305  {
306    DWORD mode;
307    BOOL r;
308    r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
309    if (r == TRUE) {
310      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
311    } else {
312      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
313    }
314  }
315#endif
316
317  /* Do a blocking ConnectNamedPipe.  This should not block because we have
318   * both ends of the pipe created. */
319  if (!ConnectNamedPipe(server_pipe, NULL)) {
320    if (GetLastError() != ERROR_PIPE_CONNECTED) {
321      err = GetLastError();
322      goto error;
323    }
324  }
325
326  *client_pipe_ptr = client_pipe;
327  *server_pipe_ptr = server_pipe;
328  return 0;
329
330 error:
331  if (server_pipe != INVALID_HANDLE_VALUE)
332    CloseHandle(server_pipe);
333
334  if (client_pipe != INVALID_HANDLE_VALUE)
335    CloseHandle(client_pipe);
336
337  return err;
338}
339
340
341int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
342  uv_file temp[2];
343  int err;
344  HANDLE readh;
345  HANDLE writeh;
346
347  /* Make the server side the inbound (read) end, */
348  /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
349  /* TODO: better source of local randomness than &fds? */
350  read_flags |= UV_READABLE_PIPE;
351  write_flags |= UV_WRITABLE_PIPE;
352  err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
353  if (err != 0)
354    return err;
355  temp[0] = _open_osfhandle((intptr_t) readh, 0);
356  if (temp[0] == -1) {
357    if (errno == UV_EMFILE)
358      err = UV_EMFILE;
359    else
360      err = UV_UNKNOWN;
361    CloseHandle(readh);
362    CloseHandle(writeh);
363    return err;
364  }
365  temp[1] = _open_osfhandle((intptr_t) writeh, 0);
366  if (temp[1] == -1) {
367    if (errno == UV_EMFILE)
368      err = UV_EMFILE;
369    else
370      err = UV_UNKNOWN;
371    _close(temp[0]);
372    CloseHandle(writeh);
373    return err;
374  }
375  fds[0] = temp[0];
376  fds[1] = temp[1];
377  return 0;
378}
379
380
381int uv__create_stdio_pipe_pair(uv_loop_t* loop,
382    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
383  /* The parent_pipe is always the server_pipe and kept by libuv.
384   * The child_pipe is always the client_pipe and is passed to the child.
385   * The flags are specified with respect to their usage in the child. */
386  HANDLE server_pipe;
387  HANDLE client_pipe;
388  unsigned int server_flags;
389  unsigned int client_flags;
390  int err;
391
392  uv__pipe_connection_init(parent_pipe);
393
394  server_pipe = INVALID_HANDLE_VALUE;
395  client_pipe = INVALID_HANDLE_VALUE;
396
397  server_flags = 0;
398  client_flags = 0;
399  if (flags & UV_READABLE_PIPE) {
400    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
401     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
402     * the state of the write buffer when we're trying to shutdown the pipe. */
403    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
404    client_flags |= UV_READABLE_PIPE;
405  }
406  if (flags & UV_WRITABLE_PIPE) {
407    server_flags |= UV_READABLE_PIPE;
408    client_flags |= UV_WRITABLE_PIPE;
409  }
410  server_flags |= UV_NONBLOCK_PIPE;
411  if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
412    client_flags |= UV_NONBLOCK_PIPE;
413  }
414
415  err = uv__create_pipe_pair(&server_pipe, &client_pipe,
416          server_flags, client_flags, 1, (char*) server_pipe);
417  if (err)
418    goto error;
419
420  if (CreateIoCompletionPort(server_pipe,
421                             loop->iocp,
422                             (ULONG_PTR) parent_pipe,
423                             0) == NULL) {
424    err = GetLastError();
425    goto error;
426  }
427
428  parent_pipe->handle = server_pipe;
429  *child_pipe_ptr = client_pipe;
430
431  /* The server end is now readable and/or writable. */
432  if (flags & UV_READABLE_PIPE)
433    parent_pipe->flags |= UV_HANDLE_WRITABLE;
434  if (flags & UV_WRITABLE_PIPE)
435    parent_pipe->flags |= UV_HANDLE_READABLE;
436
437  return 0;
438
439 error:
440  if (server_pipe != INVALID_HANDLE_VALUE)
441    CloseHandle(server_pipe);
442
443  if (client_pipe != INVALID_HANDLE_VALUE)
444    CloseHandle(client_pipe);
445
446  return err;
447}
448
449
450static int uv__set_pipe_handle(uv_loop_t* loop,
451                               uv_pipe_t* handle,
452                               HANDLE pipeHandle,
453                               int fd,
454                               DWORD duplex_flags) {
455  NTSTATUS nt_status;
456  IO_STATUS_BLOCK io_status;
457  FILE_MODE_INFORMATION mode_info;
458  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
459  DWORD current_mode = 0;
460  DWORD err = 0;
461
462  assert(handle->flags & UV_HANDLE_CONNECTION);
463  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
464  if (handle->flags & UV_HANDLE_CLOSING)
465    return UV_EINVAL;
466  if (handle->handle != INVALID_HANDLE_VALUE)
467    return UV_EBUSY;
468
469  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
470    err = GetLastError();
471    if (err == ERROR_ACCESS_DENIED) {
472      /*
473       * SetNamedPipeHandleState can fail if the handle doesn't have either
474       * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
475       * But if the handle already has the desired wait and blocking modes
476       * we can continue.
477       */
478      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
479                                   NULL, NULL, 0)) {
480        return uv_translate_sys_error(GetLastError());
481      } else if (current_mode & PIPE_NOWAIT) {
482        return UV_EACCES;
483      }
484    } else {
485      /* If this returns ERROR_INVALID_PARAMETER we probably opened
486       * something that is not a pipe. */
487      if (err == ERROR_INVALID_PARAMETER) {
488        return UV_ENOTSOCK;
489      }
490      return uv_translate_sys_error(err);
491    }
492  }
493
494  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
495  nt_status = pNtQueryInformationFile(pipeHandle,
496                                      &io_status,
497                                      &mode_info,
498                                      sizeof(mode_info),
499                                      FileModeInformation);
500  if (nt_status != STATUS_SUCCESS) {
501    return uv_translate_sys_error(err);
502  }
503
504  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
505      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
506    /* Non-overlapped pipe. */
507    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
508    handle->pipe.conn.readfile_thread_handle = NULL;
509    InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
510  } else {
511    /* Overlapped pipe.  Try to associate with IOCP. */
512    if (CreateIoCompletionPort(pipeHandle,
513                               loop->iocp,
514                               (ULONG_PTR) handle,
515                               0) == NULL) {
516      handle->flags |= UV_HANDLE_EMULATE_IOCP;
517    }
518  }
519
520  handle->handle = pipeHandle;
521  handle->u.fd = fd;
522  handle->flags |= duplex_flags;
523
524  return 0;
525}
526
527
528static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
529                             uv_pipe_accept_t* req, BOOL firstInstance) {
530  assert(req->pipeHandle == INVALID_HANDLE_VALUE);
531
532  req->pipeHandle =
533      CreateNamedPipeW(handle->name,
534                       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
535                         (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
536                       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
537                       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
538
539  if (req->pipeHandle == INVALID_HANDLE_VALUE) {
540    return 0;
541  }
542
543  /* Associate it with IOCP so we can get events. */
544  if (CreateIoCompletionPort(req->pipeHandle,
545                             loop->iocp,
546                             (ULONG_PTR) handle,
547                             0) == NULL) {
548    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
549  }
550
551  /* Stash a handle in the server object for use from places such as
552   * getsockname and chmod. As we transfer ownership of these to client
553   * objects, we'll allocate new ones here. */
554  handle->handle = req->pipeHandle;
555
556  return 1;
557}
558
559
560static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
561  uv_loop_t* loop;
562  uv_pipe_t* handle;
563  uv_shutdown_t* req;
564
565  req = (uv_shutdown_t*) parameter;
566  assert(req);
567  handle = (uv_pipe_t*) req->handle;
568  assert(handle);
569  loop = handle->loop;
570  assert(loop);
571
572  FlushFileBuffers(handle->handle);
573
574  /* Post completed */
575  POST_COMPLETION_FOR_REQ(loop, req);
576
577  return 0;
578}
579
580
581void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
582  DWORD result;
583  NTSTATUS nt_status;
584  IO_STATUS_BLOCK io_status;
585  FILE_PIPE_LOCAL_INFORMATION pipe_info;
586
587  assert(handle->flags & UV_HANDLE_CONNECTION);
588  assert(req != NULL);
589  assert(handle->stream.conn.write_reqs_pending == 0);
590  SET_REQ_SUCCESS(req);
591
592  if (handle->flags & UV_HANDLE_CLOSING) {
593    uv__insert_pending_req(loop, (uv_req_t*) req);
594    return;
595  }
596
597  /* Try to avoid flushing the pipe buffer in the thread pool. */
598  nt_status = pNtQueryInformationFile(handle->handle,
599                                      &io_status,
600                                      &pipe_info,
601                                      sizeof pipe_info,
602                                      FilePipeLocalInformation);
603
604  if (nt_status != STATUS_SUCCESS) {
605    SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
606    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
607    uv__insert_pending_req(loop, (uv_req_t*) req);
608    return;
609  }
610
611  if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
612    /* Short-circuit, no need to call FlushFileBuffers:
613     * all writes have been read. */
614    uv__insert_pending_req(loop, (uv_req_t*) req);
615    return;
616  }
617
618  /* Run FlushFileBuffers in the thread pool. */
619  result = QueueUserWorkItem(pipe_shutdown_thread_proc,
620                             req,
621                             WT_EXECUTELONGFUNCTION);
622  if (!result) {
623    SET_REQ_ERROR(req, GetLastError());
624    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
625    uv__insert_pending_req(loop, (uv_req_t*) req);
626    return;
627  }
628}
629
630
631void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
632  uv__ipc_xfer_queue_item_t* xfer_queue_item;
633
634  assert(handle->reqs_pending == 0);
635  assert(handle->flags & UV_HANDLE_CLOSING);
636  assert(!(handle->flags & UV_HANDLE_CLOSED));
637
638  if (handle->flags & UV_HANDLE_CONNECTION) {
639    /* Free pending sockets */
640    while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
641      QUEUE* q;
642      SOCKET socket;
643
644      q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
645      QUEUE_REMOVE(q);
646      xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
647
648      /* Materialize socket and close it */
649      socket = WSASocketW(FROM_PROTOCOL_INFO,
650                          FROM_PROTOCOL_INFO,
651                          FROM_PROTOCOL_INFO,
652                          &xfer_queue_item->xfer_info.socket_info,
653                          0,
654                          WSA_FLAG_OVERLAPPED);
655      uv__free(xfer_queue_item);
656
657      if (socket != INVALID_SOCKET)
658        closesocket(socket);
659    }
660    handle->pipe.conn.ipc_xfer_queue_length = 0;
661
662    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
663      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
664        UnregisterWait(handle->read_req.wait_handle);
665        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
666      }
667      if (handle->read_req.event_handle != NULL) {
668        CloseHandle(handle->read_req.event_handle);
669        handle->read_req.event_handle = NULL;
670      }
671    }
672
673    if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
674      DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
675  }
676
677  if (handle->flags & UV_HANDLE_PIPESERVER) {
678    assert(handle->pipe.serv.accept_reqs);
679    uv__free(handle->pipe.serv.accept_reqs);
680    handle->pipe.serv.accept_reqs = NULL;
681  }
682
683  uv__handle_close(handle);
684}
685
686
687void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
688  if (handle->flags & UV_HANDLE_BOUND)
689    return;
690  handle->pipe.serv.pending_instances = count;
691  handle->flags |= UV_HANDLE_PIPESERVER;
692}
693
694
695/* Creates a pipe server. */
696int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
697  uv_loop_t* loop = handle->loop;
698  int i, err, nameSize;
699  uv_pipe_accept_t* req;
700
701  if (handle->flags & UV_HANDLE_BOUND) {
702    return UV_EINVAL;
703  }
704
705  if (!name) {
706    return UV_EINVAL;
707  }
708  if (uv__is_closing(handle)) {
709    return UV_EINVAL;
710  }
711  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
712    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
713  }
714
715  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
716    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
717  if (!handle->pipe.serv.accept_reqs) {
718    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
719  }
720
721  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
722    req = &handle->pipe.serv.accept_reqs[i];
723    UV_REQ_INIT(req, UV_ACCEPT);
724    req->data = handle;
725    req->pipeHandle = INVALID_HANDLE_VALUE;
726    req->next_pending = NULL;
727  }
728
729  /* Convert name to UTF16. */
730  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
731  handle->name = uv__malloc(nameSize);
732  if (!handle->name) {
733    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
734  }
735
736  if (!MultiByteToWideChar(CP_UTF8,
737                           0,
738                           name,
739                           -1,
740                           handle->name,
741                           nameSize / sizeof(WCHAR))) {
742    err = GetLastError();
743    goto error;
744  }
745
746  /*
747   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
748   * If this fails then there's already a pipe server for the given pipe name.
749   */
750  if (!pipe_alloc_accept(loop,
751                         handle,
752                         &handle->pipe.serv.accept_reqs[0],
753                         TRUE)) {
754    err = GetLastError();
755    if (err == ERROR_ACCESS_DENIED) {
756      err = WSAEADDRINUSE;  /* Translates to UV_EADDRINUSE. */
757    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
758      err = WSAEACCES;  /* Translates to UV_EACCES. */
759    }
760    goto error;
761  }
762
763  handle->pipe.serv.pending_accepts = NULL;
764  handle->flags |= UV_HANDLE_PIPESERVER;
765  handle->flags |= UV_HANDLE_BOUND;
766
767  return 0;
768
769error:
770  if (handle->name) {
771    uv__free(handle->name);
772    handle->name = NULL;
773  }
774
775  return uv_translate_sys_error(err);
776}
777
778
779static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
780  uv_loop_t* loop;
781  uv_pipe_t* handle;
782  uv_connect_t* req;
783  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
784  DWORD duplex_flags;
785
786  req = (uv_connect_t*) parameter;
787  assert(req);
788  handle = (uv_pipe_t*) req->handle;
789  assert(handle);
790  loop = handle->loop;
791  assert(loop);
792
793  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
794   * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
795  while (WaitNamedPipeW(handle->name, 30000)) {
796    /* The pipe is now available, try to connect. */
797    pipeHandle = open_named_pipe(handle->name, &duplex_flags);
798    if (pipeHandle != INVALID_HANDLE_VALUE)
799      break;
800
801    SwitchToThread();
802  }
803
804  if (pipeHandle != INVALID_HANDLE_VALUE) {
805    SET_REQ_SUCCESS(req);
806    req->u.connect.pipeHandle = pipeHandle;
807    req->u.connect.duplex_flags = duplex_flags;
808  } else {
809    SET_REQ_ERROR(req, GetLastError());
810  }
811
812  /* Post completed */
813  POST_COMPLETION_FOR_REQ(loop, req);
814
815  return 0;
816}
817
818
819void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
820    const char* name, uv_connect_cb cb) {
821  uv_loop_t* loop = handle->loop;
822  int err, nameSize;
823  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
824  DWORD duplex_flags;
825
826  UV_REQ_INIT(req, UV_CONNECT);
827  req->handle = (uv_stream_t*) handle;
828  req->cb = cb;
829  req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
830  req->u.connect.duplex_flags = 0;
831
832  if (handle->flags & UV_HANDLE_PIPESERVER) {
833    err = ERROR_INVALID_PARAMETER;
834    goto error;
835  }
836  if (handle->flags & UV_HANDLE_CONNECTION) {
837    err = ERROR_PIPE_BUSY;
838    goto error;
839  }
840  uv__pipe_connection_init(handle);
841
842  /* Convert name to UTF16. */
843  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
844  handle->name = uv__malloc(nameSize);
845  if (!handle->name) {
846    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
847  }
848
849  if (!MultiByteToWideChar(CP_UTF8,
850                           0,
851                           name,
852                           -1,
853                           handle->name,
854                           nameSize / sizeof(WCHAR))) {
855    err = GetLastError();
856    goto error;
857  }
858
859  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
860  if (pipeHandle == INVALID_HANDLE_VALUE) {
861    if (GetLastError() == ERROR_PIPE_BUSY) {
862      /* Wait for the server to make a pipe instance available. */
863      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
864                             req,
865                             WT_EXECUTELONGFUNCTION)) {
866        err = GetLastError();
867        goto error;
868      }
869
870      REGISTER_HANDLE_REQ(loop, handle, req);
871      handle->reqs_pending++;
872
873      return;
874    }
875
876    err = GetLastError();
877    goto error;
878  }
879
880  req->u.connect.pipeHandle = pipeHandle;
881  req->u.connect.duplex_flags = duplex_flags;
882  SET_REQ_SUCCESS(req);
883  uv__insert_pending_req(loop, (uv_req_t*) req);
884  handle->reqs_pending++;
885  REGISTER_HANDLE_REQ(loop, handle, req);
886  return;
887
888error:
889  if (handle->name) {
890    uv__free(handle->name);
891    handle->name = NULL;
892  }
893
894  if (pipeHandle != INVALID_HANDLE_VALUE)
895    CloseHandle(pipeHandle);
896
897  /* Make this req pending reporting an error. */
898  SET_REQ_ERROR(req, err);
899  uv__insert_pending_req(loop, (uv_req_t*) req);
900  handle->reqs_pending++;
901  REGISTER_HANDLE_REQ(loop, handle, req);
902  return;
903}
904
905
906void uv__pipe_interrupt_read(uv_pipe_t* handle) {
907  BOOL r;
908
909  if (!(handle->flags & UV_HANDLE_READ_PENDING))
910    return; /* No pending reads. */
911  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
912    return; /* Already cancelled. */
913  if (handle->handle == INVALID_HANDLE_VALUE)
914    return; /* Pipe handle closed. */
915
916  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
917    /* Cancel asynchronous read. */
918    r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
919    assert(r || GetLastError() == ERROR_NOT_FOUND);
920    (void) r;
921  } else {
922    /* Cancel synchronous read (which is happening in the thread pool). */
923    HANDLE thread;
924    volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
925
926    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
927
928    thread = *thread_ptr;
929    if (thread == NULL) {
930      /* The thread pool thread has not yet reached the point of blocking, we
931       * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
932      *thread_ptr = INVALID_HANDLE_VALUE;
933
934    } else {
935      /* Spin until the thread has acknowledged (by setting the thread to
936       * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
937      while (thread != INVALID_HANDLE_VALUE) {
938        r = CancelSynchronousIo(thread);
939        assert(r || GetLastError() == ERROR_NOT_FOUND);
940        SwitchToThread(); /* Yield thread. */
941        thread = *thread_ptr;
942      }
943    }
944
945    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
946  }
947
948  /* Set flag to indicate that read has been cancelled. */
949  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
950}
951
952
953void uv__pipe_read_stop(uv_pipe_t* handle) {
954  handle->flags &= ~UV_HANDLE_READING;
955  DECREASE_ACTIVE_COUNT(handle->loop, handle);
956  uv__pipe_interrupt_read(handle);
957}
958
959
960/* Cleans up uv_pipe_t (server or connection) and all resources associated with
961 * it. */
962void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
963  int i;
964  HANDLE pipeHandle;
965
966  if (handle->flags & UV_HANDLE_READING) {
967    handle->flags &= ~UV_HANDLE_READING;
968    DECREASE_ACTIVE_COUNT(loop, handle);
969  }
970
971  if (handle->flags & UV_HANDLE_LISTENING) {
972    handle->flags &= ~UV_HANDLE_LISTENING;
973    DECREASE_ACTIVE_COUNT(loop, handle);
974  }
975
976  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
977
978  uv__handle_closing(handle);
979
980  uv__pipe_interrupt_read(handle);
981
982  if (handle->name) {
983    uv__free(handle->name);
984    handle->name = NULL;
985  }
986
987  if (handle->flags & UV_HANDLE_PIPESERVER) {
988    for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
989      pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
990      if (pipeHandle != INVALID_HANDLE_VALUE) {
991        CloseHandle(pipeHandle);
992        handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
993      }
994    }
995    handle->handle = INVALID_HANDLE_VALUE;
996  }
997
998  if (handle->flags & UV_HANDLE_CONNECTION) {
999    eof_timer_destroy(handle);
1000  }
1001
1002  if ((handle->flags & UV_HANDLE_CONNECTION)
1003      && handle->handle != INVALID_HANDLE_VALUE) {
1004    /* This will eventually destroy the write queue for us too. */
1005    close_pipe(handle);
1006  }
1007
1008  if (handle->reqs_pending == 0)
1009    uv__want_endgame(loop, (uv_handle_t*) handle);
1010}
1011
1012
1013static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
1014    uv_pipe_accept_t* req, BOOL firstInstance) {
1015  assert(handle->flags & UV_HANDLE_LISTENING);
1016
1017  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
1018    SET_REQ_ERROR(req, GetLastError());
1019    uv__insert_pending_req(loop, (uv_req_t*) req);
1020    handle->reqs_pending++;
1021    return;
1022  }
1023
1024  assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1025
1026  /* Prepare the overlapped structure. */
1027  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
1028
1029  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
1030      GetLastError() != ERROR_IO_PENDING) {
1031    if (GetLastError() == ERROR_PIPE_CONNECTED) {
1032      SET_REQ_SUCCESS(req);
1033    } else {
1034      CloseHandle(req->pipeHandle);
1035      req->pipeHandle = INVALID_HANDLE_VALUE;
1036      /* Make this req pending reporting an error. */
1037      SET_REQ_ERROR(req, GetLastError());
1038    }
1039    uv__insert_pending_req(loop, (uv_req_t*) req);
1040    handle->reqs_pending++;
1041    return;
1042  }
1043
1044  /* Wait for completion via IOCP */
1045  handle->reqs_pending++;
1046}
1047
1048
1049int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
1050  uv_loop_t* loop = server->loop;
1051  uv_pipe_t* pipe_client;
1052  uv_pipe_accept_t* req;
1053  QUEUE* q;
1054  uv__ipc_xfer_queue_item_t* item;
1055  int err;
1056
1057  if (server->ipc) {
1058    if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
1059      /* No valid pending sockets. */
1060      return WSAEWOULDBLOCK;
1061    }
1062
1063    q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
1064    QUEUE_REMOVE(q);
1065    server->pipe.conn.ipc_xfer_queue_length--;
1066    item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
1067
1068    err = uv__tcp_xfer_import(
1069        (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
1070    if (err != 0)
1071      return err;
1072
1073    uv__free(item);
1074
1075  } else {
1076    pipe_client = (uv_pipe_t*) client;
1077    uv__pipe_connection_init(pipe_client);
1078
1079    /* Find a connection instance that has been connected, but not yet
1080     * accepted. */
1081    req = server->pipe.serv.pending_accepts;
1082
1083    if (!req) {
1084      /* No valid connections found, so we error out. */
1085      return WSAEWOULDBLOCK;
1086    }
1087
1088    /* Initialize the client handle and copy the pipeHandle to the client */
1089    pipe_client->handle = req->pipeHandle;
1090    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1091
1092    /* Prepare the req to pick up a new connection */
1093    server->pipe.serv.pending_accepts = req->next_pending;
1094    req->next_pending = NULL;
1095    req->pipeHandle = INVALID_HANDLE_VALUE;
1096
1097    server->handle = INVALID_HANDLE_VALUE;
1098    if (!(server->flags & UV_HANDLE_CLOSING)) {
1099      uv__pipe_queue_accept(loop, server, req, FALSE);
1100    }
1101  }
1102
1103  return 0;
1104}
1105
1106
1107/* Starts listening for connections for the given pipe. */
1108int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1109  uv_loop_t* loop = handle->loop;
1110  int i;
1111
1112  if (handle->flags & UV_HANDLE_LISTENING) {
1113    handle->stream.serv.connection_cb = cb;
1114  }
1115
1116  if (!(handle->flags & UV_HANDLE_BOUND)) {
1117    return WSAEINVAL;
1118  }
1119
1120  if (handle->flags & UV_HANDLE_READING) {
1121    return WSAEISCONN;
1122  }
1123
1124  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1125    return ERROR_NOT_SUPPORTED;
1126  }
1127
1128  if (handle->ipc) {
1129    return WSAEINVAL;
1130  }
1131
1132  handle->flags |= UV_HANDLE_LISTENING;
1133  INCREASE_ACTIVE_COUNT(loop, handle);
1134  handle->stream.serv.connection_cb = cb;
1135
1136  /* First pipe handle should have already been created in uv_pipe_bind */
1137  assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1138
1139  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1140    uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1141  }
1142
1143  return 0;
1144}
1145
1146
/* Work item that performs a blocking zero-byte ReadFile() on a pipe and then
 * posts a completion for the read request to the loop's IOCP.
 *
 * `arg` is the uv_read_t; the owning uv_pipe_t is taken from `req->data`.
 *
 * The worker and the thread calling uv__pipe_interrupt_read() coordinate
 * through `readfile_thread_handle`:
 *   - NULL:                 the worker has not published its thread handle.
 *   - INVALID_HANDLE_VALUE: the read was cancelled before blocking, or the
 *                           worker is past the blocking call.
 *   - any other value:      handle of the worker, usable to cancel its I/O.
 *
 * Always returns 0; the outcome is reported through the request status. */
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
  uv_read_t* req = (uv_read_t*) arg;
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
  uv_loop_t* loop = handle->loop;
  volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
  CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
  HANDLE thread;
  DWORD bytes;
  DWORD err;

  assert(req->type == UV_READ);
  assert(handle->type == UV_NAMED_PIPE);

  err = 0;

  /* Create a handle to the current thread. */
  if (!DuplicateHandle(GetCurrentProcess(),
                       GetCurrentThread(),
                       GetCurrentProcess(),
                       &thread,
                       0,
                       FALSE,
                       DUPLICATE_SAME_ACCESS)) {
    err = GetLastError();
    /* No thread handle was created, so skip the CloseHandle() at out2. */
    goto out1;
  }

  /* The lock needs to be held when thread handle is modified. */
  EnterCriticalSection(lock);
  if (*thread_ptr == INVALID_HANDLE_VALUE) {
    /* uv__pipe_interrupt_read() cancelled reading before we got here. */
    err = ERROR_OPERATION_ABORTED;
  } else {
    /* Let main thread know which worker thread is doing the blocking read. */
    assert(*thread_ptr == NULL);
    *thread_ptr = thread;
  }
  LeaveCriticalSection(lock);

  if (err)
    goto out2;

  /* Block the thread until data is available on the pipe, or the read is
   * cancelled. */
  if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
    err = GetLastError();

  /* Let the main thread know the worker is past the point of blocking. */
  assert(thread == *thread_ptr);
  *thread_ptr = INVALID_HANDLE_VALUE;

  /* Briefly acquire the mutex. Since the main thread holds the lock while it
   * is spinning trying to cancel this thread's I/O, we will block here until
   * it stops doing that. */
  EnterCriticalSection(lock);
  LeaveCriticalSection(lock);

out2:
  /* Close the handle to the current thread. */
  CloseHandle(thread);

out1:
  /* Set request status and post a completion record to the IOCP. */
  if (err)
    SET_REQ_ERROR(req, err);
  else
    SET_REQ_SUCCESS(req);
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
1218
1219
1220static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1221  int result;
1222  DWORD bytes;
1223  uv_write_t* req = (uv_write_t*) parameter;
1224  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1225  uv_loop_t* loop = handle->loop;
1226
1227  assert(req != NULL);
1228  assert(req->type == UV_WRITE);
1229  assert(handle->type == UV_NAMED_PIPE);
1230
1231  result = WriteFile(handle->handle,
1232                     req->write_buffer.base,
1233                     req->write_buffer.len,
1234                     &bytes,
1235                     NULL);
1236
1237  if (!result) {
1238    SET_REQ_ERROR(req, GetLastError());
1239  }
1240
1241  POST_COMPLETION_FOR_REQ(loop, req);
1242  return 0;
1243}
1244
1245
1246static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1247  uv_read_t* req;
1248  uv_tcp_t* handle;
1249
1250  req = (uv_read_t*) context;
1251  assert(req != NULL);
1252  handle = (uv_tcp_t*)req->data;
1253  assert(handle != NULL);
1254  assert(!timed_out);
1255
1256  if (!PostQueuedCompletionStatus(handle->loop->iocp,
1257                                  req->u.io.overlapped.InternalHigh,
1258                                  0,
1259                                  &req->u.io.overlapped)) {
1260    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1261  }
1262}
1263
1264
1265static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1266  uv_write_t* req;
1267  uv_tcp_t* handle;
1268
1269  req = (uv_write_t*) context;
1270  assert(req != NULL);
1271  handle = (uv_tcp_t*)req->handle;
1272  assert(handle != NULL);
1273  assert(!timed_out);
1274
1275  if (!PostQueuedCompletionStatus(handle->loop->iocp,
1276                                  req->u.io.overlapped.InternalHigh,
1277                                  0,
1278                                  &req->u.io.overlapped)) {
1279    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1280  }
1281}
1282
1283
1284static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1285  uv_read_t* req;
1286  int result;
1287
1288  assert(handle->flags & UV_HANDLE_READING);
1289  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1290
1291  assert(handle->handle != INVALID_HANDLE_VALUE);
1292
1293  req = &handle->read_req;
1294
1295  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1296    handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
1297    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1298                           req,
1299                           WT_EXECUTELONGFUNCTION)) {
1300      /* Make this req pending reporting an error. */
1301      SET_REQ_ERROR(req, GetLastError());
1302      goto error;
1303    }
1304  } else {
1305    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1306    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1307      assert(req->event_handle != NULL);
1308      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1309    }
1310
1311    /* Do 0-read */
1312    result = ReadFile(handle->handle,
1313                      &uv_zero_,
1314                      0,
1315                      NULL,
1316                      &req->u.io.overlapped);
1317
1318    if (!result && GetLastError() != ERROR_IO_PENDING) {
1319      /* Make this req pending reporting an error. */
1320      SET_REQ_ERROR(req, GetLastError());
1321      goto error;
1322    }
1323
1324    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1325      if (req->wait_handle == INVALID_HANDLE_VALUE) {
1326        if (!RegisterWaitForSingleObject(&req->wait_handle,
1327            req->event_handle, post_completion_read_wait, (void*) req,
1328            INFINITE, WT_EXECUTEINWAITTHREAD)) {
1329          SET_REQ_ERROR(req, GetLastError());
1330          goto error;
1331        }
1332      }
1333    }
1334  }
1335
1336  /* Start the eof timer if there is one */
1337  eof_timer_start(handle);
1338  handle->flags |= UV_HANDLE_READ_PENDING;
1339  handle->reqs_pending++;
1340  return;
1341
1342error:
1343  uv__insert_pending_req(loop, (uv_req_t*)req);
1344  handle->flags |= UV_HANDLE_READ_PENDING;
1345  handle->reqs_pending++;
1346}
1347
1348
1349int uv__pipe_read_start(uv_pipe_t* handle,
1350                        uv_alloc_cb alloc_cb,
1351                        uv_read_cb read_cb) {
1352  uv_loop_t* loop = handle->loop;
1353
1354  handle->flags |= UV_HANDLE_READING;
1355  INCREASE_ACTIVE_COUNT(loop, handle);
1356  handle->read_cb = read_cb;
1357  handle->alloc_cb = alloc_cb;
1358
1359  /* If reading was stopped and then started again, there could still be a read
1360   * request pending. */
1361  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1362    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1363        handle->read_req.event_handle == NULL) {
1364      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1365      if (handle->read_req.event_handle == NULL) {
1366        uv_fatal_error(GetLastError(), "CreateEvent");
1367      }
1368    }
1369    uv__pipe_queue_read(loop, handle);
1370  }
1371
1372  return 0;
1373}
1374
1375
1376static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
1377    uv_write_t* req) {
1378  req->next_req = NULL;
1379  if (handle->pipe.conn.non_overlapped_writes_tail) {
1380    req->next_req =
1381      handle->pipe.conn.non_overlapped_writes_tail->next_req;
1382    handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1383    handle->pipe.conn.non_overlapped_writes_tail = req;
1384  } else {
1385    req->next_req = (uv_req_t*)req;
1386    handle->pipe.conn.non_overlapped_writes_tail = req;
1387  }
1388}
1389
1390
1391static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1392  uv_write_t* req;
1393
1394  if (handle->pipe.conn.non_overlapped_writes_tail) {
1395    req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1396
1397    if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1398      handle->pipe.conn.non_overlapped_writes_tail = NULL;
1399    } else {
1400      handle->pipe.conn.non_overlapped_writes_tail->next_req =
1401        req->next_req;
1402    }
1403
1404    return req;
1405  } else {
1406    /* queue empty */
1407    return NULL;
1408  }
1409}
1410
1411
1412static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
1413  uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1414  if (req) {
1415    if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1416                           req,
1417                           WT_EXECUTELONGFUNCTION)) {
1418      uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1419    }
1420  }
1421}
1422
1423
1424static int uv__build_coalesced_write_req(uv_write_t* user_req,
1425                                         const uv_buf_t bufs[],
1426                                         size_t nbufs,
1427                                         uv_write_t** req_out,
1428                                         uv_buf_t* write_buf_out) {
1429  /* Pack into a single heap-allocated buffer:
1430   *   (a) a uv_write_t structure where libuv stores the actual state.
1431   *   (b) a pointer to the original uv_write_t.
1432   *   (c) data from all `bufs` entries.
1433   */
1434  char* heap_buffer;
1435  size_t heap_buffer_length, heap_buffer_offset;
1436  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
1437  char* data_start;                           /* (c) */
1438  size_t data_length;
1439  unsigned int i;
1440
1441  /* Compute combined size of all combined buffers from `bufs`. */
1442  data_length = 0;
1443  for (i = 0; i < nbufs; i++)
1444    data_length += bufs[i].len;
1445
1446  /* The total combined size of data buffers should not exceed UINT32_MAX,
1447   * because WriteFile() won't accept buffers larger than that. */
1448  if (data_length > UINT32_MAX)
1449    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1450
1451  /* Compute heap buffer size. */
1452  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
1453                       data_length;                  /* (c) */
1454
1455  /* Allocate buffer. */
1456  heap_buffer = uv__malloc(heap_buffer_length);
1457  if (heap_buffer == NULL)
1458    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1459
1460  /* Copy uv_write_t information to the buffer. */
1461  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
1462  coalesced_write_req->req = *user_req; /* copy (a) */
1463  coalesced_write_req->req.coalesced = 1;
1464  coalesced_write_req->user_req = user_req;         /* copy (b) */
1465  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
1466
1467  /* Copy data buffers to the heap buffer. */
1468  data_start = &heap_buffer[heap_buffer_offset];
1469  for (i = 0; i < nbufs; i++) {
1470    memcpy(&heap_buffer[heap_buffer_offset],
1471           bufs[i].base,
1472           bufs[i].len);               /* copy (c) */
1473    heap_buffer_offset += bufs[i].len; /* offset (c) */
1474  }
1475  assert(heap_buffer_offset == heap_buffer_length);
1476
1477  /* Set out arguments and return. */
1478  *req_out = &coalesced_write_req->req;
1479  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
1480  return 0;
1481}
1482
1483
1484static int uv__pipe_write_data(uv_loop_t* loop,
1485                               uv_write_t* req,
1486                               uv_pipe_t* handle,
1487                               const uv_buf_t bufs[],
1488                               size_t nbufs,
1489                               uv_write_cb cb,
1490                               int copy_always) {
1491  int err;
1492  int result;
1493  uv_buf_t write_buf;
1494
1495  assert(handle->handle != INVALID_HANDLE_VALUE);
1496
1497  UV_REQ_INIT(req, UV_WRITE);
1498  req->handle = (uv_stream_t*) handle;
1499  req->send_handle = NULL;
1500  req->cb = cb;
1501  /* Private fields. */
1502  req->coalesced = 0;
1503  req->event_handle = NULL;
1504  req->wait_handle = INVALID_HANDLE_VALUE;
1505
1506  /* Prepare the overlapped structure. */
1507  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1508  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
1509    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1510    if (req->event_handle == NULL) {
1511      uv_fatal_error(GetLastError(), "CreateEvent");
1512    }
1513    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1514  }
1515  req->write_buffer = uv_null_buf_;
1516
1517  if (nbufs == 0) {
1518    /* Write empty buffer. */
1519    write_buf = uv_null_buf_;
1520  } else if (nbufs == 1 && !copy_always) {
1521    /* Write directly from bufs[0]. */
1522    write_buf = bufs[0];
1523  } else {
1524    /* Coalesce all `bufs` into one big buffer. This also creates a new
1525     * write-request structure that replaces the old one. */
1526    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
1527    if (err != 0)
1528      return err;
1529  }
1530
1531  if ((handle->flags &
1532      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1533      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1534    DWORD bytes;
1535    result =
1536        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
1537
1538    if (!result) {
1539      err = GetLastError();
1540      return err;
1541    } else {
1542      /* Request completed immediately. */
1543      req->u.io.queued_bytes = 0;
1544    }
1545
1546    REGISTER_HANDLE_REQ(loop, handle, req);
1547    handle->reqs_pending++;
1548    handle->stream.conn.write_reqs_pending++;
1549    POST_COMPLETION_FOR_REQ(loop, req);
1550    return 0;
1551  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1552    req->write_buffer = write_buf;
1553    uv__insert_non_overlapped_write_req(handle, req);
1554    if (handle->stream.conn.write_reqs_pending == 0) {
1555      uv__queue_non_overlapped_write(handle);
1556    }
1557
1558    /* Request queued by the kernel. */
1559    req->u.io.queued_bytes = write_buf.len;
1560    handle->write_queue_size += req->u.io.queued_bytes;
1561  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1562    /* Using overlapped IO, but wait for completion before returning */
1563    result = WriteFile(handle->handle,
1564                       write_buf.base,
1565                       write_buf.len,
1566                       NULL,
1567                       &req->u.io.overlapped);
1568
1569    if (!result && GetLastError() != ERROR_IO_PENDING) {
1570      err = GetLastError();
1571      CloseHandle(req->event_handle);
1572      req->event_handle = NULL;
1573      return err;
1574    }
1575
1576    if (result) {
1577      /* Request completed immediately. */
1578      req->u.io.queued_bytes = 0;
1579    } else {
1580      /* Request queued by the kernel. */
1581      req->u.io.queued_bytes = write_buf.len;
1582      handle->write_queue_size += req->u.io.queued_bytes;
1583      if (WaitForSingleObject(req->event_handle, INFINITE) !=
1584          WAIT_OBJECT_0) {
1585        err = GetLastError();
1586        CloseHandle(req->event_handle);
1587        req->event_handle = NULL;
1588        return err;
1589      }
1590    }
1591    CloseHandle(req->event_handle);
1592    req->event_handle = NULL;
1593
1594    REGISTER_HANDLE_REQ(loop, handle, req);
1595    handle->reqs_pending++;
1596    handle->stream.conn.write_reqs_pending++;
1597    return 0;
1598  } else {
1599    result = WriteFile(handle->handle,
1600                       write_buf.base,
1601                       write_buf.len,
1602                       NULL,
1603                       &req->u.io.overlapped);
1604
1605    if (!result && GetLastError() != ERROR_IO_PENDING) {
1606      return GetLastError();
1607    }
1608
1609    if (result) {
1610      /* Request completed immediately. */
1611      req->u.io.queued_bytes = 0;
1612    } else {
1613      /* Request queued by the kernel. */
1614      req->u.io.queued_bytes = write_buf.len;
1615      handle->write_queue_size += req->u.io.queued_bytes;
1616    }
1617
1618    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1619      if (!RegisterWaitForSingleObject(&req->wait_handle,
1620          req->event_handle, post_completion_write_wait, (void*) req,
1621          INFINITE, WT_EXECUTEINWAITTHREAD)) {
1622        return GetLastError();
1623      }
1624    }
1625  }
1626
1627  REGISTER_HANDLE_REQ(loop, handle, req);
1628  handle->reqs_pending++;
1629  handle->stream.conn.write_reqs_pending++;
1630
1631  return 0;
1632}
1633
1634
1635static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1636  DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1637
1638  /* If the both ends of the IPC pipe are owned by the same process,
1639   * the remote end pid may not yet be set. If so, do it here.
1640   * TODO: this is weird; it'd probably better to use a handshake. */
1641  if (*pid == 0)
1642    *pid = GetCurrentProcessId();
1643
1644  return *pid;
1645}
1646
1647
1648int uv__pipe_write_ipc(uv_loop_t* loop,
1649                       uv_write_t* req,
1650                       uv_pipe_t* handle,
1651                       const uv_buf_t data_bufs[],
1652                       size_t data_buf_count,
1653                       uv_stream_t* send_handle,
1654                       uv_write_cb cb) {
1655  uv_buf_t stack_bufs[6];
1656  uv_buf_t* bufs;
1657  size_t buf_count, buf_index;
1658  uv__ipc_frame_header_t frame_header;
1659  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
1660  uv__ipc_socket_xfer_info_t xfer_info;
1661  uint64_t data_length;
1662  size_t i;
1663  int err;
1664
1665  /* Compute the combined size of data buffers. */
1666  data_length = 0;
1667  for (i = 0; i < data_buf_count; i++)
1668    data_length += data_bufs[i].len;
1669  if (data_length > UINT32_MAX)
1670    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1671
1672  /* Prepare the frame's socket xfer payload. */
1673  if (send_handle != NULL) {
1674    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
1675
1676    /* Verify that `send_handle` it is indeed a tcp handle. */
1677    if (send_tcp_handle->type != UV_TCP)
1678      return ERROR_NOT_SUPPORTED;
1679
1680    /* Export the tcp handle. */
1681    err = uv__tcp_xfer_export(send_tcp_handle,
1682                              uv__pipe_get_ipc_remote_pid(handle),
1683                              &xfer_type,
1684                              &xfer_info);
1685    if (err != 0)
1686      return err;
1687  }
1688
1689  /* Compute the number of uv_buf_t's required. */
1690  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
1691  if (send_handle != NULL)
1692    buf_count += 1; /* One extra for the socket xfer information. */
1693
1694  /* Use the on-stack buffer array if it is big enough; otherwise allocate
1695   * space for it on the heap. */
1696  if (buf_count < ARRAY_SIZE(stack_bufs)) {
1697    /* Use on-stack buffer array. */
1698    bufs = stack_bufs;
1699  } else {
1700    /* Use heap-allocated buffer array. */
1701    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
1702    if (bufs == NULL)
1703      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1704  }
1705  buf_index = 0;
1706
1707  /* Initialize frame header and add it to the buffers list. */
1708  memset(&frame_header, 0, sizeof frame_header);
1709  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
1710
1711  if (send_handle != NULL) {
1712    /* Add frame header flags. */
1713    switch (xfer_type) {
1714      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
1715        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
1716                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
1717        break;
1718      case UV__IPC_SOCKET_XFER_TCP_SERVER:
1719        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
1720        break;
1721      default:
1722        assert(0);  /* Unreachable. */
1723    }
1724    /* Add xfer info buffer. */
1725    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
1726  }
1727
1728  if (data_length > 0) {
1729    /* Update frame header. */
1730    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
1731    frame_header.data_length = (uint32_t) data_length;
1732    /* Add data buffers to buffers list. */
1733    for (i = 0; i < data_buf_count; i++)
1734      bufs[buf_index++] = data_bufs[i];
1735  }
1736
1737  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
1738   * some of the written data lives on the stack. */
1739  err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
1740
1741  /* If we had to heap-allocate the bufs array, free it now. */
1742  if (bufs != stack_bufs) {
1743    uv__free(bufs);
1744  }
1745
1746  return err;
1747}
1748
1749
1750int uv__pipe_write(uv_loop_t* loop,
1751                   uv_write_t* req,
1752                   uv_pipe_t* handle,
1753                   const uv_buf_t bufs[],
1754                   size_t nbufs,
1755                   uv_stream_t* send_handle,
1756                   uv_write_cb cb) {
1757  if (handle->ipc) {
1758    /* IPC pipe write: use framing protocol. */
1759    return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1760  } else {
1761    /* Non-IPC pipe write: put data on the wire directly. */
1762    assert(send_handle == NULL);
1763    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
1764  }
1765}
1766
1767
1768static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1769    uv_buf_t buf) {
1770  /* If there is an eof timer running, we don't need it any more, so discard
1771   * it. */
1772  eof_timer_destroy(handle);
1773
1774  uv_read_stop((uv_stream_t*) handle);
1775
1776  handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1777}
1778
1779
1780static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1781    uv_buf_t buf) {
1782  /* If there is an eof timer running, we don't need it any more, so discard
1783   * it. */
1784  eof_timer_destroy(handle);
1785
1786  uv_read_stop((uv_stream_t*) handle);
1787
1788  handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1789}
1790
1791
1792static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1793    int error, uv_buf_t buf) {
1794  if (error == ERROR_BROKEN_PIPE) {
1795    uv__pipe_read_eof(loop, handle, buf);
1796  } else {
1797    uv__pipe_read_error(loop, handle, error, buf);
1798  }
1799}
1800
1801
1802static void uv__pipe_queue_ipc_xfer_info(
1803    uv_pipe_t* handle,
1804    uv__ipc_socket_xfer_type_t xfer_type,
1805    uv__ipc_socket_xfer_info_t* xfer_info) {
1806  uv__ipc_xfer_queue_item_t* item;
1807
1808  item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1809  if (item == NULL)
1810    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1811
1812  item->xfer_type = xfer_type;
1813  item->xfer_info = *xfer_info;
1814
1815  QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1816  handle->pipe.conn.ipc_xfer_queue_length++;
1817}
1818
1819
1820/* Read an exact number of bytes from a pipe. If an error or end-of-file is
1821 * encountered before the requested number of bytes are read, an error is
1822 * returned. */
1823static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1824  DWORD bytes_read, bytes_read_now;
1825
1826  bytes_read = 0;
1827  while (bytes_read < count) {
1828    if (!ReadFile(h,
1829                  (char*) buffer + bytes_read,
1830                  count - bytes_read,
1831                  &bytes_read_now,
1832                  NULL)) {
1833      return GetLastError();
1834    }
1835
1836    bytes_read += bytes_read_now;
1837  }
1838
1839  assert(bytes_read == count);
1840  return 0;
1841}
1842
1843
1844static DWORD uv__pipe_read_data(uv_loop_t* loop,
1845                                uv_pipe_t* handle,
1846                                DWORD suggested_bytes,
1847                                DWORD max_bytes) {
1848  DWORD bytes_read;
1849  uv_buf_t buf;
1850
1851  /* Ask the user for a buffer to read data into. */
1852  buf = uv_buf_init(NULL, 0);
1853  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
1854  if (buf.base == NULL || buf.len == 0) {
1855    handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1856    return 0; /* Break out of read loop. */
1857  }
1858
1859  /* Ensure we read at most the smaller of:
1860   *   (a) the length of the user-allocated buffer.
1861   *   (b) the maximum data length as specified by the `max_bytes` argument.
1862   */
1863  if (max_bytes > buf.len)
1864    max_bytes = buf.len;
1865
1866  /* Read into the user buffer. */
1867  if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
1868    uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1869    return 0; /* Break out of read loop. */
1870  }
1871
1872  /* Call the read callback. */
1873  handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
1874
1875  return bytes_read;
1876}
1877
1878
1879static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1880  uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1881  int err;
1882
1883  if (*data_remaining > 0) {
1884    /* Read frame data payload. */
1885    DWORD bytes_read =
1886        uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1887    *data_remaining -= bytes_read;
1888    return bytes_read;
1889
1890  } else {
1891    /* Start of a new IPC frame. */
1892    uv__ipc_frame_header_t frame_header;
1893    uint32_t xfer_flags;
1894    uv__ipc_socket_xfer_type_t xfer_type;
1895    uv__ipc_socket_xfer_info_t xfer_info;
1896
1897    /* Read the IPC frame header. */
1898    err = uv__pipe_read_exactly(
1899        handle->handle, &frame_header, sizeof frame_header);
1900    if (err)
1901      goto error;
1902
1903    /* Validate that flags are valid. */
1904    if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
1905      goto invalid;
1906    /* Validate that reserved2 is zero. */
1907    if (frame_header.reserved2 != 0)
1908      goto invalid;
1909
1910    /* Parse xfer flags. */
1911    xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
1912    if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
1913      /* Socket coming -- determine the type. */
1914      xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
1915                      ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
1916                      : UV__IPC_SOCKET_XFER_TCP_SERVER;
1917    } else if (xfer_flags == 0) {
1918      /* No socket. */
1919      xfer_type = UV__IPC_SOCKET_XFER_NONE;
1920    } else {
1921      /* Invalid flags. */
1922      goto invalid;
1923    }
1924
1925    /* Parse data frame information. */
1926    if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
1927      *data_remaining = frame_header.data_length;
1928    } else if (frame_header.data_length != 0) {
1929      /* Data length greater than zero but data flag not set -- invalid. */
1930      goto invalid;
1931    }
1932
1933    /* If no socket xfer info follows, return here. Data will be read in a
1934     * subsequent invocation of uv__pipe_read_ipc(). */
1935    if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
1936      return sizeof frame_header; /* Number of bytes read. */
1937
1938    /* Read transferred socket information. */
1939    err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
1940    if (err)
1941      goto error;
1942
1943    /* Store the pending socket info. */
1944    uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
1945
1946    /* Return number of bytes read. */
1947    return sizeof frame_header + sizeof xfer_info;
1948  }
1949
1950invalid:
1951  /* Invalid frame. */
1952  err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
1953
1954error:
1955  uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1956  return 0; /* Break out of read loop. */
1957}
1958
1959
1960void uv__process_pipe_read_req(uv_loop_t* loop,
1961                               uv_pipe_t* handle,
1962                               uv_req_t* req) {
1963  assert(handle->type == UV_NAMED_PIPE);
1964
1965  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
1966  DECREASE_PENDING_REQ_COUNT(handle);
1967  eof_timer_stop(handle);
1968
1969  /* At this point, we're done with bookkeeping. If the user has stopped
1970   * reading the pipe in the meantime, there is nothing left to do, since there
1971   * is no callback that we can call. */
1972  if (!(handle->flags & UV_HANDLE_READING))
1973    return;
1974
1975  if (!REQ_SUCCESS(req)) {
1976    /* An error occurred doing the zero-read. */
1977    DWORD err = GET_REQ_ERROR(req);
1978
1979    /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
1980     * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
1981     * the user; we'll start a new zero-read at the end of this function. */
1982    if (err != ERROR_OPERATION_ABORTED)
1983      uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1984
1985  } else {
1986    /* The zero-read completed without error, indicating there is data
1987     * available in the kernel buffer. */
1988    DWORD avail;
1989
1990    /* Get the number of bytes available. */
1991    avail = 0;
1992    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
1993      uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1994
1995    /* Read until we've either read all the bytes available, or the 'reading'
1996     * flag is cleared. */
1997    while (avail > 0 && handle->flags & UV_HANDLE_READING) {
1998      /* Depending on the type of pipe, read either IPC frames or raw data. */
1999      DWORD bytes_read =
2000          handle->ipc ? uv__pipe_read_ipc(loop, handle)
2001                      : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
2002
2003      /* If no bytes were read, treat this as an indication that an error
2004       * occurred, and break out of the read loop. */
2005      if (bytes_read == 0)
2006        break;
2007
2008      /* It is possible that more bytes were read than we thought were
2009       * available. To prevent `avail` from underflowing, break out of the loop
2010       * if this is the case. */
2011      if (bytes_read > avail)
2012        break;
2013
2014      /* Recompute the number of bytes available. */
2015      avail -= bytes_read;
2016    }
2017  }
2018
2019  /* Start another zero-read request if necessary. */
2020  if ((handle->flags & UV_HANDLE_READING) &&
2021      !(handle->flags & UV_HANDLE_READ_PENDING)) {
2022    uv__pipe_queue_read(loop, handle);
2023  }
2024}
2025
2026
2027void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
2028    uv_write_t* req) {
2029  int err;
2030
2031  assert(handle->type == UV_NAMED_PIPE);
2032
2033  assert(handle->write_queue_size >= req->u.io.queued_bytes);
2034  handle->write_queue_size -= req->u.io.queued_bytes;
2035
2036  UNREGISTER_HANDLE_REQ(loop, handle, req);
2037
2038  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
2039    if (req->wait_handle != INVALID_HANDLE_VALUE) {
2040      UnregisterWait(req->wait_handle);
2041      req->wait_handle = INVALID_HANDLE_VALUE;
2042    }
2043    if (req->event_handle) {
2044      CloseHandle(req->event_handle);
2045      req->event_handle = NULL;
2046    }
2047  }
2048
2049  err = GET_REQ_ERROR(req);
2050
2051  /* If this was a coalesced write, extract pointer to the user_provided
2052   * uv_write_t structure so we can pass the expected pointer to the callback,
2053   * then free the heap-allocated write req. */
2054  if (req->coalesced) {
2055    uv__coalesced_write_t* coalesced_write =
2056        container_of(req, uv__coalesced_write_t, req);
2057    req = coalesced_write->user_req;
2058    uv__free(coalesced_write);
2059  }
2060  if (req->cb) {
2061    req->cb(req, uv_translate_sys_error(err));
2062  }
2063
2064  handle->stream.conn.write_reqs_pending--;
2065
2066  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
2067      handle->pipe.conn.non_overlapped_writes_tail) {
2068    assert(handle->stream.conn.write_reqs_pending > 0);
2069    uv__queue_non_overlapped_write(handle);
2070  }
2071
2072  if (handle->stream.conn.write_reqs_pending == 0)
2073    if (handle->flags & UV_HANDLE_SHUTTING)
2074      uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
2075
2076  DECREASE_PENDING_REQ_COUNT(handle);
2077}
2078
2079
2080void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
2081    uv_req_t* raw_req) {
2082  uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
2083
2084  assert(handle->type == UV_NAMED_PIPE);
2085
2086  if (handle->flags & UV_HANDLE_CLOSING) {
2087    /* The req->pipeHandle should be freed already in uv__pipe_close(). */
2088    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
2089    DECREASE_PENDING_REQ_COUNT(handle);
2090    return;
2091  }
2092
2093  if (REQ_SUCCESS(req)) {
2094    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
2095    req->next_pending = handle->pipe.serv.pending_accepts;
2096    handle->pipe.serv.pending_accepts = req;
2097
2098    if (handle->stream.serv.connection_cb) {
2099      handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
2100    }
2101  } else {
2102    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
2103      CloseHandle(req->pipeHandle);
2104      req->pipeHandle = INVALID_HANDLE_VALUE;
2105    }
2106    if (!(handle->flags & UV_HANDLE_CLOSING)) {
2107      uv__pipe_queue_accept(loop, handle, req, FALSE);
2108    }
2109  }
2110
2111  DECREASE_PENDING_REQ_COUNT(handle);
2112}
2113
2114
2115void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
2116    uv_connect_t* req) {
2117  HANDLE pipeHandle;
2118  DWORD duplex_flags;
2119  int err;
2120
2121  assert(handle->type == UV_NAMED_PIPE);
2122
2123  UNREGISTER_HANDLE_REQ(loop, handle, req);
2124
2125  err = 0;
2126  if (REQ_SUCCESS(req)) {
2127    pipeHandle = req->u.connect.pipeHandle;
2128    duplex_flags = req->u.connect.duplex_flags;
2129    err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
2130    if (err)
2131      CloseHandle(pipeHandle);
2132  } else {
2133    err = uv_translate_sys_error(GET_REQ_ERROR(req));
2134  }
2135
2136  if (req->cb)
2137    req->cb(req, err);
2138
2139  DECREASE_PENDING_REQ_COUNT(handle);
2140}
2141
2142
2143
2144void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
2145    uv_shutdown_t* req) {
2146  int err;
2147
2148  assert(handle->type == UV_NAMED_PIPE);
2149
2150  /* Clear the shutdown_req field so we don't go here again. */
2151  handle->stream.conn.shutdown_req = NULL;
2152  handle->flags &= ~UV_HANDLE_SHUTTING;
2153  UNREGISTER_HANDLE_REQ(loop, handle, req);
2154
2155  if (handle->flags & UV_HANDLE_CLOSING) {
2156    /* Already closing. Cancel the shutdown. */
2157    err = UV_ECANCELED;
2158  } else if (!REQ_SUCCESS(req)) {
2159    /* An error occurred in trying to shutdown gracefully. */
2160    err = uv_translate_sys_error(GET_REQ_ERROR(req));
2161  } else {
2162    if (handle->flags & UV_HANDLE_READABLE) {
2163      /* Initialize and optionally start the eof timer. Only do this if the pipe
2164       * is readable and we haven't seen EOF come in ourselves. */
2165      eof_timer_init(handle);
2166
2167      /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
2168       * start it. */
2169      if (handle->flags & UV_HANDLE_READ_PENDING) {
2170        eof_timer_start(handle);
2171      }
2172
2173    } else {
2174      /* This pipe is not readable. We can just close it to let the other end
2175       * know that we're done writing. */
2176      close_pipe(handle);
2177    }
2178    err = 0;
2179  }
2180
2181  if (req->cb)
2182    req->cb(req, err);
2183
2184  DECREASE_PENDING_REQ_COUNT(handle);
2185}
2186
2187
2188static void eof_timer_init(uv_pipe_t* pipe) {
2189  int r;
2190
2191  assert(pipe->pipe.conn.eof_timer == NULL);
2192  assert(pipe->flags & UV_HANDLE_CONNECTION);
2193
2194  pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2195
2196  r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2197  assert(r == 0);  /* timers can't fail */
2198  (void) r;
2199  pipe->pipe.conn.eof_timer->data = pipe;
2200  uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
2201}
2202
2203
2204static void eof_timer_start(uv_pipe_t* pipe) {
2205  assert(pipe->flags & UV_HANDLE_CONNECTION);
2206
2207  if (pipe->pipe.conn.eof_timer != NULL) {
2208    uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
2209  }
2210}
2211
2212
2213static void eof_timer_stop(uv_pipe_t* pipe) {
2214  assert(pipe->flags & UV_HANDLE_CONNECTION);
2215
2216  if (pipe->pipe.conn.eof_timer != NULL) {
2217    uv_timer_stop(pipe->pipe.conn.eof_timer);
2218  }
2219}
2220
2221
2222static void eof_timer_cb(uv_timer_t* timer) {
2223  uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2224  uv_loop_t* loop = timer->loop;
2225
2226  assert(pipe->type == UV_NAMED_PIPE);
2227
2228  /* This should always be true, since we start the timer only in
2229   * uv__pipe_queue_read after successfully calling ReadFile, or in
2230   * uv__process_pipe_shutdown_req if a read is pending, and we always
2231   * immediately stop the timer in uv__process_pipe_read_req. */
2232  assert(pipe->flags & UV_HANDLE_READ_PENDING);
2233
2234  /* If there are many packets coming off the iocp then the timer callback may
2235   * be called before the read request is coming off the queue. Therefore we
2236   * check here if the read request has completed but will be processed later.
2237   */
2238  if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2239      HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2240    return;
2241  }
2242
2243  /* Force both ends off the pipe. */
2244  close_pipe(pipe);
2245
2246  /* Stop reading, so the pending read that is going to fail will not be
2247   * reported to the user. */
2248  uv_read_stop((uv_stream_t*) pipe);
2249
2250  /* Report the eof and update flags. This will get reported even if the user
2251   * stopped reading in the meantime. TODO: is that okay? */
2252  uv__pipe_read_eof(loop, pipe, uv_null_buf_);
2253}
2254
2255
2256static void eof_timer_destroy(uv_pipe_t* pipe) {
2257  assert(pipe->flags & UV_HANDLE_CONNECTION);
2258
2259  if (pipe->pipe.conn.eof_timer) {
2260    uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2261    pipe->pipe.conn.eof_timer = NULL;
2262  }
2263}
2264
2265
2266static void eof_timer_close_cb(uv_handle_t* handle) {
2267  assert(handle->type == UV_TIMER);
2268  uv__free(handle);
2269}
2270
2271
2272int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
2273  HANDLE os_handle = uv__get_osfhandle(file);
2274  NTSTATUS nt_status;
2275  IO_STATUS_BLOCK io_status;
2276  FILE_ACCESS_INFORMATION access;
2277  DWORD duplex_flags = 0;
2278  int err;
2279
2280  if (os_handle == INVALID_HANDLE_VALUE)
2281    return UV_EBADF;
2282  if (pipe->flags & UV_HANDLE_PIPESERVER)
2283    return UV_EINVAL;
2284  if (pipe->flags & UV_HANDLE_CONNECTION)
2285    return UV_EBUSY;
2286
2287  uv__pipe_connection_init(pipe);
2288  uv__once_init();
2289  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
2290   * underlying OS handle and forget about the original fd.
2291   * We could also opt to use the original OS handle and just never close it,
2292   * but then there would be no reliable way to cancel pending read operations
2293   * upon close.
2294   */
2295  if (file <= 2) {
2296    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
2297                         os_handle,
2298                         INVALID_HANDLE_VALUE,
2299                         &os_handle,
2300                         0,
2301                         FALSE,
2302                         DUPLICATE_SAME_ACCESS))
2303      return uv_translate_sys_error(GetLastError());
2304    assert(os_handle != INVALID_HANDLE_VALUE);
2305    file = -1;
2306  }
2307
2308  /* Determine what kind of permissions we have on this handle.
2309   * Cygwin opens the pipe in message mode, but we can support it,
2310   * just query the access flags and set the stream flags accordingly.
2311   */
2312  nt_status = pNtQueryInformationFile(os_handle,
2313                                      &io_status,
2314                                      &access,
2315                                      sizeof(access),
2316                                      FileAccessInformation);
2317  if (nt_status != STATUS_SUCCESS)
2318    return UV_EINVAL;
2319
2320  if (pipe->ipc) {
2321    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
2322        !(access.AccessFlags & FILE_READ_DATA)) {
2323      return UV_EINVAL;
2324    }
2325  }
2326
2327  if (access.AccessFlags & FILE_WRITE_DATA)
2328    duplex_flags |= UV_HANDLE_WRITABLE;
2329  if (access.AccessFlags & FILE_READ_DATA)
2330    duplex_flags |= UV_HANDLE_READABLE;
2331
2332  err = uv__set_pipe_handle(pipe->loop,
2333                            pipe,
2334                            os_handle,
2335                            file,
2336                            duplex_flags);
2337  if (err) {
2338    if (file == -1)
2339      CloseHandle(os_handle);
2340    return err;
2341  }
2342
2343  if (pipe->ipc) {
2344    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
2345    pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
2346    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
2347  }
2348  return 0;
2349}
2350
2351
2352static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2353  NTSTATUS nt_status;
2354  IO_STATUS_BLOCK io_status;
2355  FILE_NAME_INFORMATION tmp_name_info;
2356  FILE_NAME_INFORMATION* name_info;
2357  WCHAR* name_buf;
2358  unsigned int addrlen;
2359  unsigned int name_size;
2360  unsigned int name_len;
2361  int err;
2362
2363  uv__once_init();
2364  name_info = NULL;
2365
2366  if (handle->name != NULL) {
2367    /* The user might try to query the name before we are connected,
2368     * and this is just easier to return the cached value if we have it. */
2369    name_buf = handle->name;
2370    name_len = wcslen(name_buf);
2371
2372    /* check how much space we need */
2373    addrlen = WideCharToMultiByte(CP_UTF8,
2374                                  0,
2375                                  name_buf,
2376                                  name_len,
2377                                  NULL,
2378                                  0,
2379                                  NULL,
2380                                  NULL);
2381    if (!addrlen) {
2382      *size = 0;
2383      err = uv_translate_sys_error(GetLastError());
2384      return err;
2385    } else if (addrlen >= *size) {
2386      *size = addrlen + 1;
2387      err = UV_ENOBUFS;
2388      goto error;
2389    }
2390
2391    addrlen = WideCharToMultiByte(CP_UTF8,
2392                                  0,
2393                                  name_buf,
2394                                  name_len,
2395                                  buffer,
2396                                  addrlen,
2397                                  NULL,
2398                                  NULL);
2399    if (!addrlen) {
2400      *size = 0;
2401      err = uv_translate_sys_error(GetLastError());
2402      return err;
2403    }
2404
2405    *size = addrlen;
2406    buffer[addrlen] = '\0';
2407
2408    return 0;
2409  }
2410
2411  if (handle->handle == INVALID_HANDLE_VALUE) {
2412    *size = 0;
2413    return UV_EINVAL;
2414  }
2415
2416  /* NtQueryInformationFile will block if another thread is performing a
2417   * blocking operation on the queried handle. If the pipe handle is
2418   * synchronous, there may be a worker thread currently calling ReadFile() on
2419   * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2420   * the read. */
2421  if (handle->flags & UV_HANDLE_CONNECTION &&
2422      handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2423    uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2424  }
2425
2426  nt_status = pNtQueryInformationFile(handle->handle,
2427                                      &io_status,
2428                                      &tmp_name_info,
2429                                      sizeof tmp_name_info,
2430                                      FileNameInformation);
2431  if (nt_status == STATUS_BUFFER_OVERFLOW) {
2432    name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2433    name_info = uv__malloc(name_size);
2434    if (!name_info) {
2435      *size = 0;
2436      err = UV_ENOMEM;
2437      goto cleanup;
2438    }
2439
2440    nt_status = pNtQueryInformationFile(handle->handle,
2441                                        &io_status,
2442                                        name_info,
2443                                        name_size,
2444                                        FileNameInformation);
2445  }
2446
2447  if (nt_status != STATUS_SUCCESS) {
2448    *size = 0;
2449    err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2450    goto error;
2451  }
2452
2453  if (!name_info) {
2454    /* the struct on stack was used */
2455    name_buf = tmp_name_info.FileName;
2456    name_len = tmp_name_info.FileNameLength;
2457  } else {
2458    name_buf = name_info->FileName;
2459    name_len = name_info->FileNameLength;
2460  }
2461
2462  if (name_len == 0) {
2463    *size = 0;
2464    err = 0;
2465    goto error;
2466  }
2467
2468  name_len /= sizeof(WCHAR);
2469
2470  /* check how much space we need */
2471  addrlen = WideCharToMultiByte(CP_UTF8,
2472                                0,
2473                                name_buf,
2474                                name_len,
2475                                NULL,
2476                                0,
2477                                NULL,
2478                                NULL);
2479  if (!addrlen) {
2480    *size = 0;
2481    err = uv_translate_sys_error(GetLastError());
2482    goto error;
2483  } else if (pipe_prefix_len + addrlen >= *size) {
2484    /* "\\\\.\\pipe" + name */
2485    *size = pipe_prefix_len + addrlen + 1;
2486    err = UV_ENOBUFS;
2487    goto error;
2488  }
2489
2490  memcpy(buffer, pipe_prefix, pipe_prefix_len);
2491  addrlen = WideCharToMultiByte(CP_UTF8,
2492                                0,
2493                                name_buf,
2494                                name_len,
2495                                buffer+pipe_prefix_len,
2496                                *size-pipe_prefix_len,
2497                                NULL,
2498                                NULL);
2499  if (!addrlen) {
2500    *size = 0;
2501    err = uv_translate_sys_error(GetLastError());
2502    goto error;
2503  }
2504
2505  addrlen += pipe_prefix_len;
2506  *size = addrlen;
2507  buffer[addrlen] = '\0';
2508
2509  err = 0;
2510
2511error:
2512  uv__free(name_info);
2513
2514cleanup:
2515  return err;
2516}
2517
2518
2519int uv_pipe_pending_count(uv_pipe_t* handle) {
2520  if (!handle->ipc)
2521    return 0;
2522  return handle->pipe.conn.ipc_xfer_queue_length;
2523}
2524
2525
2526int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2527  if (handle->flags & UV_HANDLE_BOUND)
2528    return uv__pipe_getname(handle, buffer, size);
2529
2530  if (handle->flags & UV_HANDLE_CONNECTION ||
2531      handle->handle != INVALID_HANDLE_VALUE) {
2532    *size = 0;
2533    return 0;
2534  }
2535
2536  return UV_EBADF;
2537}
2538
2539
2540int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2541  /* emulate unix behaviour */
2542  if (handle->flags & UV_HANDLE_BOUND)
2543    return UV_ENOTCONN;
2544
2545  if (handle->handle != INVALID_HANDLE_VALUE)
2546    return uv__pipe_getname(handle, buffer, size);
2547
2548  if (handle->flags & UV_HANDLE_CONNECTION) {
2549    if (handle->name != NULL)
2550      return uv__pipe_getname(handle, buffer, size);
2551  }
2552
2553  return UV_EBADF;
2554}
2555
2556
2557uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2558  if (!handle->ipc)
2559    return UV_UNKNOWN_HANDLE;
2560  if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2561    return UV_UNKNOWN_HANDLE;
2562  else
2563    return UV_TCP;
2564}
2565
2566int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2567  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
2568  PACL old_dacl, new_dacl;
2569  PSECURITY_DESCRIPTOR sd;
2570  EXPLICIT_ACCESS ea;
2571  PSID everyone;
2572  int error;
2573
2574  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2575    return UV_EBADF;
2576
2577  if (mode != UV_READABLE &&
2578      mode != UV_WRITABLE &&
2579      mode != (UV_WRITABLE | UV_READABLE))
2580    return UV_EINVAL;
2581
2582  if (!AllocateAndInitializeSid(&sid_world,
2583                                1,
2584                                SECURITY_WORLD_RID,
2585                                0, 0, 0, 0, 0, 0, 0,
2586                                &everyone)) {
2587    error = GetLastError();
2588    goto done;
2589  }
2590
2591  if (GetSecurityInfo(handle->handle,
2592                      SE_KERNEL_OBJECT,
2593                      DACL_SECURITY_INFORMATION,
2594                      NULL,
2595                      NULL,
2596                      &old_dacl,
2597                      NULL,
2598                      &sd)) {
2599    error = GetLastError();
2600    goto clean_sid;
2601  }
2602
2603  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2604  if (mode & UV_READABLE)
2605    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2606  if (mode & UV_WRITABLE)
2607    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2608  ea.grfAccessPermissions |= SYNCHRONIZE;
2609  ea.grfAccessMode = SET_ACCESS;
2610  ea.grfInheritance = NO_INHERITANCE;
2611  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2612  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2613  ea.Trustee.ptstrName = (LPTSTR)everyone;
2614
2615  if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2616    error = GetLastError();
2617    goto clean_sd;
2618  }
2619
2620  if (SetSecurityInfo(handle->handle,
2621                      SE_KERNEL_OBJECT,
2622                      DACL_SECURITY_INFORMATION,
2623                      NULL,
2624                      NULL,
2625                      new_dacl,
2626                      NULL)) {
2627    error = GetLastError();
2628    goto clean_dacl;
2629  }
2630
2631  error = 0;
2632
2633clean_dacl:
2634  LocalFree((HLOCAL) new_dacl);
2635clean_sd:
2636  LocalFree((HLOCAL) sd);
2637clean_sid:
2638  FreeSid(everyone);
2639done:
2640  return uv_translate_sys_error(error);
2641}
2642