admin管理员组

文章数量:1122846

I havee the following setup:

def get_user_token(request: Request, x_user_token: str = Header(None)):
    return x_user_token

def get_storage(token: str = Depends(get_user_token)):
    supabase.auth._headers.update({"Authorization": f"Bearer {token}"})
    return supabase.storage

class Service:
    def __init__(self, db=Depends(get_db), storage=Depends(get_storage)):
        self.db = db
        self.storage = storage

    def create_service(cls, token: str = None):
        db = get_db(token=token) if token else get_db()
        storage = get_storage(token=token) if token else get_storage()
        return cls(db=db, storage=storage)

class FileService(Service):
    def upload(self, file_name: str, file_bytes: bytes):
        self.storage.from_(config.bucket).upload(
            f"{file_name.__str__()}.csv",
            file_bytes,
        )

I also have this FastAPI endpoint:

@router.post("/endpoint")
def create_test(
    name: str,
    file_service: FileService = Depends(FileService),
):
 
    file_service.upload(
        file_name=name, file_bytes=...
    )

I'm expecting it to resolve the Depends(get_user_token). For some reason it throws error, which I believe is because the token is a Depends.

storage3.utils.StorageException: {'statusCode': 400, 'error': 'Unauthorized', 'message': 'jwt malformed'}

Is it because I'm using Depends "outside" FastAPI?

I havee the following setup:

def get_user_token(request: Request, x_user_token: str = Header(None)):
    return x_user_token

def get_storage(token: str = Depends(get_user_token)):
    supabase.auth._headers.update({"Authorization": f"Bearer {token}"})
    return supabase.storage

class Service:
    def __init__(self, db=Depends(get_db), storage=Depends(get_storage)):
        self.db = db
        self.storage = storage

    def create_service(cls, token: str = None):
        db = get_db(token=token) if token else get_db()
        storage = get_storage(token=token) if token else get_storage()
        return cls(db=db, storage=storage)

class FileService(Service):
    def upload(self, file_name: str, file_bytes: bytes):
        self.storage.from_(config.bucket).upload(
            f"{file_name.__str__()}.csv",
            file_bytes,
        )

I also have this FastAPI endpoint:

@router.post("/endpoint")
def create_test(
    name: str,
    file_service: FileService = Depends(FileService),
):
 
    file_service.upload(
        file_name=name, file_bytes=...
    )

I'm expecting it to resolve the Depends(get_user_token). For some reason it throws error, which I believe is because the token is a Depends.

storage3.utils.StorageException: {'statusCode': 400, 'error': 'Unauthorized', 'message': 'jwt malformed'}

Is it because I'm using Depends "outside" FastAPI?

Share Improve this question edited Dec 24, 2024 at 13:13 Chris 33.3k9 gold badges97 silver badges213 bronze badges asked Nov 22, 2024 at 19:25 ScriptKiddieOnAComputerScriptKiddieOnAComputer 98413 silver badges20 bronze badges 1
  • Have you tried printing out the token? That should tell you immediately if the issue is the dependency injection, or something else. – M.O. Commented Nov 23, 2024 at 7:16
Add a comment  | 

1 Answer 1

Reset to default 0

surely you can Depends in function get_user_token, but you should get the header by Request

def get_user_token(request: Request):
    x_user_token = request.headers.get('x_user_token')
    return return 

本文标签: dependency injectionSupabase storage auth with FastAPI DependsStack Overflow