跳到主要内容

进程通信

进程通信

  • 存在进程间通信的情况,即
    • 消息 A 收到 "Hello",但需等到消息 B 收到 "World" 后才能拼接成完整内容
    • 由于消息 B 到达时间不确定,必须有机制让 A 等待或监听 B 的结果
  • 一般分为三种处理方式
    • while True 循环检查
    • queue.get() 等待队列中的数据
    • future 协程变量
  • 方法 1 内存开销大;方法 2 不适合处理多个 B 发送的情况(假设 A1 要等待 B1,A2 要等待 B2……使用一个队列无法区分,需要多个队列)
  • 在单文件中,如日志写入,适合用队列方法;在多文件多处理的情况下,适合用 future

future 机制

  • 在事件循环中,某个协程使用 await future 后,线程变为挂起状态
  • set future 后,协程变为就绪状态,在事件循环进行到该协程时执行协程

future 管理器

  • 项目使用一个 future 管理器来简化 future 的创建与设值过程

使用示例

  • 创建 await future.wait(seq,"消息超时!!!")
  • 创建可设置超时时间,默认 20
  • 报错不填的话,会使用默认报错[future]请求失败-> {key} 获取超时: {timeout}s
  • 由于超时默认报错,如果需要其他逻辑需要 try: except asyncio.TimeoutError: 来捕获这个错误并进行其他逻辑
  • 设值 future.set(seq, response)

不同进程间通信

  • 若在一些独立运行的线程中设置future(如 flask 中自带的 werkzeug 日志记录器),由于不和主事件循环处于同一线程,无法通知循环中协程的 future 更新
  • 参考以下 A、B 两种解决方式,本项目使用 B 方式
  • A 中,只调用 a、b 函数的情况下,由于 flask 是在另一个线程中执行的,future(a) 的值被设置后不会通知 b(但值已经变了)
  • A 中取消 d 函数中关于 c 的注释,由于 c 会频繁检查 future(a) 的值,会触发 future(a) 的更新机制,通知 b 函数
  • B 则是除了频繁检查外的另一种解决办法,使用 loop.call_soon_threadsafe 通知其他的线程
A
import asyncio
from flask import Flask

class FutureManager:
def __init__(self):
self.futures = {} # 用于存储 Future 对象的字典

def get(self, key):
# 获取已有的 Future 对象,若不存在则创建一个新的
if key not in self.futures:
self.futures[key] = asyncio.Future()
return self.futures[key]

def set(self, key, result):
# 设置 Future 对象的结果
_future = self.get(key)
if not _future.done():
_future.set_result(result)


future = FutureManager()


def a():
app = Flask(__name__)
future.set("a", "b")
app.run("0.0.0.0", 5922, debug=False)


async def b():
f = future.get("a")
await f
print(11111111)


async def c():
f = future.get("a")
while True:
await asyncio.sleep(10)
print(f"Future status before await: {f.done()}")


async def d():
tasks = [
asyncio.to_thread(a()),
b(),
# c(),
]
await asyncio.gather(*tasks)


if __name__ == "__main__":
asyncio.run(d())
B
import asyncio
from flask import Flask

class FutureManager:
def __init__(self):
self.futures = {}
self.loop = None

def bind_loop(self, loop):
self.loop = loop

def get(self, key):
if key not in self.futures:
# 使用主循环创建Future
self.futures[key] = self.loop.create_future()
return self.futures[key]

def set1(self, key, result):
_future = self.get(key)
if not _future.done():
self.loop.call_soon_threadsafe(_future.set_result, result)
self.loop.call_soon_threadsafe(lambda: None)


future = FutureManager()


def a():
app = Flask(__name__)
future.set1("a", "b") # 现在可以安全执行
app.run("0.0.0.0", 5922, debug=False)


async def b():
f = future.get("a")
await f
print("11111111 成功输出!")


async def d():
loop = asyncio.get_running_loop()
future.bind_loop(loop)
tasks = [
asyncio.to_thread(a),
b(),
]
await asyncio.gather(*tasks)


if __name__ == "__main__":
asyncio.run(d())import asyncio
from flask import Flask


class FutureManager:
def __init__(self):
self.futures = {}
self.loop = None

def bind_loop(self, loop):
self.loop = loop

def get(self, key):
if key not in self.futures:
# 使用主循环创建Future
self.futures[key] = self.loop.create_future()
return self.futures[key]

def set1(self, key, result):
_future = self.get(key)
if not _future.done():
self.loop.call_soon_threadsafe(_future.set_result, result)
self.loop.call_soon_threadsafe(lambda: None)


future = FutureManager()


def a():
app = Flask(__name__)
future.set1("a", "b") # 现在可以安全执行
app.run("0.0.0.0", 5922, debug=False)


async def b():
f = future.get("a")
await f
print("11111111 成功输出!")


async def d():
loop = asyncio.get_running_loop()
future.bind_loop(loop)
tasks = [
asyncio.to_thread(a),
b(),
]
await asyncio.gather(*tasks)


if __name__ == "__main__":
asyncio.run(d())

ws连接通信

不知道在说什么(

  • 接收 ws 信息存在冲突
  • 原因在于 on_message 函数捕获了所有的请求,调用 API 后自动 await 的回应接收不到
  • 在接收调用 API 的回应时
    • A 为 on_message,B 为发送 API,C 为等待的回复,D 为回复类消息
    A {
    if: xxx{ B; await C;}
    else: D
    }
    • A 接收 {B 发送,等待 C,接收 C} ,D 接收
    • 则 B 发送的消息会被 C 接收到
  • 而在其他地方使用 B 和 C
    A {
    if: xxx{ E;}
    else: D
    }
    D {
    B; await C;
    }
    • A 接收,D 接收
    • B 发送,等待 C,接收 C
    • C 本来要接收的消息会被 E 捕获
  • 推测为 ws 连接优先接收的机制,所有的消息在接收时都会经过 on_message,除非是 on_message 中自己的等待逻辑阻塞了当前的 on_message,即一个 on_message 直接触发的逻辑在等待结果,会先于其他 on_message 捕获结果