Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 6x 6x 6x 6x 6x 6x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 4x 1x | import { Injectable, Logger } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { Company, DataConnectionName, DefaultConnectionName, REPLICATED_ENTITIES_IN_DATA } from '@amalia/core/models';
import { isUndefinedObjectError } from '@amalia/ext/typeorm';
import { assert } from '@amalia/ext/typescript';
import { AmaliaPostgresQueryRunner } from './AmaliaPostgresQueryRunner';
import { configuration } from './configuration';
import { DatabaseSetupService } from './setup.service';
/**
 * Manages PostgreSQL logical replication (publication/subscription) from the
 * default (main) database to the data database: full pipeline (re)setup and a
 * lag-monitoring probe.
 */
@Injectable()
export class DatabaseReplicationService {
  private readonly logger = new Logger(DatabaseReplicationService.name);

  public constructor(
    // Source database: holds the publication and the replication slot.
    @InjectDataSource(DefaultConnectionName)
    private readonly defaultDbDataSource: DataSource,
    // Destination database: holds the subscription and the replicated tables.
    @InjectDataSource(DataConnectionName)
    private readonly dataDbDataSource: DataSource,
    private readonly databaseSetupService: DatabaseSetupService,
  ) {}

  /**
   * (Re)builds the whole replication pipeline from scratch, triggering a full
   * backfill of the replicated tables:
   * 1. drops any existing subscription (data db) and replication slot (default db),
   * 2. drops and recreates every replicated table on the data db,
   * 3. recreates the slot and the subscription (which starts the initial copy),
   * 4. rebuilds the per-company setup via DatabaseSetupService.
   *
   * NOTE(review): identifiers and credentials from configuration are
   * interpolated directly into the SQL strings below. They are trusted config
   * values, but a password containing spaces or quotes would break the
   * CONNECTION string — confirm upstream validation.
   */
  public async setUpReplication() {
    const { subscriptionName, user, password, publicationName } = configuration.databaseReplication;
    const slotName = `${subscriptionName}_slot`;
    // Connection parameters the data db will use to reach the default db.
    const {
      host,
      port,
      database: databaseName,
    } = configuration.development.isLocalDevelopment
      ? // Cannot use the db env vars for local development because the nodejs process is out of the docker
        // network, we have to use internal docker network names here.
        { host: 'db', port: 5432, database: 'amalia' }
      : {
          host: configuration.databaseReplication.host,
          port: configuration.database.defaultConnection.port,
          database: configuration.database.defaultConnection.database,
        };
    // First drop the subscription if it already exists in order to have a data refill.
    // Or else the publication won't resend us the records we already got.
    this.logger.log('Dropping existing subscription');
    await this.dataDbDataSource.query(`DROP SUBSCRIPTION IF EXISTS "${subscriptionName}"`);
    // And drop our replication slot on the default database.
    try {
      await this.defaultDbDataSource.query(`SELECT pg_drop_replication_slot('${slotName}');`);
    } catch (err) {
      // Unfortunately, there is no "drop if exists", it'll throw, so we're checking the error code to silence that specific error.
      if (!isUndefinedObjectError(err)) {
        throw err;
      }
    }
    const queryRunner = new AmaliaPostgresQueryRunner(this.dataDbDataSource);
    // Recreate the destination tables so their structure matches the current entities.
    this.logger.log('Dropping and recreating replicated tables');
    for (const entity of REPLICATED_ENTITIES_IN_DATA) {
      await queryRunner.createTableInCompanySchema(
        this.dataDbDataSource,
        entity,
        'public',
        // Drop the copied tables. We could also TRUNCATE them, but dropping and recreating them is better
        // since it allows us to change their structure. The `IF EXISTS` clause is useful here for the setup.
        { dropExistingFirst: true },
      );
    }
    // Recreate slot on the default database.
    await this.defaultDbDataSource.query(`SELECT pg_create_logical_replication_slot('${slotName}', 'pgoutput');`);
    // Recreate subscription. That will also trigger a backfill.
    // create_slot=false because the slot was created explicitly just above.
    this.logger.log('Recreating subscription');
    await this.dataDbDataSource.query(`
CREATE SUBSCRIPTION "${subscriptionName}"
CONNECTION 'dbname=${databaseName} host=${host} port=${port} user=${user} password=${password}'
PUBLICATION "${publicationName}"
WITH (slot_name=${slotName}, create_slot=false);
`);
    // And we're done!
    this.logger.log('Replication synced, now creating views.');
    // Rebuild the per-company data-db setup in parallel; any single failure
    // rejects the whole Promise.all and therefore this method.
    const companies = await this.defaultDbDataSource.getRepository(Company).find();
    await Promise.all(
      companies.map(async (company) => {
        await this.databaseSetupService.setupDataDbForCompany(company);
      }),
    );
    this.logger.log('Done!');
  }

  /**
   * To monitor the replication status between the main and data database, we're
   * using the `pg_stat_replication` view on the main database.
   *
   * That view reports how far behind the consumer (our subscription) is. The
   * lag values are compared to a threshold; `assert` throws if the
   * subscription row is missing, not streaming, or lagging too much.
   * @see https://www.postgresql.org/docs/13/monitoring-stats.html#MONITORING-PG-STAT-REPLICATION-VIEW
   *
   * @param maxLagSeconds - maximum tolerated value, in seconds, for each of the
   *   write/flush/replay lag measurements.
   */
  public async probe(maxLagSeconds: number) {
    const { subscriptionName } = configuration.databaseReplication;
    // Lag columns are a postgresql Interval, we have an easy way to convert them to seconds
    // using EXTRACT and EPOCH, see https://www.postgresql.org/docs/13/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT
    // NOTE(review): `reply_time` is selected but neither declared in the result
    // type nor read below — confirm whether it is still needed.
    const [result] = await this.defaultDbDataSource.query<
      [
        {
          state: string;
          writeLag: number;
          flushLag: number;
          replayLag: number;
        },
      ]
    >(`
SELECT
state,
EXTRACT(EPOCH FROM write_lag) as "writeLag",
EXTRACT(EPOCH FROM flush_lag) as "flushLag",
EXTRACT(EPOCH FROM replay_lag) as "replayLag",
reply_time
FROM pg_stat_replication
WHERE application_name = '${subscriptionName}';
`);
    // No row at all means the walsender for our subscription is gone.
    assert(result, `Subscription ${subscriptionName} is no longer available.`);
    const { state, writeLag, flushLag, replayLag } = result;
    // NOTE(review): the lag columns can be NULL on an idle connection, and some
    // pg drivers return EXTRACT(EPOCH ...) as a string; Math.max silently
    // coerces both (NaN fails the check, null coerces to 0 and passes) —
    // confirm the driver's numeric parsing before relying on this.
    assert(
      state === 'streaming' && Math.max(writeLag, flushLag, replayLag) < maxLagSeconds,
      `PSQL Replication is late! ${JSON.stringify(result)}`,
    );
  }
}
|