import { downloadImage, cdnDownloadImg, cdnDownloadImg2, delay } from '../utils/index'
import {
  TypeMsgQueueItem,
  TypeMsgQueueItem_TaskInfo,
  TypeSendMsg,
  TypeSendMsg_Txt,
  TypeSendMsg_File,
  TypeSendMsg_Applet,
  TypeSendMsg_VideoChannel,
  TypeSendMsg_Link,
  TypeSendMsg_Group,
  TypeAnswerItem
} from '../types/typesMsg'
import { TypeServerMsgContent } from '../types/typesServer'
import hookRequest from '../api/hookRequest'
import serverRequest from '../api/scrm/serverRequest'
import commonData from '../store/data'
import CONST_SEND_MSG_TYPE from '../const/CONST_SEND_MSG_TYPE'

/**
 * FIFO queue of outgoing messages with optional de-duplication by business key.
 *
 * Every length change is mirrored into `commonData` under `waitMsgNum`.
 * De-duplication tracks two sets of keys:
 *  - `pendingKeys`: items accepted by `enqueueDedup` and not yet reported via `markDone`;
 *  - `sentKeys`: items reported as successfully sent, remembered for `ttlMs` milliseconds.
 */
class Queue<T extends TypeMsgQueueItem = TypeMsgQueueItem> {
  private items: T[] = []
  // Dedup: business keys of items that are queued or currently being sent
  private pendingKeys: Set<string> = new Set()
  // Dedup: business keys already sent successfully (value = expiry timestamp in ms)
  private sentKeys: Map<string, number> = new Map()
  // Dedup: key computed at enqueue time, so markDone releases exactly the key that was reserved
  private keyByItem: WeakMap<T, string> = new WeakMap()
  // Dedup time window (milliseconds)
  private ttlMs = 1 * 60 * 1000

  /** Appends an item without any de-duplication and publishes the new queue length. */
  enqueue(item: T): void {
    this.items.push(item)
    commonData.set('waitMsgNum', this.items.length)
  }

  /** Removes and returns the oldest item, publishing the new queue length when one was removed. */
  dequeue(): T | undefined {
    if (this.items.length) commonData.set('waitMsgNum', this.items.length - 1)
    return this.items.shift()
  }

  /** Returns true when no item is waiting. */
  isEmpty(): boolean {
    return this.items.length === 0
  }

  /** Returns the oldest item without removing it, or undefined when the queue is empty. */
  peek(): T | undefined {
    return this.items.length > 0 ? this.items[0] : undefined
  }

  // Lazily drops sentKeys entries whose TTL has expired
  private cleanupSentKeys(): void {
    const now = Date.now()
    for (const [key, exp] of this.sentKeys) {
      if (exp <= now) this.sentKeys.delete(key)
    }
  }

  // Decides whether an item with this business key may be queued
  private shouldAccept(key: string): boolean {
    this.cleanupSentKeys()
    if (this.pendingKeys.has(key)) return false
    const exp = this.sentKeys.get(key)
    if (exp && exp > Date.now()) return false
    return true
  }

  /**
   * Enqueues an item unless an item with the same business key is pending or was
   * sent within the TTL window.
   *
   * @returns true when the item was queued, false when it was dropped as a duplicate.
   */
  enqueueDedup(item: T): boolean {
    let key: string
    try {
      key = buildBizKey(item)
    } catch (e) {
      // Only key construction is guarded: a failure after the push must not enqueue the item twice
      console.log('[Queue] 去重构建业务键失败,回退普通入队', e)
      this.enqueue(item)
      return true
    }
    if (!this.shouldAccept(key)) {
      console.log(`[Queue] 去重命中,丢弃入队 key=${key}`)
      return false
    }
    this.pendingKeys.add(key)
    this.keyByItem.set(item, key)
    this.enqueue(item)
    return true
  }

