第十六章
秒杀高级
优就业.JAVA教研室
学习目标
- 防止秒杀重复排队
- 并发超卖问题解决
- 订单支付
- RabbitMQ延时队列
- 库存回滚
1 防止秒杀重复排队
用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息表示已经正在排队。
1.1 后台排队记录
修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:
上图代码如下:
//递增,判断是否排队
Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
if(userQueueCount>1){
//100:表示有重复抢单
throw new RuntimeException(String.valueOf(StatusCode.REPERROR));
}
测试:
首次秒杀下单:http://localhost:8001/api/seckillOrder/add?id=1&time=2021061914
查看redis缓存,发现已经记录了UserQueueCount
再次重复下单:
查看查看redis缓存,发现UserQueueCount记录的用户下单数量变成了2
2 并发超卖问题解决
超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。
2.1 思路分析
解决超卖问题,可以利用Redis队列实现,给每件商品创建一个独立的商品个数队列,例如:A商品有2个,A商品的ID为1001,则可以创建一个队列,key=SeckillGoodsCountList_1001,往该队列中塞2次该商品ID。
每次给用户下单的时候,先从队列中取数据,如果能取到数据,则表明有库存,如果取不到,则表明没有库存,这样就可以防止超卖问题产生了。
在我们对Redis进行操作的时候,很多时候,都是先将数据查询出来,在内存中修改,然后存入到Redis,在并发场景,会出现数据错乱问题,为了控制数量准确,我们单独将商品数量整一个自增键,自增键是线程安全的,所以不担心并发场景的问题。
2.2 代码实现
每次将商品压入Redis缓存的时候,另外多创建一个商品的队列。
修改SeckillGoodsPushTask,添加一个pushIds方法,用于将指定商品ID放入到指定的数字中,代码如下:
/***
* 将商品ID存入到数组中
* @param len:长度
* @param id :值
* @return
*/
public Long[] pushIds(int len,Long id){
Long[] ids = new Long[len];
for (int i = 0; i <ids.length ; i++) {
ids[i]=id;
}
return ids;
}
修改SeckillGoodsPushTask的loadGoodsPushRedis方法,添加队列操作,代码如下:
上图代码如下:
//商品数据队列存储,防止高并发超卖
Long[] ids = pushIds(seckillGood.getStockCount(), seckillGood.getId());
redisTemplate.boundListOps("SeckillGoodsCountList_"+seckillGood.getId()).leftPushAll(ids);
//创建一个计数器记录各个商品的库存数量
redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillGood.getId(),seckillGood.getStockCount());
测试:
重启服务,发现定时任务执行后,在redis缓存已经记录了数据id的队列SeckillGoodsCountList_1、SeckillGoodsCountList_2等等
并且也记录了每个商品的库存数量SeckillGoodsCoun
2.3 超卖控制
修改多线程下单方法,分别修改数量控制,以及售罄后用户抢单排队信息的清理,修改代码如下图:
上图代码如下:
/***
* 多线程下单操作
*/
@Async
public void createOrder(){
//从队列中获取排队信息
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();
try {
//从队列中获取一个商品
Object sgood = redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).rightPop();
if(sgood==null){
//清理当前用户的排队信息
clearQueue(seckillStatus);
return;
}
//时间区间
String time = seckillStatus.getTime();
//用户登录名
String username=seckillStatus.getUsername();
//用户抢购商品
Long id = seckillStatus.getGoodsId();
//获取商品数据
SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_" + time).get(id);
//如果有库存,则创建秒杀商品订单
SeckillOrder seckillOrder = new SeckillOrder();
seckillOrder.setId(idWorker.nextId());
seckillOrder.setSeckillId(id);
seckillOrder.setMoney(goods.getCostPrice());
seckillOrder.setUserId(username);
seckillOrder.setCreateTime(new Date());
seckillOrder.setStatus("0");
//将秒杀订单存入到Redis中
redisTemplate.boundHashOps("SeckillOrder").put(username,seckillOrder);
//商品库存-1
Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(id, -1);//商品数量递减
goods.setStockCount(surplusCount.intValue()); //根据计数器统计
//判断当前商品是否还有库存
if(surplusCount<=0){
//并且将商品数据同步到MySQL中
seckillGoodsMapper.updateById(goods);
//如果没有库存,则清空Redis缓存中该商品
redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id);
}else{
//如果有库存,则直数据重置到Reids中
redisTemplate.boundHashOps("SeckillGoods_" + time).put(id,goods);
}
//抢单成功,更新抢单状态,排队->等待支付
seckillStatus.setStatus(2);
seckillStatus.setOrderId(seckillOrder.getId());
seckillStatus.setMoney(seckillOrder.getMoney().floatValue());
redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
} catch (Exception e) {
e.printStackTrace();
}
}
/***
* 清理用户排队信息
* @param seckillStatus
*/
public void clearQueue(SeckillStatus seckillStatus){
//清理排队标示
redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());
//清理抢单标示
redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}
测试:正常秒杀下单
http://localhost:8001/api/seckillOrder/add?id=1&time=2021061914
查看redis中的SeckillGoodsCount发现对应编号的商品减去了库存数量。
3 秒杀订单支付
完成秒杀下订单后,进入支付页面,此时前端会每3秒中向后台发送一次请求用于判断当前用户订单是否完成支付,如果完成了支付,则需要清理掉排队信息,并且需要修改订单状态信息。
3.1 搭建支付模块
创建模块dongyimai-payseckill-service
(1)、修改依赖配置文件pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dongyimai-service</artifactId>
<groupId>com.offcn</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dongyimai-payseckill-service</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 支付宝支付所需类库包 -->
<dependency>
<groupId>com.alipay.sdk</groupId>
<artifactId>alipay-sdk-java</artifactId>
<version>4.11.54.ALL</version>
<exclusions>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.offcn</groupId>
<artifactId>dongyimai-common</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
</dependencies>
</project>
(2)、配置文件application.yml
server:
port: 9006
spring:
application:
name: pay
rabbitmq:
host: 192.168.188.128 #mq的服务器地址
username: guest #账号
password: guest #密码
alipay:
serverUrl: https://openapi.alipaydev.com/gateway.do
notify-url: http://7r8ukqlrpt.52http.com/pay/notify/url
appId: 2016091300500103
privateKey: 用户私钥
alipayPublicKey: 阿里公钥
format: json
charset: utf-8
signType: RSA2
3.2 创建支付二维码
下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象中。
选择扫码支付后,会跳转到扫码支付页面,支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的ID创建预支付信息并获取二维码信息,展示给用户看,此时页面每3秒查询一次支付状态,如果支付成功,需要修改订单状态信息。
3.2.1 回显订单号、金额
下单后,进入支付选择页面,需要显示订单号和订单金额,所以需要在用户下单后将该数据传入到pay.html页面,所以查询订单状态的时候,需要将订单号和金额封装到查询的信息中,修改查询订单装的方法加入他们即可。
修改SeckillOrderController的queryStatus方法,代码如下:
上图代码如下:
return new Result(true,seckillStatus.getStatus(),"抢购状态",seckillStatus);
使用Postman测试,效果如下:
http://localhost:8001/api/seckillOrder/query
订单编号为Long型,转换为json字符串出现精度损失
处理方法:在订单状态实体类,订单编号属性值上增加注解,把Long按照字符串进行序列化
//订单号
@JsonSerialize(using = ToStringSerializer.class)
private Long orderId;
3.2.2 创建二维码
用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/pay/create/native
。
修改支付模块dongyimai-payseckill-service
(1)、编写支付接口,定义预下单方法
import java.util.Map;
public interface PayService {
/**
* 生成二维码
* @param out_trade_no
* @param total_fee
* @return
*/
Map<String,String> createNative(Map<String,String> parameters);
}
(2)、编写接口实现类
import com.alipay.api.AlipayApiException;
import com.alipay.api.DefaultAlipayClient;
import com.alipay.api.request.AlipayTradePrecreateRequest;
import com.alipay.api.request.AlipayTradeQueryRequest;
import com.alipay.api.response.AlipayTradePrecreateResponse;
import com.alipay.api.response.AlipayTradeQueryResponse;
import com.offcn.pay.service.PayService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class PayServiceImpl implements PayService {
/**
* 支付宝gatewayUrl
*/
@Value("${alipay.serverUrl}")
private String serverUrl;
/**
* 商户应用id
*/
@Value("${alipay.appId}")
private String appId;
/**
* RSA私钥,用于对商户请求报文加签
*/
@Value("${alipay.privateKey}")
private String privateKey;
/**
* 支付宝RSA公钥,用于验签支付宝应答
*/
@Value("${alipay.alipayPublicKey}")
private String alipayPublicKey;
/**
* 签名类型
*/
@Value("${alipay.signType}")
private String signType = "RSA2";
/**
* 格式
*/
@Value("${alipay.format}")
private String formate = "json";
/**
* 编码
*/
@Value("${alipay.charset}")
private String charset = "UTF-8";
/**
* 异步地址
*/
@Value("${alipay.notify-url}")
private String notifyUrl;
@Override
public Map<String, String> createNative(Map<String, String> parameters) {
//创建阿里支付客户端请求对象
DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
Map<String,String> map=new HashMap<String, String>();
//创建预下单请求对象
AlipayTradePrecreateRequest request = new AlipayTradePrecreateRequest();
//设置回调地址
request.setNotifyUrl(notifyUrl);
//设置预下单请求参数
request.setBizContent("{" +
" \"out_trade_no\":\""+parameters.get("out_trade_no")+"\"," +
" \"total_amount\":\""+parameters.get("total_fee")+"\"," +
" \"subject\":\"测试购买商品001\"," +
" \"store_id\":\"xa_001\"," +
" \"timeout_express\":\"90m\"}");//设置业务参数
//发出预下单业务请求
try {
AlipayTradePrecreateResponse response = alipayClient.execute(request);
//从相应对象读取相应结果
String code = response.getCode();
System.out.println("响应码:"+code);
//全部的响应结果
String body = response.getBody();
System.out.println("返回结果:"+body);
if(code.equals("10000")){
map.put("qrcode", response.getQrCode());
map.put("out_trade_no", response.getOutTradeNo());
map.put("total_fee",parameters.get("total_fee"));
System.out.println("qrcode:"+response.getQrCode());
System.out.println("out_trade_no:"+response.getOutTradeNo());
System.out.println("total_fee:"+parameters.get("total_fee"));
}else{
System.out.println("预下单接口调用失败:"+body);
}
} catch (AlipayApiException e) {
e.printStackTrace();
}
return map;
}
}
(3)、编写controller控制器代码
import com.alibaba.fastjson.JSON;
import com.offcn.entity.Result;
import com.offcn.entity.StatusCode;
import com.offcn.pay.service.PayService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayOutputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("pay")
public class PayController {
@Autowired
private PayService payService;
/**
* 创建二维码连接地址返回给前端 生成二维码图片
*
* @param parameters 包含 订单号 包含 金额 包含 queue队列名称 交换机信息 路由信息 用户名
* @return
*/
@RequestMapping("/create/native")
public Result<Map> createNative(@RequestParam Map<String,String> parameters) {
//获取用户名
Map<String, String> resultMap = payService.createNative(parameters);
return new Result<Map>(true, StatusCode.OK, "二维码连接地址创建成功", resultMap);
}
}
(4)、编写主启动类PaySeckillApplication
@SpringBootApplication
public class PaySeckillApplication {
public static void main(String[] args) {
SpringApplication.run(PaySeckillApplication.class,args);
}
}
使用Postman测试效果如下:
http://localhost:9006/pay/create/native?out_trade_no=1132510782836314121&total_fee=1
根据返回的qrcode,可以生成支付宝扫码的二维码:
使用沙箱版支付宝扫码,即可弹出支付界面。
3.3 支付流程分析
如上图,步骤分析如下:
1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单持久化到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,支付宝平台会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。
3.4 支付回调更新
支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。
3.4.1 支付回调队列指定
关于指定队列如下:
1.创建支付二维码需要指定队列
2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中
在支付宝支付统一下单API中,有一个附加参数,如下:
body:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。
我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。
3.4.1.1 在预下单时传递队列信息
修改支付微服务的PayServiceImpl的createNative方法,代码如下:
@Override
public Map<String, String> createNative(Map<String, String> parameters) {
//创建阿里支付客户端请求对象
DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
Map<String,String> map=new HashMap<String, String>();
//创建预下单请求对象
AlipayTradePrecreateRequest request = new AlipayTradePrecreateRequest();
request.setNotifyUrl(notifyUrl);
request.setBizContent("{" +
" \"out_trade_no\":\""+parameters.get("out_trade_no")+"\"," +
" \"body\":\"queue="+parameters.get("queue")+"&username="+parameters.get("username")+"&routingkey="+parameters.get("routingkey")+"&exchange="+parameters.get("exchange")+"\"," +
" \"total_amount\":\""+parameters.get("total_fee")+"\"," +
" \"subject\":\"测试购买商品001\"," +
" \"store_id\":\"xa_001\"," +
" \"timeout_express\":\"90m\"}");//设置业务参数
//发出预下单业务请求
try {
AlipayTradePrecreateResponse response = alipayClient.execute(request);
//从相应对象读取相应结果
String code = response.getCode();
System.out.println("响应码:"+code);
//全部的响应结果
String body = response.getBody();
System.out.println("返回结果:"+body);
if(code.equals("10000")){
map.put("qrcode", response.getQrCode());
map.put("out_trade_no", response.getOutTradeNo());
map.put("total_fee",parameters.get("total_fee"));
System.out.println("qrcode:"+response.getQrCode());
System.out.println("out_trade_no:"+response.getOutTradeNo());
System.out.println("total_fee:"+parameters.get("total_fee"));
}else{
System.out.println("预下单接口调用失败:"+body);
}
} catch (AlipayApiException e) {
e.printStackTrace();
}
return map;
}
我们创建二维码的时候,需要将下面几个参数传递过去
username:用户名,可以根据用户名查询用户排队信息
out_trade_no:商户订单号,下单必须
total_fee:支付金额,支付必须
queue:队列名字,回调的时候,可以知道将支付信息发送到哪个队列
routingkey:路由的名称
exchange:交换机名称
修改主启动类PayApplication,添加对应队列以及对应交换机绑定,代码如下:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableEurekaClient
public class PaySeckillApplication {
public static void main(String[] args) {
SpringApplication.run(PaySeckillApplication.class,args);
}
@Autowired
private Environment env;
//创建秒杀队列
@Bean(name="seckillQueue")
public Queue createSeckillQueue(){
return new Queue(env.getProperty("mq.pay.queue.seckillorder"));
}
//创建秒杀交换机
@Bean(name="seckillExchanage")
public DirectExchange basicSeckillExchanage(){
return new DirectExchange(env.getProperty("mq.pay.exchange.seckillorder"));
}
//绑定秒杀
@Bean(name="SeckillBinding")
public Binding basicSeckillBinding(){
return BindingBuilder.bind(createSeckillQueue()).to(basicSeckillExchanage()).with(env.getProperty("mq.pay.routing.seckillkey"));
}
}
修改application.yml,添加如下配置
#位置支付交换机和队列
mq:
pay:
exchange:
seckillorder: exchange.seckillorder
queue:
seckillorder: queue.seckillorder
routing:
seckillkey: queue.seckillorder
3.4.1.2 测试
使用Postman创建二维码测试
http://localhost:9006/pay/create/native?username=test001&out_trade_no=1132510782836314121&total_fee=1&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder
以后每次支付,都需要带上对应的参数,包括前面的订单支付。
3.4.1.3 改造支付回调方法
修改PayController的notifyUrl方法,获取自定义参数,并转成Map,获取queue地址,并将支付信息发送到绑定的queue中,代码如下:
//接收支付宝网关回调数据
@RequestMapping("/notify/url")
public String reciveResult(HttpServletRequest request){
//准备一个map封装全部的请求参数和值
Map<String,String> map=new HashMap<>();
//获取全部的请求参数名称
Enumeration<String> parameterNames = request.getParameterNames();
while (parameterNames.hasMoreElements()){
String paramName = parameterNames.nextElement();
//封装参数名和参数值到map
map.put(paramName,request.getParameter(paramName));
}
//转换map集合为json字符串
String jsonString = JSON.toJSONString(map);
System.out.println("响应结果:"+jsonString);
//获取body自定义参数
String body = map.get("body");
//body参数多个用&隔开
String[] splits = body.split("&");
//创建一个map封装body的参数
Map<String,String> bodyMap=new HashMap<>();
for (String split : splits) {
//继续用=号切开
String[] parms = split.split("=");
bodyMap.put(parms[0],parms[1]);
}
//发送消息
rabbitTemplate.convertAndSend(bodyMap.get("exchange"),bodyMap.get("routingkey"),jsonString);
return "pay-success";
}
配置回调地址:application.yml
alipay:
notify-url: http://7r8ukqlrpt.52http.com/pay/notify/url
3.4.1.4 内网穿透
支付异步通知需要独立ip使阿里支付成功后可以回调我们的接口,所以前提条件就是内网穿透。
如何使用:
(1)、下载 cpolar
https://www.cpolar.com/static/downloads/releases/latest/cpolar_amd64.msi
(2)、安装:
在Linux或OSX上,您可以使用以下命令从终端解压缩cpolar。 在Windows上,只需双击安装
(3)、连接您的帐户:
运行此命令会将您帐户的authtoken添加到您的cpolar.yml文件中。 这将为您提供更多功能,所有打开的隧道将在此处的仪表板中列出。
cpolar authtoken YzQxMzFiY2YtY2I1NC*****LTg2NzAtZjJkMTRmYjQ0ZDRh
具体的token值:登录网站从控制面板获取:https://dashboard.cpolar.com/auth
(4)、开启端口映射
cpolar http 80
在80端口开启映射。
测试内网穿透:访问回调接口接口
http://6ca8e8f6.cpolar.io/pay/notify/url?id=111&name=wwww
扫描支付二维码,支付宝即可回调 接口
控制台可以查看相关信息
3.4.2 支付状态监听
支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。
在秒杀工程中创建com.offcn.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:
@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {
/**
* 监听消费消息
* @param message
*/
@RabbitHandler
public void consumeMessage(@Payload String message){
System.out.println(message);
//将消息转换成Map对象
Map<String,String> resultMap = JSON.parseObject(message,Map.class);
System.out.println("监听到的消息:"+resultMap);
}
}
修改SeckillApplication创建对应的队列以及绑定对应交换机。
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@MapperScan(basePackages = {"com.offcn.seckill.dao"})
@EnableScheduling
@EnableAsync
public class SeckillApplication {
public static void main(String[] args) {
SpringApplication.run(SeckillApplication.class,args);
}
@Bean
public IdWorker idWorker(){
return new IdWorker(1,1);
}
@Autowired
private Environment env;
/***
* 创建秒杀队列
* @return
*/
@Bean(name = "queueSeckillOrder")
public Queue queueSeckillOrder(){
return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true);
}
//创建秒杀交换机
@Bean
public DirectExchange directExchangeOrder(){
return new DirectExchange(env.getProperty("mq.pay.exchange.seckillorder"));
}
/****
* 队列绑定到交换机上
* @return
*/
@Bean
public Binding basicBindingSeckillOrder(){
return BindingBuilder
.bind(queueSeckillOrder())
.to(directExchangeOrder())
.with(env.getProperty("mq.pay.routing.seckillkey"));
}
}
修改application.yml文件,添加如下配置:
#位置支付交换机和队列
mq:
pay:
exchange:
seckillorder: exchange.seckillorder
queue:
seckillorder: queue.seckillorder
routing:
seckillkey: queue.seckillorder
3.4.3 修改订单状态
监听到支付信息后,根据支付信息判断,如果用户支付成功,则修改订单信息,并将订单入库,删除用户排队信息,如果用户支付失败,则删除订单信息,回滚库存,删除用户排队信息。
3.4.3.1 业务层
修改SeckillOrderService,添加修改订单方法,代码如下
/***
* 更新订单状态
* @param out_trade_no
* @param transaction_id
* @param username
*/
void updatePayStatus(String out_trade_no, String transaction_id,String username);
修改SeckillOrderServiceImpl,添加修改订单方法实现,代码如下:
/***
* 更新订单状态
* @param out_trade_no
* @param transaction_id
* @param username
*/
@Override
public void updatePayStatus(String out_trade_no, String transaction_id,String username) {
//订单数据从Redis数据库查询出来
SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//修改状态
seckillOrder.setStatus("1");
//设置交易流水号
seckillOrder.setTransactionId(transaction_id);
//支付时间
seckillOrder.setPayTime(new Date());
//同步到MySQL中
seckillOrderMapper.insert(seckillOrder);
//清空Redis缓存
redisTemplate.boundHashOps("SeckillOrder").delete(username);
//清空用户排队数据
redisTemplate.boundHashOps("UserQueueCount").delete(username);
//删除抢购状态信息
redisTemplate.boundHashOps("UserQueueStatus").delete(username);
}
修改订单实体类的id生成方式,为外面生成
3.4.3.2 修改订单对接
修改扫码支付状态监听的代码,当用户支付成功后,修改订单状态,也就是调用上面的方法,代码如下:
代码:
package com.offcn.seckill.consumer;
import com.alibaba.fastjson.JSON;
import com.offcn.seckill.service.SeckillOrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {
@Autowired
private SeckillOrderService seckillOrderService;
/**
* 监听消费消息
* @param message
*/
@RabbitHandler
public void consumeMessage(@Payload String message){
System.out.println(message);
//将消息转换成Map对象
Map<String,String> resultMap = JSON.parseObject(message, Map.class);
System.out.println("监听到的消息:"+resultMap);
//获取交易状态
String trade_status= resultMap.get("trade_status");
//判断交易状态是否等于成功
if(trade_status!=null&&trade_status.equalsIgnoreCase("TRADE_SUCCESS")){
String body= resultMap.get("body");
Map<String, String> bodyMap = new HashMap<>();
if(resultMap.get("body")!=null) {
String[] splits = body.split("&");
for (String split : splits) {
String[] vs = split.split("=");
bodyMap.put(vs[0], vs[1]);
}
}
seckillOrderService.updatePayStatus(resultMap.get("out_trade_no"),resultMap.get("trade_no"),bodyMap.get("username"));
}else {
//支付失败,删除订单
}
}
}
测试:
使用Postman完整请求创建二维码下单测试一次。
商品ID:1
剩余库存数量:10
http://localhost:8001/api/seckillGoods/one?id=1&time=2021061916
下单:
http://localhost:8001/api/seckillOrder/add?time=2021061916&id=1
下单后,Redis数据
下单查询:
http://localhost:8001/api/seckillOrder/query
创建二维码:
http://localhost:9006/pay/create/native?username=dongyimai&out_trade_no=1375043168986337280&total_fee=0.01&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder
注意:username 用登录的账号 out_trade_no 用下单成功的订单号
扫码:
使用返回的预下单url生成二维码
使用沙箱支付宝扫描二维码,完成支付
查看订单状态:
http://localhost:9005/seckillOrder/query
3.4.4 删除订单回滚库存
如果用户支付失败,我们需要删除用户订单数据,并回滚库存。
3.4.4.1 业务层实现
修改SeckillOrderService,创建一个关闭订单方法,代码如下:
/***
* 关闭订单,回滚库存
*/
void closeOrder(String username);
修改SeckillOrderServiceImpl,创建一个关闭订单实现方法,代码如下:
/***
* 关闭订单,回滚库存
* @param username
*/
@Override
public void closeOrder(String username) {
//将消息转换成SeckillStatus
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("UserQueueStatus").get(username);
//获取Redis中订单信息
SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//如果Redis中有订单信息,说明用户未支付
if(seckillStatus!=null && seckillOrder!=null){
//删除订单
redisTemplate.boundHashOps("SeckillOrder").delete(username);
//回滚库存
//1)从Redis中获取该商品
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).get(seckillStatus.getGoodsId());
//2)如果Redis中没有,则从数据库中加载
if(seckillGoods==null){
seckillGoods = seckillGoodsMapper.selectById(seckillStatus.getGoodsId());
}
//3)数量+1 (递增数量+1,队列数量+1)
Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillStatus.getGoodsId(), 1);
seckillGoods.setStockCount(surplusCount.intValue());
redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).leftPush(seckillStatus.getGoodsId());
//4)数据同步到Redis中
redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);
//清理排队标示
redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());
//清理抢单标示
redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}
}
3.4.4.2 调用删除订单
修改SeckillOrderPayMessageListener,在用户支付失败后调用关闭订单方法,代码如下:
}else{
//支付失败,关闭订单
seckillOrderService.closeOrder(bodyMap.get("username"));
}
3.4.4.3 测试
使用Postman完整请求创建二维码下单测试一次。
商品ID:1
剩余库存数量:10
http://localhost:9005/seckillGoods/one?id=1&time=2021032518
下单:
http://localhost:9005/seckillOrder/add?time=2021032518&id=1
下单后,Redis数据
下单查询:
http://localhost:9005/seckillOrder/query
创建二维码:
http://localhost:9006/pay/create/native?username=test001&out_trade_no=1375043168986337280&total_fee=0.01&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder
秒杀抢单后,商品数量变化:
http://localhost:9005/seckillGoods/one?id=1&time=2021032518
4 RabbitMQ延时消息队列
4.1 延时队列介绍
延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景
网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。
Rabbitmq实现延时队列一般而言有两种形式: 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]
第二种方式:利用rabbitmq中的插件x-delay-message
4.2 TTL DLX实现延时队列
4.2.1 TTL DLX介绍
TTL RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX) RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。 x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
4.2.3 DLX延时队列实现
4.2.3.1 创建工程
创建springboot_rabbitmq_delay工程,并引入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dongyimai_parent</artifactId>
<groupId>com.offcn</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springboot_rabbitmq_delay</artifactId>
<dependencies>
<!--starter-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--加入ampq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
application.yml配置
spring:
application:
name: springboot-demo
rabbitmq:
host: 192.168.188.128
port: 5672
password: guest
username: guest
4.2.3.2 队列创建
创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,代码如下:
@Configuration
public class QueueConfig {
/** 短信发送队列 */
public static final String QUEUE_MESSAGE = "queue.message";
/** 交换机 */
public static final String DLX_EXCHANGE = "dlx.exchange";
/** 短信发送队列 延迟缓冲(按消息) */
public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";
/**
* 接收死信队列
* @return
*/
@Bean
public Queue messageQueue() {
//允许长期存储消息
return new Queue(QUEUE_MESSAGE, true);
}
/**
* 接收延时消息队列
* @return
*/
@Bean
public Queue delayMessageQueue() {
return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 消息超时进入死信队列,绑定死信队列交换机
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE) // 绑定指定的routing-key
.build();
}
/***
* 创建交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DLX_EXCHANGE);
}
/***
* 交换机与队列绑定
* @param messageQueue
* @param directExchange
* @return
*/
@Bean
public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
return BindingBuilder.bind(messageQueue)
.to(directExchange)
.with(QUEUE_MESSAGE);
}
}
4.2.3.3 消息监听
创建MessageListener用于监听消息,代码如下:
@Component
@RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
public class MessageListener {
/***
* 监听消息
* @param msg
*/
@RabbitHandler
public void msg(@Payload Object msg){
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("当前时间:"+dateFormat.format(new Date()));
System.out.println("收到信息:"+msg);
}
}
注意:@Payload 和 @Headers 注解可以分别接收消息中的 body 与 headers 信息
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型。
4.2.3.4 创建启动类
@SpringBootApplication
@EnableRabbit
public class SpringRabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(SpringRabbitMQApplication.class,args);
}
}
4.2.3.5 测试
@SpringBootTest(classes = SpringRabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/***
* 发送消息
*/
@Test
public void sendMessage() throws InterruptedException, IOException {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送当前时间:"+dateFormat.format(new Date()));
Map<String,String> message = new HashMap<>();
message.put("name","offcn");
rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
System.in.read();
}
}
其中message.getMessageProperties().setExpiration(“10000”);设置消息超时时间,超时后,会将消息转入到另外一个队列。
测试效果如下:
接收时间延迟了10秒,在死信队列收到消息
5 超时未完成支付、自动删除订单库存回滚(作业)
5.1 秒杀流程回顾
如上图,步骤分析如下:
1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单持久化到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,支付宝平台会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。
延时队列实现订单关闭回滚库存:
1.创建一个过期队列 Queue1
2.接收消息的队列 Queue2
3.中转交换机
4.监听Queue2
1)SeckillStatus->检查Redis中是否有订单信息
2)如果有订单信息,调用删除订单回滚库存->[需要先关闭扫码支付]
3)如果关闭订单时,用于已支付,修改订单状态即可
4)如果关闭订单时,发生了别的错误,记录日志,人工处理
5.2 关闭支付
用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭扫码支付,防止中途用户支付。
修改支付微服务的PayService,添加关闭支付方法,代码如下:
/***
* 关闭支付
* @param orderId
* @return
*/
Map<String,String> closePay(Long out_trade_no) throws Exception;
修改PayServiceImpl,实现关闭扫码支付方法,代码如下:
@Override
public Map<String, String> closePay(Long out_trade_no) throws Exception {
//创建阿里支付客户端请求对象
DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
Map<String,String> map=new HashMap<String, String>();
//撤销交易请求对象
AlipayTradeCancelRequest request = new AlipayTradeCancelRequest();
request.setBizContent("{" +
" \"out_trade_no\":\""+out_trade_no+"\"," +
" \"trade_no\":\"\"}"); //设置业务参数
try {
AlipayTradeCancelResponse response = alipayClient.execute(request);
String code=response.getCode();
if(code.equals("10000")){
System.out.println("返回值:"+response.getBody());
map.put("code", code);
map.put("out_trade_no", out_trade_no+"");
return map;
}
} catch (AlipayApiException e) {
e.printStackTrace();
return null;
}
return null;
}
修改PayController,增加关闭服务方法
//关闭支付预下单
@PostMapping("closepay")
public Result closePay(Long out_trade_no){
try {
Map<String, String> resultMap = payService.closePay(out_trade_no);
if(resultMap!=null&&resultMap.get("code")!=null&&resultMap.get("code").equals("10000")){
//成功关闭,就返回结果
return new Result(true,StatusCode.OK,"订单关闭成功",resultMap);
}
} catch (Exception e) {
e.printStackTrace();
return new Result(false,StatusCode.ERROR,"订单关闭失败");
}
return new Result(false,StatusCode.ERROR,"订单关闭失败");
}
测试:
http://localhost:9006/pay/closepay?out_trade_no=1406451841629294592
5.3 关闭订单回滚库存
5.3.1 配置延时队列
在application.yml文件中引入队列信息配置,如下:
#位置支付交换机和队列
mq:
pay:
exchange:
seckillorder: exchange.seckillorder
seckillordertimer: exchange.seckillordertimer
queue:
seckillorder: queue.seckillorder
seckillordertimer: queue.seckillordertimer
seckillordertimerdelay: queue.seckillordertimerdelay
routing:
seckillkey: queue.seckillorder
seckillordertimerkey: queue.seckillordertime
配置队列与交换机,在SeckillApplication中添加如下方法
/**
* 到期数据队列
* @return
*/
@Bean
public Queue seckillOrderTimerQueue() {
return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);
}
/**
* 超时数据队列
* @return
*/
@Bean
public Queue delaySeckillOrderTimerQueue() {
return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay"))
.withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.seckillordertimer")) // 消息超时进入死信队列,绑定死信队列交换机
.withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.routing.seckillordertimerkey")) // 绑定指定的routing-key
.build();
}
//创建延时交换机
@Bean
public DirectExchange directExchangeOrderTimer(){
return new DirectExchange(env.getProperty("mq.pay.exchange.seckillordertimer"));
}
/***
* 交换机与队列绑定
* @param
* @param
* @return
*/
@Bean
public Binding basicBindingOrderTime() {
return BindingBuilder.bind(seckillOrderTimerQueue())
.to(directExchangeOrderTimer())
.with(env.getProperty("mq.pay.routing.seckillordertimerkey"));
}
5.3.2 发送延时消息
修改MultiThreadingCreateOrder,添加如下方法:
/***
* 发送延时消息到RabbitMQ中
* @param seckillStatus
*/
public void sendTimerMessage(SeckillStatus seckillStatus){
rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
}
在createOrder方法中调用上面方法,如下代码:
//发送延时消息到MQ中
sendTimerMessage(seckillStatus);
5.3.3 Feign调用支付服务关闭预下单接口
在模块dongyimai-seckill-service创建feign接口
package com.offcn.seckill.feign;
import com.offcn.entity.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(value = "PAY")
public interface PayFeign {
@PostMapping("/pay/closepay")
public Result closePay(@RequestParam("out_trade_no") Long out_trade_no);
}
注意修改配置文件application.yml设置feign调用超时时间
feign:
hystrix:
enabled: true
client:
config:
default:
connectTimeout: 5000 #连接超时时间5秒
readTimeout: 5000 #读超时时间5秒
#hystrix 配置
hystrix:
command:
default:
execution:
timeout:
#如果enabled设置为false,则请求超时交给ribbon控制
enabled: true
isolation:
thread:
timeoutInMilliseconds: 10000
strategy: SEMAPHORE
5.3.4 库存回滚
创建SeckillOrderDelayMessageListener实现监听消息,并回滚库存,代码如下:
package com.offcn.seckill.consumer;
import com.alibaba.fastjson.JSON;
import com.offcn.entity.Result;
import com.offcn.seckill.bean.SeckillStatus;
import com.offcn.seckill.feign.PayFeign;
import com.offcn.seckill.pojo.SeckillOrder;
import com.offcn.seckill.service.SeckillOrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")
public class SeckillOrderDelayMessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private SeckillOrderService seckillOrderService;
@Autowired
private PayFeign payFeign;
/***
* 读取消息
* 判断Redis中是否存在对应的订单
* 如果存在,则关闭支付,再关闭订单
* @param message
*/
@RabbitHandler
public void consumeMessage(@Payload String message){
//读取消息
SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);
//获取Redis中订单信息
String username = seckillStatus.getUsername();
SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//如果Redis中有订单信息,说明用户未支付
if(seckillOrder!=null){
System.out.println("准备回滚---"+seckillStatus);
//关闭支付
Result closeResult = payFeign.closePay(seckillStatus.getOrderId());
Map<String,String> closeMap = (Map<String, String>) closeResult.getData();
if(closeMap!=null && closeMap.get("code").equalsIgnoreCase("10000")){
//关闭订单
seckillOrderService.closeOrder(username);
}
}
}
}
测试:
http://localhost:8001/api/seckillOrder/add?time=2021062012&id=1
测试下单,10秒内未完成支付,即可收到延时消息,回滚订单