跳转至

SQL 关系型数据库

FastAPI 不强制使用某个 SQL 关系型数据库。

但支持多种关系型数据库。

本章介绍如何使用 SQLAlchemy 控制关系型数据库。

SQLAlchemy 可以让 FastAPI 控制以下数据库:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server 等

本章示例使用 SQLite,因为它使用单个文件,而且 Python 直接集成了 SQLite。因此,可以复制这个示例,并按原样运行。

生产环境下,推荐使用 PostgreSQL 等数据库服务器。

提示

FastAPI 官方基于 Docker 容器,提供了使用 FastAPIPostgreSQL 及前端工具的模板项目,详见:https://github.com/tiangolo/full-stack-fastapi-postgresql

笔记

注意,本章中大多数 SQLAlchemy 标准代码可以在任何框架中使用。

专用于 FastAPI 的代码很少。

ORMs

FastAPI 使用各种数据库和各种风格的支持库与数据库进行对话。

常用的模式是ORM:即 Object-Relational Mapping(对象关系映射)支持库。

ORM 是转换(映射)代码中的对象和数据库表(关系)的工具。

使用 ORM,通常要创建表示 SQL 数据库表的类,类的属性表示列,包含列名和数据类型。

例如,类 Pet 表示 SQL 表 pets

每个类实例对象表示数据表中的一行数据。

例如,对象 orion_catPet 的实例)可以包含属性 orion_cat.type ,表示列 type,该属性的值可以是 "cat"

ORM 还提供了创建连接表与实体之间关系的工具。

这样,orion_cat 对象中就可以包含属性 orion_cat.owner ,属性 owner 表示宠物主人的数据,取自表 owners

因此,orion_cat.owner.name 是宠物主人的名字(来自 owners 表中的 name 列)。

它的值是 "Arquilian"

从宠物对象访问对应的表 owners 时,ORM 会完成所有获取信息的工作。

常用的 ORM 包括:Django-ORM(Django 组件)、SQLAlchemy ORM(SQLAlchemy 组件,独立的支持库)和 Peewee(独立的支持库)等。

本章介绍如何使用 SQLAlchemy ORM

其他 ORM 的使用方式与此类似。

提示

高级用户指南中有一章专门介绍如何使用 Peewee。

文件架构

以下图为例,假设项目的根目录为 my_super_project,其中包含 sql_app 等子文件夹,架构如下:

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

__init__.py 是空文件,用于让 Python 识别出 sql_app 与文件夹内的所有模块(Python 文件)是包。

接下来,我们学习每一个文件/模块。

安装 SQLAlchemy

首先,需要安装 SQLAlchemy

$ pip install sqlalchemy

---> 100%

创建 SQLAlchemy 组件

使用的文件是 sql_app/database.py

导入 SQLAlchemy 组件

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

为 SQLAlchemy 创建数据库 URL

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

本例中,连接的是 SQLite 数据库(打开 SQLite 数据库文件)。

该数据库存储在与其他文件同一文件夹的 sql_app.db里。

这就是为什么 URL 以 ./sql_app.db 结尾。

使用 PostgreSQL 数据库时,只需取消下面这行代码的注释:

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

……并根据数据库的数据和证书进行调整( 也适用于 MySQL、MariaDB 等数据库)。

提示

使用其他数据库时必须修改这行代码。

创建 SQLAlchemy 的 engine

首先,创建 SQLAlchemy 的引擎

稍后需要使用这个 engine

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

注意

参数:

connect_args={"check_same_thread": False}

……只用于 SQLite,其他数据库不需要。

技术细节

默认情况下,SQLite 只能与一个线程通信,即每个线程只处理一个独立的请求。

这是为了防止意外为不同操作(针对不同的请求)共享相同的连接。

但在 FastAPI 中,使用普通函数(def)可以有多个线程为了相同请求与数据库交互,所以要使用 connect_args={"check_same_thread": False} 让 SQLite 支持多线程。

此外,要确保每个请求在依赖项中获得自己的数据库连接会话,因此,不需要使用默认机制。

创建 SessionLocal

SessionLocal 的类实例是数据库会话,但它本身不是数据库会话。

一旦创建了 SessionLocal 的类实例,该实例就是实际的数据库会话。

命名为 SessionLocal 是为了区别从 SQLAlchemy 导入的 Session

稍后,再使用(从 SQLAlchemy 导入的) Session

使用函数 sessionmaker 创建 SessionLocal 类:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建 Base

接下来,使用返回类的函数 declarative_base()

创建继承自该类的数据库模型或类(ORM模型):

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建数据库模型

这里用的是 sql_app/models.py

