2022-05-28  2022-05-28    11033 字   23 分钟

第十六章

秒杀高级

优就业.JAVA教研室

学习目标

  • 防止秒杀重复排队
  • 并发超卖问题解决
  • 订单支付
  • RabbitMQ延时队列
  • 库存回滚

1 防止秒杀重复排队

用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息表示已经正在排队。

1.1 后台排队记录

修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:

上图代码如下:

//递增,判断是否排队
Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
if(userQueueCount>1){
    //100:表示有重复抢单
    throw new RuntimeException(String.valueOf(StatusCode.REPERROR));
}

测试:

首次秒杀下单:http://localhost:8001/api/seckillOrder/add?id=1&time=2021061914

查看redis缓存,发现已经记录了UserQueueCount

再次重复下单:

查看查看redis缓存,发现UserQueueCount记录的用户下单数量变成了2

2 并发超卖问题解决

超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。

2.1 思路分析

解决超卖问题,可以利用Redis队列实现,给每件商品创建一个独立的商品个数队列,例如:A商品有2个,A商品的ID为1001,则可以创建一个队列,key=SeckillGoodsCountList_1001,往该队列中塞2次该商品ID。

每次给用户下单的时候,先从队列中取数据,如果能取到数据,则表明有库存,如果取不到,则表明没有库存,这样就可以防止超卖问题产生了。

在我们对Redis进行操作的时候,很多时候,都是先将数据查询出来,在内存中修改,然后存入到Redis,在并发场景,会出现数据错乱问题,为了控制数量准确,我们单独将商品数量整一个自增键,自增键是线程安全的,所以不担心并发场景的问题。

2.2 代码实现

每次将商品压入Redis缓存的时候,另外多创建一个商品的队列。

修改SeckillGoodsPushTask,添加一个pushIds方法,用于将指定商品ID放入到指定的数字中,代码如下:

/***
 * 将商品ID存入到数组中
 * @param len:长度
 * @param id :值
 * @return
 */
public Long[] pushIds(int len,Long id){
    Long[] ids = new Long[len];
    for (int i = 0; i <ids.length ; i++) {
        ids[i]=id;
    }
    return ids;
}

修改SeckillGoodsPushTask的loadGoodsPushRedis方法,添加队列操作,代码如下:

上图代码如下:

//商品数据队列存储,防止高并发超卖
Long[] ids = pushIds(seckillGood.getStockCount(), seckillGood.getId());
redisTemplate.boundListOps("SeckillGoodsCountList_"+seckillGood.getId()).leftPushAll(ids);
//创建一个计数器记录各个商品的库存数量
redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillGood.getId(),seckillGood.getStockCount());

测试:

重启服务,发现定时任务执行后,在redis缓存已经记录了数据id的队列SeckillGoodsCountList_1、SeckillGoodsCountList_2等等

并且也记录了每个商品的库存数量SeckillGoodsCoun

2.3 超卖控制

修改多线程下单方法,分别修改数量控制,以及售罄后用户抢单排队信息的清理,修改代码如下图:

上图代码如下:

/***
 * 多线程下单操作
 */
@Async
public void createOrder(){
    //从队列中获取排队信息
    SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();

    try {
        //从队列中获取一个商品
        Object sgood = redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).rightPop();
        if(sgood==null){
            //清理当前用户的排队信息
            clearQueue(seckillStatus);
            return;
        }

        //时间区间
        String time = seckillStatus.getTime();
        //用户登录名
        String username=seckillStatus.getUsername();
        //用户抢购商品
        Long id = seckillStatus.getGoodsId();

        //获取商品数据
        SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_" + time).get(id);

        //如果有库存,则创建秒杀商品订单
        SeckillOrder seckillOrder = new SeckillOrder();
        seckillOrder.setId(idWorker.nextId());
        seckillOrder.setSeckillId(id);
        seckillOrder.setMoney(goods.getCostPrice());
        seckillOrder.setUserId(username);
        seckillOrder.setCreateTime(new Date());
        seckillOrder.setStatus("0");

        //将秒杀订单存入到Redis中
        redisTemplate.boundHashOps("SeckillOrder").put(username,seckillOrder);

        //商品库存-1
        Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(id, -1);//商品数量递减
        goods.setStockCount(surplusCount.intValue());    //根据计数器统计

        //判断当前商品是否还有库存
        if(surplusCount<=0){
            //并且将商品数据同步到MySQL中
            seckillGoodsMapper.updateById(goods);
            //如果没有库存,则清空Redis缓存中该商品
            redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id);
        }else{
            //如果有库存,则直数据重置到Reids中
            redisTemplate.boundHashOps("SeckillGoods_" + time).put(id,goods);
        }
        //抢单成功,更新抢单状态,排队->等待支付
        seckillStatus.setStatus(2);
        seckillStatus.setOrderId(seckillOrder.getId());
        seckillStatus.setMoney(seckillOrder.getMoney().floatValue());
        redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/***
 * 清理用户排队信息
 * @param seckillStatus
 */
