In a FastAPI application, a global session object was used to query the DB. A session_decorator wrapped every query function to give it a safe way to roll back if an error occurred.

engine = create_engine(DB_URL)

Session = sessionmaker(bind=engine)
session = Session()


def session_decorator(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except HTTPException as e:
            raise e
        except Exception as e:
            session.rollback()
            logger.exception(f"An error occurred during the database query: {e}")
            raise HTTPException(status_code=500, detail="An error occurred, please try again later.")

    return wrapper

In production, the connection to the DB was terminated from time to time, and all API requests froze with errors:

...
This session is in 'prepared' state; no further SQL can be emitted within this transaction.
...
sqlalchemy.exc.IllegalStateChangeError: Method 'rollback()' can't be called here; method 'commit()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>
...
sqlalchemy.exc.InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.
...

Also, the DigitalOcean PostgreSQL dashboard showed idle transactions with lifetimes of roughly 10-20 minutes.

In order to fix this, I implemented another decorator:

class SqlAlchemy:
    def __init__(self, database_url: str):
        self.engine = create_engine(
            database_url,
            echo=False,
            pool_size=15,
            max_overflow=5,
            pool_timeout=30,
            pool_recycle=300,  # Recycle connections after 5 minutes
            pool_pre_ping=True,
        )
        #self.engine = create_engine(database_url, echo=True)
        self.session = sessionmaker(self.engine, expire_on_commit=False)
        self.base = declarative_base()

db = SqlAlchemy(DB_URL)


def session_decorator(func):
    def wrapper(*args, **kwargs):
        with in_transaction(db.session) as session:
            try:
                result = func(session, *args, **kwargs)
                return result
            except Exception as e:
                # Let the context manager handle rollback
                logger.error(f"Error in session decorator: {e}")
                raise e
    return wrapper

@contextmanager
def in_transaction(session):
    session = session()
    try:
        yield session
        session.flush()
    except Exception as r:
        logger.error(f"Rolling back transaction: {r}")
    finally:
        try:
            if session.in_transaction():
                session.invalidate()
            session.close()
            session.bind.dispose()
        except Exception as e:
            logger.error(f"Error closing session: {e}")
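For comparison, here is a minimal sketch of a transaction-scoped context manager that commits on success, rolls back and re-raises on failure, and only closes the session, so the connection goes back to the pool. Note that the version above catches the exception without re-raising it, so the caller never sees the error, and that session.bind.dispose() closes the engine's connection pool on every call. The in-memory SQLite URL, the Item model, and the add_item / count_items helpers are hypothetical stand-ins, and committing inside the context manager is an assumption about where the transaction boundary should be.

```python
from contextlib import contextmanager

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# In-memory SQLite stands in for DB_URL (assumption, for a runnable sketch).
engine = create_engine("sqlite://")
SessionFactory = sessionmaker(bind=engine, expire_on_commit=False)
Base = declarative_base()


class Item(Base):  # hypothetical model, not from the original code
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)


Base.metadata.create_all(engine)


@contextmanager
def in_transaction(session_factory):
    """Open one session per call; commit, or roll back and re-raise."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise  # let the caller (or FastAPI) see the original error
    finally:
        # close() releases the connection to the pool; the engine is
        # left alone so pooled connections can be reused.
        session.close()


def add_item(name, fail=False):
    with in_transaction(SessionFactory) as session:
        session.add(Item(name=name))
        if fail:
            raise RuntimeError("simulated failure")


def count_items():
    with in_transaction(SessionFactory) as session:
        return session.query(Item).count()
```

With this shape, the decorator can stay as it is: it would pass the yielded session into the query function, and any exception would still reach the caller after the rollback.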

But this now causes two issues:

  • I have to refactor all code to use JOINs where needed, because relationship attributes are no longer accessible outside of a query function
  • server response times became significantly slower

Any ideas how this can be fixed?
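One direction that might help with the first issue is eager loading. If the query function asks for the relationship up front, the related rows are already attached to the returned objects when the session closes, so no lazy load is attempted on a detached instance. This is only a sketch: the Author and Book models, the get_authors function, and the in-memory SQLite URL are hypothetical and not part of my code.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import (
    declarative_base,
    relationship,
    selectinload,
    sessionmaker,
)

# In-memory SQLite stands in for DB_URL (assumption, for a runnable sketch).
engine = create_engine("sqlite://")
SessionFactory = sessionmaker(bind=engine, expire_on_commit=False)
Base = declarative_base()


class Author(Base):  # hypothetical model
    __tablename__ = "authors"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship("Book", back_populates="author")


class Book(Base):  # hypothetical model
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String)
    author_id = Column(Integer, ForeignKey("authors.id"))
    author = relationship("Author", back_populates="books")


Base.metadata.create_all(engine)

# Seed one author with two books.
with SessionFactory() as session:
    session.add(Author(name="Ann", books=[Book(title="One"), Book(title="Two")]))
    session.commit()


def get_authors():
    """Load authors together with their books in the same session."""
    with SessionFactory() as session:
        return (
            session.query(Author)
            .options(selectinload(Author.books))  # eager-load the relationship
            .order_by(Author.id)
            .all()
        )


# The session is closed here, but books were loaded eagerly.
authors = get_authors()
titles = sorted(book.title for book in authors[0].books)
```

selectinload issues a second SELECT for the related rows rather than a JOIN; joinedload is the JOIN-based alternative. Which one fits better probably depends on the size of the collections.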
