Skip to main content

废弃平台

QQ小程序

开发

  • 小程序在用户同意后可以获取用户昵称及用户头像
  • 但昵称不唯一,需要在 connect.qq.com 申请获取用户 unionid,作为唯一凭证
  • 小程序用户验证参考文档
  • 小程序页面存在 10 个页面的上限,跳转需要使用 navigateBack 代替 To
物资租借小程序后端,登录、查询、添加与对应的数据库操作
# 物资租借小程序接收消息
import requests as re
from fastapi import APIRouter, Request, UploadFile
from config import config
from log import loggers
from logic import (
authenticate_user,
query_material,
delete_materials,
update_materials,
add_materials,
)


router = APIRouter()
adapter_logger = loggers["adapter"]


@router.get("/login")
async def material_login(nickname: str, code: str):
"""登录"""
if nickname in config["admin_nicknames"]:
# 只处理管理员列表中的昵称
response = re.get(
"https://api.q.qq.com/sns/jscode2session",
params={
"appid": config["QQAPP_ID"],
"secret": config["QQAPP_SECRET"],
"js_code": code, # 用 encryptedData 作为 js_code 发送
"grant_type": "authorization_code",
},
)
# 发送验证消息
if response.status_code == 200:
data = response.json()
unionid = data.get("unionid")
# print(unionid) # 用于添加管理员,先把user_name添加进列表,然后查看打印出的unionid
if unionid in config["admin_uid"]:
adapter_logger.info(
f"⌈QQAPP⌋ 管理员{nickname}成功登录", extra={"event": "程序登录"}
)
return {"success": True, "message": "0"} # 管理员登录成功

result = await authenticate_user(nickname, "1")
if result == 1:
adapter_logger.info(
f"⌈QQAPP⌋ 用户{nickname}成功登录", extra={"event": "程序登录"}
)
return {"success": True, "message": "1"} # 普通用户登录成功
else:
adapter_logger.error(
f"⌈QQAPP⌋ 用户{nickname}登录失败", extra={"event": "程序登录"}
)
return {"success": True, "message": "2"} # 登录失败


@router.get("/query")
async def material_query(
nickname: str,
code: str,
selectedType: str,
):
# 用户访问资源
auth_code = await authenticate_user(nickname, code)
if auth_code == 2:
adapter_logger.error(
f"⌈QQAPP⌋ 陌生用户{nickname}访问资源列表", extra={"event": "资源访问"}
)
return {"success": True, "message": "3"}

if selectedType not in ["boardgames", "scriptmurders", "publications"]:
adapter_logger.error(
f"⌈QQAPP⌋ 用户{nickname}访问无效的物资类型", extra={"event": "资源访问"}
)

return {"success": True, "message": "4"}

response_data = await query_material(selectedType)
adapter_logger.info(
f"⌈QQAPP⌋ 用户{nickname}获取物资列表成功", extra={"event": "资源访问"}
)
return {"success": True, "message": "5", "data": response_data}


@router.get("/delete")
async def material_delete(nickname: str, code: str, selectedType: str, originalID: str):
# 管理员删除资源
auth_code = await authenticate_user(nickname, code)
if auth_code != 0:
adapter_logger.error(
f"⌈QQAPP⌋ 陌生用户{nickname}试图删除资源", extra={"event": "资源删除"}
)
return {"success": True, "message": "6"}

result = await delete_materials(selectedType, originalID)
if result == 1:
adapter_logger.info(
f"⌈QQAPP⌋ 管理{nickname}删除了ID为{originalID}的记录",
extra={"event": "资源删除"},
)
return {"success": True, "message": "9"}
else:
adapter_logger.error(
f"⌈QQAPP⌋ 管理{nickname}试图删除不存在的资源", extra={"event": "资源删除"}
)
return {"success": True, "message": "7"}


@router.post("/update")
async def material_update(
nickname: str,
code: str,
selectedType: str,
originalID: str,
image: UploadFile,
request: Request,
):
# 管理员更新资源
auth_code = await authenticate_user(nickname, code)
if auth_code != 0:
adapter_logger.error(
f"⌈QQAPP⌋ 陌生用{nickname}试图修改资源", extra={"event": "资源修改"}
)
return {"success": True, "message": "10"}

if selectedType not in ["boardgames", "scriptmurders", "publications"]:
adapter_logger.error(
f"⌈QQAPP⌋ 管理员{nickname}修改无效的物资类型", extra={"event": "资源修改"}
)
return {"success": True, "message": "11"}

