1'use strict';
2const common = require('../common');
3// Skip test in FreeBSD jails.
4if (common.inFreeBSDJail)
5  common.skip('In a FreeBSD jail');
6
7const assert = require('assert');
8const dgram = require('dgram');
9const fork = require('child_process').fork;
10const networkInterfaces = require('os').networkInterfaces();
11const GROUP_ADDRESS = '232.1.1.1';
12const TIMEOUT = common.platformTimeout(5000);
13const messages = [
14  Buffer.from('First message to send'),
15  Buffer.from('Second message to send'),
16  Buffer.from('Third message to send'),
17  Buffer.from('Fourth message to send'),
18];
19const workers = {};
20const listeners = 3;
21let listening, sendSocket, done, timer, dead;
22
23let sourceAddress = null;
24
25// Take the first non-internal interface as the IPv4 address for binding.
26// Ideally, this should favor internal/private interfaces.
27get_sourceAddress: for (const name in networkInterfaces) {
28  const interfaces = networkInterfaces[name];
29  for (let i = 0; i < interfaces.length; i++) {
30    const localInterface = interfaces[i];
31    if (!localInterface.internal && localInterface.family === 'IPv4') {
32      sourceAddress = localInterface.address;
33      break get_sourceAddress;
34    }
35  }
36}
37assert.ok(sourceAddress);
38
39function launchChildProcess() {
40  const worker = fork(__filename, ['child']);
41  workers[worker.pid] = worker;
42
43  worker.messagesReceived = [];
44
45  // Handle the death of workers.
46  worker.on('exit', function(code) {
47    // Don't consider this the true death if the worker has finished
48    // successfully or if the exit code is 0.
49    if (worker.isDone || code === 0) {
50      return;
51    }
52
53    dead += 1;
54    console.error('[PARENT] Worker %d died. %d dead of %d',
55                  worker.pid,
56                  dead,
57                  listeners);
58
59    if (dead === listeners) {
60      console.error('[PARENT] All workers have died.');
61      console.error('[PARENT] Fail');
62      assert.fail();
63    }
64  });
65
66  worker.on('message', function(msg) {
67    if (msg.listening) {
68      listening += 1;
69
70      if (listening === listeners) {
71        // All child process are listening, so start sending.
72        sendSocket.sendNext();
73      }
74      return;
75    }
76    if (msg.message) {
77      worker.messagesReceived.push(msg.message);
78
79      if (worker.messagesReceived.length === messages.length) {
80        done += 1;
81        worker.isDone = true;
82        console.error('[PARENT] %d received %d messages total.',
83                      worker.pid,
84                      worker.messagesReceived.length);
85      }
86
87      if (done === listeners) {
88        console.error('[PARENT] All workers have received the ' +
89                      'required number of messages. Will now compare.');
90
91        Object.keys(workers).forEach(function(pid) {
92          const worker = workers[pid];
93
94          let count = 0;
95
96          worker.messagesReceived.forEach(function(buf) {
97            for (let i = 0; i < messages.length; ++i) {
98              if (buf.toString() === messages[i].toString()) {
99                count++;
100                break;
101              }
102            }
103          });
104
105          console.error('[PARENT] %d received %d matching messages.',
106                        worker.pid, count);
107
108          assert.strictEqual(count, messages.length);
109        });
110
111        clearTimeout(timer);
112        console.error('[PARENT] Success');
113        killChildren(workers);
114      }
115    }
116  });
117}
118
119function killChildren(children) {
120  Object.keys(children).forEach(function(key) {
121    const child = children[key];
122    child.kill();
123  });
124}
125
126if (process.argv[2] !== 'child') {
127  listening = 0;
128  dead = 0;
129  let i = 0;
130  done = 0;
131
132  // Exit the test if it doesn't succeed within TIMEOUT.
133  timer = setTimeout(function() {
134    console.error('[PARENT] Responses were not received within %d ms.',
135                  TIMEOUT);
136    console.error('[PARENT] Fail');
137
138    killChildren(workers);
139
140    assert.fail();
141  }, TIMEOUT);
142
143  // Launch child processes.
144  for (let x = 0; x < listeners; x++) {
145    launchChildProcess(x);
146  }
147
148  sendSocket = dgram.createSocket('udp4');
149
150  // The socket is actually created async now.
151  sendSocket.on('listening', function() {
152    sendSocket.setTTL(1);
153    sendSocket.setBroadcast(true);
154    sendSocket.setMulticastTTL(1);
155    sendSocket.setMulticastLoopback(true);
156    sendSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
157  });
158
159  sendSocket.on('close', function() {
160    console.error('[PARENT] sendSocket closed');
161  });
162
163  sendSocket.sendNext = function() {
164    const buf = messages[i++];
165
166    if (!buf) {
167      try { sendSocket.close(); } catch {
168        // Continue regardless of error.
169      }
170      return;
171    }
172
173    sendSocket.send(
174      buf,
175      0,
176      buf.length,
177      common.PORT,
178      GROUP_ADDRESS,
179      function(err) {
180        assert.ifError(err);
181        console.error('[PARENT] sent "%s" to %s:%s',
182                      buf.toString(),
183                      GROUP_ADDRESS, common.PORT);
184        process.nextTick(sendSocket.sendNext);
185      },
186    );
187  };
188}
189
190if (process.argv[2] === 'child') {
191  const receivedMessages = [];
192  const listenSocket = dgram.createSocket({
193    type: 'udp4',
194    reuseAddr: true,
195  });
196
197  listenSocket.on('listening', function() {
198    listenSocket.setMulticastLoopback(true);
199    listenSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
200
201    listenSocket.on('message', function(buf, rinfo) {
202      console.error('[CHILD] %s received "%s" from %j', process.pid,
203                    buf.toString(), rinfo);
204
205      receivedMessages.push(buf);
206
207      process.send({ message: buf.toString() });
208
209      if (receivedMessages.length === messages.length) {
210        // .dropSourceSpecificMembership() not strictly needed,
211        // it is here as a sanity check.
212        listenSocket.dropSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
213        process.nextTick(function() {
214          listenSocket.close();
215        });
216      }
217    });
218
219    listenSocket.on('close', function() {
220      // HACK: Wait to exit the process to ensure that the parent
221      // process has had time to receive all messages via process.send()
222      // This may be indicative of some other issue.
223      setTimeout(function() {
224        process.exit();
225      }, common.platformTimeout(1000));
226    });
227    process.send({ listening: true });
228  });
229
230  listenSocket.bind(common.PORT);
231}
232