public void clearQueue(SeckillStatus seckillStatus){
    //清理排队标示
    redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());

    //清理抢单标示
    redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}

测试:正常秒杀下单

http://localhost:8001/api/seckillOrder/add?id=1&time=2021061914

查看redis中的SeckillGoodsCount发现对应编号的商品减去了库存数量。

3 秒杀订单支付

完成秒杀下订单后,进入支付页面,此时前端会每3秒中向后台发送一次请求用于判断当前用户订单是否完成支付,如果完成了支付,则需要清理掉排队信息,并且需要修改订单状态信息。

3.1 搭建支付模块

创建模块dongyimai-payseckill-service

(1)、修改依赖配置文件pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dongyimai-service</artifactId>
        <groupId>com.offcn</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dongyimai-payseckill-service</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 支付宝支付所需类库包 -->
        <dependency>
            <groupId>com.alipay.sdk</groupId>
            <artifactId>alipay-sdk-java</artifactId>
            <version>4.11.54.ALL</version>
            <exclusions>
                <exclusion>
                    <groupId>org.bouncycastle</groupId>
                    <artifactId>bcprov-jdk15on</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.offcn</groupId>
            <artifactId>dongyimai-common</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
    </dependencies>

</project>

(2)、配置文件application.yml

server:
  port: 9006
spring:
  application:
    name: pay
  rabbitmq:
    host: 192.168.188.128 #mq的服务器地址
    username: guest #账号
    password: guest #密码
alipay:
  serverUrl: https://openapi.alipaydev.com/gateway.do
  notify-url: http://7r8ukqlrpt.52http.com/pay/notify/url
  appId: 2016091300500103
  privateKey: 用户私钥
  alipayPublicKey: 阿里公钥
  format: json
  charset: utf-8
  signType: RSA2

3.2 创建支付二维码

下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象中。

选择扫码支付后,会跳转到扫码支付页面,支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的ID创建预支付信息并获取二维码信息,展示给用户看,此时页面每3秒查询一次支付状态,如果支付成功,需要修改订单状态信息。

3.2.1 回显订单号、金额

下单后,进入支付选择页面,需要显示订单号和订单金额,所以需要在用户下单后将该数据传入到pay.html页面,所以查询订单状态的时候,需要将订单号和金额封装到查询的信息中,修改查询订单装的方法加入他们即可。

修改SeckillOrderController的queryStatus方法,代码如下:

上图代码如下:

return new Result(true,seckillStatus.getStatus(),"抢购状态",seckillStatus);

使用Postman测试,效果如下:

http://localhost:8001/api/seckillOrder/query

订单编号为Long型,转换为json字符串出现精度损失

处理方法:在订单状态实体类,订单编号属性值上增加注解,把Long按照字符串进行序列化

//订单号

    @JsonSerialize(using = ToStringSerializer.class)
    private Long orderId;

3.2.2 创建二维码

用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/pay/create/native

修改支付模块dongyimai-payseckill-service

(1)、编写支付接口,定义预下单方法
import java.util.Map;

public interface PayService {
    /**
     * 生成二维码
     * @param out_trade_no
     * @param total_fee
     * @return
     */
    Map<String,String> createNative(Map<String,String> parameters);
   
}
(2)、编写接口实现类

import com.alipay.api.AlipayApiException;
import com.alipay.api.DefaultAlipayClient;
import com.alipay.api.request.AlipayTradePrecreateRequest;
import com.alipay.api.request.AlipayTradeQueryRequest;
import com.alipay.api.response.AlipayTradePrecreateResponse;
import com.alipay.api.response.AlipayTradeQueryResponse;
import com.offcn.pay.service.PayService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
@Service
public class PayServiceImpl implements PayService {

    /**
     * 支付宝gatewayUrl
     */
    @Value("${alipay.serverUrl}")
    private String serverUrl;
    /**
     * 商户应用id
     */
    @Value("${alipay.appId}")
    private String appId;
    /**
     * RSA私钥,用于对商户请求报文加签
     */
    @Value("${alipay.privateKey}")
    private String privateKey;
    /**
     * 支付宝RSA公钥,用于验签支付宝应答
     */
    @Value("${alipay.alipayPublicKey}")
    private String alipayPublicKey;
    /**
     * 签名类型
     */
    @Value("${alipay.signType}")
    private String signType = "RSA2";
    /**
     * 格式
     */
    @Value("${alipay.format}")
    private String formate = "json";
    /**
     * 编码
     */
    @Value("${alipay.charset}")
    private String charset = "UTF-8";