result, original_data, filtered_new_values = await update_materials(
selectedType, originalID, request, image
)
if result == 0:
adapter_logger.error(
f"⌈QQAPP⌋ 管理员{nickname}试图修改不存在的资源", extra={"event": "资源修改"}
)
return {"success": True, "message": "12"}
elif result != -1:
adapter_logger.error(
f"管理员{nickname}试图把{originalID}修改成已经存在的id{result}",
extra={"event": "资源修改"},
)
return {"success": True, "message": "13"}
else:
adapter_logger.info(
f"管理员{nickname}成功更新数据库{selectedType}:\n{original_data}\n{', '.join(filtered_new_values)}",
extra={"event": "资源修改"},
)
return {"success": True, "message": "14"}


@router.post("/add")
async def material_add(nickname: str, code: str, selectedType: str, request: Request):
# 管理员添加资源
auth_code = await authenticate_user(nickname, code)
if auth_code != 0:
adapter_logger.error(
f"陌生用户{nickname}试图添加资源",
extra={"event": "资源添加"},
)
return {"success": True, "message": "15"}

if selectedType not in ["boardgames", "scriptmurders", "publications"]:
adapter_logger.error(
f"管理员{nickname} 添加无效的物资类型",
extra={"event": "资源添加"},
)
return {"success": True, "message": "16"}

filtered_new_values = await add_materials(selectedType, request)

adapter_logger.info(
f"管理员{nickname}成功添加物资到数据库{selectedType}:\n{filtered_new_values}",
extra={"event": "资源添加"},
)

return {"success": True, "message": "17"}


from config import config, update_database, query_database

async def authenticate_user(nickname, code):
# 管理员身份确认
if nickname in config["admin_nicknames"] and code == "0":
return 0
elif nickname in ["One-8587", "Two-8587", "New Wave"]:
# 测试员账号
return 1
# 从数据库中查找用户
user_found = await query_database(
"SELECT * FROM user_all WHERE qq_name = :nickname", {"nickname": nickname}
)
if user_found and code == "1":
return 1
return 2

