Puppeteer-Cluster Not using nodeJS workers

1k views Asked by At

I'm using puppeteer-cluster in multi-worker mode in Node.js. For some reason, only one worker opens the number of concurrent browsers that I defined; the others are ignored. What am I doing wrong?

Basically, I start a cluster with a browser concurrency of 2 in each worker, so I would expect it to open (number of workers, which I set to the number of CPUs) × 2 browsers, but in reality only two browsers are opened.

For example: let's say I have 8 cores, so I am launching 8 workers; each worker should launch two Puppeteer browsers, for a total of 16. Instead, when debugging it in headless: true mode, I see that only two browsers are opened.

deps: "puppeteer": "^5.2.1", "puppeteer-cluster": "^0.22.0"

import {Cluster} from 'puppeteer-cluster';
import {ReportTimeouts} from "../../config/reports.consts";
import {isDebug} from "../../utils/env.utils";
import {IPuppeteerClusterService} from "../../interfaces/services.interfaces";

/**
 * Holds this process's puppeteer-cluster instance.
 *
 * `init()` launches the cluster and stores it; `getCluster()` returns the
 * stored instance and refuses to run before `init()` has completed.
 */
export default class PuppeteerClusterService implements IPuppeteerClusterService {

    private cluster;

    /**
     * Returns the cluster stored by `init()`.
     * @throws Error when no cluster has been stored yet.
     */
    public getCluster() {
        if (this.cluster) {
            return this.cluster;
        }
        throw new Error(`PuppeteerClusterService.getCluster: init didn't run`);
    }

    /**
     * Launches a puppeteer-cluster using `Cluster.CONCURRENCY_BROWSER` with a
     * max concurrency of 2, logs the chosen settings and stores the cluster.
     *
     * The browser runs headless unless `isDebug` is truthy; in that case
     * `dumpio` is switched on as well.
     */
    public async init() {
        const debugEnabled = isDebug;
        const headless = !debugEnabled;
        const concurrency = Cluster.CONCURRENCY_BROWSER;
        const maxConcurrency = 2;

        // Command-line flags handed to every launched browser.
        const browserArgs = [
            '--no-first-run',
            '--no-zygote',
            '--no-sandbox',
            '--disable-extensions',
            '--disable-setuid-sandbox',
            '--disable-dev-shm-usage',
            '--ignore-certificate-errors',
            "--proxy-server='direct://'",
            '--proxy-bypass-list=*',
            '--lang=en-US,en',
        ];

        const launched = await Cluster.launch({
            concurrency,
            maxConcurrency,
            puppeteerOptions: {
                headless,
                dumpio: debugEnabled,
                handleSIGTERM: true,
                handleSIGINT: true,
                args: browserArgs,
            },
            monitor: false, // turn this on to get cpu / memory usages
            timeout: ReportTimeouts.PuppeteerClusterTimeout,
        });

        console.log(`PuppeteerClusterService.init: initialized puppeteer cluster with concurrency type ${concurrency} and max concurrency of ${maxConcurrency}`);
        console.log('PuppeteerClusterService.init: running headless?: ', headless);

        // Only publish the cluster once launch and logging have succeeded.
        this.cluster = launched;
    }
}
import './src/services/monitoring/tracer';
import {config} from 'dotenv';
import * as process from 'process';
import * as http from 'http';
import * as cluster from 'cluster';
import * as os from 'os';
import App from './app';
import initORM from './src/config/sequelize_config.handler';
import routes from './src/routes';
import {DEFAULT_PORTS, REQUEST_TIMEOUT_MINUTES} from './src/config/networking.consts';
import {IServices} from "./src/interfaces/services.interfaces";
import SystemSetting from "./src/models/system_setting.model";
import Services from "./src/services";
import {ISystemSetting} from "./src/interfaces/models/system_setting.interface";
import ActiveReportSendingScheduler from "./src/logic/scheduled_tasks/active_report_sending.scheduler";
import appConfig from './src/config';
import LoggerService from './src/services/logger.service';
import DayTaggingReportSendingScheduler from './src/logic/scheduled_tasks/day_tagging_report_sending.scheduler';
import { MessageConsumingManager } from './src/logic/messaging/message_consuming.manager';
import { isDebug } from './src/utils/env.utils';

// Run dotenv's config() first, so the process.env reads below happen after it.
config()
// Lower-cased NODE_ENV; falls back to 'dev' when NODE_ENV is unset or empty.
const env = process.env.NODE_ENV?.toLocaleLowerCase() || 'dev';
// monitorServer is a function declaration further down the file (hoisted), so calling it here is valid.
monitorServer(env, process);

// HTTP port: HTTP_PORT converted to a number when set, otherwise DEFAULT_PORTS.http.
const ports = {
    http: process.env.HTTP_PORT ? Number(process.env.HTTP_PORT) : DEFAULT_PORTS.http,
};

// References to the worker processes forked by setupWorkerProcesses.
const workers = [];

