48 lines
1.8 KiB
C#
48 lines
1.8 KiB
C#
using Confluent.Kafka;
|
|
using Microsoft.Extensions.Configuration;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using System.Threading.Tasks;
|
|
using static Confluent.Kafka.ConfigPropertyNames;
|
|
|
|
namespace DG.Kafka
|
|
{
|
|
internal class KafkaProducer : IKafkaProducer
|
|
{
|
|
private static List<Consumer> _consumers { get; set; } = new List<Consumer>();
|
|
private Dictionary<string, IProducer<Null, string>> _producers;
|
|
public KafkaProducer()
|
|
{
|
|
_consumers = KafkaClient.Default.ConfigurationManager.GetSection("Consumers").Get<List<Consumer>>();
|
|
_producers = new Dictionary<string, IProducer<Null, string>>();
|
|
foreach (var consumer in _consumers)
|
|
{
|
|
var config = new ProducerConfig
|
|
{
|
|
BootstrapServers = consumer.Host,
|
|
};
|
|
var topic = consumer.Topic;
|
|
Action<DeliveryReport<Null, string>> handler = r =>
|
|
Console.WriteLine(!r.Error.IsError
|
|
? $"Delivered message to {r.TopicPartitionOffset}"
|
|
: $"Delivery Error: {r.Error.Reason}");
|
|
_producers.Add(consumer.Topic, new ProducerBuilder<Null, string>(config).Build());
|
|
}
|
|
}
|
|
|
|
public async Task ProduceAsync<TMessage>(string? topic, TMessage message)
|
|
{
|
|
var producer = _producers.First().Value;
|
|
if (!string.IsNullOrEmpty(topic))
|
|
{
|
|
producer = _producers.GetValueOrDefault(topic);
|
|
}
|
|
await producer.ProduceAsync(topic, new Message<Null, string> { Value = JsonSerializer.Serialize(message) });
|
|
producer.Flush();
|
|
}
|
|
}
|
|
}
|