A few days ago, creating Redis connections failed in our production environment. While analyzing the problem, we gained a better understanding of how ServiceStack.Redis creates connections and manages its connection pool. Now that the analysis is done, this article systematically organizes what we learned.
Obtaining a RedisClient from the connection pool
public IRedisClient GetClient()
{
    RedisClient redisClient = null;
    DateTime now = DateTime.Now;
    for (;;)
    {
        if (!this.deactiveClientQueue.TryPop(out redisClient))
        {
            // No idle client: create one unless the pool has reached its limit
            if (this.redisClientSize >= this.maxRedisClient)
            {
                // Pool exhausted: wait 3 ms, then give up once PoolTimeout (ms) has elapsed
                Thread.Sleep(3);
                if (this.PoolTimeout != null && (DateTime.Now - now).TotalMilliseconds >= (double)this.PoolTimeout.Value)
                {
                    break;
                }
            }
            else
            {
                redisClient = this.CreateRedisClient();
                if (redisClient != null)
                {
                    // "Block_5" in the decompiled output: register the new client and return it
                    this.writeClients.Add(redisClient);
                    return redisClient;
                }
            }
        }
        else
        {
            if (!redisClient.HadExceptions)
            {
                // "Block_6" in the decompiled output: reuse the idle client
                redisClient.Active = true;
                this.InitClient(redisClient);
                return redisClient;
            }
            // The idle client is faulty: drop it from the pool and dispose it
            lock (this.writeClients)
            {
                this.writeClients.Remove(redisClient);
                this.redisClientSize--;
            }
            RedisState.DisposeDeactivatedClient(redisClient);
        }
    }
    // Only reached through break, i.e. after PoolTimeout has expired
    throw new TimeoutException("Redis Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use.");
}
The body of this method is an infinite loop that does the following:
- this.deactiveClientQueue holds the idle clients; it is of type ConcurrentStack<RedisClient>.
- When a redisClient can be popped from this.deactiveClientQueue and it has no exceptions (HadExceptions is false), execution jumps to the Block_6 branch: it sets redisClient.Active, calls this.InitClient(redisClient), and returns the redisClient instance. A popped client that does have exceptions is removed from this.writeClients, this.redisClientSize is decremented, the client is disposed, and the loop continues.
- When nothing can be popped from this.deactiveClientQueue, the method first checks the client limit: this.redisClientSize >= this.maxRedisClient.
  - If the limit has not been reached, it calls redisClient = this.CreateRedisClient(). A non-null result is added to this.writeClients and returned (the Block_5 branch); a null result sends it around the loop again.
  - If the limit has been reached, it sleeps for 3 milliseconds and then checks whether the pool timeout this.PoolTimeout (in milliseconds) has been exceeded. If so, it breaks out of the loop and throws a TimeoutException; otherwise it continues with the next iteration. If PoolTimeout is null, the caller waits indefinitely.
This is the main flow for obtaining a client from the pool, with this.deactiveClientQueue acting as the "client pool". Note that this.PoolTimeout is how long a caller waits when the connection pool is exhausted.
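To make the flow concrete, here is a minimal C# sketch of the same get, create, or wait loop. SketchPool<T> and its members are hypothetical names used only for illustration, not part of ServiceStack.Redis; the 3 ms sleep and the nullable timeout mirror the quoted GetClient().

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical, simplified pool shaped like GetClient(): an idle stack, a size counter
// capped at maxSize, and a wait loop bounded by poolTimeoutMs. NOT the ServiceStack code.
public sealed class SketchPool<T> where T : class
{
    private readonly ConcurrentStack<T> idle = new ConcurrentStack<T>();
    private readonly object sizeLock = new object();
    private readonly Func<T> factory;
    private readonly int maxSize;
    private readonly int? poolTimeoutMs;   // null = wait forever, like PoolTimeout == null
    private int size;

    public SketchPool(Func<T> factory, int maxSize, int? poolTimeoutMs)
    {
        this.factory = factory;
        this.maxSize = maxSize;
        this.poolTimeoutMs = poolTimeoutMs;
    }

    public int Size { get { lock (sizeLock) return size; } }

    public T Get()
    {
        DateTime start = DateTime.UtcNow;
        for (;;)
        {
            // 1. Reuse an idle instance if one is available.
            if (idle.TryPop(out T item))
                return item;

            // 2. Otherwise create a new one while below the cap (checked under a lock).
            lock (sizeLock)
            {
                if (size < maxSize)
                {
                    size++;
                    return factory();
                }
            }

            // 3. Pool exhausted: back off briefly and give up once the timeout elapses.
            Thread.Sleep(3);
            if (poolTimeoutMs.HasValue &&
                (DateTime.UtcNow - start).TotalMilliseconds >= poolTimeoutMs.Value)
            {
                throw new TimeoutException("Timed out waiting for a pooled instance.");
            }
        }
    }

    // Hand an instance back so the next Get() can reuse it.
    public void Return(T item) => idle.Push(item);
}
```

Passing null for poolTimeoutMs makes Get() wait indefinitely, which is what GetClient() does when PoolTimeout is null. Unlike GetClient(), the sketch does not check popped instances for earlier errors.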

Creating a new client: CreateRedisClient()
The source code is as follows:
private RedisClient CreateRedisClient()
{
    // Cheap check before taking the lock
    if (this.redisClientSize >= this.maxRedisClient)
    {
        return null;
    }
    RedisClient result;
    lock (this.lckObj)
    {
        // Re-check under the lock: another thread may have created a client meanwhile
        if (this.redisClientSize >= this.maxRedisClient)
        {
            result = null;
        }
        else
        {
            Random random = new Random((int)DateTime.Now.Ticks);
            RedisClient newClient = this.InitNewClient(this.RedisResolver.CreateMasterClient(random.Next(100)));
            // OnDispose hands the client back to the pool when the caller is done with it
            newClient.OnDispose += delegate()
            {
                if (!newClient.HadExceptions)
                {
                    lock (this.writeClients)
                    {
                        if (!newClient.HadExceptions)
                        {
                            try
                            {
                                // Healthy client: push it back onto the idle stack
                                this.deactiveClientQueue.Push(newClient);
                                return;
                            }
                            catch
                            {
                                this.writeClients.Remove(newClient);
                                this.redisClientSize--;
                                RedisState.DisposeDeactivatedClient(newClient);
                            }
                        }
                    }
                }
                // Faulty client: remove it from the pool and dispose it for real
                this.writeClients.Remove(newClient);
                this.redisClientSize--;
                RedisState.DisposeDeactivatedClient(newClient);
            };
            this.redisClientSize++;
            result = newClient;
        }
    }
    return result;
}
Because of concurrency, creating a new client is guarded by a lock, lock (this.lckObj), and the size limit is checked both before and inside it. If several threads enter CreateRedisClient() at the same time, only one of them actually executes the creation code; the others block until the lock is released. This can be observed with the windbg syncblk and clrstack commands. Inside the lock the method calls this.InitNewClient(this.RedisResolver.CreateMasterClient(random.Next(100))) to create the object, so any time spent locating and verifying the master node is spent while holding the lock. It also attaches handling logic to newClient's OnDispose event. Note that OnDispose is not destruction in the traditional sense: it runs after the caller has finished with the RedisClient and recycles it into the pool. Provided newClient has no exceptions, it is pushed onto the this.deactiveClientQueue stack, which is how the pool's idle stack is refilled; a client with exceptions is instead removed from this.writeClients, this.redisClientSize is decremented, and the client is disposed.
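The double-checked size limit and the recycle-on-dispose idea can be isolated in a small sketch. PooledConn and RecyclingPool are hypothetical types; the sketch keeps only the decision the OnDispose handler makes: healthy clients go back to the idle stack, faulty ones shrink the pool and are closed.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical connection type: Dispose() does not close anything, it raises OnDispose
// so the owning pool can take the instance back (the same idea as RedisClient.OnDispose).
public sealed class PooledConn : IDisposable
{
    public bool HadExceptions { get; set; }
    public bool IsClosed { get; private set; }
    public event Action OnDispose;
    public void Dispose() => OnDispose?.Invoke();
    public void Close() => IsClosed = true;
}

// Hypothetical pool showing the double-checked size limit and recycle-on-dispose logic.
public sealed class RecyclingPool
{
    private readonly object lck = new object();
    private readonly ConcurrentStack<PooledConn> idle = new ConcurrentStack<PooledConn>();
    private readonly int max;
    private int size;

    public RecyclingPool(int max) { this.max = max; }
    public int Size => size;
    public int IdleCount => idle.Count;

    public PooledConn Create()
    {
        if (size >= max) return null;          // cheap check without the lock
        lock (lck)
        {
            if (size >= max) return null;      // re-check: another thread may have won
            var conn = new PooledConn();
            conn.OnDispose += () =>
            {
                if (!conn.HadExceptions)
                {
                    idle.Push(conn);           // healthy: back onto the idle stack
                    return;
                }
                lock (lck) size--;             // faulty: shrink the pool and really close it
                conn.Close();
            };
            size++;
            return conn;
        }
    }
}
```

The first check avoids taking the lock when the pool is obviously full; the second one is what actually enforces the limit.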
A closer look at this.InitNewClient()
This method initializes the newly created RedisClient object, including fields such as Id and Active, and then calls this.InitClient() for further initialization.
A closer look at this.RedisResolver.CreateMasterClient()
this.redisResolver is of the IRedisResolver interface type, which has three implementations in the source code. Here we take sentinel mode, which is common in production, as the example for analysis.

The RedisSentinelResolver class implements sentinel mode. The relevant source code is as follows:
public RedisClient CreateMasterClient(int desiredIndex)
{
    return this.CreateRedisClient(this.GetReadWriteHost(desiredIndex), true);
}
public RedisEndpoint GetReadWriteHost(int desiredIndex)
{
    // Prefer the master reported by the sentinel; fall back to the configured masters
    return this.sentinel.GetMaster() ?? this.masters[desiredIndex % this.masters.Length];
}
public virtual RedisClient CreateRedisClient(RedisEndpoint config, bool master)
{
    RedisClient result = this.ClientFactory(config);
    if (master)
    {
        RedisServerRole redisServerRole = RedisServerRole.Unknown;
        try
        {
            // A separate short-lived client confirms that the node really is the master
            using (RedisClient redisClient = this.ClientFactory(config))
            {
                redisClient.ConnectTimeout = 5000;
                redisClient.ReceiveTimeout = 5000;
                redisServerRole = redisClient.GetServerRole();
                if (redisServerRole == RedisServerRole.Master)
                {
                    this.lastValidMasterFromSentinelAt = DateTime.UtcNow;
                    return result;
                }
            }
        }
        catch (Exception exception)
        {
            Interlocked.Increment(ref RedisState.TotalInvalidMasters);
            using (RedisClient redisClient2 = this.ClientFactory(config))
            {
                redisClient2.ConnectTimeout = 5000;
                redisClient2.ReceiveTimeout = 5000;
                if (redisClient2.GetHostString() == this.lastInvalidMasterHost)
                {
                    // Same host failed again: force a failover if no valid master has been
                    // confirmed within WaitBeforeForcingMasterFailover
                    lock (this.oLock)
                    {
                        if (DateTime.UtcNow - this.lastValidMasterFromSentinelAt > this.sentinel.WaitBeforeForcingMasterFailover)
                        {
                            this.lastInvalidMasterHost = null;
                            this.lastValidMasterFromSentinelAt = DateTime.UtcNow;
                            RedisSentinelResolver.log.Error("Valid master was not found at '{0}' within '{1}'. Sending SENTINEL failover...".Fmt(redisClient2.GetHostString(), this.sentinel.WaitBeforeForcingMasterFailover), exception);
                            Interlocked.Increment(ref RedisState.TotalForcedMasterFailovers);
                            this.sentinel.ForceMasterFailover();
                            Thread.Sleep(this.sentinel.WaitBetweenFailedHosts);
                            redisServerRole = redisClient2.GetServerRole();
                        }
                    }
                }
                else
                {
                    // First failure for this host: just remember it
                    this.lastInvalidMasterHost = redisClient2.GetHostString();
                }
            }
        }
        if (redisServerRole != RedisServerRole.Master && RedisConfig.VerifyMasterConnections)
        {
            try
            {
                // Keep asking the sentinel for the master until MaxWaitBetweenFailedHosts elapses
                Stopwatch stopwatch = Stopwatch.StartNew();
                for (;;)
                {
                    try
                    {
                        RedisEndpoint master2 = this.sentinel.GetMaster();
                        using (RedisClient redisClient3 = this.ClientFactory(master2))
                        {
                            redisClient3.ReceiveTimeout = 5000;
                            redisClient3.ConnectTimeout = this.sentinel.SentinelWorkerConnectTimeoutMs;
                            if (redisClient3.GetServerRole() == RedisServerRole.Master)
                            {
                                this.lastValidMasterFromSentinelAt = DateTime.UtcNow;
                                return this.ClientFactory(master2);
                            }
                            Interlocked.Increment(ref RedisState.TotalInvalidMasters);
                        }
                    }
                    catch
                    {
                    }
                    if (stopwatch.Elapsed > this.sentinel.MaxWaitBetweenFailedHosts)
                    {
                        break;
                    }
                    Thread.Sleep(this.sentinel.WaitBetweenFailedHosts);
                }
                throw new TimeoutException("Max Wait Between Sentinel Lookups Elapsed: {0}".Fmt(this.sentinel.MaxWaitBetweenFailedHosts.ToString()));
            }
            catch (Exception exception2)
            {
                // Last resort: query every known host, rebuild the master/slave lists
                // and return a client for the first master found
                RedisSentinelResolver.log.Error("Redis Master Host '{0}' is {1}. Resetting allHosts...".Fmt(config.GetHostString(), redisServerRole), exception2);
                List<RedisEndpoint> foundMasters = new List<RedisEndpoint>();
                List<RedisEndpoint> foundSlaves = new List<RedisEndpoint>();
                RedisClient firstMaster = null;
                foreach (RedisEndpoint redisEndpoint in this.allHosts)
                {
                    try
                    {
                        using (RedisClient redisClient5 = this.ClientFactory(redisEndpoint))
                        {
                            redisClient5.ReceiveTimeout = 5000;
                            redisClient5.ConnectTimeout = RedisConfig.HostLookupTimeoutMs;
                            RedisServerRole serverRole = redisClient5.GetServerRole();
                            if (serverRole == RedisServerRole.Master)
                            {
                                foundMasters.Add(redisEndpoint);
                                if (firstMaster == null)
                                {
                                    firstMaster = this.ClientFactory(redisEndpoint);
                                }
                            }
                            else if (serverRole == RedisServerRole.Slave)
                            {
                                foundSlaves.Add(redisEndpoint);
                            }
                        }
                    }
                    catch
                    {
                        // Unreachable hosts are skipped
                    }
                }
                if (firstMaster == null)
                {
                    Interlocked.Increment(ref RedisState.TotalNoMastersFound);
                    string message = "No master found in: " + string.Join(", ", this.allHosts.Map((RedisEndpoint x) => x.GetHostString()));
                    RedisSentinelResolver.log.Error(message);
                    throw new Exception(message);
                }
                this.ResetMasters(foundMasters);
                this.ResetSlaves(foundSlaves);
                return firstMaster;
            }
        }
    }
    return result;
}
GetReadWriteHost() works as follows: it first tries this.sentinel.GetMaster() to obtain the master node. If GetMaster() returns null, it falls back to the configured master list masters and picks masters[desiredIndex % masters.Length]; because desiredIndex comes from random.Next(100), this is effectively a random choice. An exception thrown by GetMaster() (for example "No Redis Sentinels were available") is not caught here and propagates to the caller.
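A minimal sketch of that fallback, assuming the sentinel lookup is modeled as a delegate that returns null on failure. HostPicker and PickMaster are hypothetical names, and hosts are plain strings rather than RedisEndpoint.

```csharp
using System;

// Hypothetical helper mirroring GetReadWriteHost(): prefer the master reported by
// the sentinel, otherwise fall back to a configured master chosen by index.
public static class HostPicker
{
    public static string PickMaster(Func<string> askSentinel, string[] configuredMasters, int desiredIndex)
    {
        // askSentinel returns null when no master is known (like GetMaster() returning null);
        // an exception thrown by askSentinel propagates, as in the original.
        return askSentinel() ?? configuredMasters[desiredIndex % configuredMasters.Length];
    }
}
```

With desiredIndex taken from random.Next(100), as in CreateRedisClient(), new connections are spread across the configured masters whenever the sentinel has no answer.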
Execution then enters CreateRedisClient():
- First, this.ClientFactory() creates the client object (result). Inside the factory, a counter is incremented and new RedisClient() is called; we won't go into more detail.
- Next, GetServerRole() is called on a separate short-lived client to verify with the server that the connected node really has the Master role. If it does, result is returned directly to the caller. (If the role query throws, the same host already failed the previous check, and no valid master has been confirmed within sentinel.WaitBeforeForcingMasterFailover, a failover is requested via this.sentinel.ForceMasterFailover().)
- If the connected node is not a Master and RedisConfig.VerifyMasterConnections is enabled, the method repeatedly calls this.sentinel.GetMaster() to look up the master, sleeping sentinel.WaitBetweenFailedHosts between attempts, and creates a new RedisClient as soon as the returned node reports the Master role.
- If no master is reached within sentinel.MaxWaitBetweenFailedHosts, a TimeoutException is thrown and handled by the catch block, which traverses all nodes in this.allHosts, queries their roles, resets the master and slave lists (ResetMasters/ResetSlaves), and returns a client for the first master found. If there is none, it throws an exception with the message "No master found in: " followed by the host list.
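The failover condition in the catch block behaves like a debounce: a first failure only records the host, and a failover is requested only when the same host fails again after WaitBeforeForcingMasterFailover has passed without a confirmed master. The sketch below reproduces that decision in isolation. FailoverGuard and its methods are hypothetical, and the caller is assumed to invoke ForceMasterFailover() when OnMasterCheckFailed returns true.

```csharp
using System;

// Hypothetical re-creation of the guard around ForceMasterFailover(): a failover is requested
// only when the same host fails twice in a row AND no master has been confirmed for longer
// than waitBeforeForcingFailover. Names are illustrative, not ServiceStack's.
public sealed class FailoverGuard
{
    private readonly object gate = new object();
    private readonly TimeSpan waitBeforeForcingFailover;
    private string lastInvalidHost;
    private DateTime lastValidMasterAt;

    public FailoverGuard(TimeSpan waitBeforeForcingFailover, DateTime now)
    {
        this.waitBeforeForcingFailover = waitBeforeForcingFailover;
        lastValidMasterAt = now;
    }

    public void OnMasterConfirmed(DateTime now) => lastValidMasterAt = now;

    // Called when the role check against `host` failed; returns true if a failover should be forced.
    public bool OnMasterCheckFailed(string host, DateTime now)
    {
        if (host != lastInvalidHost)
        {
            lastInvalidHost = host;            // first failure for this host: just remember it
            return false;
        }
        lock (gate)
        {
            if (now - lastValidMasterAt > waitBeforeForcingFailover)
            {
                lastInvalidHost = null;        // reset so the next failover needs fresh failures
                lastValidMasterAt = now;
                return true;
            }
            return false;
        }
    }
}
```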
Through this process, we finally obtain a RedisClient connected to the master node and return it to the caller.
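The VerifyMasterConnections branch follows a common poll-until-deadline pattern. The sketch below models it with delegates; MasterPoller, WaitForMaster and the parameter names are hypothetical stand-ins for sentinel.GetMaster(), GetServerRole(), MaxWaitBetweenFailedHosts and WaitBetweenFailedHosts.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical sketch of the VerifyMasterConnections loop: keep asking the sentinel for the
// master and verifying its role until it checks out or maxWait elapses.
public static class MasterPoller
{
    public static string WaitForMaster(
        Func<string> askSentinel,        // stands in for sentinel.GetMaster()
        Func<string, bool> isMaster,     // stands in for GetServerRole() == Master
        TimeSpan maxWait,                // stands in for MaxWaitBetweenFailedHosts
        TimeSpan pause)                  // stands in for WaitBetweenFailedHosts
    {
        Stopwatch sw = Stopwatch.StartNew();
        for (;;)
        {
            try
            {
                string host = askSentinel();
                if (host != null && isMaster(host))
                    return host;
            }
            catch (Exception)
            {
                // Errors are swallowed, as in the original loop; the deadline still applies.
            }
            if (sw.Elapsed > maxWait)
                throw new TimeoutException("Max wait between sentinel lookups elapsed: " + maxWait);
            Thread.Sleep(pause);
        }
    }
}
```

As in the quoted code, the TimeoutException is what sends the caller into the fallback that scans all hosts.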
Several other methods used above are important and fairly complex; let's go through them one by one.
How RedisSentinel.GetMaster() works
The call itself is simple, but the method does quite a lot internally. The source of the RedisSentinel class is as follows:
public RedisEndpoint GetMaster()
{
    RedisSentinelWorker validSentinelWorker = this.GetValidSentinelWorker();
    RedisEndpoint result;
    lock (validSentinelWorker)
    {
        string masterHost = validSentinelWorker.GetMasterHost(this.masterName);
        // Periodically refresh the list of known sentinels
        if (this.ScanForOtherSentinels && DateTime.UtcNow - this.lastSentinelsRefresh > this.RefreshSentinelHostsAfter)
        {
            this.RefreshActiveSentinels();
        }
        // A null masterHost (lookup failed) yields a null endpoint
        result = (masterHost != null)
            ? ((this.HostFilter != null) ? this.HostFilter(masterHost) : masterHost).ToRedisEndpoint(null)
            : null;
    }
    return result;
}
private RedisSentinelWorker GetValidSentinelWorker()
{
    if (this.isDisposed)
    {
        throw new ObjectDisposedException(base.GetType().Name);
    }
    // Reuse the current worker if there is one
    if (this.worker != null)
    {
        return this.worker;
    }
    RedisException innerException = null;
    while (this.worker == null && this.ShouldRetry())
    {
        try
        {
            this.worker = this.GetNextSentinel();
            this.GetSentinelInfo();
            this.worker.BeginListeningForConfigurationChanges();
            this.failures = 0;
            return this.worker;
        }
        catch (RedisException ex)
        {
            if (this.OnWorkerError != null)
            {
                this.OnWorkerError(ex);
            }
            innerException = ex;
            this.worker = null;
            this.failures++;
            Interlocked.Increment(ref RedisState.TotalFailedSentinelWorkers);
        }
    }
    // Retries exhausted
    this.failures = 0;
    Thread.Sleep(this.WaitBetweenFailedHosts);
    throw new RedisException("No Redis Sentinels were available", innerException);
}
private RedisSentinelWorker GetNextSentinel()
{
    RedisSentinelWorker result;
    lock (this.oLock)
    {
        if (this.worker != null)
        {
            this.worker.Dispose();
            this.worker = null;
        }
        // Round-robin over SentinelEndpoints
        if (++this.sentinelIndex >= this.SentinelEndpoints.Length)
        {
            this.sentinelIndex = 0;
        }
        result = new RedisSentinelWorker(this, this.SentinelEndpoints[this.sentinelIndex])
        {
            OnSentinelError = new Action<Exception>(this.OnSentinelError)
        };
    }
    return result;
}
private void OnSentinelError(Exception ex)
{
    if (this.worker != null)
    {
        RedisSentinel.Log.Error("Error on existing SentinelWorker, reconnecting...");
        if (this.OnWorkerError != null)
        {
            this.OnWorkerError(ex);
        }
        this.worker = this.GetNextSentinel();
        this.worker.BeginListeningForConfigurationChanges();
    }
}
First, GetValidSentinelWorker() obtains a RedisSentinelWorker object. If this.worker is already set, it is returned directly; otherwise the method retries under the control of ShouldRetry(), obtaining a worker through this.GetNextSentinel() and assigning it to the this.worker field. If every attempt fails, it sleeps for WaitBetweenFailedHosts and throws RedisException("No Redis Sentinels were available").
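GetNextSentinel() takes a lock, disposes the current worker (this.worker.Dispose()) if there is one, selects the next sentinel endpoint in round-robin order (this.sentinelIndex is incremented and wraps back to 0 at the end of SentinelEndpoints), and instantiates a new RedisSentinelWorker whose OnSentinelError handler is RedisSentinel.OnSentinelError.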
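The round-robin choice and the retry loop around it can be sketched as follows. SentinelRotator is hypothetical, maxFailures stands in for ShouldRetry() (whose actual policy is not shown in the quoted code), and the starting index of -1 is an assumption chosen so that the first call returns the first endpoint.

```csharp
using System;

// Hypothetical sketch of GetNextSentinel()/GetValidSentinelWorker(): advance an index
// round-robin over the sentinel endpoints and try each one until a connection succeeds
// or the retry budget is used up.
public sealed class SentinelRotator
{
    private readonly string[] endpoints;
    private readonly object gate = new object();
    private int index = -1;   // assumed start value so that the first pick is endpoints[0]

    public SentinelRotator(string[] endpoints) { this.endpoints = endpoints; }

    public string Next()
    {
        lock (gate)
        {
            if (++index >= endpoints.Length)
                index = 0;                      // wrap around: round-robin, not random
            return endpoints[index];
        }
    }

    // connect returns true on success; maxFailures is a stand-in for ShouldRetry().
    public string ConnectToAny(Func<string, bool> connect, int maxFailures)
    {
        for (int failures = 0; failures < maxFailures; failures++)
        {
            string ep = Next();
            if (connect(ep))
                return ep;
        }
        throw new InvalidOperationException("No sentinels were available");
    }
}
```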
GetMaster() then locks validSentinelWorker and executes string masterHost = validSentinelWorker.GetMasterHost(this.masterName);. If ScanForOtherSentinels is enabled and RefreshSentinelHostsAfter has elapsed since the last refresh, it also calls RefreshActiveSentinels().
The corresponding RedisSentinelWorker code is as follows:
internal string GetMasterHost(string masterName)
{
    string result;
    try
    {
        result = this.GetMasterHostInternal(masterName);
    }
    catch (Exception ex)
    {
        // Errors are not rethrown: they are reported via OnSentinelError and null is returned
        if (this.OnSentinelError != null)
        {
            this.OnSentinelError(ex);
        }
        result = null;
    }
    return result;
}
private string GetMasterHostInternal(string masterName)
{
    // Sends SENTINEL get-master-addr-by-name <masterName>
    List<string> list = this.sentinelClient.SentinelGetMasterAddrByName(masterName);
    if (list.Count <= 0)
    {
        return null;
    }
    return this.SanitizeMasterConfig(list);
}
public void Dispose()
{
    // Disposes both the sentinel command client and the pub/sub subscription
    new IDisposable[]
    {
        this.sentinelClient,
        this.sentinePubSub
    }.Dispose(RedisSentinelWorker.Log);
}
Note what happens in GetMasterHost() when an exception occurs: it raises this object's OnSentinelError event, which, as the name suggests, handles sentinel errors. Searching the source shows that only GetNextSentinel() attaches a handler to OnSentinelError, namely RedisSentinel's private void OnSentinelError(Exception ex). That handler logs the error, raises this.OnWorkerError, then calls GetNextSentinel() again to assign a new worker to this.worker and starts listening for configuration changes on it. Because GetMasterHost() returns null in this case, GetMaster() also returns null, and GetReadWriteHost() falls back to the configured masters.
Note also that Dispose() disposes both this.sentinelClient and this.sentinePubSub.
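The error-handling contract between RedisSentinelWorker and RedisSentinel (swallow the exception, notify the owner through a callback, return null, let the owner swap in the next worker) can be sketched like this. SketchWorker and SketchSentinel are hypothetical types, and the query delegate stands in for the sentinel command.

```csharp
using System;

// Hypothetical worker mirroring RedisSentinelWorker.GetMasterHost(): failures are not
// thrown to the caller; they are reported through a callback and null is returned.
public sealed class SketchWorker
{
    private readonly Func<string> query;              // stands in for the SENTINEL query
    public Action<Exception> OnSentinelError;

    public SketchWorker(Func<string> query) { this.query = query; }

    public string GetMasterHost()
    {
        try { return query(); }
        catch (Exception ex)
        {
            OnSentinelError?.Invoke(ex);               // let the owner switch sentinels
            return null;                               // caller sees "no master known"
        }
    }
}

// Hypothetical owner mirroring RedisSentinel: on error it replaces the current worker.
public sealed class SketchSentinel
{
    private readonly Func<int, SketchWorker> createWorker;
    private int next;
    public SketchWorker Worker { get; private set; }
    public int Switches { get; private set; }

    public SketchSentinel(Func<int, SketchWorker> createWorker)
    {
        this.createWorker = createWorker;
        Worker = NewWorker();
    }

    private SketchWorker NewWorker()
    {
        SketchWorker w = createWorker(next++);
        w.OnSentinelError = ex => { Switches++; Worker = NewWorker(); };
        return w;
    }
}
```

The caller of a failing lookup only sees null, while the next lookup already goes through a different worker.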
RedisNativeClient: related methods and their implementation
GetMasterHostInternal() then calls SentinelGetMasterAddrByName() of the RedisNativeClient class.
Together, these methods send the sentinel client's query command to the server over a socket and parse the reply into a list of strings (host and port); GetMaster() later converts the result into a RedisEndpoint.
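On the wire, the command is sent in the Redis protocol (RESP) as an array of bulk strings. The hypothetical Resp.Encode helper below shows what SENTINEL get-master-addr-by-name produces for a master named mymaster (an example name); it illustrates the format only and is not ServiceStack's WriteCommandToSendBuffer(). The sentinel replies with a two-element array holding the master's IP and port.

```csharp
using System;
using System.Text;

// Hypothetical helper: encodes a command as a RESP array of bulk strings, the wire format
// Redis and Sentinel accept. Not ServiceStack's WriteCommandToSendBuffer().
public static class Resp
{
    public static string Encode(params string[] args)
    {
        var sb = new StringBuilder();
        sb.Append('*').Append(args.Length).Append("\r\n");          // array header
        foreach (string arg in args)
        {
            int byteLen = Encoding.UTF8.GetByteCount(arg);           // length is in bytes, not chars
            sb.Append('$').Append(byteLen).Append("\r\n").Append(arg).Append("\r\n");
        }
        return sb.ToString();
    }
}
```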
SendReceive() additionally handles socket connection, retries, back-off between retries, and timeout control.
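The retry, back-off and timeout behaviour of SendReceive() reduces to the pattern below. Retry.Run is a hypothetical helper: the isRetryable delegate plays the role of the RedisRetryableException/GetRetryableException() check, and the capped exponential delay is an assumed back-off policy, not the formula used by GetBackOffMultiplier().

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical retry helper shaped like SendReceive(): retry only errors classed as retryable,
// back off between attempts, and give up once retryTimeout has elapsed since the first attempt.
public static class Retry
{
    public static T Run<T>(Func<T> attempt, Func<Exception, bool> isRetryable,
                           TimeSpan retryTimeout, int maxDelayMs = 1000)
    {
        Stopwatch sw = Stopwatch.StartNew();
        Exception first = null;
        int retries = 0;
        for (;;)
        {
            try
            {
                return attempt();
            }
            catch (Exception ex) when (isRetryable(ex))   // non-retryable errors propagate as-is
            {
                if (first == null) first = ex;            // report the original cause on timeout
                if (sw.Elapsed >= retryTimeout)
                    throw new TimeoutException("Exceeded retry timeout of " + retryTimeout, first);
                retries++;
                int delayMs = (int)Math.Min(maxDelayMs, Math.Pow(2, retries)); // 2, 4, 8 ... ms
                Thread.Sleep(delayMs);
            }
        }
    }
}
```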
public List<string> SentinelGetMasterAddrByName(string masterName)
{
    // SENTINEL get-master-addr-by-name <masterName>
    List<byte[]> list = new List<byte[]>
    {
        Commands.Sentinel,
        Commands.GetMasterAddrByName,
        masterName.ToUtf8Bytes()
    };
    return this.SendExpectMultiData(list.ToArray()).ToStringList();
}
protected byte[][] SendExpectMultiData(params byte[][] cmdWithBinaryArgs)
{
    return this.SendReceive<byte[][]>(cmdWithBinaryArgs, new Func<byte[][]>(this.ReadMultiData), (this.Pipeline != null) ? new Action<Func<byte[][]>>(this.Pipeline.CompleteMultiBytesQueuedCommand) : null, false) ?? TypeConstants.EmptyByteArrayArray;
}
protected T SendReceive<T>(byte[][] cmdWithBinaryArgs, Func<T> fn, Action<Func<T>> completePipelineFn = null, bool sendWithoutRead = false)
{
    int retryCount = 0;
    Exception originalEx = null;
    DateTime firstAttempt = DateTime.UtcNow;
    for (;;)
    {
        try
        {
            // Connect (or reconnect) the socket if necessary
            this.TryConnectIfNeeded();
            if (this.socket == null)
            {
                throw new RedisRetryableException("Socket is not connected");
            }
            // The command is written to the send buffer only on the first attempt
            if (retryCount == 0)
            {
                this.WriteCommandToSendBuffer(cmdWithBinaryArgs);
            }
            if (this.Pipeline == null)
            {
                this.FlushSendBuffer();
            }
            else if (!sendWithoutRead)
            {
                if (completePipelineFn == null)
                {
                    throw new NotSupportedException("Pipeline is not supported.");
                }
                // Pipeline mode: the reader fn is queued on the pipeline instead of being called here
                completePipelineFn(fn);
                return default(T);
            }
            // Read and parse the reply
            T result = default(T);
            if (fn != null)
            {
                result = fn();
            }
            if (this.Pipeline == null)
            {
                this.ResetSendBuffer();
            }
            if (retryCount > 0)
            {
                Interlocked.Increment(ref RedisState.TotalRetrySuccess);
            }
            Interlocked.Increment(ref RedisState.TotalCommandsSent);
            return result;
        }
        catch (Exception ex)
        {
            // Non-retryable RedisExceptions and license errors are rethrown immediately
            RedisRetryableException retryableEx = ex as RedisRetryableException;
            if ((retryableEx == null && ex is RedisException) || ex is LicenseException)
            {
                this.ResetSendBuffer();
                throw;
            }
            // Map other errors to a retryable exception; null means "not retryable"
            Exception retryEx = retryableEx ?? this.GetRetryableException(ex);
            if (retryEx == null)
            {
                throw this.CreateConnectionError(originalEx ?? ex);
            }
            if (originalEx == null)
            {
                originalEx = retryEx;
            }
            // Give up once retryTimeout has elapsed since the first attempt
            if (!(DateTime.UtcNow - firstAttempt < this.retryTimeout))
            {
                if (this.Pipeline == null)
                {
                    this.ResetSendBuffer();
                }
                Interlocked.Increment(ref RedisState.TotalRetryTimedout);
                throw this.CreateRetryTimeoutException(this.retryTimeout, originalEx);
            }
            Interlocked.Increment(ref RedisState.TotalRetryCount);
            // Back off before retrying; the delay is computed from the retry count
            Thread.Sleep(RedisNativeClient.GetBackOffMultiplier(++retryCount));
        }
    }
}
Summary
Following Redis connection creation and acquisition as the common thread, this article has given us a deeper understanding of the SDK's internal implementation. With this knowledge, analyzing Redis SDK failures in production becomes much easier.