rag网站的开发基础
AIGC RAG 13

1、数据库ORM技术

引子:使用SQLAlchemy - Python教程 - 廖雪峰的官方网站

ORM(Object-Relational Mapping把关系数据库的表结构映射到对象上

数据库表是一个二维表,包含多行多列。把一个表的内容用Python的数据结构表示出来的话,可以用一个list表示多行,list的每一个元素是tuple,表示一行记录,比如,包含idnameuser表:

[
    ('1', 'Michael'),
    ('2', 'Bob'),
    ('3', 'Adam')
]

但是用tuple表示一行很难看出表的结构。如果把一个tupleclass实例来表示,就可以更容易地看出表的结构来:

class User(object):
    def __init__(self, id, name):
        self.id = id
        self.name = name

[
    User('1', 'Michael'),
    User('2', 'Bob'),
    User('3', 'Adam')
]

这就是传说中的ORM技术:Object-Relational Mapping,把关系数据库的表结构映射到对象上。是不是很简单?

但是由谁来做这个转换呢?所以ORM框架应运而生。

在Python中,最有名的ORM框架是SQLAlchemy。我们来看看SQLAlchemy的用法。

首先通过pip安装SQLAlchemy: pip install sqlalchemy

然后,利用上次我们在MySQL的test数据库中创建的user表,用SQLAlchemy来试试:

第一步,导入SQLAlchemy,并初始化DBSession:

# 导入:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base

# 创建对象的基类,都要继承该类:
Base = declarative_base()

# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = 'user'

    # 表的结构:
    id = Column(String(20), primary_key=True)
    name = Column(String(20))

# 初始化数据库连接:
engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test')
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)

create_engine()用来初始化数据库连接。SQLAlchemy用一个字符串表示连接信息:

'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'

你只需要根据需要替换掉用户名、口令等信息即可。

下面,我们看看如何向数据库表中添加一行记录。

由于有了ORM,我们向数据库表中添加一行记录,可以视为添加一个User对象:

# 创建session对象:
session = DBSession()
# 创建新User对象:
new_user = User(id='5', name='Bob')
# 添加到session:
session.add(new_user)
# 提交即保存到数据库:
session.commit()
# 关闭session:
session.close()

可见,关键是获取session,然后把对象添加到session,最后提交并关闭。DBSession对象可视为当前数据库连接。

如何从数据库表中查询数据呢?有了ORM,查询出来的可以不再是tuple,而是User对象。SQLAlchemy提供的查询接口如下:

# 创建Session:
session = DBSession()
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(User).filter(User.id=='5').one()
# 打印类型和对象的name属性:
print('type:', type(user))
print('name:', user.name)
# 关闭Session:
session.close()

# type: <class '__main__.User'>
# name: Bob

可见,ORM就是把数据库表的行与相应的对象建立关联,互相转换。

由于关系数据库的多个表还可以用外键实现一对多、多对多等关联,相应地,ORM框架也可以提供两个对象之间的一对多、多对多等功能。

例如,如果一个User拥有多个Book,就可以定义一对多关系如下:

class User(Base):
    __tablename__ = 'user'

    id = Column(String(20), primary_key=True)
    name = Column(String(20))
    # 一对多:
    books = relationship('Book')

class Book(Base):
    __tablename__ = 'book'

    id = Column(String(20), primary_key=True)
    name = Column(String(20))
    # “多”的一方的book表是通过外键关联到user表的:
    user_id = Column(String(20), ForeignKey('user.id'))

当我们查询一个User对象时,该对象的books属性将返回一个包含若干个Book对象的list。

2、FastChat模型管理

FastChat 作为轻量级 LLM 服务框架,其模型管理能力是实现 “多模型统一部署、灵活调度与高效推理” 的核心,涵盖模型支持范围、部署架构、加载配置、调度策略及个性化扩展等关键环节,以下从六大维度系统梳理。

2.1 支持模型范围:覆盖主流开源 LLM

FastChat 通过原生适配与灵活扩展,支持国内外 70 + 种主流开源大语言模型,满足不同场景的推理需求,具体可分为 “官方原生支持” 与 “非官方扩展支持” 两类:

