封面来源:碧蓝航线 永夜幻光 活动CG

Redis 命令参考:Redis 命令参考

本文参考:尚硅谷超经典Redis教程,redis实战,阳哥版从入门到精通

1. Redis 事务

1.1 事务概述

什么是事务

可以一次执行多个命令,本质是一组命令的集合。一个事务中的所有命令都会序列化,按顺序地串行化执行而不会被其它命令插入,不许加塞。

Redis 支持事务,但只部分支持,并不像常见关系型数据库那样强一致性。

事务可以干嘛

一个队列中,一次性、顺序性、排他性的执行一系列命令。

1.2 怎么使用

在官网上 Transactions 内有这样一段话:

Redis事务Usage

常用命令

命令 描述
DISCARD 取消事务,放弃执行事务块内的所有命令。
EXEC 执行所有事务块内的命令。
MULTI 标记一个事务块的开始。
UNWATCH 取消 WATCH 命令对所有 key 的监视。
WATCH key [key …] 监视一个(或多个)key ,如果在事务执行之前这个(或这些)key 被其他命令所改动,那么事务将被打断。

正常执行

[root@cheny bin]# redis-server /Redis/redis_aof.conf
[root@cheny bin]# redis-cli -p 6379
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> EXEC
1) OK
2) OK
3) "v2"
4) OK

放弃事务

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> set k1 v11
QUEUED
127.0.0.1:6379> set k2 v22
QUEUED
127.0.0.1:6379> set k3 v33
QUEUED
127.0.0.1:6379> DISCARD
OK
127.0.0.1:6379> mget k1 k2 k3
1) "v1"
2) "v2"
3) "v3"

全体连坐

Redis事务全体连坐

这里的命令就错了,类似 Java 的编译异常,所以都不会执行。

冤头债主

Redis事务冤头债主

这里的命令没错,过程中没有报错,都入队了,但 k1 是 “aa”,不是数字不能加一,因此针对 k1 的错误不执行,但其他的操作会执行。类似 Java 的运行异常。

在 Redis 事务中还有 watch 监控,这部分内容较多且重要,单独提出来编写。

1.3 watch 监控

知识回顾

回顾一下行锁和表锁:

参考链接:深入理解数据库行锁与表锁

行锁:顾名思义,行锁就是一锁锁一行或者多行记录,MySQL 的 行锁是基于索引加载的,所以行锁是要加在索引响应的行上,即命中索引。行锁锁冲突概率低,并发性高,但是会有死锁的情况出现。

表锁:顾名思义,表锁就是一锁锁一整张表,在表被锁定期间,其他事务不能对该表进行操作,必须等当前表的锁被释放后才能进行操作。表锁响应的是非索引字段,即全表扫描,全表扫描时锁定整张表,SQL 语句可以通过执行计划看出扫描了多少条记录。表锁的锁冲突几率特别高,表锁不会出现死锁的情况。

再回顾一下悲观锁和乐观锁:

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁、表锁等,读锁、写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。

乐观锁策略:提交版本必须大于记录当前版本才能执行更新。

CAS,即:Check And Set,意为检查并设置,运用了乐观锁的机制。

信用卡可用余额和欠款

初始化信用卡可用余额和欠款:

127.0.0.1:6379> set balance 100
OK
127.0.0.1:6379> set debt 0
OK
127.0.0.1:6379> KEYS *
1) "debt"
2) "balance"
127.0.0.1:6379> MGET balance debt
1) "100"
2) "0"

无加塞篡改,先监控再开启 MULTI,保证两笔金额变动在同一个事务内

127.0.0.1:6379> WATCH balance
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> DECRBY balance 20
QUEUED
127.0.0.1:6379> INCRBY debt 20
QUEUED
127.0.0.1:6379> EXEC
1) (integer) 80
2) (integer) 20

有加塞篡改,监控了 key,如果 key 被修改,后面一个事务的执行将会失败

watch有篡改加塞

unwatch:

unwatch

一旦执行了 EXEC 之前加的监控锁都会被取消掉了(一次性)

小结

WATCH 指令,类似乐观锁,事务提交时,如果看 key 的值已被别的客户端改变,比如某个 list 已被别的客户端 push / pop 过了,整个事务队列都不会被执行。

通过 WATCH 命令在事务执行之前监控了多个 keys,倘若在 WATCH 之后有任何 key 的值发生了变化, EXEC 命令执行的事务都将被放弃,同时返回 Nullmulti-bulk 应答以通知调用者事务执行失败。

1.4 三阶段和三特性

三阶段

1、开启:以 MULTI 开始一个事务

2、入队:将多个命令入队到事务中,接到这些命令并不会立即执行,而是放到等待执行的事务队列里面

3、执行:由 EXEC 命令触发事务

三特性

1、单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

2、没有隔离级别的概念:队列中的命令在没有提交之前都不会实际的被执行,因为事务提交前任何指令都不会被实际执行, 也就不存在“事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头痛的问题

3、不保证原子性:Redis 同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

不遵循传统的 ACID 中的 AI(原子性和隔离性)。

2. 发布订阅

2.1 什么是发布订阅

进程间的一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 有发布订阅的功能,但是在实际企业开发的时候并不会使用 Redis 作为消息中间件,本节做个了解即可。

订阅 / 发布消息图

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、client5 和 client1 之间的关系:

订阅channel1

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

channel1发布消息

2.2 常用命令

命令 描述
PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道。
PUBSUB subcommand [argument [argument …]] 查看订阅与发布系统状态。
PUBLISH channel message 将信息发送到指定的频道。
PUNSUBSCRIBE [pattern [pattern …]] 退订所有给定模式的频道。
SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息。
UNSUBSCRIBE [channel [channel …]] 指退订给定的频道。

2.3 相关案例

依次订阅多个频道

要模拟 Redis 的发布和订阅,需要在 Linux 上开启两个终端,并在两个终端上都启动 Redis 服务,打开 Redis 客户端。其中一个终端用于订阅并接收消息,另一个终端用于发送消息。

首先使一个终端订阅三个频道:

127.0.0.1:6379> SUBSCRIBE c1 c2 c3
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "c1"
3) (integer) 1
1) "subscribe"
2) "c2"
3) (integer) 2
1) "subscribe"
2) "c3"
3) (integer) 3
1) "message"

然后我们在另外一个终端发布消息:

127.0.0.1:6379> PUBLISH c2 hello-redis
(integer) 1

成功发布消息后,在订阅频道的终端就会收到消息,会出现如下显示:

127.0.0.1:6379> SUBSCRIBE c1 c2 c3
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "c1"
3) (integer) 1
1) "subscribe"
2) "c2"
3) (integer) 2
1) "subscribe"
2) "c3"
3) (integer) 3
1) "message"
2) "c2"
3) "hello-redis"

通配符订阅多个频道

Redis 还支持通配符 * 的方式订阅多个频道。还是和前面一样,打开两个终端,都启动 Redis 的客户端。

首先让一个终端使用通配符的方式订阅多个频道:

127.0.0.1:6379> PSUBSCRIBE new*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "new*"
3) (integer) 1

然后在另一个终端发送消息:

127.0.0.1:6379> PUBLISH new1 I-am-mofan
(integer) 1
127.0.0.1:6379> PUBLISH new666 mofan212.getee.io
(integer) 1

回到订阅频道的终端,可以成功接收消息:

127.0.0.1:6379> PSUBSCRIBE new*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "new*"
3) (integer) 1
1) "pmessage"
2) "new*"
3) "new1"
4) "I-am-mofan"
1) "pmessage"
2) "new*"
3) "new666"
4) "mofan212.getee.io"

3. Redis 的复制

3.1 什么是主从复制

Redis 官网介绍:Replication

主从复制(Master / Slaver),主机数据更新后根据配置和策略, 自动同步到备机的 master / slaver 机制。简单来说就是将一台 Redis 服务器的数据,复制到其他的 Redis 服务器。前者称为主节点(master / leader),后者称为从节点(slave / follower)。

数据的复制是单向的,只能由主节点到从节点。Master 以写为主,Slave 以读为主。

那这能做什么呢?

  • 读写分离
  • 容灾恢复
  • 负载均衡
  • 高可用基石

一般来说,在工程项目中不会仅仅只使用一台 Redis,因为:

1、一台 Redis 会发生单点故障,而且一台服务器处理所有的请求压力会很大

2、单个 Redis 服务器的容量有限,一般来说,单台 Redis 最大使用内存不应该超过 20G。

3.2 环境配置

1、配从(库)不配主(库)

2、从库配置命令:slaveof 主库IP 主库端口

  • 每次与 Master 断开之后,都需要重新连接,除非你配置进 Redis 的配置文件 redis.conf (具体位置:redis.conf 文件内搜索 REPLICATION,这也是在【Redis 基础 上篇】中没讲到的配置)
  • 如果需要查看信息,可以使用命令:info replication

修改配置文件细节操作

1、拷贝多个 redis.conf 文件,按 “redis[port].conf” 进行重命名。

进入【Redis 基础 上篇】中存放 Redis 配置文件的地方,对这份配置文件复制三份:

