// Source: TG.WXCRM.V4/DAL/Redis/RedisSettings.cs (C#; listed as 299 lines, 14 KiB in the repository viewer)

using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using WX.CRM.Model.Redis;
namespace WX.CRM.DAL.Redis
{
public class RedisSettings
{
// All settings built by the static constructor, keyed by each configured entry's Db value.
private static readonly Dictionary<int, RedisSettings> _settingList = null;
// Connection options handed to ConnectionMultiplexer.ConnectAsync; also supplies ConnectTimeout and ConnectRetry.
readonly ConfigurationOptions configuration;
// Optional log writer passed through to ConnectionMultiplexer.ConnectAsync (may be null).
readonly System.IO.TextWriter connectionMultiplexerLog;
// Current multiplexer; created or replaced by GetConnection(), null until the first successful connect.
ConnectionMultiplexer connection;
// Guards creation/replacement of `connection` inside GetConnection().
object connectionLock = new object();
// Value supplied by the configuration entry (presumably the Redis database index — confirm with callers).
public int Db { get; private set; }
// Exposes the options this instance connects with.
public ConfigurationOptions ConfigurationOptions { get { return configuration; } }
// Converter assigned in the constructor (always a JsonRedisValueConverter in this file).
public IRedisValueConverter ValueConverter { get; private set; }
/// <summary>
/// Builds one <see cref="RedisSettings"/> per entry of the Redis configuration section and
/// registers it in the lookup table under the entry's Db value.
/// </summary>
static RedisSettings()
{
    _settingList = new Dictionary<int, RedisSettings>();

    var redisSection = RedisConfigurationSection.GetSection();
    foreach (var entry in redisSection.RedisSettings)
    {
        _settingList.Add(entry.Db, new RedisSettings(entry.ConnectionString, entry.Db));

        // Log the keys registered so far (one log line per configured entry).
        var registeredKeys = _settingList.Keys.ToArray();
        var joinedKeys = string.Join(",", registeredKeys);
        WX.CRM.Common.LogHelper.Info("【RedisSettings实例】RedisSettings()._settingList.kyes:" + joinedKeys);
    }
}
/// <summary>
/// Looks up the settings instance registered for the given configuration value.
/// </summary>
/// <param name="config">Configuration selector; converted with <see cref="Convert.ToInt32(object)"/> to obtain the lookup key.</param>
/// <returns>The registered <see cref="RedisSettings"/>; never null.</returns>
/// <exception cref="KeyNotFoundException">No settings are registered for <paramref name="config"/>.</exception>
public static RedisSettings GetRedisSettings(RedisConfig config)
{
    // The dictionary indexer throws KeyNotFoundException on a missing key, which made the
    // original null check (and its "配置错误" message) unreachable for that case.
    // TryGetValue lets the intended configuration error surface. KeyNotFoundException is used
    // so callers that caught the indexer's exception type keep working.
    RedisSettings conn;
    if (!_settingList.TryGetValue(Convert.ToInt32(config), out conn) || conn == null)
    {
        throw new KeyNotFoundException("配置错误");
    }
    return conn;
}
// Callbacks (write-only from outside the class: the getter is private).
// Invoked by GetConnection() after a successful connect, with the elapsed connect time.
public Action<OpenConnectionEventArgs> OnConnectionOpen { private get; set; }
// Invoked by GetConnection() when a connect attempt throws, before the exception is rethrown.
public Action<OpenConnectionFailedEventArgs> OnConnectionOpenFailed { private get; set; }
// Invoked from the ConnectAsync continuation when obtaining the connect result throws.
public Action<OpenConnectionFailedEventArgs> OnConnectAsyncFailed { private get; set; }
// The callbacks below are forwarded from the same-named ConnectionMultiplexer events.
public Action<ConnectionMultiplexer, EndPointEventArgs> OnConfigurationChanged { private get; set; }
public Action<ConnectionMultiplexer, EndPointEventArgs> OnConfigurationChangedBroadcast { private get; set; }
public Action<ConnectionMultiplexer, ConnectionFailedEventArgs> OnConnectionFailed { private get; set; }
public Action<ConnectionMultiplexer, ConnectionFailedEventArgs> OnConnectionRestored { private get; set; }
public Action<ConnectionMultiplexer, RedisErrorEventArgs> OnErrorMessage { private get; set; }
public Action<ConnectionMultiplexer, HashSlotMovedEventArgs> OnHashSlotMoved { private get; set; }
public Action<ConnectionMultiplexer, InternalErrorEventArgs> OnInternalError { private get; set; }
//public RedisSettings(string connectionString, int db = 0, IRedisValueConverter converter = null, Func<ICommandTracer> tracerFactory = null, System.IO.TextWriter connectionMultiplexerLog = null)
// : this(ConfigurationOptions.Parse(connectionString), db, converter, tracerFactory, connectionMultiplexerLog)
//{
//}
//public RedisSettings(ConfigurationOptions configuration, int db = 0, IRedisValueConverter converter = null, Func<ICommandTracer> tracerFactory = null, System.IO.TextWriter connectionMultiplexerLog = null)
//{
// this.configuration = configuration;
// this.Db = db;
// this.ValueConverter = converter ?? new JsonRedisValueConverter();
// this.CommandTracerFactory = tracerFactory;
// this.connectionMultiplexerLog = connectionMultiplexerLog;
//}
/// <summary>
/// Creates settings from a connection string by parsing it with
/// <c>ConfigurationOptions.Parse</c> and delegating to the options-based constructor.
/// </summary>
/// <param name="connectionString">StackExchange.Redis connection string.</param>
/// <param name="db">Value stored in <see cref="Db"/>.</param>
/// <param name="connectionMultiplexerLog">Optional log writer used while connecting.</param>
private RedisSettings(string connectionString, int db = 0, System.IO.TextWriter connectionMultiplexerLog = null)
    : this(
        configuration: ConfigurationOptions.Parse(connectionString),
        db: db,
        connectionMultiplexerLog: connectionMultiplexerLog)
{
}
/// <summary>
/// Creates settings from already-parsed connection options. The value converter is always
/// a <c>JsonRedisValueConverter</c>.
/// </summary>
/// <param name="configuration">Options used for every connect attempt.</param>
/// <param name="db">Value stored in <see cref="Db"/>.</param>
/// <param name="connectionMultiplexerLog">Optional log writer used while connecting.</param>
private RedisSettings(ConfigurationOptions configuration, int db = 0, System.IO.TextWriter connectionMultiplexerLog = null)
{
    // Parameters that shadow fields need the explicit `this.` qualifier.
    this.connectionMultiplexerLog = connectionMultiplexerLog;
    this.configuration = configuration;

    ValueConverter = new JsonRedisValueConverter();
    Db = db;
}
/// <summary>
/// Returns the multiplexer owned by this instance, connecting (or reconnecting) when there is
/// no multiplexer yet or the current one reports that it is not connected.
/// </summary>
/// <remarks>
/// Connecting happens under <c>connectionLock</c> and blocks the caller. A stale multiplexer is
/// closed and disposed before a new one is opened. Attempts that time out are retried up to
/// <c>ConfigurationOptions.ConnectRetry</c> times; any other failure is rethrown immediately.
/// </remarks>
/// <returns>The connected multiplexer; never null.</returns>
/// <exception cref="TimeoutException">Every allowed connect attempt timed out.</exception>
public ConnectionMultiplexer GetConnection()
{
    // Work on a snapshot: another thread may set the field to null while reconnecting, so
    // reading the field twice (null check, then IsConnected) could dereference null.
    var current = connection;
    if (current != null && current.IsConnected)
    {
        return current;
    }

    lock (connectionLock)
    {
        current = connection;
        if (current != null && current.IsConnected)
        {
            return current;
        }

        if (current != null)
        {
            // Stale multiplexer: release it before opening a replacement.
            current.Close(false);
            current.Dispose();
            connection = null;
        }

        var tryCount = 0;
        while (true)
        {
            try
            {
                return OpenConnection();
            }
            catch (TimeoutException)
            {
                // Previously the last timeout was swallowed and null was returned; surface it
                // instead so callers do not fail later with a NullReferenceException.
                if (tryCount >= configuration.ConnectRetry)
                {
                    throw;
                }
                tryCount++;
            }
        }
    }
}

// Performs one connect attempt: connects, raises the open/failed callbacks and attaches the
// multiplexer event forwarders. Must be called while holding connectionLock.
private ConnectionMultiplexer OpenConnection()
{
    var sw = Stopwatch.StartNew();
    ConnectionMultiplexer opened;
    try
    {
        opened = ConnectWithTimeout(sw);
        connection = opened;
        opened.IncludeDetailInExceptions = true;
        sw.Stop();
    }
    catch (Exception ex)
    {
        sw.Stop();
        try
        {
            var failedEv = OnConnectionOpenFailed;
            if (failedEv != null)
            {
                failedEv(new OpenConnectionFailedEventArgs(configuration, sw.Elapsed, ex));
            }
            throw;
        }
        finally
        {
            // Never leave a half-initialised multiplexer in the field, even if the callback throws.
            connection = null;
        }
    }

    try
    {
        var openEv = OnConnectionOpen;
        if (openEv != null)
        {
            openEv(new OpenConnectionEventArgs(configuration, sw.Elapsed));
        }

        // attach events
        opened.ConfigurationChanged += connection_ConfigurationChanged;
        opened.ConfigurationChangedBroadcast += connection_ConfigurationChangedBroadcast;
        opened.ConnectionFailed += connection_ConnectionFailed;
        opened.ConnectionRestored += connection_ConnectionRestored;
        opened.ErrorMessage += connection_ErrorMessage;
        opened.HashSlotMoved += connection_HashSlotMoved;
        opened.InternalError += connection_InternalError;
    }
    catch
    {
        connection = null;
        throw;
    }

    return opened;
}

// Runs ConnectionMultiplexer.ConnectAsync on a dedicated thread and waits for it with an explicit
// timeout (configuration.ConnectTimeout, in milliseconds).
// ConnectionMultiplexer.Connect sometimes fails or hangs, see
// https://github.com/StackExchange/StackExchange.Redis/issues/42, so the connect and its timeout
// are driven manually here. A connectTimeout between 1000 and 5000 is recommended (tune it to
// your network latency).
private ConnectionMultiplexer ConnectWithTimeout(Stopwatch sw)
{
    var innerSw = Stopwatch.StartNew();
    var tcs = new System.Threading.Tasks.TaskCompletionSource<ConnectionMultiplexer>();
    var connectThread = new Thread(_ =>
    {
        try
        {
            var connTask = StackExchange.Redis.ConnectionMultiplexer.ConnectAsync(configuration, connectionMultiplexerLog)
                .ContinueWith(x =>
                {
                    innerSw.Stop();
                    // A continuation only runs once its antecedent has completed, so there is no
                    // need to test IsCompleted; a faulted or cancelled antecedent makes x.Result
                    // throw and is handled by the catch below.
                    try
                    {
                        if (!tcs.TrySetResult(x.Result))
                        {
                            // The waiter already gave up (timeout): discard the late connection.
                            x.Result.Close(false);
                            x.Result.Dispose();
                        }
                    }
                    catch (Exception ex)
                    {
                        var ev = OnConnectAsyncFailed;
                        if (ev != null)
                        {
                            ev(new OpenConnectionFailedEventArgs(configuration, innerSw.Elapsed, ex));
                        }
                        tcs.TrySetException(ex);
                    }
                });
            if (!connTask.Wait(this.configuration.ConnectTimeout))
            {
                tcs.TrySetException(new TimeoutException("Redis Connect Timeout. Elapsed:" + sw.Elapsed.TotalMilliseconds + "ms"));
            }
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    });
    connectThread.Start();

    // Blocks until the connect thread publishes a result, a failure or a timeout.
    return tcs.Task.GetAwaiter().GetResult();
}
/// <summary>Forwards the multiplexer's InternalError event to <c>OnInternalError</c>, if one is set.</summary>
void connection_InternalError(object sender, InternalErrorEventArgs e)
{
    Action<ConnectionMultiplexer, InternalErrorEventArgs> callback = OnInternalError;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's HashSlotMoved event to <c>OnHashSlotMoved</c>, if one is set.</summary>
void connection_HashSlotMoved(object sender, HashSlotMovedEventArgs e)
{
    Action<ConnectionMultiplexer, HashSlotMovedEventArgs> callback = OnHashSlotMoved;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's ErrorMessage event to <c>OnErrorMessage</c>, if one is set.</summary>
void connection_ErrorMessage(object sender, RedisErrorEventArgs e)
{
    Action<ConnectionMultiplexer, RedisErrorEventArgs> callback = OnErrorMessage;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's ConnectionRestored event to <c>OnConnectionRestored</c>, if one is set.</summary>
void connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
    Action<ConnectionMultiplexer, ConnectionFailedEventArgs> callback = OnConnectionRestored;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's ConnectionFailed event to <c>OnConnectionFailed</c>, if one is set.</summary>
void connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
    Action<ConnectionMultiplexer, ConnectionFailedEventArgs> callback = OnConnectionFailed;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's ConfigurationChangedBroadcast event to <c>OnConfigurationChangedBroadcast</c>, if one is set.</summary>
void connection_ConfigurationChangedBroadcast(object sender, EndPointEventArgs e)
{
    Action<ConnectionMultiplexer, EndPointEventArgs> callback = OnConfigurationChangedBroadcast;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>Forwards the multiplexer's ConfigurationChanged event to <c>OnConfigurationChanged</c>, if one is set.</summary>
void connection_ConfigurationChanged(object sender, EndPointEventArgs e)
{
    Action<ConnectionMultiplexer, EndPointEventArgs> callback = OnConfigurationChanged;
    if (callback == null)
    {
        return;
    }
    callback(sender as ConnectionMultiplexer, e);
}
/// <summary>
/// Data passed to the <c>OnConnectionOpen</c> callback: the options that were used and how long
/// the connect took.
/// </summary>
public class OpenConnectionEventArgs : EventArgs
{
    /// <summary>Creates the event data.</summary>
    /// <param name="configurationOptions">Options the connection was opened with.</param>
    /// <param name="duration">Elapsed time of the connect attempt.</param>
    public OpenConnectionEventArgs(ConfigurationOptions configurationOptions, TimeSpan duration)
    {
        Duration = duration;
        ConfigurationOption = configurationOptions;
    }

    /// <summary>Options the connection was opened with.</summary>
    public ConfigurationOptions ConfigurationOption { get; private set; }

    /// <summary>Elapsed time of the connect attempt.</summary>
    public TimeSpan Duration { get; private set; }
}
/// <summary>
/// Data passed to the connect-failure callbacks: the options that were used, the time spent
/// before the failure, and the exception that was observed.
/// </summary>
public class OpenConnectionFailedEventArgs : EventArgs
{
    /// <summary>Creates the event data.</summary>
    /// <param name="configurationOption">Options the connect attempt used.</param>
    /// <param name="duration">Elapsed time until the failure.</param>
    /// <param name="exception">The failure that was observed.</param>
    public OpenConnectionFailedEventArgs(ConfigurationOptions configurationOption, TimeSpan duration, Exception exception)
    {
        Exception = exception;
        Duration = duration;
        ConfigurationOption = configurationOption;
    }

    /// <summary>Options the connect attempt used.</summary>
    public ConfigurationOptions ConfigurationOption { get; private set; }

    /// <summary>Elapsed time until the failure.</summary>
    public TimeSpan Duration { get; private set; }

    /// <summary>The failure that was observed.</summary>
    public Exception Exception { get; private set; }
}
}
}