251 lines
8.0 KiB
C#
251 lines
8.0 KiB
C#
using CRM.Core.BLL.Soft;
|
|
using CRM.Core.DTO.AliYunSub;
|
|
using ons;
|
|
using System;
|
|
using System.Configuration;
|
|
using System.Text;
|
|
using WX.CRM.Common;
|
|
|
|
namespace Core.AliyuService.AliYun
|
|
{
|
|
/// <summary>
/// Push-consumer listener: logs every received message body, deserializes it as
/// <see cref="AliYunSubInfo"/>, stores it through <see cref="Soft_Userinfo_Sub_BL"/> and
/// returns <c>CommitMessage</c> or <c>ReconsumeLater</c> (consume again later).
/// </summary>
/// <remarks>
/// The class declares no constructor or finalizer of its own. The implicit public
/// parameterless constructor is equivalent to the former empty one, and an empty finalizer
/// does no work while still adding finalization overhead.
/// </remarks>
public class MyMsgListener : MessageListener
{
    /// <summary>
    /// Handles one delivered message.
    /// </summary>
    /// <param name="value">The delivered message; its body is expected to be JSON for <see cref="AliYunSubInfo"/>.</param>
    /// <param name="context">Consume context supplied by the SDK; not used here.</param>
    /// <returns>
    /// <c>ons.Action.CommitMessage</c> when the log row was written successfully;
    /// <c>ons.Action.ReconsumeLater</c> when the log write reported failure, the body
    /// deserialized to <c>null</c>, or any exception was thrown while processing.
    /// </returns>
    public override ons.Action consume(Message value, ConsumeContext context)
    {
        // Declared outside the try so the catch block can log the body without calling
        // back into `value`, which may be what failed in the first place.
        string body = null;

        try
        {
            // Everything, including the first read of the body, runs inside the try so
            // that no exception can escape this callback.
            LogHelper.Info("获取的信息:");
            body = value.getBody();
            LogHelper.Info(body);

            AliYunSubInfo info = Newtonsoft.Json.JsonConvert.DeserializeObject<AliYunSubInfo>(body);
            if (info == null)
            {
                // Previously this case surfaced as a NullReferenceException on info.data;
                // the outcome (ReconsumeLater) is unchanged, the log is just explicit.
                LogHelper.Error("Message body deserialized to null; expected an AliYunSubInfo payload.");
                LogHelper.Error("错误数据:" + body);
                return ons.Action.ReconsumeLater;
            }

            Soft_Userinfo_Sub_BL bl = new Soft_Userinfo_Sub_BL();

            // Write the log row first; its result alone decides commit vs. redelivery.
            CRM.Core.Model.Entity.Soft_Userinfo_SubLog logEntry = new CRM.Core.Model.Entity.Soft_Userinfo_SubLog()
            {
                data = Newtonsoft.Json.JsonConvert.SerializeObject(info.data),
                type = info.type
            };
            bool logged = bl.UserInfoSubLog(logEntry);

            // Store the message itself. This runs even when the log write failed.
            // NOTE(review): in that case the message is stored AND redelivery is requested,
            // so UserInfoSubInDb presumably needs to be idempotent - confirm.
            bl.UserInfoSubInDb(info);

            if (logged)
            {
                return ons.Action.CommitMessage;
            }

            return ons.Action.ReconsumeLater; // consume again later
        }
        catch (Exception e)
        {
            LogHelper.Error(e.ToString());
            LogHelper.Error("错误数据:" + body);
            return ons.Action.ReconsumeLater; // consume again later
        }
    }
}
|
|
/// <summary>
/// Local transaction checker whose <c>check</c> callback unconditionally answers
/// <c>TransactionStatus.CommitTransaction</c>.
/// </summary>
/// <remarks>
/// The class declares no constructor or finalizer of its own. The implicit public
/// parameterless constructor is equivalent to the former empty one, and an empty finalizer
/// does no work while still adding finalization overhead.
/// </remarks>
public class MyLocalTransactionChecker : LocalTransactionChecker
{
    /// <summary>
    /// Reports the status of the local transaction associated with a message.
    /// </summary>
    /// <param name="value">The message being checked; it is not inspected.</param>
    /// <returns>Always <c>TransactionStatus.CommitTransaction</c>.</returns>
    public override TransactionStatus check(Message value)
    {
        return TransactionStatus.CommitTransaction;
    }
}
|
|
|
|
|
|
/// <summary>
/// Ordered-consumer listener that prints each received message body to the console and
/// reports success.
/// </summary>
/// <remarks>
/// The class declares no constructor or finalizer of its own. The implicit public
/// parameterless constructor is equivalent to the former empty one, and an empty finalizer
/// does no work while still adding finalization overhead.
/// </remarks>
public class MyMsgOrderListener : MessageOrderListener
{
    /// <summary>
    /// Handles one delivered ordered message by echoing its body to standard output.
    /// </summary>
    /// <param name="value">The delivered message.</param>
    /// <param name="context">Consume context supplied by the SDK; not used here.</param>
    /// <returns>Always <c>ons.OrderAction.Success</c>.</returns>
    public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
    {
        // Re-encode the body with the system default code page and decode those bytes as
        // UTF-8 before printing.
        // NOTE(review): presumably this repairs UTF-8 text that was marshalled as an
        // ANSI string by the SDK - confirm; MyMsgListener uses getBody() without it.
        byte[] rawBytes = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(rawBytes));

        return ons.OrderAction.Success;
    }
}
|
|
|
|
|
|
/// <summary>
/// Static facade over the ONS producer/consumer objects used by this service.
/// </summary>
/// <remarks>
/// Usage order is Create* (builds the SDK object from appSettings), then Start*, then
/// Send* for producers, and finally Shutdown*. The Start*, Send* and Shutdown* members
/// dereference the object assigned by the matching Create* method, so calling them before
/// Create* fails with a <see cref="NullReferenceException"/> (the Send* methods catch it
/// and print it as a send failure).
/// </remarks>
public class OnscSharp
{
    private static Producer _producer;
    private static PushConsumer _consumer;
    private static OrderConsumer _orderconsumer;
    private static OrderProducer _orderproducer;

