Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x | import { forwardRef as CircularDependency, Inject, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { omit } from 'lodash-es';
import { Repository } from 'typeorm';
import { Calculation, Company } from '@amalia/core/models';
import { CalculationStatus, CalculationType, ScheduleCalculationEvent } from '@amalia/core/types';
import { type Typeof } from '@amalia/ext/typescript';
import { MessageTasks, MessageType, QueueService, type CalculationPayload } from '@amalia/kernel/queue/core';
import { CreateCalculationUseCase } from './create-calculation/create-calculation.use-case';
/**
 * Moves a calculation forward by one step, or closes it when no step is left.
 *
 * `execute` looks up `calculation.descriptor.at(nextStep)`:
 * - when a step exists (this includes step 0, i.e. the very first trigger), one
 *   `EXECUTE_CALCULATIONS` task message is queued per batch of that step;
 * - when no step exists, the final status is derived from every batch status, persisted
 *   together with `finishedAt`, and a forecast calculation is scheduled afterwards if the
 *   calculation is eligible for one.
 */
export class TriggerStepUseCase {
  private readonly logger = new Logger(TriggerStepUseCase.name);

  public constructor(
    @InjectRepository(Calculation)
    private readonly calculationRepository: Repository<Calculation>,
    private readonly queueService: QueueService,
    @Inject(CircularDependency(() => CreateCalculationUseCase))
    private readonly createCalculationUseCase: Typeof<CreateCalculationUseCase>,
  ) {}

  /**
   * Triggers the step at index `nextStep`, or finishes the calculation when there is none.
   *
   * @param company - Company owning the calculation; its id scopes the messages and the update.
   * @param calculation - Calculation whose descriptor lists the steps and their batches.
   * @param nextStep - Index of the step to trigger, resolved with `Array.prototype.at`.
   */
  public async execute(company: Company, calculation: Calculation, nextStep: number) {
    const upcomingStep = calculation.descriptor.at(nextStep);

    // Nothing left to run: close the calculation and possibly chain a forecast.
    if (!upcomingStep) {
      await this.finishCalculation(company, calculation);
      return;
    }

    // One queue message per batch; all of them are sent concurrently.
    const enqueueBatch = (batch: number) =>
      this.queueService.sendToQueue(
        {
          type: MessageType.TASK,
          taskIdentifier: MessageTasks.EXECUTE_CALCULATIONS,
        },
        {
          calculationId: calculation.id,
          companyId: company.id,
          step: nextStep,
          batch,
        } as CalculationPayload,
      );

    await Promise.all(upcomingStep.batches.map((_batch, batchIndex) => enqueueBatch(batchIndex)));
  }

  /**
   * Persists the final status of the calculation, then schedules a forecast when eligible.
   */
  private async finishCalculation(company: Company, calculation: Calculation): Promise<void> {
    const status = this.extractCalculationStatusFromBatches(calculation);

    this.logger.debug({
      message: `Calculation ${calculation.id} finished, setting status ${status}`,
      // The descriptor is left out of the logged calculation.
      calculation: omit(calculation, 'descriptor'),
      status,
    });

    await this.calculationRepository.update(
      { company: { id: company.id }, id: calculation.id },
      { status, finishedAt: new Date() },
    );

    // The forecast is only considered once the status above has been saved.
    if (!this.shouldStartForecastedCalculation(company, calculation, status)) {
      return;
    }

    await this.createCalculationUseCase.execute({
      calculationEvent: this.buildForecastEvent(company, calculation),
    });
  }

  /**
   * Builds the event that schedules a forecast calculation mirroring the given calculation.
   */
  private buildForecastEvent(company: Company, calculation: Calculation): ScheduleCalculationEvent {
    return {
      companyId: company.id,
      options: {
        withRollingPeriods: calculation.withRollingPeriods,
      },
      query: {
        ...calculation.query,
        // Skip refreshing objects during forecast computation as they are already up to date.
        dataConnectorObjectsNames: null,
        type: CalculationType.FORECAST,
      },
      triggeredFromCalculationId: calculation.id,
    };
  }

  /**
   * Tells whether a forecast calculation should follow the one that just finished.
   *
   * All three conditions must hold: the company has the `FORECAST` feature flag, the finished
   * calculation is a statement, and its final status is none of ERROR, STOPPED or STOPPING.
   */
  private shouldStartForecastedCalculation(company: Company, calculation: Calculation, status: CalculationStatus) {
    if (!company.featureFlags.FORECAST) {
      return false;
    }

    if (calculation.type !== CalculationType.STATEMENT) {
      return false;
    }

    return (
      status !== CalculationStatus.ERROR &&
      status !== CalculationStatus.STOPPED &&
      status !== CalculationStatus.STOPPING
    );
  }

  /**
   * Reduces the statuses of every batch of every step to a single calculation status.
   *
   * Precedence, highest first: ERROR, then STOPPED (also used when a batch is STOPPING),
   * then INCOMPLETE; SUCCESS when none of those is present.
   */
  private extractCalculationStatusFromBatches(calculation: Calculation): CalculationStatus {
    const batchStatuses = calculation.descriptor.flatMap((step) => step.batches.map((batch) => batch.status));
    const seen = new Set(batchStatuses);

    if (seen.has(CalculationStatus.ERROR)) {
      return CalculationStatus.ERROR;
    }

    if (seen.has(CalculationStatus.STOPPED) || seen.has(CalculationStatus.STOPPING)) {
      return CalculationStatus.STOPPED;
    }

    if (seen.has(CalculationStatus.INCOMPLETE)) {
      return CalculationStatus.INCOMPLETE;
    }

    return CalculationStatus.SUCCESS;
  }
}
|