新增im消息回调修改

This commit is contained in:
wucongxing 2023-11-27 13:17:39 +08:00
parent 6c3d364a6f
commit f57f1905a8
4 changed files with 458 additions and 96 deletions

View File

@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Model\User;
use App\Utils\Log;
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Redis\Redis;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(nums: 1)]
class UserImOffDelayDirectConsumer extends ConsumerMessage
{
use ProducerDelayedMessageTrait;
use ConsumerDelayedMessageTrait;
protected string $exchange = 'amqp.delay.direct';
protected ?string $queue = 'user.im.off.delay.queue';
protected string $type = Type::DIRECT; //Type::FANOUT;
protected string|array $routingKey = 'UserImOff';
public function consumeMessage($data, AMQPMessage $message): string
{
Log::getInstance("queue-UserImOff")->info("开始:" . json_encode($data, JSON_UNESCAPED_UNICODE));
// 检测参数
if (!isset($data['user_id'])){
Log::getInstance("queue-UserImOff")->error("入参错误");
return Result::DROP;
}
// 获取用户数据
$params = array();
$params['user_id'] = $data['Info']['To_Account'];
$user = User::getOne($params);
if (empty($user)){
Log::getInstance("queue-UserImOff")->error("无该用户");
return Result::DROP;
}
try {
// 检测用户状态
if ($user['is_online'] == 0){
Log::getInstance("queue-UserImOff")->info("用户目前已下线,无需处理");
return Result::ACK;
}
// 获取缓存
$redis = $this->container->get(Redis::class);
$redis_key = "user_im_online" . $data['user_id'];
$res = $redis->get($redis_key);
if ($res){
Log::getInstance("queue-UserImOff")->info("用户刚上线未满30分钟无需处理");
return Result::ACK;
}
// 修改用户表在线状态
$params = array();
$params['user_id'] = $user['user_id'];
$data = array();
$data['is_online'] = 0;
$res = User::editUser($params,$data);
if (!$res){
Log::getInstance("queue-UserImOff")->error("在线状态修改失败");
return Result::ACK;
}
}catch (\Throwable $e){
Log::getInstance("queue-UserImOff")->error($e->getMessage());
return Result::DROP;
}
Log::getInstance("queue-UserImOff")->info("结束:" . $user['user_name'] . "已下线");
return Result::ACK;
}
}

View File

@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
/**
* 用户im下线处理
*/
#[Producer]
class UserImOffDelayDirectProducer extends ProducerMessage
{
use ProducerDelayedMessageTrait;
protected string $exchange = 'amqp.delay.direct';
protected string $type = Type::DIRECT;
protected string|array $routingKey = 'UserImOff';
public function __construct($data)
{
$this->payload = $data;
}
}

View File

