using System;
using System.Configuration;
using System.Text;
using CRM.Core.BLL.Soft;
using CRM.Core.DTO.AliYunSub;
using ons;
using WX.CRM.Common;

namespace Core.AliyuService.AliYun
{
    /// <summary>
    /// Push-consumer listener that deserializes each incoming message body into an
    /// <see cref="AliYunSubInfo"/>, writes a log record and the message itself through
    /// <see cref="Soft_Userinfo_Sub_BL"/>, and tells the broker whether to commit or redeliver.
    /// </summary>
    public class MyMsgListener : MessageListener
    {
        public MyMsgListener()
        {
        }

        /// <summary>
        /// Handles one delivered message.
        /// </summary>
        /// <param name="value">The delivered message; its body is expected to be JSON.</param>
        /// <param name="context">Consume context supplied by the SDK (not used here).</param>
        /// <returns>
        /// <c>ons.Action.CommitMessage</c> when the log record was stored successfully;
        /// <c>ons.Action.ReconsumeLater</c> when it was not, or when any exception was thrown.
        /// </returns>
        public override ons.Action consume(Message value, ConsumeContext context)
        {
            // Read the body once, inside the try, so a failure while reading it is logged and
            // answered with ReconsumeLater like any other processing failure, and so the catch
            // block does not have to call getBody() again.
            string body = null;
            try
            {
                LogHelper.Info("获取的信息:");
                body = value.getBody();
                LogHelper.Info(body);

                // The generic overload is required: the non-generic DeserializeObject returns object.
                AliYunSubInfo info = Newtonsoft.Json.JsonConvert.DeserializeObject<AliYunSubInfo>(body);

                Soft_Userinfo_Sub_BL bl = new Soft_Userinfo_Sub_BL();

                // Persist the log record first.
                bool result = bl.UserInfoSubLog(new CRM.Core.Model.Entity.Soft_Userinfo_SubLog()
                {
                    data = Newtonsoft.Json.JsonConvert.SerializeObject(info.data),
                    type = info.type
                });

                // Persist the message itself.
                // NOTE(review): this runs even when the log insert reported failure, and the
                // commit decision below depends only on the log insert - confirm this is intended.
                bl.UserInfoSubInDb(info);

                if (result)
                {
                    return ons.Action.CommitMessage;
                }

                // Ask the broker to redeliver later.
                return ons.Action.ReconsumeLater;
            }
            catch (Exception e)
            {
                LogHelper.Error(e.ToString());
                LogHelper.Error("错误数据:" + body);

                // Ask the broker to redeliver later.
                return ons.Action.ReconsumeLater;
            }
        }
    }

