Gnosnay 的自留地

SQLAlchemy + MySQL:commit 到底提交了谁?一次把 connection / session / txn / savepoint 讲明白

最近在看一套测试基建代码,写法大概是:先从 async_engine.connect() 拿一条连接,外层 begin() 开一个大事务,再 begin_nested() 开 savepoint,然后把这条 connection 直接 bindAsyncSession。测试里允许随便 session.commit(),最后 teardown 用外层 trans.rollback() 一把清。

直觉上看起来很爽:你 commit 多少次都没关系,反正最后 rollback。但问题就在于:我一旦把这些概念和 SQLAlchemy 的抽象层叠在一起(Session、SessionTransaction、Transaction、Connection、DBAPIConnection),再加上 MySQL 协议「强顺序化」的前提,脑子就开始打结——尤其是当你看到 SessionTransaction.commit() 的实现:它明明在 trans.commit(),为什么我肉眼观察却像「没提交整个事务」?再进一步,如果连接池只有一个 connection,换成 bind=engine 又会怎样?join_transaction_mode 又是个什么玩意?

这篇就把我们对话里碰到的问题,按我理解的方式整理一遍。你可以把它当成一篇「工程师工作台笔记」,不保证覆盖所有边角,但保证你看完能解释:commit 提交了什么、为什么提交不等于提交外层事务、为什么 pool=1 会让其他 session 阻塞、以及 join_transaction_mode 设计到底在管什么

声明:本文偏工程视角,尽量可复现,但不同数据库、不同驱动、不同 SQLAlchemy 版本细节会有差异。请带着批判的眼光阅读。


0. 先立一个最重要的前提:MySQL 线协议是强顺序化的

MySQL 的 classic/native protocol 是严格的 request → response 序列:同一条连接上,你发出一个命令,就必须把它的响应读完,连接状态机才能继续下一条命令。这里的「强顺序化」是协议层面的,不是你愿不愿意的问题。

所以从工程结论出发:

  • 同一条 connection 在同一时刻只能有一个 in-flight 的请求
  • 你可以用 asyncio 并发,但并发单位是「多条连接」,不是「一条连接上多协程并发发包」
  • 真要共享同一条连接给多个协程,只能靠你自己做串行化(锁/队列),否则就是「包读写错乱」和「结果串台」的经典事故

这个前提后面会反复用到。


1. connection / session / transaction:先把词汇的层级摆好

我经常看到大家把这三个词混着用,尤其是把 SQLAlchemy 的「Session」当成 MySQL 的「Session」。事实是这样的:它们不是一回事。

1.1 MySQL 语境(数据库层)

  • Connection:一条 TCP/Unix socket 连接
  • Session:服务端针对这条连接维护的会话上下文(基本一一对应)
  • Transaction:挂在这个 session 上的「当前事务」(同一时刻只有一个;savepoint 是事务内部的检查点,不是第二个事务)

这里的核心是:事务是绑定在连接/会话上的。你只有一个连接,就不存在「同时跑两个独立事务」的概念。

1.2 SQLAlchemy 语境(框架层)

这里容易让人崩溃,因为同一个词被拆成了多层对象:

  • DBAPIConnection:驱动提供的真实连接(比如 aiomysql/pymysql/mysqlclient),负责协议读写
  • Connection(SQLAlchemy Core):对 DBAPIConnection 的代理/门面,附加了 pool、执行、事务管理等能力
  • Transaction(SQLAlchemy Core)conn.begin() / conn.begin_nested() 返回的事务对象,对应 DB 的 BEGIN / SAVEPOINT
  • Session(ORM):工作单元(Unit of Work)+ identity map,管理对象状态、flush 顺序、级联等
  • SessionTransaction(ORM):Session 内部用来表示「我现在处在哪个事务状态」的对象;它会驱动底层的 Connection/Transaction

一句话记忆法:

ORM 的 Session 不是数据库的 Session;它更像「对象状态管理器」,真正的事务在 Connection/Transaction 那层。


2. 为什么要有这么多「雷同」概念?这不是设计过度吗?

你觉得奇怪是正常的——因为看上去像是把「连接/事务」重复造了好几遍。但从动机上讲,这种分层解决了三个工程痛点:

  1. ORM 要解决的问题不是协议和连接复用 Session 需要的是对象一致性、flush、identity map;它不该自己实现连接池,更不该关心 MySQL 的 packet 头长什么样。

  2. Core 和 ORM 必须可独立使用 很多项目只用 Core 执行 SQL;也有很多项目 ORM + Core 混用。把 Connection/Transaction 抽出来,才能组合使用。

  3. 连接池需要「把 close 变成归还」 这也是很多人第一次踩坑的地方:Connection.close() 在 SQLAlchemy 里经常只代表「我不用了」,而不是「物理断开」。物理连接很贵,当然要复用。

