using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using Confluent.Kafka;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using model;

namespace wwsync
{
    /// <summary>
    /// Consumes callback messages from a Kafka topic and applies them to the local database.
    /// Recognised events ("change_external_contact", "change_external_tag") are de-duplicated
    /// by key, parked in a delay queue, and later handled by a background worker loop.
    /// </summary>
    internal class kafkatool
    {
        // Queue entries are (event name, de-duplication key, "xml" payload of the message).
        // NOTE(review): DelayQueue is declared elsewhere in the project; this code assumes
        // Dequeue() is non-blocking and returns null when no entry is due yet - confirm.
        DelayQueue<Tuple<string, string, JObject>> delayqueue = new();

        // Keys that are currently waiting in the delay queue; used to drop duplicate events.
        HashSet<string> eventset = new();

        // Guards eventset (and the enqueue that accompanies each insert): the set is touched
        // by both the consumer task and the worker task, and HashSet is not thread-safe.
        readonly object eventlock = new();

        /// <summary>
        /// Starts two background tasks and returns immediately: a worker that drains the
        /// delay queue, and a Kafka consumer that feeds it through <c>domessage</c>.
        /// </summary>
        public void kafka_consume()
        {
            var conf = new ConsumerConfig
            {
                GroupId = appsetteings.kafka_topicgroup,
                BootstrapServers = appsetteings.kafka_server,
                AutoOffsetReset = AutoOffsetReset.Latest
            };
            LogHelper.Info($"start kafka_consume at {DateTime.Now}");

            // Worker: processes delayed events one at a time.
            _ = Task.Run(async () =>
            {
                while (true)
                {
                    try
                    {
                        var entry = delayqueue.Dequeue();
                        if (entry is not null)
                        {
                            await handle_event(entry);
                        }
                        else
                        {
                            lock (eventlock)
                            {
                                // Nothing is pending at all: drop any stale keys so that
                                // future events with the same key are accepted again.
                                if (delayqueue.Count == 0)
                                {
                                    eventset.Clear();
                                }
                            }
                            await Task.Delay(TimeSpan.FromSeconds(5));
                        }
                    }
                    catch (Exception ex)
                    {
                        LogHelper.Error(ex.ToString());
                    }
                }
            });

            // Consumer: Consume() blocks, so run it on a dedicated (long-running) thread
            // instead of tying up a thread-pool worker.
            _ = Task.Factory.StartNew(() =>
            {
                // NOTE(review): the key type (Ignore) is an assumption; only the string
                // message value is read by this class - confirm against the producer.
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    c.Subscribe(appsetteings.kafka_wwtopic);

                    CancellationTokenSource cts = new CancellationTokenSource();
                    Console.CancelKeyPress += (sender, e) =>
                    {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };

                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = c.Consume(cts.Token);
                                //LogHelper.Info($"{cr.Topic}:{cr.Message.Value} offset:{cr.TopicPartitionOffset}");
                                domessage(cr?.Message?.Value);
                            }
                            catch (ConsumeException e)
                            {
                                LogHelper.Error($"Error occured: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        c.Close();
                    }
                    catch (Exception ex)
                    {
                        // Anything else ends the consume loop; log it so the stop is visible.
                        LogHelper.Error($"kafka_consume() consumer stopped: {ex}");
                    }
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        /// <summary>
        /// Handles one entry taken from the delay queue. The key layout is the one built in
        /// <c>domessage</c>: "ChangeType|ToUserName|ExternalUserID" for contact changes and
        /// "change_external_tag|ToUserName" for tag changes.
        /// </summary>
        async Task handle_event(Tuple<string, string, JObject> entry)
        {
            var sevent = entry.Item1;
            var key = entry.Item2;
            var payload = entry.Item3;
            var keys = key.Split('|');

            // The entry has left the queue, so the same key may be queued again from now on.
            lock (eventlock)
            {
                eventset.Remove(key);
            }

            if (sevent == "change_external_contact")
            {
                await wwuser_update(keys[0], keys[2], payload);
            }
            else if (sevent == "change_external_tag")
            {
                await refreshcorptag(keys[1]);
            }
        }

        /// <summary>
        /// Parses one raw Kafka message and queues it for delayed processing when it is a
        /// recognised event for a known corp. Never throws: malformed input is logged or ignored,
        /// so a single bad message cannot stop the consume loop.
        /// </summary>
        /// <param name="data">JSON text expected to contain an "xml" object.</param>
        void domessage(string data)
        {
            if (string.IsNullOrWhiteSpace(data))
            {
                return;
            }

            try
            {
                var dd = JObject.Parse(data);
                if (dd["xml"] is not JObject dt)
                {
                    return;
                }
                if (!dt.ContainsKey("Event"))
                {
                    return;
                }

                var corpid = dt["ToUserName"]?.ToString();
                if (corpid == null || !appsetteings.allcorp.ContainsKey(corpid))
                {
                    return;
                }

                string strevent = dt["Event"].ToString();
                if (strevent == "change_external_contact")
                {
                    // Contact changes are held for 10 seconds before processing.
                    Enqueue_obj(strevent, $"{dt["ChangeType"].ToString()}|{corpid}|{dt["ExternalUserID"].ToString()}", dt, new TimeSpan(0, 0, 10));
                    return;
                }
                if (strevent == "change_external_tag")
                {
                    // Tag changes are held for 2 minutes before the corp's tags are refreshed.
                    Enqueue_obj(strevent, $"{strevent}|{corpid}", dt, new TimeSpan(0, 2, 0));
                }
            }
            catch (Exception ex)
            {
                LogHelper.Error($"domessage() {ex.ToString()}");
            }
        }

        /// <summary>
        /// Adds an event to the delay queue unless an event with the same key is already waiting.
        /// </summary>
        /// <param name="strevent">Event name.</param>
        /// <param name="key">De-duplication key.</param>
        /// <param name="obj">Event payload.</param>
        /// <param name="span">Delay passed to the queue.</param>
        void Enqueue_obj(string strevent, string key, JObject obj, TimeSpan span)
        {
            lock (eventlock)
            {
                if (eventset.Contains(key))
                {
                    return;
                }
                delayqueue.Enqueue(new Tuple<string, string, JObject>(strevent, key, obj), span);
                eventset.Add(key);
            }
        }

        /// <summary>
        /// Fetches the tag groups of a corp from the remote API and mirrors them into
        /// <c>ww_grouptags</c>: rows missing remotely are removed, existing rows are overwritten,
        /// new groups are inserted. Does nothing when the corp is unknown, the API reports a
        /// non-zero errcode, or no tag group is returned. Exceptions are logged, not rethrown.
        /// </summary>
        async Task refreshcorptag(string corpid)
        {
            if (!appsetteings.allcorp.ContainsKey(corpid))
            {
                return;
            }
            var corp = appsetteings.allcorp[corpid];

            try
            {
                Console.WriteLine($"corptag_sync {corp.corpid}");
                var obj = await apicaller.wwapiwithtoken(appsetteings.URL_corptaglist, corp.corpid, corp.khsecret);
                if (obj["errcode"].ToString() != "0")
                {
                    return;
                }

                // Remote tag groups indexed by group_id.
                Dictionary<string, ww_grouptag> dttags = new Dictionary<string, ww_grouptag>();
                foreach (JObject dpt in obj["tag_group"])
                {
                    ww_grouptag oo = new ww_grouptag()
                    {
                        corpid = corp.corpid,
                        group_id = dpt["group_id"].ToString(),
                        group_name = dpt["group_name"].ToString(),
                        create_time = long.Parse(dpt["create_time"].ToString()),
                        order = int.Parse(dpt["order"].ToString()),
                        deleted = dpt.ContainsKey("deleted") && bool.Parse(dpt["deleted"].ToString()),
                        tag = dpt["tag"].ToString()
                    };
                    dttags[oo.group_id] = oo;
                }
                if (dttags.Count == 0)
                {
                    return;
                }

                using (var context = new DataContext())
                {
                    List<ww_grouptag> oldtags = context.ww_grouptags.Where(g => g.corpid == corp.corpid).ToList();
                    HashSet<string> old_ids = new HashSet<string>();

                    foreach (var gtag in oldtags)
                    {
                        old_ids.Add(gtag.group_id);
                        if (dttags.TryGetValue(gtag.group_id, out var webgtag))
                        {
                            context.Entry(gtag).CurrentValues.SetValues(webgtag);
                        }
                        else
                        {
                            context.ww_grouptags.Remove(gtag);
                        }
                    }

                    foreach (var id in dttags.Keys.Except(old_ids).ToList())
                    {
                        context.ww_grouptags.Add(dttags[id]);
                    }

                    // One save for the whole sync instead of one per row.
                    context.SaveChanges();
                }
            }
            catch (Exception ex)
            {
                LogHelper.Error(ex.ToString());
            }
        }

        /// <summary>
        /// Refreshes one external contact from the remote API, upserts it into
        /// <c>ww_extusres</c>, and then updates the staff/contact relation in
        /// <c>ww_user_extusers</c> according to <paramref name="changetype"/>.
        /// Database errors are logged, not rethrown; errors from the API call itself propagate.
        /// </summary>
        /// <param name="changetype">ChangeType value of the event.</param>
        /// <param name="extuserid">External user id of the contact.</param>
        /// <param name="obj">Event payload; "ToUserName" is read as the corp id and the optional "UserID" as the staff user id.</param>
        async Task wwuser_update(string changetype, string extuserid, JObject obj)
        {
            string corpid = obj["ToUserName"].ToString();
            string userid = obj.ContainsKey("UserID") ? obj["UserID"].ToString() : string.Empty;

            if (!appsetteings.allcorp.ContainsKey(corpid))
            {
                return;
            }

            var wbuser = await apicaller.wwapiwithtoken(string.Format(appsetteings.URL_extuserinfo, extuserid), corpid, appsetteings.allcorp[corpid].khsecret);
            if (wbuser["errcode"].ToString() != "0")
            {
                return;
            }

            using var context = new DataContext();
            try
            {
                // The remaining document is stored verbatim in exinfo, without the status fields.
                wbuser.Remove("errcode");
                wbuser.Remove("errmsg");

                if (wbuser["external_contact"] is not JObject contact)
                {
                    LogHelper.Error($"wwuser_update() external_contact missing for {extuserid}");
                    return;
                }

                ww_extuser? dbuser = context.ww_extusres.FirstOrDefault(u => u.userid == extuserid && u.corpid == corpid);
                ww_user_extuser? dbueu = context.ww_user_extusers.FirstOrDefault(r => r.userid == userid && r.extuserid == extuserid);

                var now = DateTime.Now;
                ww_extuser user = new()
                {
                    corpid = corpid,
                    userid = extuserid,
                    unionid = contact.ContainsKey("unionid") ? contact["unionid"].ToString() : "",
                    name = contact["name"].ToString(),
                    avatar = contact["avatar"].ToString(),
                    lastupdate = now,
                    exinfo = wbuser.ToString()
                };

                if (dbuser == null)
                {
                    user.ctime = now;
                    context.ww_extusres.Add(user);
                }
                else
                {
                    // Keep the original creation time when overwriting an existing row.
                    user.ctime = dbuser.ctime;
                    context.Entry(dbuser).CurrentValues.SetValues(user);
                }
                context.SaveChanges();

                switch (changetype)
                {
                    case "add_external_contact":
                    case "add_half_external_contact":
                    case "edit_external_contact":
                    {
                        if (!wbuser.ContainsKey("follow_user"))
                        {
                            return;
                        }

                        // Pick the follow_user entry that belongs to the staff member of this event.
                        ww_user_extuser? friend = null;
                        foreach (var item in wbuser["follow_user"].OfType<JObject>())
                        {
                            if (item["userid"].ToString() != userid)
                            {
                                continue;
                            }
                            friend = build_friend(item, corpid, userid, extuserid);
                        }
                        if (friend == null)
                        {
                            return;
                        }

                        if (dbueu == null)
                        {
                            context.ww_user_extusers.Add(friend);
                        }
                        else
                        {
                            context.Entry(dbueu).CurrentValues.SetValues(friend);
                        }
                        context.SaveChanges();
                        break;
                    }

                    case "del_external_contact":
                    case "del_follow_user":
                    {
                        if (dbueu != null)
                        {
                            dbueu.deleted = true;
                            context.SaveChanges();
                        }
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                LogHelper.Error(ex.ToString());
            }
        }

        /// <summary>
        /// Builds the staff/contact relation row from one "follow_user" entry of the API response.
        /// </summary>
        static ww_user_extuser build_friend(JObject item, string corpid, string userid, string extuserid)
        {
            // add_type: 0 when oper_userid is absent or equals the contact itself,
            // 1 when it equals the staff user, 2 for any other operator.
            int type = 0;
            if (item.ContainsKey("oper_userid"))
            {
                string oper = item["oper_userid"].ToString();
                if (oper == userid)
                {
                    type = 1;
                }
                else if (oper != extuserid)
                {
                    type = 2;
                }
            }

            // Tags are stored split by their "type" field ("1" and "2"); other types are dropped.
            JArray tag1 = new JArray();
            JArray tag2 = new JArray();
            if (item.ContainsKey("tags"))
            {
                foreach (JObject tag in item["tags"])
                {
                    string tagtype = tag["type"].ToString();
                    if (tagtype == "1")
                    {
                        tag1.Add(tag);
                    }
                    else if (tagtype == "2")
                    {
                        tag2.Add(tag);
                    }
                }
            }

            return new ww_user_extuser()
            {
                corpid = corpid,
                userid = userid,
                extuserid = extuserid,
                remark = item["remark"].ToString(),
                description = item["description"].ToString(),
                createtime = long.Parse(item["createtime"].ToString()),
                deleted = false,
                tags_type1 = tag1.ToString(),
                tags_type2 = tag2.ToString(),
                remark_corp_name = item.ContainsKey("remark_corp_name") ? item["remark_corp_name"].ToString() : "",
                add_type = type,
                add_way = Int32.Parse(item["add_way"].ToString())
            };
        }
    }
}