class EmiSimulatorWebServer:
def __init__(...):
self._channels = ChannelsPlugin(backend=MemoryChannelsBackend(history=1), channels=[t.short for t in targets])
self._litestar = Litestar(
[self.handler],
static_files_config=[StaticFilesConfig(path="/static", directories=[self.get_static_path()])],
plugins=[self._channels],
)
# where bind address is ('0.0.0.0', 5555)
self._server = EmbeddedServer(EmbeddedConfig(self._litestar, *self._bind_address))
@websocket("/api/v1/ws")
async def handler(self, socket: WebSocket, channels: ChannelsPlugin) -> None:
if socket.client is None:
self.logger.info("Not accepting web socket connection with `None` client")
return
self.logger.info(f"Accepting web socket connection from {socket.client}")
all_channels = [t.short for t in self.targets]
await socket.accept()
async with channels.subscribe(all_channels) as subscriber:
await channels.put_subscriber_history(subscriber, all_channels, limit=1)
async with subscriber.run_in_background(socket.send_text):
self.logger.info(f"Sent BACnet target status to {socket.client}; waiting for events..")
received = {"type": "websocket.connect"}
while received["type"] != "websocket.disconnect":
received: WebSocketReceiveMessage = await socket.receive()
self.logger.info(f"Received BACnet event from {socket.client} of {received['text']}")
if received["type"] == "websocket.receive":
await self._handle_bacnet_event(socket.client, received["text"])
I can't seem to get a connection when connecting to ws://127.0.0.1:5555/api/v1/ws. Am I missing something?