Is the Python ThreadPoolExecutor best used for synchronous database calls?

I need to connect to several different databases and then perform a number of queries on each one. I first tried asyncio until I realized that JDBC doesn't have async calls, and wrapping the database call in an async function doesn't help. I did try a solution involving asyncio loops and tasks, but it was becoming convoluted. It seems best to avoid asyncio unless the call is truly asynchronous, not something hacked together.
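
For reference, the closest I could get with asyncio and a blocking driver is just pushing the call onto a thread pool anyway, for example with asyncio.to_thread (Python 3.9+). A minimal sketch, where run_blocking_queries is only a placeholder for the real JDBC connect-and-query code:

import asyncio
import time

def run_blocking_queries(db_name):
    # Placeholder for the real blocking JDBC connect + queries.
    time.sleep(1)
    return f'{db_name} done'

async def main():
    # asyncio can only "help" a blocking driver by handing the call to a
    # thread pool behind the scenes, which is what ThreadPoolExecutor does anyway.
    return await asyncio.gather(
        *(asyncio.to_thread(run_blocking_queries, db) for db in ['A', 'B', 'C'])
    )

print(asyncio.run(main()))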

My research led me to concurrent.futures.ThreadPoolExecutor. It seemed like the best solution because:

  • a) I don't have to manage the threads myself, and
  • b) threads are good for I/O-bound operations, such as waiting for a database response (so I've read).

I will use the executor to connect to 100ish databases. Putting them all in the thread pool is significantly faster than waiting for each connection to finish before starting the next.

And within each database connection, I use another executor to perform 5 queries. Each query can take up to 5 minutes depending on database size. Again, putting them all in a pool is faster, and once I'm connected to a database, I can immediately start querying it while the other databases are still connecting.

The code below gave me the output I expected: multiple connectDb() and performQuery() calls executing at the same time instead of sequentially. There are numerous solutions on Stack Overflow and Reddit about how to do this, so I'm unsure whether using ThreadPoolExecutor is ideal. If there is an even more efficient way to do this, please share.

import time
import concurrent.futures

def performQuery(x, i):
    print(f'starting query {i} in db {x}')
    time.sleep(1)
    print(f'completed query {i} in db {x}')

def connectDb(x):
    print('connecting to', x)
    time.sleep(2)
    print('connected to', x)
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for i in range(4):
            performQuery(x, i)

def main():
    with concurrent.futures.ThreadPoolExecutor() as pool:
        dbs = ['A','B','C','D','E','F','G']
        for db in dbs:
            pool.submit(connectDb, db)

main()

  • The construction of a ThreadPoolExecutor within connectDb doesn't make any sense (to me). – Adon Bilivit
  • @AdonBilivit Each database connection needs to perform several queries. I could perform them one by one, but each one could take up to 5 minutes depending on database size. Will edit the post for clarification. – Bert

1 Answer


Looking at your function connectDb, I am guessing that your intention is to run all 4 queries concurrently. If so, you need to be sure that the database connection can be used concurrently on multiple threads. I am highly doubtful that this is the case, so I would prefer that each query obtain its own connection. In the code below I have renamed your functions to conform to PEP 8 conventions and used more meaningful argument names:

import time
import concurrent.futures

def perform_query(db, i):
    connection = connect_db(db)
    print(f'starting query {i} in db {db}')
    time.sleep(1)
    print(f'completed query {i} in db {db}')
    return f'db {db} result {i}'

def connect_db(db):
    """Return a connection to database db."""

    print('connecting to', db)
    time.sleep(2)
    print('connected to', db)
    return db  # pretend this is the actual connection

def main():
    dbs = ['A','B','C','D','E','F','G']
    queries = list(range(4))

    NUMBER_DBS = len(dbs)
    QUERIES_PER_DB = len(queries)
    # Enough threads to run all the queries concurrently:
    POOL_SIZE = NUMBER_DBS * QUERIES_PER_DB

    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        futures = []
        for db in dbs:
            for query in queries:
                futures.append(pool.submit(perform_query, db, query))
        # Collect all the results before printing any of them
        # so that the results are printed together:
        results = [future.result() for future in futures]
        for result in results:
            print(result)

if __name__ == '__main__':
    main()
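
If you would rather see each result as soon as its query finishes instead of collecting everything first, a small variation on main using concurrent.futures.as_completed would look roughly like this (it assumes the same perform_query and connect_db as above):

def main_as_completed():
    dbs = ['A','B','C','D','E','F','G']
    queries = range(4)

    with concurrent.futures.ThreadPoolExecutor(len(dbs) * len(queries)) as pool:
        futures = [pool.submit(perform_query, db, q) for db in dbs for q in queries]
        # Print each result as soon as its future completes, in completion order:
        for future in concurrent.futures.as_completed(futures):
            print(future.result())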

If your intention was not to run all 4 queries for a given database concurrently, then:

import time
import concurrent.futures

def perform_query(db, connection, i):
    print(f'starting query {i} in db {db}')
    time.sleep(1)
    print(f'completed query {i} in db {db}')
    return f'db {db} result {i}'

def connect_db(db):
    print('connecting to', db)
    time.sleep(2)
    connection = db  #  We will pretend this is a connection
    print('connected to', db)
    return [perform_query(db, connection, i) for i in range(4)]

def main():
    dbs = ['A','B','C','D','E','F','G']

    NUMBER_DBS = len(dbs)
    # Enough threads to connect to all the databases concurrently
    # (each connection then runs its queries sequentially on its own thread):
    POOL_SIZE = NUMBER_DBS

    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        futures = []
        for db in dbs:
            futures.append(pool.submit(connect_db, db))
        # Collect all the results before printing any of them
        # so that the results are printed together:
        results = [future.result() for future in futures]
        for result in results:
            print(result)

if __name__ == '__main__':
    main()
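
One practical note for the real workload: with 100 or so databases you may not want that many connections open at once. Capping the pool with max_workers bounds how many connections are in flight at a time; the cap of 20 below is only an illustrative value, not a recommendation, and the sketch reuses the connect_db above:

MAX_CONCURRENT_CONNECTIONS = 20  # illustrative value; tune for your environment

def main_bounded():
    dbs = [f'db{i}' for i in range(100)]
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=MAX_CONCURRENT_CONNECTIONS) as pool:
        futures = [pool.submit(connect_db, db) for db in dbs]
        # Results arrive as each database finishes its sequential queries:
        for future in concurrent.futures.as_completed(futures):
            print(future.result())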

If a single connection could be used concurrently by multiple threads, then this is how I would do it. Note that the pool instance created by main is passed to connect_db rather than having connect_db create a new pool. Also, for variety, I am using the map method instead of submit:

import time
import concurrent.futures
from functools import partial

def perform_query(x, i):
    print(f'starting query {i} in db {x}')
    time.sleep(1)
    print(f'completed query {i} in db {x}')
    return f'db {x} result {i}'

def connect_db(pool, x):
    print('connecting to', x)
    time.sleep(2)
    print('connected to', x)
    worker = partial(perform_query, x)
    return list(pool.map(worker, range(4)))

def main():
    dbs = ['A','B','C','D','E','F','G']

    NUMBER_DBS = len(dbs)
    QUERIES_PER_DB = 4
    # Enough threads for every query to run concurrently, plus one thread per
    # database for the connect_db call that waits on its queries:
    POOL_SIZE = NUMBER_DBS * (QUERIES_PER_DB + 1)

    with concurrent.futures.ThreadPoolExecutor(POOL_SIZE) as pool:
        worker = partial(connect_db, pool)
        results = list(pool.map(worker, dbs))
        for result in results:
            print(result)

if __name__ == '__main__':
    main()
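
A closely related sketch, reusing the same connect_db and perform_query as the last example, is to keep two separate pools, one sized for connections and one for queries, so the waiting connect_db tasks never compete with the query tasks for the same workers:

def main_two_pools():
    dbs = ['A','B','C','D','E','F','G']
    QUERIES_PER_DB = 4

    # One pool holds the connect_db tasks, the other runs only the queries,
    # so a blocked connect_db task can never starve a query of a worker.
    with concurrent.futures.ThreadPoolExecutor(len(dbs)) as connect_pool, \
         concurrent.futures.ThreadPoolExecutor(len(dbs) * QUERIES_PER_DB) as query_pool:
        worker = partial(connect_db, query_pool)
        for result in connect_pool.map(worker, dbs):
            print(result)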