    /**
     * 异步地址
     */
    @Value("${alipay.notify-url}")
    private String notifyUrl;
    @Override
    public Map<String, String> createNative(Map<String, String> parameters) {
        //创建阿里支付客户端请求对象
        DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
        Map<String,String> map=new HashMap<String, String>();
        //创建预下单请求对象
        AlipayTradePrecreateRequest request = new AlipayTradePrecreateRequest();
        //设置回调地址
        request.setNotifyUrl(notifyUrl);
        //设置预下单请求参数
        request.setBizContent("{" +
                "    \"out_trade_no\":\""+parameters.get("out_trade_no")+"\"," +              
                "    \"total_amount\":\""+parameters.get("total_fee")+"\"," +
                "    \"subject\":\"测试购买商品001\"," +
                "    \"store_id\":\"xa_001\"," +
                "    \"timeout_express\":\"90m\"}");//设置业务参数
        //发出预下单业务请求
        try {
            AlipayTradePrecreateResponse response = alipayClient.execute(request);
            //从相应对象读取相应结果
            String code = response.getCode();
            System.out.println("响应码:"+code);
            //全部的响应结果
            String body = response.getBody();
            System.out.println("返回结果:"+body);

            if(code.equals("10000")){
                map.put("qrcode", response.getQrCode());
                map.put("out_trade_no", response.getOutTradeNo());
                map.put("total_fee",parameters.get("total_fee"));
                System.out.println("qrcode:"+response.getQrCode());
                System.out.println("out_trade_no:"+response.getOutTradeNo());
                System.out.println("total_fee:"+parameters.get("total_fee"));
            }else{
                System.out.println("预下单接口调用失败:"+body);
            }
        } catch (AlipayApiException e) {
            e.printStackTrace();
        }
        return map;
    }

  
}

(3)、编写controller控制器代码

import com.alibaba.fastjson.JSON;
import com.offcn.entity.Result;
import com.offcn.entity.StatusCode;
import com.offcn.pay.service.PayService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayOutputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("pay")
public class PayController {

    @Autowired
    private PayService payService;
   

    /**
     * 创建二维码连接地址返回给前端 生成二维码图片
     *
     * @param parameters  包含 订单号  包含 金额  包含 queue队列名称 交换机信息 路由信息 用户名
     * @return
     */
    @RequestMapping("/create/native")
    public Result<Map> createNative(@RequestParam Map<String,String> parameters) {
        //获取用户名

        Map<String, String> resultMap = payService.createNative(parameters);

        return new Result<Map>(true, StatusCode.OK, "二维码连接地址创建成功", resultMap);
    }

    
}

(4)、编写主启动类PaySeckillApplication

@SpringBootApplication
public class PaySeckillApplication {

    public static void main(String[] args) {
        SpringApplication.run(PaySeckillApplication.class,args);
    }
}

使用Postman测试效果如下:

http://localhost:9006/pay/create/native?out_trade_no=1132510782836314121&total_fee=1

根据返回的qrcode,可以生成支付宝扫码的二维码:

使用沙箱版支付宝扫码,即可弹出支付界面。

3.3 支付流程分析

如上图,步骤分析如下:

1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单持久化到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,支付宝平台会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

3.4 支付回调更新

支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。

3.4.1 支付回调队列指定

关于指定队列如下:

1.创建支付二维码需要指定队列
2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中

在支付宝支付统一下单API中,有一个附加参数,如下:

body:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。

我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。

3.4.1.1 在预下单时传递队列信息

修改支付微服务的PayServiceImpl的createNative方法,代码如下:

 @Override
    public Map<String, String> createNative(Map<String, String> parameters) {
        //创建阿里支付客户端请求对象
        DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
        Map<String,String> map=new HashMap<String, String>();
        //创建预下单请求对象
        AlipayTradePrecreateRequest request = new AlipayTradePrecreateRequest();
        request.setNotifyUrl(notifyUrl);
        request.setBizContent("{" +
                "    \"out_trade_no\":\""+parameters.get("out_trade_no")+"\"," +
                "    \"body\":\"queue="+parameters.get("queue")+"&username="+parameters.get("username")+"&routingkey="+parameters.get("routingkey")+"&exchange="+parameters.get("exchange")+"\"," +
                "    \"total_amount\":\""+parameters.get("total_fee")+"\"," +
                "    \"subject\":\"测试购买商品001\"," +
                "    \"store_id\":\"xa_001\"," +
                "    \"timeout_express\":\"90m\"}");//设置业务参数
        //发出预下单业务请求
        try {
            AlipayTradePrecreateResponse response = alipayClient.execute(request);
            //从相应对象读取相应结果
            String code = response.getCode();
            System.out.println("响应码:"+code);
            //全部的响应结果
            String body = response.getBody();
            System.out.println("返回结果:"+body);

            if(code.equals("10000")){
                map.put("qrcode", response.getQrCode());
                map.put("out_trade_no", response.getOutTradeNo());
                map.put("total_fee",parameters.get("total_fee"));
                System.out.println("qrcode:"+response.getQrCode());
                System.out.println("out_trade_no:"+response.getOutTradeNo());
                System.out.println("total_fee:"+parameters.get("total_fee"));
            }else{
                System.out.println("预下单接口调用失败:"+body);
            }
        } catch (AlipayApiException e) {
            e.printStackTrace();
        }
        return map;
    }

