#Streaming approach with FastAPI

1 messages · Page 1 of 1 (latest)

winged hatch
#

Autogen setup

class CustomProxyAgent(UserProxyAgent):
    def generate_init_message(self, **context) -> str | Dict:
        return json.dumps(
            {
                "role": "user",
                "content": context["message"],
                "user_id": context["user_id"],
            }
        )

math_assistant = AssistantAgent(
    ...
)

general_assistant = AssistantAgent(
    ...
)
user_proxy = CustomProxyAgent(
    "user_proxy",
    human_input_mode="NEVER",
    code_execution_config=False,
    default_auto_reply=None,
)
group_chat = GroupChat(
    agents=[math_assistant, user_proxy, general_assistant],
    messages=[],
)
referee = GroupChatManager(name="referee", groupchat=group_chat, llm_config=llm_config)
def handle_messages(recipient, messages, sender, config):
    connection_manager = get_connection_manager()
    json_message = json.loads(messages[-1].get("content"))
    if "callback" in config and config["callback"] is not None:
        callback = config["callback"]
        callback(recipient, messages[-1], sender)
    connection_manager.user_messages[json_message.get("user_id")].append(
        messages[-1].get("content")
    )
    return False, None


referee.register_reply(
    [Agent, None],
    reply_func=handle_messages,
    config={"callback": None},
)

user_proxy.register_reply(
    [Agent, None],
    reply_func=handle_messages,
    config={"callback": None},
)

math_assistant.register_reply(
    [Agent, None],
    reply_func=handle_messages,
    config={"callback": None},
)

# IM THE ENTRYPOINT
async def test_response_from_autogen(user_id: str, message: str):
    logger.info(f"Starting chat for user {user_id}")
    await user_proxy.a_initiate_chat(referee, message=message, user_id=user_id)
#

API Route

@app.get("/sse-connect/{user_id}")
async def events(user_id):
    connection_manager = get_connection_manager()
    connection_manager.connect(user_id)

    async def event_generator(user_id, connection_manager):
        while True:
            try:
                if connection_manager.user_messages[user_id]:
                    new_message = connection_manager.user_messages[user_id].pop(0)
                    yield f"data: {new_message}\n\n"
                else:
                    yield "data: ping\n\n"
                    await asyncio.sleep(5)
            except asyncio.CancelledError:
                connection_manager.disconnect(user_id)
                logger.info(f"{user_id} disconnected")
                break

    return StreamingResponse(
        event_generator(user_id, connection_manager), media_type="text/event-stream"
    )

@app.post("/agen_stream")
async def chat(
    message: AgenTest,
):
    await test_response_from_autogen(message.user_id, message.content)
    return {"status": "recieved"}
#

Lastly, my connection manager...

import asyncio


class ConnectionManager:
    def __init__(self, user_conditions={}, user_messages={}):
        self.user_conditions = user_conditions
        self.user_messages = user_messages

    def connect(self, user_id: str):
        self.user_conditions[user_id] = asyncio.Condition()
        self.user_messages[user_id] = []

    def disconnect(self, user_id: str):
        del self.user_conditions[user_id]
        del self.user_messages[user_id]


connection_manager = ConnectionManager()


def get_connection_manager():
    return connection_manager
#

Any input or guidance would be greatly appreciated.

winged hatch
#

This is resolved, if there is a way to mark it. Ended up being my implementation in test_response_from_autogen

tired spade
#

hey @winged hatch could you please share use case? how can I send message and recieve?

olive wolf
#

HI @winged hatch - Did you manage to get this done? I am very interested in this approach, because we already have a setup like this. It would be difficult and time consuming to change the entire existing backend/frontend to websockets.

#

Does it actually stream the response from Agent (and hence the underlying LLM) as it comes, or it simply streams it once the Agent's response have all arrived?