237 lines
7.6 KiB
PHP
237 lines
7.6 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Amqp\Consumer;
|
||
|
||
use App\Amqp\Producer\AutoCompleteServicePackageDelayDirectProducer;
|
||
use App\Model\OrderServicePackage;
|
||
use App\Utils\Log;
|
||
use App\Utils\Utils;
|
||
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
|
||
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
|
||
use Hyperf\Amqp\Message\Type;
|
||
use Hyperf\Amqp\Producer;
|
||
use Hyperf\Amqp\Result;
|
||
use Hyperf\Amqp\Annotation\Consumer;
|
||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||
use Hyperf\DbConnection\Db;
|
||
use PhpAmqpLib\Message\AMQPMessage;
|
||
|
||
/**
 * Delayed-queue consumer that automatically marks a service-package order
 * as completed once its finish time has been reached.
 *
 * Messages carry an `order_no` key identifying the order. A message that
 * arrives shortly before the finish time is re-published with a delay
 * matching the remaining time.
 */
#[Consumer(nums: 1)]
class AutoCompleteServicePackageDelayDirectConsumer extends ConsumerMessage
{
    use ProducerDelayedMessageTrait;
    use ConsumerDelayedMessageTrait;

    /** Name of the log channel used by every log call in this consumer. */
    private const LOG_CHANNEL = 'queue-AutoCompleteServicePackage';

    /** Order status value written when the order is completed (4: service completed). */
    private const ORDER_STATUS_COMPLETED = 4;

    /**
     * A message arriving less than this many seconds before the finish time
     * is re-queued instead of completing the order.
     */
    private const REQUEUE_WINDOW_SECONDS = 7200;

    protected string $exchange = 'amqp.delay.direct';

    protected ?string $queue = 'auto.complete.service.package.delay.queue';

    protected string $type = Type::DIRECT;

    protected string|array $routingKey = 'AutoCompleteServicePackage';

    /**
     * Handle one delayed "auto complete" message.
     *
     * Steps: validate the payload, apply the per-order handling-count guard,
     * load and check the order, then either re-queue the message (finish
     * time not reached yet) or set the order status to completed.
     *
     * @param mixed $data decoded message payload; expected to be an array with an `order_no` key
     * @param AMQPMessage $message raw AMQP message (not used here)
     * @return string one of the Result constants (ACK, REQUEUE or DROP)
     */
    public function consumeMessage($data, AMQPMessage $message): string
    {
        Log::getInstance(self::LOG_CHANNEL)->info(json_encode($data, JSON_UNESCAPED_UNICODE));

        // Validate the payload before anything reads $data['order_no'].
        // A non-array payload can never become valid, so it is acknowledged
        // and discarded rather than passed to the array-typed validator.
        if (!is_array($data) || !$this->detectInputParameters($data)) {
            $this->writeErrorLog("入参错误");
            return Result::ACK;
        }

        // Guard against handling the same order too many times.
        try {
            $handler = new Utils();
            $withinLimit = $handler->checkHandleNumber("CompleteServicePackage" . $data['order_no']);
            if (!$withinLimit) {
                $this->writeErrorLog("超出最大执行次数或检测错误");
                return Result::ACK;
            }
        } catch (\Throwable $e) {
            $this->writeErrorLog($e->getMessage());
            return Result::REQUEUE;
        }

        try {
            // Load the order referenced by the message.
            $order_service_package = OrderServicePackage::getOne([
                'order_service_no' => $data['order_no'],
            ]);
            if (empty($order_service_package)) {
                $this->writeErrorLog("订单数据不存在");
                return Result::ACK;
            }

            // Only orders in an eligible state may be completed.
            if (!$this->detectOrderStatus($order_service_package)) {
                $this->writeErrorLog("订单状态错误");
                return Result::ACK;
            }

            // Finish time not reached yet: schedule a new delayed message and
            // discard this one. If nothing could be scheduled, requeue so the
            // message is handled again instead of being lost.
            if (!$this->detectOrderFinishTime($order_service_package)) {
                return $this->addQueue($order_service_package) ? Result::DROP : Result::REQUEUE;
            }
        } catch (\Throwable $e) {
            $this->writeErrorLog($e->getMessage());
            return Result::REQUEUE;
        }

        // Mark the order as completed. beginTransaction() is inside the try so
        // a failure to open the transaction is handled like any other database
        // error; rollBack() is only attempted once the transaction was opened.
        $transactionOpened = false;
        try {
            Db::beginTransaction();
            $transactionOpened = true;

            OrderServicePackage::edit(
                ['order_service_id' => $order_service_package['order_service_id']],
                ['order_service_status' => self::ORDER_STATUS_COMPLETED],
            );

            Db::commit();
        } catch (\Throwable $e) {
            if ($transactionOpened) {
                Db::rollBack();
            }
            $this->writeErrorLog($e->getMessage());
            return Result::REQUEUE;
        }

        // Placeholder: no notification is sent after completion yet.

        return Result::ACK;
    }

