All files / libs/payout-calculation/compute-engine/core-lifecycle/src/lifecycle/usecases trigger-step.use-case.ts

31.09% Statements 37/119
0% Branches 0/2
0% Functions 0/4
31.09% Lines 37/119

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 1201x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x             1x 1x                                                                                                           1x 1x 1x 1x 1x 1x 1x 1x                   1x 1x                             1x  
import { forwardRef as CircularDependency, Inject, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { omit } from 'lodash-es';
import { Repository } from 'typeorm';
 
import { Calculation, Company } from '@amalia/core/models';
import { CalculationStatus, CalculationType, ScheduleCalculationEvent } from '@amalia/core/types';
import { type Typeof } from '@amalia/ext/typescript';
import { MessageTasks, MessageType, QueueService, type CalculationPayload } from '@amalia/kernel/queue/core';
 
import { CreateCalculationUseCase } from './create-calculation/create-calculation.use-case';
 
/**
 * This use-case serves multiple purposes:
 *  - If the step is 0, then it triggers the first computation step
 *  - If called at the end of a step, it either
 *    - Triggers the next step
 *    - Starts a forecast calculation if needed
 *    - Or mark the computation as finished
 */
export class TriggerStepUseCase {
  private readonly logger = new Logger(TriggerStepUseCase.name);
 
  public constructor(
    @InjectRepository(Calculation)
    private readonly calculationRepository: Repository<Calculation>,
    private readonly queueService: QueueService,
    @Inject(CircularDependency(() => CreateCalculationUseCase))
    private readonly createCalculationUseCase: Typeof<CreateCalculationUseCase>,
  ) {}
 
  public async execute(company: Company, calculation: Calculation, nextStep: number) {
    const step = calculation.descriptor.at(nextStep);

    // If there is a next step, trigger the pubSub.
    if (step) {
      await Promise.all(
        step.batches.map((_, index) =>
          this.queueService.sendToQueue(
            {
              type: MessageType.TASK,
              taskIdentifier: MessageTasks.EXECUTE_CALCULATIONS,
            },
            {
              calculationId: calculation.id,
              companyId: company.id,
              step: nextStep,
              batch: index,
            } as CalculationPayload,
          ),
        ),
      );
    } else {
      // Mark the calculation as finished.
      const status = this.extractCalculationStatusFromBatches(calculation);

      this.logger.debug({
        message: `Calculation ${calculation.id} finished, setting status ${status}`,
        calculation: omit(calculation, 'descriptor'),
        status,
      });
      await this.calculationRepository.update(
        { company: { id: company.id }, id: calculation.id },
        { status, finishedAt: new Date() },
      );

      // Do check if we should start a forecasted calculation after the current step.
      if (this.shouldStartForecastedCalculation(company, calculation, status)) {
        const scheduleForecastEvent: ScheduleCalculationEvent = {
          companyId: company.id,
          options: {
            withRollingPeriods: calculation.withRollingPeriods,
          },
          query: {
            ...calculation.query,
            // Skip refreshing objects during forecast computation as they are already up to date.
            dataConnectorObjectsNames: null,
            type: CalculationType.FORECAST,
          },
          triggeredFromCalculationId: calculation.id,
        };
        await this.createCalculationUseCase.execute({ calculationEvent: scheduleForecastEvent });
      }
    }
  }
 
  /**
   * Check if we should start a forecasted calculation.
   *
   * We'll do common checks here, such as if the company has the feature flag enabled,
   * if the previous calculation was a statement, or if it was successful, etc...
   */
  private shouldStartForecastedCalculation(company: Company, calculation: Calculation, status: CalculationStatus) {
    const isForecastFeatureEnabled = !!company.featureFlags.FORECAST;
    const isPreviousCalculationAStatement = calculation.type === CalculationType.STATEMENT;
    const isPreviousCalculationSuccessful = ![
      CalculationStatus.ERROR,
      CalculationStatus.STOPPED,
      CalculationStatus.STOPPING,
    ].includes(status);
    return isForecastFeatureEnabled && isPreviousCalculationAStatement && isPreviousCalculationSuccessful;
  }
 
  private extractCalculationStatusFromBatches(calculation: Calculation): CalculationStatus {
    const allStatuses = new Set(calculation.descriptor.flatMap((s) => s.batches.map((b) => b.status)));

    switch (true) {
      case allStatuses.has(CalculationStatus.ERROR):
        return CalculationStatus.ERROR;
      case allStatuses.has(CalculationStatus.STOPPED):
      case allStatuses.has(CalculationStatus.STOPPING):
        return CalculationStatus.STOPPED;
      case allStatuses.has(CalculationStatus.INCOMPLETE):
        return CalculationStatus.INCOMPLETE;
      default:
        return CalculationStatus.SUCCESS;
    }
  }
}