  /**
   * Reports that processing of an item has finished.
   * The pending key is always released; on success the key is remembered for `ttlMs`.
   */
  markDone(item: T, success: boolean): void {
    try {
      // Prefer the key reserved at enqueue time; recompute only for items queued via enqueue()
      const key = this.keyByItem.get(item) ?? buildBizKey(item)
      this.keyByItem.delete(item)
      this.pendingKeys.delete(key)
      if (success) this.sentKeys.set(key, Date.now() + this.ttlMs)
    } catch (e) {
      console.log('[Queue] markDone 失败', e)
    }
  }
}

// Text placeholders -> name of the drInfo field whose value replaces them
const Replace_TXT = {
  '{{医生姓名}}': 'drName',
  '{{群名}}': 'entWxRoomName'
}

/**
 * Turns a list of answers into queue items and hands each one to `sendMsgToWx`.
 *
 * @param item Source message; `wxRoomId` is the target room and `publisherIds` the ids to @-mention.
 * @param _answerList Answers to send. The list is deep-copied, the caller's objects are not modified.
 * @param taskInfo Task metadata attached to every queued message.
 * @param drInfo Optional object providing the values for the `Replace_TXT` placeholders.
 */
async function sendAnswerList(
  item,
  _answerList: TypeAnswerItem[],
  taskInfo: TypeMsgQueueItem_TaskInfo = { sceneType: '' },
  drInfo?
): Promise<void> {
  const answerList = JSON.parse(JSON.stringify(_answerList)) as TypeAnswerItem[]
  answerList.forEach((answerItem) => {
    if (drInfo) {
      let text = answerItem.extend || ''
      for (const [placeholder, field] of Object.entries(Replace_TXT)) {
        const value = drInfo[field]
        // Literal split/join: no regex metacharacters, no `$` replacement patterns,
        // and a missing value becomes '' instead of the text "undefined"
        text = text.split(placeholder).join(value == null ? '' : String(value))
      }
      answerItem.extend = text
    }
    const _answer = {
      msgType: answerItem.answerType,
      msgContent: answerItem.extend, // string
      wxRoomId: item.wxRoomId,
      atIds: item.publisherIds,
      addFields: answerItem.addFields || '',
      taskInfo: taskInfo
    }
    // Text answers flagged with msgTxtType 'all' are sent as an @-all message
    if (
      answerItem.msgTxtType === 'all' &&
      (_answer.msgType === 'txt' || _answer.msgType === 'atTxt')
    ) {
      _answer.msgType = 'atTxt'
      _answer.atIds = 'atall'
    }
    // Deliberately not awaited: the first call drains the whole queue, the remaining
    // calls must enqueue right away so that the send intervals apply between them
    sendMsgToWx(_answer).catch((e) => console.log('[Queue] sendMsgToWx 异常', e))
  })
}

// Delay between two messages when the next message does not define one (milliseconds)
const defaultDelay = 3000
const messageQueue = new Queue<TypeMsgQueueItem>()

/**
 * Entry point for sending a message.
 * While another message is being processed the message is only queued; otherwise it is
 * queued and the queue is drained by `processSendMsg`.
 */
async function sendMsgToWx(data: TypeMsgQueueItem): Promise<void> {
  if (!data.msgType) {
    void sendMsgCb(data, 1, '消息未发送: 消息类型为空')
    return
  }
  if (commonData.get('sendMsgProcessing')) {
    // A message is being processed: just queue this one
    console.log(`[Queue] 消息加入队列, 群ID: ${data.wxRoomId}, 消息类型: ${data.msgType}`)
    messageQueue.enqueueDedup(data)
  } else {
    // Nothing is being processed: start processing with this message
    commonData.set('sendMsgProcessing', true)
    console.log('[Queue] 首条消息处理', JSON.stringify(data))
    const accepted = messageQueue.enqueueDedup(data)
    if (accepted) {
      await processSendMsg()
    } else {
      // Dropped as a duplicate: restore the processing flag
      commonData.set('sendMsgProcessing', false)
    }
  }
}