[root@cheny Redis]# cp redis.conf redis6379.conf
[root@cheny Redis]# cp redis.conf redis6380.conf
[root@cheny Redis]# cp redis.conf redis6381.conf
[root@cheny Redis]# ls -l
总用量 2616
-rw------- 1 mofan mofan 2247528 12月  3 21:40 redis-6.0.8.tar.gz
-rw-r--r-- 1 root  root    84643 12月  6 14:15 redis6379.conf
-rw-r--r-- 1 root  root    84643 12月  6 14:15 redis6380.conf
-rw-r--r-- 1 root  root    84643 12月  6 14:15 redis6381.conf
-rw-r--r-- 1 root  root    84644 12月  5 22:16 redis_aof.conf
-rw-r--r-- 1 root  root    84643 12月  5 22:19 redis.conf

2、然后对每个配置文件进行如下修改:

  • 开启 daemonize yes
  • pid 文件名字
  • 指定端口(Redis 6.0.8 版本中该项位于 NETWORK 中)
  • log 文件名字
  • dump.rdb 名字(Redis 6.0.8 版本中该项位于 SNAPSHOTTING 中)

其他配置项位于 GENERAL 中。

比如针对 redis6379.conf 文件,我们对这些配置项进行修改:

1
2
3
4
5
daemonize yes
pidfile /var/run/redis_6379.pid
port 6379
logfile "6379.log"
dbfilename dump6379.rdb

对于 redis6380.conf 和 redis6381.conf 来说,也是按照这种方式进行修改,把和端口号相关的配置按照文件名进行修改即可。

最后通过三个配置文件启动三个 Redis 服务,如:

1
redis-server /Redis/redis6379.conf

然后使用命令 ps -ef|grep redis 查看服务进程情况:

[root@cheny Redis]# ps -ef|grep redis
root      20815      1  0 14:46 ?        00:00:00 redis-server 127.0.0.1:6379
root      20992      1  0 14:47 ?        00:00:00 redis-server 127.0.0.1:6380
root      21069      1  0 14:48 ?        00:00:00 redis-server 127.0.0.1:6381
root      21084  20502  0 14:48 pts/0    00:00:00 grep --color=auto redis

启动三个服务后,会在路径(我这里的路径是 /usr/local/bin)下生成对应的日志文件。

3.3 一主二仆

启动 Redis 服务后,在 三个终端 里打开 Redis 客户端:

1
2
3
4
5
6
# 第一个终端
redis-cli -p 6379
# 第二个终端
redis-cli -p 6380
# 第三个终端
redis-cli -p 6381

启动客户端后,在三个终端内使用命令 KEYS * 查看 0 库是否为空,如果不为空,将其设置为空,执行命令 FLUSHDB

在三个终端内执行命令 INFO replication 查看角色信息,可以发现三个客户端的角色都是 Master 主节点,比如 6380 端口下:

默认角色信息

我们需要将 6379 设置为 Master 主机(默认),将 6380 和 6381 设置为 Slave 从机。

先向 6379 中插入一些数据:

27.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> set k2 v2
OK
127.0.0.1:6379> set k3 v3
OK
127.0.0.1:6379> KEYS *
1) "k3"
2) "k2"
3) "k1"

将 6380 设置为从机(跟着 6379 混):

127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK

将 6381 设置为从机(跟着 6379 混):

127.0.0.1:6381> SLAVEOF 127.0.0.1 6379
OK

在 6379 主机中执行命令 SET k4 v4 再插入一条数据。

在 6380 和 6381 从机中执行命令 GET k4 尝试获取数据,看看能够获取数据。

6380 获取数据:

127.0.0.1:6380> get k4
"v4"

6381 获取数据:

127.0.0.1:6381> get k4
"v4"

可以看到,两个从机都获取到了数据。

问题一

那么问题来了,我们是在插入了 k1、k2 和 k3 后,才让 6380 和 6381 跟着主机 6379 混,两个从机都能获得到最后插入的数据 k4,那么从机能够获取到 k1、k2 和 k3 吗?我们来尝试一下。

从机 6380 获取 k1:

127.0.0.1:6380> get k1
"v1"

从机 6381 获取 k3:

127.0.0.1:6381> get k3
"v3"

可以看到从机能够获取到所有数据,也就是说从机会将主机的数据从头撸到尾。

问题二

在主机 6379 中再添加一个值 SET k5 v5,在从机 6380 和 6381 中对值进行修改,能够成功吗?

主机添加数据:

127.0.0.1:6379> SET k5 v5
OK

从机 6380 修改值:

127.0.0.1:6380> set k5 55
(error) READONLY You can't write against a read only replica.

从机 6381 修改值:

127.0.0.1:6381> set k5 555
(error) READONLY You can't write against a read only replica.

可以看到从机 默认 是无法添加或修改值的。

主机可以设置值,从机可以读取到,但是从机 默认 不能写值。

问题三

如果主机 6379 宕掉,从机数据会受影响吗?从机会上位变成主机吗?

看到主机 6379 数据,并手动模拟其宕掉:

127.0.0.1:6379> KEYS *
1) "k4"
2) "k1"
3) "k2"
4) "k5"
5) "k3"
127.0.0.1:6379> SHUTDOWN
not connected> exit

从机 6380 查看数据信息,并使用命令 INFO replication 查看从机信息:

127.0.0.1:6380> KEYS *
1) "k3"
2) "k1"
3) "k4"
4) "k2"
5) "k5"
127.0.0.1:6380> INFO replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:down
master_last_io_seconds_ago:-1
master_sync_in_progress:0
slave_repl_offset:1775
master_link_down_since_seconds:59
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:01acd6b70486c8c3c9ccf15713b8568b625cb872
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:1775
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:1775

对从机 6381 来说,也是上面一样的结果。

可以看到从机数据不会受到影响,并且从机也不会上位变成主机。

那么问题又来了,这时候主机又活了,并执行命令 SET k6 v6,从机能够获取到 k6 吗?

主机活过来,并添加数据:

[root@cheny bin]# redis-server /Redis/redis6379.conf 
[root@cheny bin]# redis-cli -p 6379
127.0.0.1:6379> SET k6 v6
OK

从机 6380 获取 k6:

27.0.0.1:6380> get k6
"v6"

从机 6381 获取 k6:

127.0.0.1:6381> get k6
"v6"

可以看到,主机宕掉后,从机不会上位变成主机。主机又活了后,主从又连接上,如果主机添加数据,从机还能够获取到。

问题四

那如果一个从机宕掉了,会影响另外一个从机吗?

模拟 6380 从机宕掉:

127.0.0.1:6380> SHUTDOWN
not connected> exit

主机 6379 添加数据:

127.0.0.1:6379> SET k7 v7
OK

从机 6381 获取 k7:

127.0.0.1:6381> GET k7
"v7"

可以看到一个从机死了,不会影响另外一个从机。

那如果死了的从机又活了,它是继续连接主机,还是不会呢?

[root@cheny bin]# redis-server /Redis/redis6380.conf 
[root@cheny bin]# redis-cli -p 6380
127.0.0.1:6380> INFO replication
# Replication
role:master
connected_slaves:0
master_replid:329f88c3d94aee96c3fe66b9eae2cfc14848a6e6
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

可以看到 6380 活了后,不会连接上主机,自己默认的角色是 Master。

这也验证了我们最开始说的:从机与 Master 断开之后,都需要重新连接,除非你配置进 Redis 的配置文件 redis.conf(配置角色为从机和连接主机的信息)。

尝试 6380 从机获取 k7:

127.0.0.1:6380> GET k7
(nil)

很显然,无法获取到数据。让 6380 再次作为从机,连接主机为 6379,再看看能够获取到 k7:

127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK
127.0.0.1:6380> GET k7
"v7"

重新连接主机后,从机 6380 又能获取到 k7 了。

3.4 薪火相传

在一主二仆中,Master 带了两个仆,当然还可以有多仆的情况,这个时候,Master 的压力就很大,带了很多仆。

那么我们可以使用“薪火相传”以去中心化,简单来说就是一个 Master 带几个仆,这些仆又来了几个仆,比如:

薪火相传

上一个 Slave 可以是下一个 Slave 的 Master,Slave 同样可以接收其他 Slaves 的连接和同步请求,那么该 Slave 作为了链条中下一个的 Master, 可以有效减轻 Master 的写压力(记住:奴隶的奴隶还是奴隶)。

中途变更转向:会清除之前的数据,重新建立拷贝最新的。

使用命令:slaveof 新主库IP 新主库端口

基本测试

让 6379 作为 Master,6380 作为从机,6381 作为 6380 的从机。

在一主二仆中,满足前两个,但 6381 是 6379 的从机,我们修改一下:

127.0.0.1:6381> SLAVEOF 127.0.0.1 6380
OK

使用 INFO replication 查看主机 6379 的信息:

127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=2979,lag=0
master_replid:5743c894b43bcd99c2bed3b261d4d854f99d365c
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:2993
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:2993

可以看到 6379 的仆只有 6380 了。

在主机中使用命令 SET k8 v8 添加数据:

127.0.0.1:6379> KEYS *
1) "k3"
2) "k6"
3) "k2"
4) "k4"
5) "k5"
6) "k7"
7) "k1"
127.0.0.1:6379> SET k8 v8
OK

不用想,6380 从机是可以获取到 k8 的:

127.0.0.1:6380> get k8
"v8"

那么 6381 从机可以获取到吗?

127.0.0.1:6381> get k8
"v8"

可以看到从机 6381 也是可以获取到 k8。

这个时候从机 6380 的角色是什么呢?对主机 6379 来说,它是从机,但是对 6381 来说,它又是主机。执行命令 INFO replication 看看:

