You are currently viewing Widzę ciemność! Czyli aplikacja w kontenerze nasłuchuje na nowe pliki

Widzę ciemność! Czyli aplikacja w kontenerze nasłuchuje na nowe pliki

Czasem aplikacja ma reagować na zmiany zachodzące w systemie plików aby, np. przetwarzać nowe pliki, które właśnie ktoś do niego dodał. Nadaje się do tego świetnie klasa FileSystemWatcher (FSW). Działa w Windows i pod przy odpowiedniej konfiguracji także pod Linuxem. Ale słodycz się kończy po skonteneryzowaniu aplikacji. Trafił mi się właśnie taki przypadek. Kontener nie dostaje notyfikacji o zmianach w systemie plików woluminu zamontowanego do kontenera. Zupełna cisza, albo cytując klasyka „Ciemność, widzę ciemność!”

Przyczyną tego stanu rzeczy są różne mechanizmy generowania i transmisji zdarzeń o zmianie w systemie plików przez różne OS, systemy plików, protokoły sieciowe. FileSystemWatcher instancjonuje obiekt inotify dla każdego śledzonego folderu, który nasłuchuje na zdarzenia generowane przez system plików. Jeśli jednak nastąpi błąd sieci, to zdarzenie nie dotrze. Jeśli przepełni się bufor zdarzeń, to zdarzenie nie dotrze, Jeśli system plików nie potrafi w sposób powtarzalny wysłać powiadomień, to one nie będą docierały. Tych jeżeli… jest sporo i zawsze kończą się tym, że zdarzenie nie dotrze. FSW sprawdza się w zasadzie jedynie dla dysków lokalnych i to najlepiej pod Windows.

A tymczasem w folderze jest coraz więcej plików, a aplikacja nie reaguje. Co robić? Pooling udziału sieciowego? Nic innego nie pozostaje.

Można do tego zastosować PhysicalFileProvider. W odróżnieniu od FSW, nie pozwala on na wybór rodzaju zdarzeń, które notyfikuje, ale powinien wystarczyć przynajmniej jako źródło informacji, że „coś się zmieniło”. Po prostu zostanie ustawiony cykliczny pooling folderu. Nie ma innego wyjścia w Dockerze. Sami będziemy musieli dowiedzieć się, jak zmiana zaszła. Jedynie co wiadomo, to że zaszła. Ale dobre i to.

Za to wykonamy porządną implementację obsługi kolejki zdarzeń. Przecież może się zdarzyć, że zdarzenia o nowych plikach będą spływały szybciej niż trwa ich obsługa. Wykorzystamy generyczną BlockingCollection do jednoczesnego dodawania i konsumowania zdarzeń. To kolejka FIFO, która jest bezpieczna dla wielu wątków (thread-safe). Jeden wątek może dodawać zdarzenia informujące o nowych plikach, a inny obsługiwać te zdarzenia. Nasz DirectoryWatcher ma następujący interfejs:

public interface IDirectoryWatcher
{
    void RegisterCallback(Action<string> callback);
    Task StartWatching(CancellationToken cancellationToken);
    void Dispose();
}

Po zainstancjonowaniu DirectoryWatchera rejestruje się callback. Jest on wołany w momencie obsługi zdarzenia pobieranego z kolejki. W przykładzie callbackiem jest medoda synchroniczna. Jest ona wołana w osobnym wątku, tym samym który obsługuje kolejkę. Obsługa callbacka nie nie blokuje działania pozostałego kodu. W razie potrzeby można łatwo zmienić kod, aby wywołanie było asynchroniczne.

Potem wystarczy uruchomić DirectoryWatcher wywołując StartWatching. Metoda wygląda tak:

public async Task StartWatching(CancellationToken ct = default)
{
    _fileSystemEventBuffer = new BlockingCollection<IFileInfo>(EventBufferSize);

    CreateFileWatcher();
    _fileWatcherCallback = WatchForFileChanges();

    var processBuffer = Task.Run(() =>
    {
        foreach (var fileInfo in _fileSystemEventBuffer.GetConsumingEnumerable(ct))
        {
            _callback?.Invoke(fileInfo.PhysicalPath!);
            LogWhenBufferIsEmpty();
        }
    }, ct);

    // Implement resilience to transient IO errors
    _files = GetFiles();
    BufferFiles();

    await processBuffer.ConfigureAwait(false);
}

