I have the following setup:
def get_user_token(request: Request, x_user_token: str = Header(None)):
    return x_user_token

def get_storage(token: str = Depends(get_user_token)):
    supabase.auth._headers.update({"Authorization": f"Bearer {token}"})
    return supabase.storage

class Service:
    def __init__(self, db=Depends(get_db), storage=Depends(get_storage)):
        self.db = db
        self.storage = storage

    @classmethod
    def create_service(cls, token: str = None):
        db = get_db(token=token) if token else get_db()
        storage = get_storage(token=token) if token else get_storage()
        return cls(db=db, storage=storage)

class FileService(Service):
    def upload(self, file_name: str, file_bytes: bytes):
        self.storage.from_(config.bucket).upload(
            f"{file_name.__str__()}.csv",
            file_bytes,
        )
I also have this FastAPI endpoint:
@router.post("/endpoint")
def create_test(
    name: str,
    file_service: FileService = Depends(FileService),
):
    file_service.upload(
        file_name=name, file_bytes=...
    )
I expected it to resolve Depends(get_user_token). Instead it throws the following error, which I believe is because the token is still a Depends object:
storage3.utils.StorageException: {'statusCode': 400, 'error': 'Unauthorized', 'message': 'jwt malformed'}
Is it because I'm using Depends "outside" FastAPI?
Asked Nov 22, 2024 at 19:25 by ScriptKiddieOnAComputer; edited Dec 24, 2024 at 13:13 by Chris.
Comment: Have you tried printing out the token? That should tell you immediately if the issue is the dependency injection, or something else. – M.O., Nov 23, 2024 at 7:16
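One way to act on that suggestion is to validate the token before it is written into the Authorization header. The sketch below is only an illustration: build_auth_header is a hypothetical helper, not part of supabase-py or FastAPI, and the three-segment check relies only on the fact that a JWT in compact form consists of three dot-separated parts.

```python
def build_auth_header(token):
    """Return an Authorization header, refusing values that cannot be a JWT.

    Hypothetical helper for illustration; not part of any library.
    """
    # A missing header arrives as None, and an unresolved dependency default
    # arrives as a non-string object. Both are rejected here.
    if not isinstance(token, str):
        raise ValueError(f"token must be a str, got {token!r}")
    # A JWT in compact form has three dot-separated segments.
    if token.count(".") != 2:
        raise ValueError("token does not look like a JWT (expected 3 segments)")
    return {"Authorization": f"Bearer {token}"}


# Without such a check, None is silently formatted into the header value.
unchecked = f"Bearer {None}"
print(unchecked)
```

With a guard like this, a missing or unresolved token fails loudly in your own code instead of surfacing later as 'jwt malformed' from the storage API.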
1 Answer
You can certainly use Depends with get_user_token, but you should read the header from the Request:
def get_user_token(request: Request):
    x_user_token = request.headers.get('x_user_token')
    return x_user_token