redis 学习问题总结 | http://aperise.iteye.com/blog/2310639 |
ehcache memcached redis 缓存技术总结 | http://aperise.iteye.com/blog/2296219 |
redis-stat 离线安装 | http://aperise.iteye.com/blog/2310254 |
redis cluster 非ruby方式启动 | http://aperise.iteye.com/blog/2310254 |
redis-sentinel安装部署 | http://aperise.iteye.com/blog/2342693 |
spring-data-redis使用 | http://aperise.iteye.com/blog/2342615 |
redis客户端redisson实战 | http://aperise.iteye.com/blog/2396196 |
redisson-2.10.4源代码分析 | http://aperise.iteye.com/blog/2400528 |
tcmalloc jemalloc libc选择 |
1.RedissonClient一主两从部署时连接池组成
主从部署(1主2从):
redisson纯java操作代码如下:
1. Config config = new Config();// 创建配置
2. // 指定使用主从部署方式
3. //.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
4. //.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
5. //.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
6. //.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
7. //.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
8. //.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
9. //.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接
10. //.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
11. "redis://192.168.29.24:6379") // 设置redis主节点
12. "redis://192.168.29.24:7000") // 设置redis从节点
13. "redis://192.168.29.24:7001"); // 设置redis从节点
14. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)
上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:
1. #6379上建立了10个连接
2. netstat -ant |grep 6379|grep ESTABLISHED
3. #7000上建立了11个连接
4. netstat -ant |grep 7000|grep ESTABLISHED
5. #7001上建立了11个连接
6. netstat -ant |grep 7001|grep ESTABLISHED
你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:
- MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;
- MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池,所以上图MasterPubSubConnectionPool里没有创建任何连接;
- SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;
- PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。
哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。
2.Redisson初始化连接池源码分析
2.1 Redisson.java
RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:
对Redisson.java的代码注释如下:
1. /**
2. * 根据配置Config创建redisson操作类RedissonClient
3. * @param config for Redisson
4. * @return Redisson instance
5. */
6. public static
7. //调用构造方法
8. new
9. if
10. redisson.enableRedissonReferenceSupport();
11. }
12. return
13. }
14.
15. /**
16. * Redisson构造方法
17. * @param config for Redisson
18. * @return Redisson instance
19. */
20. protected
21. //赋值变量config
22. this.config = config;
23. //产生一份对于传入config的备份
24. new
25.
26. //根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
27. connectionManager = ConfigSupport.createConnectionManager(configCopy);
28. //连接池对象回收调度器
29. new
30. //Redisson的对象编码类
31. codecProvider = configCopy.getCodecProvider();
32. //Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider
33. resolverProvider = configCopy.getResolverProvider();
34. }
其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。
2.2 ConfigSupport.java
这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下:
1. /**
2. * 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
3. * @param configCopy for Redisson
4. * @return ConnectionManager instance
5. */
6. public static
7. if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式
8. validate(configCopy.getMasterSlaveServersConfig());
9. return new
10. else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式
11. validate(configCopy.getSingleServerConfig());
12. return new
13. else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式
14. validate(configCopy.getSentinelServersConfig());
15. return new
16. else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式
17. validate(configCopy.getClusterServersConfig());
18. return new
19. else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式
20. validate(configCopy.getElasticacheServersConfig());
21. return new
22. else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式
23. validate(configCopy.getReplicatedServersConfig());
24. return new
25. else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager
26. return
27. else
28. throw new IllegalArgumentException("server(s) address(es) not defined!");
29. }
30. }
上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。
2.3 MasterSlaveConnectionManager.java
主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:
这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:
上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:
1. /**
2. * MasterSlaveConnectionManager的构造方法
3. * @param cfg for MasterSlaveServersConfig
4. * @param config for Config
5. */
6. public
7. //调用构造方法
8. this(config);
9. //
10. initTimer(cfg);
11. this.config = cfg;
12. //初始化MasterSlaveEntry
13. initSingleEntry();
14. }
15. /**
16. * MasterSlaveConnectionManager的构造方法
17. * @param cfg for Config
18. */
19. public
20. //读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息
21. Version.logVersion();
22. //EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式
23. if
24. if (cfg.getEventLoopGroup() == null) {
25. //使用linux IO非阻塞模型EPOLL
26. this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
27. else
28. this.group = cfg.getEventLoopGroup();
29. }
30. this.socketChannelClass = EpollSocketChannel.class;
31. else
32. if (cfg.getEventLoopGroup() == null) {
33. //使用linux IO非阻塞模型NIO
34. this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
35. else
36. this.group = cfg.getEventLoopGroup();
37. }
38. this.socketChannelClass = NioSocketChannel.class;
39. }
40. if (cfg.getExecutor() == null) {
41. //线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16
42. //对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32
43. //这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64
44. int threads = Runtime.getRuntime().availableProcessors() * 2;
45. if (cfg.getThreads() != 0) {
46. threads = cfg.getThreads();
47. }
48. new DefaultThreadFactory("redisson"));
49. else
50. executor = cfg.getExecutor();
51. }
52.
53. this.cfg = cfg;
54. this.codec = cfg.getCodec();
55. //一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理
56. this.shutdownPromise = newPromise();
57. //一个持有MasterSlaveConnectionManager的异步执行服务
58. this.commandExecutor = new CommandSyncService(this);
59. }
60. /**
61. * 初始化定时调度器
62. * @param config for MasterSlaveServersConfig
63. */
64. protected void
65. //读取超时时间配置信息
66. int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
67. Arrays.sort(timeouts);
68. int minTimeout = timeouts[0];
69. //设置默认超时时间
70. if (minTimeout % 100 != 0) {
71. 100) / 2;
72. else if (minTimeout == 100) {
73. 50;
74. else
75. 100;
76. }
77. //创建定时调度器
78. new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);
79.
80. // to avoid assertion error during timer.stop invocation
81. try
82. class.getDeclaredField("leak");
83. true);
84. null);
85. catch
86. throw new
87. }
88. //检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象
89. new IdleConnectionWatcher(this, config);
90. }
91.
92. /**
93. * 创建MasterSlaveConnectionManager的MasterSlaveEntry
94. */
95. protected void
96. try
97. //主从模式下0~16383加入到集合slots
98. new
99. slots.add(singleSlotRange);
100.
101. MasterSlaveEntry entry;
102. if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行
103. new SingleEntry(slots, this, config);
104. RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
105. f.syncUninterruptibly();
106. else {//默认主从部署ReadMode=SLAVE,SubscriptionMode=SLAVE,这里会执行
107. entry = createMasterSlaveEntry(config, slots);
108. }
109. //将每个分片0~16383都指向创建的MasterSlaveEntry
110. for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
111. addEntry(slot, entry);
112. }
113. //DNS相关
114. if (config.getDnsMonitoringInterval() != -1) {
115. new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
116. config.getSlaveAddresses(), config.getDnsMonitoringInterval());
117. dnsMonitor.start();
118. }
119. catch
120. stopThreads();
121. throw
122. }
123. }
124. /**
125. * MasterSlaveEntry的构造方法
126. * @param config for MasterSlaveServersConfig
127. * @param slots for HashSet<ClusterSlotRange>
128. * @return MasterSlaveEntry
129. */
130. protected
131. //创建MasterSlaveEntry
132. new MasterSlaveEntry(slots, this, config);
133. //从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
134. List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
135. for
136. future.syncUninterruptibly();
137. }
138. 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
139. RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
140. f.syncUninterruptibly();
141. return
142. }
上面个人觉得有两处代码值得我们特别关注,特别说明如下:
- entry.initSlaveBalancer:从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化。
- entry.setupMasterEntry:主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化。
2.4 MasterSlaveEntry.java
用一张图来解释MasterSlaveEntry的组件如下:
MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:
1. /**
2. * MasterSlaveEntry的构造方法
3. * @param slotRanges for Set<ClusterSlotRange>
4. * @param connectionManager for ConnectionManager
5. * @param config for MasterSlaveServersConfig
6. */
7. public
8. //主从模式下0~16383加入到集合slots
9. for
10. for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
11. slots.add(i);
12. }
13. }
14. //赋值MasterSlaveConnectionManager给connectionManager
15. this.connectionManager = connectionManager;
16. //赋值config
17. this.config = config;
18.
19. //创建LoadBalancerManager
20. //其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool
21. //并且此时连接池里还没有初始化默认的最小连接数
22. new LoadBalancerManager(config, connectionManager, this);
23. //创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数
24. new MasterConnectionPool(config, connectionManager, this);
25. //创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数
26. new MasterPubSubConnectionPool(config, connectionManager, this);
27. }
28.
29. /**
30. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
31. * @param disconnectedNodes for Collection<URI>
32. * @return List<RFuture<Void>>
33. */
34. public
35. //这里freezeMasterAsSlave=true
36. boolean
37.
38. new
39. //把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池
40. RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);
41. result.add(f);
42. //读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool
43. //SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
44. //PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
45. for
46. f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);
47. result.add(f);
48. }
49. return
50. }
51.
52. /**
53. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
54. * @param address for URI
55. * @param freezed for boolean
56. * @param nodeType for NodeType
57. * @return RFuture<Void>
58. */
59. private RFuture<Void> addSlave(URI address, boolean
60. //创建到从节点的连接RedisClient
61. RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
62. new
63. this.config.getSlaveConnectionMinimumIdleSize(),
64. this.config.getSlaveConnectionPoolSize(),
65. this.config.getSubscriptionConnectionMinimumIdleSize(),
66. this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
67. //默认只有主节点当作从节点是会设置freezed=true
68. if
69. synchronized
70. entry.setFreezed(freezed);
71. entry.setFreezeReason(FreezeReason.SYSTEM);
72. }
73. }
74. //调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
75. return
76. }
77.
78. /**
79. * 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
80. * @param address for URI
81. * @return RFuture<Void>
82. */
83. public
84. //创建到主节点的连接RedisClient
85. RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
86. new
87. client,
88. config.getMasterConnectionMinimumIdleSize(),
89. config.getMasterConnectionPoolSize(),
90. config.getSubscriptionConnectionMinimumIdleSize(),
91. config.getSubscriptionConnectionPoolSize(),
92. connectionManager,
93. NodeType.MASTER);
94. //如果配置的SubscriptionMode=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool
95. //默认SubscriptionMode=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数
96. if
97. //MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
98. RFuture<Void> f = writeConnectionHolder.add(masterEntry);
99. RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
100. return
101. }
102. //调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个
103. //MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
104. return
105. }
- writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象
- pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象
- slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象
- pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象
2.5 LoadBalancerManager.java
图解LoadBalancerManager.java的内部组成如下:
LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPool和PubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:
1. /**
2. * LoadBalancerManager的构造方法
3. * @param config for MasterSlaveServersConfig
4. * @param connectionManager for ConnectionManager
5. * @param entry for MasterSlaveEntry
6. */
7. public
8. //赋值connectionManager
9. this.connectionManager = connectionManager;
10. //创建连接池SlaveConnectionPool
11. new
12. //创建连接池PubSubConnectionPool
13. new
14. }
15. /**
16. * LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加
17. * @param entry for ClientConnectionsEntry
18. * @return RFuture<Void>
19. */
20. public RFuture<Void> add(final
21. final
22. //创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试
23. new
24. new AtomicInteger(2);
25. @Override
26. public void operationComplete(Future<Void> future) throws
27. if
28. result.tryFailure(future.cause());
29. return;
30. }
31. if (counter.decrementAndGet() == 0) {
32. String addr = entry.getClient().getIpAddr();
33. ip2Entry.put(addr, entry);
34. null);
35. }
36. }
37. };
38. //调用slaveConnectionPool添加RedisConnection对象到池中
39. RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
40. slaveFuture.addListener(listener);
41. //调用pubSubConnectionPool添加RedisPubSubConnection对象到池中
42. RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
43. pubSubFuture.addListener(listener);
44. return
45. }
我们已经了解了开篇提到的四个连接池是在哪里创建的。
3. Redisson的4类连接池
MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下:
通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:
好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:
1. abstract class ConnectionPool<T extends
2. private final
3. //维持着连接池对应的redis节点信息
4. //比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)
5. //比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为SubscriptionMode=SubscriptionMode.SLAVE
6. //比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)
7. //比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为SubscriptionMode=SubscriptionMode.SLAVE,主节点不会加入
8. protected final List<ClientConnectionsEntry> entries = new
9. //持有者RedissonClient的组件ConnectionManager
10. final
11. //持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig
12. final
13. //持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry
14. final
15.
16. //构造函数
17. public
18. this.config = config;
19. this.masterSlaveEntry = masterSlaveEntry;
20. this.connectionManager = connectionManager;
21. }
22.
23. //连接池中需要增加对象时候调用此方法
24. public RFuture<Void> add(final
25. final
26. new
27. @Override
28. public void operationComplete(Future<Void> future) throws
29. entries.add(entry);
30. }
31. });
32. true);
33. return
34. }
35.
36. //初始化连接池中最小连接数
37. private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean
38. final int
39.
40. if (minimumIdleSize == 0
41. null);
42. return;
43. }
44.
45. final AtomicInteger initializedConnections = new
46. int startAmount = Math.min(50, minimumIdleSize);
47. final AtomicInteger requests = new
48. for (int i = 0; i < startAmount; i++) {
49. createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
50. }
51. }
52.
53. //创建连接对象到连接池中
54. private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final
55. final int minimumIdleSize, final
56.
57. if
58. int
59. new
60. "Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "
61. + entry.getClient().getAddr());
62. initPromise.tryFailure(cause);
63. return;
64. }
65.
66. new
67.
68. @Override
69. public void
70. RPromise<T> promise = connectionManager.newPromise();
71. createConnection(entry, promise);
72. new
73. @Override
74. public void operationComplete(Future<T> future) throws
75. if
76. T conn = future.getNow();
77.
78. releaseConnection(entry, conn);
79. }
80.
81. releaseConnection(entry);
82.
83. if
84. int
85. String errorMsg;
86. if (totalInitializedConnections == 0) {
87. "Unable to connect to Redis server: "
88. else
89. "Unable to init enough connections amount! Only "
90. " from " + minimumIdleSize + " were initialized. Redis server: "
91. }
92. new
93. initPromise.tryFailure(cause);
94. return;
95. }
96.
97. int
98. if (value == 0) {
99. "{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
100. if (!initPromise.trySuccess(null)) {
101. throw new
102. }
103. else if (value > 0
104. if
105. createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
106. }
107. }
108. }
109. });
110. }
111. });
112.
113. }
114.
115. //连接池中租借出连接对象
116. public
117. for (int j = entries.size() - 1; j >= 0; j--) {
118. final
119. if
120. && tryAcquireConnection(entry)) {
121. return
122. }
123. }
124.
125. new
126. new
127. for
128. if
129. freezed.add(entry.getClient().getAddr());
130. else
131. failedAttempts.add(entry.getClient().getAddr());
132. }
133. }
134.
135. new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
136. if
137. " Disconnected hosts: "
138. }
139. if
140. " Hosts disconnected due to `failedAttempts` limit reached: "
141. }
142.
143. new
144. return
145. }
146.
147. //连接池中租借出连接对象执行操作RedisCommand
148. public
149. if
150. tryAcquireConnection(entry)) {
151. return
152. }
153.
154. new
155. "Can't aquire connection to "
156. return
157. }
158.
159. //通过向redis服务端发送PING看是否返回PONG来检测连接
160. private void ping(RedisConnection c, final
161. RFuture<String> f = c.async(RedisCommands.PING);
162. f.addListener(pingListener);
163. }
164.
165. //归还连接对象到连接池
166. public void
167. if
168. connection.closeAsync();
169. else
170. releaseConnection(entry, connection);
171. }
172. releaseConnection(entry);
173. }
174.
175. //释放连接池中连接对象
176. protected void
177. entry.releaseConnection();
178. }
179.
180. //释放连接池中连接对象
181. protected void
182. entry.releaseConnection(conn);
183. }
184. }
用一张图来解释ConnectionPool干了些啥,如下图:
都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:
4.Redisson的读写操作句柄类RedissonObject
操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:
例如想设置redis服务端的key=key的值value=123,你需要查询Redis命令和Redisson对象匹配列表,找到如下对应关系:
然后我们就知道调用代码这么写:
1. Config config = new Config();// 创建配置
2. // 指定使用主从部署方式
3. "redis://192.168.29.24:6379") // 设置redis主节点
4. "redis://192.168.29.24:7000") // 设置redis从节点
5. "redis://192.168.29.24:7001"); // 设置redis从节点
6. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)
7.
8. //任何Redisson操作首先需要获取对应的操作句柄
9. //RBucket是操作句柄之一,实现类是RedissonBucket
10. RBucket<String> rBucket = redissonClient.getBucket("key");
11.
12. //通过操作句柄rBucket进行读操作
13. rBucket.get();
14.
15. //通过操作句柄rBucket进行写操作
16. rBucket.set("123");
至于其它的redis命令对应的redisson操作对象,都可以官网的Redis命令和Redisson对象匹配列表 查到。
6.Redisson的读写操作源码分析
从一个读操作的代码作为入口分析代码,如下:
- //任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket
- RBucket<String> rBucket = redissonClient.getBucket("key");
- //通过操作句柄rBucket进行读操作
- rBucket.get();
继续追踪上面RBucket的get方法,如下:
上面我们看到不管是读操作还是写操作都转交 CommandAsyncExecutor进行处理,那么这里我们需要看一下 CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:
1. private
2. //通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT
3. //计算出一个字符串key对应的分片在0~16383中哪个分片
4. int
5. //之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来
6. MasterSlaveEntry entry = connectionManager.getEntry(slot);
7. //这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
8. return new
9. }
10. @Override
11. public
12. RPromise<R> mainPromise = connectionManager.newPromise();
13. //获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
14. NodeSource source = getNodeSource(key);
15. 调用异步执行方法async
16. true, source, codec, command, params, mainPromise, 0);
17. return
18. }
19. protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final
20. final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int
21. //操作被取消,那么直接返回
22. if
23. free(params);
24. return;
25. }
26. //连接管理器无法连接,释放参数所占资源,然后返回
27. if
28. free(params);
29. new RedissonShutdownException("Redisson is shutdown"));
30. return;
31. }
32.
33. final
34. if
35. try
36. for (int i = 0; i < params.length; i++) {
37. RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);
38. if (reference != null) {
39. params[i] = reference;
40. }
41. }
42. catch
43. connectionManager.getShutdownLatch().release();
44. free(params);
45. mainPromise.tryFailure(e);
46. return;
47. }
48. }
49.
50. //开始从connectionManager获取池中的连接
51. //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
52. final
53. if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
54. connectionFuture = connectionManager.connectionReadOp(source, command);
55. else {//对于写操作默认readOnlyMode=false,这里会执行
56. connectionFuture = connectionManager.connectionWriteOp(source, command);
57. }
58.
59. //创建RPromise,用于操作失败时候重试
60. final
61. details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);
62. //创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止
63. new
64. @Override
65. public void operationComplete(Future<R> future) throws
66. if (future.isCancelled() && connectionFuture.cancel(false)) {
67. "Connection obtaining canceled for {}", command);
68. details.getTimeout().cancel();
69. if (details.getAttemptPromise().cancel(false)) {
70. free(params);
71. }
72. }
73. }
74. };
75.
76. //创建TimerTask,用于操作失败后通过定时器进行操作重试
77. final TimerTask retryTimerTask = new
78. @Override
79. public void run(Timeout t) throws
80. if
81. return;
82. }
83. if (details.getConnectionFuture().cancel(false)) {
84. connectionManager.getShutdownLatch().release();
85. else
86. if
87. if (details.getWriteFuture() == null
88. if
89. if (details.getWriteFuture().cancel(false)) {
90. if (details.getException() == null) {
91. new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
92. }
93. details.getAttemptPromise().tryFailure(details.getException());
94. }
95. return;
96. }
97. details.incAttempt();
98. this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
99. details.setTimeout(timeout);
100. return;
101. }
102.
103. if
104. return;
105. }
106. }
107. }
108. if
109. if (details.getAttemptPromise().cancel(false)) {
110. free(details);
111. AsyncDetails.release(details);
112. }
113. return;
114. }
115. if
116. if (details.getException() == null) {
117. new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
118. }
119. details.getAttemptPromise().tryFailure(details.getException());
120. return;
121. }
122. if (!details.getAttemptPromise().cancel(false)) {
123. return;
124. }
125. int count = details.getAttempt() + 1;
126. if
127. "attempt {} for command {} and params {}",
128. count, details.getCommand(), Arrays.toString(details.getParams()));
129. }
130. details.removeMainPromiseListener();
131. async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
132. AsyncDetails.release(details);
133. }
134. };
135.
136. //配置对于读写操作的超时时间
137. Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
138. details.setTimeout(timeout);
139. details.setupMainPromiseListener(mainPromiseListener);
140.
141. //给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作
142. new
143. @Override
144. public void operationComplete(Future<RedisConnection> connFuture) throws
145. if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回
146. return;
147. }
148.
149. if (!connFuture.isSuccess()) {//从池中获取连接失败
150. connectionManager.getShutdownLatch().release();
151. details.setException(convertException(connectionFuture));
152. return;
153. }
154.
155. if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0
156. releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
157. return;
158. }
159.
160. //从池中获取连接成功,这里取出连接对象RedisConnection
161. final
162. //如果需要重定向,这里进行重定向
163. //重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向
164. if
165. new ArrayList<CommandData<?, ?>>(2);
166. RPromise<Void> promise = connectionManager.newPromise();
167. new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new
168. new
169. RPromise<Void> main = connectionManager.newPromise();
170. new
171. details.setWriteFuture(future);
172. else
173. if
174. "acquired connection for command {} and params {} from slot {} using node {}... {}",
175. details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
176. }
177. //发送读写操作到RedisConnection,进行执行
178. new
179. details.setWriteFuture(future);
180. }
181. //对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印
182. new
183. @Override
184. public void operationComplete(ChannelFuture future) throws
185. checkWriteFuture(details, connection);
186. }
187. });
188. //返回RedisConnection连接到连接池
189. releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
190. }
191. });
192.
193. new
194. @Override
195. public void operationComplete(Future<R> future) throws
196. checkAttemptFuture(source, details, future);
197. }
198. });
199. }
上面的代码我用一张读写操作处理流程图总结如下:
至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。
6.1 分片SLOT的计算公式
SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT
6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件
一个节点被判断为冻结,必须同时满足以下条件:
- 该节点有slave节点,并且从节点个数大于0;
- 设置的配置ReadMode不为并且SubscriptionMode不为MASTER;
- 该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数
6.3 读写负载图
7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析
CommandAsyncExecutor.java里的如下代码获取连接对象:
1. //开始从connectionManager获取池中的连接
2. //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
3. final
4. if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
5. connectionFuture = connectionManager.connectionReadOp(source, command);
6. } else {//对于写操作默认readOnlyMode=false,这里会执行
7. connectionFuture = connectionManager.connectionWriteOp(source, command);
8. }
上面读操作调用了 connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了 connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进 connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:
1. /**
2. * 读操作通过ConnectionManager从连接池获取连接对象
3. * @param source for NodeSource
4. * @param command for RedisCommand<?>
5. * @return RFuture<RedisConnection>
6. */
7. public
8. //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
9. MasterSlaveEntry entry = source.getEntry();
10. if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null
11. entry = getEntry(source.getSlot());
12. }
13. if (source.getAddr() != null) {//这里不会执行source里addr=null
14. entry = getEntry(source.getAddr());
15. if (entry == null) {
16. for
17. if
18. entry = e;
19. break;
20. }
21. }
22. }
23. if (entry == null) {
24. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
25. return
26. }
27.
28. return
29. }
30.
31. if (entry == null) {//这里不会执行source里entry不等于null
32. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
33. return
34. }
35. //MasterSlaveEntry里从连接池获取连接对象
36. return
37. }
38. /**
39. * 写操作通过ConnectionManager从连接池获取连接对象
40. * @param source for NodeSource
41. * @param command for RedisCommand<?>
42. * @return RFuture<RedisConnection>
43. */
44. public
45. //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
46. MasterSlaveEntry entry = source.getEntry();
47. if (entry == null) {
48. entry = getEntry(source);
49. }
50. if (entry == null) {//这里不会执行source里entry不等于null
51. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
52. return
53. }
54. //MasterSlaveEntry里从连接池获取连接对象
55. return
56. }
我们看到上面调用 ConnectionManager从连接池获取连接对象,但是 ConnectionManager却将获取连接操作转交 MasterSlaveEntry处理,我们再一次回顾一下 MasterSlaveEntry的组成:
MasterSlaveEntry里持有中我们开篇所提到的 四个连接池,那么这里我们继续关注 MasterSlaveEntry.java的源代码.
1. /**
2. * 写操作从MasterConnectionPool连接池里获取连接对象
3. * @param command for RedisCommand<?>
4. * @return RFuture<RedisConnection>
5. */
6. public
7. //我们知道writeConnectionHolder的类型为MasterConnectionPool
8. //这里就是从MasterConnectionPool里获取连接对象
9. return
10. }
11.
12. /**
13. * 写操作从LoadBalancerManager里获取连接对象
14. * @param command for RedisCommand<?>
15. * @return RFuture<RedisConnection>
16. */
17. public
18. if
19. //我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行
20. return
21. }
22. //我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool
23. //这里就是从SlaveConnectionPool里获取连接对象
24. return
25. }
似乎又绕回来了,最终的获取连接对象都转交到了从连接池 ConnectionPool里获取连接对象,注释 ConnectionPool里的获取连接对象代码如下:
1. /**
2. * 读写操作从ConnectionPool.java连接池里获取连接对象
3. * @param command for RedisCommand<?>
4. * @return RFuture<T>
5. */
6. public
7. for (int j = entries.size() - 1; j >= 0; j--) {
8. final
9. if
10. //遍历ConnectionPool里维持的ClientConnectionsEntry列表
11. //遍历的算法默认为RoundRobinLoadBalancer
12. //ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false
13. return
14. }
15. }
16.
17. //记录失败重试信息
18. new
19. new
20. for
21. if
22. freezed.add(entry.getClient().getAddr());
23. else
24. failedAttempts.add(entry.getClient().getAddr());
25. }
26. }
27.
28. new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
29. if
30. " Disconnected hosts: "
31. }
32. if
33. " Hosts disconnected due to `failedAttempts` limit reached: "
34. }
35. //获取连接失败抛出异常
36. new
37. return
38. }
39.
40. /**
41. * 读写操作从ConnectionPool.java连接池里获取连接对象
42. * @param command for RedisCommand<?>
43. * @param entry for ClientConnectionsEntry
44. * @return RFuture<T>
45. */
46. private RFuture<T> acquireConnection(RedisCommand<?> command, final
47. //创建一个异步结果获取RPromise
48. final
49. //获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1
50. //该操作成功后将调用这里的回调函数AcquireCallback<T>
51. new
52. @Override
53. public void
54. this);
55. //freeConnectionsCounter值减1成功,说明获取可以获取到连接
56. //这里才是真正获取连接的操作
57. connectTo(entry, result);
58. }
59.
60. @Override
61. public void operationComplete(Future<T> future) throws
62. this);
63. }
64. };
65. //异步结果获取RPromise绑定到上面的回调函数callback
66. result.addListener(callback);
67. //尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接
68. acquireConnection(entry, callback);
69. //返回异步结果获取RPromise
70. return
71. }
72.
73. /**
74. * 真正从连接池中获取连接
75. * @param entry for ClientConnectionsEntry
76. * @param promise for RPromise<T>
77. */
78. private void
79. if
80. releaseConnection(entry);
81. return;
82. }
83. //从连接池中取出一个连接
84. T conn = poll(entry);
85. if (conn != null) {
86. if
87. promiseFailure(entry, promise, conn);
88. return;
89. }
90.
91. connectedSuccessful(entry, promise, conn);
92. return;
93. }
94. //如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中
95. createConnection(entry, promise);
96. }
97.
98. /**
99. * 从连接池中获取连接
100. * @param entry for ClientConnectionsEntry
101. * @return T
102. */
103. protected
104. return
105. }
106.
107. /**
108. * 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
109. * @param entry for ClientConnectionsEntry
110. * @param promise for RPromise<T>
111. */
112. private void createConnection(final ClientConnectionsEntry entry, final
113. //调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
114. RFuture<T> connFuture = connect(entry);
115. new
116. @Override
117. public void operationComplete(Future<T> future) throws
118. if
119. promiseFailure(entry, promise, future.cause());
120. return;
121. }
122.
123. T conn = future.getNow();
124. if
125. promiseFailure(entry, promise, conn);
126. return;
127. }
128.
129. connectedSuccessful(entry, promise, conn);
130. }
131. });
132. }
ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下 ClientConnectionsEntry的组成图:
我们继续跟进 ClientConnectionsEntry.java的源代码,注释如下:
1. /**
2. * ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用
3. */
4. public
5. return
6. }
7.
8. /**
9. * ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用
10. */
11. public
12. //调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回
13. RFuture<RedisConnection> future = client.connectAsync();
14. new
15. @Override
16. public void operationComplete(Future<RedisConnection> future) throws
17. if
18. return;
19. }
20.
21. RedisConnection conn = future.getNow();
22. onConnect(conn);
23. "new connection created: {}", conn);
24. }
25. });
26. return
27. }
上面的代码说明如果 ClientConnectionsEntry
里的 freeConnections
有空闲连接,那么直接返回该连接,如果没有那么调用 RedisClient.connectAsync创建一个新的连接
,这里我继续注释一下 RedisClient.java
的源代码如下:
1. package
2.
3. import
4. import
5. import
6. import
7. import
8. import
9. import
10. import
11. import
12. import
13. import
14. import
15. import
16. import
17. import
18. import
19. import
20. import
21. import
22. import
23. import
24. import
25. import
26. import
27. import
28. import
29. import
30. import
31.
32. /**
33. * 使用java里的网络编程框架Netty连接redis服务端
34. * 作者: Nikita Koksharov
35. */
36. public class
37. private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
38. private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
39. private final InetSocketAddress addr;//socket连接的地址
40. //channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel
41. //channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表
42. private final ChannelGroup channels = new
43.
44. private ExecutorService executor;//REACOTR模型的java异步执行线程池
45. private final long commandTimeout;//超时时间
46. private Timer timer;//定时器
47. private boolean
48. private RedisClientConfig config;//redis连接配置信息
49.
50. //构造方法
51. public static
52. if (config.getTimer() == null) {
53. new
54. }
55. return new
56. }
57. //构造方法
58. private
59. this.config = config;
60. this.executor = config.getExecutor();
61. this.timer = config.getTimer();
62.
63. new
64.
65. bootstrap = createBootstrap(config, Type.PLAIN);
66. pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
67.
68. this.commandTimeout = config.getCommandTimeout();
69. }
70.
71. //java的网路编程框架Netty工具类Bootstrap初始化
72. private
73. new
74. .channel(config.getSocketChannelClass())
75. .group(config.getGroup())
76. .remoteAddress(addr);
77. //注册netty相关socket数据处理RedisChannelInitializer
78. new RedisChannelInitializer(bootstrap, config, this, channels, type));
79. //设置超时时间
80. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
81. return
82. }
83.
84. //构造方法
85. @Deprecated
86. public
87. this(URIBuilder.create(address));
88. }
89.
90. //构造方法
91. @Deprecated
92. public
93. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new
94. true;
95. }
96.
97. //构造方法
98. @Deprecated
99. public
100. this(timer, executor, group, address.getHost(), address.getPort());
101. }
102.
103. //构造方法
104. @Deprecated
105. public RedisClient(String host, int
106. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
107. true;
108. }
109.
110. //构造方法
111. @Deprecated
112. public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int
113. this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
114. }
115.
116. //构造方法
117. @Deprecated
118. public RedisClient(String host, int port, int connectTimeout, int
119. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
120. }
121.
122. //构造方法
123. @Deprecated
124. public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int
125. int connectTimeout, int
126. new
127. config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
128. .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
129.
130. this.config = config;
131. this.executor = config.getExecutor();
132. this.timer = config.getTimer();
133.
134. new
135.
136. //java的网路编程框架Netty工具类Bootstrap初始化
137. bootstrap = createBootstrap(config, Type.PLAIN);
138. pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
139.
140. this.commandTimeout = config.getCommandTimeout();
141. }
142.
143. //获取连接的IP地址
144. public
145. return addr.getAddress().getHostAddress() + ":"
146. }
147. //获取socket连接的地址
148. public
149. return
150. }
151. //获取超时时间
152. public long
153. return
154. }
155. //获取netty的线程池
156. public
157. return
158. }
159. //获取redis连接配置
160. public
161. return
162. }
163. //获取连接RedisConnection
164. public
165. try
166. return
167. catch
168. throw new RedisConnectionException("Unable to connect to: "
169. }
170. }
171. //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
172. public
173. final RPromise<RedisConnection> f = new
174. //netty连接redis服务端
175. ChannelFuture channelFuture = bootstrap.connect();
176. new
177. @Override
178. public void operationComplete(final ChannelFuture future) throws
179. if
180. //将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
181. final
182. new
183. @Override
184. public void operationComplete(final Future<RedisConnection> future) throws
185. new
186. @Override
187. public void
188. if
189. if
190. c.closeAsync();
191. }
192. else
193. f.tryFailure(future.cause());
194. c.closeAsync();
195. }
196. }
197. });
198. }
199. });
200. else
201. new
202. public void
203. f.tryFailure(future.cause());
204. }
205. });
206. }
207. }
208. });
209. return
210. }
211. //获取订阅相关连接RedisPubSubConnection
212. public
213. try
214. return
215. catch
216. throw new RedisConnectionException("Unable to connect to: "
217. }
218. }
219.
220. //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
221. public
222. final RPromise<RedisPubSubConnection> f = new
223. //netty连接redis服务端
224. ChannelFuture channelFuture = pubSubBootstrap.connect();
225. new
226. @Override
227. public void operationComplete(final ChannelFuture future) throws
228. if
229. //将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
230. final
231. new
232. @Override
233. public void operationComplete(final Future<RedisPubSubConnection> future) throws
234. new
235. @Override
236. public void
237. if
238. if
239. c.closeAsync();
240. }
241. else
242. f.tryFailure(future.cause());
243. c.closeAsync();
244. }
245. }
246. });
247. }
248. });
249. else
250. new
251. public void
252. f.tryFailure(future.cause());
253. }
254. });
255. }
256. }
257. });
258. return
259. }
260.
261. //关闭netty网络连接
262. public void
263. shutdownAsync().syncUninterruptibly();
264. if
265. timer.stop();
266. executor.shutdown();
267. try
268. 15, TimeUnit.SECONDS);
269. catch
270. Thread.currentThread().interrupt();
271. }
272. bootstrap.config().group().shutdownGracefully();
273.
274. }
275. }
276.
277. //异步关闭netty网络连接
278. public
279. for
280. RedisConnection connection = RedisConnection.getFrom(channel);
281. if (connection != null) {
282. true);
283. }
284. }
285. return
286. }
287.
288. @Override
289. public
290. return "[addr=" + addr + "]";
291. }
292. }
上面就是Redisson利用java网络编程框架netty连接redis的全过程
,如果你对 netty
比较熟悉,阅读上面的代码应该不是问题。