diff --git a/app/Amqp/Consumer/AutoCompleteServicePackageDelayDirectConsumer.php b/app/Amqp/Consumer/AutoCompleteServicePackageDelayDirectConsumer.php new file mode 100644 index 0000000..f6847d9 --- /dev/null +++ b/app/Amqp/Consumer/AutoCompleteServicePackageDelayDirectConsumer.php @@ -0,0 +1,236 @@ +info(json_encode($data, JSON_UNESCAPED_UNICODE)); + + try { + // 检测执行次数 + $Utils = new Utils(); + $redis_key = "CompleteServicePackage" . $data['order_no']; + $res = $Utils->checkHandleNumber($redis_key); + if (!$res) { + Log::getInstance("queue-AutoCompleteServicePackage")->error("超出最大执行次数或检测错误"); + return Result::ACK; + } + }catch (\Throwable $e){ + Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage()); + return Result::REQUEUE; + } + + try { + // 检测入参参数 + $res = $this->detectInputParameters($data); + if (!$res){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("入参错误" ); + return Result::ACK; + } + + // 获取订单数据 + $params = array(); + $params['order_service_no'] = $data['order_no']; + $order_service_package = OrderServicePackage::getOne($params); + if (empty($order_service_package)){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("入参错误" ); + return Result::ACK; + } + + // 检测订单状态 + $res = $this->detectOrderStatus($order_service_package); + if (!$res){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单状态错误" ); + return Result::ACK; + } + + // 检测订单结束时间 + $res = $this->detectOrderFinishTime($order_service_package); + if (!$res){ + // 未到结束时间,重新加入队列 + $res = $this->addQueue($order_service_package); + if (!$res){ + // 重新添加队列失败 + return Result::REQUEUE; + } + + return Result::DROP; + } + + }catch (\Throwable $e){ + Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage()); + return Result::REQUEUE; + } + + // 处理业务 + Db::beginTransaction(); + try { + // 处理服务包订单为已完成 + $data = array(); + $data['order_service_status'] = 4;// 订单状态(1:待支付 2:未开始 3:服务中 4:服务完成 5:服务取消) + + $params = array(); + $params['order_service_id'] = $order_service_package['order_service_id']; + OrderServicePackage::edit($params,$data); + + Db::commit(); + }catch (\Throwable $e){ + Db::rollBack(); + Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage()); + return Result::REQUEUE; + } + + // 发送消息 + try { + + }catch (\Throwable $e){ + Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage()); + return Result::ACK; + } + + return Result::ACK; + } + + /** + * 检测入参参数 + * @param array $data + * @return bool + */ + public function detectInputParameters(array $data): bool + { + if (empty($data['order_no'])){ + return false; + } + + return true; + } + + /** + * 检测订单状态 + * @param array|object $order_service_package + * @return bool + */ + public function detectOrderStatus(array|object $order_service_package): bool + { + if ($order_service_package['order_service_status'] == 1){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未支付" ); + return false; + } + + if ($order_service_package['order_service_status'] == 2){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未开始" ); + return false; + } + + if ($order_service_package['order_service_status'] == 4){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单已完成" ); + return false; + } + + if ($order_service_package['order_service_status'] == 5){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单已取消" ); + return false; + } + + // 订单退款状态(0:无退款 1:申请退款 2:退款中 3:退款成功 4:拒绝退款 5:退款关闭 6:退款异常) + if (!in_array($order_service_package['refund_status'],[0,4,5])){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单退款中" ); + return false; + } + + // 订单支付状态 + if ($order_service_package['pay_status'] != 2){ + Log::getInstance("queue-AutoCompleteServicePackage")->error("订单未支付" ); + return false; + } + + return true; + } + + /** + * 检测订单结束时间 + * @param array|object $order_service_package + * @return bool + */ + public function detectOrderFinishTime(array|object $order_service_package): bool + { + $finish_time = strtotime($order_service_package['finish_time']); + + $diff_time = time() - $finish_time; + + if ($diff_time < 0 && ($diff_time > -7200)){ + // 负数,表示还未到完成时间。重新加入队列 + return false; + } + + // 正数,已到或已超完成时间。可以取消。 + return true; + } + + /** + * 重新加入队列 + * @param array|object $order_service_package + * @return bool + */ + public function addQueue(array|object $order_service_package): bool + { + try { + $finish_time = strtotime($order_service_package['finish_time']); + + // 结束时间大于当前时间 + if ($finish_time > time()){ + $time = $finish_time - time(); + + $queue_data = array(); + $queue_data['order_no'] = $order_service_package['order_service_no']; + + $message = new AutoCompleteServicePackageDelayDirectProducer($queue_data); + $message->setDelayMs(1000 * $time); + $producer = $this->container->get(Producer::class); + $res = $producer->produce($message); + if (!$res) { + return false; + } + } + }catch (\Throwable $e){ + Log::getInstance("queue-AutoCompleteServicePackage")->error($e->getMessage()); + return false; + } + + return true; + } +} diff --git a/app/Amqp/Producer/AutoCompleteServicePackageDelayDirectProducer.php b/app/Amqp/Producer/AutoCompleteServicePackageDelayDirectProducer.php new file mode 100644 index 0000000..1f308d1 --- /dev/null +++ b/app/Amqp/Producer/AutoCompleteServicePackageDelayDirectProducer.php @@ -0,0 +1,32 @@ +payload = $data; + } + +} diff --git a/app/Command/AddServicePackageFinishQueueCommand.php b/app/Command/AddServicePackageCompleteQueueCommand.php similarity index 81% rename from app/Command/AddServicePackageFinishQueueCommand.php rename to app/Command/AddServicePackageCompleteQueueCommand.php index 976f11e..4870b8f 100644 --- a/app/Command/AddServicePackageFinishQueueCommand.php +++ b/app/Command/AddServicePackageCompleteQueueCommand.php @@ -4,14 +4,16 @@ declare(strict_types=1); namespace App\Command; +use App\Amqp\Producer\AutoCompleteServicePackageDelayDirectProducer; use App\Model\OrderServicePackage; +use Hyperf\Amqp\Producer; use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Annotation\Command; use Hyperf\DbConnection\Db; use Psr\Container\ContainerInterface; #[Command] -class AddServicePackageFinishQueueCommand extends HyperfCommand +class AddServicePackageCompleteQueueCommand extends HyperfCommand { public function __construct(protected ContainerInterface $container) { @@ -47,6 +49,21 @@ class AddServicePackageFinishQueueCommand extends HyperfCommand $this->putAddFinishStatus($order_service_package['order_service_id'],1); // 添加服务包订单完成延迟队列 + $finish_time = strtotime($order_service_package['finish_time']); + + $time = $finish_time - time(); + + $queue_data = array(); + $queue_data['order_no'] = $order_service_package['order_service_no']; + + $message = new AutoCompleteServicePackageDelayDirectProducer($queue_data); + $message->setDelayMs(1000 * $time); + $producer = $this->container->get(Producer::class); + $res = $producer->produce($message); + if (!$res) { + Db::rollBack(); + $this->line("添加队列失败"); + } Db::commit(); }catch (\Throwable $e){