ComplianceServer/code/Hg.Complaint.WebApi/workers/NegativeMessageWorker.cs

85 lines
3.3 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Confluent.Kafka;
using System.Text.Json.Serialization;
namespace Hg.Complaint.WebApi.Workers
{
/// <summary>
/// Hosted background worker that starts one Kafka consumer for every entry of the
/// <c>Consumers</c> configuration section, deserializes each consumed message value
/// into a <c>NegativeMessageDto</c> and passes it to
/// <c>IComplaintDomain.AnalyseNegativeMessage</c>.
/// </summary>
internal class NegativeMessageWorker : BackgroundService
{
    // Serializer options never change between messages, so build them once instead of per message.
    private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        PropertyNameCaseInsensitive = true
    };

    private readonly IConfiguration _configuration;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="configuration">Configuration that is read for the <c>Consumers</c> section.</param>
    /// <param name="serviceProvider">Root provider used to create the DI scope each consumer resolves its services from.</param>
    public NegativeMessageWorker(IConfiguration configuration,
        IServiceProvider serviceProvider)
    {
        _configuration = configuration;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Reads the consumer definitions from configuration and runs all consumers
    /// until <paramref name="stoppingToken"/> is signalled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var consumers = _configuration.GetSection("Consumers").Get<List<Consumer>>();

        // Get<T>() returns null when the section is absent; without this guard the loop below would throw.
        if (consumers == null || consumers.Count == 0)
        {
            Log.Warning("No Kafka consumers configured in section 'Consumers'; NegativeMessageWorker has nothing to run.");
            return;
        }

        await Builder(consumers, stoppingToken);
    }

    /// <summary>
    /// Starts one consume loop per configured consumer and completes when all loops have ended.
    /// </summary>
    /// <param name="consumers">Consumer definitions; must not be null.</param>
    /// <param name="stoppingToken">Token forwarded to every consume loop.</param>
    private async Task Builder(List<Consumer> consumers, CancellationToken stoppingToken)
    {
        var running = new List<Task>(consumers.Count);
        foreach (var consumerConfig in consumers)
        {
            // Consume() is a blocking call, so each loop is pushed onto its own pool task.
            // The tasks are collected (not fire-and-forget) so the worker's lifetime matches theirs.
            running.Add(Task.Run(() => ConsumerCispose(consumerConfig, stoppingToken)));
        }

        await Task.WhenAll(running);
    }

    /// <summary>
    /// Delegate shape for a per-consumer handler. No longer used by this class;
    /// kept so existing references to the public nested type keep compiling.
    /// </summary>
    public delegate Task ConsumerCisposeDelegate(Consumer consumer);

    /// <summary>
    /// Runs the consume loop for a single configured consumer: subscribes to its topic,
    /// then consumes, logs, deserializes and dispatches messages until cancellation.
    /// Every exception is logged here, so the returned task does not fault.
    /// </summary>
    /// <param name="consumerConfig">Host, group id and topic of the consumer to run.</param>
    /// <param name="stoppingToken">Token that ends the loop and unblocks a pending <c>Consume</c>.</param>
    private async Task ConsumerCispose(Consumer consumerConfig, CancellationToken stoppingToken)
    {
        try
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = consumerConfig.Host,
                GroupId = consumerConfig.GroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(consumerConfig.Topic);

            // One scope per consumer, disposed when the loop ends so scoped services are released.
            using var scope = _serviceProvider.CreateScope();
            var complaintDomain = scope.ServiceProvider.GetRequiredService<IComplaintDomain>();

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    ConsumeResult<Ignore, string> consumeResult;
                    try
                    {
                        consumeResult = consumer.Consume(stoppingToken);
                    }
                    catch (ConsumeException ex) when (!ex.Error.IsFatal)
                    {
                        // A non-fatal consume error must not permanently stop this consumer.
                        Log.Error(ex, "Kafka consume error; consumer will keep polling.");
                        continue;
                    }

                    try
                    {
                        Log.Information($"Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                        var negativeMessage = JsonSerializer.Deserialize<NegativeMessageDto>(consumeResult.Message.Value, SerializerOptions);
                        if (negativeMessage != null)
                        {
                            await complaintDomain.AnalyseNegativeMessage(negativeMessage);
                        }
                    }
                    catch (Exception ex)
                    {
                        // A single bad message is logged and skipped; the loop keeps going.
                        Log.Error(ex, $"消费者处理报错Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected on shutdown: Consume was unblocked by the stopping token.
            }
            finally
            {
                // Close the consumer on every exit path so it leaves its group cleanly.
                consumer.Close();
            }
        }
        catch (Exception ex)
        {
            Log.Error(ex, "消费者处理报错!");
        }
    }
}
}