wwservice/wwsync/kafkatool.cs

324 lines
13 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using Confluent.Kafka;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using model;
namespace wwsync
{
/// <summary>
/// Consumes JSON callback messages from the configured Kafka topic, debounces them
/// through a delay queue, and applies the resulting external-contact and
/// corp-tag changes to the database.
/// </summary>
/// <remarks>
/// Two background loops are started by <see cref="kafka_consume"/>:
/// one reads Kafka and enqueues events, the other drains the delay queue and
/// performs the actual synchronization. <c>eventset</c> is shared between both
/// loops and is therefore only touched while holding <c>eventsetlock</c>.
/// </remarks>
internal class kafkatool
{
    // How long a contact-change event waits in the delay queue before it is processed.
    static readonly TimeSpan contactdelay = TimeSpan.FromSeconds(10);
    // How long a tag-change event waits in the delay queue before it is processed.
    static readonly TimeSpan tagdelay = TimeSpan.FromMinutes(2);
    // Pause between polls when the delay queue has nothing ready.
    static readonly TimeSpan idlepoll = TimeSpan.FromSeconds(5);

    DelayQueue<Tuple<string, string, JObject>> delayqueue = new();
    // Keys of events currently waiting in the delay queue; used to drop duplicates.
    HashSet<string> eventset = new();
    // Guards eventset: HashSet<T> is not safe for concurrent readers and writers.
    readonly object eventsetlock = new();

    /// <summary>
    /// Starts the two background loops (delayed-event processing and Kafka consumption)
    /// and returns immediately.
    /// </summary>
    public void kafka_consume()
    {
        var conf = new ConsumerConfig
        {
            GroupId = appsetteings.kafka_topicgroup,
            BootstrapServers = appsetteings.kafka_server,
            AutoOffsetReset = AutoOffsetReset.Latest
        };
        LogHelper.Info($"start kafka_consume at {DateTime.Now}");

        _ = Task.Run(() => delayloop());
        _ = Task.Run(() => consumeloop(conf));
    }

    /// <summary>
    /// Endlessly drains the delay queue, dispatching each due event; sleeps briefly
    /// when nothing is due. Exceptions are logged and the loop keeps running.
    /// </summary>
    async Task delayloop()
    {
        while (true)
        {
            try
            {
                var item = delayqueue.Dequeue();
                if (item is not null)
                {
                    await handledelayed(item);
                    continue;
                }

                // Nothing due. If the queue is completely empty, reset the de-dup set so
                // stale keys cannot block future events. The lock keeps this check-and-clear
                // atomic with respect to Enqueue_obj.
                lock (eventsetlock)
                {
                    if (delayqueue.Count == 0)
                    {
                        eventset.Clear();
                    }
                }
                await Task.Delay(idlepoll);
            }
            catch (Exception ex)
            {
                LogHelper.Error(ex.ToString());
            }
        }
    }

    /// <summary>
    /// Processes one event taken from the delay queue.
    /// </summary>
    /// <param name="item">(event name, de-dup key, original message payload).</param>
    async Task handledelayed(Tuple<string, string, JObject> item)
    {
        var sevent = item.Item1;
        var key = item.Item2;
        var keys = key.Split('|');

        // Release the key before processing so that events arriving while we work
        // are queued again instead of being dropped as duplicates.
        lock (eventsetlock)
        {
            eventset.Remove(key);
        }

        if (sevent == "change_external_contact")
        {
            // key layout: ChangeType|ToUserName|ExternalUserID
            await wwuser_update(keys[0], keys[2], item.Item3);
        }
        else if (sevent == "change_external_tag")
        {
            // key layout: change_external_tag|ToUserName
            await refreshcorptag(keys[1]);
        }
    }