127.0.0.1:6380> INFO replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:5
master_sync_in_progress:0
slave_repl_offset:3731
slave_priority:100
slave_read_only:1
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=3731,lag=1
master_replid:5743c894b43bcd99c2bed3b261d4d854f99d365c
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:3731
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1790
repl_backlog_histlen:1942

6380 的角色还是从机,翻身?不可能的!只不过可以从信息看到 6380 从机还连接了一个从机 6381。

3.5 反客为主

介绍前的准备

在介绍“反客为主”之前,我们先将 6381 设置成 6379 的从机:

127.0.0.1:6381> SLAVEOF 127.0.0.1 6379
OK

查看 6379 主机的信息:

127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6380,state=online,offset=3885,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=3885,lag=0
master_replid:5743c894b43bcd99c2bed3b261d4d854f99d365c
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:3885
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:3885

嗯!没问题,我们继续。

反客为主

什么叫反客为主?

使当前数据库停止与其他数据库的同步,转成主数据库。

在一主二仆中我们知道,如果主机宕机,两个仆还是原地待命,主机活过来,两个仆重新连上。如果主机活不过来了,我们可以让一个从机变成主机,这个时候其余的从机链接到这台机器。

在没有主机的情况下,对某一个从机使用命令 SLAVEOF NO ONE,就可以让这个从机变成主机。

对一个从属服务器执行命令 SLAVEOF NO ONE 将使得这个从属服务器关闭复制功能,并从从属服务器转变回主服务器,原来同步所得的数据集不会被丢弃。

让主机 6379 宕机:

127.0.0.1:6379> SHUTDOWN
not connected> exit

从机 6380 谋权篡位:

127.0.0.1:6380> SLAVEOF no one
OK
127.0.0.1:6380> INFO replication
# Replication
role:master
connected_slaves:0
master_replid:9fbaa89cfb388bf036b4cd489023cfeb0e8a5fb9
master_replid2:5743c894b43bcd99c2bed3b261d4d854f99d365c
master_repl_offset:4627
second_repl_offset:4628
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1790
repl_backlog_histlen:2838

可以看到 6380 变成了主机。

看看 6381 的信息:

127.0.0.1:6381> INFO replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:down
master_last_io_seconds_ago:-1
master_sync_in_progress:0
slave_repl_offset:4627
master_link_down_since_seconds:204
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:5743c894b43bcd99c2bed3b261d4d854f99d365c
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:4627
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:4627

我们在 6380 中添加新数据:

127.0.0.1:6380> SET k9 v9
OK

那 6381 能够获取到 k9 吗?

127.0.0.1:6381> GET k9
(nil)

看样子没有连接是不行的,6381 是不能获取到数据的。

让 6381 作为 6380 的从机,然后再获取 k9 试试:

127.0.0.1:6381> SLAVEOF 127.0.0.1 6380
OK
127.0.0.1:6381> GET k9
"v9"

成功获取到数据!

那这时候原来的主机 6379 诈尸了,又活过来了呢?

[root@cheny bin]# redis-server /Redis/redis6379.conf 
[root@cheny bin]# redis-cli -p 6379
127.0.0.1:6379> get k9
(nil)
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:0
master_replid:1729c5bcd52e700756dc2389081be3ea776a489c
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

就算 6379 又活过来也不行了,6379 已经是光杆司令了。6380 和 6381 是一套体系,6379 又是另外一套体系(果然只是诈尸)。

如果将 6380 又设置成 6379 的主机,这时候在 6379 中可以获取到 6380 中的数据,这个时候就相当于是“薪火相传”的情况。

3.6 复制原理

Slave 启动成功连接到 Master 后会发送一个 sync 命令

Master 接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,Master 将传送整个数据文件到Slave,以完成一次完全同步

全量复制:Slave 服务在接收到数据库文件数据后,会将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给 Slave,完成同步

但是只要是重新连接 Master,一次完全同步(全量复制)将被自动执行

3.7 哨兵模式

参考链接:Redis哨兵(Sentinel)模式

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。 这不是一种推荐的方式,更多时候,我们优先考虑 哨兵模式。Redis 从 2.8 开始正式提供了Sentinel(哨兵) 架构来解决这个问题。

哨兵模式概述

哨兵模式是一种特殊的模式,首先 Redis 提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是 哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个 Redis 实例。

可以理解成反客为主的自动版,能够在后台监控主机是否发生了故障,如果故障了会根据投票数自动将从库转换为主库。

Redis哨兵

这里的哨兵有两个作用

  • 通过发送命令,让 Redis 服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到 Master 宕机,会自动将 Slave 切换成 Master,然后通过 发布订阅模式 通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对 Redis 服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

多哨兵监控Redis

用文字描述一下 故障切换(failover) 的过程。假设主服务器宕机,哨兵 1 先检测到这个结果,系统并不会马上进行 failover 过程,仅仅是哨兵 1 主观的认为主服务器不可用,这个现象成为 主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行 failover 操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为 客观下线。这样对于客户端而言,一切都是透明的。

使用步骤

1、在前面的配置上调整结构,使6379 带着6380、6381

2、在 Linux 的 /Redis 目录下新建 sentinel.conf 文件,名字绝不能错!

[root@cheny Redis]# touch sentinel.conf
[root@cheny Redis]# ls -l
总用量 2616
-rw------- 1 mofan mofan 2247528 12月  3 21:40 redis-6.0.8.tar.gz
-rw-r--r-- 1 root  root    84655 12月  6 14:35 redis6379.conf
-rw-r--r-- 1 root  root    84655 12月  6 14:41 redis6380.conf
-rw-r--r-- 1 root  root    84655 12月  6 14:43 redis6381.conf
-rw-r--r-- 1 root  root    84644 12月  5 22:16 redis_aof.conf
-rw-r--r-- 1 root  root    84643 12月  5 22:19 redis.conf
-rw-r--r-- 1 root  root        0 12月  6 16:57 sentinel.conf

3、配置哨兵,在 sentinel.conf 文件内填写内容,格式如下:

1
sentinel monitor <master-group-name> <ip> <port> <quorum>
  • sentinel monitor 被监控主机名字(自己起名字) 127.0.0.1 6379 1

  • 上面最后一个数字 1,表示主机挂掉后 Salve 投票看让谁接替成为主机,得票数多少后成为主机

比如:

sentinel monitor host6379 127.0.0.1 6379 1

4、启动哨兵,使用命令 redis-sentinel /Redis/sentinel.conf (上述目录依照各自的实际情况配置,可能目录不同)。比如:

[root@cheny Redis]# cd /usr/local/bin
[root@cheny bin]# ls -l
总用量 36032
-rw-r--r-- 1 root root   15041 12月  6 16:39 6379.log
-rw-r--r-- 1 root root   70376 12月  6 16:39 6380.log
-rw-r--r-- 1 root root  145855 12月  6 16:39 6381.log
-rw-r--r-- 1 root root    1017 12月  6 00:01 appendonly.aof
-rw-r--r-- 1 root root     236 12月  6 16:39 dump6379.rdb
-rw-r--r-- 1 root root     236 12月  6 16:39 dump6380.rdb
-rw-r--r-- 1 root root     236 12月  6 16:39 dump6381.rdb
-rw-r--r-- 1 root root     116 12月  6 14:14 dump.rdb
-rwxr-xr-x 1 root root 4719848 12月  4 12:51 redis-benchmark
-rwxr-xr-x 1 root root 8976960 12月  4 12:51 redis-check-aof
-rwxr-xr-x 1 root root 8976960 12月  4 12:51 redis-check-rdb
-rwxr-xr-x 1 root root 4976648 12月  4 12:51 redis-cli
lrwxrwxrwx 1 root root      12 12月  4 12:51 redis-sentinel -> redis-server
-rwxr-xr-x 1 root root 8976960 12月  4 12:51 redis-server
[root@cheny bin]# redis-sentinel /Redis/sentinel.conf

启动成功后截图:

哨兵模式启动成功后示例

5、正常主从演示

6、原有的 Master 挂了

7、投票新选

8、重新主从继续开工,info replication 查看信息

查看一下 6379、6380 和 6381 中的数据,三个客户端的数据是一样的,假设以 6379 为例:

127.0.0.1:6379> KEYS *
1) "k3"
2) "k1"
3) "k8"
4) "k5"
5) "k2"
6) "k6"
7) "k7"
8) "k4"

这个时候 6379 是主机,模拟 6379 挂掉:

127.0.0.1:6379> SHUTDOWN
not connected> exit

这时候哨兵会监控到 6379 挂了,然后对从机进行投票,选出一个新的主机。票选截图:

哨兵票选截图

可以看懂,6380 变成了主机,如果不信,执行命令查看一下:

127.0.0.1:6380> INFO replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=40415,lag=1
master_replid:767f5c305d52ffe479b4a1154874e59532934f0e
master_replid2:4f1f8029918698d3d197168e34bfb0c66e01d04a
master_repl_offset:40415
second_repl_offset:28850
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:40415

从上述信息也可以看出,6380 变成了主机,6381 成为了 6381 的从机。

那么问题又来了,如果之前挂了的 Master 重启回来,会不会双 Master 冲突?尝试一下:

