admin管理员组文章数量:1356472
Please see my code block below. In order to run it, I do the following. When I run the following code, things print as expected, and the process exits as expected.
stream = await client.create_completion(...)
stream.subscribe(print) # works perfectly
All the print
commands run as expected. Everything looks great, as long as I am doing things in a reactive way.
However, when I do the following, it hangs indefinitely:
stream = await client.create_completion(...)
stream.pipe(ops.to_iterable()).run()` # hangs :(
Why is this? What am I doing wrong?
Thanks in advance for your help.
from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import Choice
import reactivex as rx
from app.ai.bridge.chat.chat_completion_types import ChatRequest, ToolConfig
from app.ai.bridge.chat.drivers.openai_chat_driver_reactive_mappers import (
map_message_to_openai,
map_toolconfig_to_openai,
)
import asyncio
class OpenaiClientReactive:
def __init__(self, openai_client: AsyncOpenAI) -> None:
self.api = openai_client
async def create_completion(
self, chat_request: ChatRequest, tool_config: ToolConfig | None = None
) -> rx.Observable[Choice]:
stream: rx.subject.ReplaySubject[Choice] = rx.subject.ReplaySubject()
async def do_stream() -> None:
async_stream: AsyncStream[ChatCompletionChunk] = await self._stream_openai(
chat_request, tool_config
)
max_variants_expected = chat_request.options.num_variants
num_indexes_completed = 0
async for chunk in async_stream:
for choice in chunk.choices:
if choice.finish_reason:
num_indexes_completed += 1
stream.on_next(choice)
if max_variants_expected == num_indexes_completed:
# If all indexes are complete, we can complete the stream
print("All indexes complete")
break
await async_stream.close()
stream.on_completed()
asyncio.create_task(do_stream())
return stream
async def _stream_openai(
self,
chat_request: ChatRequest,
tool_config: ToolConfig | None = None,
) -> AsyncStream[ChatCompletionChunk]:
mapped_messages = mapped_messages = [
map_message_to_openai(message) for message in chat_request.context.messages
]
if tool_config:
return await self.api.chatpletions.create(
# TODO: Move this to database driven configuration, since it's an LLM.
model="gpt-3.5-turbo",
messages=mapped_messages,
stream=True,
n=chat_request.options.num_variants,
tools=map_toolconfig_to_openai(tool_config),
)
else:
return await self.api.chatpletions.create(
# TODO: Move this to database driven configuration, since it's an LLM.
model="gpt-3.5-turbo",
messages=mapped_messages,
stream=True,
n=chat_request.options.num_variants,
)
本文标签: Python Reactivex and OpenAIStream hangsStack Overflow
版权声明:本文标题:Python Reactivex and OpenAI -- Stream hangs? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1743999672a2573625.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论