I have three wrapper functions to create an engine, a sessionmaker and a session:

# module connection.py
import functools
import json

import sqlalchemy
from sqlalchemy import event
from sqlalchemy.orm import Session, sessionmaker

@functools.lru_cache(maxsize=None)
def get_session() -> Session:
    return _get_sessionmaker()()

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
    engine = get_engine()
    sessionfactory = sessionmaker(bind=engine)
    return sessionfactory

@functools.lru_cache(maxsize=None)
def get_engine():
    db_url = <our-db-url>
    engine = sqlalchemy.create_engine(
        db_url,
        json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False),
    )

    # set search path for Postgres
    schema_names = schema_names_glob
    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        cursor.execute("SET SESSION search_path='%s'" % ",".join(schema_names))
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

    return engine

I call get_session() in the function myfun() with a context manager, so I expect the session to close properly:

from connection import *

def myfun():
    with get_session() as session, session.begin():
        ...  # do some query

However, I noticed that the session/connection doesn't get closed/returned properly (see the test below). This is a problem, because eventually I run into too many connections on my server. Why is this the case?

I would like to understand this behavior. If I just run it without the wrappers, there is no problem:

def myfun_withoutwrapper():
    engine = get_engine()
    Session = sessionmaker(bind=engine)
    with Session() as session, session.begin():
        ...  # some query

Thanks!


I test the number of connections before and after executing myfun() with the following code:

# test number of connections before and after
import pandas as pd

engine = get_engine()

count_sql = """SELECT COUNT(*) FROM pg_stat_activity;"""

with engine.connect() as conn:
    pre_count_df = pd.read_sql(count_sql, conn)

# run myfun()
_ = myfun()

with engine.connect() as conn:
    post_count_df = pd.read_sql(count_sql, conn)

What I get is a difference of 1, i.e. after running myfun() I have one more connection, which shouldn't be the case. (As a check, I ran the same counting test without calling myfun() and got identical counts, so the extra connection definitely comes from myfun(). The difference stays at 1 no matter how many times I run it.)
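
(Side note: the plain COUNT(*) also includes background workers and the connection running the count itself. A hypothetical, narrower variant of the counting query, not the one I originally ran, would be:)

# hypothetical variant: count only backends connected to this database,
# excluding the connection that runs the count itself
count_sql = """
    SELECT COUNT(*) FROM pg_stat_activity
    WHERE datname = current_database()
      AND pid <> pg_backend_pid();
"""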

I use Python 3.12.8, sqlalchemy 2.0.38, with a postgres 16 database.

asked Feb 24 at 10:53 by duepiert, edited Feb 27 at 12:49 by cafce25. 6 comments:
  • The cache will cache the session, essentially holding it forever; I'm not sure why you'd want this, since it can never be cleaned up (see the sketch after these comments). Also, the session itself is not thread-safe, so you couldn't/shouldn't use the cached instance between threads. Although, I would have thought you'd end up with TOO FEW connections. Maybe you could include more information about your intended use for this and we could figure out the problem from there. Please include get_engine as well. – Ian Wilson, Feb 25 at 18:25
  • Added the code for get_engine, though shortened a bit for simplicity. Essentially, we have one module that manages databases and how to access them; this is where get_engine and get_session are defined. The module is imported by various streamlit-based utility webapps as well as our prefect-based ETL process. What would be the best way to define and use these functions in those contexts? – duepiert, Feb 26 at 12:46
  • streamlit appears to have native support for connecting to databases; is there a reason you did not want to use the built-in connection? – Ian Wilson, Feb 26 at 23:37
  • That would be too inflexible, as we also need to access the database for reasons other than streamlit. We would need to handle Streamlit's additional .toml files, which would mean unnecessary overhead for us. Also, the streamlit connector is based on psycopg2, whereas not using it allows us to use psycopg 3. – duepiert, Feb 27 at 8:50
  • Are you calling get_session from multiple threads? – defalt, Feb 27 at 14:51
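
To make the caching point from the first comment concrete (a minimal standalone sketch, not code from the question): functools.lru_cache memoizes the return value, so every call to get_session() hands back the very same Session object, which is why it can never be cleaned up:

import functools

@functools.lru_cache(maxsize=None)
def get_session():
    return object()  # stand-in for _get_sessionmaker()()

# every call returns the one cached instance
assert get_session() is get_session()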

1 Answer


Briefly looking at Prefect, I would say you need to create a unique session in each "task", though you might need to be careful if they are long-running. The engine and sessionmaker can be defined at the module level, or really anywhere, but only once. I like to set mine up via a single function call so I can pass in dynamic configuration from a file or an environment variable, but that is not necessary.
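
Applied to the connection.py from the question, that advice would look roughly like this (a sketch, untested: keep the cache on the engine and the factory, which are safe to share, but hand out a fresh Session on every call so each with-block really opens and closes its own session; get_engine is the question's own function):

# module connection.py (sketch): cache the engine and the factory,
# but do NOT cache the Session itself
import functools
from sqlalchemy.orm import Session, sessionmaker

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
    return sessionmaker(bind=get_engine())  # get_engine as in the question

def get_session() -> Session:
    # a new Session per call; the with-block in myfun() now closes it
    # and returns its connection to the pool
    return _get_sessionmaker()()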

Prefect example (untested)


# `settings` and the `Page` ORM model are assumed to be defined elsewhere
import sqlalchemy
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from prefect import flow, task

def get_sessionmaker(engine):
    Session = sessionmaker(bind=engine)
    return Session

def get_engine(settings):
    engine = sqlalchemy.create_engine(settings['db_url'])
    #...
    return engine

# thread safe and can be declared globally
engine = get_engine(settings)

# thread safe and can be declared globally
Session = get_sessionmaker(engine)

@task
def process_result(crawled_url):
    with Session.begin() as session:
        q = select(Page).where(Page.processing_state == 'PROCESSING', Page.url == crawled_url)
        q = q.with_for_update(skip_locked=True).limit(1)
        page = session.scalars(q).first()
        if page:
            # bit of work
            page.processing_state = 'COMPLETE'
            # bit more work and implicit commit after, or rollback on exception;
            # the session is closed implicitly and the connection is returned to the pool

# This could be repeated every minute or so, or maybe you can chain them
# to restart after they end?
@flow
def process_page_results():
    # pull a set of urls out and then feed them into parallel tasks for processing
    with Session.begin() as session:
        # Process 50 at a time; assumes another flow is filling the
        # database up with new data
        q = select(Page).where(Page.processing_state == 'NEW').limit(50)
        # Lock before changing the state flag; make sure results are sorted
        # deterministically to prevent a lockup.
        q = q.with_for_update().order_by(Page.id)
        # Extract urls and get out of the session asap.
        urls = []
        for page in session.scalars(q).all():
            page.processing_state = 'PROCESSING'
            urls.append(page.url)
        # Implicit commit releases the row locks and returns the connection

    # Dispatch parallel tasks, now that the session has ended
    for url in urls:
        process_result(url)


if __name__ == "__main__":
    process_page_results()

Streamlit example (untested)

Streamlit, on the other hand, is doing a lot of crazy stuff, but you might try this:

import os

import sqlalchemy
import streamlit as st
from sqlalchemy.orm import sessionmaker

# session factory is THREAD SAFE, probably not serializable
@st.cache_resource
def get_sessionmaker_and_engine(settings):
    engine = get_engine(settings)
    Session = sessionmaker(bind=engine)
    return (engine, Session)

# engine is THREAD SAFE, probably not serializable
@st.cache_resource
def get_engine(settings):
    engine = sqlalchemy.create_engine(settings['db_url'])
    #...
    return engine

# THIS IS THE REGULAR CACHE, but a dict should serialize just fine.
@st.cache_data
def get_settings():
    return {'db_url': os.environ['DB_URL']}


def on_every_request():
    # Settings should be deterministic to get the same maker and
    # engine back on repeated calls.
    settings = get_settings()
    # The weird tuple return is because I'm not sure the engine
    # could be hashed as an argument for the session maker.
    engine, Session = get_sessionmaker_and_engine(settings)

    # The session instance is NOT thread safe,
    # so it must be set up and torn down on each run.
    with Session.begin() as session:
        # bit of work
        session.commit()
        # bit of work
        session.commit()
        # bit of work and implicit commit after, or rollback on exception;
        # the session is closed implicitly and the connection is returned to the pool

https://docs.streamlit.io/develop/concepts/architecture/caching#examples-1