[root@cheny bin]# redis-server /Redis/redis6379.conf 
[root@cheny bin]# redis-cli -p 6379
127.0.0.1:6379> INFO replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6380
master_link_status:up
master_last_io_seconds_ago:1
master_sync_in_progress:0
slave_repl_offset:51848
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:767f5c305d52ffe479b4a1154874e59532934f0e
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:51848
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:51693
repl_backlog_histlen:156

6379 变成了 Salve,再看看 6380:

127.0.0.1:6380> INFO replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6381,state=online,offset=59450,lag=1
slave1:ip=127.0.0.1,port=6379,state=online,offset=59450,lag=1
master_replid:767f5c305d52ffe479b4a1154874e59532934f0e
master_replid2:4f1f8029918698d3d197168e34bfb0c66e01d04a
master_repl_offset:59450
second_repl_offset:28850
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:59450

不难得出结论,原 Master 活过来后,会变成 Salve 依附新的 Master,所以也就不会产生冲突。

其实从前面截取的哨兵票选截图也可以看到 6379 变成了 6380 的 Salve。

哨兵模式配置介绍

参考链接:Redis高可用方案哨兵机制------ 配置文件sentinel.conf详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# Example sentinel.conf

# 哨兵sentinel实例运行的端口 默认26379
port 26379

# 哨兵sentinel的工作目录
dir /tmp

# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2

# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd


# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1



# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000

# SCRIPTS EXECUTION

#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。

#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
一个是事件的类型,
一个是事件的描述。
如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh

# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

哨兵模式的优缺点

优点:

1、哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有

2、主从可以切换,故障可以转移,系统可用性更好。

3、哨兵模式是主从模式的升级,系统更健壮,可用性更高。

缺点:

1、Redis 较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂

2、实现哨兵模式的配置也不简单,甚至可以说有些繁琐

3.8 复制的缺点

复制延时

由于所有的写操作都是先在 Master 上操作,然后同步更新到 Slave 上,所以从 Master 同步到 Slave 机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问题更加严重。

4. Jedis

4.1 测试连通

Jedis 是 Redis 官方推荐的 Java 连接开发工具。要在 Java 开发中使用好 Redis 中间件,必须对 Jedis 熟悉才能写成漂亮的代码。

在使用 IDEA 连接虚拟机的 Redis 时,需要先修改配置文件 redis.conf。因为我是使用的是拷贝的配置文件 redis_aof.conf,所以需要修改这个配置文件,使用命令 vi redis_aof.conf

在 Redis 6.0.8 中,修改 NETWORK 栏中的配置:

Jedis连接前修改配置文件

然后在虚拟机中重启 Redis 客户端。

咱们用 IDEA 创建一个 Maven 项目,然后导入 jedis 依赖:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>

编写测试代码:

1
2
3
4
5
6
7
8
9
10
/**
* @author mofan 2020/12/6
*/
public class TestPing {
public static void main(String[] args) {
// 第一个参数为 Redis 服务器的 ip 地址,我在此处修改了开发机的 host 文件
Jedis jedis = new Jedis("redis", 6379);
System.out.println(jedis.ping());
}
}

然后运行代码,可以在控制台看到打印出来的 PONG,证明我们连接成功。

4.2 常用 API

数据添加和获取

现在 Linux 中看看 Redis 里有没有数据:

127.0.0.1:6379> KEYS *
(empty array)

可以,没有数据。

返回 Windows,编写代码,向里面添加数据,并获取:

1
2
3
4
5
6
7
8
9
10
public static void testSetAndGet() {
Jedis jedis = new Jedis("redis", 6379);
jedis.set("k1", "v1");
jedis.set("k2", "v2");
jedis.set("k3", "v3");

System.out.println(jedis.get("k2")); // v2
Set<String> keys = jedis.keys("*");
System.out.println(keys.size()); // 3
}

执行代码,控制台打印成功,返回 Linux 下的 Redis,查看数据:

127.0.0.1:6379> KEYS *
1) "k3"
2) "k2"
3) "k1"

数据添加成功!API 使用成功。

Jedis 的操作方法就和我们在 Linux 中书写命令是一样的,只要命令熟悉了,没什么操作不能一个点不能点出来的,因此就不再赘述了。

List 列表排序

讲一下 Jedis 实现列表排序。

使用排序, 首先需要生成一个排序对象:

1
SortingParams  sortingParams =  new SortingParams();

三个排序 API 如下:

语法 描述
jedis.sort(String key, sortingParams.alpha()) 队列按首字母 a-z 排序
jedis.sort(String key, sortingParams.asc()) 队列按数字升序排列
jedis.sort(String key, sortingParams.desc()) 队列按数字降序排列

测试代码:

1
2
3
4
5
6
7
8
9
10
11
public static void testSortedList() {
Jedis jedis = new Jedis("redis", 6379);
jedis.lpush("list01", "1", "2", "3", "a", "b", "c");

System.out.println(jedis.lrange("list01", 0, -1));
SortingParams sortingParams = new SortingParams();

System.out.println(jedis.sort("list01", sortingParams.alpha()));
System.out.println(jedis.sort("list01", sortingParams.asc()));
System.out.println(jedis.sort("list01", sortingParams.desc()));
}

输出结果:

[c, b, a, 3, 2, 1]
[1, 2, 3, a, b, c]
[1, 2, 3, a, b, c]
[c, b, a, 3, 2, 1]

4.3 事务

日常 API 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void testTransaction() {
Jedis jedis = new Jedis("redis", 6379);
Transaction transaction = jedis.multi();

transaction.set("k4", "v4");
transaction.set("k5", "v5");

transaction.exec();
// 出现异常,回滚 一般放在 catch 块中
// transaction.discard();

// v4
System.out.println(jedis.get("k4"));
// v5
System.out.println(jedis.get("k5"));
}

加锁

在测试加锁之前,先在 Linux 的 Redis 中插入两条数据。

127.0.0.1:6379> set balance 100
OK
127.0.0.1:6379> set debt 0
OK
127.0.0.1:6379> get balance
"100"
127.0.0.1:6379> get debt
"0"

然后编写程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public boolean transMethod() throws InterruptedException {
Jedis jedis = new Jedis("redis", 6379);
int balance; // 可用余额
int debt; // 欠额
// 实刷额度
int amtToSubtract = 10;

jedis.watch("balance");
// 模拟其他程序已经修改了该条目
jedis.set("balance", "5");
Thread.sleep(7000);
balance = Integer.parseInt(jedis.get("balance"));
if (balance < amtToSubtract) {
jedis.unwatch();
System.out.println("modify");
return false;
} else {
System.out.println("***********transaction");
Transaction transaction = jedis.multi();
transaction.decrBy("balance", amtToSubtract);
transaction.incrBy("debt", amtToSubtract);
transaction.exec();
balance = Integer.parseInt(jedis.get("balance"));
debt = Integer.parseInt(jedis.get("debt"));

System.out.println("*******" + balance);
System.out.println("*******" + debt);
return true;
}
}

/**
* 通俗点讲,watch命令就是标记一个键,如果标记了一个键,
* 在提交事务前如果该键被别人修改过,那事务就会失败,这种情况通常可以在程序中 重新再尝试一次。
* 首先标记了键balance,然后检查余额是否足够,不足就取消标记,并不做扣减;
* 足够的话,就启动事务进行更新操作,
* 如果在此期间键balance被其它人修改, 那在提交事务(执行exec)时就会报错,
* 程序中通常可以捕获这类错误再重新执行一次,直到成功。
*
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
TestMulti test = new TestMulti();
boolean retValue = test.transMethod();
System.out.println("main retValue-------: " + retValue);
}

最终控制台打印:

modify
main retValue-------: false

4.4 主从复制

在 Linux 上让 6379 和 6380 启动,并让各自先独立,即:两个客户端的角色都是 Master,同时清空数据(如果使用了其他配置文件,记得按照最开始那样修改配置文件)。

然后编写代码,实现主机写,从机读:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
Jedis jedisM = new Jedis("redis", 6379);
Jedis jedisS = new Jedis("redis", 6380);

jedisS.slaveof("redis", 6379);
jedisM.set("mofan", "212");
// 添加速度很快,可能有延迟,可以去 Redis 中查看或者再次执行
String result = jedisS.get("mofan");
System.out.println(result);
}

4.5 JedisPool

JedisPoolUtil

一个工具类,并且是单例的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author mofan 2020/12/6
*/
public class JedisPoolUtil {
private static volatile JedisPool jedisPool = null;

private JedisPoolUtil() {}

public static JedisPool getJedisPoolInstance() {
if (null == jedisPool) {
synchronized (JedisPoolUtil.class) {
if (null == jedisPool) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(1000);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(100 * 1000);
poolConfig.setTestOnBorrow(true);

jedisPool = new JedisPool(poolConfig, "redis", 6379);
}
}
}
return jedisPool;
}

public static void release(JedisPool jedisPool, Jedis jedis) {
if (null != jedis) {
jedisPool.close();
}
}

}

测试连接池的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author cheny 2020/12/6
*/
public class TestPool {
public static void main(String[] args) {
JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
JedisPool jedisPool2 = JedisPoolUtil.getJedisPoolInstance();

System.out.println(jedisPool == jedisPool2);

Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set("aa", "bb");
System.out.println(jedis.get("aa"));
} catch (Exception e) {
e.printStackTrace();
} finally {
JedisPoolUtil.release(jedisPool, jedis);
}
}
}

运行后,控制台会输出 truebb

