using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DG.Kafka.Worker
{
    /// <summary>
    /// Resolves Kafka worker instances from the service provider and hands them,
    /// together with the consumer entry whose <c>Topic</c> matches, to
    /// <c>KafkaClient.Builder</c> / <c>KafkaClient.BatchBuilder</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments appear to be missing throughout this file
    /// (for example the element type of <c>_consumers</c>, the <c>Get</c> calls, the
    /// <c>GetRequiredService</c> calls and the type-parameter lists of both Register
    /// methods). They look stripped by an extraction step - restore them from the
    /// original source before building.
    /// </remarks>
    internal class KafkaWorkerManager : IKafkaWorkerManager
    {
        // Bound from the "TaskConfig" configuration section in the constructor.
        // Not read anywhere else in the visible code.
        private TaskConfig _taskConfig;

        // Consumer entries bound from the "Consumers" configuration section;
        // searched by Topic when a worker is registered.
        private List _consumers;

        private readonly ILogger _logger;

        // Used to resolve worker instances at registration time.
        private readonly IServiceProvider _serviceProvider;

        /// <summary>
        /// Stores the logger and service provider and loads the "TaskConfig" and
        /// "Consumers" sections from <c>KafkaClient.Default.ConfigurationManager</c>.
        /// </summary>
        /// <param name="logger">Logger used for the debug message written on each registration.</param>
        /// <param name="serviceProvider">Provider from which worker instances are resolved.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when either configuration section binds to <c>null</c>.
        /// </exception>
        /// <remarks>
        /// NOTE(review): neither constructor argument is null-checked, and
        /// <see cref="ArgumentNullException"/> is raised for missing configuration rather
        /// than for a null argument - confirm whether callers rely on that exception type.
        /// </remarks>
        public KafkaWorkerManager(ILogger logger, IServiceProvider serviceProvider)
        {
            _logger = logger;

            // Read the task configuration.
            _taskConfig = KafkaClient.Default.ConfigurationManager.GetSection("TaskConfig").Get();
            _consumers = KafkaClient.Default.ConfigurationManager.GetSection("Consumers").Get>();

            if (_taskConfig == null)
            {
                throw new ArgumentNullException(nameof(_taskConfig));
            }

            if (_consumers == null)
            {
                throw new ArgumentNullException(nameof(_consumers));
            }

            _serviceProvider = serviceProvider;
        }

        /// <summary>
        /// Resolves a <c>TWorker</c> from the service provider, looks up the first
        /// configured consumer whose <c>Topic</c> equals <paramref name="topic"/>, and
        /// awaits <c>KafkaClient.Builder</c> with that consumer and the worker's
        /// <c>Start</c> and <c>ShopAsync</c> members.
        /// </summary>
        /// <param name="topic">Topic used to select the consumer entry; compared with <c>==</c>.</param>
        /// <returns>A task that completes when <c>KafkaClient.Builder</c> completes.</returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown when no configured consumer has a matching <c>Topic</c>.
        /// </exception>
        /// <remarks>
        /// NOTE(review): <c>ShopAsync</c> looks like a misspelling of <c>StopAsync</c>; it is
        /// declared on the worker base type outside this file, so it is left untouched here.
        /// </remarks>
        public async Task RegisterWorker(string? topic) where TWorker : KafkaWorkerBase
        {
            // The worker is resolved before the consumer lookup, so a missing service
            // registration surfaces before a missing topic does.
            var t = _serviceProvider.GetRequiredService();

            // NOTE(review): interpolated log message; a message template with an argument
            // would avoid formatting the string when Debug logging is disabled.
            _logger.LogDebug($"注册worker: {typeof(TWorker).Name}");

            var consumer = _consumers.FirstOrDefault(c => c.Topic == topic);
            if (consumer == null)
            {
                throw new ArgumentNullException(nameof(consumer));
            }

            await KafkaClient.Builder(consumer, t.Start, t.ShopAsync);
        }

        /// <summary>
        /// Batch counterpart of <c>RegisterWorker</c>: resolves a <c>TWorker</c>, looks up
        /// the first configured consumer whose <c>Topic</c> equals <paramref name="topic"/>,
        /// and awaits <c>KafkaClient.BatchBuilder</c> with that consumer, the worker's
        /// <c>Start</c> and <c>ShopAsync</c> members, and <paramref name="batchsize"/>.
        /// </summary>
        /// <param name="topic">Topic used to select the consumer entry; compared with <c>==</c>.</param>
        /// <param name="batchsize">
        /// Value forwarded unchanged to <c>KafkaClient.BatchBuilder</c>; defaults to 1000.
        /// Presumably the number of messages per batch - confirm against <c>BatchBuilder</c>.
        /// </param>
        /// <returns>A task that completes when <c>KafkaClient.BatchBuilder</c> completes.</returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown when no configured consumer has a matching <c>Topic</c>.
        /// </exception>
        public async Task RegisterBatchWorker(string? topic, int batchsize = 1000) where TWorker : BatchKafkaWorkerBase
        {
            var t = _serviceProvider.GetRequiredService();
            _logger.LogDebug($"注册worker: {typeof(TWorker).Name}");

            var consumer = _consumers.FirstOrDefault(c => c.Topic == topic);
            if (consumer == null)
            {
                throw new ArgumentNullException(nameof(consumer));
            }

            await KafkaClient.BatchBuilder(consumer, t.Start, t.ShopAsync, batchsize);
        }
    }
}