11cb0ef41Sopenharmony_ci// Flags: --expose-internals 21cb0ef41Sopenharmony_ci'use strict'; 31cb0ef41Sopenharmony_ciconst common = require('../common'); 41cb0ef41Sopenharmony_ciif (common.isWindows) 51cb0ef41Sopenharmony_ci common.skip('dgram clustering is currently not supported on Windows.'); 61cb0ef41Sopenharmony_ci 71cb0ef41Sopenharmony_ciconst NUM_WORKERS = 4; 81cb0ef41Sopenharmony_ciconst PACKETS_PER_WORKER = 10; 91cb0ef41Sopenharmony_ci 101cb0ef41Sopenharmony_ciconst assert = require('assert'); 111cb0ef41Sopenharmony_ciconst cluster = require('cluster'); 121cb0ef41Sopenharmony_ciconst dgram = require('dgram'); 131cb0ef41Sopenharmony_ci 141cb0ef41Sopenharmony_ciif (cluster.isPrimary) 151cb0ef41Sopenharmony_ci primary(); 161cb0ef41Sopenharmony_cielse 171cb0ef41Sopenharmony_ci worker(); 181cb0ef41Sopenharmony_ci 191cb0ef41Sopenharmony_ci 201cb0ef41Sopenharmony_cifunction primary() { 211cb0ef41Sopenharmony_ci const { internalBinding } = require('internal/test/binding'); 221cb0ef41Sopenharmony_ci const { UDP } = internalBinding('udp_wrap'); 231cb0ef41Sopenharmony_ci 241cb0ef41Sopenharmony_ci // Create a handle and use its fd. 251cb0ef41Sopenharmony_ci const rawHandle = new UDP(); 261cb0ef41Sopenharmony_ci const err = rawHandle.bind(common.localhostIPv4, 0, 0); 271cb0ef41Sopenharmony_ci assert(err >= 0, String(err)); 281cb0ef41Sopenharmony_ci assert.notStrictEqual(rawHandle.fd, -1); 291cb0ef41Sopenharmony_ci 301cb0ef41Sopenharmony_ci const fd = rawHandle.fd; 311cb0ef41Sopenharmony_ci 321cb0ef41Sopenharmony_ci let listening = 0; 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ci // Fork 4 workers. 351cb0ef41Sopenharmony_ci for (let i = 0; i < NUM_WORKERS; i++) 361cb0ef41Sopenharmony_ci cluster.fork(); 371cb0ef41Sopenharmony_ci 381cb0ef41Sopenharmony_ci // Wait until all workers are listening. 391cb0ef41Sopenharmony_ci cluster.on('listening', common.mustCall((worker, address) => { 401cb0ef41Sopenharmony_ci if (++listening < NUM_WORKERS) 411cb0ef41Sopenharmony_ci return; 421cb0ef41Sopenharmony_ci 431cb0ef41Sopenharmony_ci // Start sending messages. 441cb0ef41Sopenharmony_ci const buf = Buffer.from('hello world'); 451cb0ef41Sopenharmony_ci const socket = dgram.createSocket('udp4'); 461cb0ef41Sopenharmony_ci let sent = 0; 471cb0ef41Sopenharmony_ci doSend(); 481cb0ef41Sopenharmony_ci 491cb0ef41Sopenharmony_ci function doSend() { 501cb0ef41Sopenharmony_ci socket.send(buf, 0, buf.length, address.port, address.address, afterSend); 511cb0ef41Sopenharmony_ci } 521cb0ef41Sopenharmony_ci 531cb0ef41Sopenharmony_ci function afterSend() { 541cb0ef41Sopenharmony_ci sent++; 551cb0ef41Sopenharmony_ci if (sent < NUM_WORKERS * PACKETS_PER_WORKER) { 561cb0ef41Sopenharmony_ci doSend(); 571cb0ef41Sopenharmony_ci } else { 581cb0ef41Sopenharmony_ci socket.close(); 591cb0ef41Sopenharmony_ci } 601cb0ef41Sopenharmony_ci } 611cb0ef41Sopenharmony_ci }, NUM_WORKERS)); 621cb0ef41Sopenharmony_ci 631cb0ef41Sopenharmony_ci // Set up event handlers for every worker. Each worker sends a message when 641cb0ef41Sopenharmony_ci // it has received the expected number of packets. After that it disconnects. 651cb0ef41Sopenharmony_ci for (const key in cluster.workers) { 661cb0ef41Sopenharmony_ci if (Object.hasOwn(cluster.workers, key)) 671cb0ef41Sopenharmony_ci setupWorker(cluster.workers[key]); 681cb0ef41Sopenharmony_ci } 691cb0ef41Sopenharmony_ci 701cb0ef41Sopenharmony_ci function setupWorker(worker) { 711cb0ef41Sopenharmony_ci let received = 0; 721cb0ef41Sopenharmony_ci 731cb0ef41Sopenharmony_ci worker.send({ 741cb0ef41Sopenharmony_ci fd, 751cb0ef41Sopenharmony_ci }); 761cb0ef41Sopenharmony_ci 771cb0ef41Sopenharmony_ci worker.on('message', common.mustCall((msg) => { 781cb0ef41Sopenharmony_ci received = msg.received; 791cb0ef41Sopenharmony_ci worker.disconnect(); 801cb0ef41Sopenharmony_ci })); 811cb0ef41Sopenharmony_ci 821cb0ef41Sopenharmony_ci worker.on('exit', common.mustCall(() => { 831cb0ef41Sopenharmony_ci assert.strictEqual(received, PACKETS_PER_WORKER); 841cb0ef41Sopenharmony_ci })); 851cb0ef41Sopenharmony_ci } 861cb0ef41Sopenharmony_ci} 871cb0ef41Sopenharmony_ci 881cb0ef41Sopenharmony_ci 891cb0ef41Sopenharmony_cifunction worker() { 901cb0ef41Sopenharmony_ci let received = 0; 911cb0ef41Sopenharmony_ci 921cb0ef41Sopenharmony_ci process.on('message', common.mustCall((data) => { 931cb0ef41Sopenharmony_ci const { fd } = data; 941cb0ef41Sopenharmony_ci // Create udp socket and start listening. 951cb0ef41Sopenharmony_ci const socket = dgram.createSocket('udp4'); 961cb0ef41Sopenharmony_ci 971cb0ef41Sopenharmony_ci socket.on('message', common.mustCall((data, info) => { 981cb0ef41Sopenharmony_ci received++; 991cb0ef41Sopenharmony_ci 1001cb0ef41Sopenharmony_ci // Every 10 messages, notify the primary. 1011cb0ef41Sopenharmony_ci if (received === PACKETS_PER_WORKER) { 1021cb0ef41Sopenharmony_ci process.send({ received }); 1031cb0ef41Sopenharmony_ci socket.close(); 1041cb0ef41Sopenharmony_ci } 1051cb0ef41Sopenharmony_ci }, PACKETS_PER_WORKER)); 1061cb0ef41Sopenharmony_ci 1071cb0ef41Sopenharmony_ci socket.bind({ 1081cb0ef41Sopenharmony_ci fd, 1091cb0ef41Sopenharmony_ci }); 1101cb0ef41Sopenharmony_ci })); 1111cb0ef41Sopenharmony_ci} 112