控制台可能会出现一下问题,但是不影响运行结果:

1
2
3
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

其实就是缺少 SLF4J,在 pom.xml 中导入就行了:

1
2
3
4
5
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>

配置总结

JedisPool 的配置参数大部分是由 JedisPoolConfig 的对应项来赋值的。

  • maxActive:控制一个 pool 可分配多少个 jedis 实例,通过 pool.getResource() 来获取。如果赋值为 -1,则表示不限制;如果 pool 已经分配了maxActive 个 jedis 实例,则此时 pool 的状态为 exhausted。
  • maxIdle:控制一个 pool 最多有多少个状态为 idle(空闲) 的 jedis 实例;
  • whenExhaustedAction:表示当 pool 中的 jedis 实例都被 allocated 完时,pool 要采取的操作;默认有三种。
    • WHEN_EXHAUSTED_FAIL:表示无jedis实例时,直接抛出NoSuchElementException;
    • WHEN_EXHAUSTED_BLOCK:则表示阻塞住,或者达到 maxWait 时抛出 JedisConnectionException;
    • WHEN_EXHAUSTED_GROW:则表示新建一个 jedis 实例,也就说设置的 maxActive 无用;
  • maxWait:表示当 borrow 一个 jedis 实例时,最大的等待时间,如果超过等待时间,则直接抛 JedisConnectionException;
  • testOnBorrow:获得一个 jedis 实例的时候是否检查连接可用性(ping());如果为 true,则得到的jedis实例均是可用的;
  • testOnReturn:return 一个 jedis 实例给 pool 时,是否检查连接可用性(ping());
  • testWhileIdle:如果为 true,表示有一个 idle object evitor 线程对 idle object 进行扫描,如果 validate 失败,此 object 会被从 pool 中 drop 掉;这一项只有在 timeBetweenEvictionRunsMillis 大于 0 时才有意义;
  • timeBetweenEvictionRunsMillis:表示 idle object evitor 两次扫描之间要 sleep 的毫秒数;
  • numTestsPerEvictionRun:表示 idle object evitor 每次扫描的最多的对象数;
  • minEvictableIdleTimeMillis:表示一个对象至少停留在idle状态的最短时间,然后才能被 idle object evitor 扫描并驱逐;这一项只有在 timeBetweenEvictionRunsMillis 大于 0 时才有意义;
  • softMinEvictableIdleTimeMillis:在 minEvictableIdleTimeMillis 基础上,加入了至少 minIdle 个对象已经在 pool 里面了。如果为 -1,evicted 不会根据 idle time 驱逐任何对象。如果 minEvictableIdleTimeMillis > 0,则此项设置无意义,且只有在 timeBetweenEvictionRunsMillis 大于 0 时才有意义;
  • lifo:borrowObject 返回对象时,是采用 DEFAULT_LIFO(last in first out,即类似 cache 的最频繁使用队列),如果为 false,则表示 FIFO 队列;

其中JedisPoolConfig对一些参数的默认设置如下:

  • testWhileIdle = true
  • minEvictableIdleTimeMills = 60000
  • timeBetweenEvictionRunsMillis = 30000
  • numTestsPerEvictionRun = -1

注意:以上部分配置在高版本的 Jedis 中已被启用,导致编写代码时无法使用,官方也提供了代替的方法,因此上述配置仅供参考。

5. 三种特殊数据类型

5.1 Geospatial 地理位置

Geospatial 在 Redis 3.2 版本中推出, 这个功能可以将用户给定的地理位置信息储存起来, 并对这些信息进行操作,来实现一些依赖于地理位置信息的功能,比如:附近的人。

GEO 的数据类型为 Zset。

GEO 的数据结构总共有六个常用命令:

GEO常用命令

具体操作可以参考这个链接:https://www.redis.net.cn/order/3685.html

如果想要强制输出中文,可以重新连接 redis-cli,增加参数 --raw,而不然会乱码。

geoadd

使用这个命令可以将给定的空间元素(纬度、经度、名字)添加到指定的键里面。

语法介绍:

1
geoadd key longitude latitude member ...

这些数据会以有序集合的形式被储存在键里面。

geoadd 命令以标准的 x , y 格式接受参数,所以用户必须先输入经度,然后再输入纬度。

geoadd 能够记录的坐标是有限的:非常接近两极的区域无法被索引。

有效的经度介 -180 到 180 度之间,有效的纬度介于 -85.05112878 度至 85.05112878 度之间。当用户尝试输入一个超出范围的经度或者纬度,geoadd 命令将返回一个错误。

手动添加一些数据,用于后续测试:

127.0.0.1:6379> GEOADD china:city 116.23 40.22 beijing
(integer) 1
127.0.0.1:6379> GEOADD china:city 121.48 31.40 shanghai 113.88 22.55 shenzhen 120.21 30.20 hangzhou
(integer) 3
127.0.0.1:6379> GEOADD china:city 106.54 29.40 chongqing 102.54 30.05 chengdu 108.93 34.23 sian
(integer) 3

geopos

语法介绍:

1
geopos key member [member...]

使用这个命令从 key 里返回所有给定位置元素的位置(经度和纬度)。

127.0.0.1:6379> GEOPOS china:city beijing
1) 1) "116.23000055551528931"
   2) "40.2200010338739844"
127.0.0.1:6379> GEOPOS china:city shanghai chengdu
1) 1) "121.48000091314315796"
   2) "31.40000025319353938"
2) 1) "102.54000037908554077"
   2) "30.05000015956613169"
127.0.0.1:6379> GEOPOS china:city wuhan
1) (nil)

geodist

语法介绍:

1
geodist key member1 member2 [unit]

使用这个命令可以返回两个给定位置之间的距离,如果两个位置之间的其中一个不存在,那么返回空值。

指定单位的参数 unit 必须是以下单位的其中一个:

  • m 表示单位为米

  • km 表示单位为千米

  • mi 表示单位为英里

  • ft 表示单位为英尺

如果用户没有显式地指定单位参数,那么 geodist 默认 使用米作为单位。

geodist 命令在计算距离时会假设地球为完美的球形,因此在极限情况下,这一假设最大会造成 0.5% 的误差。

127.0.0.1:6379> GEODIST china:city beijing shanghai
"1088785.4302"
127.0.0.1:6379> GEODIST china:city beijing shanghai km
"1088.7854"
127.0.0.1:6379> GEODIST china:city chongqing chengdu km
"393.0455"

georadius

语法解析:

1
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]

使用这个命令可以以给定的经纬度为中心, 找出某一半径内的元素。(类似“附近的人”功能)

在 key 中寻找坐标 longitude latitude 半径为 radius 的城市,withcoord 显示经纬度,withdist 显示中心距离,[count count1] 可以限定个数为 count1 个。

127.0.0.1:6379> GEORADIUS china:city 100 30 1000 km
1) "chengdu"
2) "chongqing"
3) "sian"
127.0.0.1:6379> GEORADIUS china:city 100 30 1000 km withdist
1) 1) "chengdu"
   2) "244.6616"
2) 1) "chongqing"
   2) "635.2850"
3) 1) "sian"
   2) "963.3171"
127.0.0.1:6379> GEORADIUS china:city 100 30 1000 km withcoord
1) 1) "chengdu"
   2) 1) "102.54000037908554077"
      2) "30.05000015956613169"
2) 1) "chongqing"
   2) 1) "106.54000014066696167"
      2) "29.39999880018641676"
3) 1) "sian"
   2) 1) "108.92999857664108276"
      2) "34.23000121926852302"
127.0.0.1:6379> GEORADIUS china:city 100 30 1000 km withcoord withdist count 1
1) 1) "chengdu"
   2) "244.6616"
   3) 1) "102.54000037908554077"
      2) "30.05000015956613169"
127.0.0.1:6379> GEORADIUS china:city 100 30 1000 km withcoord withdist count 2
1) 1) "chengdu"
   2) "244.6616"
   3) 1) "102.54000037908554077"
      2) "30.05000015956613169"
2) 1) "chongqing"
   2) "635.2850"
   3) 1) "106.54000014066696167"
      2) "29.39999880018641676"

georadiusbymember

语法解析:

1
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]

这个命令可以找出位于指定范围内的元素,中心点是由给定的位置元素决定,而不是经纬度。(这才是正儿八经的“附近的人”)

127.0.0.1:6379> GEORADIUSBYMEMBER china:city chengdu 1000 km
1) "chengdu"
2) "chongqing"
3) "sian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city chengdu 100 km
1) "chengdu"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city chengdu 400 km
1) "chengdu"
2) "chongqing"

geohash

语法解析:

1
geohash key member [member...]

这个命令可以将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似表示距离越近。

127.0.0.1:6379> GEOHASH china:city chengdu chongqing
1) "wm2fvq7mgn0"
2) "wm5z22h53v0"
127.0.0.1:6379> GEOHASH china:city chengdu sian
1) "wm2fvq7mgn0"
2) "wqj6wz2khy0"

zrem

GEO 没有提供删除的功能,但GEO 的底层实现是 Zset,所以可以借用 zrem 命令实现对地理位置信息的删除,也可以使用其他 Zset 的命令。

