数据
数据库设计
由于在知识库更新时同时 100 条数据写入超时,且日志数据较多每次前端读取慢,故使用 Mysql 记录数据,Mongodb 记录日志
- 采用 mongo:4.4(机器搭载不了最新的) 与 mysql:8.0 镜像
- Mongodb 和 Mysql 均挂载备份文件;Mysql 挂载初始化文件来创建表以维持系统基础运行
- config 中自动初始化 Mongodb 连接(供其他服务写入日志),Mysql 连接需要自行调用 mysql_init
数据备份
- logic/data/backup.py 中包含备份函数,将 mysql 和 mongodb 数据库中的数据备份至 storage/data/backup 中
- main 中设置定时任务,每天备份一次
- 也可以进入 lrobot 容器执行 backup.py 手动备份,详见此处
- 数据库恢复方法
docker exec -it mysql bashmysql -u root lrobot_data < /app/backup/mysql_2025-07-03.sql(记得改日期)docker exec -it mongodb bashmongorestore --drop --uri="mongodb://localhost:27017" /app/backup/mongo_2025-07-03(记得改日期)
数据库查询
- 引入 database_query,database_update 进行数据库查询
- 传入语句,参数
- 返回字典列表,行 id
Mysql
Mysql 改回 sqlite 需要把 Mysql 语句中的 %s 改成 ?
初始化
- 初始化使用 initdb 建立表
- 若已经有数据(storage/data/mysql)则 initdb 不生效,需要进入容器/使用 pycharm 命令行新建
数据结构
- 所有库都以 id 为主键,可设置唯一键
- 所有数据不能定义非空,否则数据库页面的"新建行"操作将失效
- mysql 允许唯一键为空值,新建行会把所有值设置为空值
提交处理
- 在 Mysql 连接时设置 autocommit=False 来控制数据库提交时的失败回滚
- 但在设置泡泡页面广播时出现异常: A 页面更新某泡泡位置提交到后端后,反复刷新页面,数据库查询的结果会在原位置和新位置反复跳动
- 使用 pycharm 连接 Mysql 数据库可以看到数据正常提交,且没有出现反复变化的情况
- 理由为:
- database_update 执行了显示提交 conn.commit(),更新操作是持久化的
- 而 database_query 中只是执行了查询,没有提交事务,因此这个连接的事务实际上没有结束(即使查询已完成)
- 当 database_query 的连接被释放回连接池,被另一个请求再次获取时,这个连接上可能存在一个未提交的事务(一个没有写操作的空事务)
- 所以这个连接再次被用于 GET 请求,执行新的查询时,由于 REPEATABLE READ 的隔离级别,它可能会继续使用之前建立的快照(旧数据),而看不到其他连接提交的更新
- 解决办法: 在 database_query 执行前加上一句 await conn.commit()
创建表
- 现在不需要记录创建表的语句,改为直接使用 mysql_sql 新建并导入数据,但保留了以前的 init_sql
- 忘记怎么创建表了来这里看看
init_sql
CREATE TABLE IF NOT EXISTS system_command (
id INT AUTO_INCREMENT PRIMARY KEY,
command VARCHAR(255),
user VARCHAR(255),
platform VARCHAR(255),
recv_content TEXT,
send_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS system_data(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) UNIQUE,
text TEXT
);
CREATE TABLE IF NOT EXISTS system_feedback(
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
questions JSON,
responses JSON,
period TIMESTAMP
)
CREATE TABLE IF NOT EXISTS system_ip(
id INT AUTO_INCREMENT PRIMARY KEY,
ip TEXT,
count INTEGER,
first_time INTEGER
);
CREATE TABLE IF NOT EXISTS system_joke(
id INT AUTO_INCREMENT PRIMARY KEY,
text TEXT
);
CREATE TABLE IF NOT EXISTS system_panel (
id INT AUTO_INCREMENT PRIMARY KEY,
name TEXT,
description TEXT,
url TEXT,
tasks TEXT
);
CREATE TABLE system_remind (
id INT AUTO_INCREMENT PRIMARY KEY,
time DATETIME,
content TEXT,
user VARCHAR(255)
);
CREATE TABLE IF NOT EXISTS system_timeline (
id INT AUTO_INCREMENT PRIMARY KEY,
node_id INTEGER,
date DATE,
event TEXT,
tag TEXT
);
CREATE TABLE IF NOT EXISTS system_wiki (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
group_name VARCHAR(255),
content Text,
sort INT
);
CREATE TABLE IF NOT EXISTS user_information (
id INT AUTO_INCREMENT PRIMARY KEY,
qq BIGINT UNIQUE,
codename VARCHAR(50),
name VARCHAR(50),
grade VARCHAR(50),
gender ENUM('男', '女') DEFAULT NULL,
major VARCHAR(100),
student_id VARCHAR(20),
phone VARCHAR(20),
political_status VARCHAR(50),
hometown VARCHAR(100),
card_number VARCHAR(20),
card_id VARCHAR(30)
);
CREATE TABLE user_material (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) UNIQUE,
num VARCHAR(256)
);
CREATE TABLE user_media (
id INT AUTO_INCREMENT PRIMARY KEY,
filepath VARCHAR(255) UNIQUE,
media_id VARCHAR(256),
wechat DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
media_json JSON,
qq DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
media_url JSON
);
CREATE TABLE user_nickname (
id INT AUTO_INCREMENT PRIMARY KEY,
user BIGINT UNIQUE,
nickname VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_status (
id INT AUTO_INCREMENT PRIMARY KEY,
user VARCHAR(255) UNIQUE,
status TEXT,
information TEXT
);
CREATE TABLE IF NOT EXISTS user_subscribe (
id INT AUTO_INCREMENT PRIMARY KEY,
user VARCHAR(255),
sub VARCHAR(64),
info VARCHAR(64),
UNIQUE KEY unique_user_sub (user, sub) -- 防止重复订阅
);
CREATE TABLE IF NOT EXISTS user_test (
id INT AUTO_INCREMENT PRIMARY KEY,
user BIGINT UNIQUE,
nickname VARCHAR(255),
name VARCHAR(255),
password VARCHAR(255)
);
INSERT INTO system_joke (text) VALUES
('华生:“你怎么知道我要喝三分糖?”
福尔摩斯:“你嘴角有蚂蚁排队,说明糖量不足致死;你手机屏保是‘抗糖宣言’,但手指在‘全糖’选项上有划痕……”
华生:“停!我只是今天想放纵!”
福尔摩斯:“……哦,那是我推理错了。”'),
('程序员单膝跪地:“你是我的全局唯一解!”
女友:“那你的前女友呢?”
程序员:“她们是局部最优解,但收敛不到你。”'),
('“先有鸡还是先有蛋?”
哲学家:“先有‘先’这个字。”'),
('数学家向朋友炫耀:「我能用归纳法证明所有人都是光头!」
朋友:「???」
数学家:「1个人是光头,假设k个人是光头,那么k+1个人也是光头,证毕。」
朋友:「你漏了归纳基础,第一个人根本不是光头!」
数学家摸头:「啊,我确实是光头。」'),
('警长:“这密室杀人案必须请顶级侦探!”
局长:“不用,把案发现场挂到Airbnb上,写上‘凶宅半价’,凶手会来自首的。”'),
('三个逻辑学家走进酒吧。
酒保问:「三位都要啤酒吗?」
第一个说:「我不知道。」
第二个说:「我也不知道。」
第三个说:「是的,都要。」'),
('女友:“你和前女友纠缠不清是吧?”
男友:“不!我和她的关系是量子态的——你不观测时不存在!”
女友:“那我观测一下你的手机?”
男友:“……系统坍缩了。”'),
('教授:“给猴子一台打字机,迟早打出《莎士比亚全集》!”
学生:“那它们打出‘您访问的网站存在风险’怎么办?”
教授:“……说明猴子当网警了。”'),
('妻子:「你说你爱我,到底有多爱?」
程序员:「我对你的爱,等于我对你的爱加上1。」
妻子:「死循环了是吧?」
(无限递归表白,卒)'),
('老婆:“买颗白菜,顺便带根葱。”
程序员老公:“最优路径是:先到葱摊,因为葱的保质期是白菜的0.3倍;但根据实时人流……”
老婆:“你买了三小时还没回来?!”
老公:“……我在模拟退火算法。”'),
('死者:“我在虚拟世界被杀了!这犯法吗?”
警察:“根据《刑法》第250条,凶手得先证明你是真人。”
死者:“……我微信余额还有3块5!”
警察:“立案了!”'),
('凶手:“我用了完美分尸手法,你绝对找不到证据!”
侦探:“确实,但你把肝扔进了厨余垃圾桶——”
凶手:“所以呢?!”
侦探:“小区垃圾分类奖金被扣了,保洁阿姨供出了你。”'),
('老婆:“买两斤土豆,要表面光滑摩擦系数小的。”
物理学家:“你是要做惯性实验还是炖牛肉?”
老婆:“……要好削皮的。”'),
('凶手:“案发时我在元宇宙挖矿!”
警察:“但你显卡温度记录显示当时在玩《原神》。”
凶手:“……我承认,是我干的。”'),
('客户:“我要一个绝对完美的密室杀人案!”
侦探:“好的,方案是:门没锁,但甲方坚持说它是密室。”');
Mongodb
- DatabaseHandler 将日志存入队列 log_queue
- 在主任务中调用 log_writer 即可开始写入日志
- 在 lrobot 的 __init__ 中创建索引(有数据则创建得慢,百万条十索引大概 8 分钟;新库基本上不要时间)
索引
建立索引
- 在 main 的 start 初始化时建立索引
- 格式化索引字典,将第一字段作为 key,第二字段作为 partial 的值,加上 name 参数
- 获取已存在索引并格式化成 name:key,partial 的格式
- 比较
- 若为 text 索引,特殊处理
- 从已有索引中提取 text,若存在,则不创建
- 检测是否有同名索引
- 若同名且其他两个参数相同,为相同索引,跳过此索引
- 若参数不同,删除原索引,继续
- 检测是否有同键索引(只需要比较 key)
- 若有,删除原索引
- 新建索引
- 若为 text 索引,特殊处理
索引相关知识
本人只知道这几条(
- 索引创建后在数据库中一直存在
- 索引是关于键唯一的,即使设置了不同的 partial,也会互相冲突,所以尽量只设置 key(除非你确定其他用不上索引)
- 关于 text 索引,mongo 会进行以下处理
- 所有字母转小写
- 除下划线外其他字符变成分隔符
- 忽略常见英文功能词 the,a,and 等等
- 提取词干,复数、进行时等统一(测过了,过去式可以,过去分词不行www)
- 存入逻辑倒排表里
- 以下为测试时发现的:
- 在 level+event+source 的查询情况下,mongo 会优先将这三者查询的结果统计出来,然后对其中一部分进行 '^定时任务'的搜索,而不是分别匹配索引
([("time", -1), ("message", 1)], {"source": "system", "event": "错误堆栈", "level": "DEBUG"})这个索引在 or 条件中会被认为太严格而不使用,转而使用{'source': 1, 'event': 1, 'level': 1, 'message': 1, 'time': -1}这条全局的索引- 在不同 '^xx' 的查询中,根据占比会使用不同的索引,由于 '消息处理' 在日志中占比很高,直接使用 message 进行筛选能直接筛选出其他条件完全符合的消息;而 '后端运行' 占比低,使用其他条件先筛选更快
- 在查询的 source 占比较多时(如 LR5921),mongo 会使用
{'source': 1, 'time': -1}来筛选而不会使用更快的{'message': 1}
调试指令
docker exec -it mongodb bash
mongo
use lrobot_log
db.system_log.getIndexes()
db.system_log.dropIndexes()
db.system_log.dropIndexes("idx_11")
show dbs
db.dropDatabase()
关于正则表达式检索中文提速三种方法的反驳
- 正方:chatgpt,反方:其他 AI 若干
- 辩论结果:其他 AI 均接收了反驳并支持正方
- 经测试,message:text 确实是一个词,其他两个也是对的
1) if keyword: query["$text"] = {"$search": keyword.strip()} + db.collection.createIndex({ "message": "text" })
语法上这是合法的($text + text 索引),对英文/西文语言及 MongoDB 提供的若干小语种能起效果。
但对中文:自建/Community 部署的 text 索引并不会做像 Elastic/Atlas Search 那样的中文分词(词语切分),因此对中文的检索能力是非常有限的(通常会把整段中文当作 token,从而无法实现常见的“包含某中文词”分词检索)。只有在某些 Enterprise 扩展(历史上通过 Rosette / RLP 的整合)或使用 Atlas Search(Lucene 分词器)时,才能获得真正的中文分词能力。
结论 / 建议:如果您是 Atlas 用户,请使用 Atlas Search(lucene.chinese);如果是 自建 Community,建议在写入时自己做中文分词(或构建 n-gram 字段)并在该分词字段上建索引。
2) db.collection.createIndex({ "message": "text" }, { "collation": { "locale": "zh", "strength": 1 } })
错误 / 不被支持:MongoDB 的 text 索引不支持 collation 选项(text 索引使用二进制比较)。如果集合使用了非 simple collation,要创建 text 索引必须显式使用 { collation: { locale: "simple" } }。官方文档明确说明 text 索引不支持 collation。
结论 / 建议:
不要给 text 索引设置中文 collation — 这样不会带来中文分词效果,且可能被拒绝或要求 simple。创建 text 索引时如果遇到集合 collation 问题,可按文档示例用 collation: { locale: "simple" }
3) db.logs.createIndex({ message: "wildcard" }, { wildcardProjection: { message: 1 } })
这是不合法 / 无效的写法。wildcardProjection 选项 仅 在您创建的索引键为 "$**"(即对全部字段建立通配符索引)时才有效;当您在索引键里指定具体字段路径(例如 message: "wildcard")时,不能使用 wildcardProjection。换句话说,要用 wildcardProjection 必须像下面这样写:
db.logs.createIndex(
{ "$**": 1 },
{ wildcardProjection: { message: 1 } }
)
并且:即便是合法的 wildcard 索引,它的目标是为“字段名不确定 / 动态字段”的场景做索引(减少为每个可能字段单独建索引的开销)。对单一稳定字段 message,直接用普通索引 { message: 1 } 更简单、更高效。并且 wildcard 索引并不会把任意位置的正则搜索(.*xxx.*)变成走索引;正则能否走索引仍然依赖正则是否是前缀表达式等条件。
数据库比较(废弃)
- 在使用 sqlite 数据库时对多种连接方式进行了比较
- 异步队列在调用数据库时会由于读写锁导致延迟
- 在直接读写(靠锁来自行分配)、分五个库(同步多操作)、建立连接池(减少建立连接时间)、使用批量提交(一次性提交)中,批量提交的方法效率最高且接近极限效率
- 假设:建立连接:5ms,提交事务:1ms/10ms(批量),执行事务:1ms,错误日志:0.1ms
| 方案 | 连接方式 | 连接耗时 | 事务提交耗时 | 事务执行耗时 | 错误重试耗时 | 总时间 |
|---|---|---|---|---|---|---|
| 直接读取 | 每次新建连接 | 10000 * 5ms | 10000 * 1ms | 10000 * 1ms | 100 * 0.1ms | 70.01 秒 |
| 拆分 5 个数据库 | 每库 1 连接 | 2000 * 5ms | 2000 * 1ms | 2000 * 1ms | 100 * 0.1ms | 14.01 秒 |
| 连接池(5 连接) | 复用连接 | 5 * 5ms | 2000 * 1ms | 2000 * 1ms | 100 * 0.1ms | 4.035 秒 |
| 批量提交(100/批) | 每次新建连接 | 2 * 100 * 5ms | 2 * 100 * 10 ms | 100 * (100+99) * 1ms | 100 * 0.1ms | 22.91 秒 |
- 可以发现,批量提交并没有达到极致效率。拆分和连接池主要是提高并行效率(5倍),同时增加cpu使用率(5倍),拆分相比连接池差在增加了连接耗时
- 同时可以比较批量提交+单连接复用与连接池,可以发现,在忽略连接时间时,由于错误处理,前者事务提交耗时翻倍,即使事务执行耗时变成了0.1,也是原来的1.05倍时间,后者则只是原来的0.2倍时间
- 使用 condition 会出现同时唤醒所有正在等待的协程的情况
- 测试用例如下,可以运行查看数据的处理顺序
数据库连接池及测试
import re
import asyncio
import aiosqlite
from config import path
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str, max_connections: int = 50):
self.db_path = db_path
self.max_connections = max_connections
self.pool = [] # 连接池
self.condition = asyncio.Condition() # 用于条件同步
async def init(self):
"""初始化连接池"""
self.pool = [
await aiosqlite.connect(self.db_path) for _ in range(self.max_connections)
]
async def get_connection(self):
"""取出连接"""
async with self.condition: # 等待直到池中有可用连接
try:
await asyncio.wait_for(
self.condition.wait_for(lambda: len(self.pool) > 0), 10
)
except Exception as e:
raise Exception(f"连接池异常 -> 获取连接超时 | 异常: {e}")
return self.pool.pop(0)
async def release_connection(self, conn):
"""释放连接"""
async with self.condition:
self.pool.append(conn)
self.condition.notify() # 通知其他等待的任务
async def term(self):
"""清理连接池"""
async with self.condition:
for conn in self.pool:
await conn.close()
self.pool.clear()
self.condition.notify_all()
async def query_database(query: str, params: tuple = ()):
"""查询语句"""
conn = await db_manager.get_connection()
cursor = None
try:
cursor = await conn.execute(query, params)
rows = await cursor.fetchall()
columns = [column[0] for column in cursor.description]
result = [dict(zip(columns, row)) for row in rows]
return result
except Exception as e:
raise Exception(f"查询语句异常 -> {e} | 查询: {query} | 参数: {params}")
finally:
if cursor:
await cursor.close()
await db_manager.release_connection(conn)
async def update_database(query: str, params: tuple = ()):
"""更新语句"""
conn = await db_manager.get_connection()
cursor = None
try:
cursor = await conn.execute(query, params)
await conn.commit()
table_match = re.search(
r"(?:INSERT INTO|UPDATE|DELETE FROM)\s+`?(\w+)`?", query, re.IGNORECASE
)
if table_match:
table_name = table_match.group(1)
if not table_name.startswith("system"):
from web.backend.cab.database import broadcast_db_update
await broadcast_db_update()
return cursor.lastrowid
except Exception as e:
await conn.rollback()
raise Exception(f"更新语句异常 -> {e} | 更新: {query} | 参数: {params}")
finally:
if cursor:
await cursor.close()
await db_manager.release_connection(conn)
db_manager = DatabaseManager(db_path=path / "storage/lrobot.db")
# 下面代码为测试代码
import asyncio
from logic import execute_update, execute_query, db_manager
async def db_test():
"""测试连接池是否生效"""
await db_manager.initialize()
try:
# 清空测试数据
await execute_update("UPDATE users_status SET qq_number = NULL", ())
# 并发写入测试
queries = [
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("123", 1)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("456", 2)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("789", 3)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("987", 4)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("654", 5)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("321", 6)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("777", 7)),
("UPDATE users_status SET qq_number = ? WHERE id = ?", ("888", 8)),
]
tasks = [execute_update(q, p) for q, p in queries]
update_counts = await asyncio.gather(*tasks)
print(f"更新影响行数: {update_counts}")
# 验证查询测试
result = await execute_query(
"SELECT id, qq_number FROM users_status WHERE id IN (1,2,3,4,5,6)"
)
print("查询结果:", result)
return result # 返回结果供进一步验证
finally:
await db_manager.close()
asyncio.run(db_test())