新增自动完成服务包订单延迟队列

This commit is contained in:
wucongxing8150 2024-04-17 17:52:33 +08:00
parent f040795d9a
commit 581111bdbe
3 changed files with 286 additions and 1 deletions

View File

@ -0,0 +1,236 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Amqp\Producer\AutoCompleteServicePackageDelayDirectProducer;
use App\Model\OrderServicePackage;
use App\Utils\Log;
use App\Utils\Utils;
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Producer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\DbConnection\Db;
use PhpAmqpLib\Message\AMQPMessage;
/**
* 自动完成服务包订单
* 延迟队列
*/
#[Consumer(nums: 1)]
class AutoCompleteServicePackageDelayDirectConsumer extends ConsumerMessage
{
use ProducerDelayedMessageTrait;
use ConsumerDelayedMessageTrait;
protected string $exchange = 'amqp.delay.direct';
protected ?string $queue = 'auto.complete.service.package.delay.queue';
protected string $type = Type::DIRECT; //Type::FANOUT;
protected string|array $routingKey = 'AutoCompleteServicePackage';
public function consumeMessage($data, AMQPMessage $message): string
{
Log::getInstance("queue-AutoCompleteServicePackage")->info(json_encode($data, JSON_UNESCAPED_UNICODE));
try {
// 检测执行次数
$Utils = new Utils();
$redis_key = "CompleteServicePackage" . $data['order_no'];
$res = $Utils->checkHandleNumber($redis_key);
if (!$res) {
Log::getInstance("queue-AutoCompleteServicePackage")->error("超出最大执行次数或检测错误");
return Result::ACK;
}
}catch (\Throwable $e){
Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage());
return Result::REQUEUE;
}
try {
// 检测入参参数
$res = $this->detectInputParameters($data);
if (!$res){
Log::getInstance("queue-AutoCompleteServicePackage")->error("入参错误" );
return Result::ACK;
}
// 获取订单数据
$params = array();
$params['order_service_no'] = $data['order_no'];
$order_service_package = OrderServicePackage::getOne($params);
if (empty($order_service_package)){
Log::getInstance("queue-AutoCompleteServicePackage")->error("入参错误" );
return Result::ACK;
}
// 检测订单状态
$res = $this->detectOrderStatus($order_service_package);
if (!$res){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单状态错误" );
return Result::ACK;
}
// 检测订单结束时间
$res = $this->detectOrderFinishTime($order_service_package);
if (!$res){
// 未到结束时间,重新加入队列
$res = $this->addQueue($order_service_package);
if (!$res){
// 重新添加队列失败
return Result::REQUEUE;
}
return Result::DROP;
}
}catch (\Throwable $e){
Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage());
return Result::REQUEUE;
}
// 处理业务
Db::beginTransaction();
try {
// 处理服务包订单为已完成
$data = array();
$data['order_service_status'] = 4;// 订单状态1:待支付 2:未开始 3:服务中 4:服务完成 5:服务取消)
$params = array();
$params['order_service_id'] = $order_service_package['order_service_id'];
OrderServicePackage::edit($params,$data);
Db::commit();
}catch (\Throwable $e){
Db::rollBack();
Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage());
return Result::REQUEUE;
}
// 发送消息
try {
}catch (\Throwable $e){
Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage());
return Result::ACK;
}
return Result::ACK;
}
/**
* 检测入参参数
* @param array $data
* @return bool
*/
public function detectInputParameters(array $data): bool
{
if (empty($data['order_no'])){
return false;
}
return true;
}
/**
* 检测订单状态
* @param array|object $order_service_package
* @return bool
*/
public function detectOrderStatus(array|object $order_service_package): bool
{
if ($order_service_package['order_service_status'] == 1){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未支付" );
return false;
}
if ($order_service_package['order_service_status'] == 2){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未开始" );
return false;
}
if ($order_service_package['order_service_status'] == 4){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单已完成" );
return false;
}
if ($order_service_package['order_service_status'] == 5){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单已取消" );
return false;
}
// 订单退款状态0:无退款 1:申请退款 2:退款中 3:退款成功 4:拒绝退款 5:退款关闭 6:退款异常)
if (!in_array($order_service_package['refund_status'],[0,4,5])){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单退款中" );
return false;
}
// 订单支付状态
if ($order_service_package['pay_status'] != 2){
Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未支付" );
return false;
}
return true;
}
/**
* 检测订单结束时间
* @param array|object $order_service_package
* @return bool
*/
public function detectOrderFinishTime(array|object $order_service_package): bool
{
$finish_time = strtotime($order_service_package['finish_time']);
$diff_time = time() - $finish_time;
if ($diff_time < 0 && ($diff_time > -7200)){
// 负数,表示还未到完成时间。重新加入队列
return false;
}
// 正数,已到或已超完成时间。可以取消。
return true;
}
/**
* 重新加入队列
* @param array|object $order_service_package
* @return bool
*/
public function addQueue(array|object $order_service_package): bool
{
try {
$finish_time = strtotime($order_service_package['finish_time']);
// 结束时间大于当前时间
if ($finish_time > time()){
$time = $finish_time - time();
$queue_data = array();
$queue_data['order_no'] = $order_service_package['order_service_no'];
$message = new AutoCompleteServicePackageDelayDirectProducer($queue_data);
$message->setDelayMs(1000 * $time);
$producer = $this->container->get(Producer::class);
$res = $producer->produce($message);
if (!$res) {
return false;
}
}
}catch (\Throwable $e){
Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage());
return false;
}
return true;
}
}

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
/**
* 自动完成服务包订单
* 延迟队列
*/
#[Producer]
class AutoCompleteServicePackageDelayDirectProducer extends ProducerMessage
{
use ProducerDelayedMessageTrait;
protected string $exchange = 'amqp.delay.direct';
protected string $type = Type::DIRECT;
protected string|array $routingKey = 'AutoCompleteServicePackage';
public function __construct($data)
{
$this->payload = $data;
}
}

View File

@ -4,14 +4,16 @@ declare(strict_types=1);
namespace App\Command;
use App\Amqp\Producer\AutoCompleteServicePackageDelayDirectProducer;
use App\Model\OrderServicePackage;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\DbConnection\Db;
use Psr\Container\ContainerInterface;
#[Command]
class AddServicePackageFinishQueueCommand extends HyperfCommand
class AddServicePackageCompleteQueueCommand extends HyperfCommand
{
public function __construct(protected ContainerInterface $container)
{
@ -47,6 +49,21 @@ class AddServicePackageFinishQueueCommand extends HyperfCommand
$this->putAddFinishStatus($order_service_package['order_service_id'],1);
// 添加服务包订单完成延迟队列
$finish_time = strtotime($order_service_package['finish_time']);
$time = $finish_time - time();
$queue_data = array();
$queue_data['order_no'] = $order_service_package['order_service_no'];
$message = new AutoCompleteServicePackageDelayDirectProducer($queue_data);
$message->setDelayMs(1000 * $time);
$producer = $this->container->get(Producer::class);
$res = $producer->produce($message);
if (!$res) {
Db::rollBack();
$this->line("添加队列失败");
}
Db::commit();
}catch (\Throwable $e){