新增订阅消息队列+短信发送队列

This commit is contained in:
wucongxing 2023-03-03 19:35:57 +08:00
parent cbf78278b5
commit 7740dad0e5
19 changed files with 689 additions and 16 deletions

View File

@ -10,6 +10,9 @@
- PDO PHP 拓展 If you need to use MySQL Client
- Redis PHP 拓展 If you need to use Redis Client
# 依赖
- 本地rabbitmqenv内配置
# 启动方式
$ php bin/hyperf.php start

View File

@ -10,6 +10,9 @@ use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
/**
* 药师处方分配队列
*/
#[Consumer(exchange: 'amqp.direct', routingKey: 'PrescriptionDistribute', queue: 'prescription.distribute.pharmacist.queue', nums: 1)]
class PrescriptionDistributePhConsumer extends ConsumerMessage
{

View File

@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
/**
* 发送短信
*/
#[Consumer(exchange: 'amqp.direct', routingKey: 'SendSmsMessage', queue: 'send.sms.message.queue', nums: 1)]
class SendSmsMessageConsumer extends ConsumerMessage
{
public function consumeMessage($data, AMQPMessage $message): string
{
return Result::ACK;
}
}

View File

@ -0,0 +1,156 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Constants\HttpEnumCode;
use App\Exception\BusinessException;
use App\Model\LogMessagePush;
use App\Model\SubTemplate;
use App\Model\User;
use App\Model\UserDoctor;
use App\Model\UserPatient;
use App\Services\UserService;
use App\Utils\Log;
use Extend\Wechat\Wechat;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\DecodingExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
/**
* 发送订阅消息
*/
#[Consumer(exchange: 'amqp.direct', routingKey: 'SendSubMessage', queue: 'send.sub.message.queue', nums: 1)]
class SendSubMessageConsumer extends ConsumerMessage
{
/**
* @param $data
* [
* "push_user_id" // 用户id被推送者
* "template_title" // 推送的模版名称
* "params" => [ // 推送所需的参数
* "page" // 跳转页面
* "data" => [
* "thing1" => [
* "value" => [
* "参数1"
* ]
* ]
* ]
*
* ]
* ]
* @param AMQPMessage $message
* @return string
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ClientExceptionInterface
* @throws DecodingExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ServerExceptionInterface
* @throws TransportExceptionInterface
*/
public function consumeMessage($data, AMQPMessage $message): string
{
try {
// 获取被推送用户信息
$params = array();
$params['user_id'] = $data['push_user_id'];
$user = User::getOne($params);
if (empty($user)){
$this->addMessagePushLog($data,"未查询到被推送用户信息");
return Result::ACK;
}
// 获取open_id
$UserService = new UserService();
$open_id = $UserService->getOpenIdWithUserId($user['user_id'],$user['user_type']);
if (empty($open_id)){
$this->addMessagePushLog($data,"未获取到被推送用户open_id");
return Result::ACK;
}
// 获取订阅消息模版数据
$params = array();
$params['client_type'] = $user['user_type']; // 客户端类型1:患者端 2:医师端 3:药师端)
$params['template_title'] = $data['template_title'];
$sub_template = SubTemplate::getOne($params);
if (empty($sub_template)){
$this->addMessagePushLog($data,"未查询到推送模版id");
return Result::ACK;
}
$sub_template = $sub_template->toArray();
// 处理发送环境
$miniprogram_state = "developer";
if (env("APP_ENV") == "prod"){
$miniprogram_state = "formal";
}
$options = [
"template_id" => $sub_template['wx_template_id'],
"page" => $data['params']['page'],
"touser" => $open_id,
"data" => $data['params']['data'],
"miniprogram_state" => $miniprogram_state,
"lang" => "zh_CN",
];
// 发起推送
$Wechat = new Wechat($user['user_type']);
$result = $Wechat->sendSubscribeMessage($options);
Log::getInstance()->info("订阅消息推送成功:" . $result);
// 记录推送记录
$this->addMessagePushLog($data,"",$sub_template['wx_template_id'],1,json_encode($options,JSON_UNESCAPED_UNICODE));
} catch (\Exception $e) {
$this->addMessagePushLog($data,$e->getMessage());
return Result::ACK;
}
return Result::ACK;
}
/**
* 增加日志
* @param array $params 推送参数
* @param string $wx_template_id 微信推送模版id
* @param int $status 推送状态1:成功 2:失败)
* @param string $fail_reason 推送失败原因
* @param string $content 推送内容
*/
public function addMessagePushLog(array $params,string $fail_reason = '',string $wx_template_id = "",int $status = 2,string $content = '')
{
$data = array();
$data['push_user_id'] = $params['push_user_id'];
$data['template_title'] = $params['template_title'];
if (!empty($wx_template_id)){
$data['wx_template_id'] = $wx_template_id;
}
$data['params'] = json_encode($params,JSON_UNESCAPED_UNICODE);
$data['status'] = $status;
if (!empty($fail_reason)){
$data['fail_reason'] = $fail_reason;
}
if (!empty($content)){
$data['content'] = $content;
}
$log_message_push = LogMessagePush::addLogMessagePush($data);
if (empty($log_message_push)){
Log::getInstance()->error("增加推送日志失败:" . json_encode($data,JSON_UNESCAPED_UNICODE));
}
}
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
/**
* 发送短信
*/
#[Producer(exchange: 'amqp.direct', routingKey: 'SendSmsMessage')]
class SendSmsMessageProducer extends ProducerMessage
{
public function __construct($data)
{
$this->payload = $data;
}
}

View File

@ -0,0 +1,40 @@
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use App\Model\SubTemplate;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
/**
* 发送订阅消息
*/
#[Producer(exchange: 'amqp.direct', routingKey: 'SendSubMessage')]
class SendSubMessageProducer extends ProducerMessage
{
/**
* @param array $data
* [
* "push_user_id" // 用户id被推送者
* "template_title" // 推送的模版名称
* "params" => [ // 推送所需的参数
* "page" // 跳转页面
* "data" => [
* "thing1" => [
* "value" => [
* "参数1"
* ]
* ]
* ]
*
* ]
* ]
*/
public function __construct(array $data)
{
$this->payload = $data;
}
}

View File

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace App\Constants;
use Hyperf\Constants\AbstractConstants;
use Hyperf\Constants\Annotation\Constants;
#[Constants]
class SubTemplateCode extends AbstractConstants
{
/**
* @Message("患者回复通知")
*/
const PATIENT_RESPONSE = "patientResponse";
}

View File

@ -4,9 +4,11 @@ namespace App\Controller;
use App\Model\Area;
use App\Model\Hospital;
use App\Model\SubTemplate;
use App\Model\TbHospitalMy;
use App\Services\IndexService;
use App\Utils\Prescription;
use Extend\Wechat\Wechat;
use Hyperf\DbConnection\Db;
use Hyperf\Snowflake\IdGeneratorInterface;
use Psr\Http\Message\ResponseInterface;

View File

@ -8,9 +8,17 @@ use App\Request\UserRequest;
use App\Services\UserDoctorService;
use App\Services\UserService;
use App\Utils\Http;
use Extend\Wechat\Wechat;
use Hyperf\DbConnection\Db;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\DecodingExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
class UserController extends AbstractController
{
}

View File

@ -0,0 +1,76 @@
<?php
declare(strict_types=1);
namespace App\Model;
use Hyperf\Database\Model\Collection;
use Hyperf\Snowflake\Concern\Snowflake;
/**
* @property int $push_id 主键id
* @property int $push_user_id 用户id被推送者
* @property int $event 推送事件(具体定义查看消费端)
* @property string $wx_template_id 微信推送模版id
* @property string $params 推送所需的参数json
* @property int $status 推送状态1:成功 2:失败)
* @property string $fail_reason 推送失败原因
* @property string $content 推送内容
* @property \Carbon\Carbon $created_at 创建时间
* @property \Carbon\Carbon $updated_at 修改时间
*/
class LogMessagePush extends Model
{
use Snowflake;
/**
* The table associated with the model.
*/
protected ?string $table = 'log_message_push';
/**
* The attributes that are mass assignable.
*/
protected array $fillable = ['push_id', 'push_user_id', 'event', 'wx_template_id', 'params', 'status', 'fail_reason', 'content', 'created_at', 'updated_at'];
/**
* The attributes that should be cast to native types.
*/
protected array $casts = ['push_id' => 'integer', 'push_user_id' => 'integer', 'event' => 'integer', 'status' => 'integer', 'created_at' => 'datetime', 'updated_at' => 'datetime'];
protected string $primaryKey = "push_id";
/**
* 获取信息-单条
* @param array $params
* @param array $fields
* @return object|null
*/
public static function getOne(array $params, array $fields = ['*']): object|null
{
return self::where($params)->first($fields);
}
/**
* 获取数据-
* @param array $params
* @param array $fields
* @return Collection|array
*/
public static function getList(array $params = [], array $fields = ['*']): Collection|array
{
return self::where($params)->get($fields);
}
/**
* 新增
* @param array $data
* @return LogMessagePush|\Hyperf\Database\Model\Model
*/
public static function addLogMessagePush(array $data): LogMessagePush|\Hyperf\Database\Model\Model
{
return self::create($data);
}
}

69
app/Model/SubTemplate.php Normal file
View File

@ -0,0 +1,69 @@
<?php
declare(strict_types=1);
namespace App\Model;
use Hyperf\Database\Model\Collection;
use Hyperf\Snowflake\Concern\Snowflake;
/**
* @property int $template_id 主键id
* @property int $client_type 客户端类型1:患者端 2:医师端 3:药师端)
* @property string $wx_template_id 微信订阅模版id
* @property string $template_title 模版标题
* @property int $template_type 模版类型2:长期 3:一次性)
* @property string $template_content 模版内容
* @property \Carbon\Carbon $created_at 创建时间
* @property \Carbon\Carbon $updated_at 修改时间
*/
class SubTemplate extends Model
{
use Snowflake;
/**
* The table associated with the model.
*/
protected ?string $table = 'sub_template';
/**
* The attributes that are mass assignable.
*/
protected array $fillable = ['template_id', 'client_type', 'wx_template_id', 'template_title', 'template_type', 'template_content', 'created_at', 'updated_at'];
protected string $primaryKey = "template_id";
/**
* 获取信息-单条
* @param array $params
* @param array $fields
* @return object|null
*/
public static function getOne(array $params, array $fields = ['*']): object|null
{
return self::where($params)->first($fields);
}
/**
* 获取数据-
* @param array $params
* @param array $fields
* @return Collection|array
*/
public static function getList(array $params = [], array $fields = ['*']): Collection|array
{
return self::where($params)->get($fields);
}
/**
* 新增
* @param array $data
* @return SubTemplate|\Hyperf\Database\Model\Model
*/
public static function addSubTemplate(array $data): SubTemplate|\Hyperf\Database\Model\Model
{
return self::create($data);
}
}

View File

@ -10,7 +10,7 @@ use Hyperf\Validation\Request\FormRequest;
class UserRequest extends FormRequest
{
protected array $scenes = [
'wechatMobileLogin' => ['phone_code','wx_code','user_type'],
];
/**
@ -27,9 +27,6 @@ class UserRequest extends FormRequest
public function rules(): array
{
return [
'phone_code' => 'required',
'wx_code' => 'required',
'user_type' => 'required|integer|min:1|max:3',
];
}
@ -39,12 +36,6 @@ class UserRequest extends FormRequest
public function messages(): array
{
return [
'phone_code.required' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
'wx_code.required' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
'user_type.required' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
'user_type.integer' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
'user_type.min' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
'user_type.max' => HttpEnumCode::getMessage(HttpEnumCode::CLIENT_HTTP_ERROR),
];
}
}

View File

@ -102,9 +102,9 @@ class InquiryService extends BaseService
// 确定支付渠道
// 支付渠道1:小程序支付 2:微信扫码支付)
if ($request_params['inquiry_pay_channel'] == 1){
if ($request_params['client_type'] == 1){
$inquiry_pay_channel = 1;
}elseif ($request_params['inquiry_pay_channel'] == 2){
}elseif ($request_params['client_type'] == 2){
$inquiry_pay_channel = 2;
}

View File

@ -0,0 +1,23 @@
<?php
namespace App\Services;
/**
* 订阅消息业务类
*/
class SubMessageService extends BaseService
{
/**
* 患者回复通知
* @param array $params
* @return array
*/
public function patientResponse(array $params): array
{
$data = array();
$data['thing1']['value'] = $params['data1'];//患者姓名
$data['thing2']['value'] = $params['data2'];//回复内容
return $data;
}
}

View File

@ -3,9 +3,124 @@
namespace App\Services;
use App\Constants\HttpEnumCode;
use App\Model\SubTemplate;
use App\Model\SubUser;
use App\Model\User as UserModel;
use App\Model\UserDoctor;
use App\Model\UserDoctorInfo;
use App\Model\UserPatient;
use Extend\Wechat\Wechat;
use Hyperf\Amqp\Result;
use Hyperf\DbConnection\Db;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\DecodingExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
class UserService extends BaseService
{
/**
* 添加订阅消息
* @return array
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ClientExceptionInterface
* @throws DecodingExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ServerExceptionInterface
* @throws TransportExceptionInterface
*/
public function addSubMessage(): array
{
// 下载消息模版
$weChat = new Wechat(1);
$result = $weChat->getTemplate();
if (empty($result)){
Db::rollBack();
return fail();
}
$template = json_decode($result,true);
foreach ($template['data'] as $item){
$params = array();
$params['wx_template_id'] = $item['priTmplId'];
$sub_template = SubTemplate::getOne($params);
if (empty($sub_template)){
// 新增模版
$data = array();
$data['client_type'] = 1;
$data['wx_template_id'] = $item['priTmplId'];
$data['template_title'] = $item['title'];
$data['template_type'] = $item['type'];
$data['template_content'] = $item['content'];
$sub_template = SubTemplate::addSubTemplate($data);
if (empty($sub_template)){
Db::rollBack();
return fail();
}
}
}
return success();
}
/**
* 通过user_id获取用户openid
* @param string|int $user_id
* @param int $user_type
* @return string
*/
public function getOpenIdWithUserId(string|int $user_id,int $user_type): string
{
$open_id = '';
if ($user_type == 1){
// 患者
$params = array();
$params['user_id'] = $user_id;
$user_patient = UserPatient::getOne($params);
if (empty($user_patient)){
return "";
}
if (empty($user_patient['open_id'])){
return "";
}
$open_id = $user_patient['open_id'];
}elseif ($user_type == 2){
// 医生
$params = array();
$params['user_id'] = $user_id;
$user_doctor = UserDoctor::getOne($params);
if (empty($user_doctor)){
return "";
}
if (empty($user_doctor['open_id'])){
return "";
}
$open_id = $user_doctor['open_id'];
}elseif ($user_type == 3){
// 药师
$params = array();
$params['user_id'] = $user_id;
$user_pharmacist = UserPatient::getOne($params);
if (empty($user_pharmacist)){
return "";
}
if (empty($user_pharmacist['open_id'])){
return "";
}
$open_id = $user_pharmacist['open_id'];
}
return $open_id;
}
}

View File

@ -22,6 +22,7 @@
"hyperf/command": "~3.0.0",
"hyperf/config": "~3.0.0",
"hyperf/constants": "~3.0.0",
"hyperf/crontab": "^3.0",
"hyperf/database": "~3.0.0",
"hyperf/db-connection": "~3.0.0",
"hyperf/framework": "~3.0.0",

48
composer.lock generated
View File

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "9859f63a183816d1cef351a84d674792",
"content-hash": "f299641f60e9b66b2129d0c536d32ee8",
"packages": [
{
"name": "adbario/php-dot-notation",
@ -1739,6 +1739,52 @@
],
"time": "2022-11-01T02:50:54+00:00"
},
{
"name": "hyperf/crontab",
"version": "v3.0.0",
"source": {
"type": "git",
"url": "https://github.com/hyperf/crontab.git",
"reference": "3ba0792038cc0b27d120e1d92f0b351121841789"
},
"dist": {
"type": "zip",
"url": "https://mirrors.huaweicloud.com/repository/php/hyperf/crontab/v3.0.0/hyperf-crontab-v3.0.0.zip",
"reference": "3ba0792038cc0b27d120e1d92f0b351121841789",
"shasum": ""
},
"require": {
"hyperf/utils": "~3.0.0",
"nesbot/carbon": "^2.0",
"php": ">=8.0"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "3.0-dev"
},
"hyperf": {
"config": "Hyperf\\Crontab\\ConfigProvider"
}
},
"autoload": {
"psr-4": {
"Hyperf\\Crontab\\": "src/"
}
},
"license": [
"MIT"
],
"description": "A crontab component for Hyperf.",
"homepage": "https://hyperf.io",
"keywords": [
"crontab",
"hyperf",
"php",
"swoole"
],
"time": "2022-12-10T13:24:37+00:00"
},
{
"name": "hyperf/database",
"version": "v3.0.2",

View File

@ -329,3 +329,6 @@ Router::addGroup('/pay', function () {
Router::post('/callback', [PayController::class, 'wxCallBack']);
});
});
// 添加订阅消息
Router::post('/sub', [UserController::class, 'addSubMessage']);

View File

@ -218,9 +218,8 @@ class Wechat
}
}
// 获取分享二维码
/**
* 获取分享二维码
* @throws NotFoundExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ContainerExceptionInterface
@ -258,4 +257,83 @@ class Wechat
}
}
/**
* 获取当前账号下订阅消息个人模版列表
* @return array|string
* @throws ClientExceptionInterface
* @throws ContainerExceptionInterface
* @throws DecodingExceptionInterface
* @throws NotFoundExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ServerExceptionInterface
* @throws TransportExceptionInterface
*/
public function getTemplate(): array|string
{
try {
$this->createApp($this->config['app_id'], $this->config['secret']);
$client = $this->createClient($this->config['app_id'], $this->config['secret']);
$options = [];
$response = $client->get('wxaapi/newtmpl/gettemplate', $options);
if ($response->isFailed()) {
// 出错了,处理异常
$result = $response->toArray();
if(empty($result)){
// 返回值为空
return [];
}
if (isset($result['errcode'])){
throw new BusinessException();
}
return [];
}
return $response->getContent(false);
} catch (\Exception $e) {
throw new BusinessException($e->getMessage(), HttpEnumCode::SERVER_ERROR);
}
}
/**
* 发送订阅消息
* @throws NotFoundExceptionInterface
* @throws RedirectionExceptionInterface
* @throws ContainerExceptionInterface
* @throws DecodingExceptionInterface
* @throws ClientExceptionInterface
* @throws TransportExceptionInterface
* @throws ServerExceptionInterface
*/
public function sendSubscribeMessage(array $options): array|string
{
try {
$this->createApp($this->config['app_id'], $this->config['secret']);
$client = $this->createClient($this->config['app_id'], $this->config['secret']);
$response = $client->postJson('cgi-bin/message/subscribe/send', $options);
if ($response->isFailed()) {
// 出错了,处理异常
$result = $response->toArray();
if(empty($result)){
// 返回值为空
throw new BusinessException("返回值为空");
}
if (isset($result['errcode'])){
if ($result['errcode'] == 43101){
throw new BusinessException("用户拒绝接收消息");
}
throw new BusinessException($result['errmsg']);
}
throw new BusinessException();
}
return $response->getContent(false);
} catch (\Exception $e) {
throw new BusinessException($e->getMessage(), HttpEnumCode::SERVER_ERROR);
}
}
}