Deklaracje zmiennych i stałych na razie pominę. Zostaną pokazane w dalszej części. Podobnie metody CreateFileWatcher() i WatchForFileChanges(). Teraz opiszę działanie typu BlockingCollection. Tworzymy bufor new BlockingCollection(EventBufferSize). W linii 8 uruchamiany jest wątek, który pobiera z kolejki informacje o nowych plikach i wywołuje callback. Kluczowe tu jest, że pętla się „nie kończy”. Po skonsumowaniu wszyskich zdarzeń zostaje „uśpiona” i wznawia działanie kiedy do kolekcji wpadną nowe elementy. A mogą one być dodawane w każdym momencie w innym wątku. Możemy zatem spokojnie w swoim tempie obsługiwać zdarzenia nie martwiąc się o zablokowanie możliwości ich dodawania. W linii 18 pobieramy informacje o plikach, bo na starcie programu zakładamy, że wszystkie pliki są nowe. Definicja metody GetFiles():

private List<IFileInfo> GetFiles() => _fileWatcher!.GetDirectoryContents(string.Empty).ToList();

Następnie wszyskie dane wrzucamy do bufora (linia 19). Kiedy tylko zostanie dodany pierwszy rekord, wznawia pracę pętla foreach z linii 10 i wywoływany jest callback.

private void BufferFiles()
{
    foreach (var fileInfo in _files!)
    {
        if (!_fileSystemEventBuffer!.TryAdd(fileInfo))
            Console.WriteLine($"Buffer size exceeded ({EventBufferSize}) or buffer is disposed");
    }
}

I do by było na tyle, jeśli idzie o mechanizm bufora. Wielowątkowo dodajemy do niego i konsumujemy obiekty. Dla reszty kodu działanie „głównej pętli” odbywa się „w tle” nie wpływając na pracę innych wątków. Nie jest to wątek tła, ale dzięki asynchroniczności i bezpiecznemu dla wątków api BlockingCollection możemy tak roboczo przyjąć.

Do omówienia został mechanizm notyfikacji o nowych plikach w folderze. Na początek metoda CreateFileWatcher() wołana w linii 5 metody StartWatching(). Instancjonujemy PhysicalFileProvider, który nie będzie brał pod uwagę plików ukrytych (także z kropką) ani systemowych. Będzie za to cyklicznie sięgał do folderu na okoliczność wykrycia zmian w systemie plików. Nic specjalnego.

private void CreateFileWatcher()
{
    _fileWatcher = new PhysicalFileProvider(_directoryToWatch, ExclusionFilters.Sensitive)
    {
        UsePollingFileWatcher = true,
        UseActivePolling = true
    };
}

Za to metoda WatchForFileChanges() a właściwie to co ona zapoczątkowuje jest bardziej interesujące.

private static ulong _changeLevel;
private IChangeToken? _changeToken;
private IDisposable? _fileWatcherCallback;

private IDisposable WatchForFileChanges()
{
    _changeToken = _fileWatcher!.Watch("**/*.*");
    return _changeToken.RegisterChangeCallback(_ => NotifyFileChange(), default);
}

private void NotifyFileChange()
{
    Console.WriteLine("Directory has changed. Callback invoked");

    _fileWatcherCallback = WatchForFileChanges();

    if (0 == Interlocked.CompareExchange(ref _changeLevel, 1, 0))
        BufferNewFiles();
    else
        Interlocked.Exchange(ref _changeLevel, 2);
}

W linii 7 wskazujemy jakie pliki nas interesują używając filtra. Metoda Watch() zwraca IChangeToken, który będzie notyfikowany o zmianach śledzonych plików. Następnie temu tokenowi wskazujemy callback jaki ma wywołać w wyniku notyfikacji (NotifyFileChange). W moich próbach po jednorazowej notyfikacji IChangeToken przestawał być użyteczny. Albo przestawał być notyfikowany o zmianach, albo rejestacja callbacka wygasała. W każdym razie w linii 15 ponownie wołam metodę WatchForFileChanges(), aby notyfikacje znów działały. A chcemy, żeby działały nieustannie nasłuchując na nowe pliki.

Teraz robi się ciekawiej. Trzeba jakoś ogarnąć różne stany w jakich może się znaleźć Watcher. Wyróżniłem trzy stany:

  • 0 – nie trwa proces buforowania (dodawania zmian do kolekcji), brak notyfikacji o zmianach
  • 1 – trwa proces buforowania, brak notyfikacji o zmianach,
  • 2 – trwa proces buforowania i nadeszła notyfikacja o zmianach