/**
 * Drains the message queue: formats and sends one message at a time, waiting the
 * configured interval between two messages. Clears `sendMsgProcessing` when it stops.
 */
async function processSendMsg(): Promise<void> {
  try {
    // Iterative on purpose: awaiting a recursive call per message keeps every earlier
    // invocation pending until the whole queue is drained
    for (;;) {
      const data = messageQueue.dequeue()
      if (!data) {
        console.log('[Queue] 消息队列为空')
        return
      }
      await sendQueuedMsg(data)
      const nextMsg = messageQueue.peek()
      if (!nextMsg) {
        console.log('消息列表为空了')
        return
      }
      const interval = resolveSendInterval(data, nextMsg)
      console.log(`[Queue] 下一个消息将在${interval}ms后发送, 群ID: ${nextMsg.wxRoomId || '无'}`)
      await delay(interval)
    }
  } finally {
    // Always release the flag, otherwise later messages would be queued forever
    commonData.set('sendMsgProcessing', false)
  }
}

// Formats and sends one dequeued message, then reports the result to the server and to the queue
async function sendQueuedMsg(data: TypeMsgQueueItem): Promise<void> {
  try {
    console.log(`[Queue] 消息出列、发送, 群ID: ${data.wxRoomId}, 消息类型: ${data.msgType}`)
    let _data
    try {
      _data = (await formatSendMsgData(data))._data
    } catch (error) {
      throw new Error('消息格式化失败:' + (error instanceof Error ? error.message : error))
    }
    console.log('发送。。。', JSON.stringify(_data))
    await hookRequest.fetchSendMsg(_data)
    void sendMsgCb(data)
    // Sent successfully: remember the key for the dedup window
    messageQueue.markDone(data, true)
  } catch (error) {
    void sendMsgCb(data, 1, error instanceof Error ? error.message : String(error))
    // Failed: release the pending key without recording it as sent
    messageQueue.markDone(data, false)
  }
}

// Milliseconds to wait before sending `next`, given that `current` was just processed
function resolveSendInterval(current: TypeMsgQueueItem, next: TypeMsgQueueItem): number {
  const taskInfo = next.taskInfo
  if (!taskInfo) return defaultDelay
  // sendInterval applies between messages to the same room, objectInterval otherwise
  const sameRoom = next.wxRoomId === current.wxRoomId
  const seconds = sameRoom && taskInfo.sendInterval ? taskInfo.sendInterval : taskInfo.objectInterval
  if (!seconds) return defaultDelay
  const ms = Number(seconds) * 1000
  // A non-numeric or negative value falls back to the default instead of yielding NaN
  return Number.isFinite(ms) && ms >= 0 ? ms : defaultDelay
}

interface TypeFormatData {
  _data:
    | TypeSendMsg
    | TypeSendMsg_Txt
    | TypeSendMsg_Link
    | TypeSendMsg_File
    | TypeSendMsg_VideoChannel
    | TypeSendMsg_Applet
    | TypeSendMsg_Group
  _msgContent?: TypeServerMsgContent
}

// Appends `key=value` using '?' when `base` has no query string yet and '&' otherwise
function appendQueryParam(base: string, key: string, value: string): string {
  const separator = base.includes('?') ? '&' : '?'
  return base + separator + key + '=' + value
}

/**
 * Converts a queue item into the payload passed to `hookRequest.fetchSendMsg`.
 *
 * @throws Error when required fields are missing, the message type is not listed in
 *   `CONST_SEND_MSG_TYPE`, or `msgContent` is not a JSON object.
 */