1. 官方原生支持模型

涵盖 Meta、阿里、字节、清华等机构的主流模型,无需额外适配即可直接加载,核心模型及兼容性要求如下表:

模型系列

代表模型

架构兼容性说明

典型应用场景

Meta LLaMA 系列

LLaMA 2(7B/13B/70B)

原生支持 Meta 架构,支持长上下文(4k-100k)

通用对话、代码生成

国产开源模型

ChatGLM3、Qwen(7B/14B)

ChatGLM 需设置tokenizer patch,Qwen 支持流式输出

中文对话、多轮交互

国外开源模型

Mistral/Mixtral、Vicuna

Mistral 支持模型合并推理,Vicuna 需匹配基座权重

低延迟对话、多模态交互

其他特色模型

FastChat-T5、LongChat

T5 支持序列生成,LongChat 适配 32k 长上下文

文本摘要、长文档理解

2. 非官方扩展支持

对于 Hugging Face 上支持AutoModelForCausalLM的任意模型,可通过--model-path参数指定模型路径(本地路径或 HF 仓库地址)实现加载,例如:

bash

# 加载Hugging Face非官方模型(如Baichuan-13B)
python3 -m fastchat.serve.model_worker --model-path baichuan-inc/Baichuan-13B-Chat

扩展适配仅需在model_adapter.py中补充 “模型对话模板” 与 “Tokenizer 处理逻辑”,即可支持自定义模型的多轮对话格式。

二、模型部署架构:控制器 - 工作器分布式模式

FastChat 采用 “控制器(Controller)+ 模型工作器(Model Worker)” 的分布式架构,实现模型与服务解耦,支持多模型并行部署与动态调度,核心架构如下:

1. 核心组件分工

组件名称

功能定位

关键作用

Controller(控制节点)

请求路由与 Worker 状态管理

1. 维护所有 Worker 的模型列表;2. 接收 API/Web 请求,分配至对应 Worker;3. 监控 Worker 健康状态

Model Worker(模型工作器)

单模型独立推理进程

1. 加载指定模型及 Tokenizer;2. 执行推理计算(支持 CPU/GPU/vLLM 加速);3. 接收 Controller 指令,返回推理结果

API Server(接口服务)

提供 OpenAI 兼容 API

1. 封装/v1/chat/completions等接口;2. 处理流式输出、参数调控(Temperature/Top-p)

2. 部署流程:三步启动多模型服务

需按 “Controller→Model Worker→Web/API 服务” 的顺序启动,确保组件间通信正常:

  1. 启动控制器:初始化调度中心,默认监听127.0.0.1:21001

    bash

    python3 -m fastchat.serve.controller
    
  2. 启动模型 Worker:加载目标模型,每个 Worker 对应一个独立模型(支持多卡并行)

    bash

    # 单GPU加载Vicuna-7B
    python3 -m fastchat.serve.model_worker --model-path lmsys/vicuna-7b-v1.5
    # 多GPU(2卡)加载Qwen-14B,限制单卡内存8GiB
    python3 -m fastchat.serve.model_worker --model-path qwen/Qwen-14B-Chat --num-gpus 2 --max-gpu-memory 8GiB
    
  3. 启动服务入口:选择 Web UI 或 API 服务,对外提供交互能力

    bash

    # 启动Gradio Web UI(用于可视化测试)
    python3 -m fastchat.serve.gradio_web_server
    # 启动OpenAI兼容API服务(用于企业级调用)
    python3 -m fastchat.serve.openai_api_server --port 8000

三、模型加载配置:适配不同硬件与性能需求

FastChat 提供细粒度加载配置,支持 CPU/GPU/vLLM 等不同硬件环境,兼顾 “显存占用” 与 “推理速度”,核心配置参数如下:

1. 硬件适配参数

参数名称

作用

示例配置

适用场景

--num-gpus

指定使用 GPU 数量

--num-gpus 2(2 卡并行)

大模型(13B+)加载

--max-gpu-memory

限制单卡模型权重占用内存

