// Flags: --expose-internals
'use strict';
const common = require('../common');
if (common.isWindows) {
  common.skip('dgram clustering is currently not supported on Windows.');
}

const assert = require('assert');
const cluster = require('cluster');
const dgram = require('dgram');

// Number of cluster workers to fork, and the number of datagrams each worker
// must report before the primary considers it done.
const NUM_WORKERS = 4;
const PACKETS_PER_WORKER = 10;

// The same file runs in both roles: the primary forks the workers, and each
// forked copy takes the worker branch.
(cluster.isPrimary ? primary : worker)();
181cb0ef41Sopenharmony_ci
191cb0ef41Sopenharmony_ci
/**
 * Primary-process half of the test.
 *
 * Binds a raw UDP handle to `common.localhostIPv4` with port 0, forks
 * NUM_WORKERS workers and sends each of them the handle's fd. Once every
 * worker has reported 'listening', it sends
 * NUM_WORKERS * PACKETS_PER_WORKER datagrams to the reported address and
 * asserts, when each worker exits, that the worker reported exactly
 * PACKETS_PER_WORKER received packets.
 */
function primary() {
  const { internalBinding } = require('internal/test/binding');
  const { UDP } = internalBinding('udp_wrap');

  // Bind a raw handle; only its file descriptor is handed to the workers.
  const udpHandle = new UDP();
  const bindStatus = udpHandle.bind(common.localhostIPv4, 0, 0);
  assert(bindStatus >= 0, String(bindStatus));
  assert.notStrictEqual(udpHandle.fd, -1);

  const { fd } = udpHandle;

  // Sends one datagram at a time to the given address, chaining the next send
  // from the previous send's callback, and closes the sending socket after
  // the last one.
  const startSending = ({ port, address: host }) => {
    const payload = Buffer.from('hello world');
    const sender = dgram.createSocket('udp4');
    const totalPackets = NUM_WORKERS * PACKETS_PER_WORKER;
    let sentCount = 0;

    const sendNext = () => {
      sender.send(payload, 0, payload.length, port, host, onSent);
    };

    const onSent = () => {
      sentCount += 1;
      if (sentCount >= totalPackets) {
        sender.close();
        return;
      }
      sendNext();
    };

    sendNext();
  };

  // Wires up a single worker: passes it the fd, records the count it reports
  // (then disconnects it), and checks that count when the worker exits.
  const setupWorker = (worker) => {
    let reported = 0;

    worker.send({ fd });

    worker.on('message', common.mustCall((msg) => {
      reported = msg.received;
      worker.disconnect();
    }));

    worker.on('exit', common.mustCall(() => {
      assert.strictEqual(reported, PACKETS_PER_WORKER);
    }));
  };

  // Fork the workers.
  let forked = 0;
  while (forked < NUM_WORKERS) {
    cluster.fork();
    forked += 1;
  }

  // Count 'listening' events; the datagrams are only sent once the last
  // worker has reported in, using the address from that final event.
  let listeningCount = 0;
  cluster.on('listening', common.mustCall((worker, address) => {
    listeningCount += 1;
    if (listeningCount >= NUM_WORKERS) {
      startSending(address);
    }
  }, NUM_WORKERS));

  // Attach the per-worker handlers to every forked worker.
  for (const forkedWorker of Object.values(cluster.workers)) {
    setupWorker(forkedWorker);
  }
}
871cb0ef41Sopenharmony_ci
881cb0ef41Sopenharmony_ci
/**
 * Worker-process half of the test.
 *
 * Waits for the single IPC message carrying the shared fd, binds a udp4
 * socket to that fd and counts incoming datagrams. When the count reaches
 * PACKETS_PER_WORKER it reports the count to the primary and closes the
 * socket.
 */
function worker() {
  let packetCount = 0;

  process.on('message', common.mustCall(({ fd }) => {
    // Create the UDP socket that will listen on the fd sent by the primary.
    const listener = dgram.createSocket('udp4');

    listener.on('message', common.mustCall(() => {
      packetCount += 1;
      if (packetCount !== PACKETS_PER_WORKER) {
        return;
      }

      // Expected number of packets reached: notify the primary, stop
      // listening.
      process.send({ received: packetCount });
      listener.close();
    }, PACKETS_PER_WORKER));

    listener.bind({ fd });
  }));
}
112