async function formatSendMsgData(data: TypeMsgQueueItem): Promise<TypeFormatData> {
  if (!(data.msgContent && data.msgType && data.wxRoomId && CONST_SEND_MSG_TYPE[data.msgType])) {
    throw new Error(`数据有误`)
  }
  const _msgContent: TypeServerMsgContent = JSON.parse(data.msgContent)
  if (!_msgContent || typeof _msgContent !== 'object') {
    throw new Error(`数据有误`)
  }
  const _data: TypeSendMsg = {
    type: CONST_SEND_MSG_TYPE[data.msgType]
  }
  // Fill in the type-specific fields
  // msgType: txt | atTxt | img | link | wxMini | lhMini | videoChannel
  switch (data.msgType) {
    case 'txt': {
      // Plain text message
      if (_msgContent.msg == null) {
        // Otherwise the literal text "undefined" / "null" would be sent
        throw new Error(`数据有误`)
      }
      _data.user_id = data.wxRoomId
      _data.msg = (data.atIds ? '\n' : '') + String(_msgContent.msg)
      return { _data: _data as TypeSendMsg_Txt }
    }
    case 'atTxt': {
      // Group @-mention message
      if (!_msgContent.msg) {
        throw new Error(`数据有误`)
      }
      _data.chat_room_id = data.wxRoomId
      _data.msg = (data.atIds ? '\n' : '') + String(_msgContent.msg)
      _data.at_list = data.atIds ? data.atIds.split(',') : []
      if (data.atIds && data.atIds.includes('atall')) {
        // 'atall' is sent with an empty at_list
        _data.at_list = []
      }
      return { _data: _data as TypeSendMsg_Group }
    }
    case 'img': {
      // Image message: download the file first and send the returned path
      if (!_msgContent.file) {
        // Otherwise a URL ending in "undefined" would be downloaded
        throw new Error(`数据有误`)
      }
      _data.user_id = data.wxRoomId
      const path = await downloadImage(
        'https://oss.hengfangjiankang.com/' + _msgContent.file,
        _msgContent.file?.split('/')[1]
      )
      _data.path = String(path)
      return { _data: _data as TypeSendMsg_File }
    }
    case 'link': {
      _data.user_id = data.wxRoomId
      _data.title = _msgContent.title
      _data.desc = _msgContent.content || ''
      _data.cover_url = _msgContent.imgUrl
      if (
        _msgContent.url?.includes('medical.okginko.com') ||
        _msgContent.url?.includes('medical.hengfangjiankang.com')
      ) {
        // Links to these hosts carry the room id as a groupid parameter
        _data.target_url = appendQueryParam(_msgContent.url, 'groupid', data.wxRoomId)
      } else {
        _data.target_url = _msgContent.url
      }
      return { _data: _data as TypeSendMsg_Link }
    }
    case 'wxMini': // Mini-program message
    case 'lhMini': {
      let cover_path
      if (_msgContent.customCoverImg) {
        cover_path = await cdnDownloadImg2(_msgContent.customCoverImg)
      } else {
        cover_path = await cdnDownloadImg(_msgContent)
      }
      console.log('cover_path', cover_path, _msgContent.customCoverImg)
      const msgData: TypeSendMsg_Applet = {
        ..._data,
        user_id: data.wxRoomId,
        title: _msgContent.des || '',
        desc: _msgContent.title || '',
        avatar_url: _msgContent.headImgUrl || '',
        cover_path: cover_path,
        app_id: _msgContent.appid || '',
        wechat_id: _msgContent.wechatId || '',
        page_path: _msgContent.pagepath || ''
      }
      // Replace every null field with ''
      Object.keys(msgData).forEach((key) => {
        if (msgData[key] === null) msgData[key] = ''
      })
      // Optional extra parameters requested through addFields
      const _addFieldArr = data.addFields ? data.addFields.split(',') : []
      if (_addFieldArr.length) {
        if (_addFieldArr.includes('groupid')) {
          msgData.page_path = appendQueryParam(msgData.page_path, 'groupid', data.wxRoomId)
        }
        if (_addFieldArr.includes('drid')) {
          const res = await serverRequest.getDridByRoomId(data.wxRoomId)
          if (res?.drId) {
            msgData.page_path = appendQueryParam(msgData.page_path, 'drid', res.drId)
          }
        }
      }
      return { _data: msgData as TypeSendMsg_Applet }
    }
    case 'videoChannel': {
      const msgData: TypeSendMsg_VideoChannel = {
        ..._data,
        user_id: data.wxRoomId,
        cover_url: _msgContent.coverUrl || '',
        thumb_url: _msgContent.thumbUrl || '',
        avatar_url: _msgContent.headImgUrl || '',
        nick_name: _msgContent.nickName || '',
        desc: _msgContent.description || '',
        video_url: _msgContent.url || '',
        extras: _msgContent.extras || ''
      }
      return { _data: msgData as TypeSendMsg_VideoChannel }
    }
    default:
      return { _data }
  }
}