Base 类创建 SQLAlchemy 模型

使用 Base 类创建 SQLAlchemy 模型。

提示

SQLAlchemy 使用术语模型表示与数据库交互的类和实例。

但是 Pydantic 也使用术语模型表示概图,即数据验证、转换以及文档类和实例。

databasedatabase.py) 导入 Base

创建从它继承的类。

这些类就是 SQLAlchemy 模型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

__tablename__ 属性是 SQLAlchemy 在数据库中模型的表名。

创建模型属性/列

创建模型的(类)属性。

每个属性都表示对应数据库表中的一列。

使用 SQLAlchemy 中的 Column 作为默认值。

并且,以 IntegerStringBoolean 等参数传递 SQLAlchemy 的类 type,定义数据库中的数据类型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

创建关系

接下来,创建关系。

为此,要使用 SQLAlchemy ORM 的 relationship

这是一个很魔性的属性,能够包含相关表的值。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

访问 Useritems 属性时,比如 my_user.items ,它会包含 SQLAlchemy 模型 Item 的列表(来自 items 表),这些模型中包含指向 users 表中这条记录的外键。

访问 my_user.items, SQLAlchemy 实际上会从数据库 items 表中提取数据,并将它们填充到这里。

访问 Item 中的 owner 属性时,包含 users 表中的 SQLAlchemy 模型 User 。使用 owner_id 属性/列作为外键,以便从 users 表提取数据。

创建 Pydantic 模型

接下来是 sql_app/schemas.py

提示

为了避免 SQLAlchemy 模型和 Pydantic 模型 之间的混淆,在此,使用 models.py 表示 SQLAlchemy 模型,使用 schemas.py 表示 Pydantic 模型。

Pydantic 模型至少要定义一个概图(有效的数据形状)。

这样做,可以避免混淆这两种方法。

创建初始 Pydantic 模型/概图

创建两个 Pydantic 模型概图), ItemBaseUserBase ,并定义创建或读取数据时的共用属性。

然后,创建 ItemCreateUserCreate ,从两个基类继承(这样两个 Create 类就拥有相同的属性),再加上创建对象所需的其他数据(属性)。

因此,在创建用户时也会包含密码

但为了安全起见,password 不能在其他 Pydantic 概图中,例如,读取用户时,不要让 API 发送密码。

from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True

SQLAlchemy 风格和 Pydantic 风格

注意,SQLAlchemy 模型使用 = 定义属性,并把类型作为参数传递给 Column ,如:

name = Column(String)

而 Pydantic 模型 使用 : 声明类型,即使用类型注释语法/类型提示:

name: str

记住这一点,这样在把 =: 和它们一起使用时就不会混淆了。

为读取/返回数据创建 Pydantic 模型/概图

现在,创建 Pydantic 模型(概图),用于在读取数据和从 API 返回数据。

例如,创建 item 前,不知道分配给它的 ID 是什么,但读取 item 时(从 API 返回 item 时)就已经知道了 item 的 ID。

同样,在读取用户时,可以断言 items 要包含属于该用户的 items。

不仅是这些 items 的 ID,还有在 Pydantic 模型中定义的用于读取 items 的所有数据:Item

from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True

提示

注意 User,读取用户(从 API 返回用户)的 Pydantic 概图不包含密码。

使用 Pydantic 的 orm_mode

现在,在读取 Pydantic 概图ItemUser 中,添加 Config 内部类。

Config 类用于配置 Pydantic。

Config 类中, 设置属性 orm_mode = True

from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True

提示

注意 orm_mode= 赋值,例如:

orm_mode = True

与类型声明使用 : 不同 。

这里是设置配置值,不是声明类型。

Pydantic 的 orm_mode 告诉 Pydantic 概图读取数据,即使它不是字典 ,而是 ORM 模型(或其他具有属性的对象)。

这样,就不再只是从 dict 中获取 id 值,如下所示:

id = data["id"]

还会从属性中获取 id 值,如下所示:

id = data.id

通过这种方式,Pydantic 模型与 ORM 兼容,只需在路径操作中的 response_model 参数中进行声明就可以了。

这样,就可以返回数据库模型,并从中读取数据。

ORM 模式的技术细节

SQLAlchemy 和其他许多工具默认是懒加载的。

即,不从数据库中获取关系数据,直到要访问包含该数据的属性。

例如,访问属性 items

current_user.items

将使 SQLAlchemy 访问 items 表并为这个用户获取 items,但不是在此之前。

没有 orm_mode ,从路径操作返回的 SQLAlchemy 模型不包含关系数据。

即使 Pydantic 模型中声明了这些关系。