我们创建二维码的时候,需要将下面几个参数传递过去

username:用户名,可以根据用户名查询用户排队信息
out_trade_no:商户订单号,下单必须
total_fee:支付金额,支付必须
queue:队列名字,回调的时候,可以知道将支付信息发送到哪个队列
routingkey:路由的名称
exchange:交换机名称

修改主启动类PayApplication,添加对应队列以及对应交换机绑定,代码如下:


@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableEurekaClient
public class PaySeckillApplication {
    public static void main(String[] args) {
        SpringApplication.run(PaySeckillApplication.class,args);
    }


    @Autowired
    private Environment env;

    
    //创建秒杀队列
    @Bean(name="seckillQueue")
    public Queue createSeckillQueue(){
        return new Queue(env.getProperty("mq.pay.queue.seckillorder"));
    }

    //创建秒杀交换机

    @Bean(name="seckillExchanage")
    public DirectExchange basicSeckillExchanage(){
        return new DirectExchange(env.getProperty("mq.pay.exchange.seckillorder"));
    }

    //绑定秒杀

    @Bean(name="SeckillBinding")
    public Binding basicSeckillBinding(){
        return  BindingBuilder.bind(createSeckillQueue()).to(basicSeckillExchanage()).with(env.getProperty("mq.pay.routing.seckillkey"));
    }
}

修改application.yml,添加如下配置

#位置支付交换机和队列
mq:
  pay:
    exchange:     
      seckillorder: exchange.seckillorder
    queue:     
      seckillorder: queue.seckillorder
    routing:     
      seckillkey: queue.seckillorder
3.4.1.2 测试

使用Postman创建二维码测试

http://localhost:9006/pay/create/native?username=test001&out_trade_no=1132510782836314121&total_fee=1&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder

以后每次支付,都需要带上对应的参数,包括前面的订单支付。

3.4.1.3 改造支付回调方法

修改PayController的notifyUrl方法,获取自定义参数,并转成Map,获取queue地址,并将支付信息发送到绑定的queue中,代码如下:

//接收支付宝网关回调数据
    @RequestMapping("/notify/url")
    public String reciveResult(HttpServletRequest request){
        //准备一个map封装全部的请求参数和值
        Map<String,String> map=new HashMap<>();
        //获取全部的请求参数名称
        Enumeration<String> parameterNames = request.getParameterNames();
        while (parameterNames.hasMoreElements()){
            String paramName = parameterNames.nextElement();
            //封装参数名和参数值到map
            map.put(paramName,request.getParameter(paramName));
        }

        //转换map集合为json字符串
        String jsonString = JSON.toJSONString(map);
        System.out.println("响应结果:"+jsonString);

        //获取body自定义参数
        String body = map.get("body");
        //body参数多个用&隔开
        String[] splits = body.split("&");
        //创建一个map封装body的参数
        Map<String,String> bodyMap=new HashMap<>();
        for (String split : splits) {
            //继续用=号切开
            String[] parms = split.split("=");
            bodyMap.put(parms[0],parms[1]);
        }

        //发送消息
        rabbitTemplate.convertAndSend(bodyMap.get("exchange"),bodyMap.get("routingkey"),jsonString);
        return "pay-success";

    }

配置回调地址:application.yml

alipay:  
  notify-url: http://7r8ukqlrpt.52http.com/pay/notify/url
3.4.1.4 内网穿透

支付异步通知需要独立ip使阿里支付成功后可以回调我们的接口,所以前提条件就是内网穿透。

https://www.cpolar.com/

如何使用:

(1)、下载 cpolar

https://www.cpolar.com/static/downloads/releases/latest/cpolar_amd64.msi

(2)、安装:

在Linux或OSX上,您可以使用以下命令从终端解压缩cpolar。 在Windows上,只需双击安装

(3)、连接您的帐户:

运行此命令会将您帐户的authtoken添加到您的cpolar.yml文件中。 这将为您提供更多功能,所有打开的隧道将在此处的仪表板中列出。

