Zxd.Core/code/DG.Kafka/KafkaProducer.cs

48 lines
1.8 KiB
C#

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;
namespace DG.Kafka
{
internal class KafkaProducer : IKafkaProducer
{
private static List<Consumer> _consumers { get; set; } = new List<Consumer>();
private Dictionary<string, IProducer<Null, string>> _producers;
public KafkaProducer()
{
_consumers = KafkaClient.Default.ConfigurationManager.GetSection("Consumers").Get<List<Consumer>>();
_producers = new Dictionary<string, IProducer<Null, string>>();
foreach (var consumer in _consumers)
{
var config = new ProducerConfig
{
BootstrapServers = consumer.Host,
};
var topic = consumer.Topic;
Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");
_producers.Add(consumer.Topic, new ProducerBuilder<Null, string>(config).Build());
}
}
public async Task ProduceAsync<TMessage>(string? topic, TMessage message)
{
var producer = _producers.First().Value;
if (!string.IsNullOrEmpty(topic))
{
producer = _producers.GetValueOrDefault(topic);
}
await producer.ProduceAsync(topic, new Message<Null, string> { Value = JsonSerializer.Serialize(message) });
producer.Flush();
}
}
}