但在 ORM 模式下,由于 Pydantic 要从属性(而不是假设的 dict)中访问所需数据,可以声明要返回的特定数据,Pydantic 甚至可以通过 ORM 中获取数据。

CRUD 工具

接下来是 sql_app/crud.py

在这个文件中,使用可复用的函数与数据库中的数据交互。

CRUDCreate、Read、Update、Delete。

……本例只涉及创建(create)和读取(read)数据。

读取数据

sqlalchemy.orm 中导入 Session ,声明 db 参数的类型,并在函数中支持类型检查和自动补全。

导入 models(SQLAlchemy 模型)和 schemas(Pydantic 模型/概图)。

创建实现以下功能的工具函数:

  • 通过 ID 和电子邮件读取单个用户。
  • 读取多个用户。
  • 读取多个条目。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

提示

创建独立于路径操作函数,只用于与数据库交互(获取用户或项目)的函数,方便在其他部分复用这些函数,以及进行单元测试

创建数据

接下来,创建工具函数来创建数据。

步骤是:

  • 使用数据创建 SQLAlchemy 模型实例
  • add 实例对象到数据库会话。
  • commit 更改到数据库(保存数据)。
  • refresh 实例(包含来自数据库的任何新数据,如,生成的 ID)。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

提示

SQLAlchemy 的模型 User 中包含 hashed_password ,因此应该包含哈希密码。

但是由于 API 客户端提供的是原始密码,所以需要先使用原始密码生成哈希密码。

然后,把值传递给 hashed_password 参数,并保存。

警告

此示例不安全,密码并没有哈希。

在实际应用中,要对密码进行哈希,不要保存明文密码。

更多细节,请参阅本教程中的安全相关的内容。

这里关注的只是数据库的工具和机制。

提示

不必从 Pydantic 模型中读取每个关键字参数,然后将它们传递给 Item ,而是使用以下方法生成带有 Pydantic 模型的数据的 dict

item.dict()

然后,以关键字参数形式把 dict 的键值对传递给 SQLAlchemy 的 Item

Item(**item.dict())

然后,传递 Pydantic 模型不提供的额外关键字参数 owner_id :

Item(**item.dict(), owner_id=user_id)

FastAPI 的主应用

sql_app/main.py 中,集成和使用之前创建的组件。

创建数据库表

创建数据库表非常简单:

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Alembic 注意事项

一般情况下,使用 Alembic 初始化数据库(创建表等)。

还可以使用 Alembic 迁移数据库(Alembic 的主要功能)。

迁移是在更改 SQLAlchemy 模型的结构、添加属性后,在数据库中复制更改与添加新列、新表时所需的一组步骤。

Alembic 的示例在 FastAPI 的模板项目里。Project Generation - Template. 特别是在源代码的 alembic 目录中

创建依赖项

此处,使用 sql_app/databases.py 中的 SessionLocal 类来创建依赖项。

要为每个请求提供独立的数据库会话/连接(SessionLocal),在所有请求中使用相同的会话,并在请求完成后关闭。

然后,为下一个请求创建新会话。

为此,要创建新的 yield 生成器依赖项,详见yield 生成器依赖项一章。

依赖项会创建新的 SQLAlchemy SessionLocal ,用于单个请求,并在请求完成后关闭。

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

说明

创建 SessionLocal(),并把处理请求放入 try 代码块。

然后,在 finally 代码块中关闭。

这样即可确保就算在处理请求时出现异常,数据库会话也会在请求之后关闭。。

但不能从退出代码中触发另一个异常。详见yield 生成器依赖项与 HTTPException

然后,在路径操作函数中使用依赖项时,使用直接从 SQLAlchemy 导入的类型 Session 声明。

这将在路径操作函数中提供更好的编辑器支持,因为编辑器知道 db 参数的类型是 Session :

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

技术细节

参数 db 实际上是 SessionLocal 类型, 但是这个类(使用 sessionmaker()创建)是 SQLAlchemy Session代理,因此, 编辑器实际上并不知道提供了什么方法。

但是通过将类型声明为 Session, 编辑器就可以知道可用的方法(.add().query().commit(), 等),并且可以提供自动补全等支持。类型声明不会影响实际对象。

创建 FastAPI 路径操作

最后要说的是,这里是标准的 FastAPI 路径操作 代码。

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

在每个请求之前通过生成器依赖项创建数据库会话,在之后关闭。

然后,可以在路径操作函数中创建所需的依赖项,以直接获得该会话。

这样,就可以在路径操作函数内部直接调用 crud.get_user,并使用该会话。

提示