所以你看到的这些对象,更多是在做「关注点分离」。麻烦是麻烦了点,但它确实支撑了复杂用法(测试、框架事务、混用、savepoint、pool reset)。


3. 回到最关键的问题:为什么有时候 session.commit() 没有提交整个 txn?

你给了 SessionTransaction.commit() 的核心代码,我把它翻译成一句人话:

session.commit() 会遍历 Session 记录的 _connections,对其中 should_commit=True 的那部分 trans 调用 trans.commit()

所以真正的问题不是「commit 会不会发起提交」,而是:

  • _connections 里装的 trans 是谁?(外层 Transaction 还是 NestedTransaction?)
  • 哪些 transshould_commit 是 True?(Session 是否拥有它的提交权)

这就解释了你测试代码里最反直觉的现象。


4. 你的测试 fixture:为什么 commit 只提交了内部 nested/savepoint?

你的结构(简化版)是这样的:

  1. connection = engine.connect()
  2. trans = connection.begin() —— 外层事务(OUTER)
  3. nested = connection.begin_nested() —— savepoint(NESTED)
  4. session = AsyncSession(bind=connection)

这里的关键事实只有一句:

外层事务 trans 不是 Session 开的,是你在 Session 之外开的。

于是 Session 面临一个「重入」问题:我绑到了一个已经在事务中的连接,我的 commit/rollback/close 该不该影响外层事务?

如果 Session 直接把外层事务 commit 掉,那外部控制事务边界的代码就崩了(比如测试 teardown 想 rollback,结果你提前 commit 了)。所以 SQLAlchemy 引入了一个「重入策略」参数:join_transaction_mode

当策略偏保守(如 rollback_only / create_savepoint)时,Session 的 commit() 往往只会作用在它「拥有」的那层事务边界——也就是 savepoint:

  • session.commit()RELEASE SAVEPOINT(提交 nested)
  • 外层 trans 仍然活着
  • teardown trans.rollback() 一把清掉全部更改

这就是你观察到「commit 了但没提交整个事务」的原因:你提交的是 内部边界,不是 外部大闸

从工程角度看,这不是 bug,而是测试隔离的经典套路:允许你在测试里随便 commit,但绝不允许落盘。


5. join_transaction_mode 是什么?它解决的不是并发,而是「事务所有权」

你问得很尖锐:既然 MySQL 一条连接同一时刻只能跑一个 command,那 Session 绑到一个已有事务的 conn,会不会打破这个假设?

不会。原因很简单:join_transaction_mode 处理的是「嵌套/重入的事务边界」,不是「并发复用同一连接」。

savepoint 不是并发,它仍然是串行命令:

BEGIN;
SAVEPOINT sp;
... DML ...
RELEASE SAVEPOINT sp;
... DML ...
COMMIT;

join_transaction_mode 只是在决定:当外层已经有事务时,Session 的 commit/rollback 应该:

  • 彻底接管外层(control_fully)
  • 只允许 rollback 传播,commit 不传播(rollback_only)
  • 永远创建 savepoint,让内层边界可提交、外层不受影响(create_savepoint)
  • 或默认折中(conditional_savepoint)

你可以把它类比成「可重入锁」的策略:同一条执行链里,外层已经拿了锁,内层再 with lock: 时,到底是递增计数,还是重新拿一把子锁,还是直接禁止释放外层锁。


6. 那如果我不 bind conn,改成 bind engine,但 pool=1 呢?

这也是个典型「看起来一样,其实完全不同」的坑。

6.1 bind=conn:你强制 Session 使用那条连接

外层事务、savepoint、事件监听器重建 savepoint,全都围绕同一条 connection 工作,语义非常可控。

6.2 bind=engine:Session 会自己从池里 checkout 连接

即使 pool_size=1,也只是说明「同时最多一个 Session 能拿到连接」,不代表你手里那条外部 connection.begin() 一定包住 Session 的行为。

结果就是:

  • Session A 拿到连接并开启事务(或者只是没释放连接)
  • Session B 想执行 SQL → 在 checkout 阶段阻塞
  • 阻塞点是「拿不到连接」,不是 join_transaction_mode