127.0.0.1:6379> GEOADD china:city 116.23 40.22 Peiking
(integer) 1
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chengdu"
2) "chongqing"
3) "sian"
4) "shenzhen"
5) "hangzhou"
6) "shanghai"
7) "Peiking"
8) "beijing"
127.0.0.1:6379> ZREM china:city Peiking
(integer) 1
127.0.0.1:6379> ZREM china:city beijing
(integer) 1
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chengdu"
2) "chongqing"
3) "sian"
4) "shenzhen"
5) "hangzhou"
6) "shanghai"

5.2 HyperLogLog

HyperLogLog 概述

参考链接:Redis HyperLogLog

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 264 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

那这有什么用呢?

比如我们想实现网站的浏览用户数量(UV),一个人多次访问这个网站,还是算作一个人,就可以使用 HyperLogLog 来实现。

如果不使用 HyperLogLog,就需要使用 Set 来保存用户的信息,这就比较麻烦,毕竟我们只是为了计数,而不是为了获取用户信息。但是这种方式有 0.81% 的错误率,但对于统计 UV 这种不需要很精确的数据是可以忽略不计的。

那什么是基数呢?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。

基本命令

命令 描述
PFADD key element [element …] 添加指定元素到 HyperLogLog 中。
PFCOUNT key [key …] 返回给定 HyperLogLog 的基数估算值。
PFMERGE destkey sourcekey [sourcekey …] 将多个 HyperLogLog 合并为一个 HyperLogLog

具体使用

127.0.0.1:6379> PFADD hyll1 a a b b c c d d
(integer) 1
127.0.0.1:6379> PFCOUNT hyll1
(integer) 4
127.0.0.1:6379> PFADD hyll2 e e e  f f g g g z
(integer) 1
127.0.0.1:6379> PFCOUNT hyll2
(integer) 4
127.0.0.1:6379> PFMERGE hyll1 hyll2
OK
127.0.0.1:6379> PFCOUNT hyll1
(integer) 8
127.0.0.1:6379> PFCOUNT hyll2
(integer) 4

5.3 Bitmaps

概述

Bitmaps(位图),Bitmaps 本身不是一种数据结构,实际上就是字符串,但是它可以对字符串的位进行操作,是定义在String类型上的一个面向字节操作的集合。

可以把 Bitmaps 想象成一个以位为单位数组,数组中的每个单元只能存 0 或者 1,数组的下标在 Bitmaps 中叫做偏移量。因为字符串是二进制安全的块,他们的最大长度是 512M,最适合设置成2^32个不同字节。单个 Bitmaps 的最大长度是 512MB,即 232 个比特位。

Bitmaps 的最大优势之一在存储信息时极其节约空间。例如,在一个以增量用户 ID 来标识不同用户的系统中,记录用户的四十亿的一个单独 bit 信息(例如,要知道用户是否想要接收最新的来信)仅仅使用 512M 内存。

具体使用

比如,我们可以使用 Bitmaps 来统计某一员工在一周内的打开次数,打卡了设置成 1,没打开设置成 0。

可以使用 SETBIT key offset value 来 key 的第 offset 位为 value (1或0)。比如添加打卡信息:

127.0.0.1:6379> SETBIT mofan 0 1
(integer) 0
127.0.0.1:6379> SETBIT mofan 1 1
(integer) 0
127.0.0.1:6379> SETBIT mofan 2 1
(integer) 0
127.0.0.1:6379> SETBIT mofan 3 0
(integer) 0
127.0.0.1:6379> SETBIT mofan 4 0
(integer) 0
127.0.0.1:6379> SETBIT mofan 5 0
(integer) 0
127.0.0.1:6379> SETBIT mofan 6 1
(integer) 0

上图就表示 mofan 的打卡情况(真迷的打卡情况,997 警告)。

可以使用 GETBIT key offset 来获取 offset 设置的值,未设置过默认返回 0。比如获取某一天的打卡情况:

127.0.0.1:6379> GETBIT mofan 1
(integer) 1
127.0.0.1:6379> GETBIT mofan 4
(integer) 0
127.0.0.1:6379> GETBIT mofan 2
(integer) 1

可以使用 bitcount key [start, end] 统计 key 中位为 1 的个数,还可以指定范围。比如获取一周的打卡次数:

127.0.0.1:6379> BITCOUNT mofan
(integer) 4

BITOP 命令

语法解析:

1
BITOP operation destkey key [key ...]

BITOP 命令用于对一个或多个保存二进制位的字符串 key 进行 位元 操作,并将结果保存到 destkey 上。

operation 可以是 ANDORNOTXOR 这四种操作中的任意一种:

  • BITOP AND destkey key [key ...] ,对一个或多个 key 求逻辑并,并将结果保存到 destkey
  • BITOP OR destkey key [key ...] ,对一个或多个 key 求逻辑或,并将结果保存到 destkey
  • BITOP XOR destkey key [key ...] ,对一个或多个 key 求逻辑异或,并将结果保存到 destkey
  • BITOP NOT destkey key ,对给定 key 求逻辑非,并将结果保存到 destkey

除了 NOT 操作之外,其他操作都可以接受一个或多个 key 作为输入。

BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0 。空的 key 也被看作是包含 0 的字符串序列。

BITOP 命令的返回值是保存到 destkey 中的字符串的长度,和输入 key 中最长的字符串长度相等。

127.0.0.1:6379> SETBIT bits-1 0 1        # bits-1 = 1001
(integer) 0
127.0.0.1:6379> SETBIT bits-1 3 1
(integer) 0
127.0.0.1:6379> SETBIT bits-2 0 1        # bits-2 = 1011
(integer) 0
127.0.0.1:6379> SETBIT bits-2 1 1
(integer) 0
127.0.0.1:6379> SETBIT bits-2 3 1
(integer) 0
127.0.0.1:6379> BITOP AND and-result bits-1 bits-2
(integer) 1
127.0.0.1:6379> GETBIT and-result 0      # and-result = 1001
(integer) 1
127.0.0.1:6379> GETBIT and-result 1
(integer) 0
127.0.0.1:6379> GETBIT and-result 2
(integer) 0
127.0.0.1:6379> GETBIT and-result 3
(integer) 1

6. 缓存穿透、击穿、雪崩

Redis 缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

参考链接:

帮你解读什么是Redis缓存穿透和缓存雪崩(包含解决方案)

最佳实践 缓存穿透,瞬间并发,缓存雪崩的解决方法

6.1 缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,造成缓存穿透。

简单来说就是:用户想要查询一个数据,发现 Redis 内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方法

1、布隆过滤器

布隆过滤器是一种数据结构,垃圾网站和正常网站加起来全世界据统计也有几十亿个。网警要过滤这些垃圾网站,总不能到数据库里面一个一个去比较吧,这就可以使用布隆过滤器。假设我们存储一亿个垃圾网站地址。

可以先有一亿个二进制比特,然后网警用八个不同的随机数产生器(F1, F2, …, F8) 产生八个信息指纹(f1, f2, …, f8)。接下来用一个随机数产生器 G 把这八个信息指纹映射到 1 到1亿中的八个自然数 g1, g2, …, g8。最后把这八个位置的二进制全部设置为一。过程如下:

布隆过滤器结构

有一天网警查到了一个可疑的网站,想判断一下是否是 XX 网站,首先将可疑网站通过哈希映射到 1 亿个比特数组上的 8 个点。如果 8 个点的其中有一个点不为 1,则可以判断该元素一定不存在集合中。

那这个布隆过滤器是如何解决 Redis 中的缓存穿透呢?很简单首先也是对所有可能查询的参数以 Hash 形式存储,当用户想要查询的时候,使用布隆过滤器发现不在集合中,就直接丢弃,不再对持久层查询。

布隆过滤器

2、缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源。

缓存空对象

但是这种方法会存在两个问题:

  • 如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;

  • 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

6.2 缓存击穿

这里需要注意和缓存击穿的区别,缓存击穿,是指一个 key 非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个 key 在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个 key 在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

简单来说,缓存穿透是查询缓存和数据库中都没有的数据,而缓存击穿是查询缓存中没有(一般是缓存时间到期)但数据库中有的数据。

解决方案

1、设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。

2、加互斥锁
分布式锁:使用分布式锁,保证对于每个 key 同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

6.3 缓存雪崩

如果缓存集中在一段时间内失效,发生大量的缓存穿透,所有的查询都落在数据库上,造成了缓存雪崩。

或者说缓存层出现了错误,不能正常工作了,于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

这个没有完美解决办法,但可以分析用户行为,尽量让失效时间点均匀分布。大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上。

雪崩的时候,每一片雪花都在勇闯天涯!❄️

缓存雪崩

解决方法

1、均匀过期

设置不同的过期时间,让缓存失效的时间点尽量均匀。通常可以为有效期增加随机值或者统一规划有效期。

2、加互斥锁

跟缓存击穿解决思路一致,同一时间只让一个线程构建缓存,其他线程阻塞排队。

3、缓存永不过期

跟缓存击穿解决思路一致,缓存在物理上永远不过期,用一个异步的线程更新缓存。

4、双层缓存策略

使用主备两层缓存,他们的作用分别是:

主缓存:有效期按照经验值设置,设置为主读取的缓存,主缓存失效后从数据库加载最新值。

备份缓存:有效期长,获取锁失败时读取的缓存,主缓存更新时需要同步更新备份缓存。

6.4 缓存预热

缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统,这样就可以避免在用户请求的时候,先查询数据库,然后再将数据回写到缓存。

如果不进行预热, 那么 Redis 初始状态数据为空,系统上线初期,对于高并发的流量,都会访问到数据库中, 对数据库造成流量的压力。

