I have three wrapper functions to create an engine, a sessionmaker and a session:

# module connection.py
@functools.lru_cache(maxsize=None)
def get_session() -> Session:
    return _get_sessionmaker()()

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
    engine = get_engine()
    sessionfactory = sessionmaker(bind=engine)
    return sessionfactory

@functools.lru_cache(maxsize=None)
def get_engine():
    db_url = <our-db-url>
    engine = sqlalchemy.create_engine(
        db_url,
        json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False),
    )
    # set search path for Postgres
    schema_names = schema_names_glob

    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        cursor.execute("SET SESSION search_path='%s'" % ",".join(schema_names))
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

    return engine
I call this in the function myfun() with a context manager, so I expect the session to close properly:

from connection import *

def myfun():
    with get_session() as session, session.begin():
        ...  # do some query

However, I noticed that the session/connection doesn't get closed/returned properly (see the test below). This is a problem, because eventually I run into too many connections on my server. Why is this the case?
I would like to understand this behavior. If I just run it without the wrappers, there is no problem:

def myfun_withoutwrapper():
    engine = get_engine()
    Session = sessionmaker(bind=engine)
    with Session() as session, session.begin():
        ...  # some query

Thanks!
I test the number of connections before and after executing myfun() with the following code:

# test number of connections before and after
engine = get_engine()
count_sql = """SELECT COUNT(*) FROM pg_stat_activity;"""
with engine.connect() as conn:
    pre_count_df = pd.read_sql(count_sql, conn)
# run myfun()
_ = myfun()
with engine.connect() as conn:
    post_count_df = pd.read_sql(count_sql, conn)
What I get is a difference of 1, i.e. after running myfun() I have one connection more, which shouldn't be the case. (As a proof, I ran the same counting test without calling myfun(), and I got the same number of connections. So the difference of 1 definitely comes from myfun(), regardless of how many times I run it.)
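For reference, the engine's pool can also report its own view of things, independent of pg_stat_activity; a quick sketch using SQLAlchemy's built-in Pool.status() and Pool.checkedout():

# sketch: inspect the pool directly
engine = get_engine()
print(engine.pool.status())      # e.g. pool size, checked-in/checked-out counts
print(engine.pool.checkedout())  # number of connections currently checked out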
I use Python 3.12.8, sqlalchemy 2.0.38, with a postgres 16 database.
1 Answer
Briefly looking at Prefect, I would say you need to create a unique session in each "task", though you might need to be careful if the tasks are long-running. The engine and sessionmaker can be defined at the module level, or really just anywhere, but only once per process. I like to set mine up via a single function call so I can pass in dynamic configuration from a file or an OS environment variable, but that is not necessary.
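Applied to the question's wrappers, that would mean caching the engine and the sessionmaker but never the Session itself; a minimal untested sketch along those lines (the URL is a placeholder, as in the question):

import functools

import sqlalchemy
from sqlalchemy.orm import Session, sessionmaker

@functools.lru_cache(maxsize=None)
def get_engine():
    # placeholder URL, as in the question
    return sqlalchemy.create_engine("<our-db-url>")

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
    return sessionmaker(bind=get_engine())

def get_session() -> Session:
    # no lru_cache here: every call builds a fresh Session, so each
    # `with get_session() as session:` closes its own session and
    # returns its connection to the pool
    return _get_sessionmaker()()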
Prefect example (untested)
import os

import sqlalchemy
from prefect import flow, task
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker

def get_sessionmaker(engine):
    Session = sessionmaker(bind=engine)
    return Session

def get_engine(settings):
    engine = sqlalchemy.create_engine(settings['db_url'])
    # ...
    return engine

# settings could come from a file or environment variable,
# sketched here from an env var
settings = {'db_url': os.environ['DB_URL']}

# thread safe and can be declared globally
engine = get_engine(settings)

# thread safe and can be declared globally
Session = get_sessionmaker(engine)

@task
def process_result(crawled_url):
    # Page is assumed to be an ORM model defined elsewhere
    with Session.begin() as session:
        q = select(Page).where(Page.processing_state == 'PROCESSING', Page.url == crawled_url)
        q = q.with_for_update(skip_locked=True).limit(1)
        page = session.scalars(q).first()
        if page:
            # bit of work
            page.processing_state = 'COMPLETE'
            # bit more work and implicit commit after, or rollback if exception
    # session is closed implicitly and connection is returned to pool

# This could be repeated every minute or something, or maybe you can
# chain the flows to restart after they end?
@flow
def process_page_results():
    # pull a data set of urls out and
    # then feed them into parallel tasks for processing
    with Session.begin() as session:
        # Process 50 at a time
        # assumes another flow is filling the database up with new data
        q = select(Page).where(Page.processing_state == 'NEW').limit(50)
        # Lock before changing the state flag; make sure results are sorted
        # deterministically to prevent a lockup.
        q = q.with_for_update().order_by(Page.id)
        # Extract urls and get out of the session asap.
        urls = []
        for page in session.scalars(q).all():
            page.processing_state = 'PROCESSING'
            urls.append(page.url)
    # Implicit commit releases row locks and returns the connection.
    # Dispatch parallel tasks, now that the session has ended.
    for url in urls:
        process_result.submit(url)  # .submit() schedules the task to run concurrently

if __name__ == "__main__":
    process_page_results()
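Note that Session.begin() on the factory above is begin-once shorthand; it is roughly equivalent to the two nested context managers from the question (sketch):

with Session() as session:   # fresh Session from the factory
    with session.begin():    # commit on success, rollback on exception
        ...                  # do work
# session closed here, connection returned to the pool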
Streamlit example (untested)

Streamlit, on the other hand, is doing a lot of crazy stuff under the hood, but you might try this:
import os

import sqlalchemy
import streamlit as st
from sqlalchemy.orm import sessionmaker

# session factory is THREAD SAFE, probably not serializable
@st.cache_resource
def get_sessionmaker_and_engine(settings):
    engine = get_engine(settings)
    Session = sessionmaker(bind=engine)
    return (engine, Session)

# engine is THREAD SAFE, probably not serializable
@st.cache_resource
def get_engine(settings):
    engine = sqlalchemy.create_engine(settings['db_url'])
    # ...
    return engine

# THIS IS THE REGULAR CACHE, but a dict should serialize just fine.
@st.cache_data
def get_settings():
    return {'db_url': os.environ['DB_URL']}

def on_every_request():
    # Settings should be deterministic to get the same maker and
    # engine back on repeated calls.
    settings = get_settings()
    # I do this weird tuple return because I'm not sure if
    # the engine could be hashed for the session maker.
    engine, Session = get_sessionmaker_and_engine(settings)
    # The session instance is NOT thread safe,
    # so it must be set up and torn down on each run.
    # (Plain Session() here rather than Session.begin(), since we
    # commit explicitly along the way.)
    with Session() as session:
        ...  # bit of work
        session.commit()
        ...  # bit of work
        session.commit()
        ...  # bit of work; uncommitted work is rolled back on exception
        session.commit()
    # session is closed implicitly and connection is returned to pool
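The split between st.cache_resource and st.cache_data above follows Streamlit's own guidance (see the link below): cache_resource holds a single shared, unserialized object such as the engine or session factory across reruns and user sessions, while cache_data is meant for serializable values like the settings dict.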
https://docs.streamlit.io/develop/concepts/architecture/caching#examples-1
Comments (some truncated in the source):

- Ian Wilson (Feb 25 at 18:25): … get_engine as well.
- duepiert (Feb 26 at 12:46): … get_engine, though a bit shortened for simplicity. Essentially, we have one module that manages databases and how to access them. This is where get_engine and get_session are defined. The module is imported by diverse streamlit-based utility webapps as well as our prefect-based ETL process. What would be the best way to define and use the functions in these contexts?
- Ian Wilson (Feb 26 at 23:37): streamlit appears to have native support for connecting to databases, is there a reason you did not want to use the builtin connection?
- duepiert (Feb 27 at 8:50): … psycopg2, whereas not using the streamlit connector allows us to use the third version of psycopg.
- defalt (Feb 27 at 14:51): … get_session from multiple threads?