async def add_users(user_list, identity):
# 更新 users_qq_nickname 表,如果 qq_number 存在则更新 nickname及identity
for user in user_list:
qq_number = user[0] # user_id
nickname = user[1] # nickname
# 查询当前用户的 identity
identity_query = "SELECT identity FROM user_all WHERE qq_number = ?"
current_identity = await query_database(identity_query, (qq_number,))
if current_identity: # 如果存在
current_identity = current_identity[0][0]
# 更新 nickname
update_nickname_query = (
"UPDATE user_all SET nickname = ? WHERE qq_number = ?"
)
await update_database(update_nickname_query, (nickname, qq_number))
# 如果 identity 从 1 改为 2,则更新 identity
if current_identity == 1 and identity == 2:
update_identity_query = (
"UPDATE user_all SET identity = ? WHERE qq_number = ?"
)
await update_database(update_identity_query, (2, qq_number))
else: # 如果不存在,插入新记录
insert_query = """
INSERT INTO user_all (qq_number, nickname, identity)
VALUES (?, ?, ?)
"""
await update_database(insert_query, (qq_number, nickname, identity))
async def users_all():
# 删除表
await update_database("DROP TABLE IF EXISTS users_all")
# 创建表
create_table_query = """
CREATE TABLE IF NOT EXISTS users_all (
qq_number TEXT PRIMARY KEY,
qq_name TEXT,
identity TEXT,
name TEXT,
nickname TEXT,
gender TEXT,
grade TEXT,
major TEXT,
student_id TEXT,
phone TEXT,
political_status TEXT,
hometown TEXT,
card_number TEXT,
id TEXT
);
"""
await execute_update(create_table_query)
# 获取 users_qq_nickname 表中的数据
users_qq_nickname_data = await execute_query(
"SELECT qq_number, nickname, identity FROM users_qq_nickname"
)
# 插入合并数据
for qq_number, qq_name, identity in users_qq_nickname_data:
# 查询对应的 users_info 数据
users_info_query = """
SELECT 姓名, 代号, 性别, 年级, 专业, 学号, 电话, 政治面貌, 籍贯, 卡号, id
FROM users_info
WHERE qq = ?
"""
users_info_data = await execute_query(users_info_query, (qq_number,))
# 默认将各字段设为 None,表示缺失
name = nickname = gender = grade = major = student_id = phone = (
political_status
) = hometown = card_number = user_id = None
# 如果找到对应的 users_info 数据,则更新各字段
if users_info_data:
(
name,
nickname,
gender,
grade,
major,
student_id,
phone,
political_status,
hometown,
card_number,
user_id,
) = users_info_data[0]
# 插入到 users_all 表
insert_query = """
INSERT INTO users_all (
qq_number, qq_name, identity, name, nickname, gender, grade,
major, student_id, phone, political_status, hometown, card_number, id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
await execute_update(
insert_query,
(
qq_number,
qq_name,
identity,
name,
nickname,
gender,
grade,
major,
student_id,
phone,
political_status,
hometown,
card_number,
user_id,
),
)

botpy

  • 虽然说 websocket 放弃支持了,但这个包还是有人用
  • 参考文档
botpy 逻辑,包含两个包内替换函数,主逻辑与消息发送的替换逻辑
"""botpy 旧逻辑,使用需安装 botpy"""
# 把这两个函数在botpy/api.py的最下面,修改后可上传本地文件
async def post_group_file(
self,
group_openid: str,
file_type: int,
url: str,
srv_send_msg: bool = False,
) -> message.Media:
if os.path.isfile(url): # 检查是否为本地文件路径
# 读取本地文件并编码为 base64
with open(url, "rb") as f:
file_data = base64.b64encode(f.read()).decode("utf-8")
else: # 假设传入的是 URL
response = requests.get(url)
if response.status_code == 200:
file_data = base64.b64encode(response.content).decode("utf-8")
else:
print(f"下载文件失败,状态码: {response.status_code}")
payload = {
"group_openid": group_openid,
"file_type": file_type,
"file_data": file_data, # 使用 base64 编码的文件数据
"srv_send_msg": srv_send_msg,
}
route = Route("POST", "/v2/groups/{group_openid}/files", group_openid=group_openid)
return await self._http.request(route, json=payload)


async def post_c2c_file(
self,
openid: str,
file_type: int,
url: str,
srv_send_msg: bool = False,
) -> message.Media:
if os.path.isfile(url): # 检查是否为本地文件路径
# 读取本地文件并编码为 base64
with open(url, "rb") as f:
file_data = base64.b64encode(f.read()).decode("utf-8")
else: # 假设传入的是 URL
response = requests.get(url)
if response.status_code == 200:
file_data = base64.b64encode(response.content).decode("utf-8")
else:
print(f"下载文件失败,状态码: {response.status_code}")
payload = {
"openid": openid,
"file_type": file_type,
"file_data": file_data, # 使用 base64 编码的文件数据
"srv_send_msg": srv_send_msg,
}
route = Route("POST", "/v2/users/{openid}/files", openid=openid)
return await self._http.request(route, json=payload)


# 下面的为主要代码,可参考 botpy 的 pythonsdk 中的包引入
import re
import botpy
from botpy.message import C2CMessage, GroupMessage
from log import loggers
from config import config
from message.handler.msg import Msg

adapter_logger = loggers["adapter"]
global_api = {} # 用于存储 message.api


def set_global_api(api):
# 设置botpy的全局api
global global_api
if "api" not in global_api: # 只在第一次赋值
global_api["api"] = api


class MyClient(botpy.Client):
# 官方QQ机器人客户端类
async def on_ready(self):
adapter_logger.debug(f"⌈LR232⌋ 启动成功", extra={"event": "运行日志"})

@staticmethod
async def on_c2c_message_create(message: C2CMessage):
adapter_logger.debug(f"⌈LR232⌋ 接收:{message}", extra={"event": "消息接收"})
set_global_api(message._api)
message.content = await msg_content_join(message.content)
files = {}
if message.attachments:
for attachment in message.attachments:
file_name = attachment.get("filename")
file_url = attachment.get("url")
if file_name and file_url:
files.append((file_name, file_url))
Msg(
robot="LR232",
kind="私聊文件消息" if message.content else "私聊图文消息",
event="处理",
source=message.author.user_openid,
seq=message.id,
content=message.content,
files=files,
)
else:
Msg(
robot="LR232",
kind="私聊文字消息",
event="处理",
source=message.author.user_openid,
seq=message.id,
content=message.content,
files=files,
)

@staticmethod
async def on_group_at_message_create(message: GroupMessage):
adapter_logger.debug(f"⌈LR232⌋ 接收:{message}", extra={"event": "消息接收"})
set_global_api(message._api)
message.content = await msg_content_join(message.content)
files = {}
if message.attachments:
for attachment in message.attachments:
file_name = attachment.get("filename")
file_url = attachment.get("url")
if file_name and file_url:
files.append((file_name, file_url))
Msg(
robot="LR232",
kind="私聊文件消息" if message.content else "私聊图文消息",
event="处理",
source=message.author.member_openid,
seq=message.id,
content=message.content,
files=files,
group=message.group_openid,
)
else:
Msg(
robot="LR232",
kind="私聊文字消息",
event="处理",
source=message.author.member_openid,
seq=message.id,
content=message.content,
files=files,
group=message.group_openid,
)


async def msg_content_join(content):
# 转换内容中的表情包
content = content.strip()
pattern = r"<faceType=(\d+),faceId=\"(\d*)\".*?>"

def replace_face(match):
face_type = int(match.group(1)) # 获取 faceType

if face_type == 1:
face_id = int(match.group(2)) # 获取 faceId
emoji_name = config["emojis"].get(face_id, "未知表情")
return f"[{emoji_name}]"
elif face_type == 4:
return "[动画表情]"
else:
return match.group(0) # 保留原内容,适用于其他情况

# 替换所有匹配项
return re.sub(pattern, replace_face, content)


async def LR232_start():
intents = botpy.Intents(public_messages=True)
client = MyClient(intents=intents)
await client.start(appid=config["appid"], secret=config["secret"])


# 以下是需要并入 msg_send 的 botpy 发送逻辑
async def msg_send(msg: Msg):
if msg.robot == "LR232":
if msg.group:
# 发送群消息
message_instance = GroupMessage(global_api.get("api"), "", {})
uploadMedia = ""
if msg.files:
# 调用上传函数,传入 base64 编码的文件内容
uploadMedia = await message_instance.api.post_group_file(
group_openid=msg.group,
file_type=1, # 图片类型
url=msg.files[0][1], # 传入 base64 编码后的文件内容
)
await message_instance.api.post_group_message(
group_openid=msg.group,
msg_type=7 if uploadMedia else 0,
content=msg.content,
media=uploadMedia if uploadMedia else None,
msg_id=msg.seq,
)
# 记录消息
if msg.files:
msg.content = f"{msg.content}|{msg.files[0][1]}"
adapter_logger.debug(
f"⌈LR232⌋ 发送:文件{msg.content}", extra={"event": "消息发送"}
)
else:
# 发送好友消息
message_instance = C2CMessage(global_api.get("api"), "", {})
uploadMedia = ""
if msg.files:
# 调用上传函数,传入 base64 编码的文件内容
uploadMedia = await message_instance.api.post_c2c_file(
openid=msg.source,
file_type=4, # 图片类型
url=msg.files[0][1], # 传入 base64 编码后的文件内容
)
await message_instance.api.post_c2c_message(
openid=msg.group,
msg_type=7 if uploadMedia else 0,
content=msg.content,
media=uploadMedia if uploadMedia else None,
msg_id=msg.seq,
)
if msg.files:
msg.content = f"{msg.content}|{msg.files[0][0]}"
adapter_logger.debug(
f"⌈LR232⌋ 发送:文件{msg.content}", extra={"event": "消息发送"}
)

微博

  • 微博的回调配置使用 Oauth3,回调验证在微博服务器上多增加了一步,但整体还与 QQ、微信差不多
  • 回调验证、apikey 的获取、api 的调用均测试成功,但由于迟迟不通过申请,无法接收消息
微博的逻辑,包含回调验证、消息接收、发送
"""微博粉丝服务平台消息接收/发送"""
import json
import time
import hashlib
import requests
from fastapi import APIRouter, Response, Request
from config import config,loggers

# 以下为接收
router = APIRouter()
adapter_logger = loggers["adapter"]


@router.get("/")
def set_callback(signature: str, timestamp: str, nonce: str, echostr: str):
"""回调地址验证"""
try:
token = config["WEIBO_SECRET"]
list = [token, timestamp, nonce]
list.sort()
sha1 = hashlib.sha1() # 计算SHA1哈希值
for item in list:
sha1.update(item.encode("utf-8"))
hashcode = sha1.hexdigest()

if hashcode == signature: # 比对signature与计算出的hashcode
adapter_logger.debug(
f"⌈WEIBO⌋ 消息回调配置成功", extra={"event": "回调配置"}
)
return Response(content=echostr, media_type="text/plain")
else:
raise Exception(
f"回调配置错误 | 数据不完整: signature-{signature} timestamp-{timestamp} nonce-{nonce} echostr-{echostr}"
)

except Exception as e:
raise Exception(f"回调配置错误 | 错误: {e}")


@router.post("/")
async def handle_post(request: Request):
print(request.body())

# 以下为发送
# 请求的基础信息
APP_KEY = "" # 替换为你的 APP ID
APP_SECRET = "" # 替换为你的 Token



def get_token():
# 请求头
headers = {
"client_id": APP_KEY,
"client_secret": APP_SECRET,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": uri, # 可以使用随机生成的 UUID
}

# 发送 POST 请求
response = requests.post("https://api.weibo.com/oauth2/access_token", data=headers)

# 打印结果
print(f"Status Code: {response.status_code}")
print(f"Response Body: {response.json()}")


def get_weibo():
# 请求头
headers = {"access_token": access_token, "screen_name": "whu推协"}

# 发送 POST 请求
response = session.get(
"https://api.weibo.com/2/friendships/friends.json", params=headers
)

# 打印结果
print(f"Status Code: {response.status_code}")
print(f"Response Body: {response.json()}")


def get_mentions():
# 请求头
headers = {"access_token": access_token}

# 发送 POST 请求
response = session.get(
"https://api.weibo.com/2/statuses/mentions.json", params=headers
)

# 打印结果
print(f"Status Code: {response.status_code}")
print(f"Response Body: {response.json()}")


get_mentions()

微信开放平台

  • 微信开放平台可以设置一些自动回复逻辑、被动回复消息等,其他的 API 由于要个人认证故没有测试
微信开放平台逻辑
"""微信开放平台测试代码"""
import hashlib
import time
import requests
import json

# 请求的基础信息
APP_ID = "" # 替换为你的 APP ID
TOKEN = "" # 替换为你的 Token
ACCOUNT = "" # 替换为你的账户
key = ""
access_token = ""
task_id1 = ""


def generate_sign(token, timestamp, nonce, body):
"""
根据文档生成签名 sign = md5(Token + str(unix_timestamp) + nonce + md5(body))
"""
body_md5 = hashlib.md5(body.encode("utf-8")).hexdigest()
sign_str = f"{token}{timestamp}{nonce}{body_md5}"
sign = hashlib.md5(sign_str.encode("utf-8")).hexdigest()
return sign


def send_request():
# 时间戳和随机字符串
timestamp = int(time.time())
nonce = "abc" # 这里可以使用更复杂的随机字符串
url = "https://openaiapi.weixin.qq.com/v2/token"

# 请求体
body = json.dumps({"account": ACCOUNT})

# 生成签名
sign = generate_sign(TOKEN, timestamp, nonce, body)

# 请求头
headers = {
"X-APPID": APP_ID,
"request_id": "", # 可以使用随机生成的 UUID
"timestamp": str(timestamp),
"nonce": nonce,
"sign": sign,
"content-type": "application/json",
}

# 发送 POST 请求
response = requests.post(url, headers=headers, data=body)

# 打印结果
print(f"Status Code: {response.status_code}")
print(f"Response Body: {response.json()}")


def import_simple_qna():
# 时间戳和随机字符串
timestamp = int(time.time())
nonce = "abc" # 这里可以用随机生成的字符串
url = "https://openaiapi.weixin.qq.com/v2/bot/import/json"

# 请求体
body = json.dumps(
{
"mode": 0,
"data": [
{
"skill": "AAA",
"intent": "BBC",
"threshold": "0.9",
"disable": False,
"questions": ["q", "q2"],
"answers": ["a"],
}
],
}
)

# 生成签名
sign = generate_sign(TOKEN, timestamp, nonce, body)

# 请求头
headers = {
"content-type": "application/json",
"X-OPENAI-TOKEN": access_token,
"request_id": ACCOUNT,
"timestamp": str(timestamp),
"nonce": nonce,
"sign": sign,
}

# 发送 POST 请求
response = requests.post(url, headers=headers, data=body)

# 打印响应结果
if response.status_code == 200:
print("导入成功!返回数据:")
print(response.json())
else:
print(f"导入失败,状态码:{response.status_code}")
print(response.text)


def query_qna(task_id):
timestamp = int(time.time())
nonce = "abc" # 这里可以用随机生成的字符串
url = "https://openaiapi.weixin.qq.com/v2/async/fetch"

# 请求体
body = json.dumps({"task_id": task_id})

# 生成签名
sign = generate_sign(TOKEN, timestamp, nonce, body)


if __name__ == "__main__":
send_request()
# import_simple_qna()

LLOneBot

  • LLOneBot 的使用方法与 napcat 基本相同(后端)
  • 这里是一个基于 ws 连接的逻辑,由于 ws 连接的特性,发送消息后的返回值会被 on_message 捕获,导致接收不到结果,需要用 echo 来区分字段
  • 用 http 则没有此问题
LLOneBot 原代码逻辑,包含消息解析(消息段和 CQ 码均有)与一些简单功能(备忘录,活动添加,每日龙王获取等)
import re
import os
import json
import asyncio
import traceback
import websockets
from datetime import datetime
from zoneinfo import ZoneInfo
from lrobot.config import config
from lrobot.log import robot_log
from lrobot.database import add_users, activities_edit
from lrobot.event.point import get_speakers, task_queue
from lrobot.msg import Msg, msg_log, msg_add, global_ws, set_global_ws


async def LR5921_start():
websocket_url = "ws://localhost:5921" # LLOneBot连接端口
try:
async with websockets.connect(websocket_url) as ws:
set_global_ws(ws) # 全局共享ws连接
await on_open()
await on_message() # WebSocket 消息接收和处理
except Exception:
error_message = traceback.format_exc()
log_event(
"LRobot", "系统未知错误", f"WebSocket connection error: {error_message}"
)


async def on_open():
log_event("LR5921", "机器启动", "启动成功")
asyncio.create_task(get_speakers()) # 每日发言逻辑


async def on_message():
try:
async for message in global_ws.get("ws"):
data = json.loads(message)
if "status" in data and data["status"] == "ok":
await echo_deal(data)
elif "post_type" in data: # 收到 WS 事件
if data.get("post_type") == "message": # 消息事件
await message_deal(data)
elif data.get("post_type") == "notice": # 通知事件
log_event("LR5921", "通知事件调试", data)
is_group_recall = (
data.get("notice_type") == "group_recall"
and str(data.get("group_id")) == config["测试群"]
) # 群聊撤回
is_friend_recall = (
data.get("notice_type") == "friend_recall"
) # 私聊撤回
is_poked = data.get("sub_type") == "poke" and str(
data.get("target_id")
) in [
config["LR5921"],
config["LR232"],
] # 戳戳
is_liked = (
data.get("notice_type") == "group_msg_emoji_like"
) # 群消息被表态
if is_group_recall or is_friend_recall:
await revoke(data)
elif is_poked:
await get_poke(data)
elif is_liked:
await get_like(data)
# 不处理请求事件和元事件
else:
# WS返回failed
log_event("LR5921", "WS回应调试", data)
except Exception:
error_message = traceback.format_exc()
log_event(
"LRobot", "系统未知错误", f"WebSocket massage_deal error: {error_message}"
)


async def echo_deal(data):
# 处理 WS 回应的消息,echo为附属信息字段,收发一致
log_event("LR5921", "WS回应调试", data)
echo = data.get("echo", "")
data = data.get("data")
if echo.startswith("msg_send"):
await echo_msg_send(echo, data)
elif echo.startswith("msg_get"):
await echo_msg_get(echo, data)
elif echo.startswith("get_like"):
await echo_get_like(echo, data)
elif echo.startswith("revoke"):
await echo_revoke(data)
elif echo.startswith("flush_speak"):
await echo_flush_speak(data)
elif echo.startswith("get_speak"):
await echo_get_speak(data)
elif echo.startswith("get_talkative"):
await echo_get_talkative(data)
elif echo.startswith("get_users"):
await echo_get_users(data)
elif echo.startswith("get_file"):
await echo_get_file(echo, data)
elif echo.startswith("add_activities"):
await echo_add_activities(echo, data)
elif echo.startswith("add_activity_group"):
await echo_add_activity_group(echo, data)
else:
log_event("LRobot", "系统未知错误", f"未知的WS回应{echo}|{data}")


async def message_deal(data):
# 消息处理
msg_content = data.get("message") # 原始消息段格式(数组)
log_event("LR5921", "消息接收调试", msg_content)
content, info, name, url, at = await segment_join(msg_content)
if not content and not name: # 不处理空格消息
return

if data.get("message_type") == "private": # 私聊消息
if info:
kind = 13 # 好友回复
else:
if content:
if name:
kind = 11
else:
kind = 10
else:
kind = 12
else: # 群聊消息
if info:
if at == 1:
kind = 33 # 配对消息
elif at == 2:
kind = 23 # 群聊回复
else:
content = "[回复消息]" + content
kind = 34
else:
if at == 1:
kind = 33 # 配对消息
elif at == 2:
if content:
if name:
kind = 11
else:
kind = 10
else:
kind = 12
else:
kind = 34

# # 内阁管家特殊处理,不用@快捷使用指令
# keywords = ["记:", "等:", "催:", "示:", "删:"]
# if kind == 12 and str(data.get('group_id')) == config["内阁"] and any(keyword in content for keyword in keywords):
# kind = 2

msg = Msg(
robot="LR5921",
content=content,
kind=kind,
info=info,
file_name=name,
file_url=url,
group=None if data.get("message_type") == "private" else data.get("group_id"),
qq=data.get("user_id"),
seq=data.get("message_id"),
)
await msg_add(msg)


async def segment_join(content):
content_parts = []
# 支持识别表情混合文字、图片混合文字、语音、视频、文件、掷骰子、猜拳、合并转发
# 目前推荐好友、群聊、位置分享、链接分享、音乐分享都是json格式,视频是文件格式
info_updated = False # 只识别第一个文件
info = ""
name = ""
url = ""
at = 0
for item in content:
msg_type = item.get("type")
msg_data = item.get("data", {})
if msg_type == "text":
content_parts.append(msg_data.get("text", ""))
elif msg_type == "face":
face_id = msg_data.get("id")
face_id = config["emojis"].get(int(face_id), "未知表情")
content_parts.append(f"[{face_id}]")
elif msg_type == "mface":
face_id = msg_data.get("summary")
content_parts.append(face_id)
elif msg_type == "rps":
rps_id = msg_data.get("result")
rps_mapping = {"1": "布", "2": "剪刀", "3": "石头"}
rps_result = rps_mapping.get(
rps_id, "未知结果"
) # 默认值为'未知结果',如果rps_id不在映射中
content_parts.append(f"[猜拳:{rps_result}]")
elif msg_type == "dice":
dice_id = msg_data.get("result")
content_parts.append(f"[掷骰子{dice_id}点]")
elif msg_type == "forward":
content_parts.append(f"[合并转发消息]")
elif msg_type == "node":
content_parts.append(f"[合并转发节点]")
elif msg_type == "image" and not info_updated:
name = msg_data.get("file")
url = msg_data.get("url")
elif msg_type == "record" and not info_updated:
name = msg_data.get("file")
url = msg_data.get("url")
elif msg_type == "file" and not info_updated:
name = msg_data.get("file")
# 此处是未下载的msg_id
url = msg_data.get("file_id")
elif msg_type == "reply":
# 回复的消息
info = msg_data.get("id")
elif msg_type == "at":
qq = msg_data.get("qq")
if qq == config["LR232"]:
at = 1
elif qq == config["LR5921"] and at != 1:
at = 2
else:
user = msg_data.get("name")
content_parts.append(f"[@{user}]")
else:
log_event("LRobot", "系统未知错误", f"收到无法解析的消息:{item}")
content_join = "".join(content_parts)
return content_join, info, name, url, at


async def array_join(content):
# 获取发送的消息、撤回的消息、回应的消息(get_file的数组格式)
# 使用re.split将内容按[CQ:...]格式切分
items = re.split(r"(\[CQ:.*?\])", content)
content_parts = []

for item in items:
if not item: # 跳过空字符串
continue
if item.startswith("[CQ:"):
if "CQ:text" in item:
match = re.search(r"text=([^]]+)]", content)
content_parts.append(match.group(1))
elif "CQ:face" in item:
match = re.search(r"id=([^]]+)]", content)
face_id = config["emojis"].get(int(match.group(1)), "未知表情")
content_parts.append(f"[{face_id}]")
elif "CQ:mface" in item:
match = re.search(r"summary=([^,]+),", content)
content_parts.append(match.group(1))
elif "CQ:rps" in item:
match = re.search(r"result=([^]]+)]", content)
rps_mapping = {"1": "布", "2": "剪刀", "3": "石头"}
rps_result = rps_mapping.get(match.group(1), "未知结果")
content_parts.append(f"[猜拳:{rps_result}]")
elif "CQ:mface" in item:
match = re.search(r"result=([^]]+)]", content)
content_parts.append(f"[掷骰子{match.group(1)}点]")
elif "CQ:forward" in item:
content_parts.append(f"[合并转发消息]")
elif "CQ:node" in item:
content_parts.append(f"[合并转发节点]")
elif "CQ:image" in item:
match = re.search(r"file=([^,]+),", content)
content_parts.append(f"[图片{match.group(1)}]")
elif "CQ:record" in item:
match = re.search(r"file=([^,]+),", content)
content_parts.append(f"[音频{match.group(1)}]")
elif "CQ:file" in item:
match = re.search(r"file=([^,]+),", content)
content_parts.append(f"[文件{match.group(1)}]")
elif "CQ:reply" in item:
content_parts.append(f"[回复消息]")
elif "CQ:at" in item:
match = re.search(r"qq=([^]]+)]", content)
content_parts.append(f"[@{match.group(1)}]")
else:
log_event("LRobot", "系统未知错误", f"收到无法解析的消息:{item}")
else:
content_parts.append(item)

content_join = "".join(content_parts)
return content_join


async def revoke(data):
# 获取撤回消息
info = {
"action": "get_msg",
"params": {
"message_id": data.get("message_id"),
},
"echo": "revoke",
}
await global_ws.get("ws").send(json.dumps(info))


async def get_poke(data):
# 被戳了
msg = Msg(
robot="LR5921" if str(data.get("target_id")) == config["LR5921"] else "LR232",
content="戳戳",
kind=24 if data.get("group_id") else 14,
group=data.get("group_id"),
qq=data.get("user_id"),
)
await msg_add(msg)


async def get_like(data):
# 被点赞了
user_id = data.get("user_id")
like_id = data.get("likes")[0].get("emoji_id")
info = {
"action": "get_msg",
"params": {
"message_id": data.get("message_id"),
},
"echo": "get_like" + "|" + str(like_id) + "|" + str(user_id),
}
await global_ws.get("ws").send(json.dumps(info))


async def echo_msg_send(echo, data):
# 发送消息的回应
parts = echo.split("|")
if len(parts) == 2:
user_id = parts[1]
info = {
"action": "get_msg",
"params": {
"message_id": data.get("message_id"),
},
"echo": f"msg_get|{user_id}",
}
await global_ws.get("ws").send(json.dumps(info))


async def echo_msg_get(echo, data):
# 获取发送的消息并记录
parts = echo.split("|")
if len(parts) == 2:
user_id = parts[1]
content = await array_join(data.get("raw_message"))
msg = Msg(
robot="LR5921",
content=content,
kind=31 if data.get("message_type") == "private" else 32,
group=data.get("group_id"),
qq=user_id,
)
await msg_log(msg)


async def echo_get_like(echo, data):
# 获取回应的原消息
parts = echo.split("|")
if len(parts) == 3: # 确保分割后的数组有 3 部分
like_id = parts[1] # 提取 like_id
user_id = parts[2] # 提取 user_id
content = await array_join(data.get("raw_message"))
msg = Msg(
robot="LR5921",
content=content,
kind=35,
info=config["emojis"].get(int(like_id), "未知表情"),
group=data.get("group_id"),
qq=user_id,
)
await msg_add(msg)


async def echo_revoke(data):
# 获取撤回的原消息
content = await array_join(data.get("raw_message"))
msg = Msg(
robot="LR5921",
content=content,
kind=19 if data.get("message_type") == "private" else 29,
group=data.get("group_id"),
qq=data.get("user_id"),
)
await msg_add(msg)


async def echo_flush_speak(data):
# 刷新群成员最后发言时间
asia_tz = ZoneInfo("Asia/Shanghai")
today_start = datetime.now(asia_tz).replace(
hour=0, minute=0, second=0, microsecond=0
)
today_start = int(today_start.timestamp())

for user in data:
last_sent_time = user["last_sent_time"]
if last_sent_time < today_start: # 只刷新未显示今日发言的
info = {
"action": "get_group_member_info_rate_limited",
"params": {
"group_id": config["水群"],
"user_id": user["user_id"],
"no_cache": True, # 强制刷新缓存
},
}
await task_queue.put(info)


async def echo_get_speak(data):
# 获取今日发言的人
asia_tz = ZoneInfo("Asia/Shanghai")
today_start = datetime.now(asia_tz).replace(
hour=0, minute=0, second=0, microsecond=0
) # 获取今天的开始时间戳(从零点开始)
today_start = int(today_start.timestamp())

today_speakers = []
for user in data:
last_sent_time = user["last_sent_time"]
if last_sent_time >= today_start:
today_speakers.append(
{"nickname": user["nickname"], "user_id": user["user_id"]}
)

content = "今日水群发言:" + ",".join(
speaker["nickname"] for speaker in today_speakers
)

# 发送至内阁
info = {
"action": "send_msg",
"params": {"group_id": config["内阁"], "message": content},
"echo": "msg_send",
}
await global_ws.get("ws").send(json.dumps(info))


async def echo_get_talkative(data):
user_id = data["current_talkative"]["user_id"]
nickname = data["current_talkative"]["nickname"]

# 发送至内阁
content = "今日水群龙王:" + nickname

info = {
"action": "send_msg",
"params": {"group_id": config["内阁"], "message": content},
"echo": "msg_send",
}
await global_ws.get("ws").send(json.dumps(info))


async def echo_get_users(data):
# 获取群成员
if data[0]["group_id"] == 920712228:
# 水群和平台群
i = 1
else:
# 社员群
i = 2
user_list = [(user["user_id"], user["nickname"]) for user in data]
await add_users(user_list, i)


async def echo_get_file(echo, data):
parts = echo.split("|")
if len(parts) == 2:
qq = parts[1]
url = data.get("file")
content = os.path.basename(url)
msg = Msg(
robot="LR5921",
content=content,
kind=5,
file_url=url,
qq=qq,
seq=data.get("message_id"),
)
await msg_add(msg)


async def echo_add_activities(echo, data):
# 添加活动的消息序号
parts = echo.split("|")
if len(parts) == 2:
task_id = parts[1]
msg_id = data.get("message_id")
await activities_edit(task_id, "msg_id", msg_id)


async def echo_add_activity_group(echo, data):
print(1)
# 添加活动的二维码图片序号
parts = echo.split("|")
if len(parts) == 2:
task_id = parts[1]
pic_id = data.get("message_id")
print(pic_id)
await activities_edit(task_id, "pic_id", pic_id)