Files

435 lines
16 KiB
C#
Raw Permalink Normal View History

using System.Threading.Channels;
using EngineeringSync.Domain.Constants;
using EngineeringSync.Domain.Entities;
using EngineeringSync.Infrastructure;
using EngineeringSync.Service.Hubs;
using EngineeringSync.Service.Models;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
namespace EngineeringSync.Service.Services;
/// <summary>
/// Manages the FileSystemWatcher instances for all active projects.
/// Uses a channel as a buffer between the fast FSW event callbacks
/// and the slow database-writing consumer (debouncing).
/// </summary>
public sealed class WatcherService(
IDbContextFactory<AppDbContext> dbFactory,
IHubContext<NotificationHub> hub,
ILogger<WatcherService> logger) : BackgroundService
{
// Unbounded channel: FSW events arrive quickly while processing is slow.
// SingleReader = true: ConsumeChannelAsync is the only reader in this class.
private readonly Channel<FileEvent> _channel =
Channel.CreateUnbounded<FileEvent>(new UnboundedChannelOptions { SingleReader = true });
// Watchers are managed dynamically, keyed by project ID (one watcher per configured extension).
private readonly Dictionary<Guid, FileSystemWatcher[]> _watchers = [];
// Serializes the changes to _watchers made by StartWatchingAsync / StopWatchingAsync.
private readonly SemaphoreSlim _watcherLock = new(1, 1);
private readonly ILogger<WatcherService> _logger = logger;
/// <summary>
/// Service entry point: loads all active projects, starts their watchers, runs the
/// initial folder comparison for each of them and then consumes queued file events
/// until the service is stopped.
/// </summary>
/// <param name="stoppingToken">Signalled when the host stops the service.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Load all active projects at startup.
    List<ProjectConfig> projects;
    await using (var db = await dbFactory.CreateDbContextAsync(stoppingToken))
    {
        projects = await db.Projects.Where(p => p.IsActive).ToListAsync(stoppingToken);
        foreach (var project in projects)
        {
            // A watcher that cannot be created for one project must not take down
            // the whole service and with it the watchers of every other project.
            try
            {
                await StartWatchingAsync(project);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Watcher konnte nicht gestartet werden für '{Name}'", project.Name);
            }
        }
    }

    // Initial scan: compare the engineering folder with the simulation folder.
    foreach (var project in projects)
    {
        try
        {
            await ScanExistingFilesAsync(project, stoppingToken);
        }
        // Cancellation caused by shutdown is not a scan failure: let it propagate
        // instead of logging one error per remaining project.
        catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
        {
            _logger.LogError(ex, "Initialer Scan fehlgeschlagen für '{Name}'", project.Name);
        }
    }

    // The channel consumer runs until the service is stopped.
    await ConsumeChannelAsync(stoppingToken);
}
/// <summary>
/// Starts the watchers for a project. Idempotent: watchers already registered for the
/// project are stopped first. Nothing is started when the project is inactive or its
/// engineering folder does not exist.
/// </summary>
/// <param name="project">Project whose engineering folder is to be watched.</param>
public async Task StartWatchingAsync(ProjectConfig project)
{
    await _watcherLock.WaitAsync();
    try
    {
        StopWatchingInternal(project.Id);
        if (!project.IsActive || !Directory.Exists(project.EngineeringPath))
            return;

        var extensions = project.GetExtensions().ToArray();
        // ScanExistingFilesAsync treats "no extensions configured" as "all files";
        // watch all files in that case too instead of silently watching nothing.
        if (extensions.Length == 0)
            extensions = ["*"];

        // One watcher per file extension (FSW is used with a single filter pattern here).
        var created = new List<FileSystemWatcher>(extensions.Length);
        try
        {
            foreach (var ext in extensions)
                created.Add(CreateWatcher(project, ext));
        }
        catch
        {
            // Do not leak watchers that are already raising events but would never be
            // registered in _watchers (and therefore never be stopped).
            foreach (var w in created)
                w.Dispose();
            throw;
        }

        var watchers = created.ToArray();
        _watchers[project.Id] = watchers;
        _logger.LogInformation("Watcher gestartet für Projekt '{Name}' ({Count} Extensions)",
            project.Name, watchers.Length);
    }
    finally
    {
        _watcherLock.Release();
    }
}
/// <summary>
/// Initial scan: compares the engineering folder with the simulation folder.
/// Records a <c>FileRevision</c> for every matching engineering file and creates a
/// <c>PendingChange</c> for each file that is missing from the simulation folder or
/// whose hash differs, unless an open pending change for that path already exists.
/// </summary>
/// <param name="project">Project whose folders are compared.</param>
/// <param name="ct">Token used to cancel hashing and database work.</param>
public async Task ScanExistingFilesAsync(ProjectConfig project, CancellationToken ct = default)
{
    if (!Directory.Exists(project.EngineeringPath)) return;

    var extensions = project.GetExtensions().ToList();
    // No extensions configured, or only "*" → scan every file.
    var scanAllFiles = extensions.Count == 0 || (extensions.Count == 1 && extensions[0] == "*");

    var engFiles = Directory
        .EnumerateFiles(project.EngineeringPath, "*", SearchOption.AllDirectories)
        .Where(f => scanAllFiles
            || extensions.Contains(Path.GetExtension(f), StringComparer.OrdinalIgnoreCase))
        .ToList();
    if (engFiles.Count == 0) return;

    await using var db = await dbFactory.CreateDbContextAsync(ct);
    var newChanges = 0;

    foreach (var engFile in engFiles)
    {
        var relativePath = Path.GetRelativePath(project.EngineeringPath, engFile);
        var simFile = Path.Combine(project.SimulationPath, relativePath);

        string engHash;
        long engSize;
        DateTime engModified;
        bool simExists;
        bool needsSync;
        try
        {
            engHash = await FileHasher.ComputeAsync(engFile, ct);
            var engInfo = new FileInfo(engFile);
            engSize = engInfo.Length;
            engModified = engInfo.LastWriteTimeUtc;

            // Evaluate existence once so that needsSync and ChangeType agree even if
            // the simulation file appears or disappears while scanning.
            simExists = File.Exists(simFile);
            needsSync = !simExists || engHash != await FileHasher.ComputeAsync(simFile, ct);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // A single locked or vanished file must not abort the scan of all other files.
            _logger.LogWarning(ex, "Initialer Scan: {Path} nicht lesbar, Datei wird übersprungen", relativePath);
            continue;
        }

        // Create or update the FileRevision.
        var revision = await db.FileRevisions
            .FirstOrDefaultAsync(r => r.ProjectId == project.Id && r.RelativePath == relativePath, ct);
        if (revision is null)
        {
            db.FileRevisions.Add(new FileRevision
            {
                ProjectId = project.Id,
                RelativePath = relativePath,
                FileHash = engHash,
                Size = engSize,
                LastModified = engModified
            });
        }
        else
        {
            revision.FileHash = engHash;
            revision.Size = engSize;
            revision.LastModified = engModified;
        }

        if (!needsSync) continue;

        // Only create a pending change if no open one exists for this path.
        var alreadyPending = await db.PendingChanges
            .AnyAsync(c => c.ProjectId == project.Id
                && c.RelativePath == relativePath
                && c.Status == ChangeStatus.Pending, ct);
        if (alreadyPending) continue;

        db.PendingChanges.Add(new PendingChange
        {
            ProjectId = project.Id,
            RelativePath = relativePath,
            ChangeType = simExists ? ChangeType.Modified : ChangeType.Created
        });
        newChanges++;
    }

    // Always persist: revisions are added/updated even when no new pending change
    // was detected, and would otherwise be dropped with the context.
    await db.SaveChangesAsync(ct);

    if (newChanges > 0)
    {
        var totalPending = await db.PendingChanges
            .CountAsync(c => c.ProjectId == project.Id && c.Status == ChangeStatus.Pending, ct);
        await hub.Clients.All.SendAsync(HubMethodNames.ReceiveChangeNotification,
            project.Id, project.Name, totalPending, ct);
        _logger.LogInformation("Initialer Scan für '{Name}': {Count} Unterschied(e) erkannt",
            project.Name, newChanges);
    }
    else
    {
        _logger.LogInformation("Initialer Scan für '{Name}': Ordner sind synchron", project.Name);
    }
}
/// <summary>Stops and removes all watchers registered for a project.</summary>
/// <param name="projectId">ID of the project whose watchers are shut down.</param>
public async Task StopWatchingAsync(Guid projectId)
{
    await _watcherLock.WaitAsync();
    try
    {
        StopWatchingInternal(projectId);
    }
    finally
    {
        // Release even if stopping a watcher throws.
        _watcherLock.Release();
    }
}
/// <summary>
/// Removes the project's watchers from the registry and disposes them.
/// Does nothing when no watchers are registered for the project.
/// Both callers in this class hold <c>_watcherLock</c> while calling it.
/// </summary>
private void StopWatchingInternal(Guid projectId)
{
    if (_watchers.Remove(projectId, out var removed))
    {
        for (var i = 0; i < removed.Length; i++)
        {
            var watcher = removed[i];
            watcher.EnableRaisingEvents = false;
            watcher.Dispose();
        }
    }
}
/// <summary>
/// Creates and enables a recursive watcher on the project's engineering folder for one
/// extension. File events are forwarded to the channel; a watcher error triggers a
/// background re-scan of the project.
/// </summary>
/// <param name="project">Project the watcher belongs to.</param>
/// <param name="extension">Extension to watch; <c>"*"</c> watches all files, any other value is appended to <c>*</c> to form the filter.</param>
/// <returns>The enabled watcher; the caller is responsible for disposing it.</returns>
private FileSystemWatcher CreateWatcher(ProjectConfig project, string extension)
{
    var watcher = new FileSystemWatcher(project.EngineeringPath);
    try
    {
        watcher.Filter = extension == "*" ? "*.*" : $"*{extension}";
        watcher.IncludeSubdirectories = true;
        watcher.NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.FileName | NotifyFilters.Size;
        // 64 KB buffer to reduce the risk of a buffer overflow on many simultaneous events.
        watcher.InternalBufferSize = 65536;

        // Local copies for the lambda captures.
        var projectId = project.Id;
        var basePath = project.EngineeringPath;

        watcher.Created += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.CreatedOrChanged);
        watcher.Changed += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.CreatedOrChanged);
        watcher.Deleted += (_, e) => EnqueueEvent(projectId, basePath, e.FullPath, FileEventType.Deleted);
        watcher.Renamed += (_, e) => EnqueueRenamedEvent(projectId, basePath, e);
        watcher.Error += (_, args) =>
        {
            _logger.LogWarning(args.GetException(), "FSW Buffer-Overflow für Projekt {Id} starte Re-Scan", projectId);
            // Fire-and-forget: events may have been lost, so rebuild the state from disk.
            _ = Task.Run(async () =>
            {
                try
                {
                    await ScanExistingFilesAsync(project);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Re-Scan nach Buffer-Overflow fehlgeschlagen für Projekt {Id}", projectId);
                }
            });
        };

        watcher.EnableRaisingEvents = true;
        return watcher;
    }
    catch
    {
        // Configuring or enabling the watcher failed: release it, since the caller
        // never receives a reference to dispose.
        watcher.Dispose();
        throw;
    }
}
/// <summary>
/// Queues a create/change/delete event for the consumer, with the path made relative
/// to the project's base folder.
/// </summary>
private void EnqueueEvent(Guid projectId, string basePath, string fullPath, FileEventType type)
{
    var relativePath = Path.GetRelativePath(basePath, fullPath);
    var fileEvent = new FileEvent(projectId, fullPath, relativePath, type);
    // The result of TryWrite is intentionally not inspected here.
    _channel.Writer.TryWrite(fileEvent);
}
/// <summary>
/// Queues a rename event carrying both the new and the old path, each relative to the
/// project's base folder.
/// </summary>
private void EnqueueRenamedEvent(Guid projectId, string basePath, RenamedEventArgs e)
{
    var newRelativePath = Path.GetRelativePath(basePath, e.FullPath);
    var oldRelativePath = Path.GetRelativePath(basePath, e.OldFullPath);
    var renameEvent = new FileEvent(projectId, e.FullPath, newRelativePath, FileEventType.Renamed, oldRelativePath);
    _channel.Writer.TryWrite(renameEvent);
}
/// <summary>
/// Debouncing consumer: groups events by (ProjectId, RelativePath) within a 2000 ms
/// window so that a file touched many times is processed only once per batch.
/// The newest event for a path wins, except that a plain change never replaces a
/// queued rename of the same path (the old path would otherwise be lost).
/// </summary>
/// <param name="ct">Cancels waiting, the debounce delay and event processing.</param>
private async Task ConsumeChannelAsync(CancellationToken ct)
{
    var debounceWindow = TimeSpan.FromMilliseconds(2000);
    // Key = (ProjectId, RelativePath) → event to process for that path.
    var pending = new Dictionary<(Guid, string), FileEvent>();

    // Drains everything currently readable from the channel without blocking.
    void DrainChannel()
    {
        while (_channel.Reader.TryRead(out var incoming))
        {
            var key = (incoming.ProjectId, incoming.RelativePath);
            if (incoming.EventType == FileEventType.CreatedOrChanged
                && pending.TryGetValue(key, out var queued)
                && queued.EventType == FileEventType.Renamed)
            {
                // Keep the rename: it carries OldRelativePath, and processing hashes
                // the current file content anyway.
                continue;
            }
            pending[key] = incoming;
        }
    }

    while (!ct.IsCancellationRequested)
    {
        // Wait (asynchronously) for the first event; false means the writer was completed.
        if (!await _channel.Reader.WaitToReadAsync(ct)) break;

        DrainChannel();
        // Give follow-up events for the same files time to arrive, then collect them too.
        await Task.Delay(debounceWindow, ct);
        DrainChannel();

        if (pending.Count == 0) continue;
        var batch = pending.Values.ToList();
        pending.Clear();

        foreach (var evt in batch)
        {
            try
            {
                await ProcessEventAsync(evt, ct);
            }
            // Shutdown cancellation propagates like it does from WaitToReadAsync/Delay
            // instead of being logged as an error for every remaining event.
            catch (Exception ex) when (ex is not OperationCanceledException || !ct.IsCancellationRequested)
            {
                _logger.LogError(ex, "Fehler beim Verarbeiten von {Path}", evt.RelativePath);
            }
        }
    }
}
/// <summary>
/// Processes one debounced file event: updates the file's <c>FileRevision</c>, records a
/// <c>PendingChange</c> and notifies all SignalR clients with the project's open change count.
/// Deletions are delegated to <c>HandleDeleteAsync</c>. Events whose file no longer exists,
/// cannot be hashed, or whose content is unchanged (and not renamed from a tracked path)
/// are skipped.
/// </summary>
/// <param name="evt">The event to process.</param>
/// <param name="ct">Token used to cancel hashing and database work.</param>
private async Task ProcessEventAsync(FileEvent evt, CancellationToken ct)
{
    await using var db = await dbFactory.CreateDbContextAsync(ct);
    var project = await db.Projects.FindAsync([evt.ProjectId], ct);
    if (project is null) return;

    // Revision tracked under the event's (new) path, if any.
    var revision = await db.FileRevisions
        .FirstOrDefaultAsync(r => r.ProjectId == evt.ProjectId && r.RelativePath == evt.RelativePath, ct);

    if (evt.EventType == FileEventType.Deleted)
    {
        await HandleDeleteAsync(db, project, revision, evt, ct);
        return;
    }

    if (!File.Exists(evt.FullPath)) return; // Race condition: file is already gone.

    // Compute the hash, retrying for files that are still locked by the writer.
    string newHash;
    try
    {
        newHash = await ComputeHashWithRetryAsync(evt.FullPath, ct);
    }
    catch (IOException ex)
    {
        _logger.LogWarning(ex, "Hash-Berechnung für {Path} fehlgeschlagen nach Retrys überspringe Event", evt.RelativePath);
        return;
    }

    // For a rename the tracked revision lives under the OLD path. Looking it up only by
    // the new path would leave it orphaned and make the path update below a no-op.
    FileRevision? previous = null;
    if (evt.EventType == FileEventType.Renamed
        && evt.OldRelativePath is { } oldPath
        && oldPath != evt.RelativePath)
    {
        previous = await db.FileRevisions
            .FirstOrDefaultAsync(r => r.ProjectId == evt.ProjectId && r.RelativePath == oldPath, ct);
    }

    // Unchanged content means "nothing to do" only when no tracked rename must be recorded.
    if (previous is null && revision is not null && revision.FileHash == newHash) return;

    var info = new FileInfo(evt.FullPath);

    // Create or update the FileRevision; a renamed revision is moved to the new path.
    var target = revision ?? previous;
    if (target is null)
    {
        db.FileRevisions.Add(new FileRevision
        {
            ProjectId = evt.ProjectId,
            RelativePath = evt.RelativePath,
            FileHash = newHash,
            Size = info.Length,
            LastModified = info.LastWriteTimeUtc
        });
    }
    else
    {
        // The rename replaced a file that was itself tracked: keep that revision and
        // drop the one of the old path so no two revisions share a path.
        if (revision is not null && previous is not null)
            db.FileRevisions.Remove(previous);

        target.RelativePath = evt.RelativePath;
        target.FileHash = newHash;
        target.Size = info.Length;
        target.LastModified = info.LastWriteTimeUtc;
    }

    // Record the pending change.
    var changeType = evt.EventType switch
    {
        FileEventType.Renamed => ChangeType.Renamed,
        _ when revision is null => ChangeType.Created,
        _ => ChangeType.Modified
    };
    db.PendingChanges.Add(new PendingChange
    {
        ProjectId = evt.ProjectId,
        RelativePath = evt.RelativePath,
        ChangeType = changeType,
        OldRelativePath = evt.OldRelativePath
    });
    await db.SaveChangesAsync(ct);

    // Notify all SignalR clients.
    var count = await db.PendingChanges
        .CountAsync(c => c.ProjectId == evt.ProjectId && c.Status == ChangeStatus.Pending, ct);
    await hub.Clients.All.SendAsync(HubMethodNames.ReceiveChangeNotification,
        evt.ProjectId, project.Name, count, ct);
    _logger.LogInformation("[{Type}] {Path} in Projekt '{Name}'",
        changeType, evt.RelativePath, project.Name);
}
/// <summary>
/// Handles a delete event: removes the tracked revision (if one exists), records a
/// <c>Deleted</c> pending change and notifies all SignalR clients with the project's
/// open change count.
/// </summary>
private async Task HandleDeleteAsync(AppDbContext db, ProjectConfig project,
    FileRevision? revision, FileEvent evt, CancellationToken ct)
{
    if (revision is not null)
    {
        db.FileRevisions.Remove(revision);
    }

    var deletion = new PendingChange
    {
        ProjectId = evt.ProjectId,
        RelativePath = evt.RelativePath,
        ChangeType = ChangeType.Deleted
    };
    db.PendingChanges.Add(deletion);
    await db.SaveChangesAsync(ct);

    var openChanges = await db.PendingChanges
        .CountAsync(c => c.ProjectId == evt.ProjectId && c.Status == ChangeStatus.Pending, ct);

    await hub.Clients.All.SendAsync(
        HubMethodNames.ReceiveChangeNotification,
        evt.ProjectId, project.Name, openChanges, ct);
}
/// <summary>
/// Computes the hash of a file, retrying when an <see cref="IOException"/> occurs.
/// Makes up to 3 attempts, waiting 2 s after the first failure and 4 s after the second.
/// An <see cref="IOException"/> on the last attempt propagates to the caller.
/// </summary>
/// <param name="fullPath">Full path of the file to hash.</param>
/// <param name="ct">Cancels hashing and the waits between attempts.</param>
private async Task<string> ComputeHashWithRetryAsync(string fullPath, CancellationToken ct)
{
    const int maxAttempts = 3;

    for (var attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber++)
    {
        try
        {
            return await FileHasher.ComputeAsync(fullPath, ct);
        }
        // The filter is false on the last attempt, so that failure is not swallowed.
        catch (IOException) when (attemptNumber < maxAttempts)
        {
            var waitSeconds = 2 * attemptNumber;
            _logger.LogDebug("Hash-Berechnung für {Path} fehlgeschlagen (Versuch {Attempt}), warte {Seconds}s...",
                fullPath, attemptNumber, waitSeconds);
            await Task.Delay(TimeSpan.FromSeconds(waitSeconds), ct);
        }
    }

    // Fallback required by the compiler; every loop iteration either returns or throws.
    throw new IOException($"Hash-Berechnung für {fullPath} fehlgeschlagen nach {maxAttempts} Versuchen");
}
/// <summary>
/// Completes the channel, disposes every registered watcher and the watcher lock,
/// then disposes the base service.
/// </summary>
public override void Dispose()
{
    // Completing the writer makes WaitToReadAsync return false, which ends ConsumeChannelAsync.
    _channel.Writer.TryComplete();

    foreach (var watcher in _watchers.Values.SelectMany(group => group))
    {
        watcher.Dispose();
    }

    _watcherLock.Dispose();
    base.Dispose();
}
}