而且更重要的是:你原来那套「外层大事务兜底 rollback」的模式会被破坏。因为 session.commit() 在 engine-bind 的路径里更可能真的发出 COMMIT,把数据提交到数据库(除非你也把外层事务边界重新用 Session.begin()engine.begin() 组织起来)。

所以你对 pool=1 的理解可以这样修正:

pool=1 会让并发请求排队,但不会自动帮你实现「外层事务永远不提交」的测试隔离。


7. 一个最小例子:单 session vs 多 session 的行为差异

7.1 单个 Session,bind=conn(你现在的测试)

  • 外层 trans:不归 Session 管
  • Session commit:只提交 savepoint(可重建)
  • teardown:外层 rollback 清空一切

这非常适合测试:你想要的是「可提交的中间边界 + 最终强制回滚」。

7.2 多个 Session,bind=engine + pool=1

  • Session A checkout 连接,事务不结束 → Session B 必阻塞
  • 若 A 真的 commit,数据可能落盘(除非你另建外层事务兜底)
  • join_transaction_mode 基本不参与,因为 Session 没加入外部事务,它自己就是主人

结语:这套抽象不优雅,但很实用

我不得不吐槽,这些概念堆在一起,初看确实像「雷同抽象」。但把它看成「重入事务的策略管理 + 连接池资源管理 + ORM 工作单元管理」,很多怪现象就顺了:

  • commit 没提交外层事务:因为 Session 没有外层事务所有权
  • pool=1 导致其他 session block:因为连接资源只有一份
  • bind=conn 和 bind=engine 行为差很大:因为前者你掌控了连接和外层事务,后者你把权力交给了池和 Session

这篇先写到这里。如果你愿意把你当前 SQLAlchemy 版本、sessionmaker 的参数(尤其有没有显式设置 join_transaction_mode)、以及你测试里是否真的需要「允许 commit 但最终不落盘」的目标说清楚,我可以把你的 fixture 精简成一版「更不容易踩坑」的实现(包括:什么时候该用 create_savepoint、什么时候只需要 rollback_only、以及 async 下怎么避免同一连接被并发使用)。

References

Just in case 你们需要用, 这里是我的配置. 最终效果是:

  1. 测试启动时, 整个测试进程只有一条 DB API Connection, 所有测试用例共享这个连接
  2. 测试用例之间, 通过 savepoint 隔离
# pytest configuration
@pytest.fixture(scope="function", autouse=True) # 必须是 function scope, 否则无法隔离
@classmethod
async def async_db_session(cls) -> AsyncIterator[AsyncSession]:
   connection = await async_engine.connect()
   trans = await connection.begin()
   async_session = TestAsyncSession(bind=connection)
   nested = await connection.begin_nested()

   # 每次 transaction 结束时, 重建 savepoint
   # 如果没有这个listener, 每次 commit 都会提交外层事务
   @event.listens_for(async_session.sync_session, "after_transaction_end")
   def end_savepoint(session: Any, transaction: Any):  # type: ignore
      nonlocal nested

      if not nested.is_active and connection.sync_connection:
            nested = connection.sync_connection.begin_nested()

   yield async_session

   # 每次测试用例结束时, 回滚事务并关闭连接
   await trans.rollback()
   await async_session.close()
   await connection.close()

# fastAPI async db session dependency
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
   try:
         # before start, we clear all the session cache
         # to avoid the cache data affect the test result
         async_db_session.expunge_all()
         async_db_session.expire_all()
         yield async_db_session
   finally:
         # after the test case finished, we clear all the session cache
         # to avoid the cache data affect the next test case
         async_db_session.expunge_all()
         async_db_session.expire_all()

# 你可以注入到 fast api 中
@pytest.fixture(scope="function", autouse=True)
@classmethod
async def test_client(
   cls,
   app: FastAPI,
   async_db_session: AsyncSession,
) -> AsyncIterator[AsyncClient]:
   async def _mock_db_session() -> AsyncIterator[AsyncSession]:
      try:
            # before start, we clear all the session cache
            # to avoid the cache data affect the test result
            async_db_session.expunge_all()
            async_db_session.expire_all()
            yield async_db_session
      finally:
            # after the test case finished, we clear all the session cache
            # to avoid the cache data affect the next test case
            async_db_session.expunge_all()
            async_db_session.expire_all()

   app.dependency_overrides[get_db_session] = _mock_db_session

   try:
      async with AsyncClient(
            transport=ASGITransport(app=app),  # type: ignore
            base_url="http://test",
      ) as client:
            yield client
   finally:
      app.dependency_overrides.clear()