    /// <summary>
    /// Subscribes to the configured topic and feeds every message to
    /// <see cref="domessage"/> until Ctrl+C cancels consumption.
    /// </summary>
    /// <param name="conf">Consumer configuration built by <see cref="kafka_consume"/>.</param>
    void consumeloop(ConsumerConfig conf)
    {
        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(appsetteings.kafka_wwtopic);
            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };
            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        domessage(cr.Message.Value);
                    }
                    catch (ConsumeException e)
                    {
                        LogHelper.Error($"Error occured: {e.Error.Reason}");
                    }
                    catch (Exception ex) when (ex is not OperationCanceledException)
                    {
                        // A single bad message must not terminate the consumer.
                        LogHelper.Error($"kafka_consume() {ex}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }

    /// <summary>
    /// Parses one raw Kafka message and enqueues it for delayed processing when it is
    /// a supported event for a known corp. Malformed messages are logged and ignored.
    /// </summary>
    /// <param name="data">JSON text whose <c>xml</c> property carries the event fields.</param>
    void domessage(string data)
    {
        try
        {
            var dd = JObject.Parse(data);
            if (dd["xml"] is not JObject dt)
            {
                return;
            }
            if (!dt.ContainsKey("Event"))
            {
                return;
            }
            var corpid = dt["ToUserName"]?.ToString();
            if (corpid is null || !appsetteings.allcorp.ContainsKey(corpid))
            {
                return;
            }

            string strevent = dt["Event"].ToString();
            if (strevent == "change_external_contact")
            {
                Enqueue_obj(strevent, $"{dt["ChangeType"].ToString()}|{dt["ToUserName"].ToString()}|{dt["ExternalUserID"].ToString()}", dt, contactdelay);
                return;
            }
            if (strevent == "change_external_tag")
            {
                Enqueue_obj(strevent, $"{strevent}|{dt["ToUserName"].ToString()}", dt, tagdelay);
            }
        }
        catch (Exception ex)
        {
            LogHelper.Error($"domessage() {ex.ToString()}");
        }
    }

    /// <summary>
    /// Adds an event to the delay queue unless an event with the same key is already waiting.
    /// </summary>
    /// <param name="strevent">Event name used later for dispatch.</param>
    /// <param name="key">De-duplication key; also carries the '|'-separated dispatch arguments.</param>
    /// <param name="obj">Original message payload.</param>
    /// <param name="span">Delay before the event becomes due.</param>
    void Enqueue_obj(string strevent, string key, JObject obj, TimeSpan span)
    {
        lock (eventsetlock)
        {
            if (eventset.Contains(key))
            {
                return;
            }
            delayqueue.Enqueue(new Tuple<string, string, JObject>(strevent, key, obj), span);
            eventset.Add(key);
        }
    }

    /// <summary>
    /// Fetches the tag groups of a corp from the remote API and mirrors them into
    /// <c>ww_grouptags</c>: removes rows no longer present, updates existing rows and
    /// inserts new ones. All changes are committed with a single SaveChanges call.
    /// </summary>
    /// <param name="corpid">Corp identifier; ignored when it is not in <c>appsetteings.allcorp</c>.</param>
    async Task refreshcorptag(string corpid)
    {
        if (!appsetteings.allcorp.ContainsKey(corpid))
        {
            return;
        }
        var corp = appsetteings.allcorp[corpid];
        try
        {
            Console.WriteLine($"corptag_sync {corp.corpid}");
            var obj = await apicaller.wwapiwithtoken(appsetteings.URL_corptaglist, corp.corpid, corp.khsecret);
            if (obj["errcode"].ToString() != "0")
            {
                return;
            }

            var dttags = new Dictionary<string, ww_grouptag>();
            foreach (JObject dpt in obj["tag_group"])
            {
                ww_grouptag oo = new ww_grouptag()
                {
                    corpid = corp.corpid,
                    group_id = dpt["group_id"].ToString(),
                    group_name = dpt["group_name"].ToString(),
                    create_time = long.Parse(dpt["create_time"].ToString()),
                    order = int.Parse(dpt["order"].ToString()),
                    deleted = dpt.ContainsKey("deleted") && bool.Parse(dpt["deleted"].ToString()),
                    tag = dpt["tag"].ToString()
                };
                dttags[oo.group_id] = oo;
            }
            // An empty remote list leaves the local rows untouched.
            if (dttags.Count == 0)
            {
                return;
            }

            using (var context = new DataContext())
            {
                var targetcorp = corp.corpid;
                List<ww_grouptag> oldtags = context.ww_grouptags.Where(g => g.corpid == targetcorp).ToList();
                var old_ids = new HashSet<string>();
                foreach (var gtag in oldtags)
                {
                    old_ids.Add(gtag.group_id);
                    if (dttags.TryGetValue(gtag.group_id, out var webgtag))
                    {
                        context.Entry(gtag).CurrentValues.SetValues(webgtag);
                    }
                    else
                    {
                        context.ww_grouptags.Remove(gtag);
                    }
                }
                foreach (var id in dttags.Keys.Except(old_ids).ToList())
                {
                    context.ww_grouptags.Add(dttags[id]);
                }
                // One commit for the whole sync instead of one round trip per row.
                context.SaveChanges();
            }
        }
        catch (Exception ex)
        {
            LogHelper.Error(ex.ToString());
        }
    }

    /// <summary>
    /// Applies one external-contact change event: refreshes the contact's profile row
    /// and adds, updates or soft-deletes the user-to-contact relation.
    /// </summary>
    /// <param name="changetype">Change type taken from the event (add_*/edit_*/del_*).</param>
    /// <param name="extuserid">External contact id.</param>
    /// <param name="obj">Event payload; <c>ToUserName</c> is read as the corp id and the optional <c>UserID</c> as the internal user id.</param>
    /// <remarks>
    /// Delete events are applied even when the profile fetch returns a non-zero errcode,
    /// because marking the relation deleted needs no profile data.
    /// Exceptions from the remote call propagate to the caller; database errors are logged.
    /// </remarks>
    async Task wwuser_update(string changetype, string extuserid, JObject obj)
    {
        string corpid = obj["ToUserName"].ToString();
        string userid = obj.ContainsKey("UserID") ? obj["UserID"].ToString() : string.Empty;
        bool isdelete = changetype == "del_external_contact" || changetype == "del_follow_user";
        bool isaddoredit = changetype == "add_external_contact" || changetype == "add_half_external_contact" || changetype == "edit_external_contact";

        var wbuser = await apicaller.wwapiwithtoken(string.Format(appsetteings.URL_extuserinfo, extuserid), corpid, appsetteings.allcorp[corpid].khsecret);
        bool fetched = wbuser["errcode"].ToString() == "0";
        if (!fetched && !isdelete)
        {
            return;
        }

        using var context = new DataContext();
        try
        {
            // NOTE(review): this lookup does not filter by corpid, unlike the ww_extusres
            // lookup below — confirm that (userid, extuserid) is unique across corps.
            ww_user_extuser? dbueu = context.ww_user_extusers.FirstOrDefault(rel => rel.userid == userid && rel.extuserid == extuserid);

            if (fetched)
            {
                // Strip the API envelope so only the profile payload is stored in exinfo.
                wbuser.Remove("errcode");
                wbuser.Remove("errmsg");
                ww_extuser? dbuser = context.ww_extusres.FirstOrDefault(eu => eu.userid == extuserid && eu.corpid == corpid);
                var contact = (JObject)wbuser["external_contact"];
                ww_extuser user = new()
                {
                    corpid = corpid,
                    userid = extuserid,
                    unionid = contact.ContainsKey("unionid") ? contact["unionid"].ToString() : "",
                    name = contact["name"].ToString(),
                    avatar = contact["avatar"].ToString(),
                    lastupdate = DateTime.Now,
                    exinfo = wbuser.ToString()
                };
                if (dbuser == null)
                {
                    user.ctime = DateTime.Now;
                    context.ww_extusres.Add(user);
                }
                else
                {
                    // Preserve the original creation time when overwriting an existing row.
                    user.ctime = dbuser.ctime;
                    context.Entry(dbuser).CurrentValues.SetValues(user);
                }
                context.SaveChanges();
            }

            if (isaddoredit)
            {
                if (!wbuser.ContainsKey("follow_user"))
                {
                    return;
                }
                ww_user_extuser? friend = null;
                foreach (var item in wbuser["follow_user"])
                {
                    if (item["userid"].ToString() != userid)
                    {
                        continue;
                    }
                    friend = buildfriend((JObject)item, corpid, userid, extuserid);
                }
                if (friend == null)
                {
                    return;
                }
                if (dbueu == null)
                {
                    context.ww_user_extusers.Add(friend);
                }
                else
                {
                    context.Entry(dbueu).CurrentValues.SetValues(friend);
                }
                context.SaveChanges();
            }
            else if (isdelete)
            {
                if (dbueu != null)
                {
                    dbueu.deleted = true;
                }
                context.SaveChanges();
            }
        }
        catch (Exception ex)
        {
            LogHelper.Error(ex.ToString());
        }
    }

    /// <summary>
    /// Builds the relation row for one <c>follow_user</c> entry of a fetched profile.
    /// </summary>
    /// <param name="follow">The <c>follow_user</c> entry belonging to <paramref name="userid"/>.</param>
    /// <param name="corpid">Corp id stored on the relation.</param>
    /// <param name="userid">Internal user id stored on the relation.</param>
    /// <param name="extuserid">External contact id stored on the relation.</param>
    /// <returns>A new, untracked relation entity with <c>deleted = false</c>.</returns>
    static ww_user_extuser buildfriend(JObject follow, string corpid, string userid, string extuserid)
    {
        // add_type: 0 = no/other operator info, 1 = operator is the user,
        // 2 = operator is neither the user nor the external contact.
        int type = 0;
        if (follow.ContainsKey("oper_userid"))
        {
            var oper = follow["oper_userid"].ToString();
            if (oper == userid)
            {
                type = 1;
            }
            else if (oper != extuserid)
            {
                type = 2;
            }
        }

        // Split tags by their "type" field; other types are ignored.
        JArray tag1 = new JArray();
        JArray tag2 = new JArray();
        if (follow.ContainsKey("tags"))
        {
            foreach (JObject tag in follow["tags"])
            {
                var tagtype = tag["type"].ToString();
                if (tagtype == "1")
                {
                    tag1.Add(tag);
                }
                else if (tagtype == "2")
                {
                    tag2.Add(tag);
                }
            }
        }

        return new ww_user_extuser()
        {
            corpid = corpid,
            userid = userid,
            extuserid = extuserid,
            remark = follow["remark"].ToString(),
            description = follow["description"].ToString(),
            createtime = long.Parse(follow["createtime"].ToString()),
            deleted = false,
            tags_type1 = tag1.ToString(),
            tags_type2 = tag2.ToString(),
            remark_corp_name = follow.ContainsKey("remark_corp_name") ? follow["remark_corp_name"].ToString() : "",
            add_type = type,
            add_way = Int32.Parse(follow["add_way"].ToString())
        };
    }
}
}