67 lines
2.1 KiB
C#
67 lines
2.1 KiB
C#
using Microsoft.Extensions.Configuration;
|
||
using Microsoft.Extensions.Logging;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Diagnostics;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace DG.Kafka.Worker
|
||
{
|
||
/// <summary>
/// Base class for workers that process a batch of <typeparamref name="T"/> items.
/// Derived classes supply the actual processing by overriding <see cref="DoWorkAsync"/>;
/// <see cref="Start"/> wraps that call with configuration checks, error logging and timing.
/// </summary>
/// <typeparam name="T">Type of the items contained in a batch.</typeparam>
public abstract class BatchKafkaWorkerBase<T>
{
    private readonly ILogger<BatchKafkaWorkerBase<T>> _logger;
    private readonly TaskConfig _taskConfig;

    // Snapshot of TaskConfig.MilliSecondsDelay taken once at construction time.
    private readonly int _milliSecondsDelay;

    /// <summary>
    /// Initializes the worker and loads the "TaskConfig" configuration section
    /// through <c>KafkaClient.Default.ConfigurationManager</c>.
    /// </summary>
    /// <param name="logger">Logger used for warnings, errors and timing output.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> is null, or the "TaskConfig" section could not be bound.
    /// </exception>
    protected BatchKafkaWorkerBase(ILogger<BatchKafkaWorkerBase<T>> logger)
    {
        // Fail at construction instead of with a NullReferenceException on the first log call.
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        _taskConfig = KafkaClient.Default.ConfigurationManager.GetSection("TaskConfig").Get<TaskConfig>();
        if (_taskConfig == null)
        {
            // Exception type and parameter name are kept as they were so existing handlers still match.
            throw new ArgumentNullException(
                nameof(_taskConfig),
                "Configuration section \"TaskConfig\" is missing or could not be bound.");
        }

        _milliSecondsDelay = _taskConfig.MilliSecondsDelay;
    }

    /// <summary>
    /// Runs one batch through <see cref="DoWorkAsync"/>.
    /// </summary>
    /// <remarks>
    /// <para>If the configured delay is zero or negative, a warning is logged and nothing runs.</para>
    /// <para>If the task is disabled, the method waits for the configured delay, logs a warning and returns.</para>
    /// <para>
    /// Any exception raised inside the guarded section is logged and not rethrown.
    /// The elapsed time is logged at debug level only when the work path was reached
    /// (the two early exits above skip it).
    /// </para>
    /// </remarks>
    /// <param name="t">The batch handed to <see cref="DoWorkAsync"/> unchanged.</param>
    /// <returns>A task that completes when the batch has been handled.</returns>
    public async Task Start(List<T> t)
    {
        var watch = Stopwatch.StartNew();

        try
        {
            if (_milliSecondsDelay <= 0)
            {
                // Log text: "task interval must not be less than 0".
                _logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务定时时间不能少于0!");
                return;
            }

            if (!_taskConfig.Enable)
            {
                // Wait for the configured delay before reporting that the task is disabled.
                await Task.Delay(_milliSecondsDelay);

                // Log text: "task start is stopped".
                _logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务停止启动!");
                return;
            }

            await DoWorkAsync(t);
        }
        catch (Exception ex)
        {
            // Best effort: the failure is recorded and the caller is not interrupted.
            _logger.LogError(ex, $"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}]");
        }

        watch.Stop();

        // Log text: "task finished, elapsed: N ms".
        _logger.LogDebug($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务执行结束,用时:{watch.ElapsedMilliseconds}ms");
    }

    /// <summary>
    /// Processes one batch. The default implementation does nothing; override it in a derived class.
    /// </summary>
    /// <param name="t">The batch passed to <see cref="Start"/>.</param>
    /// <returns>A task representing the processing; already completed in the default implementation.</returns>
    protected virtual Task DoWorkAsync(List<T> t) => Task.CompletedTask;

    /// <summary>
    /// Extension point that does nothing by default.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the name looks like a typo for "StopAsync" - confirm with the callers.
    /// It is left unchanged because it is part of the public surface.
    /// </remarks>
    /// <returns>An already completed task in the default implementation.</returns>
    public virtual Task ShopAsync() => Task.CompletedTask;
}
|
||
} |