admin管理员组

文章数量:1289539

This question is inspired by the articles and /.

SQLALchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we are writing repositories as they do in various tutorials on the Internet, as well as services, views. It looks like this.

Base repositories:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker


engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
Session = async_sessionmaker(bind=engine, expire_on_commit=False)


# dependency that manages commits
async def get_session():
    session = Session()
    try:
        yield session
        await sessionmit()
    except Exception as e:
        await session.rollback()
        raise e
    finally:
        await session.close()


class BaseRepository:
    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        stmt = stmt.with_only_columns(func.count(literal_column("1")), maintain_column_froms=True)
        return await self._session.scalar(stmt)

    async def update_instance(self, instance, update_data: dict):
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        if result := await self._session.scalar(stmt):
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # another methods


class ModelRepository(BaseRepository):
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        if not self.model:
            raise ValueError()
        super().__init__(session)

    async def get_by_pk(self, pk):
        if instance := await self._session.get(self.model, pk):
            return instance
        raise NotFoundError

    async def create(self, **kwargs):
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # another methods

Let there be SQLAlchemy models:

class UserType(Base):
    # catalog model
    __tablename__ = "user_types"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(unique=True)  # f.e. student, teacher, cleaner
    name: Mapped[str]  # f.e. Student, Teacher, Cleaner


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    email: Mapped[str] = mapped_column(unique=True)
    type_id: Mapped[int] = mapped_column(ForeignKey("user_types.id", ondelete="cascade"))

Repositories for models:

from sqlalchemy import select


class UsersRepository(ModelRepository):
    model = User


class UserTypesRepository(ModelRepository):
    model = UserType

    # for catalog models I like doing methods like this:

    async def get_STUDENT_type(self):
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        return await self.get_by_code("cleaner")

    async def get_by_code(self, code):
        stmt = select(self.model).where(self.model.code == code)
        return await self.scalar_or_raise(stmt, "User type not found")

For each endpoint, a service and repositories are created:


# create_user.py

from sqlalchemy import update


class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    type_id: int


class Repository(BaseRepository):

    async def email_registered(self, email):
        # this method is being used only in this endpoint
        # I don't want to put in UsersRepository
        # sorry I couldn't figure out more realistic example
        stmt = select("1").where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry I couldn't figure out example



class Service:

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        # first check if email already registered
        # this check is being done first because sql request will be faster then http request further
        # transaction is being opened
        if self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to stop transaction for optimization purposes before "long" http request
        # because I need to commit it all or nothing
        # http request lasts about 0.5 second
        if not self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        type = await self.user_types_repository.get_by_pk(input.type_id)
        # idle in transaction
        if not type:
            raise HTTPException(status_code=400, detail="User type not found")
        # idle in transaction
        user = self.users_repository.create(first_name=input.first_name, last_name=input.last_name, type=type)
        # idle in transaction
        return user


    async def user_exists_in_keycloak(self, email) -> bool:
        # http request to keycloak to represent "long" pause when connection and transaction are idle uselessly


@app.post("/user")  # app is app = FastAPI()
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    return await service.create_user(input)

The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).

You can see in the Service.create_user method the transaction is opened on the first request and closed in dependency get_session(). During an http request to keycloak the transaction is idle for a long time. This takes up resources. In addition to the transaction, the connection is uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.

QUESTION: how to get rid of these idle times?

I see optimization this way. After each select request a commit must be performed, and the changes are recorded once at the end in dependency get_session(). But it will probably be strange to manually do sessionmit() after calling each select method?

And probably two sessions will be required here: one with AUTOCOMMIT for select queries, the second - in which changes will be accumulated and which will be committed at the end?

This question is inspired by the articles https://www.gias/blog/prevent-idle-in-transaction-engineering and https://capnfabs/posts/sqlalchemy-connection-management/.

SQLALchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we are writing repositories as they do in various tutorials on the Internet, as well as services, views. It looks like this.

Base repositories:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker


engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
Session = async_sessionmaker(bind=engine, expire_on_commit=False)


# dependency that manages commits
async def get_session():
    session = Session()
    try:
        yield session
        await sessionmit()
    except Exception as e:
        await session.rollback()
        raise e
    finally:
        await session.close()


class BaseRepository:
    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        stmt = stmt.with_only_columns(func.count(literal_column("1")), maintain_column_froms=True)
        return await self._session.scalar(stmt)

    async def update_instance(self, instance, update_data: dict):
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        if result := await self._session.scalar(stmt):
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # another methods


class ModelRepository(BaseRepository):
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        if not self.model:
            raise ValueError()
        super().__init__(session)

    async def get_by_pk(self, pk):
        if instance := await self._session.get(self.model, pk):
            return instance
        raise NotFoundError

    async def create(self, **kwargs):
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # another methods

Let there be SQLAlchemy models:

class UserType(Base):
    # catalog model
    __tablename__ = "user_types"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(unique=True)  # f.e. student, teacher, cleaner
    name: Mapped[str]  # f.e. Student, Teacher, Cleaner


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    email: Mapped[str] = mapped_column(unique=True)
    type_id: Mapped[int] = mapped_column(ForeignKey("user_types.id", ondelete="cascade"))

