// Zxd.Core/code/DG.Kafka/KafkaClient.cs
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace DG.Kafka
{
public class KafkaClient
{
    /// <summary>
    /// Global flag that tells every consume loop to shut down.
    /// NOTE(review): "Shop" looks like a typo for "Stop" (the original Chinese
    /// comment reads "whether to stop the service"), but the name is part of the
    /// public API and is kept for backward compatibility.
    /// </summary>
    public static bool Shop { get; set; } = false;

    /// <summary>
    /// Injects the application configuration later used by <see cref="GetConsumers"/>.
    /// </summary>
    public void ReadFromConfiguration(IConfiguration configuration)
    {
        ConfigurationManager = configuration;
    }

    /// <summary>Configuration root supplied via <see cref="ReadFromConfiguration"/>; null until injected.</summary>
    public IConfiguration ConfigurationManager { get; private set; }

    private static readonly Lazy<KafkaClient> _defaultClient = new(() => new KafkaClient());

    /// <summary>Process-wide singleton instance.</summary>
    public static KafkaClient Default
    {
        get { return _defaultClient.Value; }
    }

    private static List<Consumer> _consumers { get; set; } = new List<Consumer>();

    /// <summary>
    /// Binds the "Consumers" configuration section to a list of consumer settings.
    /// Returns an empty list when the configuration was never injected or the
    /// section is missing (the original threw a NullReferenceException).
    /// </summary>
    public static List<Consumer> GetConsumers()
    {
        _consumers = Default.ConfigurationManager?
            .GetSection("Consumers")
            .Get<List<Consumer>>() ?? new List<Consumer>();
        return _consumers;
    }

    /// <summary>
    /// Starts a fire-and-forget single-message consume loop.
    /// </summary>
    /// <param name="consumer">Broker/topic/group settings.</param>
    /// <param name="received">Invoked once per successfully deserialized message.</param>
    /// <param name="shoped">Invoked once when the loop stops because <see cref="Shop"/> is true.</param>
    public static Task Builder<T>(Consumer consumer, Func<T, Task> received, Func<Task> shoped)
    {
        // The original wrapped ThreadPool.QueueUserWorkItem (with an async-void
        // lambda) inside Task.Run; that double dispatch discarded the loop's Task,
        // made failures unobservable, and left a dead 'tasks' local behind. A
        // single Task.Run is enough to move the blocking loop off the caller.
        _ = Task.Run(() => ReceivedAsync(consumer, received, shoped));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Starts a fire-and-forget batched consume loop.
    /// </summary>
    /// <param name="batchsize">Maximum number of messages handed to <paramref name="received"/> at once.</param>
    public static Task BatchBuilder<T>(Consumer consumer, Func<List<T>, Task> received, Func<Task> shoped, int batchsize = 1000)
    {
        // Same simplification as Builder<T>: one Task.Run instead of
        // Task.Run + QueueUserWorkItem + async-void lambda.
        _ = Task.Run(() => BatchReceivedAsync(consumer, received, shoped, batchsize));
        return Task.CompletedTask;
    }

    public delegate Task ReceivedDelegate<T>(Consumer consumer, Func<T, Task> received);
    public delegate Task BatchReceivedDelegate<T>(Consumer consumer, Func<List<T>, Task> received, int batchsize = 1000);

    /// <summary>
    /// Consume loop: one message at a time, with a manual commit after the
    /// handler ran. Runs until <see cref="Shop"/> becomes true, then closes the
    /// consumer gracefully and invokes <paramref name="shoped"/>.
    /// </summary>
    public static async Task ReceivedAsync<T>(Consumer consumer, Func<T, Task> received, Func<Task> shoped)
    {
        try
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = consumer.Host,
                GroupId = consumer.GroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // Manual commit: an offset is only committed after the handler
                // completed, so a handler failure does not lose the message.
                EnableAutoCommit = false
            };
            using var consumerBuilder = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
            Console.WriteLine($"Kafka alone 连接成功,配置: {JsonSerializer.Serialize(consumerConfig)}");
            consumerBuilder.Subscribe(consumer.Topic);
            // The original's 'cancel' flag was never set, making the loop-exit
            // Close() unreachable; loop directly on the stop flag instead.
            while (!Shop)
            {
                try
                {
                    // Consume with a timeout instead of CancellationToken.None:
                    // the original blocked forever on an idle topic, so the Shop
                    // flag was only noticed after the next message arrived.
                    // A null result means "nothing available within the timeout".
                    var consumeResult = consumerBuilder.Consume(TimeSpan.FromSeconds(1));
                    if (consumeResult == null)
                    {
                        continue;
                    }
                    Console.WriteLine($"Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                    var message = JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
                    if (message != null)
                    {
                        await received(message);
                    }
                    try
                    {
                        consumerBuilder.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
                await Task.Delay(1);
            }
            // The original returned from inside the loop on Shop without calling
            // Close(), skipping the graceful group leave; close first, then
            // notify the caller that the loop has stopped.
            consumerBuilder.Close();
            await shoped();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }

    /// <summary>
    /// Batched consume loop: accumulates up to <paramref name="batchsize"/>
    /// messages, hands the batch to <paramref name="received"/>, then commits
    /// every offset in the batch. Runs until <see cref="Shop"/> becomes true.
    /// </summary>
    public static async Task BatchReceivedAsync<T>(Consumer consumer, Func<List<T>, Task> received, Func<Task> shoped, int batchsize = 1000)
    {
        try
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = consumer.Host,
                GroupId = consumer.GroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // Offsets are committed manually after the whole batch succeeded.
                EnableAutoCommit = false
            };
            using var consumerBuilder = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
            Console.WriteLine($"Kafka batch 连接成功,配置: {JsonSerializer.Serialize(consumerConfig)}");
            consumerBuilder.Subscribe(consumer.Topic);
            while (!Shop)
            {
                try
                {
                    var batchRecords = new List<T>(batchsize);
                    var consumeResults = new List<ConsumeResult<Ignore, string>>(batchsize);
                    while (batchRecords.Count < batchsize)
                    {
                        // Timeout-based Consume: the original blocked forever
                        // waiting to fill the batch, so fewer than batchsize
                        // pending messages were never delivered. A null result
                        // means "nothing buffered right now" — flush what we have.
                        var consumeResult = consumerBuilder.Consume(TimeSpan.FromSeconds(1));
                        if (consumeResult == null)
                        {
                            break;
                        }
                        Console.WriteLine($"Consumer message: {consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                        var message = JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
                        // Track the offset even when deserialization yields null;
                        // the original broke out WITHOUT committing it, so a
                        // null-deserializing message was re-consumed forever.
                        consumeResults.Add(consumeResult);
                        if (message != null)
                        {
                            batchRecords.Add(message);
                        }
                    }
                    // Don't invoke the handler with an empty batch.
                    if (batchRecords.Count > 0)
                    {
                        await received(batchRecords);
                    }
                    foreach (var consumeResult in consumeResults)
                    {
                        consumerBuilder.Commit(consumeResult);
                    }
                }
                catch (KafkaException e)
                {
                    Console.WriteLine("KafkaException:" + e.Message);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("KafkaException:" + ex.ToString());
                }
                await Task.Delay(1);
            }
            // Graceful shutdown, mirroring ReceivedAsync<T>.
            consumerBuilder.Close();
            await shoped();
        }
        catch (Exception ex)
        {
            Console.WriteLine("KafkaException:" + ex.ToString());
        }
    }

    /// <summary>
    /// Serializes <paramref name="message"/> as JSON and produces it to the
    /// topic configured on <paramref name="consumer"/>, logging the delivery report.
    /// </summary>
    public static async Task SendMessage<TMessage>(Consumer consumer, TMessage message)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = consumer.Host,
        };
        var topic = consumer.Topic;
        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                // The original declared the delivery-report handler but never
                // passed it to Produce, so delivery results were never logged.
                p.Produce(topic, new Message<Null, string> { Value = JsonSerializer.Serialize(message) }, handler);
                // Wait up to 10s for broker acknowledgement before disposing.
                p.Flush(TimeSpan.FromSeconds(10));
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
}