All files / libs/data-capture/nightly-scheduler/core/src/lib nightly-scheduler.use-case.ts

0% Statements 0/83
0% Branches 0/1
0% Functions 0/1
0% Lines 0/83

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84                                                                                                                                                                       
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { validateCronExpression } from 'cron';
import { Not, Repository } from 'typeorm';

import { Company } from '@amalia/core/models';
import { IndexationType } from '@amalia/core/types';
import { assert } from '@amalia/ext/typescript';
import { MessageTasks, MessageType, QueueService, ScheduleRefreshmentPayload } from '@amalia/kernel/queue/core';
import { CompanyStatus } from '@amalia/tenants/companies/types';

import { getCurrentBatchCompanies } from './cron/cron';

/**
 * Use case that enqueues a data-refreshment task for the companies that belong
 * to the batch selected for a given cron pattern and reference date.
 */
@Injectable()
export class NightlySchedulerUseCase {
  protected readonly logger = new Logger(NightlySchedulerUseCase.name);

  public constructor(
    @InjectRepository(Company)
    private readonly companyRepository: Repository<Company>,
    private readonly queueService: QueueService,
  ) {}

  /**
   * Loads every ACTIVE company whose `batchNumber` is not -1 (ordered by
   * `batchNumber`, then `slug`), asks `getCurrentBatchCompanies` which of them
   * belong to the current batch, and sends one `SCHEDULE_REFRESHMENT` task to
   * the queue for each company of that batch.
   *
   * @param cronPattern Cron expression driving the batches. Required despite the
   *   optional typing; must pass `validateCronExpression`.
   * @param date Optional reference date, parsed with `new Date(date)`. Defaults
   *   to the current date when omitted.
   * @returns The full list of eligible companies that was loaded — NOT only the
   *   companies of the current batch.
   * @throws If `cronPattern` is missing or invalid, or if `date` cannot be
   *   parsed (assertion failures).
   * @throws {Error} If the batch position returned by `getCurrentBatchCompanies`
   *   is outside `[0, count - 1]`.
   */
  public async getCompaniesAndScheduleRefreshment(cronPattern?: string, date?: string): Promise<Company[]> {
    assert(cronPattern, 'cron pattern is required');
    const { error } = validateCronExpression(cronPattern);
    assert(!error, `cron pattern is invalid: ${error?.message}`);

    // Reject unparseable dates up front: an Invalid Date would otherwise be
    // handed to the batch computation and could slip past the range check below
    // (comparisons against NaN are always false).
    const referenceDate = date ? new Date(date) : new Date();
    assert(!Number.isNaN(referenceDate.getTime()), `date is invalid: ${date}`);

    // Eligible companies: ACTIVE and with a batchNumber other than -1.
    // The ordering (batchNumber, then slug) is fixed so the list handed to
    // getCurrentBatchCompanies is the same from one run to the next.
    const companies = await this.companyRepository.find({
      where: {
        batchNumber: Not(-1),
        status: CompanyStatus.ACTIVE,
      },
      order: {
        batchNumber: 'ASC',
        slug: 'ASC',
      },
    });

    const {
      companies: companiesInTheCurrentBatch,
      position,
      count,
    } = getCurrentBatchCompanies(cronPattern, referenceDate, companies);

    // Validate before logging so an out-of-range position does not first emit a
    // misleading "Executing batch ..." line.
    if (position < 0 || position >= count) {
      throw new Error(`Invalid position ${position}, should be contained between 0 and ${count - 1} (date is ${date})`);
    }

    this.logger.log(
      `Executing batch ${position + 1} of ${count} for cron pattern ${cronPattern}, will run on companies ${companiesInTheCurrentBatch.map((c) => c.slug).join(', ')}`,
    );

    // Schedule refreshment for each company of the current batch. Messages are
    // sent one at a time; a rejected send aborts the remaining ones.
    for (const company of companiesInTheCurrentBatch) {
      await this.queueService.sendToQueue(
        {
          type: MessageType.TASK,
          taskIdentifier: MessageTasks.SCHEDULE_REFRESHMENT,
        },
        {
          companyId: company.id,
          connectorId: null,
          options: {
            willLaunchCalculation: true,
            willPrune: true,
            indexationType: IndexationType.PARTIAL,
            calculationsOptions: { withRollingPeriods: true },
            dataConnectorObjectsNames: null,
            startOffset: 0,
          },
        } satisfies ScheduleRefreshmentPayload,
      );
    }

    return companies;
  }
}