90 lines
2.7 KiB
PHP
90 lines
2.7 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Amqp\Consumer;
|
||
|
||
use App\Model\User;
|
||
use App\Utils\Log;
|
||
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
|
||
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
|
||
use Hyperf\Amqp\Message\Type;
|
||
use Hyperf\Amqp\Result;
|
||
use Hyperf\Amqp\Annotation\Consumer;
|
||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||
use Hyperf\Redis\Redis;
|
||
use PhpAmqpLib\Message\AMQPMessage;
|
||
|
||
/**
 * User online status.
 *
 * Consumes messages from the `user.im.off.delay.queue` queue and, when the
 * referenced user is still flagged online but their `im_login_at` timestamp
 * is older than the allowed window, flips the user's `is_online` flag to 0.
 */
#[Consumer(nums: 1)]
class UserImOffDelayDirectConsumer extends ConsumerMessage
{
    use ProducerDelayedMessageTrait;
    use ConsumerDelayedMessageTrait;

    protected string $exchange = 'amqp.delay.direct';

    protected ?string $queue = 'user.im.off.delay.queue';

    // Direct exchange type (a fanout exchange was considered previously).
    protected string $type = Type::DIRECT;

    protected string|array $routingKey = 'UserImOff';

    /**
     * Handle one "user IM offline" message.
     *
     * Outcomes:
     *  - Result::DROP when the payload has no `user_id`, when no user is
     *    found for it, or when anything throws while processing.
     *  - Result::ACK in every other case: the user is already offline, the
     *    user logged in within the last 30 minutes (+10 seconds), the status
     *    update was attempted and failed (logged), or it succeeded.
     *
     * @param mixed       $data    Decoded message payload; expected to expose a `user_id` key.
     * @param AMQPMessage $message Raw AMQP message (not used here).
     *
     * @return string One of the Result::* constants.
     */
    public function consumeMessage($data, AMQPMessage $message): string
    {
        // Validate the payload.
        if (!isset($data['user_id'])) {
            Log::getInstance("queue-UserImOff")->error("入参错误");

            return Result::DROP;
        }

        // Load the user the message refers to.
        $lookup = ['user_id' => $data['user_id']];
        $user = User::getOne($lookup);
        if (empty($user)) {
            // Unknown user: nothing to do.
            return Result::DROP;
        }

        try {
            // Already offline: nothing to change.
            if ($user['is_online'] == 0) {
                return Result::ACK;
            }

            // Allowed online window: 30 minutes plus 10 seconds.
            $onlineWindow = 30 * 60 + 10;

            $loginTimestamp = strtotime($user['im_login_at']);
            $elapsed = time() - $loginTimestamp;
            if ($elapsed <= $onlineWindow) {
                // The user logged in within the window; leave them online.
                return Result::ACK;
            }

            // Mark the user as offline in the user table.
            $where = ['user_id' => $user['user_id']];
            $changes = ['is_online' => 0];

            $updated = User::editUser($where, $changes);
            if (!$updated) {
                // A failed update is only logged; the message is still
                // acknowledged by the final return below.
                Log::getInstance("queue-UserImOff")->error("在线状态修改失败");
            }
        } catch (\Throwable $e) {
            Log::getInstance("queue-UserImOff")->error($e->getMessage());

            return Result::DROP;
        }

        return Result::ACK;
    }
}
|