Do przechowywania informacji, w jakim stanie jest Watcher, wykorzystałem zmienną statyczną, do której dostęp uzyskuję przez klasę Interlocked. Ta klasa zapewnia dostęp do zmiennej w sposób bezpieczny z różnych wątków (thread-safe).

W metodzie NotifyFileChange() jeśli mamy stan 0, to buforujemy nowe pliki i następuje zmiana stanu na 1. W przeciwnych przypadku ustawiamy stan na 2. Co się dzieje dalej, pokazują kolejne listingi.

private void BufferNewFiles()
{
    do
    {
        Interlocked.CompareExchange(ref _changeLevel, 1, 2);
        
        var filesActual = GetFiles();
        var newFiles = filesActual.ExceptBy(_files!.Select(f => f.Name), fi => fi.Name);
        _files = filesActual;

        foreach (var fileInfo in newFiles)
        {
            if (!_fileSystemEventBuffer?.TryAdd(fileInfo) ?? false)
                Console.WriteLine($"Buffer size exceeded ({EventBufferSize}) or buffer is disposed");
        }

    } while (1 < Interlocked.Read(ref _changeLevel));

    Interlocked.Exchange(ref _changeLevel, 0);
}

Metoda BufferNewFiles() na początek zmienia stan na 1 „trwa proces buforowania”. Pobiera aktualną listę plików w folderze, porównuje z zachowaną w pamięci i nowe pliki dodaje do bufora. Zastosowałem proste porównanie nazw plików, co nie zawsze musi być jednoznaczne. Gdybyśmy chcieli wykrywać zmianę plików (a nie dodanie nowych), to można by liczyć hash zawartości i przechowywać go jako wartość w słowniku, a znormalizowaną nazwę (albo jej hash) jako klucz. Ale w tym wpisie skupiam się na funkcji bezpiecznego i skutecznego notyfikowania o zmianach w folderze na poziomie systemu plików.

Po dodaniu plików do bufora, sprawdzana jest aktualna wartość _changeLevel, Jeśli jest 2, czyli w trakcie aktualizacji bufora pojawiła się notyfikacja o zmianach, powtarzamy proces buforowania. Jeśli nie, to kończy się proces buforowania a stan przyjmuje wartość 0. Watcher oczekuje na notyfikację.

W praktyce zdarzają się sytuacje, że notyfikacja nie nadchodzi. Pisałem o tym na początku. Można przyjąć dwie strategie wobec takich przypaków. Albo czeka się na następną notyfikację, po której i tak badana jest zawartość folderu, więc nowy plik nie umknie. Ale jeżeli założenie biznesowe jest takie, że nowe pliki mogą się pojawiać w takich dużych odstępach, że nie można sobie pozwolić na oczekiwanie z przetworzeniem pominiętego pliku, to trzeba znaleźć inne rozwiązanie. Ja przyjąłem drugi scenariusz. Timer co jakiś czas sprawdza folder nawet jak notyfikacja nie nadejdzie. Natomiast robi to w dużych interwałach i jego praca jest wstrzymywana, jeśli pliki są buforowane (_changeLevel != 0). Timer jest widoczny poniżej na pełnym listu Watchera.

using System.Collections.Concurrent;
using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.FileProviders.Physical;
using Microsoft.Extensions.Primitives;

namespace DirectoryWatcher.DirectoryWatcher;

public class DirectoryWatcherWithPolling : IDirectoryWatcher, IDisposable
{
    private const int EventBufferSize = 100000;
    private const int DirectoryPollingInterval = 300;
    
    private readonly string _directoryToWatch;
    private static ulong _changeLevel;
    private Action<string>? _callback;
    private IEnumerable<IFileInfo>? _files;
    private BlockingCollection<IFileInfo>? _fileSystemEventBuffer;
    private PhysicalFileProvider? _fileWatcher;
    private IChangeToken? _changeToken;
    private IDisposable? _fileWatcherCallback;
    private PeriodicTimer? _timer;

    public DirectoryWatcherWithPolling(string directoryToWatch)
    {
        if (string.IsNullOrEmpty(directoryToWatch) || !Directory.Exists(directoryToWatch))
            throw new ArgumentException("Directory can not be empty string");
        
        _directoryToWatch = directoryToWatch;
    }

    public void RegisterCallback(Action<string>? callback) => _callback = callback;
    
    public async Task StartWatching(Action<string>? callback, CancellationToken ct = default)
    {
        RegisterCallback(callback);
        await StartWatching(ct);
    }

    public async Task StartWatching(CancellationToken ct = default)
    {
        try
        {
            _timer = new PeriodicTimer(TimeSpan.FromSeconds(DirectoryPollingInterval));
            _fileSystemEventBuffer = new BlockingCollection<IFileInfo>(EventBufferSize);

            CreateFileWatcher();
            _fileWatcherCallback = WatchForFileChanges();

            var processBuffer = Task.Run(() =>
            {
                foreach (var fileInfo in _fileSystemEventBuffer.GetConsumingEnumerable(ct))
                {
                    _callback?.Invoke(fileInfo.PhysicalPath!);
                    LogWhenBufferIsEmpty();
                }
            }, ct);

            // TODO Implement resilience policy
            _files = GetFiles();
            BufferFiles();

            while (await _timer.WaitForNextTickAsync(ct))
            {
                if (Interlocked.Read(ref _changeLevel) != 0) continue;

                _fileWatcher?.Dispose();
                Console.WriteLine("Directory polling upon timer");

                BufferNewFiles();
                CreateFileWatcher();
                _fileWatcherCallback = WatchForFileChanges();
            }

            await processBuffer.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // ignore
        }
        finally
        {
            Dispose();
        }
    }

    private void LogWhenBufferIsEmpty()
    {
        if (_fileSystemEventBuffer?.Count == 0)
            Console.WriteLine("Synchronisation buffer is empty");
    }

    private List<IFileInfo> GetFiles() =>
        _fileWatcher!.GetDirectoryContents(string.Empty).ToList();

    private void CreateFileWatcher()
    {
        // TODO Implement resilience policy
        _fileWatcher = new PhysicalFileProvider(_directoryToWatch, ExclusionFilters.Sensitive)
        {
            UsePollingFileWatcher = true,
            UseActivePolling = true
        };
    }

    private IDisposable WatchForFileChanges()
    {
        _changeToken = _fileWatcher!.Watch("**/*.*");
        return _changeToken.RegisterChangeCallback(_ => NotifyFileChange(), default);
    }

    private void NotifyFileChange()
    {
        Console.WriteLine("Directory has changed. Callback invoked");

        _fileWatcherCallback = WatchForFileChanges();

        if (0 == Interlocked.CompareExchange(ref _changeLevel, 1, 0))
            BufferNewFiles();
        else
            Interlocked.Exchange(ref _changeLevel, 2);
    }
    
    private void BufferFiles()
    {
        foreach (var fileInfo in _files!)
        {
            if (!_fileSystemEventBuffer!.TryAdd(fileInfo))
                Console.WriteLine($"Buffer size exceeded ({EventBufferSize}) or buffer is disposed");
        }
    }
    
    private void BufferNewFiles()
    {
        do
        {
            Interlocked.CompareExchange(ref _changeLevel, 1, 2);
            
            // TODO Implement resilience policy
            var filesActual = GetFiles();
            var newFiles = filesActual.ExceptBy(_files!.Select(f => f.Name), fi => fi.Name);
            _files = filesActual;

            foreach (var fileInfo in newFiles)
            {
                if (!_fileSystemEventBuffer?.TryAdd(fileInfo) ?? false)
                    Console.WriteLine($"Buffer size exceeded ({EventBufferSize}) or buffer is disposed");
            }

        } while (1 < Interlocked.Read(ref _changeLevel));

        Interlocked.Exchange(ref _changeLevel, 0);
    }

    private bool _isDisposed;
    public void Dispose() => Dispose(true);
    private void Dispose(bool disposing)
    {
        if (_isDisposed) return;

        if (disposing)
        {
            _fileWatcherCallback?.Dispose();
            _fileSystemEventBuffer?.Dispose();
            _timer?.Dispose();
        }
        _isDisposed = true;
    }
}

Uwagi na koniec. Skupiłem się na pokazaniu, jak sprawnie nasłuchiwać na zmiany w systemie plików przez aplikację działającą w kontenerze Dockera, gdzie nie zadziała FileSystemWatcher. Zaprezentowany mechanizm działa bardzo dobrze. W produkcji klasa DirectoryWatcherWithPolling jest instancjonowana z kontenera DI. Wstrzykiwany jest logger (zamiast wyjścia na konsolę), polityki odporności na błędy IO (każde odwołanie do dysku jest „uodpornione” na chwilowe błędy IO) i ustawienia.

Na Githubie można zapoznać się z przykładową aplikacją, wykorzystującą DirectoryWatcher.

Dodaj komentarz