--max-gpu-memory 8GiB

避免显存溢出

--cpu-offloading

启用 CPU 卸载(部分层放 CPU)

--cpu-offloading

单 GPU 显存不足(如 14B 模型用 12GiB GPU)

--backend

指定推理后端(vLLM/LiteLLM)

--backend vllm

高吞吐场景(如 API 服务)

2. 性能优化配置

  • vLLM 加速:与 vLLM 推理引擎深度兼容,通过--backend vllm启用,可提升吞吐 3-10 倍,适合高并发 API 服务:

    bash

    python3 -m fastchat.serve.model_worker --model-path lmsys/vicuna-7b-v1.5 --backend vllm
    
  • 长上下文适配:针对 LongChat、LLaMA 2-70B 等长上下文模型,需指定--context-length参数,例如:

    bash

    # 适配32k长上下文
    python3 -m fastchat.serve.model_worker --model-path lmsys/longchat-7b-32k-v1.5 --context-length 32768

四、模型调度策略:动态切换与负载均衡

FastChat 通过 Controller 实现多模型的 “统一入口、动态调度”,支持按请求指定模型、自动负载均衡及故障转移,核心策略如下:

1. 模型指定方式

  • 请求体指定:API 调用时通过model字段指定目标模型,与 OpenAI API 完全兼容:

    json

    // 调用ChatGLM3模型的API请求示例
    POST /v1/chat/completions
    {
      "model": "chatglm3",
      "messages": [{"role": "user", "content": "介绍FastChat"}],
      "stream": true
    }
    
  • 全局默认模型:启动 Worker 时通过--served-model-name指定对外暴露的模型名,适合固定模型服务:

    bash

    python3 -m fastchat.serve.model_worker --model-path chatglm3-6b --served-model-name my-chatglm3
    

2. 多模型负载均衡

当部署多个 Worker(对应不同模型或同模型多实例)时,Controller 自动实现:

  • 请求分发:按 Worker 负载(CPU/GPU 使用率)分配请求,避免单 Worker 过载;

  • 故障转移:若某 Worker 异常下线,Controller 自动将请求路由至其他可用 Worker;

  • 动态扩容:新增 Worker 时,无需重启 Controller,Worker 会自动注册至 Controller,支持运行时热扩容。

五、模型版本与生命周期管理

FastChat 支持模型的 “版本区分、热更新与资源释放”,满足企业级服务的迭代与维护需求:

1. 模型版本管理

  • 版本区分:通过模型路径或服务名区分版本,例如lmsys/vicuna-7b-v1.5lmsys/vicuna-7b-v1.1可同时部署,通过model字段指定调用;

  • 微调模型加载:加载微调后的模型时,仅需指定微调后的 HF 仓库或本地路径,例如:

    bash

    # 加载微调后的Vicuna模型
    python3 -m fastchat.serve.model_worker --model-path ./fine-tuned-vicuna-7b
    

2. 模型热更新与资源释放

  • 热更新:替换模型时,无需停止 Controller 与 API 服务,仅需重启对应 Worker:

    bash

    # 停止旧模型Worker(假设PID为1234)
    kill 1234
    # 启动新模型Worker
    python3 -m fastchat.serve.model_worker --model-path lmsys/vicuna-13b-v1.5
    
  • 资源释放:Worker 进程退出时自动释放 GPU/CPU 资源,也可通过--max-batch-size限制单 Worker 并发,避免资源占用过高。

六、个性化扩展:自定义模型适配

对于特殊格式或小众模型,FastChat 支持通过修改代码实现适配,核心步骤如下:

1. 新增模型对话模板

fastchat/conversation.py中添加模型专属对话格式(如自定义角色、多轮历史处理),示例:

python

# 为自定义模型添加对话模板
class MyModelConversation(Conversation):
    def __init__(self):
        super().__init__(
            name="my-model",
            system_template="你是专业的技术助手:{system_message}",
            roles=("用户", "助手"),
            sep="\n",
        )

2. 实现模型适配器

fastchat/model_adapter.py中补充模型加载逻辑(如 Tokenizer 特殊处理、模型权重映射):