/**
 * Calls `initORM` with the shared `LoggerService` and logs progress around it.
 *
 * Only the NAMES of the environment variables are logged. Dumping
 * `process.env` in full would write every value (database passwords, API
 * keys, ...) into the log output.
 */
function setupOrm() {
    const logger = LoggerService;

    console.info(`setupOrm: app initiating on env ${env}`);
    initORM({logger});
    console.info(`setupOrm: sequelize ORM initiated`);
    console.info('setupOrm: loading static tables into memory');
    // Values are deliberately omitted: they may contain credentials.
    console.info(`setupOrm: env param names are ${JSON.stringify(Object.keys(process.env))}`);
}

/**
 * Forks one worker process per CPU core and keeps the pool at that size.
 *
 * Every forked worker is stored in the module-level `workers` array and its
 * messages are forwarded to `services.logger`. When a worker exits, its
 * reference is removed from `workers` and exactly ONE replacement is forked
 * (the previous code forked two per exit, one of them untracked and without a
 * message listener, so the pool grew on every crash).
 *
 * @param services service container; only `services.logger` is used here.
 */
const setupWorkerProcesses = (services:IServices) => {
    const numCores = os.cpus().length;
    services.logger.info('setupWorkerProcesses: master cluster setting up ' + numCores + ' workers');

    // Forks a worker, forwards its messages to the logger and tracks it in `workers`.
    const spawnWorker = () => {
        const worker = cluster.fork();
        // to receive messages from worker process
        worker.on('message', function(message) {
            services.logger.info(message);
        });
        workers.push(worker);
        return worker;
    };

    // iterate on number of cores need to be utilized by an application
    // current example will utilize all of them
    for (let i = 0; i < numCores; i++) {
        spawnWorker();
    }

    // process is clustered on a core and process id is assigned
    cluster.on('online', function(worker) {
        services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' is listening');
    });

    // if any of the worker process dies then start a new one by simply forking another one
    cluster.on('exit', function(worker, code, signal) {
        services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        services.logger.info('setupWorkerProcesses: starting a new worker');

        // Drop the dead worker so `workers` only holds live processes.
        const deadIndex = workers.indexOf(worker);
        if (deadIndex !== -1) {
            workers.splice(deadIndex, 1);
        }

        // One exit, one replacement.
        spawnWorker();
    });
};

/**
 * Builds the application with `App.init`, logs its startup details and
 * serves it over HTTP on `ports.http`.
 *
 * @param services       service container passed to `App.init`; its logger is used for output.
 * @param systemSettings settings passed through to `App.init`.
 */
async function setApp(services:IServices, systemSettings: ISystemSetting[]) {
    const app = await App.init({routes, services, systemSettings, env});

    const log = services.logger;
    log.info(`setApp: app initiated on env ${env}`);
    log.info(`setApp: app initiated with config ${JSON.stringify(appConfig)}`);
    log.info(`setApp: routes: ${Object.keys(routes).join(' | ')}`);
    log.info(`setApp: ports: ${JSON.stringify(ports)}`);
    log.info(`setApp: database connection gained`);

    const requestTimeoutMs = REQUEST_TIMEOUT_MINUTES * 60 * 1000;

    const server = http.createServer(app);
    server.listen(ports.http, () => {
        log.info(`setApp: HTTP Server successfully started at port ${ports.http}`);
    });

    // Time (in ms) server will wait and keep the connection open after last response.
    server.keepAliveTimeout = requestTimeoutMs;
    // Set 1s above keepAliveTimeout; see https://github.com/nodejs/node/issues/27363#issuecomment-603489130
    server.headersTimeout = requestTimeoutMs + 1000;
}

/**
 * Creates the report-sending schedulers and calls `init()` on each of them,
 * in declaration order.
 *
 * @param services       service container handed to every scheduler.
 * @param systemSettings settings handed to every scheduler.
 */
function setSchedulers(services:IServices, systemSettings: ISystemSetting[]) {
    // Both schedulers are constructed before the first init() runs.
    const activeReports = new ActiveReportSendingScheduler(services,systemSettings);
    const dayTaggingReports = new DayTaggingReportSendingScheduler(services,systemSettings);

    for (const scheduler of [activeReports, dayTaggingReports]) {
        scheduler.init();
    }

    services.logger.info(`setSchedulers: schedulers initiated on env ${env}`);
}

/**
 * Creates a `MessageConsumingManager`, calls `initialize()` on it and logs
 * that the consumers were set up.
 *
 * @param services       service container handed to the manager.
 * @param systemSettings settings handed to the manager.
 */