cpolar authtoken YzQxMzFiY2YtY2I1NC*****LTg2NzAtZjJkMTRmYjQ0ZDRh

具体的token值:登录网站从控制面板获取:https://dashboard.cpolar.com/auth

(4)、开启端口映射
cpolar http 80

在80端口开启映射。

测试内网穿透:访问回调接口接口

http://6ca8e8f6.cpolar.io/pay/notify/url?id=111&name=wwww

扫描支付二维码,支付宝即可回调 接口

控制台可以查看相关信息

3.4.2 支付状态监听

支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。

在秒杀工程中创建com.offcn.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:

@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {


    /**
     * 监听消费消息
     * @param message
     */
    @RabbitHandler
    public void consumeMessage(@Payload String message){
        System.out.println(message);
        //将消息转换成Map对象
        Map<String,String> resultMap = JSON.parseObject(message,Map.class);
        System.out.println("监听到的消息:"+resultMap);
    }
}

修改SeckillApplication创建对应的队列以及绑定对应交换机。

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@MapperScan(basePackages = {"com.offcn.seckill.dao"})
@EnableScheduling
@EnableAsync
public class SeckillApplication {


    public static void main(String[] args) {
        SpringApplication.run(SeckillApplication.class,args);
    }

    @Bean
    public IdWorker idWorker(){
        return new IdWorker(1,1);
    }

   @Autowired
    private Environment env;


   
    /***
     * 创建秒杀队列
     * @return
     */
    @Bean(name = "queueSeckillOrder")
    public Queue queueSeckillOrder(){
        return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true);
    }

    //创建秒杀交换机
    @Bean
    public DirectExchange directExchangeOrder(){
        return new DirectExchange(env.getProperty("mq.pay.exchange.seckillorder"));
    }
 

    /****
     * 队列绑定到交换机上
     * @return
     */
    @Bean
    public Binding basicBindingSeckillOrder(){
        return BindingBuilder
                .bind(queueSeckillOrder())
                .to(directExchangeOrder())
                .with(env.getProperty("mq.pay.routing.seckillkey"));
    }
}

修改application.yml文件,添加如下配置:

#位置支付交换机和队列
mq:
  pay:
    exchange:     
      seckillorder: exchange.seckillorder
    queue:     
      seckillorder: queue.seckillorder
    routing:     
      seckillkey: queue.seckillorder

3.4.3 修改订单状态

监听到支付信息后,根据支付信息判断,如果用户支付成功,则修改订单信息,并将订单入库,删除用户排队信息,如果用户支付失败,则删除订单信息,回滚库存,删除用户排队信息。

3.4.3.1 业务层

修改SeckillOrderService,添加修改订单方法,代码如下

/***
 * 更新订单状态
 * @param out_trade_no
 * @param transaction_id
 * @param username
 */
void updatePayStatus(String out_trade_no, String transaction_id,String username);

修改SeckillOrderServiceImpl,添加修改订单方法实现,代码如下:

/***
 * 更新订单状态
 * @param out_trade_no
 * @param transaction_id
 * @param username
 */
@Override
public void updatePayStatus(String out_trade_no, String transaction_id,String username) {
    //订单数据从Redis数据库查询出来
    SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
    //修改状态
    seckillOrder.setStatus("1");
    //设置交易流水号
        seckillOrder.setTransactionId(transaction_id);
    //支付时间
    seckillOrder.setPayTime(new Date());
    //同步到MySQL中
    seckillOrderMapper.insert(seckillOrder);

    //清空Redis缓存
    redisTemplate.boundHashOps("SeckillOrder").delete(username);

    //清空用户排队数据
    redisTemplate.boundHashOps("UserQueueCount").delete(username);

    //删除抢购状态信息
    redisTemplate.boundHashOps("UserQueueStatus").delete(username);
}

修改订单实体类的id生成方式,为外面生成

3.4.3.2 修改订单对接

修改扫码支付状态监听的代码,当用户支付成功后,修改订单状态,也就是调用上面的方法,代码如下:

代码:

package com.offcn.seckill.consumer;