python

# 自定义模型适配器
class MyModelAdapter(BaseModelAdapter):
    def match(self, model_path: str):
        return "my-model" in model_path  # 匹配模型路径
    
    def load_model(self, model_path: str, **kwargs):
        # 自定义模型加载逻辑(如加载特殊Tokenizer)
        tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(model_path, **kwargs)
        return model, tokenizer

3. 注册模型

fastchat/model_registry.py中注册模型信息,确保 Controller 能识别:

python

# 注册自定义模型
model_info = {
    "my-model-7b": {
        "name": "my-model-7b",
        "chat_template": "my-model",  # 关联对话模板
        "adapter": "MyModelAdapter",  # 关联适配器
    }
}

3、Python并行开发

3.1 多进程

多进程 - Python教程 - 廖雪峰的官方网站

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/macOS:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

输出:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

multiprocessing

如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

"""
Parent process 928.
Child process will start.
Run child process test (929)...
Process end.
"""

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

子进程 subprocess()

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

3.2 多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成。

我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。

由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:

import time, threading

# 新线程执行的代码:
def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

创建一个锁就是通过threading.Lock()来实现:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

4、Python异步编程

异步IO - Python教程 - 廖雪峰的官方网站

我们知道,CPU的速度远远快于磁盘、网络等IO。在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操作,如读写文件、发送网络数据时,就需要等待IO操作完成,才能继续进行下一步操作。这种情况称为同步IO。

在IO操作的过程中,当前线程被挂起,而其他需要CPU执行的代码就无法被当前线程执行了。

因为一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。

多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。

由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。

另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。

可以想象如果按普通顺序写出的代码实际上是没法完成异步IO的:

do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 线程停在此处等待IO操作结果
# IO操作完成后线程才能继续执行:
do_some_code(r)

所以,同步IO模型的代码是无法实现异步IO模型的。

异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程:

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。

由于GUI线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。某些时候,GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长,此时,用户会感觉到整个GUI程序停止响应了,敲键盘、点鼠标都没有反应。这种情况说明在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应。

消息模型是如何解决同步IO必须等待IO操作这一问题的呢?当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程 - Python教程 - 廖雪峰的官方网站

使用asyncio

使用asyncio - Python教程 - 廖雪峰的官方网站

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。

asyncio的编程模型就是一个消息循环。asyncio模块内部实现了EventLoop,把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

asyncio实现Hello world代码如下:

import asyncio

async def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    await asyncio.sleep(1)
    print("Hello again!")

asyncio.run(hello())

async把一个函数变成coroutine类型,然后,我们就把这个async函数扔到asyncio.run()中执行。执行结果如下:

Hello!
(等待约1秒)
Hello again!

hello()会首先打印出Hello world!,然后,await语法可以让我们方便地调用另一个async函数。由于asyncio.sleep()也是一个async函数,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,就接着执行下一行语句。

asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的async函数了,因此可以实现并发执行。

上述hello()还没有看出并发执行的特点,我们改写一下,让两个hello()同时并发执行:

# 传入name参数:
async def hello(name):
    # 打印name和当前线程:
    print("Hello %s! (%s)" % (name, threading.current_thread))
    # 异步调用asyncio.sleep(1):
    await asyncio.sleep(1)
    print("Hello %s again! (%s)" % (name, threading.current_thread))
    return name

asyncio.gather()同时调度多个async函数:

async def main():
    L = await asyncio.gather(hello("Bob"), hello("Alice"))
    print(L)

asyncio.run(main())
Hello Bob! (<function current_thread at 0x10387d260>)
Hello Alice! (<function current_thread at 0x10387d260>)
(等待约1秒)
Hello Bob again! (<function current_thread at 0x10387d260>)
Hello Alice again! (<function current_thread at 0x10387d260>)
['Bob', 'Alice']

5、uvicorn服务器

(14 封私信 / 80 条消息) uvicorn,一个无敌的 Python 库! - 知乎

1

rag网站的开发基础
https://www.bytecanvas.top/archives/Slot0tiQ
作者
禧语许
发布于
更新于
许可