    /// <summary>
    /// Local transaction checker that unconditionally reports the transaction as committed.
    /// </summary>
    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }

        /// <summary>
        /// Always answers <c>TransactionStatus.CommitTransaction</c>, regardless of the message.
        /// </summary>
        /// <param name="value">The message being checked (not inspected).</param>
        public override TransactionStatus check(Message value)
        {
            return TransactionStatus.CommitTransaction;
        }
    }

    /// <summary>
    /// Ordered-consumer listener that prints each message body to the console.
    /// </summary>
    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {
        }

        /// <summary>
        /// Prints the message body to the console and reports success.
        /// </summary>
        /// <param name="value">The delivered message.</param>
        /// <param name="context">Ordered consume context supplied by the SDK (not used here).</param>
        /// <returns>Always <c>ons.OrderAction.Success</c>.</returns>
        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            // Round-trip the body through Encoding.Default bytes and decode them as UTF-8.
            // NOTE(review): presumably compensates for the body having been decoded with the
            // system default code page instead of UTF-8 - confirm against the SDK.
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    /// <summary>
    /// Static facade over the ONS producers and consumers. The <c>Create*</c> methods must be
    /// called before the matching <c>Start*</c>, <c>Send*</c> or shutdown methods, because those
    /// methods use the static instances that <c>Create*</c> assigns.
    /// </summary>
    public class OnscSharp
    {
        private static Producer _producer;
        private static PushConsumer _consumer;
        private static OrderConsumer _orderconsumer;
        private static OrderProducer _orderproducer;
        private static MyMsgListener _listen = new MyMsgListener();
        private static readonly object s_SyncLock = new Object();

        // Connection settings, read once from the application's appSettings section.
        private static readonly string Ons_Topic = ConfigurationManager.AppSettings["Ons_Topic"];
        private static readonly string Ons_ProducerID = ConfigurationManager.AppSettings["Ons_ProducerID"];
        private static readonly string Ons_AccessKey = ConfigurationManager.AppSettings["Ons_AccessKey"];
        private static readonly string Ons_SecretKey = ConfigurationManager.AppSettings["Ons_SecretKey"];
        private static readonly string Ons_ConsumerId = ConfigurationManager.AppSettings["Ons_ConsumerId"];
        private static readonly string Ons_NameSrv = ConfigurationManager.AppSettings["Ons_NameSrv"];
        private static readonly string Ons_Tag = ConfigurationManager.AppSettings["Ons_Tag"];
        private static readonly string Ons_LogPath = ConfigurationManager.AppSettings["Ons_LogPath"];

        /// <summary>
        /// Sends a binary message to the configured topic with an empty tag.
        /// Send failures are caught and written to the console; they are not rethrown.
        /// </summary>
        /// <param name="msgBody">Raw message payload.</param>
        public static void SendMessage(byte[] msgBody)
        {
            Message msg = new Message(Ons_Topic, "", "");
            msg.setBody(msgBody, msgBody.Length);
            try
            {
                SendResultONS sendResult = _producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        /// <summary>
        /// Sends a text message to the configured topic, keyed with a fresh GUID.
        /// Send failures are caught and written to the console; they are not rethrown.
        /// </summary>
        /// <param name="msgBody">Message payload.</param>
        /// <param name="tag">Message tag.</param>
        public static void SendMessage(string msgBody, String tag = "RegisterLog")
        {
            Message msg = new Message(Ons_Topic, tag, msgBody);
            msg.setKey(Guid.NewGuid().ToString());
            try
            {
                SendResultONS sendResult = _producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        /// <summary>
        /// Sends a text message through the order producer, keyed with a fresh GUID.
        /// Send failures are caught and written to the console; they are not rethrown.
        /// </summary>
        /// <param name="msgBody">Message payload.</param>
        /// <param name="tag">Message tag.</param>
        /// <param name="key">Value passed as the second argument of the order producer's send call.</param>
        public static void SendOrderMessage(string msgBody, String tag = "RegisterLog", String key = "test")
        {
            // The body comes from msgBody via the constructor. Earlier code replaced it with a
            // zero-filled 10-byte buffer right after construction, discarding the payload.
            Message msg = new Message(Ons_Topic, tag, msgBody);
            msg.setKey(Guid.NewGuid().ToString());
            try
            {
                SendResultONS sendResult = _orderproducer.send(msg, key);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        /// <summary>
        /// Subscribes the shared <see cref="MyMsgListener"/> to the configured topic and tag,
        /// then starts the push consumer.
        /// </summary>
        public static void StartPushConsumer()
        {
            if (_listen == null)
            {
                _listen = new MyMsgListener();
            }

            _consumer.subscribe(Ons_Topic, Ons_Tag, _listen);
            _consumer.start();
            Console.WriteLine("start cunsumer");
        }

        /// <summary>
        /// Subscribes a new <see cref="MyMsgOrderListener"/> to the configured topic and tag,
        /// then starts the order consumer.
        /// </summary>
        public static void StartOrderConsumer()
        {
            MyMsgOrderListener listen = new MyMsgOrderListener();
            _orderconsumer.subscribe(Ons_Topic, Ons_Tag, listen);
            _orderconsumer.start();
        }

        /// <summary>Shuts down the push consumer.</summary>
        public static void shutdownPushConsumer()
        {
            _consumer.shutdown();
        }

        /// <summary>Shuts down the order consumer.</summary>
        public static void shutdownOrderConsumer()
        {
            _orderconsumer.shutdown();
        }

        /// <summary>Starts the producer.</summary>
        public static void StartProducer()
        {
            _producer.start();
        }

        /// <summary>Shuts down the producer.</summary>
        public static void ShutdownProducer()
        {
            _producer.shutdown();
        }

        /// <summary>Starts the order producer.</summary>
        public static void StartOrderProducer()
        {
            _orderproducer.start();
        }

        /// <summary>Shuts down the order producer.</summary>
        public static void ShutdownOrderProducer()
        {
            _orderproducer.shutdown();
        }

        /// <summary>
        /// Builds the factory properties shared by every producer and consumer from the
        /// configured settings, with the message model set to <c>ONSFactoryProperty.CLUSTERING</c>.
        /// </summary>
        /// <returns>A newly populated <see cref="ONSFactoryProperty"/>.</returns>
        private static ONSFactoryProperty getFactoryProperty()
        {
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_ConsumerId);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_ProducerID);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
            return factoryInfo;
        }

        /// <summary>Creates the push consumer instance used by the push-consumer methods.</summary>
        public static void CreatePushConsumer()
        {
            _consumer = ONSFactory.getInstance().createPushConsumer(getFactoryProperty());
        }

        /// <summary>Creates the producer instance used by the <c>SendMessage</c> overloads.</summary>
        public static void CreateProducer()
        {
            _producer = ONSFactory.getInstance().createProducer(getFactoryProperty());
        }

        /// <summary>Creates the order consumer instance used by the order-consumer methods.</summary>
        public static void CreateOrderConsumer()
        {
            _orderconsumer = ONSFactory.getInstance().createOrderConsumer(getFactoryProperty());
        }

        /// <summary>Creates the order producer instance used by <c>SendOrderMessage</c>.</summary>
        public static void CreateOrderProducer()
        {
            _orderproducer = ONSFactory.getInstance().createOrderProducer(getFactoryProperty());
        }
    }
}