SFTP XML Feed Ingestion

Summary

The file watcher (lib/file-watcher.ts) runs inside the Fastify API process. It uses chokidar to monitor /home/*/feeds/*.xml in real time, with a 5-second write-finish stability threshold to avoid processing partial uploads. Each detected file is hashed (SHA-256), checked for duplicates, and — for active partners — parsed via the XML adapter (adapters/xml-feed.ts using fast-xml-parser) and upserted into the database. A per-partner mutex (ingestionQueue) ensures sequential processing per partner while allowing parallelism across partners. On startup and every 2 minutes, a safety-net scan catches any files chokidar may have missed. Old archives (>90 days) are cleaned on startup.

State Diagram

stateDiagram-v2
    state "File detected" as FileDetected
    state "Username extracted" as UsernameExtracted
    state "Partner lookup" as PartnerLookup
    state "File ignored" as Ignored
    state "Checksum computed" as Hashed
    state "Duplicate check" as DuplicateCheck
    state "Duplicate recorded" as DuplicateRecorded
    state "Deposit recorded" as Detected
    state "Partner paused" as Paused
    state "Audit mode" as AuditMode
    state "Audit complete" as AuditReady
    state "Queued" as Queued
    state "Ingestion in progress" as Ingesting
    state "Ingested successfully" as Ingested
    state "Ingestion error" as Error
    state "Data quality check" as DataQualityCheck
    [*] --> FileDetected: chokidar add/change OR startup/periodic scan
    FileDetected --> UsernameExtracted: extract username from file path
    UsernameExtracted --> PartnerLookup: lookup partner by sftpUsername
    PartnerLookup --> Ignored: no partner found
    PartnerLookup --> Hashed: readAndHash() — SHA-256
    Hashed --> DuplicateCheck: findFirst by (partnerId, checksum, status in [ingested, duplicate])
    DuplicateCheck --> DuplicateRecorded: create FileDeposit status=duplicate, archive, notify Slack
    DuplicateCheck --> Detected: create FileDeposit status=detected, notify Slack
    Detected --> Paused: partner.status == paused
    Detected --> AuditMode: partner.status in [pending_mapping, audit_ready]
    AuditMode --> AuditReady: runFileAudit() → partner.status = audit_ready
    Detected --> Queued: enqueueIngestion() via per-partner mutex
    Queued --> Ingesting: FileDeposit.status = ingesting
    Ingesting --> Ingested: ingestXmlFeed() success → FileDeposit.status = ingested, archive
    Ingesting --> Error: ingestXmlFeed() throws → FileDeposit.status = error
    Ingested --> DataQualityCheck: runDataQualityCheck() fire-and-forget

Steps

1. Startup Scan (Actor: system)

On server start, startupScan() runs non-blocking in the background:

  1. Reads /home/ directory for all user directories.
  2. For each user, reads /home/{user}/feeds/*.xml.
  3. Calls processFile() on each file.

Also cleans archive files older than 90 days.

Outcome: Files deposited while the server was down are processed
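The directory sweep can be sketched as follows — a minimal standalone version that takes the root directory as a parameter (the real startupScan() is hard-coded to /home/ and calls processFile() on each path it finds):

```typescript
import * as fs from "node:fs/promises";
import * as path from "node:path";

// Sweep {root}/{user}/feeds/ for .xml files, mirroring the startup scan.
// Users without a feeds/ directory are skipped silently.
export async function scanFeedDirectories(root: string): Promise<string[]> {
  const found: string[] = [];
  const users = await fs.readdir(root, { withFileTypes: true });
  for (const user of users) {
    if (!user.isDirectory()) continue;
    const feedsDir = path.join(root, user.name, "feeds");
    let entries: string[];
    try {
      entries = await fs.readdir(feedsDir);
    } catch {
      continue; // no feeds/ directory for this user
    }
    for (const entry of entries) {
      if (entry.endsWith(".xml")) found.push(path.join(feedsDir, entry));
    }
  }
  return found;
}
```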

2. Real-Time Watch (Actor: system, tools: chokidar)

chokidar.watch("/home/*/feeds/*.xml", { awaitWriteFinish: { stabilityThreshold: 5000, pollInterval: 1000 } }) fires add and change events. Catella overwrites the same file daily, so both events are handled identically via handleFile().

Outcome: New and overwritten XML files trigger processFile()

3. Periodic Safety-Net Scan (Actor: system)

setInterval(scanFeedDirectories, 2 * 60 * 1000) runs every 2 minutes to catch any files that chokidar misses due to inotify limits, race conditions, or timing issues.

Outcome: Missed files processed within 2 minutes

4. File Identification (Actor: system)

extractUsername(filePath) parses /home/([^/]+)/feeds/ from the path. prisma.partner.findUnique({ where: { sftpUsername } }) resolves the partner. Files without a matching partner are skipped, with a warning logged.
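A minimal sketch of the path parsing, assuming extractUsername returns null on a non-matching path:

```typescript
// Pull the SFTP username out of a deposited file's path.
// Returns null when the path does not match /home/{user}/feeds/…
export function extractUsername(filePath: string): string | null {
  const match = filePath.match(/^\/home\/([^/]+)\/feeds\//);
  return match ? match[1] : null;
}
```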

5. Deduplication (Actor: system)

readAndHash(filePath) reads the file into memory and computes SHA-256. prisma.fileDeposit.findFirst({ where: { partnerId, fileChecksum: checksum, status: { in: ["ingested", "duplicate"] } } }) detects exact re-deposits. Duplicates are:

  1. Recorded with status: "duplicate".
  2. Archived to /home/{user}/feeds/archive/.
  3. Notified to Slack via notifyDuplicateDeposit().
  4. Not ingested.
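The hashing step can be sketched as (the return shape is an assumption; the real readAndHash lives in lib/file-watcher.ts):

```typescript
import { createHash } from "node:crypto";
import { readFile } from "node:fs/promises";

// Read a deposit fully into memory and compute its SHA-256 checksum,
// which is then used as fileChecksum in the duplicate lookup.
export async function readAndHash(
  filePath: string,
): Promise<{ content: Buffer; checksum: string }> {
  const content = await readFile(filePath);
  const checksum = createHash("sha256").update(content).digest("hex");
  return { content, checksum };
}
```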

6. Deposit Record Creation (Actor: system)

For non-duplicate files: prisma.fileDeposit.create({ status: "detected" }). Slack notification via notifyDeposit(partnerName, filename, size).

7. Partner Status Branching (Actor: system)

  • "paused": skip ingestion, return.
  • "pending_mapping" or "audit_ready": archive file, run runFileAudit() to extract code mappings. Partner transitions to "audit_ready".
  • "active": call enqueueIngestion().
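The branching above, expressed as a pure function (illustrative only — the real code branches inline in processFile(); branchForStatus and DepositAction are hypothetical names):

```typescript
type PartnerStatus = "paused" | "pending_mapping" | "audit_ready" | "active";
type DepositAction = "skip" | "audit" | "ingest";

// Map a partner's status to the handling branch for a new deposit.
export function branchForStatus(status: PartnerStatus): DepositAction {
  switch (status) {
    case "paused":
      return "skip";
    case "pending_mapping":
    case "audit_ready":
      return "audit";
    case "active":
      return "ingest";
  }
}
```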

8. Per-Partner Mutex Queue (Actor: system)

ingestionQueue is a Map&lt;partnerId, Promise&lt;void&gt;&gt;. New tasks are chained after the existing promise for the same partner. Promises for different partners run concurrently. The map entry is cleaned up in a finally block once its task chain completes.
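The chaining logic can be sketched as a minimal standalone version (keying by a plain string partnerId here):

```typescript
// One promise chain per partner: a new task starts only after the previous
// task for the same partnerId settles; different partners run concurrently.
const ingestionQueue = new Map<string, Promise<void>>();

export function enqueueIngestion(
  partnerId: string,
  task: () => Promise<void>,
): Promise<void> {
  const previous = ingestionQueue.get(partnerId) ?? Promise.resolve();
  const next = previous
    .catch(() => {}) // a failed task must not block later tasks
    .then(task)
    .finally(() => {
      // Clean up only if no newer task has replaced our entry.
      if (ingestionQueue.get(partnerId) === next) ingestionQueue.delete(partnerId);
    });
  ingestionQueue.set(partnerId, next);
  return next;
}
```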

9. Ingestion Execution (Actor: system, tools: fast-xml-parser)

runIngestion():

  1. FileDeposit.status → "ingesting".
  2. ingestXmlFeed(filePath, partnerId) from adapters/xml-feed.ts:
    • Parses XML with fast-xml-parser.
    • Applies partner mappings from PartnerMapping table.
    • Upserts Programme and Lot records by (partnerId, externalId) composite key (full-replace strategy).
  3. FileDeposit.status → "ingested", ingestedAt set.
  4. Slack notification via notifyIngestionSuccess().
  5. runDataQualityCheck() fire-and-forget: computes anomalies and posts to Slack.
  6. Archive file to /home/{user}/feeds/archive/.

On error: FileDeposit.status → "error", errorMessage set, Slack notification via notifyIngestionError().
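The status transitions and error path can be shown in isolation with an invented IngestionDeps seam standing in for the Prisma and Slack calls (every name in the interface is a hypothetical stand-in for the real functions named above):

```typescript
// Hypothetical seams so runIngestion's state machine can run without
// a database or Slack: setStatus ~ FileDeposit updates, ingest ~
// ingestXmlFeed(), notify* ~ the Slack helpers.
interface IngestionDeps {
  setStatus: (
    status: "ingesting" | "ingested" | "error",
    errorMessage?: string,
  ) => Promise<void>;
  ingest: () => Promise<void>;
  notifySuccess: () => Promise<void>;
  notifyError: (message: string) => Promise<void>;
}

export async function runIngestion(deps: IngestionDeps): Promise<boolean> {
  await deps.setStatus("ingesting");
  try {
    await deps.ingest();
    await deps.setStatus("ingested");
    await deps.notifySuccess();
    return true;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await deps.setStatus("error", message);
    await deps.notifyError(message);
    return false;
  }
}
```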

10. Graceful Shutdown (Actor: system)

app.addHook("onClose") closes the chokidar watcher and clears the periodic scan interval.

Error States

  • No SFTP username in path → skip with warning
  • No partner for username → skip with warning
  • Disk read error → exception propagates to chokidar handler, logged
  • Ingestion parse error → FileDeposit.status = "error", admin can retry
  • Slack notification failure → logged as error, does not block ingestion