// 85 lines, 3.3 KiB, C#
using Confluent.Kafka;
|
||
using System.Text.Json.Serialization;
|
||
|
||
namespace Hg.Complaint.WebApi.Workers
|
||
{
|
||
/// <summary>
/// Hosted background worker that runs one Kafka consumer loop for every entry of the
/// <c>Consumers</c> configuration section. Each consumed message value is deserialized
/// as a <see cref="NegativeMessageDto"/> and handed to
/// <see cref="IComplaintDomain.AnalyseNegativeMessage"/>.
/// </summary>
internal class NegativeMessageWorker : BackgroundService
{
    /// <summary>
    /// Deserialization settings shared by every message. The settings never change,
    /// so they are built once instead of once per consumed message.
    /// </summary>
    private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        PropertyNameCaseInsensitive = true
    };

    private readonly IConfiguration _configuration;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="configuration">Configuration root that contains the <c>Consumers</c> section.</param>
    /// <param name="serviceProvider">Root provider used to create a DI scope for each consumer loop.</param>
    public NegativeMessageWorker(IConfiguration configuration,
        IServiceProvider serviceProvider)
    {
        _configuration = configuration;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Reads the consumer definitions from configuration and runs all consumer loops
    /// until <paramref name="stoppingToken"/> is signalled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    /// <returns>A task that completes once every consumer loop has ended.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var consumers = _configuration.GetSection("Consumers").Get<List<Consumer>>();

        // Get<T>() returns null when the section is missing or empty; without this guard
        // the loop over the list would fail with a NullReferenceException.
        if (consumers == null || consumers.Count == 0)
        {
            Log.Information("No Kafka consumers are configured in the 'Consumers' section; NegativeMessageWorker has nothing to run.");
            return;
        }

        await Builder(consumers, stoppingToken);
    }

    /// <summary>
    /// Starts one consumer loop per configuration entry on the thread pool.
    /// </summary>
    /// <param name="consumers">Consumer definitions to start.</param>
    /// <param name="stoppingToken">Token forwarded to every consumer loop.</param>
    /// <returns>A task that completes when all started loops have ended.</returns>
    private Task Builder(List<Consumer> consumers, CancellationToken stoppingToken)
    {
        var loops = new List<Task>(consumers.Count);

        foreach (var consumerConfig in consumers)
        {
            // Consume() blocks its thread, so each loop is pushed to the thread pool.
            // The tasks are collected (not fire-and-forget) so the service lifetime
            // covers the lifetime of the loops.
            loops.Add(Task.Run(() => ConsumerCispose(consumerConfig, stoppingToken)));
        }

        return Task.WhenAll(loops);
    }

    /// <summary>
    /// Delegate shape of a per-consumer processing routine.
    /// Kept for source compatibility; the worker no longer needs it internally.
    /// </summary>
    /// <param name="consumer">Consumer definition to process.</param>
    public delegate Task ConsumerCisposeDelegate(Consumer consumer);

    /// <summary>
    /// Subscribes to the configured topic and processes messages until cancellation.
    /// A failure while handling a single message is logged and the loop continues;
    /// any other failure is logged and ends this consumer loop.
    /// </summary>
    /// <param name="consumerConfig">Host, group id and topic of the consumer to run.</param>
    /// <param name="stoppingToken">Token that ends the loop when signalled.</param>
    private async Task ConsumerCispose(Consumer consumerConfig, CancellationToken stoppingToken)
    {
        try
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = consumerConfig.Host,
                GroupId = consumerConfig.GroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(consumerConfig.Topic);

            // One scope per consumer loop, disposed when the loop ends.
            using var scope = _serviceProvider.CreateScope();
            var complaintDomain = scope.ServiceProvider.GetRequiredService<IComplaintDomain>();

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var consumeResult = consumer.Consume(stoppingToken);
                    try
                    {
                        Log.Information($"Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                        var negativeMessage = JsonSerializer.Deserialize<NegativeMessageDto>(consumeResult.Message.Value, SerializerOptions);
                        if (negativeMessage != null)
                        {
                            await complaintDomain.AnalyseNegativeMessage(negativeMessage);
                        }
                    }
                    catch (Exception ex)
                    {
                        // A bad message must not stop the loop.
                        Log.Error(ex, $"消费者处理报错!Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Expected during shutdown: Consume() was interrupted by the stopping token.
            }

            // Reached on a normal stop only; on other failures the using declaration
            // still disposes the consumer.
            consumer.Close();
        }
        catch (Exception ex)
        {
            Log.Error(ex, "消费者处理报错!");
        }
    }
}
|
||
}
|