tethys.backend/commands/index_datasets.ts
Arno Kaimbacher b5bbe26ec2
feat: Enhance background job settings UI and functionality
- Updated BackgroundJob.vue to improve the display of background job statuses, including missing cross-references and the current job mode.
- Added auto-refresh functionality for background job status.
- Introduced success toast notifications for successful status refreshes.
- Modified the XML serialization process in DatasetXmlSerializer for better caching and performance.
- Implemented a new RuleProvider for managing custom validation rules.
- Improved error handling in routes for loading background job settings.
- Enhanced ClamScan configuration with socket support for virus scanning.
- Refactored dayjs utility to streamline locale management.
2025-10-14 12:19:09 +02:00

// podman exec -it tethys_backend_1 node ace index:datasets
// sudo crontab -u www-data -e
// */5 * * * * podman exec -u www-data tethys_backend_1 node ace index:datasets
import { XMLBuilder } from 'xmlbuilder2/lib/interfaces.js';
import { create } from 'xmlbuilder2';
import Dataset from '#models/dataset';
import XmlModel from '#app/Library/DatasetXmlSerializer';
import { readFileSync } from 'fs';
import SaxonJS from 'saxon-js';
import { Client } from '@opensearch-project/opensearch';
import { getDomain } from '#app/utils/utility-functions';
import { BaseCommand, flags } from '@adonisjs/core/ace';
import { CommandOptions } from '@adonisjs/core/types/ace';
import env from '#start/env';
import logger from '@adonisjs/core/services/logger';
import { DateTime } from 'luxon';
const opensearchNode = env.get('OPENSEARCH_HOST', 'localhost');
const client = new Client({ node: `${opensearchNode}` }); // replace with your OpenSearch endpoint
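
/**
 * Ace command that (re)indexes published datasets into the OpenSearch index.
 * For each dataset it compares modification dates against the already indexed
 * document and only re-serializes and re-indexes datasets that have changed.
 */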
export default class IndexDatasets extends BaseCommand {
    static commandName = 'index:datasets';
    static description = 'Index datasets based on publish_id';
    public static needsApplication = true;

    @flags.number({ alias: 'p' })
    public publish_id: number;

    public static options: CommandOptions = {
        startApp: true, // Ensures the IoC container is ready to use
        staysAlive: false, // Command exits after running
    };
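
    /**
     * Entry point: loads the compiled XSLT stylesheet (solr.sef.json), fetches the
     * published datasets and indexes each one into the 'tethys-records' OpenSearch
     * index, skipping documents that are already up to date.
     */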
    async run() {
        logger.debug('Starting dataset indexing');
        const datasets = await this.getDatasets();
        const proc = readFileSync('public/assets2/solr.sef.json', 'utf-8');
        const index_name = 'tethys-records';
        for (const dataset of datasets) {
            const shouldUpdate = await this.shouldUpdateDataset(dataset, index_name);
            if (shouldUpdate) {
                await this.indexDocument(dataset, index_name, proc);
            } else {
                logger.info(`Dataset with publish_id ${dataset.publish_id} is up to date, skipping indexing`);
            }
        }
    }
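
    /**
     * Fetches all published datasets with their XML cache preloaded; when the
     * publish_id flag is given, the query is restricted to that single dataset.
     */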
    private async getDatasets(): Promise<Dataset[]> {
        const query = Dataset.query().preload('xmlCache').where('server_state', 'published');
        if (this.publish_id) {
            query.where('publish_id', this.publish_id);
        }
        return await query.exec();
    }
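
    /**
     * Decides whether a dataset needs (re)indexing by comparing its
     * server_date_modified with the value stored in the existing OpenSearch
     * document; missing documents or lookup errors always trigger indexing.
     */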
    private async shouldUpdateDataset(dataset: Dataset, index_name: string): Promise<boolean> {
        try {
            // Without a publish_id there is no existing document to compare against, so index it
            if (!dataset.publish_id) {
                return true;
            }
            // Get the existing document from OpenSearch
            const response = await client.get({
                index: index_name,
                id: dataset.publish_id.toString(),
            });
            const existingDoc = response.body._source;
            // Compare server_date_modified
            if (existingDoc && existingDoc.server_date_modified) {
                // Convert Unix timestamp (seconds) to milliseconds for DateTime.fromMillis()
                const existingModified = DateTime.fromMillis(Number(existingDoc.server_date_modified) * 1000);
                const currentModified = dataset.server_date_modified;
                // Only update if the dataset has been modified more recently
                if (currentModified <= existingModified) {
                    return false;
                }
            }
            return true;
        } catch (error) {
            // If the document doesn't exist yet, index it
            if (error.statusCode === 404) {
                logger.info(`Dataset with publish_id ${dataset.publish_id} not found in index, will create new document`);
                return true;
            }
            logger.warn(`Error checking existing document for publish_id ${dataset.publish_id}: ${error.message}`);
            return true; // Index anyway if we can't determine the status
        }
    }
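
    /**
     * Transforms the dataset's XML record into a JSON document via the compiled
     * XSLT and writes it to the OpenSearch index under its publish_id.
     */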
    private async indexDocument(dataset: Dataset, index_name: string, proc: string): Promise<void> {
        try {
            const doc = await this.getJsonString(dataset, proc);
            const document = JSON.parse(doc);
            await client.index({
                id: dataset.publish_id?.toString(),
                index: index_name,
                body: document,
                refresh: true,
            });
            logger.info(`Dataset with publish_id ${dataset.publish_id} successfully indexed`);
        } catch (error) {
            logger.error(`An error occurred while indexing dataset with publish_id ${dataset.publish_id}. Error: ${error.message}`);
        }
    }
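
    /**
     * Serializes the dataset to an XML record and runs the Saxon-JS transformation
     * (solr.sef.json) to produce the JSON string for indexing; returns an empty
     * string when the transformation fails.
     */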
    private async getJsonString(dataset: Dataset, proc: string) {
        const xml = create({ version: '1.0', encoding: 'UTF-8', standalone: true }, '<root></root>');
        const datasetNode = xml.root().ele('Dataset');
        await this.createXmlRecord(dataset, datasetNode);
        const xmlString = xml.end({ prettyPrint: false });
        try {
            const result = await SaxonJS.transform({
                stylesheetText: proc,
                destination: 'serialized',
                sourceText: xmlString,
            });
            return result.principalResult;
        } catch (error) {
            logger.error(`An error occurred while transforming dataset with publish_id ${dataset.publish_id}, error: ${error.message}`);
            return '';
        }
    }
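
    /**
     * Fetches the dataset's XML DOM node, adds the landing page attribute and the
     * data-type SetSpec element, and imports the result into the <Dataset> node.
     */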
    private async createXmlRecord(dataset: Dataset, datasetNode: XMLBuilder): Promise<void> {
        const domNode = await this.getDatasetXmlDomNode(dataset);
        if (domNode) {
            if (dataset.publish_id) {
                this.addLandingPageAttribute(domNode, dataset.publish_id.toString());
            }
            this.addSpecInformation(domNode, 'data-type:' + dataset.type);
            datasetNode.import(domNode);
        }
    }
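
    /**
     * Serializes the dataset via the DatasetXmlSerializer, reusing the preloaded
     * xmlCache when it is available, and returns the resulting XML document node.
     */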
    private async getDatasetXmlDomNode(dataset: Dataset): Promise<XMLBuilder | null> {
        const serializer = new XmlModel(dataset).enableCaching().excludeEmptyFields();
        if (dataset.xmlCache) {
            serializer.setCache(dataset.xmlCache);
        }
        const xmlDocument: XMLBuilder | null = await serializer.toXmlDocument();
        return xmlDocument;
    }
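
    // Appends a <SetSpec Value="..."> child element (e.g. "data-type:<type>") to the dataset node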
    private addSpecInformation(domNode: XMLBuilder, information: string) {
        domNode.ele('SetSpec').att('Value', information);
    }
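
    // Builds the landing page URL from OAI_BASE_DOMAIN and sets it as the 'landingpage' attribute on the dataset node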
    private addLandingPageAttribute(domNode: XMLBuilder, dataid: string) {
        const baseDomain = process.env.OAI_BASE_DOMAIN || 'localhost';
        const url = 'https://' + getDomain(baseDomain) + '/dataset/' + dataid;
        // add the landing page attribute to the dataset xml element
        domNode.att('landingpage', url);
    }
}