/**
 * Reports the outcome of a send attempt to the server.
 * Never rejects because of a failing server callback; such failures are logged.
 *
 * @param data The queue item that was processed.
 * @param sendStatus 0 = sent successfully, 1 = sending failed.
 * @param logMsg Failure reason, only used for logging.
 */
async function sendMsgCb(data: TypeMsgQueueItem, sendStatus = 0, logMsg = ''): Promise<void> {
  if (sendStatus === 1) {
    console.log('消息发送失败', logMsg, JSON.stringify(data))
  }
  const _taskInfo = data.taskInfo
  if (!_taskInfo) {
    console.log('_taskInfo为空', JSON.stringify(data))
    return
  }
  const selfwxid: string = commonData.get('selfwxid')
  try {
    if (_taskInfo.sceneType === 'reply') {
      // Reply message callback
      const ids = (_taskInfo.ids || '').split(',')
      if (sendStatus === 1) {
        await serverRequest.sendMsgFailCb({ appAnswerWxId: selfwxid, ids })
      } else {
        await serverRequest.sendMsgCb({ appAnswerWxId: selfwxid, ids })
      }
    } else if (_taskInfo.sceneType === 'timing') {
      // Scheduled message callback
      const cbData = {
        receiverId: data.wxRoomId,
        msgType: data.msgType,
        msgContent: data.msgContent,
        taskId: _taskInfo.taskId,
        taskTheme: _taskInfo.taskTheme,
        sendType: _taskInfo.sendType, // frequency
        receiverName: _taskInfo.receiverName,
        sendStatus: sendStatus || 0
      }
      await serverRequest.cbTimingSendMsg(cbData)
    }
  } catch (e) {
    // Callers do not await this function, so a failure here must not escape as an
    // unhandled rejection
    console.log('[Queue] sendMsgCb 回调失败', e)
  }
}

export default {
  sendMsgToWx,
  sendMsgCb,
  sendAnswerList
}

// =====================
// Business key construction (used for queue-level de-duplication)
// =====================

// Trims the input, treating a falsy value as ''
function normalizeStr(s: string): string {
  return (s || '').trim()
}

// Unsigned 32-bit multiply-by-31 / xor hash over the UTF-16 code units, as a hex string
function simpleHash(str: string): string {
  let h = 0 >>> 0
  for (let i = 0; i < str.length; i++) {
    h = ((h * 31) ^ str.charCodeAt(i)) >>> 0
  }
  return h.toString(16)
}

/**
 * Builds the de-duplication key of a queue item from its room id, message type, scene,
 * task id (falling back to the reply ids, then to the literal 'taskId'), a hash of the
 * trimmed message content, and the @-mention ids.
 */
function buildBizKey(data: TypeMsgQueueItem): string {
  const scene = data?.taskInfo?.sceneType || ''
  const taskId = data?.taskInfo?.taskId || data?.taskInfo?.ids || 'taskId'
  const atIds = data?.atIds || ''
  const contentHash = simpleHash(normalizeStr(String(data?.msgContent || '')))
  const bizKey = [data.wxRoomId || '', data.msgType || '', scene, taskId, contentHash, atIds].join(
    ':'
  )
  console.log('bizKey', bizKey)
  return bizKey
}