TG.WXCRM.V4/AliYunSub.Common/Program.cs

242 lines
7.5 KiB
C#

using ons;
using System;
using System.Text;
namespace AliYunSub.Common
{
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Console.WriteLine("consume:");
Console.WriteLine(value.getBody());
return ons.Action.CommitMessage;
}
}
public class MyLocalTransactionChecker : LocalTransactionChecker
{
public MyLocalTransactionChecker()
{
}
~MyLocalTransactionChecker()
{
}
public override TransactionStatus check(Message value)
{
return TransactionStatus.CommitTransaction;
}
}
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
//Console.WriteLine(value.getBody());
return ons.OrderAction.Success;
}
}
public class OnscSharp
{
private static Producer _producer;
private static PushConsumer _consumer;
private static OrderConsumer _orderconsumer;
private static OrderProducer _orderproducer;
private static MyMsgListener _listen;
private static object s_SyncLock = new Object();
private static string Ons_Topic = "TOP_test";//
private static string Ons_ProducerID = "PID_ivan";
private static string Ons_AccessKey = "LTAIx1788cERk3CX";//
private static string Ons_SecretKey = "BaHgY1ovuPk72U2QGQGoXxS1ZgQYkG";//
private static string Ons_ConsumerId = "GID_Tcp";
private static string Ons_NameSrv = "http://MQ_INST_1824023413705812_BXaOH55h.mq-internet-access.mq-internet.aliyuncs.com:80";
//private static string Ons_NameSrv = "http://1824023413705812.mqrest.cn-qingdao-public.aliyuncs.com";
public static void SendMessage(byte[] msgBody)
{
Message msg = new Message(Ons_Topic, "", "");
msg.setBody(msgBody, msgBody.Length);
try
{
SendResultONS sendResult = _producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
public static void SendMessage(string msgBody, String tag = "RegisterLog")
{
// Message msg = new Message(Ons_Topic, tag, msgBody);
Message msg = new Message(Ons_Topic, tag, msgBody);
msg.setKey(Guid.NewGuid().ToString());
try
{
SendResultONS sendResult = _producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
public static void SendOrderMessage(string msgBody, String tag = "RegisterLog", String key = "test")
{
Message msg = new Message(Ons_Topic, tag, msgBody);
byte[] data = new byte[10];
msg.setBody(data, 10);
msg.setKey(Guid.NewGuid().ToString());
try
{
SendResultONS sendResult = _orderproducer.send(msg, key);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
/// <summary>
/// 定义监听
/// </summary>
public static void StartPushConsumer()
{
MyMsgListener listen = new MyMsgListener();
_consumer.subscribe(Ons_Topic, "*", listen);
_consumer.start();
Console.WriteLine("start cunsumer");
}
public static void StartOrderConsumer()
{
MyMsgOrderListener listen = new MyMsgOrderListener();
_orderconsumer.subscribe(Ons_Topic, "*", listen);
_orderconsumer.start();
}
public static void shutdownPushConsumer()
{
_consumer.shutdown();
}
public static void shutdownOrderConsumer()
{
_orderconsumer.shutdown();
}
public static void StartProducer()
{
_producer.start();
}
public static void ShutdownProducer()
{
_producer.shutdown();
}
public static void StartOrderProducer()
{
_orderproducer.start();
}
public static void ShutdownOrderProducer()
{
_orderproducer.shutdown();
}
private static ONSFactoryProperty getFactoryProperty()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_ConsumerId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_ProducerID);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
return factoryInfo;
}
public static void CreatePushConsumer()
{
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryProperty());
}
public static void CreateProducer()
{
_producer = ONSFactory.getInstance().createProducer(getFactoryProperty());
}
public static void CreateOrderConsumer()
{
_orderconsumer = ONSFactory.getInstance().createOrderConsumer(getFactoryProperty());
}
public static void CreateOrderProducer()
{
_orderproducer = ONSFactory.getInstance().createOrderProducer(getFactoryProperty());
}
}
class Program
{
static void Main(string[] args)
{
OnscSharp.CreatePushConsumer();
OnscSharp.StartPushConsumer();
//OnscSharp.StartProducer();
//System.DateTime beforDt = System.DateTime.Now;
//byte[] data = Encoding.UTF8.GetBytes(" 万会觉得 哎哎啊啊啊 啊哎啊啊啊 cds ");
//// string body = Encoding.Default.GetString(data);
//// string body = " 万会觉得 哎哎啊啊啊 啊哎啊啊啊 cds ";
//for (int i = 0;i < 2; ++i)
//{
// OnscSharp.SendMessage(data);
//}
//System.DateTime endDt = System.DateTime.Now;
//System.TimeSpan ts = endDt.Subtract(beforDt);
//Console.WriteLine("per message:{0}ms.", ts.TotalMilliseconds / 10000);
Console.ReadKey();
//Thread.Sleep(1000 * 100);
OnscSharp.ShutdownProducer();
OnscSharp.shutdownPushConsumer();
}
}
}