Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Logger } from '@nestjs/common';
import { pick, throttle } from 'lodash-es';
import { type Repository } from 'typeorm';

import { type Calculation } from '@amalia/core/models';
import { CalculationStatus } from '@amalia/core/types';

/** Minimum delay, in milliseconds, between two throttled status lookups. */
const STATUS_CHECK_THROTTLE_MS = 500;

/**
 * A calculation augmented with two guard functions. Each guard resolves when the
 * calculation may continue and rejects with {@link CalculationStoppedError} when it
 * must stop.
 */
export type StoppableCalculation = Calculation & {
  /** Guard whose status lookup is throttled (see `STATUS_CHECK_THROTTLE_MS`). */
  verifyCalculationIsOngoingThrottled: () => Promise<void>;
  /** Guard that looks the status up on every call. */
  verifyCalculationIsOngoingNonThrottled: () => Promise<void>;
};

/**
 * Thrown by the guards of a {@link StoppableCalculation} when the calculation should
 * no longer run.
 */
export class CalculationStoppedError extends Error {
  public constructor(message?: string) {
    super(message);
    // Without an explicit name, logs and stack traces only show a bare "Error".
    this.name = 'CalculationStoppedError';
  }
}

export class StoppableCalculationMapper {
  private readonly logger = new Logger(StoppableCalculationMapper.name);

  /**
   * Wraps a calculation with guards that let the engine stop it from anywhere.
   *
   * The guards are closures over `calculationRepository`, so the returned object can
   * be passed around without the callers needing access to the repository.
   *
   * @param calculation - Calculation to wrap; its properties are shallow-copied into the result.
   * @param calculationRepository - Repository used to read the current status and write updates.
   * @returns A copy of `calculation` carrying the two guard functions.
   */
  public calculationToStoppableCalculation = (
    calculation: Calculation,
    calculationRepository: Repository<Calculation>,
  ): StoppableCalculation => {
    /**
     * Reads the current status of the calculation and tells whether it may continue.
     * Never rejects: any failure is reported as `false`.
     */
    const isCalculationStillGoing = async (): Promise<boolean> => {
      try {
        // Fetching the calculation to get its current status.
        const { status } = await calculationRepository.findOneOrFail({ where: { id: calculation.id } });

        // Stop evaluation if another batch crashed, but don't change the status.
        if (status === CalculationStatus.ERROR) {
          return false;
        }

        // A stop was requested (or already recorded): persist STOPPED and report that
        // the calculation must not continue. The guards below turn this into a throw.
        if ([CalculationStatus.STOPPED, CalculationStatus.STOPPING].includes(status)) {
          this.logger.debug({
            message: 'Changing the status of calculation to STOPPED',
            company: pick(calculation.company, ['id', 'name']),
          });

          await calculationRepository.update({ id: calculation.id }, { status: CalculationStatus.STOPPED });
          return false;
        }

        return true;
      } catch {
        // Deliberately silent.
        // This is tricky because of the throttle: it isn't really executed in the "thread" of
        // the calculation, so sometimes the Promise of the calculation resolves before the
        // throttled code is called.
        // In tests, it means that the database connection can be closed before this code
        // runs for the last time, which would crash the e2e tests for no reason.
        return false;
      }
    };

    // Throttling the lookup so the status is read at most once per interval. Calls made
    // inside the interval get the result of the most recent invocation.
    const isCalculationStillGoingThrottled = throttle(isCalculationStillGoing, STATUS_CHECK_THROTTLE_MS, {
      leading: true,
    });

    /**
     * Shared tail of both guards: throws when the calculation must stop, otherwise
     * refreshes `lastUpdatedAt`.
     *
     * `undefined` is accepted because, depending on the lodash typings in use, a
     * throttled function may be typed as returning `undefined`; it is treated as "stop".
     *
     * NOTE(review): the `lastUpdatedAt` write happens on every guard call, including
     * the throttled guard. Only the status lookup is throttled. Confirm this is intended.
     */
    const refreshLastUpdatedAtOrThrow = async (stillGoing: boolean | undefined): Promise<void> => {
      if (!stillGoing) {
        throw new CalculationStoppedError();
      }

      await calculationRepository.update({ id: calculation.id }, { lastUpdatedAt: new Date() });
    };

    const verifyCalculationIsOngoingNonThrottled = async (): Promise<void> => {
      await refreshLastUpdatedAtOrThrow(await isCalculationStillGoing());
    };

    const verifyCalculationIsOngoingThrottled = async (): Promise<void> => {
      await refreshLastUpdatedAtOrThrow(await isCalculationStillGoingThrottled());
    };

    return {
      ...calculation,
      verifyCalculationIsOngoingNonThrottled,
      verifyCalculationIsOngoingThrottled,
    };
  };
}