admin管理员组文章数量:1289539
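This question is inspired by the articles https://www.gorgias.com/blog/prevent-idle-in-transaction-engineering and https://capnfabs.net/posts/sqlalchemy-connection-management/.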
SQLAlchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we write repositories, as well as services and views, the way various tutorials on the Internet do. It looks like this.
Base repositories:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Async engine built from project settings: settings.DB.async_dns is passed as the
# connection URL and settings.DB.OPTIONS is expanded into engine keyword arguments.
# NOTE(review): "async_dns" looks like a typo of "dsn" — confirm against the settings module.
engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
# Session factory bound to the engine above. expire_on_commit=False keeps already
# loaded attributes readable after a commit instead of expiring them.
Session = async_sessionmaker(bind=engine, expire_on_commit=False)
# dependency that manages commits
async def get_session():
    """Yield one session and finish its unit of work when the consumer is done.

    Written as a generator dependency: the code after ``yield`` runs once the
    consumer has finished with the session.

    Yields:
        A session created by the module-level ``Session`` factory.

    Raises:
        Exception: any error raised while the session is in use, or by the
            commit itself, is re-raised after the session was rolled back.
    """
    session = Session()
    try:
        yield session
        # Reached only when the consumer finished without raising.
        await session.commit()
    except Exception:
        await session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        await session.close()
class BaseRepository:
    """Query helpers shared by all repositories; every call goes through one session."""

    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        """Return the row count for ``stmt``.

        The selected columns are replaced by ``count(1)`` while the FROM
        clauses of the replaced columns are kept (``maintain_column_froms``).
        """
        count_stmt = stmt.with_only_columns(
            func.count(literal_column("1")),
            maintain_column_froms=True,
        )
        return await self._session.scalar(count_stmt)

    async def update_instance(self, instance, update_data: dict):
        """Set every ``update_data`` item as an attribute on ``instance`` and flush.

        Only a flush is issued here, no commit. Returns the same instance.
        """
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        """Return the scalar result of ``stmt`` or raise ``NotFoundError``.

        Args:
            stmt: statement executed with ``session.scalar``.
            error: optional message passed to ``NotFoundError``.

        Raises:
            NotFoundError: when the statement yields no value (``None``).
        """
        result = await self._session.scalar(stmt)
        # Compare against None explicitly: legitimate falsy scalars such as
        # 0, "" or False are real results and must not be reported as missing.
        if result is not None:
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # other methods
class ModelRepository(BaseRepository):
    """Repository tied to one mapped class through the ``model`` class attribute."""

    # Mapped class handled by the repository; concrete subclasses must override it.
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        """Store the session.

        Raises:
            ValueError: when the subclass did not set ``model``.
        """
        if self.model is None:
            raise ValueError(f"{type(self).__name__}.model is not set")
        super().__init__(session)

    async def get_by_pk(self, pk):
        """Return the instance with primary key ``pk``.

        Raises:
            NotFoundError: when ``session.get`` returns ``None``.
        """
        instance = await self._session.get(self.model, pk)
        if instance is None:
            raise NotFoundError
        return instance

    async def create(self, **kwargs):
        """Build a ``model`` instance from ``kwargs`` and add it to the session."""
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        """Build one ``model`` instance per dict in ``values`` and add them all."""
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # other methods
Let there be SQLAlchemy models:
class UserType(Base):
    """Catalog of user types, stored in the ``user_types`` table."""

    # catalog model
    __tablename__ = "user_types"
    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # Unique code used to look a type up.
    code: Mapped[str] = mapped_column(unique=True) # f.e. student, teacher, cleaner
    # Display name of the type.
    name: Mapped[str] # f.e. Student, Teacher, Cleaner
class User(Base):
    """Application user, stored in the ``users`` table."""

    __tablename__ = "users"
    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    # Unique per user.
    email: Mapped[str] = mapped_column(unique=True)
    # Foreign key to user_types.id, declared with ondelete="cascade".
    type_id: Mapped[int] = mapped_column(ForeignKey("user_types.id", ondelete="cascade"))
Repositories for models:
from sqlalchemy import select
class UsersRepository(ModelRepository):
    """ModelRepository bound to the ``User`` model."""

    model = User
class UserTypesRepository(ModelRepository):
    """ModelRepository bound to ``UserType`` with lookups by catalog code."""

    model = UserType

    async def get_by_code(self, code):
        """Return the user type whose ``code`` column equals ``code``.

        Delegates to ``scalar_or_raise`` with the message "User type not found".
        """
        query = select(self.model).where(self.model.code == code)
        found = await self.scalar_or_raise(query, "User type not found")
        return found

    # for catalog models I like doing methods like this:
    async def get_STUDENT_type(self):
        """Shortcut for ``get_by_code("student")``."""
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        """Shortcut for ``get_by_code("teacher")``."""
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        """Shortcut for ``get_by_code("cleaner")``."""
        return await self.get_by_code("cleaner")
For each endpoint, a service and repositories are created:
# create_user.py
from sqlalchemy import update
# Request payload for the create-user endpoint.
class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    # Primary key of the UserType the new user should get.
    type_id: int
class Repository(BaseRepository):
    """Queries that only the create-user endpoint needs."""

    async def email_registered(self, email):
        """Return ``1`` when a user row with ``email`` exists, otherwise ``None``."""
        # this method is being used only in this endpoint
        # I don't want to put in UsersRepository
        # sorry I couldn't figure out more realistic example
        from sqlalchemy import literal_column

        # select() does not coerce a plain string into a column expression,
        # so the constant has to be spelled as literal_column("1").
        stmt = select(literal_column("1")).where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        """Execute an UPDATE statement on the session (placeholder in the example)."""
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry I couldn't figure out example
class Service:
    """Use case behind the create-user endpoint; coordinates three repositories."""

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        """Validate the request and add a new ``User`` to the session.

        Args:
            input: validated request payload.

        Returns:
            The newly created, not yet committed, ``User`` instance.

        Raises:
            HTTPException: status 400 when the email is already registered,
                the user is unknown to Keycloak, or the user type is missing.
        """
        # first check if email already registered
        # this check is being done first because sql request will be faster than http request further
        # transaction is being opened
        # The repository methods are coroutines: without "await" the coroutine
        # object itself is tested, which is always truthy.
        if await self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to stop transaction for optimization purposes before "long" http request
        # because I need to commit it all or nothing
        # http request lasts about 0.5 second
        if not await self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        # get_by_pk never returns a falsy value, it raises NotFoundError instead,
        # so the missing type is translated into the intended 400 here.
        try:
            user_type = await self.user_types_repository.get_by_pk(input.type_id)
        except NotFoundError as exc:
            raise HTTPException(status_code=400, detail="User type not found") from exc
        # idle in transaction
        # User declares email and type_id columns, so pass those fields.
        user = await self.users_repository.create(
            first_name=input.first_name,
            last_name=input.last_name,
            email=input.email,
            type_id=user_type.id,
        )
        # idle in transaction
        return user

    async def user_exists_in_keycloak(self, email) -> bool:
        """Report whether ``email`` is known to Keycloak (not implemented in the example)."""
        # http request to keycloak to represent "long" pause when connection and transaction are idle uselessly
        raise NotImplementedError("the Keycloak lookup is not part of the example")
# POST /user: the body is declared as InputSchema and the Service instance is
# injected through Depends. Comments are used instead of a docstring here.
@app.post("/user") # app is app = FastAPI()
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    # Returns whatever Service.create_user returns.
    return await service.create_user(input)
The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).
You can see that in the Service.create_user method the transaction is opened by the first query and closed in the get_session() dependency.
During the HTTP request to Keycloak the transaction is idle for a long time, which takes up resources. In addition to the transaction, the connection is also uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.
QUESTION: how to get rid of these idle times?
I see the optimization this way: a commit must be performed after each select request, and the changes are recorded once at the end in the get_session() dependency.
But it would probably be strange to manually call session.commit() after calling each select method?
And probably two sessions would be required here: one with AUTOCOMMIT for select queries, and a second in which changes are accumulated and which is committed at the end?
This question is inspired by the articles https://www.gorgias.com/blog/prevent-idle-in-transaction-engineering and https://capnfabs.net/posts/sqlalchemy-connection-management/.
SQLAlchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we write repositories, as well as services and views, the way various tutorials on the Internet do. It looks like this.
Base repositories:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Async engine built from project settings: settings.DB.async_dns is passed as the
# connection URL and settings.DB.OPTIONS is expanded into engine keyword arguments.
# NOTE(review): "async_dns" looks like a typo of "dsn" — confirm against the settings module.
engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
# Session factory bound to the engine above. expire_on_commit=False keeps already
# loaded attributes readable after a commit instead of expiring them.
Session = async_sessionmaker(bind=engine, expire_on_commit=False)
# dependency that manages commits
async def get_session():
    """Yield one session and finish its unit of work when the consumer is done.

    Written as a generator dependency: the code after ``yield`` runs once the
    consumer has finished with the session.

    Yields:
        A session created by the module-level ``Session`` factory.

    Raises:
        Exception: any error raised while the session is in use, or by the
            commit itself, is re-raised after the session was rolled back.
    """
    session = Session()
    try:
        yield session
        # Reached only when the consumer finished without raising.
        await session.commit()
    except Exception:
        await session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        await session.close()
class BaseRepository:
    """Query helpers shared by all repositories; every call goes through one session."""

    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        """Return the row count for ``stmt``.

        The selected columns are replaced by ``count(1)`` while the FROM
        clauses of the replaced columns are kept (``maintain_column_froms``).
        """
        count_stmt = stmt.with_only_columns(
            func.count(literal_column("1")),
            maintain_column_froms=True,
        )
        return await self._session.scalar(count_stmt)

    async def update_instance(self, instance, update_data: dict):
        """Set every ``update_data`` item as an attribute on ``instance`` and flush.

        Only a flush is issued here, no commit. Returns the same instance.
        """
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        """Return the scalar result of ``stmt`` or raise ``NotFoundError``.

        Args:
            stmt: statement executed with ``session.scalar``.
            error: optional message passed to ``NotFoundError``.

        Raises:
            NotFoundError: when the statement yields no value (``None``).
        """
        result = await self._session.scalar(stmt)
        # Compare against None explicitly: legitimate falsy scalars such as
        # 0, "" or False are real results and must not be reported as missing.
        if result is not None:
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # other methods
class ModelRepository(BaseRepository):
    """Repository tied to one mapped class through the ``model`` class attribute."""

    # Mapped class handled by the repository; concrete subclasses must override it.
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        """Store the session.

        Raises:
            ValueError: when the subclass did not set ``model``.
        """
        if self.model is None:
            raise ValueError(f"{type(self).__name__}.model is not set")
        super().__init__(session)

    async def get_by_pk(self, pk):
        """Return the instance with primary key ``pk``.

        Raises:
            NotFoundError: when ``session.get`` returns ``None``.
        """
        instance = await self._session.get(self.model, pk)
        if instance is None:
            raise NotFoundError
        return instance

    async def create(self, **kwargs):
        """Build a ``model`` instance from ``kwargs`` and add it to the session."""
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        """Build one ``model`` instance per dict in ``values`` and add them all."""
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # other methods
Let there be SQLAlchemy models:
class UserType(Base):
    """Catalog of user types, stored in the ``user_types`` table."""

    # catalog model
    __tablename__ = "user_types"
    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # Unique code used to look a type up.
    code: Mapped[str] = mapped_column(unique=True) # f.e. student, teacher, cleaner
    # Display name of the type.
    name: Mapped[str] # f.e. Student, Teacher, Cleaner
class User(Base):
    """Application user, stored in the ``users`` table."""

    __tablename__ = "users"
    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    # Unique per user.
    email: Mapped[str] = mapped_column(unique=True)
    # Foreign key to user_types.id, declared with ondelete="cascade".
    type_id: Mapped[int] = mapped_column(ForeignKey("user_types.id", ondelete="cascade"))
Repositories for models:
from sqlalchemy import select
class UsersRepository(ModelRepository):
    """ModelRepository bound to the ``User`` model."""

    model = User
class UserTypesRepository(ModelRepository):
    """ModelRepository bound to ``UserType`` with lookups by catalog code."""

    model = UserType

    async def get_by_code(self, code):
        """Return the user type whose ``code`` column equals ``code``.

        Delegates to ``scalar_or_raise`` with the message "User type not found".
        """
        query = select(self.model).where(self.model.code == code)
        found = await self.scalar_or_raise(query, "User type not found")
        return found

    # for catalog models I like doing methods like this:
    async def get_STUDENT_type(self):
        """Shortcut for ``get_by_code("student")``."""
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        """Shortcut for ``get_by_code("teacher")``."""
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        """Shortcut for ``get_by_code("cleaner")``."""
        return await self.get_by_code("cleaner")
For each endpoint, a service and repositories are created:
# create_user.py
from sqlalchemy import update
# Request payload for the create-user endpoint.
class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    # Primary key of the UserType the new user should get.
    type_id: int
class Repository(BaseRepository):
    """Queries that only the create-user endpoint needs."""

    async def email_registered(self, email):
        """Return ``1`` when a user row with ``email`` exists, otherwise ``None``."""
        # this method is being used only in this endpoint
        # I don't want to put in UsersRepository
        # sorry I couldn't figure out more realistic example
        from sqlalchemy import literal_column

        # select() does not coerce a plain string into a column expression,
        # so the constant has to be spelled as literal_column("1").
        stmt = select(literal_column("1")).where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        """Execute an UPDATE statement on the session (placeholder in the example)."""
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry I couldn't figure out example
class Service:
    """Use case behind the create-user endpoint; coordinates three repositories."""

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        """Validate the request and add a new ``User`` to the session.

        Args:
            input: validated request payload.

        Returns:
            The newly created, not yet committed, ``User`` instance.

        Raises:
            HTTPException: status 400 when the email is already registered,
                the user is unknown to Keycloak, or the user type is missing.
        """
        # first check if email already registered
        # this check is being done first because sql request will be faster than http request further
        # transaction is being opened
        # The repository methods are coroutines: without "await" the coroutine
        # object itself is tested, which is always truthy.
        if await self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to stop transaction for optimization purposes before "long" http request
        # because I need to commit it all or nothing
        # http request lasts about 0.5 second
        if not await self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        # get_by_pk never returns a falsy value, it raises NotFoundError instead,
        # so the missing type is translated into the intended 400 here.
        try:
            user_type = await self.user_types_repository.get_by_pk(input.type_id)
        except NotFoundError as exc:
            raise HTTPException(status_code=400, detail="User type not found") from exc
        # idle in transaction
        # User declares email and type_id columns, so pass those fields.
        user = await self.users_repository.create(
            first_name=input.first_name,
            last_name=input.last_name,
            email=input.email,
            type_id=user_type.id,
        )
        # idle in transaction
        return user

    async def user_exists_in_keycloak(self, email) -> bool:
        """Report whether ``email`` is known to Keycloak (not implemented in the example)."""
        # http request to keycloak to represent "long" pause when connection and transaction are idle uselessly
        raise NotImplementedError("the Keycloak lookup is not part of the example")
# POST /user: the body is declared as InputSchema and the Service instance is
# injected through Depends. Comments are used instead of a docstring here.
@app.post("/user") # app is app = FastAPI()
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    # Returns whatever Service.create_user returns.
    return await service.create_user(input)
The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).
You can see that in the Service.create_user method the transaction is opened by the first query and closed in the get_session() dependency.
During the HTTP request to Keycloak the transaction is idle for a long time, which takes up resources. In addition to the transaction, the connection is also uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.
QUESTION: how to get rid of these idle times?
I see the optimization this way: a commit must be performed after each select request, and the changes are recorded once at the end in the get_session() dependency.
But it would probably be strange to manually call session.commit() after calling each select method?
And probably two sessions would be required here: one with AUTOCOMMIT for select queries, and a second in which changes are accumulated and which is committed at the end?
1 Answer
You're running into the classic "idle in transaction" problem with SQLAlchemy and FastAPI. Here's a more optimized approach:
# Create two session factories
# The 2.0 Session no longer accepts autocommit=True. Driver-level autocommit is
# requested through the engine's isolation_level execution option instead, so
# read statements do not leave a database transaction open.
# NOTE(review): a session still keeps its pooled connection checked out until it
# is committed, rolled back or closed — confirm the read dependency releases it.
ReadSession = async_sessionmaker(
    bind=engine.execution_options(isolation_level="AUTOCOMMIT"),
    expire_on_commit=False,
)
# Writes keep the default transactional behaviour and are committed explicitly.
WriteSession = async_sessionmaker(bind=engine, expire_on_commit=False)
# Context manager for explicit transaction control
@asynccontextmanager
async def transaction(session):
    """Commit ``session`` when the ``async with`` body succeeds, roll back otherwise.

    Args:
        session: object exposing awaitable ``commit()`` and ``rollback()``.

    Yields:
        The same ``session`` object.

    Raises:
        Exception: whatever the body (or the commit) raised, re-raised after
            the rollback.
    """
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        # Bare raise keeps the original traceback intact.
        raise
Then in your service:
class UserService:
    """Create-user use case split into an autocommit read session and a write session.

    NOTE(review): get_read_session / get_write_session are not defined in the
    example; they are assumed to yield sessions from the read and write factories.
    """

    def __init__(self, read_session=Depends(get_read_session), write_session=Depends(get_write_session)):
        self.read_session = read_session
        self.write_session = write_session
        self.users_repo_read = UsersRepository(read_session)
        self.users_repo_write = UsersRepository(write_session)
        # email_registered is defined on the endpoint-local Repository, not on
        # UsersRepository, so the read-side check needs its own instance.
        self.repo_read = Repository(read_session)

    async def create_user(self, input_data):
        """Check the request and create the user inside a short write transaction.

        Args:
            input_data: mapping with "first_name", "last_name", "email" and
                "type_id" keys (presumably the request payload as a dict —
                verify against the caller).

        Raises:
            HTTPException: status 400 when the email is already registered or
                the user is unknown to Keycloak.
        """
        # Read operations with autocommit
        if await self.repo_read.email_registered(input_data["email"]):
            raise HTTPException(status_code=400, detail="User already registered")
        # External HTTP call - no transaction open
        keycloak_user_exists = await self.user_exists_in_keycloak(input_data["email"])
        # The lookup result must actually gate the creation.
        if not keycloak_user_exists:
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        # Write operation with explicit transaction
        async with transaction(self.write_session):
            user = await self.users_repo_write.create(
                first_name=input_data["first_name"],
                last_name=input_data["last_name"],
                email=input_data["email"],
                type_id=input_data["type_id"],
            )
        return user

    async def user_exists_in_keycloak(self, email) -> bool:
        """Report whether ``email`` is known to Keycloak (not implemented in the example)."""
        raise NotImplementedError("the Keycloak lookup is not part of the example")
This separates read/write operations and ensures transactions are only open when needed. No more idle connections during HTTP calls!
本文标签:
版权声明:本文标题:How to avoid unnecesary Idle in transaction in FastAPI - SQLAlchemy application with repository pattern? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741427536a2378163.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
create_user is done with the tasks, session.commit() at the end will close the transaction and the connection will be given back to the connection pool. – defalt Commented Feb 22 at 21:50