using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.Reliable; using System.Net; using System.Net.Security; using System.Text; namespace SuperstreamPrototype { /// Příklad RabbitMQ Superstream SAC Consumera /// Demonstruje jak konzumovat zprávy z RabbitMQ superstreamu s SSL autentizací a funkcí Single Active Consumer internal class SuperStreamConsumer { static async Task Main(string[] args) { // Konfigurace koncového bodu RabbitMQ serveru var addressResolver = new AddressResolver(new DnsEndPoint("esb-test.nipez.cz", 5551)); // Vytvoření konfigurace stream systému var config = new StreamSystemConfig { AddressResolver = addressResolver, Endpoints = new List() { addressResolver.EndPoint }, VirtualHost = "IDM", Ssl = new SslOption { Enabled = true, AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors | SslPolicyErrors.RemoteCertificateNameMismatch, CertPath = @"Path\To\Your\Certificate\file.pfx", CertPassphrase = "CertificatePassphrase" }, AuthMechanism = AuthMechanism.External }; // Inicializace stream systému var streamSystem = await StreamSystem.Create(config); var superstreamExchange = "IDM_Events_Superstream"; // SAC skupina - jméno systému, který čte zprávy z RabbitMQ superstreamu var sacGroupName = "KodExtSys"; Dictionary storeOffsetTimes = new(); // Vytvoření SAC consumera s retry logikou pro error StreamNotAvailable var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, superstreamExchange) { // Povolení SuperStream konzumace - automaticky napojí 1 consumera na každou partici IsSuperStream = true, // Povolení Single Active Consumer - pouze jeden consumer na partici je aktivní IsSingleActiveConsumer = true, // SAC skupina - consumeři se stejným Reference soutěží o partice Reference = sacGroupName, // Handler zpráv - přijímá zprávy ze VŠECH partic MessageHandler = async (stream, consumerSource, context, message) => { // Konverze obsahu zprávy na string var messageBody = Encoding.UTF8.GetString(message.Data.Contents); //Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] SuperStreamConsumer received from {stream}: {messageBody}"); // Extrakce vlastností zprávy var messageId = message.Properties?.MessageId ?? string.Empty; var subject = message.Properties?.Subject ?? string.Empty; var contentType = message.Properties?.ContentType ?? string.Empty; var userId = message.Properties?.UserId != null ? Encoding.UTF8.GetString(message.Properties.UserId) : string.Empty; // Extrakce application properties var version = message.ApplicationProperties?.ContainsKey("Version") == true ? message.ApplicationProperties["Version"]?.ToString() : string.Empty; // Zde implementujte vlastní logiku zpracování zprávy // ProcessMessage(messageBody, subject, userId, messageId, version); // Hlídáme si čas uložení offsetu pro SAC failover - nevolat příliš často (např. s každou přečtenou zprávou) - takto maximálně jednou za minutu // https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#consumer-offset-tracking if (storeOffsetTimes.TryGetValue(stream, out var lastStoreOffset)) { if (lastStoreOffset < DateTime.Now.AddMinutes(-1)) { await consumerSource.StoreOffset(context.Offset); storeOffsetTimes[stream] = DateTime.Now; } } else { await consumerSource.StoreOffset(context.Offset); storeOffsetTimes.Add(stream, DateTime.Now); } }, // Listener pro změny SAC stavu ConsumerUpdateListener = async (reference, stream, isActive) => { try { // Obnovení od posledního uloženého offsetu var offset = await streamSystem.QueryOffset(reference, stream); return new OffsetTypeOffset(offset + 1); } catch (OffsetNotFoundException) { // Pokud není žádný uložený offset - začít čtení od začátku return new OffsetTypeFirst(); } } }); // Consumer je nyní aktivní a naslouchá zprávám // Implementujte vlastní logiku pro udržení aplikace v běhu // Například: await Task.Delay(Timeout.Infinite); //Console.ReadLine(); // Uvolnění zdrojů při ukončení aplikace await consumer.Close(); await streamSystem.Close(); } } }