    // Listeners are held in static fields so they stay referenced for as long as the
    // consumers they are subscribed to may call back into them.
    private static MyMsgListener _listen = new MyMsgListener();
    private static MyMsgOrderListener _orderListen;

    // Connection settings, read once from <appSettings>. A missing key yields null.
    private static readonly string Ons_Topic = ConfigurationManager.AppSettings["Ons_Topic"];
    private static readonly string Ons_ProducerID = ConfigurationManager.AppSettings["Ons_ProducerID"];
    private static readonly string Ons_AccessKey = ConfigurationManager.AppSettings["Ons_AccessKey"];
    private static readonly string Ons_SecretKey = ConfigurationManager.AppSettings["Ons_SecretKey"];
    private static readonly string Ons_ConsumerId = ConfigurationManager.AppSettings["Ons_ConsumerId"];
    private static readonly string Ons_NameSrv = ConfigurationManager.AppSettings["Ons_NameSrv"];
    private static readonly string Ons_Tag = ConfigurationManager.AppSettings["Ons_Tag"];
    private static readonly string Ons_LogPath = ConfigurationManager.AppSettings["Ons_LogPath"];

    /// <summary>
    /// Sends a binary payload to the configured topic with an empty tag.
    /// </summary>
    /// <param name="msgBody">Payload bytes; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="msgBody"/> is null.</exception>
    /// <remarks>Send failures are caught and written to the console, not rethrown.</remarks>
    public static void SendMessage(byte[] msgBody)
    {
        if (msgBody == null)
        {
            // Previously this surfaced as a NullReferenceException on msgBody.Length.
            throw new ArgumentNullException("msgBody");
        }

        Message msg = new Message(Ons_Topic, "", "");
        msg.setBody(msgBody, msgBody.Length);
        SendAndReport(msg);
    }

    /// <summary>
    /// Sends a text payload to the configured topic under the given tag, keyed by a new GUID.
    /// </summary>
    /// <param name="msgBody">Message body text.</param>
    /// <param name="tag">Message tag; defaults to "RegisterLog".</param>
    /// <remarks>Send failures are caught and written to the console, not rethrown.</remarks>
    public static void SendMessage(string msgBody, String tag = "RegisterLog")
    {
        Message msg = new Message(Ons_Topic, tag, msgBody);
        msg.setKey(Guid.NewGuid().ToString());
        SendAndReport(msg);
    }

    /// <summary>
    /// Sends a text payload through the order producer.
    /// </summary>
    /// <param name="msgBody">Message body text.</param>
    /// <param name="tag">Message tag; defaults to "RegisterLog".</param>
    /// <param name="key">Second argument passed to the order producer's <c>send</c>; defaults to "test".</param>
    /// <remarks>
    /// The message carries <paramref name="msgBody"/> as given. An earlier version replaced
    /// the body with ten zero bytes right after constructing the message, so the caller's
    /// payload was never delivered. Send failures are caught and written to the console,
    /// not rethrown.
    /// </remarks>
    public static void SendOrderMessage(string msgBody, String tag = "RegisterLog", String key = "test")
    {
        Message msg = new Message(Ons_Topic, tag, msgBody);
        msg.setKey(Guid.NewGuid().ToString());

        try
        {
            SendResultONS sendResult = _orderproducer.send(msg, key);
            ReportSuccess(sendResult);
        }
        catch (Exception ex)
        {
            ReportFailure(ex);
        }
    }

