进程通信
进程通信
- 存在进程间通信的情况,即
- 消息 A 收到 "Hello",但需等到消息 B 收到 "World" 后才能拼接成完整内容
- 由于消息 B 到达时间不确定,必须有机制让 A 等待或监听 B 的结果
- 一般分为三种处理方式
- while True 循环检查
- queue.get() 等待队列中的数据
- future 协程变量
- 方法 1 内存开销大;方法 2 不适合处理多个 B 发送的情况(假设 A1 要等待 B1,A2 要等待 B2……使用一个队列无法区分,需要多个队列)
- 在单文件中,如日志写入,适合用队列方法;在多文件多处理的情况下,适合用 future
future 机制
- 在事件循环中,某个协程使用
await future后,线程变为挂起状态 - 当
set future后,协程变为就绪状态,在事件循环进行到该协程时执行协程
future 管理器
- 项目使用一个 future 管理器来简化 future 的创建与设值过程
使用示例
- 创建
await future.wait(seq,"消息超时!!!") - 创建可设置超时时间,默认 20
- 报错不填的话,会使用默认报错
[future]请求失败-> {key} 获取超时: {timeout}s - 由于超时默认报错,如果需要其他逻辑需要
try: except asyncio.TimeoutError:来捕获这个错误并进行其他逻辑 - 设值
future.set(seq, response)
不同进程间通信
- 若在一些独立运行的线程中设置future(如 flask 中自带的 werkzeug 日志记录器),由于不和主事件循环处于同一线程,无法通知循环中协程的 future 更新
- 参考以下 A、B 两种解决方式,本项目使用 B 方式
- A 中,只调用 a、b 函数的情况下,由于 flask 是在另一个线程中执行的,future(a) 的值被设置后不会通知 b(但值已经变了)
- A 中取消 d 函数中关于 c 的注释,由于 c 会频繁检查 future(a) 的值,会触发 future(a) 的更新机制,通知 b 函数
- B 则是除了频繁检查外的另一种解决办法,使用 loop.call_soon_threadsafe 通知其他的线程
A
import asyncio
from flask import Flask
class FutureManager:
def __init__(self):
self.futures = {} # 用于存储 Future 对象的字典
def get(self, key):
# 获取已有的 Future 对象,若不存在则创建一个新的
if key not in self.futures:
self.futures[key] = asyncio.Future()
return self.futures[key]
def set(self, key, result):
# 设置 Future 对象的结果
_future = self.get(key)
if not _future.done():
_future.set_result(result)
future = FutureManager()
def a():
app = Flask(__name__)
future.set("a", "b")
app.run("0.0.0.0", 5922, debug=False)
async def b():
f = future.get("a")
await f
print(11111111)
async def c():
f = future.get("a")
while True:
await asyncio.sleep(10)
print(f"Future status before await: {f.done()}")
async def d():
tasks = [
asyncio.to_thread(a()),
b(),
# c(),
]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(d())
B
import asyncio
from flask import Flask
class FutureManager:
def __init__(self):
self.futures = {}
self.loop = None
def bind_loop(self, loop):
self.loop = loop
def get(self, key):
if key not in self.futures:
# 使用主循环创建Future
self.futures[key] = self.loop.create_future()
return self.futures[key]
def set1(self, key, result):
_future = self.get(key)
if not _future.done():
self.loop.call_soon_threadsafe(_future.set_result, result)
self.loop.call_soon_threadsafe(lambda: None)
future = FutureManager()
def a():
app = Flask(__name__)
future.set1("a", "b") # 现在可以安全执行
app.run("0.0.0.0", 5922, debug=False)
async def b():
f = future.get("a")
await f
print("11111111 成功输出!")
async def d():
loop = asyncio.get_running_loop()
future.bind_loop(loop)
tasks = [
asyncio.to_thread(a),
b(),
]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(d())import asyncio
from flask import Flask
class FutureManager:
def __init__(self):
self.futures = {}
self.loop = None
def bind_loop(self, loop):
self.loop = loop
def get(self, key):
if key not in self.futures:
# 使用主循环创建Future
self.futures[key] = self.loop.create_future()
return self.futures[key]
def set1(self, key, result):
_future = self.get(key)
if not _future.done():
self.loop.call_soon_threadsafe(_future.set_result, result)
self.loop.call_soon_threadsafe(lambda: None)
future = FutureManager()
def a():
app = Flask(__name__)
future.set1("a", "b") # 现在可以安全执行
app.run("0.0.0.0", 5922, debug=False)
async def b():
f = future.get("a")
await f
print("11111111 成功输出!")
async def d():
loop = asyncio.get_running_loop()
future.bind_loop(loop)
tasks = [
asyncio.to_thread(a),
b(),
]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(d())
ws连接通信
不知道在说什么(
- 接收 ws 信息存在冲突
- 原因在于 on_message 函数捕获了所有的请求,调用 API 后自动 await 的回应接收不到
- 在接收调用 API 的回应时
- A 为 on_message,B 为发送 API,C 为等待的回复,D 为回复类消息
A {
if: xxx{ B; await C;}
else: D
}- A 接收 {B 发送,等待 C,接收 C} ,D 接收
- 则 B 发送的消息会被 C 接收到
- 而在其他地方使用 B 和 C
A {
if: xxx{ E;}
else: D
}
D {
B; await C;
}- A 接收,D 接收
- B 发送,等待 C,接收 C
- C 本来要接收的消息会被 E 捕获
- 推测为 ws 连接优先接收的机制,所有的消息在接收时都会经过 on_message,除非是 on_message 中自己的等待逻辑阻塞了当前的 on_message,即一个 on_message 直接触发的逻辑在等待结果,会先于其他 on_message 捕获结果