Zxd.Core/code/DG.Kafka.Worker/KafkaWorkerBase.cs

71 lines
2.2 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace DG.Kafka.Worker
{
public abstract class KafkaWorkerBase<T>
{
private readonly ILogger<KafkaWorkerBase<T>> _logger;
private TaskConfig _taskConfig;
private int _milliSecondsDelay;
public KafkaWorkerBase(ILogger<KafkaWorkerBase<T>> logger)
{
_logger = logger;
_taskConfig = KafkaClient.Default.ConfigurationManager.GetSection("TaskConfig").Get<TaskConfig>();
if (_taskConfig == null)
{
throw new ArgumentNullException(nameof(_taskConfig));
}
_milliSecondsDelay = _taskConfig.MilliSecondsDelay;
}
public async Task Start(T t)
{
Stopwatch watch = new Stopwatch();
watch.Reset();
watch.Start();
_logger.LogDebug($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务开始执行1");
try
{
if (_milliSecondsDelay <= 0)
{
_logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务定时时间不能少于0");
return;
}
if (!_taskConfig.Enable)
{
await Task.Delay(_milliSecondsDelay);
_logger.LogWarning($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务停止启动!");
return;
}
await DoWorkAsync(t);
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}]");
}
watch.Stop();
double costtime = watch.ElapsedMilliseconds;
_logger.LogDebug($"[{DateTimeOffset.Now}] [{_taskConfig.TaskName}] 任务执行结束,用时:{costtime}ms");
}
protected virtual async Task DoWorkAsync(T t)
{
}
public virtual async Task ShopAsync()
{
}
}
}