Repositories for models:

from sqlalchemy import select


class UsersRepository(ModelRepository):
    model = User


class UserTypesRepository(ModelRepository):
    model = UserType

    # for catalog models I like doing methods like this:

    async def get_STUDENT_type(self):
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        return await self.get_by_code("cleaner")

    async def get_by_code(self, code):
        stmt = select(self.model).where(self.model.code == code)
        return await self.scalar_or_raise(stmt, "User type not found")

For each endpoint, a service and repositories are created:


# create_user.py

from sqlalchemy import update


class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    type_id: int


class Repository(BaseRepository):

    async def email_registered(self, email):
        # this method is being used only in this endpoint
        # I don't want to put in UsersRepository
        # sorry I couldn't figure out more realistic example
        stmt = select("1").where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry I couldn't figure out example



class Service:

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        # first check if email already registered
        # this check is being done first because sql request will be faster then http request further
        # transaction is being opened
        if self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to stop transaction for optimization purposes before "long" http request
        # because I need to commit it all or nothing
        # http request lasts about 0.5 second
        if not self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        type = await self.user_types_repository.get_by_pk(input.type_id)
        # idle in transaction
        if not type:
            raise HTTPException(status_code=400, detail="User type not found")
        # idle in transaction
        user = self.users_repository.create(first_name=input.first_name, last_name=input.last_name, type=type)
        # idle in transaction
        return user


    async def user_exists_in_keycloak(self, email) -> bool:
        # http request to keycloak to represent "long" pause when connection and transaction are idle uselessly


@app.post("/user")  # app is app = FastAPI()
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    return await service.create_user(input)

The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).

You can see in the Service.create_user method the transaction is opened on the first request and closed in dependency get_session(). During an http request to keycloak the transaction is idle for a long time. This takes up resources. In addition to the transaction, the connection is uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.

QUESTION: how to get rid of these idle times?

I see optimization this way. After each select request a commit must be performed, and the changes are recorded once at the end in dependency get_session(). But it will probably be strange to manually do sessionmit() after calling each select method?

And probably two sessions will be required here: one with AUTOCOMMIT for select queries, the second - in which changes will be accumulated and which will be committed at the end?

Share Improve this question asked Feb 20 at 14:44 Альберт АлександровАльберт Александров 8652 gold badges20 silver badges44 bronze badges 4
  • Have you verified that this actually is a problem for your request flow? Or are you imagining that this will be a problem in the future? If you don't want to have a transaction active in specific parts of your code, commit and don't start a new transaction while waiting for answers from the service outside (or don't start the transaction at all before having what you need). If you don't have any issues - I'm not sure if trying to swap it for something else will show that it's the right choice for the issues you're having. – MatsLindh Commented Feb 21 at 7:17
  • Also, doesn't "recording changes and handling them at the end of the request" just move the transaction handling to your application? How do you handle conflicting updates between running requests? How do you do transaction isolation? Isolated reads if you need them? Your application should probably just commit when it's sure that it wants to commit, instead of keeping a transaction open for a very long time (be aware that the article talks about background tasks that may remain open for a long long time, and not what's usually the case in request/response. Is half a second an issue there? – MatsLindh Commented Feb 21 at 7:19
  • @MatsLindh, thanks for comments. Yep, I'm imagining that this will be a problem in the future. About conflicts and isolation level is just in example code. Half a second is just an example. I don't start transactions unlike SQLAlchemy) – Альберт Александров Commented Feb 21 at 18:01
  • 1 Your transaction is not idle when it's actively querying the db. As soon as your create_user is done with the tasks, sessionmit() at the end will close the transaction and the connection will be given back to the connection pool. – defalt Commented Feb 22 at 21:50
Add a comment  | 

1 Answer 1

Reset to default 2

You're running into the classic "idle in transaction" problem with SQLAlchemy and FastAPI. Here's a more optimized approach:

# Create two session factories
ReadSession = async_sessionmaker(bind=engine, expire_on_commit=False, autocommit=True)
WriteSession = async_sessionmaker(bind=engine, expire_on_commit=False)

# Context manager for explicit transaction control
@asynccontextmanager
async def transaction(session):
    try:
        yield session
        await sessionmit()
    except Exception as e:
        await session.rollback()
        raise e

Then in your service:

class UserService:
    def __init__(self, read_session=Depends(get_read_session), write_session=Depends(get_write_session)):
        self.read_session = read_session
        self.write_session = write_session
        self.users_repo_read = UsersRepository(read_session)
        self.users_repo_write = UsersRepository(write_session)
    
    async def create_user(self, input_data):
        # Read operations with autocommit
        if await self.users_repo_read.email_registered(input_data["email"]):
            raise HTTPException(status_code=400, detail="User already registered")
        
        # External HTTP call - no transaction open
        keycloak_user_exists = await self.user_exists_in_keycloak(input_data["email"])
        
        # Write operation with explicit transaction
        async with transaction(self.write_session):
            user = await self.users_repo_write.create(...)
            return user

This separates read/write operations and ensures transactions are only open when needed. No more idle connections during HTTP calls!

本文标签: