SFTP XML Feed Ingestion
Summary
The file watcher (`lib/file-watcher.ts`) runs inside the Fastify API process. It uses chokidar to monitor `/home/*/feeds/*.xml` in real time, with a 5-second write-finish stability threshold to avoid processing partial uploads. Each detected file is hashed (SHA-256), checked for duplicates, and, for active partners, parsed via the XML adapter (`adapters/xml-feed.ts`, using `fast-xml-parser`) and upserted into the database. A per-partner mutex (`ingestionQueue`) ensures sequential processing per partner while allowing parallelism across partners. On startup and every 2 minutes, a safety-net scan catches any files chokidar may have missed. Archives older than 90 days are cleaned on startup.
State Diagram
```mermaid
stateDiagram-v2
    state "File detected" as FileDetected
    state "Username extracted" as UsernameExtracted
    state "Partner lookup" as PartnerLookup
    state "File ignored" as Ignored
    state "Checksum computed" as Hashed
    state "Duplicate check" as DuplicateCheck
    state "Duplicate recorded" as DuplicateRecorded
    state "Deposit recorded" as Detected
    state "Partner paused" as Paused
    state "Audit mode" as AuditMode
    state "Audit complete" as AuditReady
    state "Queued" as Queued
    state "Ingestion in progress" as Ingesting
    state "Ingested successfully" as Ingested
    state "Ingestion error" as Error
    state "Data quality check" as DataQualityCheck

    [*] --> FileDetected: chokidar add/change OR startup/periodic scan
    FileDetected --> UsernameExtracted: extract username from file path
    UsernameExtracted --> PartnerLookup: lookup partner by sftpUsername
    PartnerLookup --> Ignored: no partner found
    PartnerLookup --> Hashed: readAndHash() (SHA-256)
    Hashed --> DuplicateCheck: findFirst by (partnerId, checksum, status in [ingested, duplicate])
    DuplicateCheck --> DuplicateRecorded: create FileDeposit status=duplicate, archive, notify Slack
    DuplicateCheck --> Detected: create FileDeposit status=detected, notify Slack
    Detected --> Paused: partner.status == paused
    Detected --> AuditMode: partner.status in [pending_mapping, audit_ready]
    AuditMode --> AuditReady: runFileAudit() → partner.status = audit_ready
    Detected --> Queued: enqueueIngestion() via per-partner mutex
    Queued --> Ingesting: FileDeposit.status = ingesting
    Ingesting --> Ingested: ingestXmlFeed() success → FileDeposit.status = ingested, archive
    Ingesting --> Error: ingestXmlFeed() throws → FileDeposit.status = error
    Ingested --> DataQualityCheck: runDataQualityCheck() fire-and-forget
```
Steps
1. Startup Scan (Actor: system)
On server start, startupScan() runs non-blocking in background:
- Reads the `/home/` directory for all user directories.
- For each user, reads `/home/{user}/feeds/*.xml`.
- Calls `processFile()` on each file.
Also cleans archive files older than 90 days.
Outcome: Files deposited while the server was down are processed
2. Real-Time Watch (Actor: system, tools: chokidar)
`chokidar.watch("/home/*/feeds/*.xml", { awaitWriteFinish: { stabilityThreshold: 5000, pollInterval: 1000 } })` fires `add` and `change` events. Catella overwrites the same file daily, so both events are handled identically via `handleFile()`.
Outcome: New and overwritten XML files trigger `processFile()`
3. Periodic Safety-Net Scan (Actor: system)
`setInterval(scanFeedDirectories, 2 * 60 * 1000)` runs every 2 minutes to catch any files that chokidar misses due to inotify limits, race conditions, or timing issues.
Outcome: Missed files processed within 2 minutes
4. File Identification (Actor: system)
`extractUsername(filePath)` parses `/home/([^/]+)/feeds/` from the path. `prisma.partner.findUnique({ where: { sftpUsername } })` resolves the partner. Files without a matching partner are skipped, with only a warning log.
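A plausible implementation of the path parsing, built directly from the pattern quoted above (the exact body of the real `extractUsername` is an assumption):

```typescript
// Extract the SFTP username from a path like /home/{user}/feeds/file.xml.
// Returns null when the path does not match the expected layout.
function extractUsername(filePath: string): string | null {
  const match = filePath.match(/^\/home\/([^/]+)\/feeds\//);
  return match ? match[1] : null;
}
```

Anchoring the regex at the start of the path avoids matching a stray `/home/.../feeds/` fragment deeper in an unexpected path.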
5. Deduplication (Actor: system)
`readAndHash(filePath)` reads the file into memory and computes its SHA-256 checksum. `prisma.fileDeposit.findFirst({ where: { partnerId, fileChecksum: checksum, status: { in: ["ingested", "duplicate"] } } })` detects exact re-deposits. Duplicates are:
- Recorded with `status: "duplicate"`.
- Archived to `/home/{user}/feeds/archive/`.
- Notified to Slack via `notifyDuplicateDeposit()`.
- Not ingested.
6. Deposit Record Creation (Actor: system)
For non-duplicate files: `prisma.fileDeposit.create({ status: "detected" })`. Slack notification via `notifyDeposit(partnerName, filename, size)`.
7. Partner Status Branching (Actor: system)
- `"paused"`: skip ingestion and return.
- `"pending_mapping"` or `"audit_ready"`: archive the file and run `runFileAudit()` to extract code mappings. The partner transitions to `"audit_ready"`.
- `"active"`: call `enqueueIngestion()`.
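The branching reduces to a switch on partner status. A simplified sketch (the `routeDeposit` name and return values are illustrative, not the real API):

```typescript
type PartnerStatus = "paused" | "pending_mapping" | "audit_ready" | "active";

// Decide what happens to a freshly recorded deposit based on partner status.
function routeDeposit(status: PartnerStatus): "skip" | "audit" | "ingest" {
  switch (status) {
    case "paused":
      return "skip"; // deposit recorded, ingestion deferred until unpaused
    case "pending_mapping":
    case "audit_ready":
      return "audit"; // archive + runFileAudit(); partner becomes audit_ready
    case "active":
      return "ingest"; // enqueueIngestion() via the per-partner mutex
  }
}
```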
8. Per-Partner Mutex Queue (Actor: system)
`ingestionQueue` is a `Map<partnerId, Promise<void>>`. New tasks are chained after the existing promise for the same partner; promises for different partners run concurrently. The queue entry is cleaned up in a `finally` block once a task completes.
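The chaining pattern fits in a few lines. A sketch assuming tasks are plain async thunks (the `ingestionQueue` and `enqueueIngestion` names come from this doc; the exact signature is an assumption):

```typescript
// One pending-promise chain per partner: tasks for the same partner run
// sequentially, tasks for different partners run concurrently.
const ingestionQueue = new Map<string, Promise<void>>();

function enqueueIngestion(
  partnerId: string,
  task: () => Promise<void>
): Promise<void> {
  const previous = ingestionQueue.get(partnerId) ?? Promise.resolve();
  // Chain after the previous task; swallow its error so one failed
  // ingestion does not poison the rest of the partner's queue.
  const next = previous.catch(() => {}).then(task);
  // Clean up the map entry once this task settles, but only if it is
  // still the tail of the chain (a newer task may have been enqueued).
  const tracked = next.finally(() => {
    if (ingestionQueue.get(partnerId) === tracked) {
      ingestionQueue.delete(partnerId);
    }
  });
  ingestionQueue.set(partnerId, tracked);
  return tracked;
}
```

The tail check in `finally` matters: without it, a task finishing late could delete the entry a newer task just installed, breaking sequential ordering for that partner.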
9. Ingestion Execution (Actor: system, tools: fast-xml-parser)
`runIngestion()`:
- Sets `FileDeposit.status` → `"ingesting"`.
- Calls `ingestXmlFeed(filePath, partnerId)` from `adapters/xml-feed.ts`, which:
  - Parses the XML with `fast-xml-parser`.
  - Applies partner mappings from the `PartnerMapping` table.
  - Upserts `Programme` and `Lot` records by the `(partnerId, externalId)` composite key (full-replace strategy).
- Sets `FileDeposit.status` → `"ingested"`, with `ingestedAt` set.
- Sends a Slack notification via `notifyIngestionSuccess()`.
- Runs `runDataQualityCheck()` fire-and-forget: computes anomalies and posts to Slack.
- Archives the file to `/home/{user}/feeds/archive/`.

On error: `FileDeposit.status` → `"error"`, `errorMessage` set, Slack notification via `notifyIngestionError()`.
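The full-replace upsert can be illustrated with an in-memory map standing in for the `Programme` table (Prisma and the real schema are omitted; any field beyond the composite key is a made-up example):

```typescript
interface Programme {
  partnerId: string;
  externalId: string;
  name: string; // illustrative field, not the real schema
}

// Keyed by the (partnerId, externalId) composite key, mirroring the
// database's unique constraint.
const programmes = new Map<string, Programme>();

function upsertProgramme(p: Programme): void {
  // Full replace: the incoming record overwrites every field of the
  // existing row rather than merging with it, so a feed that drops a
  // field also drops it from the stored record.
  programmes.set(`${p.partnerId}:${p.externalId}`, p);
}
```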
10. Graceful Shutdown (Actor: system)
`app.addHook("onClose")` closes the chokidar watcher and clears the periodic scan interval.
Error States
- No SFTP username in path → skip with warning
- No partner for username → skip with warning
- Disk read error → exception propagates to chokidar handler, logged
- Ingestion parse error → `FileDeposit.status = "error"`; admin can retry
- Slack notification failure → logged as error, does not block ingestion
Related Processes
- partner-onboarding — partner must exist with a valid SFTP username
- partner-mapping-activation — audit mode applies to pending_mapping partners
- file-deposit-monitoring — admin view of FileDeposit records and retry/reingest
- data-quality-reports — post-ingestion quality check feeds into the report system