Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { validateCronExpression } from 'cron'; import { Not, Repository } from 'typeorm'; import { Company } from '@amalia/core/models'; import { IndexationType } from '@amalia/core/types'; import { assert } from '@amalia/ext/typescript'; import { MessageTasks, MessageType, QueueService, ScheduleRefreshmentPayload } from '@amalia/kernel/queue/core'; import { CompanyStatus } from '@amalia/tenants/companies/types'; import { getCurrentBatchCompanies } from './cron/cron'; @Injectable() export class NightlySchedulerUseCase { protected readonly logger = new Logger(NightlySchedulerUseCase.name); public constructor( @InjectRepository(Company) private readonly companyRepository: Repository<Company>, private readonly queueService: QueueService, ) {} /** * Gets all companies with batchNumber not equal to -1 and orders them by batchNumber. * Then schedules a data refreshment for each company. */ public async getCompaniesAndScheduleRefreshment(cronPattern?: string, date?: string): Promise<Company[]> { assert(cronPattern, 'cron pattern is required'); const { error } = validateCronExpression(cronPattern); assert(!error, `cron pattern is invalid: ${error?.message}`); // Query companies where batchNumber is not -1, ordered by batchNumber const companies = await this.companyRepository.find({ where: { batchNumber: Not(-1), status: CompanyStatus.ACTIVE, }, order: { batchNumber: 'ASC', slug: 'ASC', }, }); const { companies: companiesInTheCurrentBatch, position, count, } = getCurrentBatchCompanies(cronPattern, date ? new Date(date) : new Date(), companies); this.logger.log( `Executing batch ${position + 1} of ${count} for cron pattern ${cronPattern}, will run on companies ${companiesInTheCurrentBatch.map((c) => c.slug).join(', ')}`, ); if (position < 0 || position >= count) { throw new Error(`Invalid position ${position}, should be contained between 0 and ${count - 1} (date is ${date})`); } // Schedule refreshment for each company for (const company of companiesInTheCurrentBatch) { await this.queueService.sendToQueue( { type: MessageType.TASK, taskIdentifier: MessageTasks.SCHEDULE_REFRESHMENT, }, { companyId: company.id, connectorId: null, options: { willLaunchCalculation: true, willPrune: true, indexationType: IndexationType.PARTIAL, calculationsOptions: { withRollingPeriods: true }, dataConnectorObjectsNames: null, startOffset: 0, }, } satisfies ScheduleRefreshmentPayload, ); } return companies; } } |