缓存预热的操作方法

1、数据量不大的时候,工程启动的时候进行加载缓存动作;

2、数据量大的时候,设置一个定时任务脚本,进行缓存的刷新;

3、数据量太大的时候,优先保证热点数据进行提前加载到缓存。

6.5 缓存降级

缓存降级是指缓存失效或缓存服务器挂掉的情况下,不去访问数据库,直接返回默认数据或访问服务的内存数据。

在项目实战中通常会将部分热点数据缓存到服务的内存中,这样一旦缓存出现异常,可以直接使用服务的内存数据,从而避免数据库遭受巨大压力。

降级一般是有损的操作,所以尽量减少降级对于业务的影响程度。

7. 内存淘汰策略

7.1 内存淘汰策略的定义

Redis 内存淘汰策略是指当缓存内存不足时,通过淘汰旧数据处理新加入数据选择的策略。

7.2 最大内存的配置

通过配置文件配置

可以通过修改 redis.conf 配置文件来配置最大内存:

1
maxmemory 1024mb  # 设置Redis最大占用内存大小为1024M

注意: maxmemory 默认配置为 0,在 64 位操作系统下 Redis 最大内存为操作系统剩余内存,在 32 位操作系统下 Redis 最大内存为 3GB。

动态命令配置

Redis 支持运行时通过命令动态修改内存大小:

127.0.0.1:6379> config set maxmemory 200mb //设置Redis最大占用内存大小为200M
127.0.0.1:6379> config get maxmemory //获取设置的Redis能使用的最大内存大小
1) "maxmemory"
2) "209715200"

7.3 淘汰策略的分类

Redis 官方定义了以下八种策略来淘汰旧数据加入新数据:

noeviction

默认策略,对于写请求直接返回错误,不进行淘汰。

allkeys-lru

lru(less recently used),最近最少使用。从所有的 key 中使用近似 LRU 算法进行淘汰。

volatile-lru

lru(less recently used),最近最少使用。从设置了过期时间的 key 中使用近似 LRU 算法进行淘汰。

allkeys-random

从所有的 key 中随机淘汰。

volatile-random

从设置了过期时间的 key 中随机淘汰。

volatile-ttl

ttl(time to live),在设置了过期时间的 key 中根据 key 的过期时间进行淘汰,越早过期的越优先被淘汰。

allkeys-lfu

lfu(Least Frequently Used),最少使用频率。从所有的 key 中使用近似 LFU 算法进行淘汰。这种策略从 Redis 4.0 开始支持。

volatile-lfu

lfu(Least Frequently Used),最少使用频率。从设置了过期时间的 key 中使用近似 LFU 算法进行淘汰。这种策略从 Redis 4.0 开始支持。

注意: 当使用 volatile-lru、volatile-random、volatile-ttl 这三种策略时,如果没有设置过期的 key 可以被淘汰,则和 noeviction 一样返回错误。

7.4 LRU 最近最少使用

LRU(Least Recently Used),即最近最少使用,是一种缓存置换算法。其核心思想是:如果一个数据在最近一段时间没有被用到,那么将来被使用到的可能性也很小,所以就可以被淘汰掉。

LRU 在 Redis 中的实现

Redis 使用的是近似 LRU 算法,它跟常规的 LRU 算法还不太一样。近似 LRU 算法通过随机采样法淘汰数据,每次随机出 5 个(默认)key,从里面淘汰掉最近最少使用的 key。

可以通过 maxmemory-samples 参数修改采样数量, 如:maxmemory-samples 10

maxmenory-samples 配置的越大,淘汰的结果越接近于严格的 LRU 算法,但因此耗费的 CPU 也很高。

Redis 为了实现近似 LRU 算法,给每个 key 增加了一个额外增加了一个 24bit 的字段,用来存储该 key 最后一次被访问的时间。

Redis3.0 对近似 LRU 的优化

Redis3.0 对近似 LRU 算法进行了一些优化。新算法会维护一个候选池(大小为 16),池中的数据根据访问时间进行排序,第一次随机选取的 key 都会放入池中,随后每次随机选取的 key 只有在访问时间小于池中最小的时间才会放入池中,直到候选池被放满。当放满后,如果有新的 key 需要放入,则将池中最后访问时间最大(最近被访问)的移除。

当需要淘汰的时候,则直接从池中选取最近访问时间最小(最久没被访问)的 key 淘汰掉就行。

7.5 LFU 最少使用频率

LFU(Least Frequently Used),是 Redis4.0 新加的一种淘汰策略,它的核心思想是根据 key 的最近被访问的频率进行淘汰,很少被访问的优先被淘汰,被访问的多的则被留下来。

LFU 算法能更好的表示一个 key 被访问的热度。假如你使用的是 LRU 算法,一个 key 很久没有被访问到,只刚刚是偶尔被访问了一次,那么它就被认为是热点数据,不会被淘汰,而有些 key 将来是很有可能被访问到的则被淘汰了。如果使用 LFU 算法则不会出现这种情况,因为使用一次并不会使一个 key 成为热点数据。

8. 内存模型

Redis 内部使用文件事件处理器 file event handler,这个文件事件处理器是单线程的,所以 Redis 也被叫做单线程的模型。它采用 IO 多路复用机制同时监听多个 Socket,根据 Socket 上的事件来选择对应的事件处理器进行处理。

那为啥 Redis 单线程模型也能效率这么高?

1、纯内存操作

2、核心是基于非阻塞的 IO 多路复用机制

3、单线程反而避免了多线程的频繁上下文切换问题

9. 整合 SpringBoot

9.1 基础使用

环境准备

在SpringBoot 中操作数据可以使用:Spring Data xxx

我们要使用 SpringBoot 操作 Redis 就要使用 Spring Data Redis。

新建一个 SpringBoot 项目,导入以下依赖:

导入整合Redis需要的依赖

进 pom.xml 文件中看看导入的 Redis 相关依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

点进这个依赖可以看到使用的是 lettuce 来操作 Redis:

1
2
3
4
5
6
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.0.1.RELEASE</version>
<scope>compile</scope>
</dependency>

在 SpringBoot 2.x 之后,原来使用的 Jedis 被替换为 lettuce。那这是为什么呢?

Jedis:采用的直连。多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedispool 连接池!这更像 BIO 模式。

lettuce:采用 Netty,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据,更像 NIO 模式。

分析一下 Redis 自动装配的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}

@Bean
// 我们可以自己定义一个 RedisTemplate 来替换默认的
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 默认的 RedisTemplate 没有过多的设置, redis 对象都是需要序列化的!
// 两个范型都是 Object 类型,后续使用需要强制类型转换
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

@Bean
@ConditionalOnMissingBean
// 由于 Sring 类型是 Redis 中常用的类型,所以单独提出一个 Bean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}

通过上面的代码可以看到:

SpringBoot 会自动帮我们在容器中生成了一个 RedisTemplate 和一个 StringRedisTemplate。

RedisTemplate的泛型是 <Object,Object>,而我们需要一个泛型为 <String,Object> 形式的 RedisTemplate,导致后续编码需要强制类型转换。

RedisTemplate 没有设置数据存在 Redis 时,key 及 value 的序列化方式。

为什么要设置序列化方式呢?咱们后文介绍。

同时 SpringBoot 自动帮我们生成的 RedisTemplate 不好用,那应该怎么办呢?咱们后文也会介绍。

修改 SpringBoot 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 这里写虚拟机的静态 ip,我修改了 Windows 的 host 文件
spring.redis.host=redis
spring.redis.port=6379
# Redi s数据库索引(默认为 0)
spring.redis.database= 0
# 连接超时时间(毫秒)
spring.redis.timeout=1800000
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
# 最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

编写测试代码

在测试之前,打开虚拟机,连接 Redis,清空库内数据。然后前往 SpringBoot 的测试包内编写测试方法,我们需要先注入 RedisTemplate,我们一般使用 RedisTemplate 来操作 Redis。

RedisTemplate 操作 Redis 不同的数据类型的方法:

RedisTemplate的opsFor方法

除了基本的操作,我们常用的方法都可以使用 RedisTemplate 来操作,比如事务、基本的 CRUD 等,比如:

1
2
3
4
5
6
7
@Test
void contextLoads() {
RedisConnection connection =
redisTemplate.getConnectionFactory().getConnection();
connection.flushDb();
connection.flushAll();
}

总的来说,有这么几个常用的类:

1、JedisPoolConfig 用于配置连接池

2、RedisConnectionFactory 用于配置连接信息,这里的 RedisConnectionFactory 是一个接
口,我们需要使用它的实现类,启动器中提供了以下四种工厂模型:

  • JredisConnectionFactory
  • JedisConnectionFactory
  • LettuceConnectionFactory
  • SrpConnectionFactory

3、 RedisTemplate 用于 Redis 的基本操作


那我们来编写一下测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
class RedisSpringbootApplicationTests {

@Autowired
private RedisTemplate redisTemplate;

@Test
void contextLoads() {
redisTemplate.opsForValue().set("mofan","mofan212.getee.io");
System.out.println(redisTemplate.opsForValue().get("mofan"));
}

}

然后运行一下代码,可以在控制台打印:mofan212.getee.io,成功获取到数据。

再前往 Linux 中,使用命令获取一下数据:

127.0.0.1:6379> keys *
1) "\xac\xed\x00\x05t\x00\x05mofan"