@ -738,107 +738,26 @@ class CallBackController extends AbstractController
return $this->ImErrorReturn("回调签名不匹配"); return $this->ImErrorReturn("回调签名不匹配");
} }
// 验证消息内容 if (empty($request_params['CallbackCommand'])){
if (empty($request_params['MsgBody'])) { return $this->ImErrorReturn("回调事件为空");
return $this->ImErrorReturn("消息内容错误缺少MsgBody");
} }
// 验证消息内容类型 $userService = new UserService();
if (empty($request_params['MsgBody'][0]['MsgType'])) {
return $this->ImErrorReturn("消息内容错误缺少MsgType");
}
// 验证消息内容详情 if ($request_params['CallbackCommand'] == "State.StateChange"){
if (empty($request_params['MsgBody'][0]['MsgContent'])) { // 用户状态变更
return $this->ImErrorReturn("消息内容错误缺少MsgContent"); $result = $userService->userImLoginStatus($request_params);
} if ($result['code'] == 0){
return $this->ImErrorReturn($result['message']);
// 验证接收方user_id
if (empty($request_params['To_Account'])) {
return $this->ImErrorReturn("消息内容错误,接收用户错误");
}
// 验证消息唯一id
if (empty($request_params['MsgKey'])) {
return $this->ImErrorReturn("消息内容错误,消息唯一标识错误");
}
// 验证消息重复性
$params = array();
$params['message_key'] = $request_params['MsgKey'];
$message = MessageIm::getExists($params);
if ($message) {
// 消息重复
Log::getInstance("CallBackController-imCallBack")->info("消息重复");
return $this->ImSuccessReturn();
}
// 处理发送结果
if ($request_params['SendMsgResult'] == 0) {
// im中0表示成功
$message_send_result = 1;
}
// 验证自定义消息内容
$is_system = 0;// 是否系统操作发送0:否 1:是)
if (!empty($request_params['CloudCustomData'])) {
$cloud_custom_data = json_decode($request_params['CloudCustomData'], true);
if (!empty($cloud_custom_data['order_inquiry_id'])) {
// 获取订单数据
$params = array();
$params['order_inquiry_id'] = $cloud_custom_data['order_inquiry_id'];
$order_inquiry = OrderInquiry::getOne($params);
if (empty($order_inquiry)) {
return $this->ImErrorReturn("消息内容错误,非法订单");
}
$order_inquiry_id = $cloud_custom_data['order_inquiry_id'];
} }
}elseif ($request_params['CallbackCommand'] == "C2C.CallbackAfterSendMsg"){
if (!empty($cloud_custom_data['is_system'])) { // 用户im消息发送后回调
if ($cloud_custom_data['is_system'] == 1) { $result = $userService->userImAfterSendMsg($request_params);
// 系统发送 if ($result['code'] == 0){
$is_system = 1; return $this->ImErrorReturn($result['message']);
}
}
}
// 入库
$data = array();
if (!empty($request_params['From_Account'])) {
// 系统发送时不带参数
$data['from_user_id'] = $request_params['From_Account'];
}
$data['to_user_id'] = $request_params['To_Account'];
$data['message_key'] = $request_params['MsgKey'];
$data['message_send_time'] = $request_params['RequestTime'];
$data['message_seq'] = $request_params['MsgSeq'];
$data['message_send_result'] = $message_send_result ?? 0;
$data['send_error_info'] = $request_params['ErrorInfo'];
$data['message_type'] = $request_params['MsgBody'][0]['MsgType'];
$data['is_system'] = $is_system;
if (!empty($order_inquiry_id)) {
$data['order_inquiry_id'] = $order_inquiry_id;
}
$message_content = $request_params['MsgBody'][0]['MsgContent'];
$data['message_content'] = json_encode($message_content, JSON_UNESCAPED_UNICODE);
$data['message_custom_content'] = $request_params['CloudCustomData'] ?? "";
$message = MessageIm::addMessage($data);
if (empty($message)) {
return $this->ImErrorReturn("存储数据库失败");
}
// im消息通知
if ($is_system == 0 && isset($message_send_result) && isset($order_inquiry_id)){
try {
$UserService = new UserService();
$UserService->userImMessageNotice($request_params['To_Account'],$order_inquiry_id,$request_params['MsgBody']);
}catch (\Throwable $e){
Log::getInstance("CallBackController-imCallBack")->error("im消息通知失败");
} }
}else{
return $this->ImErrorReturn("非法事件");
} }
} catch (\Exception $e) { } catch (\Exception $e) {
// 验证失败 // 验证失败

View File

@ -2,8 +2,11 @@
namespace App\Services; namespace App\Services;
use App\Amqp\Producer\UserImOffDelayDirectProducer;
use App\Constants\HttpEnumCode; use App\Constants\HttpEnumCode;
use App\Model\Area; use App\Model\Area;
use App\Model\MessageIm;
use App\Model\OrderInquiry;
use App\Model\Popup; use App\Model\Popup;
use App\Model\SubTemplate; use App\Model\SubTemplate;
use App\Model\User; use App\Model\User;
@ -14,12 +17,14 @@ use App\Model\UserLocation;
use App\Model\UserPatient; use App\Model\UserPatient;
use App\Model\UserShipAddress; use App\Model\UserShipAddress;
use App\Model\UserSystem; use App\Model\UserSystem;
use App\Utils\Log;
use App\Utils\Mask; use App\Utils\Mask;
use App\Utils\PcreMatch; use App\Utils\PcreMatch;
use Extend\Tencent\map\Location; use Extend\Tencent\map\Location;
use Extend\TencentIm\Profile; use Extend\TencentIm\Profile;
use Extend\Wechat\Wechat; use Extend\Wechat\Wechat;
use GuzzleHttp\Exception\GuzzleException; use GuzzleHttp\Exception\GuzzleException;
use Hyperf\Amqp\Producer;
use Hyperf\Amqp\Result; use Hyperf\Amqp\Result;
use Hyperf\DbConnection\Db; use Hyperf\DbConnection\Db;
use Hyperf\Redis\Redis; use Hyperf\Redis\Redis;
@ -919,4 +924,326 @@ class UserService extends BaseService
return true; return true;
} }
/**
* 处理用户im登陆状态
* @param array $data im消息体
* {
"CallbackCommand":"State.StateChange",
"Info":{
"To_Account":"516898713896521728",
"Action":"Disconnect",
"Reason":"LinkClose"
},
"EventTime":1701047839484,
"ClientIP":"221.216.35.130",
"OptPlatform":"Web",
"RequestId":"clhus7ir8lcc6oth83f0-144115242829354858-Disconnect-LinkClose",
"SdkAppid":"1400798221",
"contenttype":"json",
"Sign":"c32ed26bfab2b71ffc4d93f209dade65a7fb8736138d0ca6f44af4c11ba66d2d",
"RequestTime":"1701047839"
}
* @return array
*/
public function userImLoginStatus(array $msg_data): array
{
$result = array();
$result['message'] = "";
$result['code'] = 0;
if (empty($msg_data['Info'])){
$result['message'] = "消息内容错误缺少Info";
return $result;
}
if (empty($msg_data['Info']['To_Account'])){
$result['message'] = "消息内容错误缺少Info.To_Account";
return $result;
}
if (empty($msg_data['Info']['Action'])){
$result['message'] = "消息内容错误缺少Info.Action";
return $result;
}
if (empty($msg_data['Info']['Reason'])){
$result['message'] = "消息内容错误缺少Info.Reason";
return $result;
}
if (empty($msg_data['RequestTime'])){
$result['message'] = "消息内容错误缺少RequestTime";
return $result;
}
// 获取用户数据
$params = array();
$params['user_id'] = $msg_data['Info']['To_Account'];
$user = User::getOne($params);
if (empty($user)){
$result['message'] = "消息内容错误,接收用户错误";
return $result;
}
Db::beginTransaction();
try {
if ($msg_data['Info']['Action'] == "Login"){
// 登陆
$im_login_at = date('Y-m-d H:i:s',$msg_data['RequestTime']);
// 修改用户表在线状态
$params = array();
$params['user_id'] = $user['user_id'];
$data = array();
if ($user['is_online'] == 0){
$data['is_online'] = 1;
}
$data['im_login_at'] = $im_login_at;
$res = User::editUser($params,$data);
if (!$res){
$result['message'] = "在线状态存储失败";
return $result;
}
// 添加缓存
$redis = $this->container->get(Redis::class);
$redis_key = "user_im_online" . $user['user_id'];
$redis->set($redis_key,$msg_data['RequestTime'],30*60);
} elseif ($msg_data['Info']['Action'] == "Disconnect"){
// 点右上角退出/断网(如手机开启飞行模式)/微信切后台/杀掉微信进程
$time = strtotime($msg_data['RequestTime']) + 30*60;
$data = array();
$data['user_id'] = $user['user_id'];
$message = new UserImOffDelayDirectProducer($data);
$message->setDelayMs(1000 * $time);
$producer = $this->container->get(Producer::class);
$res = $producer->produce($message);
if (!$res) {
$result['message'] = "添加下线队列失败";
return $result;
}
} elseif ($msg_data['Info']['Action'] == "Logout"){
// 主动退出
// 修改用户表在线状态
if ($user['is_online'] == 1){
$params = array();
$params['user_id'] = $user['user_id'];
$data = array();
$data['is_online'] = 0;
$res = User::editUser($params,$data);
if (!$res){
$result['message'] = "在线状态存储失败";
return $result;
}
}
// 删除缓存
$redis = $this->container->get(Redis::class);
$redis_key = "user_im_online" . $user['user_id'];
$redis->del($redis_key);
}
Db::commit();
}catch (\Throwable $e){
Db::rollBack();
$result['message'] = $e->getMessage();
return $result;
}
$result['message'] = "成功";
$result['code'] = 1;
return $result;
}
/**
* 处理用户im发送消息后回调
* @param array $data im消息体
* {
"CloudCustomData":"{\"order_inquiry_id\":\"581144270615580673\",\"is_system\":1,\"inquiry_type\":\"2\",\"message_rounds\":0,\"patient_family_data\":{\"patient_name\":\"貂蝉一\",\"patient_sex\":\"2\",\"patient_age\":\"32\"}}",
"MsgVersion":0,
"MsgBody":[
{
"MsgType":"TIMCustomElem",
"MsgContent":{
"Desc":"",
"Data":"{\"message_type\":11,\"title\":\"患者信息\",\"desc\":\"\",\"data\":{\"order_no\":\"581144270615580672\",\"disease_desc\":\"ᵀᵒᵈᵃʸ ᴵ ʷᵃⁿᵗ ᵗᵒ ᵇᵉ ᵗʰᵉ ʰᵃᵖᵖⁱᵉˢᵗ ᶜʰⁱˡᵈ ⁱⁿ ᵗʰᵉ ʷʰᵒˡᵉ ᵘⁿⁱᵛᵉʳˢᵉ.今天也要做全宇宙最快乐的小朋友 \",\"message_path\":\"\\\/Pages\\\/yishi\\\/case\\\/index?order_inquiry_id=581144270615580673\"}}",
"Ext":"",
"Sound":""
}
}
],
"CallbackCommand":"C2C.CallbackAfterSendMsg",
"From_Account":"1682282293411975168",
"To_Account":"581056776246890497",
"MsgRandom":299670010,
"MsgSeq":2064411751,
"MsgTime":1699515713,
"SupportMessageExtension":0,
"MsgKey":"2064411751_299670010_1699515713",
"OnlineOnlyFlag":0,
"SendMsgResult":0,
"ErrorInfo":"send msg succeed",
"UnreadMsgNum":8,
"EventTime":1699515713924,
"ClientIP":"139.155.127.177",
"OptPlatform":"RESTAPI",
"RequestId":"37677-144115242877083697-1699515713-299670010",
"SdkAppid":"1400798221",
"contenttype":"json",
"Sign":"f6069aeb0c62cf3d77336794f7cf5a8543ddbff8c285f7341419df37ac713ff2",
"RequestTime":"1699515713"
}
* @return array
*/
public function userImAfterSendMsg(array $msg_data): array
{
$result = array();
$result['message'] = "";
$result['code'] = 0;
// 验证消息内容
if (empty($msg_data['MsgBody'])) {
$result['message'] = "消息内容错误缺少MsgBody";
return $result;
}
// 验证消息内容类型
if (empty($msg_data['MsgBody'][0]['MsgType'])) {
$result['message'] = "消息内容错误缺少MsgType";
return $result;
}
// 验证消息内容详情
if (empty($msg_data['MsgBody'][0]['MsgContent'])) {
$result['message'] = "消息内容错误缺少MsgContent";
return $result;
}
// 验证接收方user_id
if (empty($msg_data['To_Account'])) {
$result['message'] = "消息内容错误,接收用户错误";
return $result;
}
// 验证消息唯一id
if (empty($msg_data['MsgKey'])) {
$result['message'] = "消息内容错误,消息唯一标识错误";
return $result;
}
// 处理自定义消息
if ($msg_data['MsgBody'][0]['MsgType'] == "TIMCustomElem"){
if (empty($msg_data['MsgBody'][0]['MsgContent']['Data'])){
$result['message'] = "自定义消息数据类型错误";
return $result;
}
$content = json_decode($msg_data['MsgBody'][0]['MsgContent']['Data'],true);
if (empty($content)){
$result['message'] = "自定义消息数据内容错误";
return $result;
}
if ($content['message_type'] == 4 || $content['message_type'] == 5){
// 4/5类型时不进行处理
$result['message'] = "成功";
$result['code'] = 1;
return $result;
}
}
// 验证消息重复性
$params = array();
$params['message_key'] = $msg_data['MsgKey'];
$message = MessageIm::getExists($params);
if ($message) {
$result['message'] = "消息重复";
return $result;
}
// 处理发送结果
if ($msg_data['SendMsgResult'] == 0) {
// im中0表示成功
$message_send_result = 1;
}
// 验证自定义消息内容
$is_system = 0;// 是否系统操作发送0:否 1:是)
if (!empty($msg_data['CloudCustomData'])) {
$cloud_custom_data = json_decode($msg_data['CloudCustomData'], true);
if (!empty($cloud_custom_data['order_inquiry_id'])) {
// 获取订单数据
$params = array();
$params['order_inquiry_id'] = $cloud_custom_data['order_inquiry_id'];
$order_inquiry = OrderInquiry::getOne($params);
if (empty($order_inquiry)) {
$result['message'] = "消息内容错误,非法订单";
return $result;
}
$order_inquiry_id = $cloud_custom_data['order_inquiry_id'];
}
if (!empty($cloud_custom_data['is_system'])) {
if ($cloud_custom_data['is_system'] == 1) {
// 系统发送
$is_system = 1;
}
}
}
try {
// 入库
$data = array();
if (!empty($msg_data['From_Account'])) {
// 系统发送时不带参数
$data['from_user_id'] = $msg_data['From_Account'];
}
$data['to_user_id'] = $msg_data['To_Account'];
$data['message_key'] = $msg_data['MsgKey'];
$data['message_send_time'] = $msg_data['RequestTime'];
$data['message_seq'] = $msg_data['MsgSeq'];
$data['message_send_result'] = $message_send_result ?? 0;
$data['send_error_info'] = $msg_data['ErrorInfo'];
$data['message_type'] = $msg_data['MsgBody'][0]['MsgType'];
$data['is_system'] = $is_system;
if (!empty($order_inquiry_id)) {
$data['order_inquiry_id'] = $order_inquiry_id;
}
$message_content = $msg_data['MsgBody'][0]['MsgContent'];
$data['message_content'] = json_encode($message_content, JSON_UNESCAPED_UNICODE);
$data['message_custom_content'] = $msg_data['CloudCustomData'] ?? "";
$message = MessageIm::addMessage($data);
if (empty($message)) {
$result['message'] = "存储数据库失败";
return $result;
}
// im消息通知
if ($is_system == 0 && isset($message_send_result) && isset($order_inquiry_id)){
try {
$UserService = new UserService();
$UserService->userImMessageNotice($msg_data['To_Account'],$order_inquiry_id,$msg_data['MsgBody']);
}catch (\Throwable $e){
Log::getInstance("UserService-userImAfterSendMsg")->error("im消息通知失败");
}
}
}catch (\Throwable $e){
$result['message'] = $e->getMessage();
return $result;
}
$result['message'] = "成功";
$result['code'] = 1;
return $result;
}
} }