TG.WXCRM.V4/Core.AliyuService/AliYun/OnscSharp.cs

251 lines
8.0 KiB
C#

using CRM.Core.BLL.Soft;
using CRM.Core.DTO.AliYunSub;
using ons;
using System;
using System.Configuration;
using System.Text;
using WX.CRM.Common;
namespace Core.AliyuService.AliYun
{
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
LogHelper.Info("获取的信息:");
LogHelper.Info(value.getBody());
try
{
string str = value.getBody();
AliYunSubInfo info = Newtonsoft.Json.JsonConvert.DeserializeObject<AliYunSubInfo>(str);
Soft_Userinfo_Sub_BL bl = new Soft_Userinfo_Sub_BL();
bool result = bl.UserInfoSubLog(new CRM.Core.Model.Entity.Soft_Userinfo_SubLog()
{
data = Newtonsoft.Json.JsonConvert.SerializeObject(info.data),
type = info.type
});//日志入库
bl.UserInfoSubInDb(info);//消息入库
if (result)
{
return ons.Action.CommitMessage;
}
else
{
return ons.Action.ReconsumeLater;//以后再消费
}
}
catch (Exception e)
{
LogHelper.Error(e.ToString());
LogHelper.Error("错误数据:" + value.getBody());
return ons.Action.ReconsumeLater;//以后再消费
}
}
}
public class MyLocalTransactionChecker : LocalTransactionChecker
{
public MyLocalTransactionChecker()
{
}
~MyLocalTransactionChecker()
{
}
public override TransactionStatus check(Message value)
{
return TransactionStatus.CommitTransaction;
}
}
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
//Console.WriteLine(value.getBody());
return ons.OrderAction.Success;
}
}
public class OnscSharp
{
private static Producer _producer;
private static PushConsumer _consumer;
private static OrderConsumer _orderconsumer;
private static OrderProducer _orderproducer;
private static MyMsgListener _listen = new MyMsgListener();
private static object s_SyncLock = new Object();
private static string Ons_Topic = ConfigurationManager.AppSettings["Ons_Topic"];
private static string Ons_ProducerID = ConfigurationManager.AppSettings["Ons_ProducerID"];
private static string Ons_AccessKey = ConfigurationManager.AppSettings["Ons_AccessKey"];
private static string Ons_SecretKey = ConfigurationManager.AppSettings["Ons_SecretKey"];
private static string Ons_ConsumerId = ConfigurationManager.AppSettings["Ons_ConsumerId"];
private static string Ons_NameSrv = ConfigurationManager.AppSettings["Ons_NameSrv"];
private static string Ons_Tag = ConfigurationManager.AppSettings["Ons_Tag"];
private static string Ons_LogPath = ConfigurationManager.AppSettings["Ons_LogPath"];
public static void SendMessage(byte[] msgBody)
{
Message msg = new Message(Ons_Topic, "", "");
msg.setBody(msgBody, msgBody.Length);
try
{
SendResultONS sendResult = _producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
public static void SendMessage(string msgBody, String tag = "RegisterLog")
{
// Message msg = new Message(Ons_Topic, tag, msgBody);
Message msg = new Message(Ons_Topic, tag, msgBody);
msg.setKey(Guid.NewGuid().ToString());
try
{
SendResultONS sendResult = _producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
public static void SendOrderMessage(string msgBody, String tag = "RegisterLog", String key = "test")
{
Message msg = new Message(Ons_Topic, tag, msgBody);
byte[] data = new byte[10];
msg.setBody(data, 10);
msg.setKey(Guid.NewGuid().ToString());
try
{
SendResultONS sendResult = _orderproducer.send(msg, key);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
/// <summary>
/// 定义监听
/// </summary>
public static void StartPushConsumer()
{
//MyMsgListener listen = new MyMsgListener();
if (_listen == null)
_listen = new MyMsgListener();
_consumer.subscribe(Ons_Topic, Ons_Tag, _listen);
_consumer.start();
Console.WriteLine("start cunsumer");
}
public static void StartOrderConsumer()
{
MyMsgOrderListener listen = new MyMsgOrderListener();
_orderconsumer.subscribe(Ons_Topic, Ons_Tag, listen);
_orderconsumer.start();
}
public static void shutdownPushConsumer()
{
_consumer.shutdown();
}
public static void shutdownOrderConsumer()
{
_orderconsumer.shutdown();
}
public static void StartProducer()
{
_producer.start();
}
public static void ShutdownProducer()
{
_producer.shutdown();
}
public static void StartOrderProducer()
{
_orderproducer.start();
}
public static void ShutdownOrderProducer()
{
_orderproducer.shutdown();
}
private static ONSFactoryProperty getFactoryProperty()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_ConsumerId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_ProducerID);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
return factoryInfo;
}
public static void CreatePushConsumer()
{
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryProperty());
}
public static void CreateProducer()
{
_producer = ONSFactory.getInstance().createProducer(getFactoryProperty());
}
public static void CreateOrderConsumer()
{
_orderconsumer = ONSFactory.getInstance().createOrderConsumer(getFactoryProperty());
}
public static void CreateOrderProducer()
{
_orderproducer = ONSFactory.getInstance().createOrderProducer(getFactoryProperty());
}
}
}