    /// <summary>
    /// Subscribes the shared <see cref="MyMsgListener"/> to the configured topic/tag and
    /// starts the push consumer. Requires <see cref="CreatePushConsumer"/> to have run.
    /// </summary>
    public static void StartPushConsumer()
    {
        if (_listen == null)
        {
            _listen = new MyMsgListener();
        }

        _consumer.subscribe(Ons_Topic, Ons_Tag, _listen);
        _consumer.start();
        Console.WriteLine("start cunsumer");
    }

    /// <summary>
    /// Subscribes a <see cref="MyMsgOrderListener"/> to the configured topic/tag and starts
    /// the order consumer. Requires <see cref="CreateOrderConsumer"/> to have run.
    /// </summary>
    public static void StartOrderConsumer()
    {
        // Keep the listener in a static field, as StartPushConsumer does, rather than in a
        // local that becomes unreachable from managed code as soon as this method returns.
        // NOTE(review): presumably the consumer does not itself keep the managed listener
        // alive - confirm against the SDK.
        if (_orderListen == null)
        {
            _orderListen = new MyMsgOrderListener();
        }

        _orderconsumer.subscribe(Ons_Topic, Ons_Tag, _orderListen);
        _orderconsumer.start();
    }

    /// <summary>Shuts down the push consumer created by <see cref="CreatePushConsumer"/>.</summary>
    public static void shutdownPushConsumer()
    {
        _consumer.shutdown();
    }

    /// <summary>Shuts down the order consumer created by <see cref="CreateOrderConsumer"/>.</summary>
    public static void shutdownOrderConsumer()
    {
        _orderconsumer.shutdown();
    }

    /// <summary>Starts the producer created by <see cref="CreateProducer"/>.</summary>
    public static void StartProducer()
    {
        _producer.start();
    }

    /// <summary>Shuts down the producer created by <see cref="CreateProducer"/>.</summary>
    public static void ShutdownProducer()
    {
        _producer.shutdown();
    }

    /// <summary>Starts the order producer created by <see cref="CreateOrderProducer"/>.</summary>
    public static void StartOrderProducer()
    {
        _orderproducer.start();
    }

    /// <summary>Shuts down the order producer created by <see cref="CreateOrderProducer"/>.</summary>
    public static void ShutdownOrderProducer()
    {
        _orderproducer.shutdown();
    }

    /// <summary>Creates the push consumer from the configured factory properties.</summary>
    public static void CreatePushConsumer()
    {
        _consumer = ONSFactory.getInstance().createPushConsumer(getFactoryProperty());
    }

    /// <summary>Creates the producer from the configured factory properties.</summary>
    public static void CreateProducer()
    {
        _producer = ONSFactory.getInstance().createProducer(getFactoryProperty());
    }

    /// <summary>Creates the order consumer from the configured factory properties.</summary>
    public static void CreateOrderConsumer()
    {
        _orderconsumer = ONSFactory.getInstance().createOrderConsumer(getFactoryProperty());
    }

    /// <summary>Creates the order producer from the configured factory properties.</summary>
    public static void CreateOrderProducer()
    {
        _orderproducer = ONSFactory.getInstance().createOrderProducer(getFactoryProperty());
    }

    /// <summary>
    /// Builds a fresh <see cref="ONSFactoryProperty"/> from the appSettings values, using
    /// the CLUSTERING message model.
    /// </summary>
    private static ONSFactoryProperty getFactoryProperty()
    {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_ConsumerId);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_ProducerID);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
        return factoryInfo;
    }

    /// <summary>Sends <paramref name="msg"/> through the plain producer and prints the outcome.</summary>
    private static void SendAndReport(Message msg)
    {
        try
        {
            SendResultONS sendResult = _producer.send(msg);
            ReportSuccess(sendResult);
        }
        catch (Exception ex)
        {
            ReportFailure(ex);
        }
    }

    /// <summary>Prints the id of a successfully sent message to the console.</summary>
    private static void ReportSuccess(SendResultONS sendResult)
    {
        Console.WriteLine("send success {0}", sendResult.getMessageId());
    }

    /// <summary>Prints a send failure to the console; the exception is not rethrown.</summary>
    private static void ReportFailure(Exception ex)
    {
        Console.WriteLine("send failure{0}", ex.ToString());
    }
}
|
|
|
|
}
|