import com.alibaba.fastjson.JSON;
import com.offcn.seckill.service.SeckillOrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {
    @Autowired
    private SeckillOrderService seckillOrderService;
    /**
     * 监听消费消息
     * @param message
     */
    @RabbitHandler
    public void consumeMessage(@Payload String message){
        System.out.println(message);
        //将消息转换成Map对象
        Map<String,String> resultMap = JSON.parseObject(message, Map.class);
        System.out.println("监听到的消息:"+resultMap);
        //获取交易状态
    String trade_status=    resultMap.get("trade_status");
    //判断交易状态是否等于成功
        if(trade_status!=null&&trade_status.equalsIgnoreCase("TRADE_SUCCESS")){
            String body=  resultMap.get("body");
            Map<String, String> bodyMap = new HashMap<>();
            if(resultMap.get("body")!=null) {
                String[] splits = body.split("&");
                for (String split : splits) {
                    String[] vs = split.split("=");
                    bodyMap.put(vs[0], vs[1]);
                }
            }
            seckillOrderService.updatePayStatus(resultMap.get("out_trade_no"),resultMap.get("trade_no"),bodyMap.get("username"));
        }else {
            //支付失败,删除订单
        }
    }
}

测试:

使用Postman完整请求创建二维码下单测试一次。

商品ID:1

剩余库存数量:10

http://localhost:8001/api/seckillGoods/one?id=1&time=2021061916

下单:

http://localhost:8001/api/seckillOrder/add?time=2021061916&id=1

下单后,Redis数据

下单查询:

http://localhost:8001/api/seckillOrder/query

创建二维码:

http://localhost:9006/pay/create/native?username=dongyimai&out_trade_no=1375043168986337280&total_fee=0.01&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder

注意:username 用登录的账号 out_trade_no 用下单成功的订单号

扫码:

使用返回的预下单url生成二维码

使用沙箱支付宝扫描二维码,完成支付

查看订单状态:

http://localhost:9005/seckillOrder/query

3.4.4 删除订单回滚库存

如果用户支付失败,我们需要删除用户订单数据,并回滚库存。

3.4.4.1 业务层实现

修改SeckillOrderService,创建一个关闭订单方法,代码如下:

/***
 * 关闭订单,回滚库存
 */
void closeOrder(String username);

修改SeckillOrderServiceImpl,创建一个关闭订单实现方法,代码如下:

/***
 * 关闭订单,回滚库存
 * @param username
 */
@Override
public void closeOrder(String username) {
    //将消息转换成SeckillStatus
    SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("UserQueueStatus").get(username);
    //获取Redis中订单信息
    SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);

    //如果Redis中有订单信息,说明用户未支付
    if(seckillStatus!=null && seckillOrder!=null){
        //删除订单
        redisTemplate.boundHashOps("SeckillOrder").delete(username);
        //回滚库存
        //1)从Redis中获取该商品
        SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).get(seckillStatus.getGoodsId());

        //2)如果Redis中没有,则从数据库中加载
        if(seckillGoods==null){
            seckillGoods = seckillGoodsMapper.selectById(seckillStatus.getGoodsId());
        }

        //3)数量+1  (递增数量+1,队列数量+1)
        Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillStatus.getGoodsId(), 1);
        seckillGoods.setStockCount(surplusCount.intValue());
        redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).leftPush(seckillStatus.getGoodsId());

        //4)数据同步到Redis中
        redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);

        //清理排队标示
        redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());

        //清理抢单标示
        redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
    }
}
3.4.4.2 调用删除订单

修改SeckillOrderPayMessageListener,在用户支付失败后调用关闭订单方法,代码如下:

}else{
            //支付失败,关闭订单
            seckillOrderService.closeOrder(bodyMap.get("username"));
}
3.4.4.3 测试

使用Postman完整请求创建二维码下单测试一次。

商品ID:1

剩余库存数量:10

http://localhost:9005/seckillGoods/one?id=1&time=2021032518

下单:

http://localhost:9005/seckillOrder/add?time=2021032518&id=1

下单后,Redis数据

下单查询:

http://localhost:9005/seckillOrder/query

创建二维码:

http://localhost:9006/pay/create/native?username=test001&out_trade_no=1375043168986337280&total_fee=0.01&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder

秒杀抢单后,商品数量变化:

http://localhost:9005/seckillGoods/one?id=1&time=2021032518

4 RabbitMQ延时消息队列

4.1 延时队列介绍

延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景

网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。

Rabbitmq实现延时队列一般而言有两种形式: 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]

第二种方式:利用rabbitmq中的插件x-delay-message

4.2 TTL DLX实现延时队列

4.2.1 TTL DLX介绍

TTL RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

Dead Letter Exchanges(DLX) RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。 x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

4.2.3 DLX延时队列实现

4.2.3.1 创建工程

创建springboot_rabbitmq_delay工程,并引入相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dongyimai_parent</artifactId>
        <groupId>com.offcn</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot_rabbitmq_delay</artifactId>

    <dependencies>
        <!--starter-web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--加入ampq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml配置

spring:
  application:
    name: springboot-demo
  rabbitmq:
    host: 192.168.188.128
    port: 5672
    password: guest
    username: guest
4.2.3.2 队列创建

创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,代码如下:

@Configuration
public class QueueConfig {