function setMessageConsumers(services:IServices, systemSettings: ISystemSetting[]){
    const consumingManager = new MessageConsumingManager(services, systemSettings);
    consumingManager.initialize();
    services.logger.info(`setMessageConsumers: message consumers initiated on env ${env}`);
}
/**
 * Boots the current process in multi-process mode.
 *
 * Runs in the master and in every forked worker: sets up the ORM, loads the
 * system settings that apply to `env`, then builds and initialises the
 * service container. Only after that does it branch on the process role:
 * workers serve the HTTP app, the master forks the workers and starts the
 * schedulers and message consumers.
 *
 * Note that `services.init()` is awaited before the role check, so it is
 * executed by the master and by each worker alike.
 */
const setupServer = async () => {
    console.info(`setupServer: initating app in multiprocess mode`);
    setupOrm();

    // A setting applies when it has no env, or when its env and ours contain one another.
    const appliesToEnv = (s:SystemSetting) => !s.env || s.env.includes(env) || env.includes(s.env);

    const allSettings = await SystemSetting.findAll();
    const systemSettings = allSettings.filter(appliesToEnv);

    const services: IServices = new Services(systemSettings, process.env);
    await services.init();
    services.logger.info(`setupServer: initiated app in multi process mode`);

    if (!cluster.isMaster) {
        await setApp(services, systemSettings);
        return;
    }

    setupWorkerProcesses(services);
    setSchedulers(services, systemSettings);
    setMessageConsumers(services, systemSettings);
};

/**
 * Loads the `newrelic` agent when running in production with the required
 * environment variables present; otherwise logs why it was skipped.
 *
 * The environment is read from `proc.env` (previously the `proc` argument was
 * ignored and the global `process.env` was read instead).
 *
 * @param env  current environment name; monitoring is loaded only for 'production'.
 * @param proc process object whose `env` must provide NEW_RELIC_KEY and APP_NAME.
 */
function monitorServer(env:string, proc: NodeJS.Process){
    if (env !== 'production'){
        console.warn(`monitorServer - not production so no monitoring.`);
        return;
    }

    if (!proc.env.NEW_RELIC_KEY){
        console.warn(`monitorServer - NEW_RELIC_KEY not provided, not loading.`);
        return;
    }
    if (!proc.env.APP_NAME){
        console.warn(`monitorServer - APP_NAME not provided, not loading.`);
        return;
    }

    // Required lazily so the module is only loaded once all checks above pass.
    const newRelic = require('newrelic');
    console.info(`monitorServer - newrelic loaded: ${typeof newRelic === 'object'}`);
}

// Start the process. A failed startup is logged and ends the process with a
// non-zero exit code instead of leaving an unhandled promise rejection.
setupServer().catch((err: unknown) => {
    console.error('setupServer: startup failed', err);
    process.exit(1);
});

import {
    IAnalyticsService,
    ICacheService,
    ICycleTaggingService,
    IEmailSendingService,
    IFileUploader,
    ILogger,
    IMonitoringService,
    IPowerBIAuthService,
    IPowerBIService,
    IPuppeteerClusterService,
    IReportMonitoringServiceFactory,
    IServices,
    ISiteService,
    IUserService
} from "../interfaces/services.interfaces";
import {ISystemSetting} from "../interfaces/models/system_setting.interface";
import CacheService from "./cache.service";
import LoggerService from './logger.service';
import {S3FileUploader} from "./persistence/s3_file.uploader";
import {EmailSendingService} from "./sendouts/email_sending.service";
import {AdminUserService} from "./external_models/user.service";
import {ModerationService} from "./sendouts/moderation.service";
import {PowerBIAuthService} from "./power_bi/power_bi_auth.service";
import {PowerBIService} from "./power_bi/power_bi.service";
import {AdminSiteService} from "./external_models/site.service";
import {CycleTaggingService} from "./cycle_tagging.service";
import MonitoringService from "./monitoring/monitoring.service";
import ReportMonitoringServiceFactory from "./monitoring/report_monitoring.service.factory";
import {AutoSendoutCalculatorFactory} from "../logic/sendout/auto_sendout_calculator.factory";
import PuppeteerClusterService from "./screenshots/puppeteer_cluster.service";
import {RedisFactory} from "./redis.factory";
import {MutexFactory} from "./mutex.factory";
import {IMutexFactory} from "../interfaces/general.interfaces";
import {AnalyticsService} from "./analytics/analytics.service";
import {DummyAnalyticsService} from "./analytics/dummy.service";
import {IAutoSendoutCalculatorFactory} from "../interfaces/sendouts.interface";

/**
 * Service container. In this excerpt it holds a single service, the
 * puppeteer cluster service, and `init()` initialises it.
 */
export default class Services implements IServices {
  puppeteerClusterService: IPuppeteerClusterService = new PuppeteerClusterService();

  /**
   * @param systemSettings system settings (not used by the code shown here).
   * @param processEnv     environment variables (not used by the code shown here).
   */
  constructor(systemSettings: ISystemSetting[], processEnv: Record<string, string | undefined>){
  }

  /** Waits for the puppeteer cluster service to finish its `init()`. */
  public async init(): Promise<void> {
    const clusterService = this.puppeteerClusterService;
    await clusterService.init();
  }

}

0

There are 0 answers