using Dapper; using DG.Kafka.Worker; using EventConsole.Dto; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Oracle.ManagedDataAccess.Client; using System.Text; namespace EventConsole.Worker { public class EventData : KafkaWorkerBase { private readonly ILogger _logger; private readonly IConfiguration _configuration; public EventData( ILogger logger, IConfiguration configuration) : base(logger) { _logger = logger; _configuration = configuration; } protected override async Task DoWorkAsync(EventDto t) { //{"data":"{ //\"act_date\":\"2023-06-12\", //\"act_time\":1686552844000, //\"appid\":\"wxddde96d102f8a305\", //\"appuserid\":\"oqhukuDkzhBX7SWbS_KL5tppIFPw\", //\"channel\":7000, //\"customerid\":1573295, //\"deptid\":27, //\"insert_date\":\"2023-06-12\", //\"insert_time\":1686552846000, //\"ip\":\"119.3.119.18\", //\"resid\": \"366054818867948809\", //\"scene_id\":\"1191\", //\"scene_name\":\"东方投研服务中心\", //\"scene_type_id\":5, //\"scene_type_name\":\"工作室\", //\"sinkClickhouseTable\":\"act_soft_visit\", //\"source_table\":\"action\", //\"uid\":1890484, //\"unionid\":\"o5bj2weLouL2c1Cn4wHoGRq3JHFU\", //\"user_type\":1, //\"with_centerid\":0, //\"with_deptid\":0, //\"with_eid\":0, //\"with_groupid\":0, //\"with_orgid\":0}","tableName":"act_soft_visit"} try { var data = JsonConvert.DeserializeObject(t.data); var depts = await GetDeptsAsync(); var dept = depts.Where(w => w.id == data.deptid).FirstOrDefault(); if (dept!= null && !string.IsNullOrEmpty(dept.appid)) { var connectionString = _configuration.GetConnectionString(dept.appid); if (!string.IsNullOrEmpty(connectionString)) { using (var dbConnection = new OracleConnection(connectionString)) { var model = new CSVR_TODOITEM { RESID = data.resid, RESOURCETAG = t.tableName }; var sql = @" insert into CSVR_TODOITEM (Sendeduserid,Receiveduserid,ISPRIVATE,Resid,Memo,Starttime,Dostatus,Doremark,Dotime,Douserid,Ctime,URL,Urltitle,Resourcetag,REMARK) select :Sendeduserid,:Receiveduserid,:ISPRIVATE,:Resid,:Memo,:Starttime,:Dostatus,:Doremark,:Dotime,:Douserid,:Ctime,:URL,:Urltitle,:Resourcetag,:REMARK from dual"; await dbConnection.ExecuteAsync(sql, model); } } } } catch (Exception ex) { _logger.LogError(ex, "EventDataWorkerError", t); } await base.DoWorkAsync(t); } /// /// 获取所有部门信息 /// /// private async Task> GetDeptsAsync() { var items = new List(); var host = _configuration.GetValue("ZxdCoreWebApiHost"); if (string.IsNullOrEmpty(host)) { host = "http://120.77.165.155:8089"; } var url = $"{host}/Api/Deptment/Depts"; var handler = new HttpClientHandler { ServerCertificateCustomValidationCallback = (message, cert, chain, error) => true }; var client = new HttpClient(handler) { Timeout = TimeSpan.FromSeconds(30) }; var result = await client.GetAsync(url); if (result.IsSuccessStatusCode) { var bytes = await result.Content.ReadAsByteArrayAsync(); var rspJson = Encoding.UTF8.GetString(bytes); var rsp = JsonConvert.DeserializeObject(rspJson); if (rsp != null && rsp.data != null) { items.AddRange(rsp.data.Where(w => !string.IsNullOrEmpty(w.appid)).Select(o => new DeptItem { id=o.id, departmentId=o.departmentId, appid=o.appid })); } } return items; } } public class DeptItem { public int id { get; set; } public int departmentId { get; set; } public string appid { get; set; } } public class GetDepartmentResponse { public List data { get; set; } public int code { get; set; } public string message { get; set; } } public class DepartmentItem { public int id { get; set; } public string title { get; set; } public string code { get; set; } public string appid { get; set; } public int departmentId { get; set; } public bool isDept { get; set; } public string companyCode { get; set; } public List deptmentCampains { get; set; } } public class Deptmentcampain { public int startCampainId { get; set; } public int endCampainId { get; set; } } }