using System.Threading.Channels;
using EngineeringSync.Domain.Constants;
using EngineeringSync.Domain.Entities;
using EngineeringSync.Infrastructure;
using EngineeringSync.Service.Hubs;
using EngineeringSync.Service.Models;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;

namespace EngineeringSync.Service.Services;

/// <summary>
/// Manages <see cref="FileSystemWatcher"/> instances for all active projects.
/// Uses an unbounded channel as a buffer between the fast FSW event thread
/// and the slow DB-writer loop (2-second debouncing window).
/// </summary>
public sealed class WatcherService(
    IDbContextFactory<AppDbContext> dbFactory,
    // NOTE(review): the generic argument of IHubContext was lost in extraction
    // (presumably IHubContext<SomeHub>) — confirm against the original source.
    IHubContext hub,
    ILogger<WatcherService> logger) : BackgroundService
{
    // Unbounded channel: FSW events arrive in bursts, processing is slow.
    // SingleReader: only ConsumeChannelAsync ever reads.
    private readonly Channel<FileEvent> _channel =
        Channel.CreateUnbounded<FileEvent>(new UnboundedChannelOptions { SingleReader = true });

    // One watcher array per project id (one FSW per extension) — managed dynamically.
    private readonly Dictionary<Guid, FileSystemWatcher[]> _watchers = [];

    // Guards _watchers; SemaphoreSlim because callers await while holding it.
    private readonly SemaphoreSlim _watcherLock = new(1, 1);
    private readonly ILogger<WatcherService> _logger = logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Load all active projects at startup and attach watchers.
        List<ProjectConfig> projects;
        await using (var db = await dbFactory.CreateDbContextAsync(stoppingToken))
        {
            projects = await db.Projects.Where(p => p.IsActive).ToListAsync(stoppingToken);
            foreach (var project in projects)
                await StartWatchingAsync(project);
        }

        // Initial scan: diff engineering vs. simulation folders per project.
        foreach (var project in projects)
        {
            try
            {
                await ScanExistingFilesAsync(project, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown requested mid-scan — not an error, stop quietly.
                return;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Initialer Scan fehlgeschlagen für '{Name}'", project.Name);
            }
        }

        // Channel consumer runs until the service is stopped.
        await ConsumeChannelAsync(stoppingToken);
    }

    /// <summary>
    /// Starts watchers for a project. Idempotent — any existing watchers for
    /// the project are stopped first. No-op for inactive projects or when the
    /// engineering path does not exist.
    /// </summary>
    public async Task StartWatchingAsync(ProjectConfig project)
    {
        await _watcherLock.WaitAsync();
        try
        {
            StopWatchingInternal(project.Id);
            if (!project.IsActive || !Directory.Exists(project.EngineeringPath))
                return;

            var extensions = project.GetExtensions().ToArray();
            // One watcher per file extension (FSW supports only a single filter pattern).
            var watchers = extensions.Select(ext => CreateWatcher(project, ext)).ToArray();
            _watchers[project.Id] = watchers;
            _logger.LogInformation("Watcher gestartet für Projekt '{Name}' ({Count} Extensions)",
                project.Name, watchers.Length);
        }
        finally
        {
            _watcherLock.Release();
        }
    }

    /// <summary>
    /// Initial scan: compares the engineering and simulation folders.
    /// Detects files that exist in the engineering folder but are missing from
    /// — or differ in — the simulation folder, and records pending changes.
    /// Always persists refreshed <see cref="FileRevision"/> data.
    /// </summary>
    public async Task ScanExistingFilesAsync(ProjectConfig project, CancellationToken ct = default)
    {
        if (!Directory.Exists(project.EngineeringPath))
            return;

        var extensions = project.GetExtensions().ToList();
        // No extensions configured, or a single "*" → scan every file.
        var scanAllFiles = extensions.Count == 0 || (extensions.Count == 1 && extensions[0] == "*");
        // HashSet for O(1) case-insensitive extension lookups during the scan.
        var extensionSet = new HashSet<string>(extensions, StringComparer.OrdinalIgnoreCase);

        var engFiles = Directory
            .EnumerateFiles(project.EngineeringPath, "*", SearchOption.AllDirectories)
            .Where(f => scanAllFiles || extensionSet.Contains(Path.GetExtension(f)))
            .ToList();
        if (engFiles.Count == 0)
            return;

        await using var db = await dbFactory.CreateDbContextAsync(ct);
        var newChanges = 0;
        foreach (var engFile in engFiles)
        {
            var relativePath = Path.GetRelativePath(project.EngineeringPath, engFile);
            var simFile = Path.Combine(project.SimulationPath, relativePath);
            var engHash = await FileHasher.ComputeAsync(engFile, ct);
            var engInfo = new FileInfo(engFile);

            // Create or refresh the FileRevision record for this file.
            var revision = await db.FileRevisions
                .FirstOrDefaultAsync(r => r.ProjectId == project.Id && r.RelativePath == relativePath, ct);
            if (revision is null)
            {
                db.FileRevisions.Add(new FileRevision
                {
                    ProjectId = project.Id,
                    RelativePath = relativePath,
                    FileHash = engHash,
                    Size = engInfo.Length,
                    LastModified = engInfo.LastWriteTimeUtc
                });
            }
            else
            {
                revision.FileHash = engHash;
                revision.Size = engInfo.Length;
                revision.LastModified = engInfo.LastWriteTimeUtc;
            }

            // Does the simulation folder contain an identical copy?
            bool needsSync;
            if (!File.Exists(simFile))
            {
                needsSync = true;
            }
            else
            {
                var simHash = await FileHasher.ComputeAsync(simFile, ct);
                needsSync = engHash != simHash;
            }
            if (!needsSync)
                continue;

            // Only record a change when no open PendingChange exists already.
            var alreadyPending = await db.PendingChanges
                .AnyAsync(c => c.ProjectId == project.Id
                            && c.RelativePath == relativePath
                            && c.Status == ChangeStatus.Pending, ct);
            if (alreadyPending)
                continue;

            db.PendingChanges.Add(new PendingChange
            {
                ProjectId = project.Id,
                RelativePath = relativePath,
                ChangeType = File.Exists(simFile) ? ChangeType.Modified : ChangeType.Created
            });
            newChanges++;
        }

        // Bug fix: FileRevisions are touched for EVERY file above, not only when
        // pending changes were added — save unconditionally so refreshed
        // hashes/sizes are never lost.
        await db.SaveChangesAsync(ct);

        if (newChanges > 0)
        {
            var totalPending = await db.PendingChanges
                .CountAsync(c => c.ProjectId == project.Id && c.Status == ChangeStatus.Pending, ct);
            await hub.Clients.All.SendAsync(HubMethodNames.ReceiveChangeNotification,
                project.Id, project.Name, totalPending, ct);
            _logger.LogInformation("Initialer Scan für '{Name}': {Count} Unterschied(e) erkannt",
                project.Name, newChanges);
        }
        else
        {
            _logger.LogInformation("Initialer Scan für '{Name}': Ordner sind synchron", project.Name);
        }
    }

    /// <summary>Stops and removes all watchers of a project.</summary>
    public async Task StopWatchingAsync(Guid projectId)
    {
        await _watcherLock.WaitAsync();
        try
        {
            StopWatchingInternal(projectId);
        }
        finally
        {
            _watcherLock.Release();
        }
    }

    // Must be called while holding _watcherLock (or from Dispose).
    private void StopWatchingInternal(Guid projectId)
    {
        if (!_watchers.Remove(projectId, out var old))
            return;
        foreach (var w in old)
        {
            w.EnableRaisingEvents = false;
            w.Dispose();
        }
    }

    private FileSystemWatcher CreateWatcher(ProjectConfig project, string extension)
    {
        var watcher = new FileSystemWatcher(project.EngineeringPath)
        {
            Filter = extension == "*" ? "*.*" : $"*{extension}",
            IncludeSubdirectories = true,
            NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.FileName | NotifyFilters.Size,
            // 64KB — reduces the chance of buffer overflow under event bursts.
            InternalBufferSize = 65536
        };

        // Local copies for lambda capture.
        var projectId = project.Id;
        var basePath = project.EngineeringPath;
        watcher.Created += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.CreatedOrChanged);
        watcher.Changed += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.CreatedOrChanged);
        watcher.Deleted += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.Deleted);
        watcher.Renamed += (_, e) => EnqueueRenamedEvent(projectId, basePath, e);
        watcher.Error += (_, args) =>
        {
            // Typically a buffer overflow: events were dropped, so a full
            // re-scan is the only way to recover a consistent state.
            _logger.LogWarning(args.GetException(),
                "FSW Buffer-Overflow für Projekt {Id} – starte Re-Scan", projectId);
            _ = Task.Run(async () =>
            {
                try
                {
                    await ScanExistingFilesAsync(project);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex,
                        "Re-Scan nach Buffer-Overflow fehlgeschlagen für Projekt {Id}", projectId);
                }
            });
        };
        watcher.EnableRaisingEvents = true;
        return watcher;
    }

    private void EnqueueEvent(Guid projectId, string basePath, string fullPath, FileEventType type)
    {
        var rel = Path.GetRelativePath(basePath, fullPath);
        // TryWrite on an unbounded channel only fails after completion (shutdown).
        _channel.Writer.TryWrite(new FileEvent(projectId, fullPath, rel, type));
    }

    private void EnqueueRenamedEvent(Guid projectId, string basePath, RenamedEventArgs e)
    {
        var rel = Path.GetRelativePath(basePath, e.FullPath);
        var oldRel = Path.GetRelativePath(basePath, e.OldFullPath);
        _channel.Writer.TryWrite(new FileEvent(projectId, e.FullPath, rel, FileEventType.Renamed, oldRel));
    }

    /// <summary>
    /// Debouncing consumer: coalesces events by (ProjectId, RelativePath) within
    /// a 2000ms window so a file saved ten times is processed once.
    /// </summary>
    private async Task ConsumeChannelAsync(CancellationToken ct)
    {
        // Key = (ProjectId, RelativePath) → latest event wins.
        var pending = new Dictionary<(Guid, string), FileEvent>();
        try
        {
            while (!ct.IsCancellationRequested)
            {
                // Block until at least one event is available.
                if (!await _channel.Reader.WaitToReadAsync(ct))
                    break;

                // Drain everything immediately available (non-blocking).
                while (_channel.Reader.TryRead(out var evt))
                    pending[(evt.ProjectId, evt.RelativePath)] = evt;

                // Debounce window: events arriving now join this batch below.
                await Task.Delay(2000, ct);

                // Drain events that arrived during the window.
                while (_channel.Reader.TryRead(out var evt))
                    pending[(evt.ProjectId, evt.RelativePath)] = evt;

                if (pending.Count == 0)
                    continue;
                var batch = pending.Values.ToList();
                pending.Clear();

                foreach (var evt in batch)
                {
                    try
                    {
                        await ProcessEventAsync(evt, ct);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Fehler beim Verarbeiten von {Path}", evt.RelativePath);
                    }
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown: WaitToReadAsync/Task.Delay were cancelled.
        }
    }

    private async Task ProcessEventAsync(FileEvent evt, CancellationToken ct)
    {
        await using var db = await dbFactory.CreateDbContextAsync(ct);
        var project = await db.Projects.FindAsync([evt.ProjectId], ct);
        if (project is null)
            return;

        // Read the existing revision (keyed by the event's current path).
        var revision = await db.FileRevisions
            .FirstOrDefaultAsync(r => r.ProjectId == evt.ProjectId && r.RelativePath == evt.RelativePath, ct);

        // Bug fix: for renames the existing revision is stored under the OLD
        // path — without this fallback a rename created a duplicate revision
        // and orphaned the old-path row.
        if (revision is null && evt.EventType == FileEventType.Renamed && evt.OldRelativePath is not null)
        {
            revision = await db.FileRevisions
                .FirstOrDefaultAsync(r => r.ProjectId == evt.ProjectId && r.RelativePath == evt.OldRelativePath, ct);
        }

        if (evt.EventType == FileEventType.Deleted)
        {
            await HandleDeleteAsync(db, project, revision, evt, ct);
            return;
        }

        if (!File.Exists(evt.FullPath))
            return; // Race condition: file already gone.

        // Compute hash with retry logic for locked files.
        string newHash;
        try
        {
            newHash = await ComputeHashWithRetryAsync(evt.FullPath, ct);
        }
        catch (IOException ex)
        {
            _logger.LogWarning(ex,
                "Hash-Berechnung für {Path} fehlgeschlagen nach Retrys – überspringe Event", evt.RelativePath);
            return;
        }

        // Unchanged content is a no-op — EXCEPT for renames, where the path
        // change itself must still be recorded (bug fix).
        if (evt.EventType != FileEventType.Renamed && revision is not null && revision.FileHash == newHash)
            return;

        var info = new FileInfo(evt.FullPath);

        // Create or update the FileRevision.
        if (revision is null)
        {
            db.FileRevisions.Add(new FileRevision
            {
                ProjectId = evt.ProjectId,
                RelativePath = evt.RelativePath,
                FileHash = newHash,
                Size = info.Length,
                LastModified = info.LastWriteTimeUtc
            });
        }
        else
        {
            revision.FileHash = newHash;
            revision.Size = info.Length;
            revision.LastModified = info.LastWriteTimeUtc;
            if (evt.EventType == FileEventType.Renamed)
                revision.RelativePath = evt.RelativePath;
        }

        // Record the pending change.
        var changeType = evt.EventType switch
        {
            FileEventType.Renamed => ChangeType.Renamed,
            _ when revision is null => ChangeType.Created,
            _ => ChangeType.Modified
        };
        db.PendingChanges.Add(new PendingChange
        {
            ProjectId = evt.ProjectId,
            RelativePath = evt.RelativePath,
            ChangeType = changeType,
            OldRelativePath = evt.OldRelativePath
        });
        await db.SaveChangesAsync(ct);

        // Notify all SignalR clients with the fresh pending count.
        var count = await db.PendingChanges
            .CountAsync(c => c.ProjectId == evt.ProjectId && c.Status == ChangeStatus.Pending, ct);
        await hub.Clients.All.SendAsync(HubMethodNames.ReceiveChangeNotification,
            evt.ProjectId, project.Name, count, ct);
        _logger.LogInformation("[{Type}] {Path} in Projekt '{Name}'", changeType, evt.RelativePath, project.Name);
    }

    // Removes the revision (if any), records a Deleted change, and notifies clients.
    private async Task HandleDeleteAsync(AppDbContext db, ProjectConfig project,
        FileRevision? revision, FileEvent evt, CancellationToken ct)
    {
        if (revision is not null)
            db.FileRevisions.Remove(revision);
        db.PendingChanges.Add(new PendingChange
        {
            ProjectId = evt.ProjectId,
            RelativePath = evt.RelativePath,
            ChangeType = ChangeType.Deleted
        });
        await db.SaveChangesAsync(ct);

        var count = await db.PendingChanges
            .CountAsync(c => c.ProjectId == evt.ProjectId && c.Status == ChangeStatus.Pending, ct);
        await hub.Clients.All.SendAsync(HubMethodNames.ReceiveChangeNotification,
            evt.ProjectId, project.Name, count, ct);
    }

    /// <summary>
    /// Computes a file's hash with retry logic: up to 3 attempts with linear
    /// backoff (2s, 4s) for files still locked by a writer.
    /// Throws <see cref="IOException"/> when all attempts fail.
    /// </summary>
    private async Task<string> ComputeHashWithRetryAsync(string fullPath, CancellationToken ct)
    {
        const int maxAttempts = 3;
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            try
            {
                return await FileHasher.ComputeAsync(fullPath, ct);
            }
            catch (IOException) when (attempt < maxAttempts - 1)
            {
                var delaySeconds = 2 * (attempt + 1);
                _logger.LogDebug(
                    "Hash-Berechnung für {Path} fehlgeschlagen (Versuch {Attempt}), warte {Seconds}s...",
                    fullPath, attempt + 1, delaySeconds);
                await Task.Delay(TimeSpan.FromSeconds(delaySeconds), ct);
            }
        }
        // All attempts failed — surface as IOException for the caller to handle.
        throw new IOException($"Hash-Berechnung für {fullPath} fehlgeschlagen nach {maxAttempts} Versuchen");
    }

    public override void Dispose()
    {
        // Complete the writer so ConsumeChannelAsync drains and exits.
        _channel.Writer.TryComplete();
        foreach (var watchers in _watchers.Values)
        {
            foreach (var w in watchers)
            {
                // Disable event delivery before disposing (mirrors StopWatchingInternal).
                w.EnableRaisingEvents = false;
                w.Dispose();
            }
        }
        _watchers.Clear();
        _watcherLock.Dispose();
        base.Dispose();
    }
}