大家好,我是曾续缘。今天我们将学习如何使用Redis来提升系统性能和实现高效缓存策略。我们将从搭建一个SpringBoot项目开始,逐步创建数据库表、连接数据库,并使用代码生成器简化开发。我们会探讨如何实现秒杀功能,并搭建Redis集群,学习分布式锁的运用。我们将优化秒杀逻辑,并实施缓存策略以解决缓存穿透等问题。通过这个教程,您将掌握使用Redis进行性能优化的实用技巧。让我们开始吧!
创建SpringBoot项目
先加Spring web
和Lombok
依赖,其他的需要用到的时候再加。
项目目录如下。
创建数据库
数据库表
用户表,可有可无,因为后面为了方便,我们使用MVC拦截器给每个请求设置了一个随机的用户ID来模拟访问。
查看代码
DROP TABLE IF EXISTS `t_user`;
CREATE TABLE `t_user` (
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '手机号码',
`username` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '昵称,默认是用户id',
`password` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '密码',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uniqe_key_phone`(`phone`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1024 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
优惠券表,保存了优惠券的基本信息,包括类型和状态等。
查看代码
DROP TABLE IF EXISTS `t_voucher`;
CREATE TABLE `t_voucher` (
`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '代金券标题',
`pay_value` bigint(10) UNSIGNED NOT NULL COMMENT '支付金额,单位是分。例如100代表1元',
`actual_value` bigint(10) NOT NULL COMMENT '抵扣金额,单位是分。例如100代表1元',
`type` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '0,普通券;1,秒杀券',
`status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '1,上架; 2,下架; 3,过期',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1024 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
秒杀券,其实是优惠券的一种类型,保存有额外的信息,比如库存,生效时间和失效时间。
查看代码
DROP TABLE IF EXISTS `t_seckill_voucher`;
CREATE TABLE `t_seckill_voucher` (
`voucher_id` bigint(20) UNSIGNED NOT NULL COMMENT '关联的优惠券的id',
`stock` int(8) NOT NULL COMMENT '库存',
`begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '属于t_voucher表中的秒杀券,存在库存等额外信息' ROW_FORMAT = Compact;
订单表,就是用户抢到券后的凭证,需要保存到数据库中。
查看代码
DROP TABLE IF EXISTS `t_voucher_order`;
CREATE TABLE `t_voucher_order` (
`id` bigint(20) NOT NULL COMMENT '主键',
`user_id` bigint(20) UNSIGNED NOT NULL COMMENT '下单的用户id',
`voucher_id` bigint(20) UNSIGNED NOT NULL COMMENT '购买的代金券id',
`status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
`pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
`use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
`refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
插入测试数据
我们在这里插入了两张优惠券,其中一种是秒杀券,库存100。
查看代码
INSERT INTO `t_user` (`phone`, `username`, `password`)
VALUES
('18812345678', '曾续缘', 'cengxuyuan'),
('13598765432', 'cengxuyuan', 'superidol'),
('15045678901', '粉丝', 'superidol');
INSERT INTO `t_voucher` (`title`, `pay_value`, `actual_value`, `type`, `status`)
VALUES
('优惠券A', 500, 400, 0, 1),
('秒杀券B', 1000, 600, 1, 1);
INSERT INTO `t_seckill_voucher` (`voucher_id`, `stock`, `begin_time`, `end_time`)
VALUES
(1025, 100, '2024-03-20 00:00:00', '2024-03-21 23:59:59');
连接数据库
导入依赖
导入mysql
和mybatis-plus
的依赖,同时也导入mybatis-plus代码生成器
相关的依赖。
查看代码
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.5</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>2.2</version>
</dependency>
写配置文件
在application.yaml
中加上配置。
查看代码
spring:
application:
name: redis-cache
datasource:
username: root
password: root
url: jdbc:mysql://localhost:3306/redis-cache?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
server:
port: 8080
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
global-config:
db-config:
logic-delete-value: 1
logic-not-delete-value: 0
代码生成器
mp相关的代码生成器,可以生成entity、mapper、service、controller等代码。
查看代码
public class CodeGenerator {
public static void main(String[] args) {
FastAutoGenerator.create("jdbc:mysql://localhost:3306/redis-cache?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8",
"root", "root")
.globalConfig(builder -> {
builder.author("曾续缘")
.outputDir(System.getProperty("user.dir") + "/src/main/java");
})
.packageConfig(builder -> {
builder.parent("com.cengxuyuan.rediscache")
.pathInfo(Collections.singletonMap(OutputFile.entity, System.getProperty("user.dir") + "/src/main/java/com/cengxuyuan/rediscache/entity"));
})
.strategyConfig(builder -> {
builder.addInclude(scanner("表名,多个英文逗号分割").split(","))
.addTablePrefix("t_")
.entityBuilder()
.enableLombok()
.naming(NamingStrategy.underline_to_camel);
builder.controllerBuilder()
.enableRestStyle();
builder.mapperBuilder()
.enableMapperAnnotation();
builder.serviceBuilder()
.formatServiceFileName("%sService")
.formatServiceImplFileName("%sServiceImpl");
})
.execute();
}
public static String scanner(String tip) {
Scanner scanner = new Scanner(System.in);
StringBuilder help = new StringBuilder();
help.append("请输入" + tip + ":");
System.out.println(help);
if (scanner.hasNext()) {
String ipt = scanner.next();
if (StringUtils.isNotBlank(ipt)) {
return ipt;
}
}
throw new MybatisPlusException("请输入正确的" + tip + "!");
}
}
生成代码
运行上面的main
方法,输入需要生成相关代码的表名。
t_user,t_voucher,t_seckill_voucher,t_voucher_order
每张表都会生成entity、mapper、service、controller等代码。
添加主键注解
代码生成器没有对SeckillVoucher
和VoucherOrder
表的主键生成TableId
主键,这会导致无法使用mybatis-plus
自带的与主键相关的方法,比如selectById
方法。因此需要我们手动补上主键注解。
/**
* 关联的优惠券的id
*/
@TableId(value = "voucher_id", type = IdType.INPUT)
private Long voucherId;
/**
* 主键
*/
@TableId(value = "id", type = IdType.INPUT)
private Long id;
添加配置类
写一个mybatis-plus的自动插入的配置类,更新时间和创建时间不需要我们手动new
,可以自动填充。
查看代码
@Component
public class MyMetaObjectHandler implements MetaObjectHandler {
// 插入时的填充策略
@Override
public void insertFill(MetaObject metaObject) {
// setFieldValByName(String fieldName, Object fieldVal, MetaObject metaObject
this.setFieldValByName("createTime", new Date(), metaObject);
this.setFieldValByName("updateTime", new Date(), metaObject);
}
// 更新时的填充策略
@Override
public void updateFill(MetaObject metaObject) {
this.setFieldValByName("updateTime", new Date(), metaObject);
}
}
这是分页拦截器,可有可无。
查看代码
@MapperScan("com.cengxuyuan.rediscache.mapper")// 扫描 mapper 文件夹
@EnableTransactionManagement
@Configuration // 配置类
public class MyBatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor mybatisPlusInterceptor = new MybatisPlusInterceptor();
mybatisPlusInterceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return mybatisPlusInterceptor;
}
}
数据传输对象
定义一个用于前后端交互的数据传输对象,十分常用。
查看代码
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result {
private Boolean success;
private String msg;
private Object data;
public static Result success(){
return new Result(true, null, null);
}
public static Result success(Object data){
return new Result(true, null, data);
}
public static Result fail(String msg){
return new Result(false, msg, null);
}
}
UserHolder
是用来存储每个请求的用户ID的,用来分辨不同线程之间的用户。
查看代码
public class UserHolder {
public static ThreadLocal<Long> threadLocal = new ThreadLocal<>();
public static void setCurrentId(Long id) {
threadLocal.set(id);
}
public static Long getCurrentId() {
return threadLocal.get();
}
public static void removeCurrentId() {
threadLocal.remove();
}
}
实现秒杀下单
定义全局唯一Id
在项目中生成全局唯一ID是非常重要的,尤其在高并发情况下更是需要注意。虽然我们经常使用UUID或者自定义规则来生成唯一ID,但存在并发性问题。为了解决这个问题,我们可以使用Redis来实现分布式全局唯一ID的生成。
为什么不能直接使用数据库的自增ID呢?首先,数据库自增ID是有序增长的,容易被猜测到系统中订单数量的上限,同时存在安全隐患。其次,自增ID缺乏信息区分度,不适合多业务交互和用户展示。此外,在分库分表时自增ID可能会重复,造成数据混乱。
Redis通过提供自增原子命令来实现分布式唯一ID的生成,保证了生成的ID是唯一有序的。
引入依赖
引入redis
的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置文件
查看代码
data:
redis:
host: localhost
port: 6379
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s
实现类RedisIdWorker
ID的组成部分:符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
查看代码
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1711023321L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}
}
实现订单逻辑
VoucherOrderController
新增一个API接口,购买ID为${id}
秒杀券
查看代码
@RequestMapping("/voucherOrder")
public class VoucherOrderController {
@Resource
private VoucherOrderService voucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}
VoucherOrderService
实现秒杀业务方法,下面会逐个讲解。
查看代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements VoucherOrderService {
@Resource
private SeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getCurrentId();
synchronized (userId.toString().intern()) {
VoucherOrderService proxy = (VoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getCurrentId();
// 5.1.查询订单
long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.success(orderId);
}
}
判断秒杀时间和库存
首先我们需要判断是否在秒杀券的开放时间内,是否还存在库存。
查看代码
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
一人一单
为了更符合实际业务需求,我们的秒杀券只能一人一单,不能重复购买,具体如何实现?
- 对用户ID加synchronized锁,
.toString()
方法内部实现是new一个String,需要使用.intern()
方法 - 锁的范围是【查询该用户是否存在订单,下单完成或存在订单】,保证了每个用户只有一个请求可以进入锁的范围。
查看代码
Long userId = UserHolder.getCurrentId();
synchronized (userId.toString().intern()) {
VoucherOrderService proxy = (VoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getCurrentId();
// 5.1.查询订单
long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
锁的释放是在事务提交之后,也就是需要锁住整个事务方法。
使用事务
下单操作需要修改两个数据库seckillVoucher
和voucherOrder
,使用事务保持原子性。
添加事务的注解@Transactional
@Transactional
public Result createVoucherOrder(Long voucherId)
在Spring框架中,事务是通过Spring AOP的代理对象来实现的。如果在方法内部直接调用其他方法而不是调用经过代理的方法,那么事务逻辑将不会生效。因此,我们需要确保在调用方法时使用代理对象而不是直接调用原始方法。
synchronized (userId.toString().intern()) {
VoucherOrderService proxy = (VoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
为了使用AopContext来获取代理对象,我们需要引入AspectJ的依赖,以便支持AOP的相关功能。在项目的pom.xml
文件中增加以下依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
我们需要在启动类上增加注解@EnableAspectJAutoProxy(exposeProxy = true)
,这样代理对象才能被外部获取到。
@EnableAspectJAutoProxy(exposeProxy = true)
@SpringBootApplication
public class RedisCacheApplication {
public static void main(String[] args) {
SpringApplication.run(RedisCacheApplication.class, args);
}
}
解决超卖
使用乐观锁机制来确保并发情况下的数据一致性。
通过条件eq("voucher_id", voucherId).gt("stock", 0)
来限定更新条件为指定的代金券且库存大于0。
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
测试
构造数据
为了测试超卖情况,我们需要构造一个模拟多用户同时下单的场景。
我们可以定义一个拦截器,在每次请求中为用户设置一个随机的userId
来模拟不同用户的操作。
定义拦截器
在MvcConfig类中添加一个静态的计数器变量,然后在每次请求时递增这个计数器,并将其设置为当前用户ID。
查看代码
@Configuration
public class MvcConfig implements WebMvcConfigurer {
private static final AtomicLong userId = new AtomicLong(0);
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 用户Id拦截器,用来模拟已登录用户
registry.addInterceptor(new HandlerInterceptor() {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
userId.getAndUpdate(id -> (id + 1) % 1001);
UserHolder.setCurrentId(userId.get());
return HandlerInterceptor.super.preHandle(request, response, handler);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeCurrentId();
HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
}
});
}
}
以上拦截器会为每个请求设置一个在1
到1000
之间的随机userId
,以模拟不同用户的下单操作。
测试超卖
接下来,我们可以使用 JMeter 来模拟 1000 个线程发起请求,代表 1000 个用户抢购秒杀券的场景。
其中我们的秒杀券库存只有100张。
订单表中没有订单
现在启动线程组,看到1000个线程都没有出现异常。
库存为0,不是负数,正常
订单数量100,正常。
测试一人一单
在数据库中将秒杀券库存数量恢复到初始状态,并删除所有订单数据,进行一人一单测试。
此时我们拦截器中的userId
设置为0到29
,
userId.getAndUpdate(id -> (id + 1) % 30);
接着,使用 JMeter 仍然启动 1000 个线程,代表这 30 个用户共计抢购 1000 次。
启动测试
库存70,抢了30张,正常。
订单30条,uerId
不重复,正常。
至此,通过上述测试,结合使用了synchronized
锁、乐观锁和事务来解决秒杀下单场景中的一人一单和超卖问题。这些技术手段可以有效地保证系统在高并发场景下的稳定性和数据一致性,确保秒杀活动的顺利进行。
分布式
在解决单体项目中的秒杀问题时,我们采用了单个 JVM 的方案。然而,当我们需要多个服务实例,也就是多个不同的 JVM 时,使用 synchronized 锁就不再适用了。
为了解决这个问题,我们需要引入多个进程即 JVM 可见的锁。一种常见的做法是利用 Redis 作为中间件,供其他 JVM 访问。这样,我们可以将锁的信息存储在 Redis 中,从而实现多个 JVM 之间的锁同步。
搭建集群
启动多个服务
我们需要复制同一个项目,并在不同的端口上启动这些服务。
编写测试接口
为了测试服务器之间的区别,我们可以编写一个简单的接口,返回当前服务器的端口信息。
查看代码
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private Environment env;
@GetMapping("/nginx")
public Result testNginx(){
return Result.success(env.getProperty("local.server.port"));
}
}
修改服务
复制一个服务,使用命令的方式覆盖服务端口为8082
修改原始服务的端口为8081,预留8080端口给nginx作为反向代理,达到访问nginx代理服务器时跳转到指定服务器的目的。
搭建nginx
接下来,我们需要搭建 Nginx 作为反向代理服务器,实现对不同服务的负载均衡。
进入官网下载windows版本的nginx。
https://nginx.org/en/download.html
conf
:存在Nginx配置文件的目录docs
:存放Nginx文档的目录html
:存放静态html文件的目录logs
:存放Nginx日志的目录temp
:存放临时文件的目录
修改端口
nginx的配置文件是conf目录下的nginx.conf,默认配置的nginx监听的端口为80,如果80端口被占用可以修改为未被占用的端口即可
启动nginx
在文件路径栏输入cmd,回车打开命令行。
执行nginx.exe
后,关闭cmd窗口
关闭niginx
通过 cmd
命令窗口启动 nginx
后,关闭 cmd
窗口并不会结束 nginx
进程。为了正确关闭 nginx
进程,可以采用以下两种方法:
方法一:使用命令行方式
- 快速停止
nginx
进程:在命令行中输入nginx -s stop
- 完整有序地停止
nginx
进程:在命令行中输入nginx -s quit
方法二:使用任务管理器方式
- 使用命令
taskkill /f /t /im nginx.exe
强制结束nginx
进程
配置nginx
在 Nginx 的 conf/nginx.conf
文件中进行配置,指定 Nginx 监听的端口和后端服务器信息。
查看代码
server {
listen 8080;
server_name localhost;
location /api {
default_type application/json;
keepalive_timeout 30s;
keepalive_requests 1000;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
proxy_next_upstream error timeout;
proxy_pass http://backend;
}
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
这个配置实现了对名为"localhost"的服务器监听端口8080,当收到以"/api"开头的请求时,Nginx 会根据负载均衡策略将请求转发给后端服务器。
重启nginx
nginx.exe -s reload
测试集群
访问http://localhost:8080/api/user/nginx
查看返回信息
刷新,再一次请求
可以看到返回的端口信息会轮流显示为8081和8082,说明负载均衡配置生效,同时知道了nginx默认采用轮询的方式实现负载均衡。
分布式锁
分布式锁是在分布式系统中用来控制对共享资源的访问的一种同步机制。
在这个项目,我们使用基于Redis的分布式锁来实现对共享资源的控制。
锁的基本接口
我们定义一个基本的分布式锁接口 ILock
,其中包括尝试获取锁和释放锁的操作。
查看代码
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
使用redis实现锁
这是完整代码,下面会详细讲解。
查看代码
public class RedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID() + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
}
锁的名称
在创建锁的对象时,我们将业务相关的名称传入并保存到Java代码的 name
变量中。
锁的名称在redis中是作为key,而value是唯一的,保存的是锁的标识,也就是谁拿到了key的锁。
锁的标识
我们需要区分是哪个线程加的锁,否则会出现误删锁的情况。
线程ID
在JVM内部,每创建一个线程,线程的ID都会递增。
如果存在多个JVM,创建的线程ID可能会出现冲突的情况。
因此线程ID无法唯一标识一个锁。
具体的说是线程ID无法区分是由哪个JVM创建。
UUID
我们使用UUID来区分不同的JVM,每个UUID在服务启动时创建。
因此为了区分不同JVM和线程,我们结合使用了UUID和线程ID作为锁的标识。UUID用于区分不同的JVM,而线程ID则用于区分同一JVM内的线程。
加锁
加锁操作是利用Redis的 setnx
命令来保证原子性,确保在一个原子性操作内完成对锁的设置。
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
对当前业务**【锁的名称】尝试加上【JVM UUID + 线程ID】**的值。
如果不存在锁,则成功加锁。否则失败。
删锁
为了避免误删锁的情况,我们需要在释放锁时判断是否是加锁的同一线程,即同一个JVM的UUID和同一个线程ID。
同时需要保证判断锁、删锁的过程是原子性的,否则同样会出现误删的情况,因为在两个动作之间,如果【当前线程判断锁成功】后【当前锁超时释放】,【其他线程成功加了锁】,【当前线程就会误删】其他线程的锁。
使用lua脚本可以保证redis的原子性操作。
-- 判断线程标识与锁中的标识是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
查看代码
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
threadId);
}
缺点
上面是基于setnx实现的分布式锁,已经可以解决大部分需求,不过存在一下几个问题。如果对锁的要求更高的情况下,可以考虑使用Redission等专业的分布式锁框架。
重入问题
重入问题指的是一个线程在已经持有锁的情况下,能否再次进入同一把锁保护的代码块。可重入锁的重要性在于避免死锁现象。
例如,在Java中的HashTable,其方法都是同步的。如果一个同步方法内部调用了另一个同步方法,如果没有可重入性,将会导致死锁。因此,可重入锁的关键作用是防止死锁。现有的synchronized关键字和Lock接口提供的锁都是可重入的。
不可重试
当前的分布式锁实现仅允许线程尝试一次获取锁,这在实际应用中可能不够灵活。
理想情况下,当线程获取锁失败后,应该有机会重新尝试获取锁,而不是直接放弃。比如实现tryAcquire
方法,允许在一段时间内自旋尝试重新获取锁。
超时释放
虽然我们通过设置过期时间来防止死锁,但如果程序因为某些原因长时间卡顿,即使使用了Lua脚本来避免误删其他线程的锁,仍然存在安全隐患。比如我们可以增加续约功能,如果持锁线程未执行完,则可以延长超时时间。
主从一致性
如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
修改秒杀下单
在使用分布式锁之前,我们使用synchronized
关键字来控制秒杀下单的并发访问。
而现在我们将其改为使用自定义的Redis锁。
synchronized (userId.toString().intern()) {
VoucherOrderService proxy = (VoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
修改后
查看代码
RedisLock redisLock = new RedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = redisLock.tryLock(5);
if (!isLock) {
return Result.fail("不允许重复下单!");
}
try {
VoucherOrderService proxy = (VoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
首先需要创建锁对象,
然后尝试获取锁,并设置最大等待时间为5秒。
其他逻辑和单体synchronized
锁的几乎一致。
测试分布式锁
请求路径需要多加/api
以让nginx进行转发和负载均衡。
其他的结果和秒杀下单时的测试结果相同。
我们成功地将原本基于单体架构的锁机制迁移到了基于Redis的分布式锁,确保了系统的高可用性和并发访问的安全性。
秒杀优化
新增秒杀券
现在我们新增一个业务功能,就是添加新的秒杀券。
新增秒杀券传输对象
秒杀券包含优惠券信息的通用信息和秒杀券信息的额外信息。
为了存储优惠券信息和秒杀券信息,我们创建一个名为SeckillVoucherDTO
的传输对象。
使用这一个对象就可以存下两个表对应的信息了。
查看代码
@Data
public class SeckillVoucherDTO {
private Long id;
private String title;
private Long payValue;
private Long actualValue;
private Byte type;
private Byte status;
private Integer stock;
private LocalDateTime beginTime;
private LocalDateTime endTime;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
新增秒杀券接口
在VoucherController
中,我们新增了一个接口/voucher/seckill
用于添加秒杀券。这个接口接收前端传来的SeckillVoucherDTO
对象,并返回优惠券的ID。
查看代码
@RestController
@RequestMapping("/voucher")
public class VoucherController {
@Resource
private VoucherService voucherService;
/**
* 新增秒杀券
* @param seckillVoucherDTO 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody SeckillVoucherDTO seckillVoucherDTO) {
long voucherId = voucherService.addSeckillVoucher(seckillVoucherDTO);
return Result.success(voucherId);
}
}
实现新增秒杀券服务
在VoucherServiceImpl
中,我们实现添加秒杀券的服务。该方法需要使用事务来保证原子性。具体实现包括以下步骤:
- 将优惠券信息保存到优惠券表中
- 将额外的秒杀信息保存到秒杀券表中
- 将库存保存到Redis缓存中,这是后续秒杀优化的前提
查看代码
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements VoucherService {
@Resource
private SeckillVoucherService seckillVoucherService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
@Transactional
public long addSeckillVoucher(SeckillVoucherDTO seckillVoucherDTO) {
Voucher voucher = new Voucher();
BeanUtils.copyProperties(seckillVoucherDTO, voucher);
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(seckillVoucherDTO.getStock());
seckillVoucher.setBeginTime(seckillVoucherDTO.getBeginTime());
seckillVoucher.setEndTime(seckillVoucherDTO.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), seckillVoucher.getStock().toString());
return voucher.getId();
}
}
测试
我们通过向http://localhost:8080/api/voucher/seckill
发送JSON数据来测试新增秒杀券功能。
查看代码
{
"title": "优惠券C",
"payValue": 10000,
"actualValue": 5000,
"type": 1,
"status": 1,
"stock": 100,
"beginTime": "2024-03-20T12:00:00",
"endTime": "2024-04-30T23:59:59"
}
返回成功信息。
数据库表更新正常
redis缓存增加也正常。
我们成功地实现了新增秒杀券的功能,并验证了其正常运行。
优化秒杀资格判断
我们的优化目标是将秒杀资格判断和下单这两个业务异步进行。
编写lua脚本
我们编写一个 Lua 脚本来判断库存是否充足以及每个用户是否只能下单一次,以保证原子性。
查看代码
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
修改秒杀资格判断
接着,我们将之前用 Java 代码判断库存和订单情况的逻辑改为执行 Lua 脚本。
读取脚本
VoucherOrderServiceImpl
中增加:
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
静态代码块会在类被加载时执行,并且只会执行一次。因此,lua脚本在类加载时就会被读取。
lua秒杀资格判断
这里只是判断了库存是否充足和是否一人一单,
如果都成立,需要将订单保存到队列中。这里后面再实现。
tips:订单发送也可以写到lua脚本中。
查看代码
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getCurrentId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0 ,有购买资格,把下单信息保存到队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存订单到队列中
// 3.返回订单id
return Result.success(orderId);
}
创建Stream消息队列
在redis的命令行中输入
XGROUP CREATE stream.orders redis-cache 0 MKSTREAM
这个Redis命令是用于创建一个新的消费者组(Consumer Group)并且创建一个新的流(Stream)。具体解释如下:
- XGROUP CREATE: 这部分表示创建一个消费者组。
- stream.orders: 表示要操作的流的名称,在这里是"stream.orders"。
- redis-cache: 表示消费者组的名称,在这里是"redis-cache"。
- 0: 表示要从哪个消息开始读取,0表示从最旧的消息开始读取。
- MKSTREAM: 表示如果指定的流不存在,则创建一个新的流。
订单发送脚本
在之前的lua脚本中进行修改。
增加一个步骤:发送消息到队列中(使用了Redis的Stream数据结构)。这个步骤将订单信息发送到队列中,实现了异步处理订单的功能,可以提高系统的并发处理能力和响应速度。
查看代码
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
修改业务逻辑
订单发送也可以写到lua脚本中,但需要多传一个订单ID
的参数,向stream消息队列中发送订单。
查看代码
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getCurrentId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),
String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 下单信息已经在lua脚本中发送到redis的消息队列中
// 3.返回订单id
return Result.success(orderId);
}
订单处理
创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
使用一个单线程的ExecutorService来处理订单。
创建Runnable任务
这个Runnable任务,即VoucherOrderHandler,在其run方法中,会不断地从Redis Stream中读取订单信息。这个读取操作是阻塞的,如果在2秒内没有读取到任何信息,它会继续下一次循环。
读取到订单信息后,它会从上次消费的位置开始读取出一条记录,然后将这个记录的值转换为VoucherOrder对象,然后调用createVoucherOrder方法来创建订单。
创建订单后,它会向Redis发送一个确认消息,表示这条订单信息已经被处理。
如果在处理订单时发生异常,调用handlePendingList方法来处理挂起的列表。这个方法的逻辑与run方法类似,但它是从消息队列的头部开始尝试读取订单信息。
查看代码
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP redis-cache consumer-name COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("redis-cache", "order-consumer"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "redis-cache", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("redis-cache", "order-consumer"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "redis-cache", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
还需要修改createVoucherOrder
方法的参数,现在传入的是VoucherOrder voucherOrder
,而不是long
类型了,而且用户ID需要在voucherOrder
中获取,而不是在本线程中。
上面的代码中,为了方便使用了BeanUtil.fillBeanWithMap
方法,这是hutool工具类中的方法,因此需要引入hutool依赖。
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
启动线程池
线程池在类初始化时启动,并提交上面实现的Runnable任务。
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
测试
恢复数据库数据,开始测试秒杀新增的ID为1026的秒杀券。
同样是使用1000个线程,模拟30个用户进行秒杀。
可以看到,redis缓存中的库存正确
数据库中的库存数量也正确
使用Rabbitmq优化
使用Rabbitmq
优化后的架构流程如下:
引入Rabbitmq
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置
增加yaml
配置。
查看代码
rabbitmq:
username: cengxuyuan
password: xxxxroot
host: 192.168.101.100
port: 5672
seckill: # 自定义属性:mq队列长度
order:
maxQueueLength: 1000
thread: # 自定义:线程池属性
pool:
core:
size: 20
max:
size: 60
queue:
capacity: 1000
keep:
alive:
time: 10
编写配置类RabbitConfig
。
配置一个订单交换机,绑定两个队列,实现将收到的设置有TTL的消息分别发送到订单队列和延迟队列中,其中订单队列有消费者监听,收到消息后会使用线程池将订单存入数据库中,在这个过程,如果消息超时了会发送到死信交换机中。此外,延迟队列同样会收到订单交换机的消息,但是没有消费者监听,因此消息必定堆积在延迟队列中等待到超时,然后发送到死信交换机中。
死信交换机绑定了一个死信队列,有消费者监听,但是会判断消息死亡
的类型,如果是超时,则会执行线程池取消订单任务,如果是超过队列长度,则会执行存入订单任务。
查看代码
@Configuration
public class RabbitConfig {
@Value("${seckill.order.maxQueueLength}")
private long seckillOrderMaxQueueLength;
public static final String SECKILL_ORDER_QUEUE = "seckill.order.queue";
public static final String SECKILL_ORDER_EXCHANGE = "seckill.order.exchange";
public static final String SECKILL_ORDER_KEY = "seckill.order.key";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static final String SECKILL_ORDER_DEAD_LETTER_QUEUE = "seckill.order.dead.letter.queue";
public static final String SECKILL_ORDER_DEAD_LETTER_KEY = "seckill.order.dead.letter.key";
public static final String SECKILL_ORDER_DELAY_QUEUE = "seckill.order.delay.queue";
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
rabbitTemplate.setMessageConverter(converter);
return rabbitTemplate;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
factory.setMessageConverter(converter);
return factory;
}
@Bean
public Queue seckillOrderQueue() {
return QueueBuilder.durable(SECKILL_ORDER_QUEUE)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(SECKILL_ORDER_DEAD_LETTER_KEY)
.maxLength(seckillOrderMaxQueueLength)
// .ttl(10000) // 设置队列中消息的过期时间为10000毫秒(10秒)
.build();
}
@Bean
public DirectExchange seckillOrderExchange() {
return new DirectExchange(SECKILL_ORDER_EXCHANGE, true, false);
}
@Bean
public Binding bindingSeckillOrder(
@Qualifier("seckillOrderQueue") Queue seckillOrderQueue,
@Qualifier("seckillOrderExchange") DirectExchange seckillOrderExchange
) {
//bind队列to交换机中with路由key
return BindingBuilder.bind(seckillOrderQueue).to(seckillOrderExchange).with(SECKILL_ORDER_KEY);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明死信队列
@Bean("seckillOrderDeadLetterQueue")
public Queue seckillOrderDeadLetterQueue() {
return new Queue(SECKILL_ORDER_DEAD_LETTER_QUEUE);
}
@Bean
public Binding bindingSeckillOrder2DeadLetter(
@Qualifier("seckillOrderDeadLetterQueue") Queue seckillOrderDeadLetterQueue,
@Qualifier("deadLetterExchange") DirectExchange deadLetterExchange
) {
return BindingBuilder.bind(seckillOrderDeadLetterQueue).to(deadLetterExchange).with(SECKILL_ORDER_DEAD_LETTER_KEY);
}
@Bean
public Queue seckillOrderDelayQueue() {
return QueueBuilder.durable(SECKILL_ORDER_DELAY_QUEUE)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(SECKILL_ORDER_DEAD_LETTER_KEY)
.maxLength(seckillOrderMaxQueueLength)
.build();
}
@Bean
public Binding bindingDelayQueue2SeckillOrderExchange(
@Qualifier("seckillOrderDelayQueue") Queue seckillOrderDelayQueue,
@Qualifier("seckillOrderExchange") DirectExchange seckillOrderExchange
) {
return BindingBuilder.bind(seckillOrderDelayQueue).to(seckillOrderExchange).with(SECKILL_ORDER_KEY);
}
}
编写线程池配置类ThreadPoolConfig
。
查看代码
@Configuration
public class ThreadPoolConfig {
@Value("${thread.pool.core.size}")
private int corePoolSize;
@Value("${thread.pool.max.size}")
private int maximumPoolSize;
@Value("${thread.pool.keep.alive.time}")
private long keepAliveTime;
@Value("${thread.pool.queue.capacity}")
private int queueCapacity;
@Bean
public ThreadPoolExecutor seckillOrderThreadPoolExecutor() {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(queueCapacity);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Executors.defaultThreadFactory(),
handler
);
}
}
编写生产者消费者
生产者
这是一个sendVoucherOrderWithExpiration
方法负责将 VoucherOrder
对象转换为 JSON 格式,并设置过期时间(10分钟),然后将这些消息发送到 RabbitMQ 队列。
通过使用线程池,确保消息发送过程的高效性和非阻塞性。
查看代码
@Service
@Slf4j
public class RabbitProducer {
@Autowired
RabbitTemplate rabbitTemplate;
@Resource
ThreadPoolExecutor seckillOrderThreadPoolExecutor;
public void sendVoucherOrderWithExpiration(VoucherOrder voucherOrder) {
seckillOrderThreadPoolExecutor.execute(() -> {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("600000"); // 设置消息的过期时间1000毫秒(1秒)600000
messageProperties.setContentType(CONTENT_TYPE_JSON);
Message message = new Message(JSONUtil.parse(voucherOrder).toString().getBytes(StandardCharsets.UTF_8), messageProperties);
log.info("发送订单:{}", message);
rabbitTemplate.send(SECKILL_ORDER_EXCHANGE, SECKILL_ORDER_KEY, message);
}
);
}
}
消费者
定义两个线程池任务,一个CreateOrderTask
是创建未支付订单到数据库中,另一个CancelOrderTask
是取消未支付订单。
查看代码
class CreateOrderTask implements Runnable {
private final Message message;
public CreateOrderTask(Message message) {
this.message = message;
}
@Override
public void run() {
VoucherOrder voucherOrder = JSONUtil.toBean(new String(message.getBody(), StandardCharsets.UTF_8), VoucherOrder.class);
try {
log.info("处理订单:{}", voucherOrder);
voucherOrderService.createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("订单{}异常", voucherOrder, e);
}
}
}
class CancelOrderTask implements Runnable {
private final Message message;
public CancelOrderTask(Message message) {
this.message = message;
}
@Override
public void run() {
VoucherOrder voucherOrder = JSONUtil.toBean(new String(message.getBody(), StandardCharsets.UTF_8), VoucherOrder.class);
log.info("取消订单:{}", voucherOrder);
voucherOrderService.cancelVoucherOrder(voucherOrder);
}
}
创建两个消费者,分别监听未支付订单队列,和死信队列。
查看代码
@Service
@Slf4j
public class RabbitConsumer {
@Resource
VoucherOrderService voucherOrderService;
@Resource
ThreadPoolExecutor seckillOrderThreadPoolExecutor;
@RabbitListener(queues = {SECKILL_ORDER_QUEUE})
public void handleVoucherOrderMessage(Message message) {
seckillOrderThreadPoolExecutor.execute(new CreateOrderTask(message));
}
@RabbitListener(queues = {SECKILL_ORDER_DEAD_LETTER_QUEUE})
public void handleVoucherOrderDeadMessage(Message message) {
String deathReason = analyzeDeadLetterReason(message);
log.info("收到{}死信:{}", deathReason, message);
if ("expired".equals(deathReason)) {
seckillOrderThreadPoolExecutor.execute(new CancelOrderTask(message));
} else if ("maxlen".equals(deathReason)) {
seckillOrderThreadPoolExecutor.execute(new CreateOrderTask(message));
} else {
log.error("{}死信未处理,", deathReason);
}
}
private String analyzeDeadLetterReason(Message message) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
if (headers.containsKey("x-first-death-reason")) {
return (String) headers.get("x-first-death-reason"); // expired, maxlen
}
StringBuilder reason = new StringBuilder();
if (headers.containsKey("x-death")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
for (Map<String, Object> death : deathList) {
reason.append((String) death.get("reason"));
}
}
return reason.toString();
}
}
修改秒杀逻辑
删除原有seckill.lua
脚本中的消息队列相关的命令。
在java代码的seckillVoucher
方法中将订单消息异步发送到mq中。
查看代码
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getCurrentId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),
String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// --------------------------------
// 3.异步发送订单消息到mq中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
voucherOrder.setCreateTime(LocalDateTime.now());
rabbitProducer.sendVoucherOrderWithExpiration(voucherOrder);
// --------------------------------
// 4.返回订单id
return Result.success(orderId);
}
然后增加取消订单的逻辑。
如果未支付的订单存在数据库中,将未支付
状态改为取消
状态。
如果不存在数据中,直接存入一个取消
状态的订单数据。
查看代码
@Transactional
public Result cancelVoucherOrder(VoucherOrder voucherOrder) {
val id = voucherOrder.getId();
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 查询订单
VoucherOrder dbVoucherOrder = getOne(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 判断是否存在未支付的订单
if (dbVoucherOrder != null && dbVoucherOrder.getStatus() == 1) {
log.info("用户没有该未支付的订单:{}", voucherOrder);
return Result.fail("用户没有该未支付的订单");
}
// 增加库存
stringRedisTemplate.opsForValue().increment("seckill:stock:" + id);
boolean success = seckillVoucherService.update()
.setSql("stock = stock + 1") // set stock = stock + 1
.eq("voucher_id", voucherId)
.update();
if (!success) {
log.info("增加库存失败:{}", voucherOrder);
return Result.fail("增加库存失败");
}
if (dbVoucherOrder == null) { // 如果还没更新数据库,直接插入
voucherOrder.setStatus((byte) 4);
save(voucherOrder);
log.info("保存取消的订单:{}", voucherOrder);
} else { // 否则更新
update().set("status", 4).eq("id", id);
log.info("取消未支付的订单:{}", voucherOrder);
}
// 返回id
return Result.success(id);
}
添加缓存
优惠券列表查询
为了方便演示,这里只查询t_voucher
表的所有信息,没有联表秒杀券。
Controller
在VoucherController
添加如下方法
@GetMapping("/list")
public Result getVoucherList(){
List<Voucher> vouchers = voucherService.getVoucherList();
if(vouchers.isEmpty()){
return Result.fail("不存在优惠券");
}
return Result.success(vouchers);
}
Service
在VoucherService
新增接口方法,然后在实现类VoucherServiceImpl
中实现如下
@Override
public List<Voucher> getVoucherList() {
return list();
}
测试
在浏览器测试接口是否正常,返回的数据很难看,不过正常。
在postman中测试,数据好看多了,接口正常。
接着使用鸡米特,对这个接口进行测压,看看没加缓存前接口的极限在哪里。
使用1000个线程测压。
记录一下结果,后面和加缓存的结果对比一下。
添加缓存
增加缓存逻辑
修改VoucherServiceImpl
中的getVoucherList
查询方法。
查看代码
@Override
public List<Voucher> getVoucherList() {
// 先从redis中查缓存
String voucherListJson = stringRedisTemplate.opsForValue().get("cache:voucherList");
if (StrUtil.isNotBlank(voucherListJson)) {
// 存在缓存数据,直接返回
return JSONUtil.toList(voucherListJson, Voucher.class);
}
// 不存在缓存数据,查询数据库
List<Voucher> voucherList = list();
if (!voucherList.isEmpty()) {
// 查询到的数据写入redis缓存
stringRedisTemplate.opsForValue().set("cache:voucherList", JSONUtil.toJsonStr(voucherList));
}
return voucherList;
}
测试
启动服务,访问一次http://localhost:8080/api/voucher/list
,然后查看redis数据库,可以发现优惠券信息已经写入缓存。
接着使用鸡米特,对这个接口进行测压,看看加了缓存后接口的极限在哪里。
同样使用1000个线程测压。
对比结果,发现有所提升,但不多,可能线程的量不够。。
缓存更新
如果缓存没有设置过期时间,随机缓存数据越来越多,当redis内存不足时,redis会进行内存淘汰,自动删除一些数据。但更好的是我们给缓存设置过期时间,让redis的内存利用率更高些。
添加过期时间
在VoucherServiceImpl
的getVoucherList()
方法中,修改设置缓存的逻辑,添加过期时间10分钟。
stringRedisTemplate.opsForValue().set("cache:voucherList", JSONUtil.toJsonStr(voucherList),10L, TimeUnit.MINUTES);
双写一致性
如果我们需要修改缓存中相关的数据,比如新增了一种优惠券,为了缓存和数据库的数据保存一致性,我们主动更新数据库的同时,需要更新缓存。
为了双写一致性,我们采取的策略是先修改数据库,再删缓存。
我们模仿数据更新的操作就是之前的新增秒杀券,其中使用了mybatis-plus提供的save方法来向数据库写入一条优惠券信息。
前面说了,为了简便,我们只关注优惠券表,秒杀券相关的逻辑我们先不改动。
因此我们将保存优惠券
的逻辑写成一个方法,在方法里加上缓存相关的逻辑。
测试
目前redis缓存中存在3种优惠券的信息,
接下来,我们新增秒杀券,看看是否执行了修改数据、删除缓存操作。
新增秒杀券D,
数据库添加正常
redis中的优惠券列表缓存已删除
查询优惠券列表,新增的优惠券D可以查询到。
同时redis中的列表缓存设置了过期时间,新增的优惠券D也可以查询到。
缓存穿透
缓存穿透是指客户端请求的数据既不在缓存中,也不在数据库中,导致缓存无法起到作用,所有这些请求都会直接打到数据库。由于数据库的并发处理能力有限,大量的请求同时访问这种不存在的数据会给数据库带来压力。
缓存空对象
一种简单的解决方案是,即使数据在数据库中也不存在,我们也将这个“空对象”存入到Redis中。这样,下次用户访问这个不存在的数据时,Redis中就能找到,从而避免继续访问数据库。
实现
查询单一优惠券
区别于查询优惠券列表,我们新增一个API接口,用于根据优惠券ID返回该优惠券的信息。
VoucherController
@GetMapping("/{id}")
public Result getVoucher(@PathVariable("id") long voucherId){
return voucherService.getVoucher(voucherId);
}
VoucherServiceImpl
查看代码
public Result getVoucher(long voucherId) {
String voucherJson = stringRedisTemplate.opsForValue().get("cache:voucher:" + voucherId);
if (StrUtil.isNotBlank(voucherJson)) {
Voucher voucher = JSONUtil.toBean(voucherJson, Voucher.class);
return Result.success(voucher);
}
if (voucherJson != null) {
// 缓存命中空值, 返回错误信息
return Result.fail("不存在该优惠券!");
}
Voucher voucher = getById(voucherId);
if (voucher == null) {
// 数据库不存在, 缓存一个空字符串值
stringRedisTemplate.opsForValue().set("cache:voucher:" + voucherId, "", 10L, TimeUnit.MINUTES);
return Result.fail("不存在该优惠券!");
}
stringRedisTemplate.opsForValue().set("cache:voucher:" + voucherId, JSONUtil.toJsonStr(voucher), 10L, TimeUnit.MINUTES);
return Result.success(voucher);
}
上面的代码的逻辑如下:
- 检查Redis缓存:首先会尝试从Redis缓存中根据voucherId获取优惠券信息。
- 缓存命中:如果Redis缓存中存在该优惠券信息,返回结果。
- 缓存命中空值:如果Redis中不存在该信息,但不为null,则表示缓存中命中了空值,直接返回错误信息。注意null和空值不同!!
- 数据库查询:如果在Redis中是null,说明第一次未找到该优惠券信息,则会从数据库中查询该优惠券信息。
- 数据库不存在:如果数据库中也不存在该优惠券信息,将在Redis中缓存一个空字符串值,然后返回错误信息。这会使得第二次查询该数据在缓存中命中空值(过期时间内)。
- 数据库存在:如果数据库中存在该优惠券信息,则将其存入Redis缓存中,并设置缓存时间为10分钟,然后返回结果。
通过对比上述代码和查询优惠券列表的代码,可以清晰地了解如何有效解决缓存穿透问题。
测试
首先查询数据库中存在的数据,查询ID为1024的优惠券
接下来查询数据库中不存在的数据,显然缓存中也不存在。
查询ID为2049
的优惠券
此时redis为这个不存在的查询缓存了空值。
再次查询时,就会返回redis的空值了,进而返回报错信息。
布隆过滤
另一种解决方案是使用布隆过滤器,通过哈希思想判断要查询的数据是否存在。如果布隆过滤器判断存在,则请求会继续访问Redis,即使此时Redis中的数据已过期,但数据库中仍然存在这个数据。如果布隆过滤器判断数据不存在,则直接返回,避免进一步查询Redis和数据库。
布隆过滤器能够节约内存空间,但可能存在误判,因为哈希冲突的存在可能导致误判的产生。
实现方式如有需要以后再补。
缓存雪崩
当大量缓存key同时失效或Redis服务宕机时,可能引发缓存雪崩问题,导致数据库压力激增。
随机过期时间
设置缓存过期时间时,为不同的缓存Key添加随机的TTL值,降低缓存同时失效的可能性,有效防止雪崩效应的发生。
Redis集群
部署Redis集群可以提高服务的可用性和容错能力,一旦某个节点发生故障,其他节点仍能正常提供缓存服务,减少因单点故障引发的雪崩效应。
降级限流
在缓存雪崩事件发生时,及时启用降级限流策略,如关闭部分非关键功能或对请求进行限流处理,以减轻数据库压力,确保系统稳定运行。
多级缓存
建立多级缓存体系,包括本地缓存、分布式缓存等,利用不同缓存层级间的协作,提高系统整体的容错性和稳定性,降低缓存雪崩风险。
缓存击穿
缓存击穿问题,也称为热点Key问题,指的是一个被高并发访问且缓存重建业务较复杂的Key突然失效,导致大量请求瞬间给数据库带来巨大冲击。
互斥锁
一种解决方案是利用锁来实现互斥性,确保每次只有一个线程能够访问数据库,从而避免对数据库的过大访问压力。然而,使用锁可能会影响查询性能,因为原本并行的查询变成了串行操作。为了优化这一方面,可以采用tryLock方法结合双重检查机制。
当线程1访问时,如果未命中缓存但成功获取锁资源,线程1将独自执行逻辑。若此时线程2到来,在未获取锁的情况下,线程2会进入休眠状态,直至线程1释放锁后,线程2获得锁并执行查询缓存逻辑,此时便能从缓存中获取数据。这种方式能够有效地优化数据库访问并提升性能。
实现
通过使用分布式锁的方式来解决缓存击穿问题。具体来说,当某个请求发现缓存中没有对应的优惠券数据时,会尝试获取名为"lock:cache:voucher:" + voucherId
的分布式锁实例。如果成功获取了锁,那么该线程就负责去数据库中查询对应的优惠券数据,并将其放入缓存中。如果在获取锁的过程中失败,该线程会进行短暂的休眠后重试,以避免多个线程同时请求数据库。
在获取到锁之后,代码会再次检查缓存中是否存在优惠券数据,以防止在等待获取锁的过程中已经有其他线程更新了缓存。然后,它会从数据库中查询数据,并将结果存入缓存中。最后,无论是从数据库中获取数据还是直接从缓存中获取数据,最终都会返回一个包含优惠券信息的成功响应。
通过这种方式,即使在缓存失效的情况下,也能保证只有一个线程去负责重新构建缓存数据,从而避免了大量请求同时涌入数据库,减轻了数据库的压力,提高了系统的性能和稳定性。
查看代码
public Result getVoucher(long voucherId) {
String voucherJson = stringRedisTemplate.opsForValue().get("cache:voucher:" + voucherId);
if (StrUtil.isNotBlank(voucherJson)) {
Voucher voucher = JSONUtil.toBean(voucherJson, Voucher.class);
return Result.success(voucher);
}
if (voucherJson != null) {
// 缓存命中空值, 返回错误信息
return Result.fail("不存在该优惠券!");
}
// 缓存重建, 让一个线程去查数据库
RedisLock lock = new RedisLock("lock:cache:voucher:" + voucherId, stringRedisTemplate);
Voucher voucher = null;
try {
boolean isLock = lock.tryLock(2);
if(!isLock){
// 如果当前线程没拿到锁, 休眠一段时间后重试
Thread.sleep(20);
return getVoucher(voucherId); // 递归实现重试
}
voucher = getById(voucherId);
if (voucher == null) {
// 数据库不存在, 缓存一个空字符串值
stringRedisTemplate.opsForValue().set("cache:voucher:" + voucherId, "", 10L, TimeUnit.MINUTES);
return Result.fail("不存在该优惠券!");
}
stringRedisTemplate.opsForValue().set("cache:voucher:" + voucherId, JSONUtil.toJsonStr(voucher), 10L, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return Result.success(voucher);
}
测试
我们在查询数据库的代码中加一个控制台输出,每次查询了数据库就输出信息。
接着删除redis中的缓存,使用鸡米特发送1000个请求查询ID为1024的优惠券,看看有多少查询了多少次数据库。
可以看到,只查询了一次数据库。
这样就避免了大量请求同时查询数据库。
逻辑过期
缓存击穿问题常由于设置了过期时间的键导致,若不设置过期时间则不会出现这一问题。然而,不设置过期时间将导致数据一直占用内存。为处理此问题,也可以考虑逻辑过期方案。
在逻辑过期方案中,我们将过期时间存储在Redis的value中,并非直接作用于Redis,而是通过后续逻辑处理。当线程1查询缓存时,若发现数据已过期,线程1获取互斥锁,导致其他线程阻塞。获得锁的线程将启动一个新线程执行重构数据的逻辑,直至完成并释放锁后,线程1直接返回。若线程3尝试访问,由于线程2持有锁,线程3无法获得,同样直接返回数据,直至线程2完成重建数据后,其他线程才能返回正确数据。
逻辑过期方案巧妙之处在于异步构建缓存,但缺点在于在构建完缓存之前,返回的数据可能是过期的。
总结
我们采用Spring Boot快速搭建了web项目,并利用mp代码生成器快速生成了结构化的代码。随后,我们专注于秒杀业务的开发。
针对秒杀下单流程,我们首先定义了全局唯一ID,并实现了订单逻辑。这包括对秒杀时间和库存的判断、限制每人只能下单一次,以及通过事务来确保操作的原子性。随后,我们进行了充分的测试,验证系统在高并发情况下的稳定性和数据一致性。
除此之外,我们还搭建了Redis集群,并实现了分布式锁。优化措施涵盖了秒杀资格的判断和订单提交脚本的添加。最后,我们对缓存进行了优化,包括优惠券列表查询和新增秒杀券的功能,并解决了缓存穿透、缓存雪崩和缓存击穿等问题。
希望以上内容能够帮助大家更好地理解如何在Spring Boot项目中利用Redis实现缓存管理、分布式锁、消息队列等功能,并解决常见的缓存相关问题。