这,这是咋回事?乱码了?

我们可以分析一下 RedisTemplate 的源码,并可以看到没有对 Redis 对象进行序列化:

RedisAutoConfiguration 类的 redisTemplate 中,调用了无参构造方法。

1
2
3
4
5
6
7
8
9
10
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

点击进入这个无参构造方法,可以来到 RedisTemplate 类中:

1
2
3
4
/**
* Constructs a new <code>RedisTemplate</code> instance.
*/
public RedisTemplate() {}

看看这个类的成员变量:

RedisTemplate类的成员变量

可以看到所有 Redis 序列化对象都是 null,而这也是为什么前面会输出不正确的原因。

那怎么办呢?我们可以自己将 RedisTemplate 进行配置后注入 Spring 容器中,这样的话原来的 RedisTemplate 就会失效。原因能从这这个注解窥见:

1
2
3
4
5
6
7
8
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 省略其他代码
}

既然如此,那么我们就自己编写一个 RedisTemplate 并注入 Spring 容器中吧!👊

9.2 自定义 RedisTemplate

简单测试

在编写之前,我们需要了解序列化的问题,一般来说,我们的实体类都会实现序列化接口,同时传输对象的时候都会使用 JSON 进行传输。

我们先试试使用 JSON 序列化传输。

Linux 连接 Redis 服务,打开 Redis 客户端,清空当前表。

在刚才的工程中创建一个实体类:

1
2
3
4
5
6
7
8
9
10
11
/**
* @author mofan 2020/12/8
*/
@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
private String name;
private int age;
}

编写测试方法:

1
2
3
4
5
6
7
8
@Test
public void test() throws JsonProcessingException {
// 一般来说我们会使用 JSON 来传递对象
User user = new User("默烦", 18);
String string = new ObjectMapper().writeValueAsString(user);
redisTemplate.opsForValue().set("mofan", string);
System.out.println(redisTemplate.opsForValue().get("mofan"));
}

测试一下,测试通过,控制台输出如下:

{"name":"默烦","age":18}

进 Linux 下用命令获取 key 试试:

127.0.0.1:6379> keys *
1) "\xac\xed\x00\x05t\x00\x05mofan"

还是乱码了,乱码的问题待会解决。

那如果我们不使用 JSON 序列化呢?

1
2
3
4
5
6
@Test
public void test() {
User user = new User("默烦", 18);
redisTemplate.opsForValue().set("mofan", user);
System.out.println(redisTemplate.opsForValue().get("mofan"));
}

清空 Redis 的库,再次运行测试,测试不会通过,控制台报以下错:

org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.lang.IllegalArgumentException: DefaultSerializer requires a Serializable payload but received an object of type [indi.mofan.entity.User]

简单来说,就是 User 实体类没有序列化。那我们序列化试试:

1
2
3
4
5
6
7
8
9
10
11
@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User implements Serializable {

private static final long serialVersionUID = 1669659861831762829L;

private String name;
private int age;
}

再次运行测试方法,测试通过,控制台出现:

User(name=默烦, age=18)

还是进 Linux 下用命令获取 key 试试:

127.0.0.1:6379> keys *
1) "\xac\xed\x00\x05t\x00\x05mofan"

还好乱码。

但这至少证明了插入实体进 Redis 时必须要序列化。那每次都用 Jackson 进行序列化?还是每个实体自己实现序列化接口?

那也太麻烦了,RedisTemplate 可以进行序列化,但 SpringBoot 原版的 RedisTemplate 并没有进行序列化(前面已经分析过了)。

回归正题

既然 SpringBoot 提供的 RedisTemplate 不好用,那我们就自己配置一个 RedisTemplate,然后注入到 Spring 容器中。

RedisConfig.java 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package indi.mofan.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
* @author mofan 2020/12/8
*/
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);

// JSON 序列化
Jackson2JsonRedisSerializer<Object> jsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance ,
ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
jsonRedisSerializer.setObjectMapper(objectMapper);
// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

// key 采用 String 序列化
template.setKeySerializer(stringRedisSerializer);
// Hash 的 key 也采用 String 序列化
template.setHashKeySerializer(stringRedisSerializer);
// value 采用 Jackson 序列化
template.setValueSerializer(jsonRedisSerializer);
// Hash 的 value 采用 Jackson 序列化
template.setHashValueSerializer(jsonRedisSerializer);
template.afterPropertiesSet();

return template;
}

}

在上面的代码中,我们并没有使用 JDK 进行序列化,因为使用 JDK 进行序列化就会乱码。

那么我们来测试一下,实体类不实现序列化接口:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* @author mofan 2020/12/8
*/
@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User{

private String name;
private int age;
}

测试方法如下:

1
2
3
4
5
6
@Test
public void test() throws JsonProcessingException {
User user = new User("默烦", 18);
redisTemplate.opsForValue().set("mofan", user);
System.out.println(redisTemplate.opsForValue().get("mofan"));
}

运行测试方法,控制台出现:

User(name=默烦, age=18)

再进 Linux 下用命令获取 key 试试:

127.0.0.1:6379> keys *
1) "mofan"

奈斯,没有乱码,我们的配置生效了!🎉

封装工具类

虽然可以直接使用 RedisTemplate 来操作 Redis,但是需要写很多的代码,不如我们封装一个工具类,让编码更简单。

将这个工具类取名为 RedisUtil,然后将其添加到 Spring 容器中,使用时直接 DI (依赖注入)然后调用方法即可。

RedisUtil.java 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
package indi.mofan.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* @author mofan 2020/12/8
*/
@Component
public class RedisUtil {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

// ================== 公共的 =======================

/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间,单位秒
*/
public Boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 获取 key 的过期时间
*
* @param key key,不能为空
* @return 时间(秒) 返回0代表为永久有效
*/
public Long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}

/**
* 判断 key 是否存在
*
* @param key 键
* @return true表示存在,false不存在
*/
public Boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 删除缓存
*
* @param key 一个或多个 key
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(((Collection<String>) CollectionUtils.arrayToList(key)));
}
}
}

// ==================String=======================

/**
* 根据 key 获取 value
*
* @param key 键
* @return 对应的 value
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}

/**
* 添加 String 类型
*
* @param key
* @param value
* @return true成功,false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 添加 String 类型并设置时间
*
* @param key 键
* @param value 值
* @param time 时间,单位秒
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 递增
*
* @param key 键
* @param delta 要增加几(值大于 0)
*/
public Long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key);
}

/**
* 递减
*
* @param key 键
* @param delta 要减少几(值大于 0)
*/
public Long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}

// ========================Map===================

/**
* Hash Get
*
* @param key 键,不能为 null
* @param item 值,不能为 null
* @return
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}

/**
* 获取 key 对应的所有值
*
* @param key 键
* @return 键对应的多个值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* HashSet
*
* @param key 键
* @param map 键对应的多个值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应的多个值
* @param time 设置的时间,单位秒
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*    
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key   键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*    
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 删除 Hash 表的值
*
* @param key 键,不能为 null
* @param item 项,可以添加多个,不能为 null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}

/**
* 判断 Hash 表中是否有某项
*
* @param key 键,不能为null
* @param item 值,不能为null
* @return 存在即为 true,反之 false
*/
public boolean hHashKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}

/**
* Hash递增 如果不存在,就会创建一个,返回新增后的值
*
* @param key 键
* @param item 值
* @param by 增加数量(大于 0)
*/
public Double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}

/**
* Hash 递减
*
* @param key 键
* @param item 值
* @param by 减少数量(大于 0)
* @return
*/
public Double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}

// =========================Set==================

/**
* 根据 key 获取 Set 中的所有值
*
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 根据 value 从一个 Set 中是否存在
*
* @param key 键
* @param value 值
*/
public Boolean sHashSet(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将数据放入 Set 中
*
* @param key 键
* @param values 值,可以是多个
* @return 放入成功个数
*/
public Long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}

/**
* 将数据放入 Set 中并设置时间
*
* @param key 键
* @param time 时间,单位秒
* @param values 值,可以是多个
* @return 放入成功个数
*/
public Long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}

/**
* 获取 Set 的长度
*
* @param key 键
*/
public Long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}

/**
* 移除值为 value 的 Set
*
* @param key   键
* @param values 值 可以是多个
* @return 移除的个数
*    
*/
public Long setRemove(String key, Object... values) {
try {
return redisTemplate.opsForSet().remove(key, values);
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}

// =======================List===================

/**
* 获取 List 缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*    
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 获取 List 缓存的长度
*
* @param key 键
*/
public Long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}

/**
* 通过索引 获取 List 中的值
*
* @param key   键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0
* 时,-1,表尾,-2倒数第二个元素,依次类推
*    
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 将 List 放入缓存
*
* @param key 键
* @param value 值
*    
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将 List 放入缓存
*
* @param key   键
* @param value 值
* @param time 时间(秒)
*    
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}

return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将 List 放入缓存
*
* @param key   键
* @param value 值
* @return    
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将 List 放入缓存
*
* @param key   键
* @param value 值
* @param time 时间(秒)
* @return    
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据索引修改 List 中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 移除 N 个值为 value 的 List
*
* @param key   键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*    
*/

public Long lRemove(String key, long count, Object value) {
try {
return redisTemplate.opsForList().remove(key, count, value);
} catch (Exception e) {
e.printStackTrace();
return (long) 0;
}
}
}