SQLAlchemy + MySQL:commit 到底提交了谁?一次把 connection / session / txn / savepoint 讲明白
最近在看一套测试基建代码,写法大概是:先从 async_engine.connect() 拿一条连接,外层 begin() 开一个大事务,再 begin_nested() 开 savepoint,然后把这条 connection 直接 bind 给 AsyncSession。测试里允许随便 session.commit(),最后 teardown 用外层 trans.rollback() 一把清。
直觉上看起来很爽:你 commit 多少次都没关系,反正最后 rollback。但问题就在于:我一旦把这些概念和 SQLAlchemy 的抽象层叠在一起(Session、SessionTransaction、Transaction、Connection、DBAPIConnection),再加上 MySQL 协议「强顺序化」的前提,脑子就开始打结——尤其是当你看到 SessionTransaction.commit() 的实现:它明明在 trans.commit(),为什么我肉眼观察却像「没提交整个事务」?再进一步,如果连接池只有一个 connection,换成 bind=engine 又会怎样?join_transaction_mode 又是个什么玩意?
这篇就把我们对话里碰到的问题,按我理解的方式整理一遍。你可以把它当成一篇「工程师工作台笔记」,不保证覆盖所有边角,但保证你看完能解释:commit 提交了什么、为什么提交不等于提交外层事务、为什么 pool=1 会让其他 session 阻塞、以及 join_transaction_mode 设计到底在管什么。
声明:本文偏工程视角,尽量可复现,但不同数据库、不同驱动、不同 SQLAlchemy 版本细节会有差异。请带着批判的眼光阅读。
0. 先立一个最重要的前提:MySQL 线协议是强顺序化的
MySQL 的 classic/native protocol 是严格的 request → response 序列:同一条连接上,你发出一个命令,就必须把它的响应读完,连接状态机才能继续下一条命令。这里的「强顺序化」是协议层面的,不是你愿不愿意的问题。
所以从工程结论出发:
- 同一条 connection 在同一时刻只能有一个 in-flight 的请求
- 你可以用 asyncio 并发,但并发单位是「多条连接」,不是「一条连接上多协程并发发包」
- 真要共享同一条连接给多个协程,只能靠你自己做串行化(锁/队列),否则就是「包读写错乱」和「结果串台」的经典事故
这个前提后面会反复用到。
1. connection / session / transaction:先把词汇的层级摆好
我经常看到大家把这三个词混着用,尤其是把 SQLAlchemy 的「Session」当成 MySQL 的「Session」。事实是这样的:它们不是一回事。
1.1 MySQL 语境(数据库层)
- Connection:一条 TCP/Unix socket 连接
- Session:服务端针对这条连接维护的会话上下文(基本一一对应)
- Transaction:挂在这个 session 上的「当前事务」(同一时刻只有一个;savepoint 是事务内部的检查点,不是第二个事务)
这里的核心是:事务是绑定在连接/会话上的。你只有一个连接,就不存在「同时跑两个独立事务」的概念。
1.2 SQLAlchemy 语境(框架层)
这里容易让人崩溃,因为同一个词被拆成了多层对象:
- DBAPIConnection:驱动提供的真实连接(比如 aiomysql/pymysql/mysqlclient),负责协议读写
- Connection(SQLAlchemy Core):对 DBAPIConnection 的代理/门面,附加了 pool、执行、事务管理等能力
- Transaction(SQLAlchemy Core):
conn.begin()/conn.begin_nested()返回的事务对象,对应 DB 的 BEGIN / SAVEPOINT - Session(ORM):工作单元(Unit of Work)+ identity map,管理对象状态、flush 顺序、级联等
- SessionTransaction(ORM):Session 内部用来表示「我现在处在哪个事务状态」的对象;它会驱动底层的 Connection/Transaction
一句话记忆法:
ORM 的 Session 不是数据库的 Session;它更像「对象状态管理器」,真正的事务在 Connection/Transaction 那层。
2. 为什么要有这么多「雷同」概念?这不是设计过度吗?
你觉得奇怪是正常的——因为看上去像是把「连接/事务」重复造了好几遍。但从动机上讲,这种分层解决了三个工程痛点:
ORM 要解决的问题不是协议和连接复用 Session 需要的是对象一致性、flush、identity map;它不该自己实现连接池,更不该关心 MySQL 的 packet 头长什么样。
Core 和 ORM 必须可独立使用 很多项目只用 Core 执行 SQL;也有很多项目 ORM + Core 混用。把 Connection/Transaction 抽出来,才能组合使用。
连接池需要「把 close 变成归还」 这也是很多人第一次踩坑的地方:
Connection.close()在 SQLAlchemy 里经常只代表「我不用了」,而不是「物理断开」。物理连接很贵,当然要复用。
所以你看到的这些对象,更多是在做「关注点分离」。麻烦是麻烦了点,但它确实支撑了复杂用法(测试、框架事务、混用、savepoint、pool reset)。
3. 回到最关键的问题:为什么有时候 session.commit() 没有提交整个 txn?
你给了 SessionTransaction.commit() 的核心代码,我把它翻译成一句人话:
session.commit()会遍历 Session 记录的_connections,对其中should_commit=True的那部分trans调用trans.commit()。
所以真正的问题不是「commit 会不会发起提交」,而是:
_connections里装的trans是谁?(外层 Transaction 还是 NestedTransaction?)- 哪些
trans的should_commit是 True?(Session 是否拥有它的提交权)
这就解释了你测试代码里最反直觉的现象。
4. 你的测试 fixture:为什么 commit 只提交了内部 nested/savepoint?
你的结构(简化版)是这样的:
connection = engine.connect()trans = connection.begin()—— 外层事务(OUTER)nested = connection.begin_nested()—— savepoint(NESTED)session = AsyncSession(bind=connection)
这里的关键事实只有一句:
外层事务
trans不是 Session 开的,是你在 Session 之外开的。
于是 Session 面临一个「重入」问题:我绑到了一个已经在事务中的连接,我的 commit/rollback/close 该不该影响外层事务?
如果 Session 直接把外层事务 commit 掉,那外部控制事务边界的代码就崩了(比如测试 teardown 想 rollback,结果你提前 commit 了)。所以 SQLAlchemy 引入了一个「重入策略」参数:join_transaction_mode。
当策略偏保守(如 rollback_only / create_savepoint)时,Session 的 commit() 往往只会作用在它「拥有」的那层事务边界——也就是 savepoint:
session.commit()→RELEASE SAVEPOINT(提交 nested)- 外层
trans仍然活着 - teardown
trans.rollback()一把清掉全部更改
这就是你观察到「commit 了但没提交整个事务」的原因:你提交的是 内部边界,不是 外部大闸。
从工程角度看,这不是 bug,而是测试隔离的经典套路:允许你在测试里随便 commit,但绝不允许落盘。
5. join_transaction_mode 是什么?它解决的不是并发,而是「事务所有权」
你问得很尖锐:既然 MySQL 一条连接同一时刻只能跑一个 command,那 Session 绑到一个已有事务的 conn,会不会打破这个假设?
不会。原因很简单:join_transaction_mode 处理的是「嵌套/重入的事务边界」,不是「并发复用同一连接」。
savepoint 不是并发,它仍然是串行命令:
BEGIN;
SAVEPOINT sp;
... DML ...
RELEASE SAVEPOINT sp;
... DML ...
COMMIT;
join_transaction_mode 只是在决定:当外层已经有事务时,Session 的 commit/rollback 应该:
- 彻底接管外层(control_fully)
- 只允许 rollback 传播,commit 不传播(rollback_only)
- 永远创建 savepoint,让内层边界可提交、外层不受影响(create_savepoint)
- 或默认折中(conditional_savepoint)
你可以把它类比成「可重入锁」的策略:同一条执行链里,外层已经拿了锁,内层再 with lock: 时,到底是递增计数,还是重新拿一把子锁,还是直接禁止释放外层锁。
6. 那如果我不 bind conn,改成 bind engine,但 pool=1 呢?
这也是个典型「看起来一样,其实完全不同」的坑。
6.1 bind=conn:你强制 Session 使用那条连接
外层事务、savepoint、事件监听器重建 savepoint,全都围绕同一条 connection 工作,语义非常可控。
6.2 bind=engine:Session 会自己从池里 checkout 连接
即使 pool_size=1,也只是说明「同时最多一个 Session 能拿到连接」,不代表你手里那条外部 connection.begin() 一定包住 Session 的行为。
结果就是:
- Session A 拿到连接并开启事务(或者只是没释放连接)
- Session B 想执行 SQL → 在 checkout 阶段阻塞
- 阻塞点是「拿不到连接」,不是 join_transaction_mode
而且更重要的是:你原来那套「外层大事务兜底 rollback」的模式会被破坏。因为 session.commit() 在 engine-bind 的路径里更可能真的发出 COMMIT,把数据提交到数据库(除非你也把外层事务边界重新用 Session.begin() 或 engine.begin() 组织起来)。
所以你对 pool=1 的理解可以这样修正:
pool=1 会让并发请求排队,但不会自动帮你实现「外层事务永远不提交」的测试隔离。
7. 一个最小例子:单 session vs 多 session 的行为差异
7.1 单个 Session,bind=conn(你现在的测试)
- 外层
trans:不归 Session 管 - Session commit:只提交 savepoint(可重建)
- teardown:外层 rollback 清空一切
这非常适合测试:你想要的是「可提交的中间边界 + 最终强制回滚」。
7.2 多个 Session,bind=engine + pool=1
- Session A checkout 连接,事务不结束 → Session B 必阻塞
- 若 A 真的 commit,数据可能落盘(除非你另建外层事务兜底)
- join_transaction_mode 基本不参与,因为 Session 没加入外部事务,它自己就是主人
结语:这套抽象不优雅,但很实用
我不得不吐槽,这些概念堆在一起,初看确实像「雷同抽象」。但把它看成「重入事务的策略管理 + 连接池资源管理 + ORM 工作单元管理」,很多怪现象就顺了:
- commit 没提交外层事务:因为 Session 没有外层事务所有权
- pool=1 导致其他 session block:因为连接资源只有一份
- bind=conn 和 bind=engine 行为差很大:因为前者你掌控了连接和外层事务,后者你把权力交给了池和 Session
这篇先写到这里。如果你愿意把你当前 SQLAlchemy 版本、sessionmaker 的参数(尤其有没有显式设置 join_transaction_mode)、以及你测试里是否真的需要「允许 commit 但最终不落盘」的目标说清楚,我可以把你的 fixture 精简成一版「更不容易踩坑」的实现(包括:什么时候该用 create_savepoint、什么时候只需要 rollback_only、以及 async 下怎么避免同一连接被并发使用)。
References
Just in case 你们需要用, 这里是我的配置. 最终效果是:
- 测试启动时, 整个测试进程只有一条 DB API Connection, 所有测试用例共享这个连接
- 测试用例之间, 通过 savepoint 隔离
# pytest configuration
@pytest.fixture(scope="function", autouse=True) # 必须是 function scope, 否则无法隔离
@classmethod
async def async_db_session(cls) -> AsyncIterator[AsyncSession]:
connection = await async_engine.connect()
trans = await connection.begin()
async_session = TestAsyncSession(bind=connection)
nested = await connection.begin_nested()
# 每次 transaction 结束时, 重建 savepoint
# 如果没有这个listener, 每次 commit 都会提交外层事务
@event.listens_for(async_session.sync_session, "after_transaction_end")
def end_savepoint(session: Any, transaction: Any): # type: ignore
nonlocal nested
if not nested.is_active and connection.sync_connection:
nested = connection.sync_connection.begin_nested()
yield async_session
# 每次测试用例结束时, 回滚事务并关闭连接
await trans.rollback()
await async_session.close()
await connection.close()
# fastAPI async db session dependency
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
try:
# before start, we clear all the session cache
# to avoid the cache data affect the test result
async_db_session.expunge_all()
async_db_session.expire_all()
yield async_db_session
finally:
# after the test case finished, we clear all the session cache
# to avoid the cache data affect the next test case
async_db_session.expunge_all()
async_db_session.expire_all()
# 你可以注入到 fast api 中
@pytest.fixture(scope="function", autouse=True)
@classmethod
async def test_client(
cls,
app: FastAPI,
async_db_session: AsyncSession,
) -> AsyncIterator[AsyncClient]:
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
try:
# before start, we clear all the session cache
# to avoid the cache data affect the test result
async_db_session.expunge_all()
async_db_session.expire_all()
yield async_db_session
finally:
# after the test case finished, we clear all the session cache
# to avoid the cache data affect the next test case
async_db_session.expunge_all()
async_db_session.expire_all()
app.dependency_overrides[get_db_session] = _mock_db_session
try:
async with AsyncClient(
transport=ASGITransport(app=app), # type: ignore
base_url="http://test",
) as client:
yield client
finally:
app.dependency_overrides.clear()