I need to connect to several different databases and then perform a number of queries on each one. I first tried asyncio, until I realized that JDBC doesn't have async calls and that wrapping a blocking database call in an async function doesn't help. I did try a solution involving asyncio loops and tasks, but it was becoming convoluted. It seems best to avoid asyncio unless the underlying call is truly asynchronous, not something hacked together.
My research led me to concurrent.futures ThreadPoolExecutor. It seemed like the best solution because
- a) I don't have to manage the threads myself and
- b) Threads are good for IO operations, such as waiting for a database response (so I've read).
I will use the executor to connect to 100ish databases. Putting them all in the thread pool is significantly faster than waiting for each connection to finish before starting the next.
Within each database connection, I use another executor to perform 5 queries. Each query can take up to 5 minutes depending on database size. Again, putting them all in a pool is faster. Once I'm connected to a database, I can immediately start querying it while the other databases are still connecting.
The code below gave me the output I expected: multiple connectDb() and performQuery() calls executing at the same time instead of sequentially. There are numerous solutions on Stack Overflow and Reddit about how to do this, so I'm unsure whether using ThreadPoolExecutor is ideal. If there is an even more efficient way to do this, please share.
import time
import concurrent.futures

def performQuery(x, i):
    print(f'starting query {i} in db {x}')
    time.sleep(1)
    print(f'completed query {i} in db {x}')

def connectDb(x):
    print('connecting to', x)
    time.sleep(2)
    print('connected to', x)
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for i in range(4):
            performQuery(x, i)

def main():
    with concurrent.futures.ThreadPoolExecutor() as pool:
        dbs = ['A','B','C','D','E','F','G']
        for db in dbs:
            pool.submit(connectDb, db)

main()
Asked Mar 26 at 4:08 by Bert; edited Mar 26 at 11:59.
- The construction of ThreadPoolExecutor within connectDb doesn't make any sense (to me) – Adon Bilivit Commented Mar 26 at 8:27
- @AdonBilivit each database connection needs to perform x queries. I could perform the queries one by one, but each one could take up to 5mins depending on dB size. Will edit post for clarification. – Bert Commented Mar 26 at 11:50
1 Answer
Looking at your function connectDb, I am guessing that your intention is to run all 4 queries concurrently. If so, you need to be sure that the database connection can be used concurrently on multiple threads. I am highly doubtful that this is the case, so I would prefer that each query obtain its own connection. In the code below I have renamed your functions to conform to PEP 8 naming conventions and I have used more meaningful argument names:
import time
import concurrent.futures

def perform_query(db, i):
    connection = connect_db(db)
    print(f'starting query {i} in db {db}')
    time.sleep(1)
    print(f'completed query {i} in db {db}')
    return f'db {db} result {i}'

def connect_db(db):
    """Return a connection to database db."""
    print('connecting to', db)
    time.sleep(2)
    print('connected to', db)
    return db # pretend this is the actual connection

def main():
    dbs = ['A','B','C','D','E','F','G']
    queries = list(range(4))
    NUMBER_DBS = len(dbs)
    QUERIES_PER_DB = len(queries)
    # Enough threads to run all the queries concurrently:
    POOL_SIZE = NUMBER_DBS * QUERIES_PER_DB
    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        futures = []
        for db in dbs:
            for query in queries:
                futures.append(pool.submit(perform_query, db, query))
        # Collect all the results before printing any of them
        # so that the results are printed together:
        results = [future.result() for future in futures]
    for result in results:
        print(result)

if __name__ == '__main__':
    main()
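If the results do not have to be printed together, a variation on the code above is to handle each result as soon as its query finishes, and to catch a failing query without losing the others. The following is only a minimal sketch under stated assumptions: the connection and query are simulated as before, the helper name run_all is hypothetical, and the failure of query 2 in db 'C' is contrived purely to show the error path.

```python
import concurrent.futures
import time

def perform_query(db, i):
    """Simulated query. Query 2 on db 'C' fails on purpose (illustration only)."""
    time.sleep(0.01)  # stand-in for connecting and querying
    if db == 'C' and i == 2:
        raise RuntimeError(f'query {i} failed in db {db}')
    return f'db {db} result {i}'

def run_all(dbs, queries):
    """Run every (db, query) pair concurrently; return (results, errors) dicts."""
    results, errors = {}, {}
    pool_size = len(dbs) * len(queries)
    with concurrent.futures.ThreadPoolExecutor(pool_size) as pool:
        # Map each future back to the (db, query) pair that produced it.
        future_to_key = {
            pool.submit(perform_query, db, q): (db, q)
            for db in dbs
            for q in queries
        }
        # as_completed yields futures in the order they finish,
        # not the order they were submitted.
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                results[key] = future.result()
            except Exception as exc:  # result() re-raises the worker's exception
                errors[key] = exc
    return results, errors

if __name__ == '__main__':
    results, errors = run_all(['A', 'B', 'C'], [0, 1, 2])
    for key in sorted(results):
        print(key, results[key])
    for key, exc in errors.items():
        print('failed:', key, exc)
```

Keying the futures by (db, query) keeps track of which database each result or exception belongs to, which matters once a long-running query fails in only one of many databases.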
If your intention was not to run all 4 queries for a given database concurrently, then:
import time
import concurrent.futures

def perform_query(db, connection, i):
    print(f'starting query {i} in db {db}')
    time.sleep(1)
    print(f'completed query {i} in db {db}')
    return f'db {db} result {i}'

def connect_db(db):
    print('connecting to', db)
    time.sleep(2)
    connection = db # We will pretend this is a connection
    print('connected to', db)
    # Run the queries one after another on this single connection:
    return [perform_query(db, connection, i) for i in range(4)]

def main():
    dbs = ['A','B','C','D','E','F','G']
    NUMBER_DBS = len(dbs)
    # One thread per database; each database's queries run sequentially:
    POOL_SIZE = NUMBER_DBS
    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        futures = []
        for db in dbs:
            futures.append(pool.submit(connect_db, db))
        # Collect all the results before printing any of them
        # so that the results are printed together:
        results = [future.result() for future in futures]
    for result in results:
        print(result)

if __name__ == '__main__':
    main()
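With roughly 100 databases, as mentioned in the question, one thread per database may be more than you want running at once. Note also that ThreadPoolExecutor() with no argument picks its own default worker count (at most 32 in Python 3.8 and later), so not every database is necessarily connected at the same time. Passing an explicit, smaller pool size simply makes the remaining tasks wait in the executor's queue. The sketch below is an illustration only: ConcurrencyTracker, connect_and_query and run_capped are hypothetical names, the work is simulated with a short sleep, and the cap of 5 is arbitrary.

```python
import concurrent.futures
import threading
import time

class ConcurrencyTracker:
    """Records the peak number of tasks running at once (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        return self

    def __exit__(self, *exc_info):
        with self._lock:
            self._active -= 1
        return False  # do not swallow exceptions

def connect_and_query(tracker, db):
    """Simulated connect plus sequential queries for one database."""
    with tracker:
        time.sleep(0.02)  # stand-in for the real work
        return f'db {db} done'

def run_capped(dbs, max_workers):
    """Process all dbs with at most max_workers of them in flight at a time."""
    tracker = ConcurrencyTracker()
    with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
        # map() queues every task immediately; only max_workers run at once.
        results = list(pool.map(lambda db: connect_and_query(tracker, db), dbs))
    return results, tracker.peak

if __name__ == '__main__':
    results, peak = run_capped([f'db{n}' for n in range(20)], max_workers=5)
    print(len(results), 'databases processed, peak concurrency', peak)
```

A suitable cap depends on what the database servers and the client machine can tolerate, so it is best treated as a tuning parameter rather than a fixed number.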
If a single connection could be used concurrently in multiple threads, then this is how I would do it. Note that the pool instance created by main is being passed to connect_db rather than having connect_db create a new pool. Also, for variety I am using the map method instead of submit:
import time
import concurrent.futures
from functools import partial

def perform_query(x, i):
    print(f'starting query {i} in db {x}')
    time.sleep(1)
    print(f'completed query {i} in db {x}')
    return f'db {x} result {i}'

def connect_db(pool, x):
    print('connecting to', x)
    time.sleep(2)
    print('connected to', x)
    worker = partial(perform_query, x)
    return list(pool.map(worker, range(4)))

def main():
    dbs = ['A','B','C','D','E','F','G']
    NUMBER_DBS = len(dbs)
    QUERIES_PER_DB = 4
    # Enough threads to run all the queries concurrently:
    POOL_SIZE = NUMBER_DBS * QUERIES_PER_DB
    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        worker = partial(connect_db, pool)
        results = list(pool.map(worker, dbs))
    for result in results:
        print(result)

if __name__ == '__main__':
    main()
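One caveat about sharing a single pool this way: each connect_db call occupies a worker thread while it waits for its queries, so the pool must be larger than the number of databases (as it is above). If it were not, every worker could end up blocked in connect_db waiting for queries that never get a thread, and the program would hang. One possible way to sidestep this, sketched below under the assumption that two pools are acceptable, is to run the queries in a separate pool whose tasks never wait on other tasks. The names run and query_pool and the pool sizes are hypothetical, and the work is simulated with short sleeps.

```python
import concurrent.futures
import time
from functools import partial

def perform_query(db, i):
    """Simulated query; it never submits further work, so it cannot block a pool."""
    time.sleep(0.01)
    return f'db {db} result {i}'

def connect_db(query_pool, db):
    """Simulated connect, then fan the queries out to the separate query pool."""
    time.sleep(0.02)
    worker = partial(perform_query, db)
    return list(query_pool.map(worker, range(4)))

def run(dbs, connect_workers=2, query_workers=2):
    """Deliberately small pools: this still completes, because only
    connect_pool workers wait, and they wait on query_pool, not on themselves."""
    with concurrent.futures.ThreadPoolExecutor(connect_workers) as connect_pool, \
         concurrent.futures.ThreadPoolExecutor(query_workers) as query_pool:
        worker = partial(connect_db, query_pool)
        return list(connect_pool.map(worker, dbs))

if __name__ == '__main__':
    for result in run(['A', 'B', 'C']):
        print(result)
```

With this split, the two pool sizes can be tuned independently: one bounds how many databases are being connected at a time, the other bounds how many queries are in flight overall.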