All files / libs/payout-calculation/compute-engine/core-statement-calculation-cache/src/statementCalculationCache stoppableCalculation.ts

0% Statements 0/94
0% Branches 0/1
0% Functions 0/1
0% Lines 0/94

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95                                                                                                                                                                                             
import { Logger } from '@nestjs/common';
import { pick, throttle } from 'lodash-es';
import { type Repository } from 'typeorm';

import { type Calculation } from '@amalia/core/models';
import { CalculationStatus } from '@amalia/core/types';

/**
 * A calculation extended with two functions that tell the engine whether it may keep going.
 *
 * Both functions resolve when the calculation may continue and reject otherwise; the mapper
 * defined in this file rejects with `CalculationStoppedError`.
 */
export type StoppableCalculation = Calculation & {
  /** Performs the verification immediately, without any rate limiting. */
  verifyCalculationIsOngoingNonThrottled: () => Promise<void>;
  /** Same verification, meant for frequent calls: the underlying status lookup is rate-limited. */
  verifyCalculationIsOngoingThrottled: () => Promise<void>;
};

/**
 * Thrown when a calculation must not continue (it was stopped, is stopping, errored, or its
 * status could not be read).
 *
 * The `name` property is set explicitly so that logs and stringified errors show
 * `CalculationStoppedError` instead of the generic `Error`.
 */
export class CalculationStoppedError extends Error {
  /**
   * @param message Optional human-readable detail; defaults to an empty message, as before.
   */
  public constructor(message?: string) {
    super(message);
    this.name = 'CalculationStoppedError';
  }
}

/**
 * Builds {@link StoppableCalculation} objects: calculations paired with functions that reject
 * as soon as the persisted status says the calculation must not continue.
 */
export class StoppableCalculationMapper {
  private readonly logger = new Logger(StoppableCalculationMapper.name);

  /**
   * Returns a shallow copy of `calculation` (own enumerable properties only) extended with
   * two verification functions.
   *
   * The functions are closures over `calculation.id` and `calculationRepository`, so they can
   * be called from anywhere in the engine without passing the repository around.
   *
   * Each verification:
   * - reads the current status of the calculation from the repository;
   * - rejects with {@link CalculationStoppedError} when the status is ERROR, STOPPING or STOPPED,
   *   or when the status could not be read;
   * - otherwise refreshes `lastUpdatedAt` as a heartbeat. A failure of that write is not
   *   swallowed: it rejects the returned promise with the repository error.
   *
   * On the throttled variant both the status read and the heartbeat write are rate-limited,
   * so a caller invoking it in a tight loop does not hit the database on every iteration.
   *
   * @param calculation Calculation to wrap; only its `id` and `company` are read here.
   * @param calculationRepository Repository used to read the status and write the updates.
   * @returns The copied calculation plus the throttled and non-throttled verification functions.
   */
  public calculationToStoppableCalculation = (
    calculation: Calculation,
    calculationRepository: Repository<Calculation>,
  ): StoppableCalculation => {
    // Minimum delay between two database round-trips made on behalf of the throttled verification.
    const throttleIntervalMs = 500;

    // Timestamp (ms since epoch) of the last heartbeat write started by this closure.
    let lastHeartbeatAt = Number.NEGATIVE_INFINITY;

    const isCalculationStillGoing = async (): Promise<boolean> => {
      try {
        // Fetching the calculation to get its current status.
        const { status } = await calculationRepository.findOneOrFail({ where: { id: calculation.id } });

        // Stop evaluation if another batch crashed, but don't change the status.
        if (status === CalculationStatus.ERROR) {
          return false;
        }

        // If the user asked for a stop, acknowledge it by persisting STOPPED and report that
        // the calculation must not continue.
        if ([CalculationStatus.STOPPED, CalculationStatus.STOPPING].includes(status)) {
          this.logger.debug({
            message: 'Changing the status of calculation to STOPPED',
            company: pick(calculation.company, ['id', 'name']),
          });
          await calculationRepository.update({ id: calculation.id }, { status: CalculationStatus.STOPPED });

          return false;
        }

        return true;
      } catch {
        // Deliberately silent: any failure here is reported as "not ongoing".
        // This is tricky because of the throttle; it isn't really executed in the "thread" of
        // the calculation, so sometimes the Promise of the calculation resolves before the code
        // of this throttle is called.
        // In the context of tests, it means that the connection to the database can be closed
        // before this code runs for the last time, and letting the error escape would crash
        // the e2e tests for no reason.
        return false;
      }
    };

    // Rate-limit the status lookup. Calls made inside the throttle window get the result of the
    // last invocation (see the lodash `throttle` documentation).
    const isCalculationStillGoingThrottled = throttle(isCalculationStillGoing, throttleIntervalMs, {
      leading: true,
    });

    // Refreshes `lastUpdatedAt`. The timestamp is recorded before awaiting so that concurrent
    // callers do not all write, and restored on failure so that the next call tries again.
    const writeHeartbeat = async (): Promise<void> => {
      const previousHeartbeatAt = lastHeartbeatAt;
      lastHeartbeatAt = Date.now();
      try {
        await calculationRepository.update({ id: calculation.id }, { lastUpdatedAt: new Date() });
      } catch (error: unknown) {
        lastHeartbeatAt = previousHeartbeatAt;
        throw error;
      }
    };

    const verifyCalculationIsOngoingNonThrottled = async (): Promise<void> => {
      if (!(await isCalculationStillGoing())) {
        throw new CalculationStoppedError();
      }

      await writeHeartbeat();
    };

    const verifyCalculationIsOngoingThrottled = async (): Promise<void> => {
      if (!(await isCalculationStillGoingThrottled())) {
        throw new CalculationStoppedError();
      }

      // The heartbeat is rate-limited like the status read; otherwise every call would still
      // issue an UPDATE and the throttle would not actually spare the database.
      // It runs here, in the caller's awaited context, rather than inside the throttled
      // function, so a failing write surfaces to the caller instead of being raised from a
      // trailing timer nobody awaits.
      if (Date.now() - lastHeartbeatAt >= throttleIntervalMs) {
        await writeHeartbeat();
      }
    };

    return {
      ...calculation,
      verifyCalculationIsOngoingNonThrottled,
      verifyCalculationIsOngoingThrottled,
    };
  };
}