using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;

namespace DG.Kafka.Worker
{
    /// <summary>
    /// Runs caller-supplied work either once (<see cref="DoWork"/>) or in an endless
    /// polling loop (<see cref="DoWorkAsync"/>), driven by the settings bound from the
    /// "TaskConfig" configuration section.
    /// </summary>
    internal class WorkerManager : IWorkerManager
    {
        /// <summary>
        /// Pause used between re-checks when the configured delay is not positive, so the
        /// polling loop yields instead of spinning.
        /// </summary>
        private const int InvalidDelayRetryMilliseconds = 1000;

        private readonly TaskConfig _taskConfig;
        private readonly int _milliSecondsDelay;
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the manager and binds the "TaskConfig" configuration section.
        /// </summary>
        /// <param name="logger">Logger that receives all diagnostics written by this instance.</param>
        /// <exception cref="ArgumentNullException">
        /// The "TaskConfig" section could not be bound to a <see cref="TaskConfig"/> instance.
        /// </exception>
        public WorkerManager(ILogger logger)
        {
            _logger = logger;

            // Read the task configuration.
            _taskConfig = KafkaClient.Default.ConfigurationManager.GetSection("TaskConfig").Get<TaskConfig>();
            if (_taskConfig == null)
            {
                // Exception type kept as-is so existing catch sites continue to work.
                throw new ArgumentNullException(nameof(_taskConfig));
            }

            // Snapshot the interval once; it is not re-read while the loop is running.
            _milliSecondsDelay = _taskConfig.MilliSecondsDelay;
        }

        /// <summary>
        /// Invokes <paramref name="doWork"/> once. Any exception it throws is logged as an
        /// error and swallowed, so this method does not propagate failures to the caller.
        /// </summary>
        /// <param name="doWork">The synchronous work to run.</param>
        public void DoWork(Action doWork)
        {
            try
            {
                doWork();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 获取配置失败!");
            }
        }

        /// <summary>
        /// Repeatedly awaits <paramref name="doWork"/>, pausing for the configured number of
        /// milliseconds between iterations. The loop has no exit condition: the returned task
        /// does not complete through normal control flow.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown by <paramref name="doWork"/> are logged and the loop carries on.
        /// While the task is disabled in configuration, or the configured delay is not
        /// positive, the work is skipped and only a warning is logged.
        /// </remarks>
        /// <param name="doWork">The asynchronous work to run on every iteration.</param>
        /// <returns>A task representing the never-ending polling loop.</returns>
        public async Task DoWorkAsync(Func<Task> doWork)
        {
            while (true)
            {
                _logger.LogDebug($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务开始执行11");

                try
                {
                    if (_milliSecondsDelay <= 0)
                    {
                        _logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务定时时间不能少于0!");

                        // Without an await here the loop would never yield: it would pin the
                        // calling thread and flood the log. Wait a fixed interval instead.
                        await Task.Delay(InvalidDelayRetryMilliseconds);
                        continue;
                    }

                    if (!_taskConfig.Enable)
                    {
                        // Task is switched off: wait one interval, report, and check again.
                        await Task.Delay(_milliSecondsDelay);
                        _logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务停止启动!");
                        continue;
                    }

                    // Run the main work.
                    await doWork();
                }
                catch (Exception ex)
                {
                    // A failing iteration must not stop the loop; log it and keep going.
                    _logger.LogError(ex, $"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}]");
                }

                _logger.LogDebug($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务执行结束");
                await Task.Delay(_milliSecondsDelay);
            }
        }
    }
}