注意,返回的值是 SQLAlchemy 模型或 SQLAlchemy 模型列表。

但是,由于所有 路径操作 都有带有使用 orm_mode 的 Pydantic模型/概图的 response_model,Pydantic 模型中声明的数据将从它们中提取出来,并返回给客户端,同时进行所有正常的过滤和验证。

提示

注意,有些 response_models 具有标准的 Python 类型,如 List[schemas.Item]

但是由于该 List 的内容/参数是带有 orm_mode 的 Pydantic模型,数据将像往常一样被检索并返回给客户端,没有问题。

defasync def 简介

在这里,在路径操作函数内部和依赖项中使用 SQLAlchemy 代码,反过来,它将与外部数据库通信。

这可能需要等待一会儿。

但由于 SQLAlchemy 不兼容直接使用 await,如下的代码:

user = await db.query(User).first()

……反之,要使用:

user = db.query(User).first()

然后,声明路径操作函数和依赖没有使用 async def,只是普通的 def,如:

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    ...

说明

如需异步连接关系型数据库,请参阅异步 SQL 关系型数据库

技术细节

如果您很好奇,并且技术高超,可以阅读本文,了解 async defdef 如何处理技术细节Async 文档。

迁移

因为直接使用 SQLAlchemy,不需要任何其他插件使它与 FastAPI 一起工作,所以可以直接使用 Alembic 实现数据库迁移

由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码存在于独立的文件中,甚至可以使用 Alembic 执行迁移,而无需安装 FastAPI、Pydantic 等支持库。

同样,也可以在与 FastAPI 无关的代码的其他部分中使用相同的 SQLAlchemy 模型和实用程序。

例如,在后台的 task worker CeleryRQARQ.

回顾所有文件

注意,项目文件夹应为 my_super_project,该文件夹包含 sql_app 子文件夹。

sql_app 应包含以下文件:

  • sql_app/__init__.py:空文件

  • sql_app/database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • sql_app/models.py:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")
  • sql_app/schemas.py:
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
  • sql_app/crud.py:
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
  • sql_app/main.py:
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

查看文档

复制此代码并按原样使用。

说明

实际上,这里显示的代码是测试的一部分。和这些文档中的大部分代码一样

然后,用 uvicorn 运行:

$ uvicorn sql_app.main:app --reload

<span style="color: green;">INFO</span>:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

打开浏览器,访问 http://127.0.0.1:8000/docs。

在这里可以与 FastAPI 应用交互,从数据库中读取数据:

与数据库直接交互

如果想在 FastAPI 之外,直接处理 SQLite 数据库文件,调试内容、添加表、列、记录、修改数据等,可以使用 DB Browser for SQLite

界面如下:

也可以使用SQLite 在线浏览器 SQLite ViewerExtendsClass

使用中间件的备选 DB 会话

如果不使用生成器依赖项 -- 例如,如果不使用 Python 3.7,也不能为 Python 3.6 安装 backports -- 可以用类似的方式在中间件中设置会话。

中间件是为每个请求执行的函数,有些代码在端点函数之前执行,有些代码在端点函数之后执行。

创建中间件

中间件(只是函数)将为每个请求创建新的 SQLAlchemy SessionLocal ,将其添加到请求中,然后在请求完成后关闭。

from typing import List

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

说明

try 代码块中,创建 SessionLocal() 及处理请求。

然后,在 finally 代码块中关闭 SessionLocal()

这样,就可以确保数据库会话始终在请求之后关闭,即使在处理请求时出现异常。

关于 request.state

request.state 是每个 Request 对象的属性。它用于存储附加到请求本身的任意对象,如本例中的数据库会话。更多详情,请参阅 Starlette 官档 - Request 状态

本例中,它用于确保在所有请求中使用单个数据库会话,然后,在中间件里关闭。

生成器依赖项或中间件

在此,添加中间件与生成器依赖项类似,但也有一些区别:

  • 它需要更多的代码,有点复杂
  • 中间件必须是异步函数
    • 如果有代码在它必须等待网络,它可能将应用阻塞在那里,降低一点性能
    • 尽管在这里 SQLAlchemy 的工作方式可能不是很成问题
    • 但如果添加更多的代码到中间件有大量的 I/O 等待,就可能会有问题。
  • 中间件为每个请求运行。
    • 因此,将为每个请求创建连接。
    • 即使处理该请求的路径操作不需要数据库。

提示

当生成器依赖项对于用例来说已经足够时,使用 yield 可能会更好。

说明

FastAPI 新近添加了生成器依赖项。

本教程的上个版本只有中间件示例,可能有几个应用使用中间件进行数据库会话管理。