using Confluent.Kafka; using System.Text.Json.Serialization; namespace Hg.Complaint.WebApi.Workers { internal class NegativeMessageWorker : BackgroundService { private readonly IConfiguration _configuration; private readonly IServiceProvider _serviceProvider; public NegativeMessageWorker(IConfiguration configuration, IServiceProvider serviceProvider) { _configuration = configuration; _serviceProvider = serviceProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var consumers = _configuration.GetSection("Consumers").Get>(); await Builder(consumers); } private async Task Builder(List consumers) { foreach (var consumerConfig in consumers) { var consumerCisposeDelegate = new ConsumerCisposeDelegate(ConsumerCispose); Task.Run(async () => await consumerCisposeDelegate.Invoke(consumerConfig)); } } public delegate Task ConsumerCisposeDelegate(Consumer consumer); private async Task ConsumerCispose(Consumer consumerConfig) { try { var config = new ConsumerConfig { BootstrapServers = consumerConfig.Host, GroupId = consumerConfig.GroupId, AutoOffsetReset = AutoOffsetReset.Earliest }; var cancel = false; using var consumer = new ConsumerBuilder(config).Build(); var topic = consumerConfig.Topic; consumer.Subscribe(topic); var scope = _serviceProvider.CreateScope(); var complaintDomain = scope.ServiceProvider.GetRequiredService(); while (!cancel) { var consumeResult = consumer.Consume(CancellationToken.None); try { var options = new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, PropertyNameCaseInsensitive = true }; Log.Information($"Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}"); var negativeMessage = JsonSerializer.Deserialize(consumeResult.Message.Value, options); if (negativeMessage != null) { await complaintDomain.AnalyseNegativeMessage(negativeMessage); } } catch (Exception ex) { Log.Error(ex, $"消费者处理报错!Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}"); } } consumer.Close(); } catch (Exception ex) { Log.Error(ex, "消费者处理报错!"); } } } }