'use strict';
const common = require('../common');
// This test is not run inside a FreeBSD jail.
if (common.inFreeBSDJail)
  common.skip('In a FreeBSD jail');

// Overview
// --------
// The parent process forks `listeners` copies of this file in "child" mode.
// Each child binds a UDP socket to common.PORT, joins GROUP_ADDRESS with a
// source-specific membership, and reports back over the IPC channel. Once
// every child is listening, the parent sends each entry of `messages` to the
// group. Children relay whatever they receive to the parent, which checks
// that every child got every message before TIMEOUT expires.

const assert = require('assert');
const dgram = require('dgram');
const { fork } = require('child_process');
const networkInterfaces = require('os').networkInterfaces();

const GROUP_ADDRESS = '232.1.1.1';
const TIMEOUT = common.platformTimeout(5000);
const messages = [
  Buffer.from('First message to send'),
  Buffer.from('Second message to send'),
  Buffer.from('Third message to send'),
  Buffer.from('Fourth message to send'),
];
const workers = {};
const listeners = 3;

// Parent-side state; assigned in runParent() and left unset in children.
let listening;
let sendSocket;
let done;
let timer;
let dead;

/**
 * Returns the address of the first non-internal IPv4 interface, or null when
 * there is none. Ideally this would favor internal/private interfaces.
 */
function findSourceAddress() {
  for (const entries of Object.values(networkInterfaces)) {
    for (const { internal, family, address } of entries) {
      if (!internal && family === 'IPv4') {
        return address;
      }
    }
  }
  return null;
}

const sourceAddress = findSourceAddress();
assert.ok(sourceAddress);

/**
 * Forks one child running this same file in "child" mode, registers it in
 * `workers` under its pid, and wires up its 'exit' and 'message' handlers.
 */
function launchChildProcess() {
  const worker = fork(__filename, ['child']);
  workers[worker.pid] = worker;
  worker.messagesReceived = [];

  // Checks, for every known worker, that each relayed message matches one of
  // the messages the parent sent.
  const compareAllWorkers = () => {
    for (const child of Object.values(workers)) {
      const count = child.messagesReceived.filter((received) =>
        messages.some((sent) => received.toString() === sent.toString()),
      ).length;

      console.error('[PARENT] %d received %d matching messages.',
                    child.pid, count);

      assert.strictEqual(count, messages.length);
    }
  };

  // Track workers that die before finishing their job.
  worker.on('exit', (code) => {
    // A worker that already completed, or that exited with code 0, is not
    // counted as dead.
    const finishedCleanly = worker.isDone || code === 0;
    if (finishedCleanly) {
      return;
    }

    dead += 1;
    console.error('[PARENT] Worker %d died. %d dead of %d',
                  worker.pid,
                  dead,
                  listeners);

    if (dead === listeners) {
      console.error('[PARENT] All workers have died.');
      console.error('[PARENT] Fail');
      assert.fail();
    }
  });

  worker.on('message', (msg) => {
    if (msg.listening) {
      listening += 1;

      // Sending starts only once every child has reported that it listens.
      if (listening === listeners) {
        sendSocket.sendNext();
      }
      return;
    }

    if (!msg.message) {
      return;
    }

    worker.messagesReceived.push(msg.message);

    if (worker.messagesReceived.length === messages.length) {
      done += 1;
      worker.isDone = true;
      console.error('[PARENT] %d received %d messages total.',
                    worker.pid,
                    worker.messagesReceived.length);
    }

    if (done !== listeners) {
      return;
    }

    console.error('[PARENT] All workers have received the ' +
                  'required number of messages. Will now compare.');

    compareAllWorkers();

    clearTimeout(timer);
    console.error('[PARENT] Success');
    killChildren(workers);
  });
}

/**
 * Calls kill() on every child process stored in `children`.
 */
function killChildren(children) {
  for (const key of Object.keys(children)) {
    children[key].kill();
  }
}

/**
 * Parent mode: arms the failure timer, forks the children, and prepares the
 * socket used to send `messages` to the group one after another.
 */
function runParent() {
  listening = 0;
  dead = 0;
  done = 0;
  let nextIndex = 0;

  // Fail the test when it has not succeeded within TIMEOUT.
  timer = setTimeout(() => {
    console.error('[PARENT] Responses were not received within %d ms.',
                  TIMEOUT);
    console.error('[PARENT] Fail');

    killChildren(workers);

    assert.fail();
  }, TIMEOUT);

  // Fork the listening children.
  for (let n = 0; n < listeners; n++) {
    launchChildProcess();
  }

  sendSocket = dgram.createSocket('udp4');

  // Socket options are applied once the socket emits 'listening'.
  sendSocket.on('listening', () => {
    sendSocket.setTTL(1);
    sendSocket.setBroadcast(true);
    sendSocket.setMulticastTTL(1);
    sendSocket.setMulticastLoopback(true);
    sendSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
  });

  sendSocket.on('close', () => {
    console.error('[PARENT] sendSocket closed');
  });

  // Sends the next pending message; closes the socket when none are left.
  sendSocket.sendNext = () => {
    const buf = messages[nextIndex++];

    if (!buf) {
      try {
        sendSocket.close();
      } catch {
        // Best effort: an error while closing is deliberately ignored.
      }
      return;
    }

    const onSent = (err) => {
      assert.ifError(err);
      console.error('[PARENT] sent "%s" to %s:%s',
                    buf.toString(),
                    GROUP_ADDRESS, common.PORT);
      process.nextTick(sendSocket.sendNext);
    };

    sendSocket.send(buf, 0, buf.length, common.PORT, GROUP_ADDRESS, onSent);
  };
}

/**
 * Child mode: binds a socket to common.PORT, joins the group, and relays
 * every received datagram to the parent through process.send().
 */
function runChild() {
  const receivedMessages = [];
  const listenSocket = dgram.createSocket({
    type: 'udp4',
    reuseAddr: true,
  });

  listenSocket.on('listening', () => {
    listenSocket.setMulticastLoopback(true);
    listenSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);

    listenSocket.on('message', (buf, rinfo) => {
      console.error('[CHILD] %s received "%s" from %j', process.pid,
                    buf.toString(), rinfo);

      receivedMessages.push(buf);

      process.send({ message: buf.toString() });

      if (receivedMessages.length !== messages.length) {
        return;
      }

      // Dropping the membership is not strictly needed; it is done here as
      // a sanity check before the socket is closed.
      listenSocket.dropSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
      process.nextTick(() => {
        listenSocket.close();
      });
    });

    listenSocket.on('close', () => {
      // HACK: Delay the exit so the parent has time to receive everything
      // sent via process.send(). This may be indicative of some other issue.
      setTimeout(() => {
        process.exit();
      }, common.platformTimeout(1000));
    });

    process.send({ listening: true });
  });

  listenSocket.bind(common.PORT);
}

if (process.argv[2] === 'child') {
  runChild();
} else {
  runParent();
}