    /** 短信发送队列 */
    public static final String QUEUE_MESSAGE = "queue.message";

    /** 交换机 */
    public static final String DLX_EXCHANGE = "dlx.exchange";

    /** 短信发送队列 延迟缓冲(按消息) */
    public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";

    /**
     * 接收死信队列
     * @return
     */
    @Bean
    public Queue messageQueue() {
        //允许长期存储消息
        return new Queue(QUEUE_MESSAGE, true);
    }

    /**
     * 接收延时消息队列
     * @return
     */
    @Bean
    public Queue delayMessageQueue() {
        return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)        // 消息超时进入死信队列,绑定死信队列交换机
                .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE)   // 绑定指定的routing-key
                .build();
    }

    /***
     * 创建交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DLX_EXCHANGE);
    }


    /***
     * 交换机与队列绑定
     * @param messageQueue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(messageQueue)
                .to(directExchange)
                .with(QUEUE_MESSAGE);
    }
}
4.2.3.3 消息监听

创建MessageListener用于监听消息,代码如下:

@Component
@RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
public class MessageListener {


    /***
     * 监听消息
     * @param msg
     */
    @RabbitHandler
    public void msg(@Payload Object msg){
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("当前时间:"+dateFormat.format(new Date()));
        System.out.println("收到信息:"+msg);
    }

}

注意:@Payload 和 @Headers 注解可以分别接收消息中的 body 与 headers 信息

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用 @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型。

4.2.3.4 创建启动类
@SpringBootApplication
@EnableRabbit
public class SpringRabbitMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitMQApplication.class,args);
    }
}
4.2.3.5 测试
@SpringBootTest(classes = SpringRabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /***
     * 发送消息
     */
    @Test
    public void sendMessage() throws InterruptedException, IOException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送当前时间:"+dateFormat.format(new Date()));
        Map<String,String> message = new HashMap<>();
        message.put("name","offcn");
        rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });

        System.in.read();
    }
}

其中message.getMessageProperties().setExpiration(“10000”);设置消息超时时间,超时后,会将消息转入到另外一个队列。

测试效果如下:

接收时间延迟了10秒,在死信队列收到消息

5 超时未完成支付、自动删除订单库存回滚(作业)

5.1 秒杀流程回顾

如上图,步骤分析如下:

1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单持久化到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,支付宝平台会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

延时队列实现订单关闭回滚库存:

1.创建一个过期队列  Queue1
2.接收消息的队列    Queue2
3.中转交换机
4.监听Queue2
	1)SeckillStatus->检查Redis中是否有订单信息
	2)如果有订单信息,调用删除订单回滚库存->[需要先关闭扫码支付]
	3)如果关闭订单时,用于已支付,修改订单状态即可
	4)如果关闭订单时,发生了别的错误,记录日志,人工处理

5.2 关闭支付

用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭扫码支付,防止中途用户支付。

修改支付微服务的PayService,添加关闭支付方法,代码如下:

/***
 * 关闭支付
 * @param orderId
 * @return
 */
Map<String,String> closePay(Long out_trade_no) throws Exception;

修改PayServiceImpl,实现关闭扫码支付方法,代码如下:

@Override
    public Map<String, String> closePay(Long out_trade_no) throws Exception {
        //创建阿里支付客户端请求对象
        DefaultAlipayClient alipayClient = new DefaultAlipayClient(serverUrl, appId, privateKey, formate, charset, alipayPublicKey, signType);
        Map<String,String> map=new HashMap<String, String>();
        //撤销交易请求对象
        AlipayTradeCancelRequest request = new AlipayTradeCancelRequest();
        request.setBizContent("{" +
                "    \"out_trade_no\":\""+out_trade_no+"\"," +
                "    \"trade_no\":\"\"}"); //设置业务参数

        try {
            AlipayTradeCancelResponse response = alipayClient.execute(request);
            String code=response.getCode();

            if(code.equals("10000")){

                System.out.println("返回值:"+response.getBody());
                map.put("code", code);
                map.put("out_trade_no", out_trade_no+"");
                return map;
            }
        } catch (AlipayApiException e) {
            e.printStackTrace();
            return null;
        }

        return null;
    }

修改PayController,增加关闭服务方法