    /**
     * Check that the message payload carries a non-empty order number.
     *
     * @param array $data decoded message payload
     * @return bool true when `order_no` is present and not empty
     */
    public function detectInputParameters(array $data): bool
    {
        return !empty($data['order_no']);
    }

    /**
     * Check whether the order is in a state that allows automatic completion.
     *
     * Order status values: 1 unpaid, 2 not started, 3 in service,
     * 4 completed, 5 cancelled. Every status listed in the map below is
     * rejected. The order must additionally have a refund status of
     * 0 (none), 4 (refused) or 5 (closed) and a pay status of 2.
     *
     * @param array|object $order_service_package order record; read with array access
     * @return bool true when the order may be completed
     */
    public function detectOrderStatus(array|object $order_service_package): bool
    {
        // Statuses that block completion, mapped to the reason that is logged.
        $blockingStatuses = [
            1 => "订单未支付",
            2 => "订单未开始",
            4 => "订单已完成",
            5 => "订单已取消",
        ];

        // Loose comparison is kept on purpose: the stored value may be a
        // numeric string rather than an int.
        foreach ($blockingStatuses as $status => $reason) {
            if ($order_service_package['order_service_status'] == $status) {
                $this->writeErrorLog($reason);
                return false;
            }
        }

        // Refund status: 0 none, 1 requested, 2 refunding, 3 refunded,
        // 4 refused, 5 closed, 6 failed.
        if (!in_array($order_service_package['refund_status'], [0, 4, 5])) {
            $this->writeErrorLog("订单退款中");
            return false;
        }

        // Any pay status other than 2 is rejected and logged as "unpaid".
        if ($order_service_package['pay_status'] != 2) {
            $this->writeErrorLog("订单未支付");
            return false;
        }

        return true;
    }

    /**
     * Decide whether the order can be completed now, based on its finish time.
     *
     * Returns false only when the finish time lies in the future by less
     * than REQUEUE_WINDOW_SECONDS; in every other case it returns true.
     *
     * NOTE(review): an order whose finish time is more than two hours away
     * is therefore reported as ready for completion. Confirm this is intended.
     * NOTE(review): if `finish_time` cannot be parsed, strtotime() yields
     * false, which counts as 0 in the subtraction, so the result is true.
     *
     * @param array|object $order_service_package order record; `finish_time` is read
     * @return bool false when the message should be re-queued, true otherwise
     */
    public function detectOrderFinishTime(array|object $order_service_package): bool
    {
        $finishTimestamp = strtotime($order_service_package['finish_time']);

        // Negative: finish time not reached yet. Zero or positive: reached or passed.
        $elapsedSeconds = time() - $finishTimestamp;

        $finishesSoon = $elapsedSeconds < 0 && $elapsedSeconds > -self::REQUEUE_WINDOW_SECONDS;

        return !$finishesSoon;
    }

    /**
     * Publish a new delayed message that fires when the order's finish time arrives.
     *
     * Returns false when no message was scheduled: the finish time is not in
     * the future (or cannot be parsed), the producer reported failure, or an
     * exception was thrown. Reporting "nothing scheduled" as false lets
     * consumeMessage() requeue the current message instead of dropping it
     * when the finish time passes between its own check and this call.
     *
     * @param array|object $order_service_package order record; `finish_time` and `order_service_no` are read
     * @return bool true only when a delayed message was published
     */
    public function addQueue(array|object $order_service_package): bool
    {
        try {
            $finishTimestamp = strtotime($order_service_package['finish_time']);

            $delaySeconds = $finishTimestamp === false ? 0 : $finishTimestamp - time();
            if ($delaySeconds <= 0) {
                // Nothing to wait for; no message is published.
                return false;
            }

            $delayedMessage = new AutoCompleteServicePackageDelayDirectProducer([
                'order_no' => $order_service_package['order_service_no'],
            ]);
            $delayedMessage->setDelayMs(1000 * $delaySeconds);

            return (bool) $this->container->get(Producer::class)->produce($delayedMessage);
        } catch (\Throwable $e) {
            $this->writeErrorLog($e->getMessage());
            return false;
        }
    }

    /**
     * Write an error entry to this consumer's log channel.
     */
    private function writeErrorLog(string $text): void
    {
        Log::getInstance(self::LOG_CHANNEL)->error($text);
    }
}
|