admin管理员组文章数量:1292140
I'm building a multi-tenant FastAPI application that uses PostgreSQL schemas to separate tenant data. I have a middleware that extracts an X-Tenant-ID
header, looks up the tenant's schema, and then switches the current schema for the database session accordingly. For a single request (via Postman) the middleware works fine; however, when sending multiple requests concurrently, I sometimes get errors such as:
- Undefined Table
- Table relationship not found
It appears that the DB connection is closing prematurely or reverting to the public schema too soon, so tenant-specific tables are not found.
Below are the relevant code snippets:
Middleware (SchemaSwitchMiddleware
)
from typing import Optional, Callable
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from app.db.session import SessionLocal, switch_schema
from app.repositories.tenant_repository import TenantRepository
from app.core.logger import logger
from contextvars import ContextVar
current_schema: ContextVar[str] = ContextVar("current_schema", default="public")
class SchemaSwitchMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""
Middleware to dynamically switch the schema based on the `X-Tenant-ID` header.
If no header is present, defaults to `public` schema.
"""
db = SessionLocal() # Create a session here
try:
tenant_id: Optional[str] = request.headers.get("X-Tenant-ID")
if tenant_id:
try:
tenant_repo = TenantRepository(db)
tenant = tenant_repo.get_tenant_by_id(tenant_id)
if tenant:
schema_name = tenant.schema_name
else:
logger.warning("Invalid Tenant ID received in request headers")
return JSONResponse(
{"detail": "Invalid access"},
status_code=400
)
except Exception as e:
logger.error(f"Error fetching tenant: {e}. Defaulting to public schema.")
db.rollback()
schema_name = "public"
else:
schema_name = "public"
current_schema.set(schema_name)
switch_schema(db, schema_name)
request.state.db = db # Store the session in request state
response = await call_next(request)
return response
except Exception as e:
logger.error(f"SchemaSwitchMiddleware error: {str(e)}")
db.rollback()
return JSONResponse({"detail": "Internal Server Error"}, status_code=500)
finally:
switch_schema(db, "public") # Always revert to public
db.close()
Database Session (app/db/session.py)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, declarative_base, Session
from app.core.logger import logger
from app.core.config import settings
# Base for models
Base = declarative_base()
DATABASE_URL = settings.DATABASE_URL
# SQLAlchemy engine
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True,
pool_size=20,
max_overflow=30,
)
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def switch_schema(db: Session, schema_name: str):
"""Helper function to switch the search_path to the desired schema."""
db.execute(text(f"SET search_path TO {schema_name}"))
dbmit()
# logger.debug(f"Switched schema to: {schema_name}")
Example tables
Public Schema: Contains tables like users, roles, tenants, and user_lookup.
Tenant Schema: Contains tables like users, roles, buildings, and floors.
When I test with a single request, everything works fine. However, with concurrent requests, the switching sometimes reverts to the public schema too early, resulting in errors because tenant-specific tables are missing.
Question
- What could be causing the race condition where the connection’s schema gets switched back to public during concurrent requests?
- How can I ensure that each request correctly maintains its tenant schema throughout the request lifecycle without interference from concurrent requests?
- Is there a better approach (such as using middleware or context variables) to avoid this issue?
any help on this is much apricated. Thankyou
I'm building a multi-tenant FastAPI application that uses PostgreSQL schemas to separate tenant data. I have a middleware that extracts an X-Tenant-ID
header, looks up the tenant's schema, and then switches the current schema for the database session accordingly. For a single request (via Postman) the middleware works fine; however, when sending multiple requests concurrently, I sometimes get errors such as:
- Undefined Table
- Table relationship not found
It appears that the DB connection is closing prematurely or reverting to the public schema too soon, so tenant-specific tables are not found.
Below are the relevant code snippets:
Middleware (SchemaSwitchMiddleware
)
from typing import Optional, Callable
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from app.db.session import SessionLocal, switch_schema
from app.repositories.tenant_repository import TenantRepository
from app.core.logger import logger
from contextvars import ContextVar
current_schema: ContextVar[str] = ContextVar("current_schema", default="public")
class SchemaSwitchMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""
Middleware to dynamically switch the schema based on the `X-Tenant-ID` header.
If no header is present, defaults to `public` schema.
"""
db = SessionLocal() # Create a session here
try:
tenant_id: Optional[str] = request.headers.get("X-Tenant-ID")
if tenant_id:
try:
tenant_repo = TenantRepository(db)
tenant = tenant_repo.get_tenant_by_id(tenant_id)
if tenant:
schema_name = tenant.schema_name
else:
logger.warning("Invalid Tenant ID received in request headers")
return JSONResponse(
{"detail": "Invalid access"},
status_code=400
)
except Exception as e:
logger.error(f"Error fetching tenant: {e}. Defaulting to public schema.")
db.rollback()
schema_name = "public"
else:
schema_name = "public"
current_schema.set(schema_name)
switch_schema(db, schema_name)
request.state.db = db # Store the session in request state
response = await call_next(request)
return response
except Exception as e:
logger.error(f"SchemaSwitchMiddleware error: {str(e)}")
db.rollback()
return JSONResponse({"detail": "Internal Server Error"}, status_code=500)
finally:
switch_schema(db, "public") # Always revert to public
db.close()
Database Session (app/db/session.py)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, declarative_base, Session
from app.core.logger import logger
from app.core.config import settings
# Base for models
Base = declarative_base()
DATABASE_URL = settings.DATABASE_URL
# SQLAlchemy engine
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True,
pool_size=20,
max_overflow=30,
)
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def switch_schema(db: Session, schema_name: str):
"""Helper function to switch the search_path to the desired schema."""
db.execute(text(f"SET search_path TO {schema_name}"))
dbmit()
# logger.debug(f"Switched schema to: {schema_name}")
Example tables
Public Schema: Contains tables like users, roles, tenants, and user_lookup.
Tenant Schema: Contains tables like users, roles, buildings, and floors.
When I test with a single request, everything works fine. However, with concurrent requests, the switching sometimes reverts to the public schema too early, resulting in errors because tenant-specific tables are missing.
Question
- What could be causing the race condition where the connection’s schema gets switched back to public during concurrent requests?
- How can I ensure that each request correctly maintains its tenant schema throughout the request lifecycle without interference from concurrent requests?
- Is there a better approach (such as using middleware or context variables) to avoid this issue?
any help on this is much apricated. Thankyou
Share asked Feb 13 at 10:47 dineshh912dineshh912 1701 silver badge13 bronze badges 1- You never switch the schema to the public schema before querying for tenant information - so I'm guessing that if a connection is cached in a pool, the wrong schema will be used as you've switched it over from what the pool knows about - you probably want to explicitly set it before querying for tenant information. – MatsLindh Commented Feb 13 at 13:05
2 Answers
Reset to default 1I also implemented a multi-tenant FastAPI application using PostgreSQL via schemas.
In my case, I avoided using middleware because the database session (obtained from SessionLocal) and its state need to be isolated per request.
When using middleware, the connection (and its state) from the connection pool may be reused across requests. Even though ContextVar is designed to be isolated in asynchronous contexts, the actual database connection can still be shared, leading to race conditions. For example, if one request changes the schema and the connection is then reused for another request, that new request might unexpectedly start with the wrong schema (like reverting to "public").
Instead, I handle the tenant schema switching in a dependency (using Depends). This way, each request gets its own session, and we can safely set the schema for that specific request without affecting others.
Below is an example implementation using a synchronous SQLAlchemy Session
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, declarative_base, Session
from app.core.logger import logger
from app.core.config import settings
from typing import Annotated
from fastapi import Header
# Base for models
Base = declarative_base()
DATABASE_URL = settings.DATABASE_URL
# SQLAlchemy engine
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True,
pool_size=20,
max_overflow=30,
)
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# TODO: use this get_db_session function in path operation.
def get_db_session(tenant_id: Annotated[str, Header(alias="X-Tenant-ID")]) -> Generator[Session, None, None]:
session = SessionLocal()
try:
# TODO: Implement tenant_id to tenant_schema here
session.execute(text(f"SET search_path TO {tenant_id};"))
sessionmit() # Ensure the schema change is applied immediately
yield session
finally:
session.close()
I find it too confusing to implement it that way. Either that session in connection pool is cached or not, the easiest way is to create a dedicated connection pool to each schema (public, tenant, marketing, ...).
Also, because this is ASGI middleware, it never guarantee by that way. Could you try with new asgiref format: async def __call__(scope, receive, send)
and use this format to test; but still switching the search path like that is not a recommended way unless you are using the ORM way which have no knowledge of PostgreSQL schema
本文标签:
版权声明:本文标题:python - FastAPI Middleware for Postgres Multi-Tenant Schema Switching Causes Race Conditions with Concurrent Requests - Stack O 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741547656a2384689.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论