//关闭支付预下单
    @PostMapping("closepay")
    public Result closePay(Long out_trade_no){
        try {
            Map<String, String> resultMap = payService.closePay(out_trade_no);
            if(resultMap!=null&&resultMap.get("code")!=null&&resultMap.get("code").equals("10000")){
                //成功关闭,就返回结果
                return new Result(true,StatusCode.OK,"订单关闭成功",resultMap);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return new Result(false,StatusCode.ERROR,"订单关闭失败");
        }
        return new Result(false,StatusCode.ERROR,"订单关闭失败");
    }

测试:

http://localhost:9006/pay/closepay?out_trade_no=1406451841629294592

5.3 关闭订单回滚库存

5.3.1 配置延时队列

在application.yml文件中引入队列信息配置,如下:

#位置支付交换机和队列
mq:
  pay:
    exchange:
      seckillorder: exchange.seckillorder
      seckillordertimer: exchange.seckillordertimer
    queue:
      seckillorder: queue.seckillorder
      seckillordertimer: queue.seckillordertimer
      seckillordertimerdelay: queue.seckillordertimerdelay
    routing:
      seckillkey: queue.seckillorder
      seckillordertimerkey: queue.seckillordertime

配置队列与交换机,在SeckillApplication中添加如下方法

/**
     * 到期数据队列
     * @return
     */
    @Bean
    public Queue seckillOrderTimerQueue() {
        return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);
    }

    /**
     * 超时数据队列
     * @return
     */
    @Bean
    public Queue delaySeckillOrderTimerQueue() {
        return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay"))
                .withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.seckillordertimer"))        // 消息超时进入死信队列,绑定死信队列交换机
                .withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.routing.seckillordertimerkey"))   // 绑定指定的routing-key
                .build();
    }

    //创建延时交换机
    @Bean
    public DirectExchange directExchangeOrderTimer(){
        return new DirectExchange(env.getProperty("mq.pay.exchange.seckillordertimer"));
    }


    /***
     * 交换机与队列绑定
     * @param
     * @param
     * @return
     */
    @Bean
    public Binding basicBindingOrderTime() {
        return BindingBuilder.bind(seckillOrderTimerQueue())
                .to(directExchangeOrderTimer())
                .with(env.getProperty("mq.pay.routing.seckillordertimerkey"));
    }

5.3.2 发送延时消息

修改MultiThreadingCreateOrder,添加如下方法:

/***
 * 发送延时消息到RabbitMQ中
 * @param seckillStatus
 */
public void sendTimerMessage(SeckillStatus seckillStatus){
    rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("10000");
            return message;
        }
    });
}

在createOrder方法中调用上面方法,如下代码:

//发送延时消息到MQ中
sendTimerMessage(seckillStatus);

5.3.3 Feign调用支付服务关闭预下单接口

在模块dongyimai-seckill-service创建feign接口

package com.offcn.seckill.feign;

import com.offcn.entity.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(value = "PAY")
public interface PayFeign {

    @PostMapping("/pay/closepay")
    public Result closePay(@RequestParam("out_trade_no") Long out_trade_no);
}

注意修改配置文件application.yml设置feign调用超时时间

feign:
  hystrix:
    enabled: true
  client:
    config:
      default:
        connectTimeout: 5000  #连接超时时间5秒
        readTimeout: 5000     #读超时时间5秒
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
          #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          thread:
            timeoutInMilliseconds: 10000
          strategy: SEMAPHORE

5.3.4 库存回滚

创建SeckillOrderDelayMessageListener实现监听消息,并回滚库存,代码如下:

package com.offcn.seckill.consumer;

import com.alibaba.fastjson.JSON;
import com.offcn.entity.Result;
import com.offcn.seckill.bean.SeckillStatus;
import com.offcn.seckill.feign.PayFeign;
import com.offcn.seckill.pojo.SeckillOrder;
import com.offcn.seckill.service.SeckillOrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")
public class SeckillOrderDelayMessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private SeckillOrderService seckillOrderService;

    @Autowired
    private PayFeign payFeign;

    /***
     * 读取消息
     * 判断Redis中是否存在对应的订单
     * 如果存在,则关闭支付,再关闭订单
     * @param message
     */
    @RabbitHandler
    public void consumeMessage(@Payload String message){
        //读取消息
        SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);

        //获取Redis中订单信息
        String username = seckillStatus.getUsername();
        SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);

        //如果Redis中有订单信息,说明用户未支付
        if(seckillOrder!=null){
            System.out.println("准备回滚---"+seckillStatus);
            //关闭支付
            Result closeResult = payFeign.closePay(seckillStatus.getOrderId());
            Map<String,String> closeMap = (Map<String, String>) closeResult.getData();

            if(closeMap!=null && closeMap.get("code").equalsIgnoreCase("10000")){
                //关闭订单
                seckillOrderService.closeOrder(username);
            }
        }
    }
}

测试:

http://localhost:8001/api/seckillOrder/add?time=2021062012&id=1

测试下单,10秒内未完成支付,即可收到延时消息,回滚订单


avatar
青山
悟已往之不谏 知来者之可追
一言
今日诗词
站点信息
本站访客数 :
本站总访问量 :