I'm building a backend API using FastAPI and SQLAlchemy. The goal is to:

1. Fetch contacts from a mobile device (using a frontend or app-based method).
2. Send the contacts to the backend.
3. Store the contacts in a database where each user's contacts are saved in a separate table, dynamically created based on the user's ID.

Workflow

Frontend:

1. Ask for access to the user's contacts (e.g., using a popup).
2. Fetch the contacts from the mobile device.
3. Send the contacts to the backend API along with the user ID.

Backend:

1. Dynamically create a table for storing contacts, specific to the user's ID (e.g., contacts_user_<user_id>).
2. Insert the contacts into the newly created table.
3. Ensure duplicate contacts are not created if the table already exists.
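The "no duplicates" step of the backend workflow can be sketched independently of the database. The following is a minimal illustration, assuming contacts arrive as dictionaries with a `phone_number` key and that a phone number identifies a contact; the helper names `normalize_phone` and `new_contacts` are hypothetical and not part of the code further down.

```python
from typing import Iterable


def normalize_phone(raw: str) -> str:
    """Keep an optional leading '+' and the digits, dropping spaces and punctuation."""
    raw = raw.strip()
    prefix = "+" if raw.startswith("+") else ""
    return prefix + "".join(ch for ch in raw if ch.isdigit())


def new_contacts(incoming: Iterable[dict], existing_numbers: set[str]) -> list[dict]:
    """Return only the contacts whose normalized number is not already known.

    `existing_numbers` is assumed to hold the normalized numbers already
    stored in the user's table.
    """
    seen = set(existing_numbers)  # copy so the caller's set is not mutated
    fresh = []
    for contact in incoming:
        key = normalize_phone(contact["phone_number"])
        if not key or key in seen:
            continue  # skip empty numbers and duplicates
        seen.add(key)
        fresh.append({**contact, "phone_number": key})
    return fresh
```

Filtering in Python like this only covers a single request. A unique constraint on `phone_number` would be the database-level way to enforce the same rule across concurrent requests.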

Here’s my current attempt:

Dynamically Create a Table

from sqlalchemy import Table, Column, Integer, String, MetaData
metadata = MetaData()

class UserCreateService:
    def create_user_contact_table(self, user_id: int):
        """
        Dynamically create a table for storing contacts for a specific user.
        """
        table_name = f"contacts_user_{user_id}"
        return Table(
            table_name,
            metadata,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("name", String, nullable=True),
            Column("phone_number", String, nullable=False),
            Column("email", String, nullable=True),
        )

Save Contacts Endpoint

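import asyncio
import copy
from concurrent.futures import ThreadPoolExecutor

from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession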

@auth_router.post("/save_contacts")
async def save_contacts_dynamic(
    request: SaveContactsRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Save fetched contacts to a dynamically created table based on the user's ID.
    """
    response = copy.deepcopy(api_response)
    try:
        # Dynamically create the table for the user
        contact_table = UserCreateService().create_user_contact_table(request.user_id)

        # Use a ThreadPoolExecutor to execute the table creation synchronously
        def create_table():
            sync_engine = create_engine(str(db.bind.url))  # Create a sync engine
            with sync_engine.connect() as connection:
                contact_table.create(bind=connection, checkfirst=True)

        executor = ThreadPoolExecutor(max_workers=1)
        await asyncio.get_event_loop().run_in_executor(executor, create_table)

        # Insert contacts into the dynamic table
        contacts_to_insert = [
            {"name": contact.name, "phone_number": contact.phone_number, "email": contact.email}
            for contact in request.contacts
        ]
        await db.execute(contact_table.insert().values(contacts_to_insert))
        await db.commit()

        response.update({"data": [], "status": True, "message": "Contacts saved successfully!"})
    except Exception as e:
        logger.exception(msg=f"Input: {str(locals())}\nOutput: {str(response)} \nException : {str(e)}", exc_info=True)
        error = ExceptionHandling(e=str(e), function_name="api: save_contacts").exception_handling()
        response.update({"error": error, "status": False, "message": "We are having an issue for this request, please try again."})

        return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=response)

    return JSONResponse(status_code=status.HTTP_200_OK, content=response)



With this code I get the following error:

{
  "data": {},
  "error": {
    "class-method": "api: save_contacts",
    "exception_msg": "greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: )"
  },
  "message": "We are having an issue for this request, please try again.",
  "status": false
}
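This message is typically raised when SQLAlchemy's asyncio layer is asked to do I/O outside its greenlet context. A likely trigger in the endpoint above is handing the async session's URL, which names an async driver, to the synchronous `create_engine`. The sketch below shows one way to create the table through the async connection instead, using `AsyncConnection.run_sync`. It assumes an `AsyncEngine` is available to the handler (called `engine` here); the helper names `contact_table_for` and `ensure_contact_table` are hypothetical.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.ext.asyncio import AsyncEngine

metadata = MetaData()


def contact_table_for(user_id: int) -> Table:
    """Return the Table object for one user, reusing it if already registered."""
    name = f"contacts_user_{int(user_id)}"
    existing = metadata.tables.get(name)
    if existing is not None:
        # Defining the same table name twice on one MetaData raises an error,
        # so reuse the object created by an earlier request.
        return existing
    return Table(
        name,
        metadata,
        Column("id", Integer, primary_key=True, autoincrement=True),
        Column("name", String, nullable=True),
        Column("phone_number", String, nullable=False),
        Column("email", String, nullable=True),
    )


async def ensure_contact_table(engine: AsyncEngine, user_id: int) -> Table:
    """Create the user's table if it is missing, without a second sync engine."""
    table = contact_table_for(user_id)
    async with engine.begin() as conn:
        # run_sync calls table.create(sync_connection, checkfirst=True)
        # inside the greenlet context the async driver needs.
        await conn.run_sync(table.create, checkfirst=True)
    return table
```

In the endpoint, this would replace the `ThreadPoolExecutor` block with a single `contact_table = await ensure_contact_table(engine, request.user_id)` before the insert.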
