using SuperSocket.ClientEngine; using SuperSocket.ProtoBase; using System; using System.Data.Entity.Validation; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; using WX.CRM.Common; using WX.CRM.DataSynClient.Application; using WX.CRM.DataSynClient.Domain; namespace WX.CRM.DataSynClient.Client { internal class SocketClientManage { static EasyClient client; static Task ReadLogTask; static Task WatchSocketTask; static Task ConnectionTask; private static ManualResetEvent ResetEvent = new ManualResetEvent(false); private static readonly string _clinetCode = Utility.GetSettingOrNullByKey("DataClientCode"); /// /// 是否连接 /// public static bool IsConnected { get { if (client == null) return false; return client.IsConnected; } } /// /// 创建一个新的Socket连接 /// /// /// public static void StartTcp(string ip, int port) { //创建socket if (client == null) client = CreatClient(ip, port); //连接socket Connect(ip, port); //创建Socket连接监控任务 CreatSocketWatchTask(ip, port); //创建读取队列任务 CreatReadTask(); } #region 客户端事件 private static void Client_Error(object sender, ErrorEventArgs e) { Console.WriteLine($"客户端发生错误:{e.Exception}"); LogHelper.Info($"客户端发生错误:{e.Exception}"); ConnectionTask = null; ResetEvent.Reset(); } private static void Client_Connected(object sender, EventArgs e) { Console.WriteLine("客户端连接"); LogHelper.Info("客户端连接成功!"); //继续读取任务 ResetEvent.Set(); } private static void Client_Closed(object sender, EventArgs e) { Console.WriteLine("客户端关闭"); LogHelper.Info("客户端关闭"); ConnectionTask = null; //暂停读取日志 ResetEvent.Reset(); } #endregion private static EasyClient CreatClient(string ip, int port) { var client = new EasyClient(); client.Error += Client_Error; client.Connected += Client_Connected; client.Closed += Client_Closed; client.Initialize(new MyReceiveFilter(), (request) => { //Console.WriteLine(request.Key); //LogHelper.Info(request.Key + "," + request.Body); if (request.Key == "ClientGetInfo") { try { //{ "PKID":11216.0,"PICI":null,"JSONTEXT":"{\"ORDERID\":80008623.0,\"EMPLOYEEID\":null,\"PASSWORD\":null,\"USERID\":null,\"PRODUCTID\":1007.0,\"PRODUCTNAME\":\"A类\",\"SUBPRODUCTID\":10071.0,\"SUBPRODUCTNAME\":\"抄底先锋\",\"ORDERTYPE\":1.0,\"UPGRADEORDERIDS\":null,\"TOTALUPGRADEVALUE\":0.0,\"NEEDPAY\":99.0,\"OPENORDER\":0.0,\"SZZYORDERID\":1840301183010819.0,\"ORDERSTATUS\":\"180\",\"CTIME\":\"2019-01-18 10:38:01\",\"RETURNNEEDPAY\":99.0,\"CONTRACTCODE\":\"DN1840301183010819\",\"RET\":null,\"MSG\":null,\"RETURNDETAIL\":null,\"INNERUSERID\":0.0,\"OTIME\":null,\"ENDTIME\":null,\"RESID\":\"476632934773431575\",\"REQUESTSTATUS\":1.0,\"ISFINANCEPAY\":0.0,\"FINACEPAYDATE\":null,\"REMARK\":null,\"FINALPAY\":null,\"ORDERSTATUSNAME\":\"新订单\",\"OPERATETIME\":\"2019-01-18 10:38:01\",\"STIME\":null,\"ETIME\":null,\"SUSPENDCOMMENT\":null,\"UPDATEORDERTIME\":null,\"ISOPEN\":0.0,\"TEAMSERVE\":null,\"FCTEXT\":null,\"CUSTOMERUSERNAME\":null,\"ARRIVALPAY\":null,\"ARRIVALTIME\":null,\"SOURCE\":\"5\",\"CNAME\":null,\"OPENDAYS\":365.0,\"CUSTOMERCLASSIFY\":null,\"SALEDEPTID\":null,\"SOFTUSERNAME\":\"up0000022\",\"APPUSERNAME\":null,\"CHANNEL\":\"5001\",\"BOOKNUM\":1,\"OPENUSER\":null,\"ISPAYED\":0,\"PAYTYPE\":0,\"PRODUCTCODE\":\"UPCFJC_20181119180241\",\"BIGPRODUCTCODE\":\"UPCFJC_20181119A\",\"RISKCTRLSTATUS\":0,\"companycode\":\"DNZZ\",\"contractstatus\":1,\"contractctime\":null,\"qywxstatus\":null,\"qywxopendate\":null,\"activeproductcode\":null,\"activeproductext\":null}","FILENAME":null,"ISBATCH":0.0,"CTIME":"2019-01-18 10:49:37","BIDATATYPE":"Client_Order","DEPTCODE":"DNZZ","CANREPEATNUM":null} var body = request.Body.ToObject(); //var info = new SYNC_RECEIVE //{ // PKID = body.PKID, // CTIME = DateTime.Now, // PICI = body.PICI, // ISBATCH = body.ISBATCH, // DEPTCODE = body.DEPTCODE, // BIDATATYPE = body.BIDATATYPE, // JSONTEXT = body.JSONTEXT, // CANREPEATNUM = body.CANREPEATNUM //}; //LogHelper.Info("客户端写入数据:" + info.ToJson()); //if (body.JSONTEXT.Length >= 1000) //{ // body.JSONTEXT = body.JSONTEXT.PadRight(2001, ' '); // LogHelper.Info("jsontext:" + body.JSONTEXT.Length.ToString()); //} var info = new Domain.SYNC_RECEIVE(body.pkid, body.jsontext, body.bidatatype, body.deptcode); info.ISBATCH = body.isbatch; LogHelper.Info("客户端接收到数据:" + info.ToJson()); new SyncReceiveService().Insert(info); //如果接收数据没有错误,通知服务端接收数据成功 SendMessage("ClientGetOk", body, "/^", "^/"); } catch (DbEntityValidationException exception) { var errorMessages = exception.EntityValidationErrors .SelectMany(validationResult => validationResult.ValidationErrors) .Select(m => m.ErrorMessage); var fullErrorMessage = string.Join(", ", errorMessages); //记录日志 //Log.Error(fullErrorMessage); var exceptionMessage = string.Concat(exception.Message, " 验证异常消息是:", fullErrorMessage); LogHelper.Error(exceptionMessage); } catch (Exception ex) { LogHelper.Error("客户端接收数据出错ClientGetInfo:" + ex.ToString()); } } else if (request.Key == "ClientSendInfo") { try { //服务端确认消息接收成功,客户端移除待发数据 var body = request.Body.ToObject(); new SyncPushService().PushSucc(body); } catch (Exception ex) { LogHelper.Error("客户端接收数据出错ClientSendInfo:" + ex.ToString()); } } }); return client; } /// /// 创建连接任务 /// private static void CreatConnectTask() { } /// /// 创建读取任务 /// public static void CreatReadTask(bool isTcp = true) { if (ReadLogTask == null) { ReadLogTask = Task.Run(() => { while (true) { Thread.Sleep(5000); //连接成功了才开始读取 if (isTcp) ResetEvent.WaitOne(); //客户端发起请求获取数据(拉模式) SendMessage("ClientGet", new SYNC_PUSH() { deptcode = _clinetCode }, "/^", "^/"); try { var data = new SyncPushService().GetList(50); foreach (var item in data) { //客户端发送数据 SendMessage("ClientSend", item, "/^", "^/"); } } catch (Exception ex) { LogHelper.Error("客户端发送数据错误:" + ex.ToString()); } } }); } } /// /// 创建Socket监控任务 /// private static void CreatSocketWatchTask(string ip, int port) { if (WatchSocketTask == null) { WatchSocketTask = Task.Run(() => { while (true) { Thread.Sleep(10000); if (IsConnected) continue; Console.WriteLine("尝试重连!"); LogHelper.Info("尝试重连!"); if (ConnectionTask != null) LogHelper.Info("IsConnected:" + IsConnected + ",ConnectionTask:" + ConnectionTask.Result.ToString()); else LogHelper.Info("IsConnected:" + IsConnected + ",ConnectionTask:null"); Connect(ip, port); } }); } } private static void Connect(string ip, int port) { try { if (!IsConnected && ConnectionTask == null) { ConnectionTask = client?.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); //if(await ConnectionTask) //{ // LogHelper.Info("客户端连接成功,准备发送数据"); // //SendMessage("XT", "abc", "/^", "^/"); //} //if (ConnectionTask.Result) //{ // LogHelper.Info("客户端连接成功,准备发送数据"); // //SendMessage("XT", "abc", null, "}}"); //} } } catch (Exception ex) { ConnectionTask = null; client = null; Console.WriteLine($"连接Socket服务失败:{ex}"); } } /// /// 通过Socket发送日志 /// /// 日志级别 /// Stocket信息头 /// Socket信息尾 /// 日志请求 private static void SendMessage(string key, SYNC_PUSH body, string beginMark, string endMark) { try { var pack = new StringPackageInfo(key, body.ToJson(), null); var msg = $"{beginMark}{pack.ToJson()}{endMark}"; //LogHelper.Info("客户端发送数据:" + msg); client.Send(Encoding.UTF8.GetBytes(msg)); } catch { //日志塞回队列 //MSHLogger.LoggingEvents.Add(loggingEvent); } } } }