admin管理员组

文章数量:1356472

Please see my code block below. In order to run it, I do the following. When I run the following code, things print as expected, and the process exits as expected.

stream = await client.create_completion(...)
stream.subscribe(print) # works perfectly

All the print commands run as expected. Everything looks great, as long as I am doing things in a reactive way.

However, when I do the following, it hangs indefinitely:

stream = await client.create_completion(...)
stream.pipe(ops.to_iterable()).run()` # hangs :(

Why is this? What am I doing wrong?

Thanks in advance for your help.

from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import Choice
import reactivex as rx

from app.ai.bridge.chat.chat_completion_types import ChatRequest, ToolConfig

from app.ai.bridge.chat.drivers.openai_chat_driver_reactive_mappers import (
    map_message_to_openai,
    map_toolconfig_to_openai,
)

import asyncio


class OpenaiClientReactive:
    def __init__(self, openai_client: AsyncOpenAI) -> None:
        self.api = openai_client

    async def create_completion(
        self, chat_request: ChatRequest, tool_config: ToolConfig | None = None
    ) -> rx.Observable[Choice]:
        stream: rx.subject.ReplaySubject[Choice] = rx.subject.ReplaySubject()

        async def do_stream() -> None:
            async_stream: AsyncStream[ChatCompletionChunk] = await self._stream_openai(
                chat_request, tool_config
            )

            max_variants_expected = chat_request.options.num_variants
            num_indexes_completed = 0

            async for chunk in async_stream:
                for choice in chunk.choices:
                    if choice.finish_reason:
                        num_indexes_completed += 1
                    stream.on_next(choice)

                if max_variants_expected == num_indexes_completed:
                    # If all indexes are complete, we can complete the stream
                    print("All indexes complete")
                    break

            await async_stream.close()
            stream.on_completed()

        asyncio.create_task(do_stream())

        return stream

    async def _stream_openai(
        self,
        chat_request: ChatRequest,
        tool_config: ToolConfig | None = None,
    ) -> AsyncStream[ChatCompletionChunk]:
        mapped_messages = mapped_messages = [
            map_message_to_openai(message) for message in chat_request.context.messages
        ]
        if tool_config:
            return await self.api.chatpletions.create(
                # TODO: Move this to database driven configuration, since it's an LLM.
                model="gpt-3.5-turbo",
                messages=mapped_messages,
                stream=True,
                n=chat_request.options.num_variants,
                tools=map_toolconfig_to_openai(tool_config),
            )
        else:
            return await self.api.chatpletions.create(
                # TODO: Move this to database driven configuration, since it's an LLM.
                model="gpt-3.5-turbo",
                messages=mapped_messages,
                stream=True,
                n=chat_request.options.num_variants,
            )

本文标签: Python Reactivex and OpenAIStream hangsStack Overflow