#Help optimizing local TTS/STT Voice Assistant with OpenClaw (High Latency / Websocket Auth)

dry silo
#

Hey! I'm building a local voice assistant tying together faster_whisper (STT), OpenClaw (LLM), and piper (TTS) via a Python script.

The Current State (Working but Slow):
I have a 100% functional pipeline. Currently, my Python script sends the user's transcribed audio to OpenClaw using the CLI:
subprocess.run(['openclaw', 'agent', '--agent', 'main', '--message', text, '--json'])

The Problem:
While it works perfectly, the latency is around 15 seconds per turn. Profiling shows that the CLI triggers an embedded runner (a cold boot loading all context) rather than using my already hot Gateway. The LLM generation alone takes ~11s, and we have to wait for the entire JSON payload to finish before passing it to the TTS engine.

What we tried:
To achieve real-time streaming (so the TTS can speak while the LLM generates), we tried connecting directly to the Gateway WebSocket (ws://127.0.0.1:18789). However, we couldn't complete the connect.challenge / nonce security handshake. We also tried openclaw proxy to intercept the TUI auth flow, but couldn't easily extract the payload.

My Questions:

What is the recommended way to interact with a hot OpenClaw gateway via a local Python script with streaming support?

Is there a documented WebSocket auth flow for external scripts, a local REST API, or a specific CLI flag that forces the agent command to use the hot gateway (and stream) instead of the embedded runner?
dry silo
#

Here is my current working (but slow) Python implementation for context:
Python

#!/usr/bin/env python3
import time, json, subprocess, sys
import numpy as np
import sounddevice as sd
from faster_whisper import WhisperModel

PIPER_EXE = "/path/to/piper"
PIPER_MODEL = "/path/to/pt_PT-tugao-medium.onnx"
WHISPER_MODEL_SIZE = "base"
RECORD_SECONDS = 6

print("\n🤖 Assistant: Models ready. Listening...", flush=True)
try:
    whisper = WhisperModel(WHISPER_MODEL_SIZE, device="cpu", compute_type="int8")
except Exception as e:
    print(f"❌ Error: {e}", flush=True); sys.exit(1)

def speak(text):
    t_start = time.time()
    clean_text = text.replace('"', '').replace('\n', ' ').replace('*', '')
    cmd = f'echo "{clean_text}" | {PIPER_EXE} --model {PIPER_MODEL} --output_raw | paplay --raw --rate=22050 --channels=1 --format=s16le'
    subprocess.run(cmd, shell=True)
    print(f"⏱️ [TTS Time]: {time.time() - t_start:.2f} seconds\n", flush=True)

def send_via_cli(text):
    try:
        t_start = time.time()
        print("⏳ Processing...", flush=True)
        cmd = ['openclaw', 'agent', '--agent', 'main', '--message', text, '--json']
        res = subprocess.run(cmd, capture_output=True, text=True)
        print(f"⏱️ [OpenClaw Time]: {time.time() - t_start:.2f} seconds", flush=True)

        if res.returncode != 0: return "Connection error."
        try:
            data = json.loads(res.stdout)
            meta = data.get("result", {}).get("meta", {})
            if meta.get("finalAssistantVisibleText"): return meta["finalAssistantVisibleText"]
            payloads = data.get("result", {}).get("payloads", [])
            if payloads and "text" in payloads[0]: return payloads[0]["text"]
            return "Text not found."
        except: return "Invalid JSON."
    except: return "Local error."

while True:
    try:
        input("\n[ PRESS ENTER TO SPEAK ]")
        print(f"🎤 Listening ({RECORD_SECONDS}s)...", flush=True)
        t_rec_start = time.time()
        audio = sd.rec(int(RECORD_SECONDS * 16000), samplerate=16000, channels=1, dtype='float32')
        sd.wait()

        t_stt_start = time.time()
        segments, _ = whisper.transcribe(np.squeeze(audio), language="pt")
        user_text = " ".join([s.text for s in segments]).strip()

        if user_text:
            print(f"👤 You: {user_text}", flush=True)
            response = send_via_cli(user_text)
            print(f"🤖 Assistant: {response}", flush=True)
            speak(response)
    except KeyboardInterrupt:
        break
velvet egretBOT
#

Short version, yes, there’s now a better path than openclaw agent --json for this.

  1. openclaw agent already prefers the hot Gateway on current builds

    • --local is what forces embedded.
    • If the Gateway request fails, the CLI falls back to embedded.
    • So if you keep seeing “cold boot” behavior, either:
      • you’re on an older version, or
      • the Gateway call is failing and the CLI is falling back.
  2. There is not currently a CLI flag that turns openclaw agent into a token-streaming stdout client

    • --json gives you the final payload, not streaming deltas.
  3. Recommended path for a local Python voice assistant: use the Gateway HTTP API with SSE

    • Enable one of these:
    {
      gateway: {
        http: {
          endpoints: {
            responses: { enabled: true },
            chatCompletions: { enabled: true },
          },
        },
      },
    }
  • Then call:
    • POST http://127.0.0.1:18789/v1/responses (newer, more agent-native)
    • or POST http://127.0.0.1:18789/v1/chat/completions (easiest with existing OpenAI Python clients)
  4. Auth
    • If your gateway uses token/password auth, send:
      Authorization: Bearer <gateway token or password>
    • If gateway.auth.mode="none" on loopback/private ingress, no auth header is needed.
  5. Session continuity
    • Use user: "my-voice-assistant" or x-openclaw-session-key so turns reuse the same session.

If you want the easiest Python path, I’d start with the OpenAI-compatible endpoint:

from openai import OpenAI

client = OpenAI(
    base_url="http://127.0.0.1:18789/v1",
    api_key=GATEWAY_TOKEN,
)
#
stream = client.chat.completions.create(
    model="openclaw/default",
    messages=[{"role": "user", "content": text}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta.content or ""
    if delta:
        print(delta, end="", flush=True)
        # feed into your TTS chunker here

On the WebSocket question, yes, the WS auth flow is documented, but it’s lower-level than just echoing the nonce back. You need the connect.challenge nonce plus a signed device payload in the connect request. For an external Python script, I would avoid WS unless you specifically need raw control-plane events.

Docs:

#

Yep, with your current script I’d stop fighting the WebSocket layer and swap send_via_cli() for HTTP SSE against the Gateway.

Why this is the sweet spot for your setup:

  • uses the hot Gateway
  • gives you streaming deltas
  • no connect.challenge / signed-device WS handshake
  • much easier to drive from Python

1. Enable the endpoint

In config:

{
  gateway: {
    http: {
      endpoints: {
        responses: { enabled: true },
      },
    },
  },
}

import json
import re
import sys
import queue
import threading
import requests

GATEWAY_URL = "http://127.0.0.1:18789/v1/responses"
GATEWAY_TOKEN = "YOUR_GATEWAY_TOKEN"

tts_queue = queue.Queue()

def tts_worker():
    while True:
        text = tts_queue.get()
#
        if text is None:
            break
        speak(text)
        tts_queue.task_done()

threading.Thread(target=tts_worker, daemon=True).start()

def pop_speakable_chunk(buf: str):
    m = re.search(r"(.+?[.!?;:])(\s|$)", buf, re.S)
    if not m:
        return None, buf
    chunk = m.group(1).strip()
    rest = buf[m.end():].lstrip()
    return chunk, rest
#
def send_via_gateway_stream(text):
    headers = {
        "Authorization": f"Bearer {GATEWAY_TOKEN}",
        "Content-Type": "application/json",
        "x-openclaw-agent-id": "main",
    }

    payload = {
        "model": "openclaw",
        "input": text,
        "stream": True,
        "user": "local-voice-assistant",
    }

    full_text = ""
#
    tts_buffer = ""
    event_type = None

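    with requests.post(GATEWAY_URL, headers=headers, json=payload, stream=True) as r:
        r.raise_for_status()
        # SSE payloads are UTF-8. If the response declares no charset, requests
        # assumes ISO-8859-1 for text/* types, which garbles accented characters.
        r.encoding = "utf-8"

        for line in r.iter_lines(decode_unicode=True):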
            if line == "":
                event_type = None
                continue

            if line.startswith("event: "):
                event_type = line[7:]
                continue
#
            if not line.startswith("data: "):
                continue

            data = line[6:]
            if data == "[DONE]":
                break

            obj = json.loads(data)

            if event_type == "response.output_text.delta":
                delta = obj.get("delta", "")
                if not delta:
                    continue

                full_text += delta
#
                tts_buffer += delta

                print(delta, end="", flush=True)

                while True:
                    chunk, tts_buffer = pop_speakable_chunk(tts_buffer)
                    if not chunk:
                        break
                    tts_queue.put(chunk)

    if tts_buffer.strip():
        tts_queue.put(tts_buffer.strip())

    print()
    return full_text
#

3. Why this works better than your current CLI path

On current builds, openclaw agent is Gateway-first already, and only falls back to embedded if the Gateway request fails.

So if you’re consistently seeing embedded behavior, one of these is true:

  • your version is older
  • the Gateway call is failing, and the CLI is silently falling back
  • or you’re just blocked by the fact that CLI --json waits for the final result, so TTS can’t start early

That last point alone makes the HTTP SSE route much better for voice.

Docs for the CLI behavior: https://docs.openclaw.ai/cli/agent


#

4. On the WebSocket auth flow

Your WS attempt got stuck because it’s not just “echo the nonce back”.

The documented WS flow is:

  • receive connect.challenge
  • send connect
  • include auth
  • include a signed device payload tied to that nonce

So yes, it’s documented, but it’s the lower-level control-plane protocol, not the friendly integration path for a Python assistant.

Docs: https://docs.openclaw.ai/gateway/protocol


#

5. Small practical note

For Piper, sentence-level buffering like above is a lot nicer than speaking every tiny delta chunk.
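
A minimal sketch of that buffering idea, assuming nothing about Piper itself. `SentenceChunker` and its `min_chars` threshold are illustrative names, not part of OpenClaw or Piper. It only cuts at punctuation that is followed by whitespace, so a delta that happens to end in "3." is held back until the next delta shows whether the sentence really ended.

```python
import re

# Sentence-ending punctuation counts as a boundary only when whitespace follows it.
_BOUNDARY = re.compile(r"[.!?;:]\s+")


class SentenceChunker:
    """Accumulate streamed deltas and emit sentence-sized chunks (illustrative helper)."""

    def __init__(self, min_chars: int = 12):
        self.min_chars = min_chars  # hypothetical threshold to avoid very short chunks
        self._buf = ""

    def feed(self, delta: str) -> list[str]:
        """Add a delta and return any chunks that are now complete."""
        self._buf += delta
        chunks = []
        search_from = 0
        while True:
            m = _BOUNDARY.search(self._buf, search_from)
            if not m:
                break
            candidate = self._buf[: m.end()].strip()
            if len(candidate) < self.min_chars:
                # Too short to speak alone: keep looking for a later boundary.
                search_from = m.end()
                continue
            chunks.append(candidate)
            self._buf = self._buf[m.end():]
            search_from = 0
        return chunks

    def flush(self) -> str:
        """Return whatever is left once the stream has ended."""
        rest, self._buf = self._buf.strip(), ""
        return rest
```

In the streaming loop you would call `feed(delta)` for each text delta, queue every returned chunk for TTS, and call `flush()` once after the stream ends.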

If you want, I can turn your full script into a drop-in streaming version next.

#

Nice, thanks, this makes it much easier to map onto your real loop.

For your exact script, the key change is:

  • remove send_via_cli()
  • remove the final speak(response)
  • add a TTS worker thread + streamed Gateway call

Here’s the smallest practical drop-in version.

#!/usr/bin/env python3
import time, json, subprocess, sys, re, queue, threading
import numpy as np
import sounddevice as sd
import requests
#
from faster_whisper import WhisperModel

PIPER_EXE = "/path/to/piper"
PIPER_MODEL = "/path/to/pt_PT-tugao-medium.onnx"
WHISPER_MODEL_SIZE = "base"
RECORD_SECONDS = 6

GATEWAY_URL = "http://127.0.0.1:18789/v1/responses"
GATEWAY_TOKEN = "YOUR_GATEWAY_TOKEN"

tts_queue = queue.Queue()

print("\n🤖 Assistant: Models ready. Listening...", flush=True)
try:
    whisper = WhisperModel(WHISPER_MODEL_SIZE, device="cpu", compute_type="int8")
#
except Exception as e:
    print(f"❌ Error: {e}", flush=True)
    sys.exit(1)

def speak(text):
    t_start = time.time()
    clean_text = text.replace('"', "").replace("\n", " ").replace("*", "").strip()
    if not clean_text:
        return
    cmd = f'echo "{clean_text}" | {PIPER_EXE} --model {PIPER_MODEL} --output_raw | paplay --raw --rate=22050 --channels=1 --format=s16le'
    subprocess.run(cmd, shell=True)
    print(f"\n⏱️ [TTS chunk time]: {time.time() - t_start:.2f}s", flush=True)

def tts_worker():
    while True:
#
        chunk = tts_queue.get()
        if chunk is None:
            break
        try:
            speak(chunk)
        finally:
            tts_queue.task_done()

threading.Thread(target=tts_worker, daemon=True).start()

def pop_speakable_chunk(buf: str):
    m = re.search(r"(.+?[.!?;:])(\s|$)", buf, re.S)
    if not m:
        return None, buf
    chunk = m.group(1).strip()
#
    rest = buf[m.end():].lstrip()
    return chunk, rest

def send_via_gateway_stream(text):
    headers = {
        "Authorization": f"Bearer {GATEWAY_TOKEN}",
        "Content-Type": "application/json",
        "x-openclaw-agent-id": "main",
    }

    payload = {
        "model": "openclaw",
        "input": text,
        "stream": True,
        "user": "local-voice-assistant",
#
    }

    full_text = ""
    tts_buffer = ""
    event_type = None

    t_start = time.time()
    print("⏳ Processing...", flush=True)

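    with requests.post(GATEWAY_URL, headers=headers, json=payload, stream=True) as r:
        r.raise_for_status()
        # SSE payloads are UTF-8. If the response declares no charset, requests
        # assumes ISO-8859-1 for text/* types, which garbles accented characters.
        r.encoding = "utf-8"

        for line in r.iter_lines(decode_unicode=True):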
            if line == "":
                event_type = None
#
                continue

            if line.startswith("event: "):
                event_type = line[7:]
                continue

            if not line.startswith("data: "):
                continue

            data = line[6:]
            if data == "[DONE]":
                break

            obj = json.loads(data)
#
            if event_type == "response.output_text.delta":
                delta = obj.get("delta", "")
                if not delta:
                    continue

                full_text += delta
                tts_buffer += delta

                print(delta, end="", flush=True)

                while True:
                    chunk, tts_buffer = pop_speakable_chunk(tts_buffer)
                    if not chunk:
                        break
                    tts_queue.put(chunk)
#

    if tts_buffer.strip():
        tts_queue.put(tts_buffer.strip())

    print(f"\n⏱️ [OpenClaw stream time]: {time.time() - t_start:.2f}s", flush=True)
    return full_text.strip()

while True:
    try:
        input("\n[ PRESS ENTER TO SPEAK ]")
        print(f"🎤 Listening ({RECORD_SECONDS}s)...", flush=True)

        audio = sd.rec(int(RECORD_SECONDS * 16000), samplerate=16000, channels=1, dtype="float32")
        sd.wait()
#
        segments, _ = whisper.transcribe(np.squeeze(audio), language="pt")
        user_text = " ".join([s.text for s in segments]).strip()

        if user_text:
            print(f"👤 You: {user_text}", flush=True)
            response = send_via_gateway_stream(user_text)
            print(f"🤖 Assistant (final): {response}", flush=True)

            # Wait until all spoken chunks finish before next turn
            tts_queue.join()

    except KeyboardInterrupt:
        break

Important behavior change

#

Do not call:

speak(response)

at the end anymore, because the worker thread is already speaking the streamed chunks.

Two practical notes

  1. You need gateway.http.endpoints.responses.enabled: true
  2. Piper usually sounds better with sentence chunks, not token-by-token chunks, which is why I buffered until punctuation.

If you want, I can also give you a safer speak() version without shell=True, since that current pipe will eventually bite you on quoting.
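
For reference, here is one way such a shell-free `speak()` could look. This is a sketch, not a tested drop-in: it reuses the Piper and paplay flags from the script above, and `clean_for_tts` and `build_commands` are hypothetical helper names. The text is written to Piper's stdin directly, so quotes, backticks, and `$(...)` in the model output are never interpreted by a shell.

```python
import subprocess

PIPER_EXE = "/path/to/piper"                      # same placeholders as the script above
PIPER_MODEL = "/path/to/pt_PT-tugao-medium.onnx"


def clean_for_tts(text: str) -> str:
    """Drop markdown asterisks and collapse whitespace; quotes can stay."""
    return " ".join(text.replace("*", "").split())


def build_commands(piper_exe: str = PIPER_EXE, piper_model: str = PIPER_MODEL):
    """Return the two argument lists for the piper -> paplay pipeline."""
    piper_cmd = [piper_exe, "--model", piper_model, "--output_raw"]
    play_cmd = ["paplay", "--raw", "--rate=22050", "--channels=1", "--format=s16le"]
    return piper_cmd, play_cmd


def speak(text: str) -> None:
    clean = clean_for_tts(text)
    if not clean:
        return
    piper_cmd, play_cmd = build_commands()
    # No shell involved: each program receives its arguments as a list.
    piper = subprocess.Popen(piper_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    player = subprocess.Popen(play_cmd, stdin=piper.stdout)
    piper.stdout.close()  # so piper gets SIGPIPE if paplay exits early
    piper.stdin.write((clean + "\n").encode("utf-8"))
    piper.stdin.close()   # EOF tells piper there is no more text
    player.wait()
    piper.wait()
```

Keeping the command construction in its own function makes it easy to check the arguments without actually playing audio.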

dry silo
#

Following previous advice, we successfully ditched the CLI and WebSockets. We opened the Gateway HTTP responses endpoint, restarted the systemd service, and our Python script is now successfully using HTTP SSE (POST http://127.0.0.1:18789/v1/responses) to stream chunks to our local TTS engine.

The Current Problem (19-second delay):
While the streaming itself works perfectly, the Time To First Byte (TTFB) is almost 20 seconds.

Running openclaw status shows that the TUI has a hot session (agent:main:main) with ~38k context tokens and 83% cached (which replies in ~3 seconds in the TUI). However, when we hit the HTTP endpoint via Python, the Gateway seems to ignore the cached session and does a massive "cold boot" (loading all 19 skills, injecting 38k tokens from scratch) before it generates the first streamed word.

What we tried in the Python Payload/Headers:
We tried adding "user": "local-voice-assistant" (as suggested) and later tried forcing the session by adding "x-openclaw-session-key": "agent:main:main" to the HTTP headers, but the 15-20 second cold-boot delay remains before the stream starts.

The Question:
What is the exact HTTP Header or JSON Payload parameter required to force the /v1/responses endpoint to attach to and reuse the hot, heavily-cached main session that the TUI uses, so we can get the TTFB down to 2-3 seconds?

Here is the Pastebin with what we are using (sorry, it's in Portuguese, but I'm sure you can manage):

https://pastebin.com/2i5QFsgs

velvet egretBOT
#

Short version: there is no extra magic header beyond the one you already found.

The correct routing knob is:

x-openclaw-session-key: agent:main:main

Docs: https://docs.openclaw.ai/gateway/openresponses-http-api

A few important nuances here:

  • user only helps derive a stable auto session when you do not set x-openclaw-session-key
  • previous_response_id helps continue an OpenResponses thread, but it does not “attach to a hotter TUI session” than the explicit session key
  • /v1/responses still runs a normal agent turn, so seeing skills/context get rebuilt is expected. The session key reuses the session/transcript, not some hidden live prefilled prompt blob

So if you are still seeing 19s TTFB with:

#
x-openclaw-agent-id: main
x-openclaw-session-key: agent:main:main

then the problem is probably not “missing header”, it is more likely one of these:

  1. provider prompt cache miss

    • different model / auth profile / thinking settings than the TUI turn
    • changing request shape in a way that busts the cached prefix
  2. status is showing recovered cache stats

  3. synthetic channel context mismatch

    • if you want to get as close as possible to UI behavior, also try:
#
   x-openclaw-message-channel: webchat

So I’d test this exact request shape:

headers = {
    "Authorization": f"Bearer {GATEWAY_TOKEN}",
    "Content-Type": "application/json",
    "x-openclaw-agent-id": "main",
    "x-openclaw-session-key": "agent:main:main",
    "x-openclaw-message-channel": "webchat",
}

payload = {
#
    "model": "openclaw",
    "input": text,
    "stream": True,
}

If that is still slow, I’m pretty confident this is a cache-hit/parity issue, not a routing issue.

At that point, please compare:

  • openclaw --version
  • one fast TUI turn
  • one slow /v1/responses turn
  • same model
  • same session key
  • gateway logs / usage showing whether cacheRead happened
#

If you want, paste those and I’ll help you narrow down whether it’s:

  • a real regression in /v1/responses
  • a model/provider cache behavior difference
  • or a session override mismatch.
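
To make the fast-vs-slow comparison concrete, a small timing helper along these lines may help. It is a sketch under assumptions: `measure_ttft` and `sse_lines` are hypothetical names, and the event name `response.output_text.delta` is taken from the streaming code earlier in this thread. The helper reports the time to the first text delta separately from the total stream time.

```python
import json
import time
from typing import Callable, Iterable, Optional, Tuple


def measure_ttft(
    lines: Iterable[str],
    clock: Callable[[], float] = time.monotonic,
    delta_event: str = "response.output_text.delta",
) -> Tuple[Optional[float], float, str]:
    """Return (seconds to first text delta, total seconds, full text)."""
    start = clock()
    first = None
    event_type = None
    parts = []
    for line in lines:
        if line == "":
            event_type = None          # blank line ends one SSE event
            continue
        if line.startswith("event: "):
            event_type = line[7:]
            continue
        if not line.startswith("data: "):
            continue
        data = line[6:]
        if data == "[DONE]":
            break
        if event_type != delta_event:
            continue
        delta = json.loads(data).get("delta", "")
        if delta:
            if first is None:
                first = clock() - start
            parts.append(delta)
    return first, clock() - start, "".join(parts)


def sse_lines(url, headers, payload):
    """Lazily POST and yield decoded SSE lines (needs the requests package)."""
    import requests  # imported here so measure_ttft itself stays stdlib-only
    with requests.post(url, headers=headers, json=payload,
                       stream=True, timeout=(5, 120)) as r:
        r.raise_for_status()
        r.encoding = "utf-8"
        yield from r.iter_lines(decode_unicode=True)
```

Because `sse_lines` is a generator, the POST is only sent when `measure_ttft` starts iterating, so the measured time includes the wait for response headers: `measure_ttft(sse_lines(GATEWAY_URL, headers, payload))`.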
dry silo
#

Update: We just tested the suggestions regarding the session key and channel, and we also optimized the Python script to use a persistent requests.Session() with keep-alive and the x-openclaw-intent: chat header to minimize overhead.

The result:
The TTFT improved slightly but is still stuck at 10.12s, while the TUI consistently hits 4s for the exact same prompt and session.

Since we are now using x-openclaw-intent: chat and the webchat channel, we expected the agent to bypass any heavy planning phases and hit the same cache as the TUI. It seems that even with a persistent connection and the correct session mapping, there is an extra 6-second delay inherent to the HTTP Gateway path compared to the internal TUI path.

Here is the updated (safe) version of the function we are currently using for these tests:

https://pastebin.com/2i5QFsgs

velvet egretBOT
#

I don’t think this is an “extra hidden HTTP header” problem anymore.

A couple of important findings:

That matters because the native TUI path is Gateway chat.send, and that path can carry per-turn settings like thinking. So “same prompt + same session” is not necessarily the same execution path.

So at this point:

  1. x-openclaw-session-key: agent:main:main is still the correct session-routing header
  2. x-openclaw-message-channel: webchat is the right channel hint
  3. There is not another known /v1/responses flag that says “attach to the already-hot TUI run state”

In other words, if TUI is 4s and /v1/responses is 10s, I’d look at native path differences, not transport overhead:

#
  • TUI thinking / reasoning mode
  • session-level thinkingLevel / reasoningLevel
  • model / auth profile parity
  • provider cache hit vs miss

Also, keep-alive is good, but it should save milliseconds, not 6 seconds. So this is almost certainly not raw HTTP overhead.

My recommendation

If you need true TUI parity from Python, use the native Gateway WebSocket chat.send path, not the OpenResponses compat layer.

The WS auth/pairing flow is documented here:

#

Before chasing this further

Please post:

openclaw --version

and, if possible:

  • the TUI thinking mode
  • the exact model in that session
  • one fast TUI run vs one slow /v1/responses run

If you want, I can help map your Python client from /v1/responses to native WS chat.send next. That’s probably the cleanest way to get the last few seconds back.

dry silo
#

That makes total sense! It perfectly explains the 6-second discrepancy. We suspected there was a fundamental difference in the execution path between the compat REST API and the native TUI.

Here is the requested info:

Version: OpenClaw 2026.4.15 (041266a)

Model: anthropic/claude-haiku-4-5

TUI Thinking Mode: Default (we haven't explicitly changed it).

We would absolutely love your help mapping the Python client to the native WS chat.send.

When we originally tried the WebSocket approach, we got blocked right at the authentication step (we couldn't figure out the exact crypto/hashing required to sign the device payload for the connect.challenge nonce).

If you could share a minimal Python snippet showing how to correctly complete that WS handshake and stream a chat.send request, that would be the holy grail for us! We will drop it right into our pipeline. Thank you so much for the deep dive!

#

@velvet egret

velvet egretBOT
#

Yep, and the holy-grail bit is: it’s not HMAC.

It’s an Ed25519 signature over a literal pipe-joined UTF-8 payload:

v3|deviceId|clientId|clientMode|role|scopesCsv|signedAtMs|token|nonce|platform|deviceFamily

For your case, the easiest path is:

  • use the normal gateway token in auth.token
  • include that same token in the signed v3 payload
  • connect over WS
  • call native chat.send
  • stream event: "chat" frames

Docs:

#
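import asyncio
import base64
import hashlib
import json
import os
import sys
import time
import uuid
from pathlib import Path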

import websockets
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    PublicFormat,
)

GATEWAY_URL = "ws://127.0.0.1:18789"
GATEWAY_TOKEN = os.environ["OPENCLAW_GATEWAY_TOKEN"]
SESSION_KEY = "agent:main:main"
#
IDENTITY_PATH = Path.home() / ".openclaw-python-device.json"


def b64u(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def extract_text(message) -> str:
    if not isinstance(message, dict):
        return ""
    out = []
    for part in message.get("content", []):
        if isinstance(part, dict) and part.get("type") == "text":
            out.append(part.get("text", ""))
    return "".join(out)
#


def load_or_create_identity():
    if IDENTITY_PATH.exists():
        obj = json.loads(IDENTITY_PATH.read_text())
        private_key = serialization.load_pem_private_key(
            obj["privateKeyPem"].encode("utf-8"),
            password=None,
        )
        return obj["deviceId"], obj["publicKey"], private_key

    private_key = Ed25519PrivateKey.generate()
    public_key_raw = private_key.public_key().public_bytes(
        encoding=Encoding.Raw,
        format=PublicFormat.Raw,
#
    )
    public_key_b64u = b64u(public_key_raw)
    device_id = hashlib.sha256(public_key_raw).hexdigest()

    private_key_pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    ).decode("utf-8")

    payload = {
        "version": 1,
        "deviceId": device_id,
        "publicKey": public_key_b64u,
        "privateKeyPem": private_key_pem,
#
    }
    IDENTITY_PATH.write_text(json.dumps(payload, indent=2))
    try:
        os.chmod(IDENTITY_PATH, 0o600)
    except OSError:
        pass

    return device_id, public_key_b64u, private_key


def build_v3_payload(
    *,
    device_id: str,
    client_id: str,
    client_mode: str,
#
    role: str,
    scopes: list[str],
    signed_at_ms: int,
    token: str,
    nonce: str,
    platform: str,
    device_family: str,
) -> str:
    return "|".join(
        [
            "v3",
            device_id,
            client_id,
            client_mode,
            role,
#
            ",".join(scopes),
            str(signed_at_ms),
            token,
            nonce,
            platform.lower(),
            device_family.lower(),
        ]
    )


async def main():
    device_id, public_key_b64u, private_key = load_or_create_identity()

    client_id = "gateway-client"   # if you want to mirror TUI more closely, try "openclaw-tui"
    client_mode = "ui"
#
    role = "operator"
    scopes = ["operator.read", "operator.write"]

    async with websockets.connect(GATEWAY_URL, max_size=25_000_000) as ws:
        # 1) wait for connect.challenge
        frame = json.loads(await ws.recv())
        assert frame["type"] == "event" and frame["event"] == "connect.challenge", frame
        nonce = frame["payload"]["nonce"].strip()

        # 2) build + sign v3 device payload
        signed_at_ms = int(time.time() * 1000)
        payload = build_v3_payload(
            device_id=device_id,
            client_id=client_id,
            client_mode=client_mode,
#
            role=role,
            scopes=scopes,
            signed_at_ms=signed_at_ms,
            token=GATEWAY_TOKEN,   # if gateway auth mode is password, this should be ""
            nonce=nonce,
            platform=sys.platform,
            device_family="python",
        )
        signature = b64u(private_key.sign(payload.encode("utf-8")))

        # 3) send connect
        connect_req = {
            "type": "req",
            "id": "connect-1",
            "method": "connect",
#
            "params": {
                "minProtocol": 3,
                "maxProtocol": 3,
                "client": {
                    "id": client_id,
                    "displayName": "python-voice",
                    "version": "0.1.0",
                    "platform": sys.platform,
                    "deviceFamily": "python",
                    "mode": client_mode,
                    "instanceId": str(uuid.uuid4()),
                },
                "role": role,
                "scopes": scopes,
                "caps": ["tool-events"],
#
                "auth": {
                    "token": GATEWAY_TOKEN
                },
                "device": {
                    "id": device_id,
                    "publicKey": public_key_b64u,
                    "signature": signature,
                    "signedAt": signed_at_ms,
                    "nonce": nonce,
                },
            },
        }
        await ws.send(json.dumps(connect_req))

        hello = json.loads(await ws.recv())
#
        assert hello["type"] == "res" and hello["ok"] is True, hello

        # 4) native chat.send
        run_id = str(uuid.uuid4())
        send_req = {
            "type": "req",
            "id": "chat-1",
            "method": "chat.send",
            "params": {
                "sessionKey": SESSION_KEY,
                "message": "Olá! Responde numa frase curta.",
                "deliver": False,
                "timeoutMs": 120000,
                "idempotencyKey": run_id,
            },
#
        }
        await ws.send(json.dumps(send_req))

        ack = json.loads(await ws.recv())
        assert ack["type"] == "res" and ack["ok"] is True, ack

        # 5) stream chat events
        last_text = ""
        while True:
            frame = json.loads(await ws.recv())

            if frame.get("type") != "event":
                continue
            if frame.get("event") != "chat":
                continue
#

            payload = frame.get("payload", {})
            if payload.get("runId") != run_id:
                continue
            if payload.get("sessionKey") != SESSION_KEY:
                continue

            state = payload.get("state")

            if state == "delta":
                full_text = extract_text(payload.get("message"))
                if full_text.startswith(last_text):
                    new_text = full_text[len(last_text):]
                else:
                    new_text = full_text
#
                last_text = full_text
                if new_text:
                    print(new_text, end="", flush=True)

            elif state == "final":
                full_text = extract_text(payload.get("message"))
                if full_text and full_text.startswith(last_text):
                    tail = full_text[len(last_text):]
                    if tail:
                        print(tail, end="", flush=True)
                print()
                break

            elif state == "error":
                raise RuntimeError(payload.get("errorMessage", "chat.send failed"))
#


if __name__ == "__main__":
    asyncio.run(main())

A few sharp edges to know up front:

  • Persist the device keypair. If you generate a new one every run, you’ll make pairing/debugging miserable.
  • The publicKey you send is the raw 32-byte Ed25519 public key, base64url without padding, not PEM.
  • deviceId is sha256(raw_public_key).hexdigest()
  • If your gateway uses password auth instead of token auth:
    • auth becomes {"password": "..."}
    • the signed payload’s token field should be empty string
  • chat delta events send the full merged assistant text so far, not just the new token. That’s why the snippet computes new_text.
#

If you get:

  • device nonce mismatch -> wrong/stale nonce
  • device signature invalid -> payload string shape is wrong
  • device id mismatch -> your deviceId does not match the raw public key
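
For the `device id mismatch` case in particular, the stored identity can be sanity-checked offline before connecting. The sketch below uses only the rules stated above (raw 32-byte key, base64url without padding, `deviceId` as the SHA-256 hex digest of the raw key); `check_identity` is a hypothetical helper name and it does not verify signatures.

```python
import base64
import hashlib


def b64u(data: bytes) -> str:
    """base64url without padding, as used for publicKey."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64u_decode(text: str) -> bytes:
    # Restore the padding that b64u() strips before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def check_identity(device_id: str, public_key_b64u: str) -> list[str]:
    """Return the problems found in a stored identity (empty list = consistent)."""
    problems = []
    if "=" in public_key_b64u:
        problems.append("publicKey must be base64url without padding")
    try:
        raw = b64u_decode(public_key_b64u.rstrip("="))
    except ValueError:  # binascii.Error is a ValueError subclass
        return problems + ["publicKey is not valid base64url"]
    if len(raw) != 32:
        problems.append(f"publicKey decodes to {len(raw)} bytes, expected 32")
    if hashlib.sha256(raw).hexdigest() != device_id:
        problems.append("deviceId is not sha256(raw_public_key).hexdigest()")
    return problems
```

Running it against the values saved in the identity file, for example `check_identity(obj["deviceId"], obj["publicKey"])`, should return an empty list if the file was written by `load_or_create_identity()` above.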

If you want, next reply I can turn this into a version that:

  1. reconnects automatically,
  2. feeds chunks directly into your Piper queue, and
  3. aborts the active run when the user starts speaking again.
dry silo
#

If your gateway uses password auth instead of token auth:

auth becomes {"password": "..."}

What's your suggestion on this? I'm using this for an Alexa-type home assistant with limited access to the internet that I want to control by voice at home. I don't plan on anyone but me (or people in my home) having access to it, so I do care about security, but I don't expect other people to try to gain access.

velvet egretBOT
#

For your setup, I’d use token auth, not password auth.

Why:

  • the docs explicitly recommend gateway.auth.mode: "token" for most setups
  • your client is a script, so there’s no UX benefit to a human-memorable password
  • a long random token is usually stronger than the kind of password people actually choose
  • it works cleanly for both WS and HTTP clients

Docs:

What I’d do for a home voice assistant:

#
{
  gateway: {
    bind: "loopback",
    auth: {
      mode: "token",
      token: "long-random-secret-here"
    }
  }
}

Even better, keep the secret out of config and put it in ~/.openclaw/.env:

OPENCLAW_GATEWAY_TOKEN=your-long-random-token
#

Then have your Python script read the same env var.
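
On the Python side, that could look roughly like the sketch below. It assumes the `.env` file is plain `KEY=VALUE` lines as in the example above; `load_gateway_token` is a hypothetical helper, and the file fallback only matters if the script is not launched with the variable already exported.

```python
import os
from pathlib import Path


def load_gateway_token(
    env_file: Path = Path.home() / ".openclaw" / ".env",
    name: str = "OPENCLAW_GATEWAY_TOKEN",
) -> str:
    """Prefer the process environment, then fall back to a simple KEY=VALUE file."""
    value = os.environ.get(name)
    if value:
        return value
    if env_file.exists():
        for raw_line in env_file.read_text(encoding="utf-8").splitlines():
            line = raw_line.strip()
            # Skip blanks, comments, and anything that is not KEY=VALUE.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            if key.strip() == name:
                return val.strip().strip('"').strip("'")
    raise RuntimeError(f"{name} not set and not found in {env_file}")
```

Failing loudly when the token is missing is deliberate: an empty token would otherwise show up later as a confusing auth error from the gateway.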

My actual recommendation, in order:

  1. Best: bind: "loopback" + token auth

    • if the voice pipeline runs on the same machine as OpenClaw
  2. Okay: password auth

    • only if you really prefer typing/pasting a password manually
    • still keep it strong
  3. I would avoid: gateway.auth.mode: "none"

    • even at home, that means any local process can talk to the gateway

One extra nuance: the WS device-signature challenge still happens either way.
Token vs password only changes the shared-secret part of auth.

#

So short version: for a local “Alexa at home” build, I’d go with:

  • loopback bind
  • token auth
  • long random token
  • no raw LAN exposure unless you truly need it

If you want, I can give you a clean “recommended home setup” config for:

  • same-machine only, or
  • one assistant box talking to another OpenClaw box on your LAN.
dry silo
#

Noted, let's get back on topic. I will try what you already told me, and assuming it works I will want to implement the features you suggested:
If you get:

  • device nonce mismatch -> wrong/stale nonce
  • device signature invalid -> payload string shape is wrong
  • device id mismatch -> your deviceId does not match the raw public key

If you want, next reply I can turn this into a version that:

  1. reconnects automatically,
  2. feeds chunks directly into your Piper queue, and
  3. aborts the active run when the user starts speaking again.

Also, I'm currently testing things in the terminal by pressing Enter, which makes it listen to me for 6 seconds. Setting up a wake word is another critical step, and I'd love to hear your thoughts on the best way to do that with my current implementation.

velvet egretBOT
#

Yep, with your current setup I’d do this in phases.

My recommendation

1. First, replace the fixed 6-second capture

That’s the next big win.

Use:

  • wake word detector for activation
  • VAD for start/stop of actual speech
  • your existing faster_whisper
  • persistent WS chat.send
  • Piper worker queue

So the flow becomes:

#
mic -> wake word -> start capture -> VAD says “speech ended” -> STT -> OpenClaw WS -> Piper

instead of:

press Enter -> record 6 seconds -> hope user finished in time

Best wake word option for your stack

For a Python, local-first, home-assistant setup, I’d start with:

  • openWakeWord for wake word
  • Silero VAD or WebRTC VAD for end-of-speech detection
#

Why:

  • local
  • Python-friendly
  • fits your current architecture well
  • easy to layer on top of your existing microphone loop

If you want the most polished off-the-shelf wake word experience, Porcupine is also very solid, but for your current “local hacker stack” I’d personally try openWakeWord first.

Important implementation detail

Use a small rolling audio buffer (like 0.5 to 1.0s).

That way when the wake word fires, you keep the audio just before and just after detection, so you don’t chop off the first real word.

Example:

#
  • user says: “computer, turn on the lights”
  • wake word detector triggers on “computer”
  • you keep the buffered audio
  • then continue recording until VAD sees silence
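
The rolling buffer described above can be sketched with a bounded deque. This assumes 20 ms frames and an 800 ms pre-roll; the class name PreRollBuffer is made up for illustration:

```python
import collections

FRAME_MS = 20      # duration of one audio frame
PREROLL_MS = 800   # how much audio to keep from before the trigger


class PreRollBuffer:
    def __init__(self, preroll_ms=PREROLL_MS, frame_ms=FRAME_MS):
        # maxlen makes the deque drop the oldest frame automatically
        self._frames = collections.deque(maxlen=preroll_ms // frame_ms)

    def push(self, frame: bytes) -> None:
        self._frames.append(frame)

    def snapshot(self) -> list[bytes]:
        """Copy of the buffered frames, oldest first."""
        return list(self._frames)


buf = PreRollBuffer()
for i in range(100):          # 100 frames = 2 s of audio
    buf.push(bytes([i]))
capture = buf.snapshot()      # wake word fired: seed the capture with pre-roll
```

When the wake word triggers, the capture starts from the snapshot rather than from an empty list, so the first real word is not chopped off.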

What I’d build next

Phase A, practical MVP

  • persistent WS connection
  • chat.send
  • Piper streaming queue
  • VAD-based end-of-speech
  • no wake word yet, just auto-listen on voice activity or keypress

Phase B, real assistant feel

  • add wake word
  • add pre-roll buffer
  • add short cooldown after activation
  • play a small earcon/beep when listening starts

Phase C, much nicer UX

  • barge-in
    • if user speaks while Piper is talking:
      • stop Piper
      • send chat.abort
      • start new capture
  • optional “assistant is speaking” ducking / echo suppression

My strong opinion here

For a home assistant, I would not start with full duplex immediately.

Start with:

#
  • wake word
  • capture user
  • assistant replies
  • only then add barge-in

That path is way less painful.

Full duplex gets messy fast because your own TTS can retrigger the wake word or VAD unless you add echo cancellation / playback suppression.

Very practical defaults

I’d start with:

  • wake word always on
  • disable wake detection while Piper is speaking
  • re-enable wake detection after TTS finishes
  • later add true barge-in once the base loop is stable
#

That gives you a reliable MVP much faster.

If I were you

I’d build this exact order:

  1. get WS chat.send working
  2. replace fixed 6s capture with VAD stop
  3. add Piper queue + streaming
  4. add wake word
  5. add barge-in / abort

If you want, next reply I can sketch the actual Python architecture for:

  • mic loop
  • wake-word thread
  • VAD capture
  • persistent WS client
  • Piper worker
  • abort-on-barge-in

all as one clean pipeline.

dry silo
#

Yeah, the issue is that Porcupine seems to be paid, only free temporarily. I'm trying to stay free (other than the main AI model), so any other feature needs to be either fully free and local, or non-local with a really generous free tier.

velvet egretBOT
#

Yep, then I’d just skip Porcupine.

For your constraints, my ranking would be:

1. openWakeWord, best fit

  • free
  • local
  • Python-friendly
  • made for exactly this kind of pipeline

Pair it with:

  • WebRTC VAD for cheap/simple
  • or Silero VAD if you want a stronger VAD

2. Mycroft Precise, workable, but I’d try openWakeWord first

  • also free/local
  • more of a DIY feel
  • can work, but I think openWakeWord is the nicer first bet for your setup

3. STT-as-wake-word, only as a fallback

Meaning:

  • keep a tiny local ASR model always listening
  • only trigger if it hears your wake phrase

This is fully free/local too, but I like it less because:

  • more CPU
  • usually more latency
  • more false triggers / misses than a real wake-word model

So for your current stack, I’d do:

openWakeWord + WebRTC/Silero VAD + faster_whisper + WS chat.send + Piper
#

That keeps the whole front-end local and free.

My honest recommendation:

  • use openWakeWord
  • disable wake detection while Piper is speaking
  • use a short rolling buffer
  • replace the fixed 6s window with VAD stop

That’s probably the best “free + local + practical” path.

If you want, I can next sketch the exact architecture using only free/local pieces:

  • openWakeWord
  • webrtcvad or silero-vad
  • faster_whisper
  • persistent OpenClaw WS
  • Piper queue
#

in one clean Python design.

dry silo
#

Yes please! The local architecture sketch is exactly what we need

That sounds absolutely perfect. That architecture is exactly the "end goal" we have been dreaming of: 100% local, free, private, and practical.

We would be extremely grateful if you could sketch out that architecture for us!

To confirm the stack we'd be aiming for based on your recommendation:

Wake Word: openWakeWord

VAD: silero-vad (or webrtcvad, whichever you think handles the "stop listening" trigger better in a Python loop)

STT: faster_whisper (we currently have the base model running locally via CPU)

LLM / Brain: Persistent OpenClaw WS (chat.send with the proper authentication/pairing handshake)

TTS: Piper (with the queue logic to mute the mic/wake-detection while speaking)

If you can provide the Python skeleton that glues these pieces together—especially the exact WS handshake and payload structure for chat.send so we bypass that 6s HTTP REST delay—we will implement it right away.

Thank you so much for the deep dive and for taking the time to guide us towards the proper native path!

velvet egretBOT
#

Love this direction. For your exact stack, I’d build it like this.

My pick on VAD

I’d start with webrtcvad, not Silero.

Why:

  • much lighter for an always-on Python loop
  • frame-based, very easy for “stop listening after silence”
  • good enough for a first local assistant

I’d only switch to Silero VAD if:

  • your room is noisy,
  • WebRTC VAD is too twitchy,
  • or it clips utterances too early.
#

So my recommended first stack is:

  • Wake word: openWakeWord
  • VAD: webrtcvad
  • STT: faster_whisper
  • LLM: persistent OpenClaw WS chat.send
  • TTS: piper

Architecture

Mic
 └─> ring buffer (0.5 to 1.0s pre-roll)
      ├─> openWakeWord  (only when idle)
      ├─> WebRTC VAD    (for speech start/end)
      └─> barge-in VAD  (later, while speaking)

Wake detected
 └─> start utterance capture using pre-roll

Utterance capture
 └─> keep collecting frames until N ms of silence

Captured audio
 └─> faster_whisper -> text

Recognized text
 └─> persistent OpenClaw WS client -> chat.send

Streaming assistant text
 └─> sentence chunker
      └─> Piper queue

While Piper is speaking
 └─> initially disable wake word
 └─> later allow barge-in:
      speech detected -> stop Piper + chat.abort + start new capture

State machine

Keep it dead simple:

#
IDLE
  -> WAKE_DETECTED
  -> RECORDING
  -> THINKING
  -> SPEAKING
  -> IDLE

Later you add:

SPEAKING
  -> BARGE_IN
  -> RECORDING
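
One way to keep those transitions honest is a small lookup table. This is only a sketch of the diagram above; the names State, ALLOWED and transition are hypothetical:

```python
from enum import Enum, auto


class State(Enum):
    IDLE = auto()
    WAKE_DETECTED = auto()
    RECORDING = auto()
    THINKING = auto()
    SPEAKING = auto()
    BARGE_IN = auto()


# Edges taken directly from the two diagrams above.
ALLOWED = {
    State.IDLE: {State.WAKE_DETECTED},
    State.WAKE_DETECTED: {State.RECORDING},
    State.RECORDING: {State.THINKING},
    State.THINKING: {State.SPEAKING},
    State.SPEAKING: {State.IDLE, State.BARGE_IN},
    State.BARGE_IN: {State.RECORDING},
}


def transition(current: State, target: State) -> State:
    """Return the new state, or raise if the edge is not in the table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target
```

A real assistant would probably also want error edges back to IDLE (empty transcript, LLM error); they are left out here because the diagram does not show them.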
#

Practical defaults

These are good starting numbers:

  • audio format: 16 kHz mono PCM16
  • frame size: 20 ms
  • pre-roll buffer: 800 ms
  • end-of-speech silence: 600 to 800 ms
  • minimum utterance length: 400 ms
  • wake cooldown: 1.0 s
  • initially disable wake detection while Piper is speaking

That last one matters a lot. It avoids the assistant waking itself up.
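
To show how the silence and minimum-length numbers interact, here is a small end-of-speech detector that takes one VAD decision per frame. It is a sketch under the defaults listed above (20 ms frames, 700 ms silence, 400 ms minimum); EndpointDetector is a made-up name, and the per-frame decision would come from something like webrtcvad's is_speech:

```python
class EndpointDetector:
    """Decides when an utterance has ended, given one VAD decision per frame."""

    def __init__(self, frame_ms=20, silence_ms=700, min_utterance_ms=400):
        self.silence_frames_needed = silence_ms // frame_ms
        self.min_frames = min_utterance_ms // frame_ms
        self.total_frames = 0
        self.silent_run = 0

    def feed(self, is_speech: bool) -> bool:
        """Returns True once enough trailing silence follows enough audio."""
        self.total_frames += 1
        self.silent_run = 0 if is_speech else self.silent_run + 1
        return (
            self.total_frames >= self.min_frames
            and self.silent_run >= self.silence_frames_needed
        )


det = EndpointDetector()
decisions = [True] * 50 + [False] * 40   # 1 s of speech, then 0.8 s of silence
stop_at = next(i for i, d in enumerate(decisions) if det.feed(d))
```

Counting frames instead of reading a wall clock keeps the logic deterministic and easy to test with recorded audio.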


#

Python skeleton

This is not meant to be copy-paste complete, but it is the structure I’d use.

import asyncio
import base64
import collections
import hashlib
import json
import os
import queue
import subprocess
import sys
import threading
import time
import uuid
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path

import numpy as np
import sounddevice as sd
import websockets
import webrtcvad
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
    Encoding, NoEncryption, PrivateFormat, PublicFormat
)
from faster_whisper import WhisperModel

# ---------- config ----------

@dataclass
class Config:
    gateway_url: str = "ws://127.0.0.1:18789"
    gateway_token: str = os.environ["OPENCLAW_GATEWAY_TOKEN"]
    session_key: str = "agent:main:main"
    identity_path: Path = Path.home() / ".openclaw-python-device.json"

    sample_rate: int = 16000
    channels: int = 1
    frame_ms: int = 20
    preroll_ms: int = 800
    silence_ms_to_stop: int = 700
    min_utterance_ms: int = 400

    whisper_model_size: str = "base"
    piper_exe: str = "/path/to/piper"
    piper_model: str = "/path/to/model.onnx"

CFG = Config()

# ---------- helpers ----------

def b64u(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def extract_text(message) -> str:
    if not isinstance(message, dict):
        return ""
    out = []
    for part in message.get("content", []):
        if isinstance(part, dict) and part.get("type") == "text":
            out.append(part.get("text", ""))
    return "".join(out)

def pcm16_bytes_from_float32(x: np.ndarray) -> bytes:
    x = np.clip(x, -1.0, 1.0)
    return (x * 32767).astype(np.int16).tobytes()

# ---------- device identity ----------

def load_or_create_identity(path: Path):
    if path.exists():
        obj = json.loads(path.read_text())
        private_key = serialization.load_pem_private_key(
            obj["privateKeyPem"].encode("utf-8"),
            password=None,
        )
        return obj["deviceId"], obj["publicKey"], private_key

    private_key = Ed25519PrivateKey.generate()
    public_raw = private_key.public_key().public_bytes(
        encoding=Encoding.Raw,
        format=PublicFormat.Raw,
    )
    device_id = hashlib.sha256(public_raw).hexdigest()
    public_b64u = b64u(public_raw)
    private_pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    ).decode("utf-8")

    obj = {
        "version": 1,
        "deviceId": device_id,
        "publicKey": public_b64u,
        "privateKeyPem": private_pem,
    }
    path.write_text(json.dumps(obj, indent=2))
    try:
        os.chmod(path, 0o600)
    except OSError:
        pass

    return device_id, public_b64u, private_key

# ---------- OpenClaw WS client ----------

class OpenClawWSClient:
    def __init__(self, cfg: Config):
        self.cfg = cfg
        self.ws = None
        self.pending = {}
        self.current_run_id = None
        self.last_text_by_run = {}
        self.on_text = None      # callback(chunk_text)
        self.on_final = None     # callback(full_text)
        self.on_error = None     # callback(error_text)

    def _build_v3_payload(
        self,
        device_id: str,
        client_id: str,
        client_mode: str,
        role: str,
        scopes: list[str],
        signed_at_ms: int,
        token: str,
        nonce: str,
        platform: str,
        device_family: str,
    ) -> str:
        return "|".join([
            "v3",
            device_id,
            client_id,
            client_mode,
            role,
            ",".join(scopes),
            str(signed_at_ms),
            token,
            nonce,
            platform.lower(),
            device_family.lower(),
        ])

    async def connect(self):
        self.ws = await websockets.connect(self.cfg.gateway_url, max_size=25_000_000)

        # 1) read connect.challenge
        frame = json.loads(await self.ws.recv())
        assert frame["type"] == "event" and frame["event"] == "connect.challenge", frame
        nonce = frame["payload"]["nonce"].strip()

        # 2) sign device payload
        device_id, public_key_b64u, private_key = load_or_create_identity(self.cfg.identity_path)
        client_id = "gateway-client"
        client_mode = "ui"
        role = "operator"
        scopes = ["operator.read", "operator.write"]
        signed_at_ms = int(time.time() * 1000)
        payload = self._build_v3_payload(
            device_id=device_id,
            client_id=client_id,
            client_mode=client_mode,
            role=role,
            scopes=scopes,
            signed_at_ms=signed_at_ms,
            token=self.cfg.gateway_token,
            nonce=nonce,
            platform=sys.platform,
            device_family="python",
        )
        signature = b64u(private_key.sign(payload.encode("utf-8")))

        # 3) connect RPC
        connect_req = {
            "type": "req",
            "id": "connect-1",
            "method": "connect",
            "params": {
                "minProtocol": 3,
                "maxProtocol": 3,
                "client": {
                    "id": client_id,
                    "displayName": "python-voice",
                    "version": "0.1.0",
                    "platform": sys.platform,
                    "deviceFamily": "python",
                    "mode": client_mode,
                    "instanceId": str(uuid.uuid4()),
                },
                "role": role,
                "scopes": scopes,
                "caps": ["tool-events"],
                "auth": {"token": self.cfg.gateway_token},
                "device": {
                    "id": device_id,
                    "publicKey": public_key_b64u,
                    "signature": signature,
                    "signedAt": signed_at_ms,
                    "nonce": nonce,
                },
            },
        }
        await self.ws.send(json.dumps(connect_req))
        hello = json.loads(await self.ws.recv())
        assert hello["type"] == "res" and hello["ok"] is True, hello

        # Keep a reference so the receive task is not garbage-collected mid-run.
        self._recv_task = asyncio.create_task(self._recv_loop())

    async def _rpc(self, method, params):
        req_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        self.pending[req_id] = fut
        await self.ws.send(json.dumps({
            "type": "req",
            "id": req_id,
            "method": method,
            "params": params,
        }))
        return await fut

    async def _recv_loop(self):
        async for raw in self.ws:
            frame = json.loads(raw)

            if frame.get("type") == "res":
                req_id = frame.get("id")
                fut = self.pending.pop(req_id, None)
                if fut and not fut.done():
                    if frame.get("ok"):
                        fut.set_result(frame.get("payload"))
                    else:
                        fut.set_exception(RuntimeError(frame))
                continue

            if frame.get("type") != "event":
                continue

            event = frame.get("event")
            payload = frame.get("payload", {})

            if event == "chat":
                run_id = payload.get("runId")
                if run_id != self.current_run_id:
                    continue

                state = payload.get("state")

                if state == "delta":
                    full_text = extract_text(payload.get("message"))
                    old = self.last_text_by_run.get(run_id, "")
                    new = full_text[len(old):] if full_text.startswith(old) else full_text
                    self.last_text_by_run[run_id] = full_text
                    if new and self.on_text:
                        self.on_text(new)

                elif state == "final":
                    full_text = extract_text(payload.get("message"))
                    if self.on_final:
                        self.on_final(full_text)
                    self.current_run_id = None

                elif state == "error":
                    if self.on_error:
                        self.on_error(payload.get("errorMessage", "unknown error"))
                    self.current_run_id = None

    async def send_chat(self, text: str):
        run_id = str(uuid.uuid4())
        self.current_run_id = run_id
        self.last_text_by_run[run_id] = ""
        await self._rpc("chat.send", {
            "sessionKey": self.cfg.session_key,
            "message": text,
            "deliver": False,
            "timeoutMs": 120000,
            "idempotencyKey": run_id,
        })
        return run_id

    async def abort_current(self):
        if not self.current_run_id:
            return
        try:
            await self._rpc("chat.abort", {
                "sessionKey": self.cfg.session_key,
                "runId": self.current_run_id,
            })
        finally:
            self.current_run_id = None

# ---------- Piper worker ----------

class PiperWorker(threading.Thread):
    def __init__(self, cfg: Config):
        super().__init__(daemon=True)
        self.cfg = cfg
        self.q = queue.Queue()
        self.proc = None
        self.speaking = threading.Event()
        self.stop_flag = threading.Event()

    def speak(self, text: str):
        if text.strip():
            self.q.put(text)

    def stop_now(self):
        self.stop_flag.set()
        if self.proc and self.proc.poll() is None:
            self.proc.kill()
        with self.q.mutex:
            self.q.queue.clear()
        self.speaking.clear()

    def run(self):
        while True:
            text = self.q.get()
            self.stop_flag.clear()
            self.speaking.set()
            try:
                self.proc = subprocess.Popen(
                    [self.cfg.piper_exe, "--model", self.cfg.piper_model],
                    stdin=subprocess.PIPE,
                )
                self.proc.communicate(input=text.encode("utf-8"))
            finally:
                self.speaking.clear()
                self.proc = None

# ---------- sentence chunker ----------

class SentenceChunker:
    def __init__(self, on_sentence):
        self.buf = ""
        self.on_sentence = on_sentence

    def push(self, text_delta: str):
        self.buf += text_delta
        while True:
            # Cut at the earliest sentence terminator so sentences stay separate.
            hits = [i for i in (self.buf.find(c) for c in ".?!") if i != -1]
            if not hits:
                break
            cut = min(hits)
            sentence = self.buf[:cut+1].strip()
            self.buf = self.buf[cut+1:].lstrip()
            if sentence:
                self.on_sentence(sentence)

    def flush(self):
        tail = self.buf.strip()
        self.buf = ""
        if tail:
            self.on_sentence(tail)

# ---------- assistant state ----------

class AssistantState(Enum):
    IDLE = auto()
    RECORDING = auto()
    THINKING = auto()
    SPEAKING = auto()

# ---------- main assistant ----------

class VoiceAssistant:
    def __init__(self, cfg: Config):
        self.cfg = cfg
        self.state = AssistantState.IDLE
        self.loop = asyncio.new_event_loop()
        self.ws = OpenClawWSClient(cfg)
        self.tts = PiperWorker(cfg)
        self.whisper = WhisperModel(cfg.whisper_model_size, device="cpu", compute_type="int8")

        self.vad = webrtcvad.Vad(2)
        self.frame_samples = cfg.sample_rate * cfg.frame_ms // 1000
        self.frame_bytes = self.frame_samples * 2  # int16 mono

        self.preroll = collections.deque(
            maxlen=cfg.preroll_ms // cfg.frame_ms
        )
        self.capture_frames = []
        self.last_speech_ts = 0.0

        self.chunker = SentenceChunker(self.tts.speak)
        self.ws.on_text = self._on_llm_text
        self.ws.on_final = self._on_llm_final
        self.ws.on_error = self._on_llm_error

    def start(self):
        self.tts.start()
        threading.Thread(target=self._run_async_loop, daemon=True).start()
        self._start_audio_loop()

    def _run_async_loop(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.ws.connect())
        self.loop.run_forever()

    def _submit_chat(self, text: str):
        async def _task():
            await self.ws.send_chat(text)
        asyncio.run_coroutine_threadsafe(_task(), self.loop)

    def _abort_chat(self):
        async def _task():
            await self.ws.abort_current()
        asyncio.run_coroutine_threadsafe(_task(), self.loop)

    def _on_llm_text(self, text_delta: str):
        self.chunker.push(text_delta)
        if self.state != AssistantState.SPEAKING:
            self.state = AssistantState.SPEAKING

    def _on_llm_final(self, full_text: str):
        self.chunker.flush()
        # state will fall back to IDLE when Piper finishes in a more polished version

    def _on_llm_error(self, err: str):
        print("LLM error:", err)
        self.state = AssistantState.IDLE

    def _transcribe_and_send(self, audio_pcm16: bytes):
        segments, _ = self.whisper.transcribe(
            np.frombuffer(audio_pcm16, dtype=np.int16).astype(np.float32) / 32768.0,
            language=None,
        )
        text = " ".join(seg.text.strip() for seg in segments).strip()
        if not text:
            self.state = AssistantState.IDLE
            return
        print(f"[user] {text}")
        self.state = AssistantState.THINKING
        self._submit_chat(text)

    def _wake_detected(self, frame_float32: np.ndarray) -> bool:
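        # TODO: replace with a real openWakeWord adapter.
        # Example idea (check the openWakeWord docs for the expected input; as far as
        # I know it wants 16 kHz int16 audio in ~80 ms chunks, not 20 ms float32 frames):
        # score = self.openwakeword_model.predict(chunk_int16)
        # return score["your_wake_word"] > 0.5
        return False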

    def _barge_in_detected(self, frame_bytes: bytes) -> bool:
        # Later:
        # only active while Piper is speaking
        # optionally require several consecutive speech frames
        return self.vad.is_speech(frame_bytes, self.cfg.sample_rate)

    def _start_audio_loop(self):
        def callback(indata, frames, time_info, status):
            mono = indata[:, 0].copy()
            frame_bytes = pcm16_bytes_from_float32(mono)
            self._handle_audio_frame(mono, frame_bytes)

        with sd.InputStream(
            samplerate=self.cfg.sample_rate,
            channels=1,
            dtype="float32",
            blocksize=self.frame_samples,
            callback=callback,
        ):
            print("Listening...")
            while True:
                time.sleep(0.1)

    def _handle_audio_frame(self, frame_float32: np.ndarray, frame_bytes: bytes):
        self.preroll.append(frame_bytes)

        # speaking -> later allow barge-in
        if self.tts.speaking.is_set():
            # initial safe MVP: ignore wake word while speaking
            # later:
            # if self._barge_in_detected(frame_bytes):
            #     self.tts.stop_now()
            #     self._abort_chat()
            #     self._begin_capture()
            return

        if self.state == AssistantState.IDLE:
            if self._wake_detected(frame_float32):
                print("[wake]")
                self.state = AssistantState.RECORDING
                self.capture_frames = list(self.preroll)
                self.last_speech_ts = time.monotonic()
            return

        if self.state == AssistantState.RECORDING:
            self.capture_frames.append(frame_bytes)

            if self.vad.is_speech(frame_bytes, self.cfg.sample_rate):
                self.last_speech_ts = time.monotonic()

            silence_for = time.monotonic() - self.last_speech_ts
            total_ms = len(self.capture_frames) * self.cfg.frame_ms

            if total_ms >= self.cfg.min_utterance_ms and silence_for * 1000 >= self.cfg.silence_ms_to_stop:
                audio = b"".join(self.capture_frames)
                self.capture_frames = []
                # Leave RECORDING right away so later frames don't start a second capture.
                self.state = AssistantState.THINKING
                threading.Thread(
                    target=self._transcribe_and_send,
                    args=(audio,),
                    daemon=True,
                ).start()

# ---------- run ----------

if __name__ == "__main__":
    VoiceAssistant(CFG).start()

What I would change first from this skeleton

1. Make _wake_detected() real

That is where openWakeWord plugs in.

Conceptually:

#
score = wake_model.predict(audio_chunk)
if score["your_wake_word"] > threshold:
    triggered = True
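
A slightly more concrete sketch of that adapter. The assumptions here should be checked against the openWakeWord docs: that the model wants int16 audio in 80 ms (1280-sample) chunks, and that predict returns a dict of scores keyed by wake word name. The predictor is passed in as predict_fn so nothing below depends on the real library; WakeWordAdapter is a made-up name:

```python
import numpy as np


class WakeWordAdapter:
    """Collects 20 ms float32 frames into larger int16 chunks for a wake model."""

    def __init__(self, predict_fn, wake_name, threshold=0.5, chunk_samples=1280):
        self.predict_fn = predict_fn        # e.g. wake_model.predict (assumed API)
        self.wake_name = wake_name          # hypothetical model/key name
        self.threshold = threshold
        self.chunk_samples = chunk_samples  # 80 ms at 16 kHz
        self._pending = np.zeros(0, dtype=np.int16)

    def feed(self, frame_float32: np.ndarray) -> bool:
        """Returns True if any full chunk scored above the threshold."""
        pcm16 = (np.clip(frame_float32, -1.0, 1.0) * 32767).astype(np.int16)
        self._pending = np.concatenate([self._pending, pcm16])
        triggered = False
        while len(self._pending) >= self.chunk_samples:
            chunk = self._pending[:self.chunk_samples]
            self._pending = self._pending[self.chunk_samples:]
            scores = self.predict_fn(chunk)
            if scores.get(self.wake_name, 0.0) > self.threshold:
                triggered = True
        return triggered
```

In the skeleton, _wake_detected() would then just call adapter.feed(frame_float32).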

2. Add a proper Piper stop path

For barge-in you want:

  • tts.stop_now()
  • ws.abort_current()
  • begin recording immediately
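
A rough sketch of that stop path, with a debounce so a single noisy frame does not cut the assistant off. BargeInGate and handle_barge_in are hypothetical names, and the 10-frame default (about 200 ms at 20 ms per frame) is a guess to tune:

```python
class BargeInGate:
    """Fires only after `needed` consecutive speech frames."""

    def __init__(self, needed=10):
        self.needed = needed
        self.run = 0

    def feed(self, is_speech: bool) -> bool:
        self.run = self.run + 1 if is_speech else 0
        if self.run >= self.needed:
            self.run = 0
            return True
        return False


def handle_barge_in(gate, is_speech, stop_tts, abort_run, begin_capture) -> bool:
    """Runs the three barge-in steps in order once the gate fires."""
    if not gate.feed(is_speech):
        return False
    stop_tts()        # e.g. tts.stop_now()
    abort_run()       # e.g. ws.abort_current(), scheduled on the asyncio loop
    begin_capture()   # start recording immediately
    return True
```

Passing the three actions in as callables keeps the gate testable without audio hardware or a live gateway.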

3. Return to IDLE cleanly after TTS

Right now the skeleton leaves that a bit loose. In the real version:

  • when final text is flushed
  • and Piper queue drains
  • state -> IDLE
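
One possible way to detect "final text flushed and queue drained" is a small counter shared between the chunker and the Piper worker. This is a sketch, not part of the skeleton; SpeechDrain and its method names are made up:

```python
import threading


class SpeechDrain:
    """Calls on_idle once the reply is final AND every queued sentence was spoken."""

    def __init__(self, on_idle):
        self._lock = threading.Lock()
        self._pending = 0
        self._final = False
        self._on_idle = on_idle

    def _ready(self) -> bool:
        # Must be called with the lock held.
        if self._final and self._pending == 0:
            self._final = False   # reset for the next turn
            return True
        return False

    def sentence_queued(self) -> None:
        with self._lock:
            self._pending += 1

    def sentence_spoken(self) -> None:
        with self._lock:
            self._pending -= 1
            fire = self._ready()
        if fire:
            self._on_idle()

    def reply_final(self) -> None:
        with self._lock:
            self._final = True
            fire = self._ready()
        if fire:
            self._on_idle()
```

The idea would be to call sentence_queued() when a sentence is handed to Piper, sentence_spoken() after Piper finishes it, and reply_final() from the final-text callback, with on_idle setting the state back to IDLE.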

My real recommendation for MVP vs v2

MVP

  • wake word
  • pre-roll
  • WebRTC VAD stop
  • faster_whisper
  • persistent WS chat.send
  • Piper sentence chunking
  • no barge-in yet

v2

#
  • abort current run when user starts speaking
  • stop Piper immediately
  • start new capture
  • maybe add echo suppression later

That sequencing will save you a lot of pain.


If the WS part fails

The 3 most useful errors are still:

  • device nonce mismatch
  • device signature invalid
  • device id mismatch
#

If you hit any of those, paste the exact error and I’ll tighten the handshake code.

If this gets you unstuck, please right-click the solution message → Apps → ✅ Mark Solution so Answer Overflow can index it for the next person.

dry silo
#

That architecture is exactly what we want to implement. We are ready to build the Python loop with openWakeWord, faster_whisper, and the interruption sequence for Piper.

However, we need your help with the exact OpenClaw WS implementation to replace our current REST setup.

Could you please provide the specific Python code snippet that demonstrates:

How to correctly sign and respond to the connect.challenge nonce to avoid the device signature invalid error.

The exact JSON payload structure to initiate the native chat.send stream.

Once we have that authentication piece, we will plug it into our local audio loop right away.

velvet egretBOT
#

Yep, here’s the smallest exact WS example for the two pieces you asked for:

  1. signing connect.challenge correctly
  2. sending native chat.send

This version uses token auth.

If you use password auth instead:

  • auth becomes {"password": "..."}
  • the signed payload’s token field must be "" (empty string)
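
Those two differences can be captured in one small helper, so the rest of the handshake code stays identical for both modes. This is a sketch of the rule stated above; auth_fields is a hypothetical name:

```python
def auth_fields(mode: str, secret: str) -> tuple[dict, str]:
    """Return (the connect `auth` object, the token value to put in the signed payload)."""
    if mode == "token":
        return {"token": secret}, secret
    if mode == "password":
        # Password goes in `auth`, but the signed payload's token field stays empty.
        return {"password": secret}, ""
    raise ValueError(f"unsupported auth mode: {mode}")
```

Usage would be something like auth, signed_token = auth_fields("token", GATEWAY_TOKEN), then passing signed_token into the payload builder and auth into the connect params.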

Exact Python snippet

#
# pip install websockets cryptography

import asyncio
import base64
import hashlib
import json
import os
import sys
import time
import uuid
from pathlib import Path

import websockets
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    PublicFormat,
)

GATEWAY_URL = "ws://127.0.0.1:18789"
GATEWAY_TOKEN = os.environ["OPENCLAW_GATEWAY_TOKEN"]
SESSION_KEY = "agent:main:main"
IDENTITY_PATH = Path.home() / ".openclaw-python-device.json"


def b64u(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def extract_text(message) -> str:
    if not isinstance(message, dict):
        return ""
    out = []
    for part in message.get("content", []):
        if isinstance(part, dict) and part.get("type") == "text":
            out.append(part.get("text", ""))
    return "".join(out)


def load_or_create_identity():
    if IDENTITY_PATH.exists():
        obj = json.loads(IDENTITY_PATH.read_text())
        private_key = serialization.load_pem_private_key(
            obj["privateKeyPem"].encode("utf-8"),
            password=None,
        )
        return obj["deviceId"], obj["publicKey"], private_key

    private_key = Ed25519PrivateKey.generate()
    public_raw = private_key.public_key().public_bytes(
        encoding=Encoding.Raw,
        format=PublicFormat.Raw,
    )
    device_id = hashlib.sha256(public_raw).hexdigest()
    public_key_b64u = b64u(public_raw)
    private_key_pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    ).decode("utf-8")

    payload = {
        "version": 1,
        "deviceId": device_id,
        "publicKey": public_key_b64u,
        "privateKeyPem": private_key_pem,
    }
    IDENTITY_PATH.write_text(json.dumps(payload, indent=2))
    try:
        os.chmod(IDENTITY_PATH, 0o600)
    except OSError:
        pass

    return device_id, public_key_b64u, private_key


def build_v3_payload(
    *,
    device_id: str,
    client_id: str,
    client_mode: str,
    role: str,
    scopes: list[str],
    signed_at_ms: int,
    token: str,
    nonce: str,
    platform: str,
    device_family: str,
) -> str:
    return "|".join(
        [
            "v3",
            device_id,
            client_id,
            client_mode,
            role,
            ",".join(scopes),
            str(signed_at_ms),
            token,
            nonce,
            platform.lower(),
            device_family.lower(),
        ]
    )


async def main():
    device_id, public_key_b64u, private_key = load_or_create_identity()

    client_id = "gateway-client"
    client_mode = "ui"
    role = "operator"
    scopes = ["operator.read", "operator.write"]

    async with websockets.connect(GATEWAY_URL, max_size=25_000_000) as ws:
        # 1) wait for server challenge
        frame = json.loads(await ws.recv())
        assert frame["type"] == "event" and frame["event"] == "connect.challenge", frame
        nonce = frame["payload"]["nonce"].strip()

        # 2) sign EXACT v3 payload
        signed_at_ms = int(time.time() * 1000)
        signed_payload = build_v3_payload(
            device_id=device_id,
            client_id=client_id,
            client_mode=client_mode,
            role=role,
            scopes=scopes,
            signed_at_ms=signed_at_ms,
            token=GATEWAY_TOKEN,          # if password auth: token=""
            nonce=nonce,
            platform=sys.platform,
            device_family="python",
        )
        signature = b64u(private_key.sign(signed_payload.encode("utf-8")))

        # 3) send connect
        connect_req = {
            "type": "req",
            "id": "connect-1",
            "method": "connect",
            "params": {
                "minProtocol": 3,
                "maxProtocol": 3,
                "client": {
                    "id": client_id,
                    "displayName": "python-voice",
                    "version": "0.1.0",
                    "platform": sys.platform,
                    "deviceFamily": "python",
                    "mode": client_mode,
                    "instanceId": str(uuid.uuid4()),
                },
                "role": role,
                "scopes": scopes,
                "caps": ["tool-events"],
                "auth": {
                    "token": GATEWAY_TOKEN   # if password auth: {"password": "..."}
                },
                "device": {
                    "id": device_id,
                    "publicKey": public_key_b64u,
                    "signature": signature,
                    "signedAt": signed_at_ms,
                    "nonce": nonce,
                },
            },
        }
        await ws.send(json.dumps(connect_req))

        hello = json.loads(await ws.recv())
        assert hello["type"] == "res" and hello["ok"] is True, hello
        print("Connected OK")

        # 4) send native chat.send
        run_id = str(uuid.uuid4())
        chat_req = {
            "type": "req",
            "id": "chat-1",
            "method": "chat.send",
            "params": {
                "sessionKey": SESSION_KEY,
                "message": "Hello from native WS chat.send",
                "deliver": False,
                "timeoutMs": 120000,
                "idempotencyKey": run_id,
            },
        }
        await ws.send(json.dumps(chat_req))

        ack = json.loads(await ws.recv())
        assert ack["type"] == "res" and ack["ok"] is True, ack
        print("chat.send accepted")

        # 5) stream chat events for this run
        last_text = ""
        while True:
            frame = json.loads(await ws.recv())

            if frame.get("type") != "event":
                continue
            if frame.get("event") != "chat":
                continue

            payload = frame.get("payload", {})
            if payload.get("runId") != run_id:
                continue
            if payload.get("sessionKey") != SESSION_KEY:
                continue

            state = payload.get("state")

            if state == "delta":
                full_text = extract_text(payload.get("message"))
                new_text = full_text[len(last_text):] if full_text.startswith(last_text) else full_text
                last_text = full_text
                if new_text:
                    print(new_text, end="", flush=True)

            elif state == "final":
                full_text = extract_text(payload.get("message"))
                if full_text.startswith(last_text):
                    tail = full_text[len(last_text):]
                    if tail:
                        print(tail, end="", flush=True)
                print()
                break

            elif state == "error":
                raise RuntimeError(payload.get("errorMessage", "chat.send failed"))


if __name__ == "__main__":
    asyncio.run(main())
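
Step 5 above treats every delta frame as a cumulative snapshot of the message and only emits the part it has not seen yet. Pulled out as a standalone helper (the name new_suffix is just for illustration, it is not an OpenClaw API), that logic looks roughly like this:

```python
def new_suffix(last_text: str, full_text: str) -> str:
    """Return the part of full_text that has not been emitted yet."""
    if full_text.startswith(last_text):
        return full_text[len(last_text):]
    # The snapshot was rewritten rather than extended: re-emit it whole.
    return full_text


last = ""
spoken = []
for snapshot in ["Hel", "Hello", "Hello wor", "Hello world."]:
    piece = new_suffix(last, snapshot)
    last = snapshot
    if piece:
        spoken.append(piece)   # in the voice pipeline this is what you hand to the TTS side

print(spoken)
```

Keeping this in one small function makes it easy to swap print() for a Piper queue later without touching the WS loop.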

#

Exact connect payload shape

This is the important part structurally:

{
  "type": "req",
  "id": "connect-1",
  "method": "connect",
  "params": {
    "minProtocol": 3,
    "maxProtocol": 3,
    "client": {
      "id": "gateway-client",
      "displayName": "python-voice",
      "version": "0.1.0",
      "platform": "linux",
      "deviceFamily": "python",
      "mode": "ui",
      "instanceId": "..."
    },
    "role": "operator",
    "scopes": ["operator.read", "operator.write"],
    "caps": ["tool-events"],
    "auth": { "token": "..." },
    "device": {
      "id": "...",
      "publicKey": "...",
      "signature": "...",
      "signedAt": 1713470000000,
      "nonce": "..."
    }
  }
}

Exact chat.send payload shape

{
  "type": "req",
  "id": "chat-1",
  "method": "chat.send",
  "params": {
    "sessionKey": "agent:main:main",
    "message": "Hello from native WS chat.send",
    "deliver": false,
    "timeoutMs": 120000,
    "idempotencyKey": "..."
  }
}
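
For a voice loop you will send one of these per turn, so a small builder helps. This is only a sketch: the field names are copied from the shape above, while the helper name and the per-turn request id scheme are my assumptions (the example only ever shows "chat-1").

```python
import json
import uuid


def build_chat_send(session_key: str, message: str, timeout_ms: int = 120_000):
    """Return (run_id, frame) for a chat.send request shaped like the example above."""
    run_id = str(uuid.uuid4())
    frame = {
        "type": "req",
        "id": f"chat-{run_id[:8]}",   # assumption: any unique string works as a request id
        "method": "chat.send",
        "params": {
            "sessionKey": session_key,
            "message": message,
            "deliver": False,
            "timeoutMs": timeout_ms,
            "idempotencyKey": run_id,
        },
    }
    return run_id, frame


run_id, frame = build_chat_send("agent:main:main", "What's the weather like?")
wire = json.dumps(frame)   # this string is what goes to ws.send(...)
```

Hold on to run_id: the streaming loop filters chat events by payload["runId"], so each turn needs its own value.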

The most common reasons for device signature invalid

These are the usual footguns:

  1. You signed the wrong string

    • must be exactly:
    v3|deviceId|clientId|clientMode|role|scopesCsv|signedAtMs|token|nonce|platform|deviceFamily

  2. The signed fields do not match the actual connect payload

    • if you sign clientMode="ui" but send "mode": "backend", it fails
    • same for clientId, role, scopes, nonce, platform, deviceFamily
  3. You used PEM/base64 PEM as publicKey

    • OpenClaw wants the raw 32-byte Ed25519 public key, base64url, no padding
  4. You derived deviceId from the PEM text

    • it must be:
    sha256(raw_public_key_bytes).hexdigest()

  5. You used standard base64 instead of base64url

    • use urlsafe base64
    • strip trailing =
  6. You used the password in the signed payload

    • for password auth, the payload’s token field must be ""
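
A quick way to catch the "signed fields do not match" class of mistakes before you even open the socket is to compare the string you signed against the connect params you are about to send. This is a local debugging sketch, not part of OpenClaw; the helper name is made up, and it assumes no field contains a "|" character.

```python
def signed_field_mismatches(signed_payload: str, params: dict) -> list:
    """Compare a v3 signed string against connect params; return the names that differ."""
    parts = signed_payload.split("|")
    if len(parts) != 11 or parts[0] != "v3":
        return ["<not an 11-field v3 payload>"]

    names = ["deviceId", "clientId", "clientMode", "role", "scopes",
             "signedAt", "token", "nonce", "platform", "deviceFamily"]
    signed = dict(zip(names, parts[1:]))

    client, device = params["client"], params["device"]
    sent = {
        "deviceId": device["id"],
        "clientId": client["id"],
        "clientMode": client["mode"],
        "role": params["role"],
        "scopes": ",".join(params["scopes"]),
        "signedAt": str(device["signedAt"]),
        "token": params.get("auth", {}).get("token", ""),   # "" when using password auth
        "nonce": device["nonce"],
        "platform": client["platform"].lower(),
        "deviceFamily": client["deviceFamily"].lower(),
    }
    return [name for name in names if signed[name] != sent[name]]


params = {
    "client": {"id": "gateway-client", "mode": "ui",
               "platform": "linux", "deviceFamily": "python"},
    "role": "operator",
    "scopes": ["operator.read", "operator.write"],
    "auth": {"token": "secret"},
    "device": {"id": "abc123", "signedAt": 1713470000000, "nonce": "n-1"},
}
payload = ("v3|abc123|gateway-client|ui|operator|operator.read,operator.write|"
           "1713470000000|secret|n-1|linux|python")

ok = signed_field_mismatches(payload, params)
params["client"]["mode"] = "backend"
bad = signed_field_mismatches(payload, params)
print(ok, bad)
```

If the list comes back non-empty, fix those fields first; the signature cannot validate while they disagree.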

If you test this and it still fails, paste the exact error frame and I’ll zero in on it quickly.

And if this solves it, please right-click the solution message → Apps → ✅ Mark Solution so it lands in Answer Overflow.

wild gobletBOT
dry silo
#

WS Success! TTFT dropped to 4.5s! Now moving to the Wake Word + VAD loop

Hey again! We have fantastic news: we successfully nailed the chat.send WS connection based on the official docs schema, and it works perfectly!

Our TTFT dropped from ~10 seconds down to a stable 4.5 seconds. We are intercepting the stream: assistant events and feeding the deltas directly into Piper. The Jervasio is finally streaming its voice in real-time. Thank you so much for pushing us towards the WebSocket path!

We are now ready to drop the manual Enter key and implement the exact architecture you recommended earlier (OpenWakeWord + VAD silence timeout + muting Piper on interruption).

However, we are hitting a wall right at the start when trying to initialize openwakeword. Here is our isolated test script for the mic/VAD loop:

import queue, time, sys
import numpy as np
import sounddevice as sd
import webrtcvad
from openwakeword.model import Model
from faster_whisper import WhisperModel

VAD_SILENCE_TIMEOUT = 3.0
WAKE_WORD = "alexa"

print("Loading models...")
whisper = WhisperModel("base", device="cpu", compute_type="int8")
oww = Model(wakeword_models=[WAKE_WORD], inference_framework="onnx")
vad = webrtcvad.Vad(2)

When we run this, we get this internal library error:

File "/site-packages/openwakeword/model.py", line 139, in __init__
    self.preprocessor = AudioFeatures(**kwargs)
TypeError: AudioFeatures.__init__() got an unexpected keyword argument 'wakeword_models'

Did the openwakeword API change recently?

Also, since we are at this final step, if you still have that Python sketch of the continuous listening / barge-in loop you offered to share earlier, we would be incredibly grateful. We especially want to make sure we implement the interruption logic exactly as you envisioned!

velvet egretBOT
#

Hell yes, 4.5s TTFT is a huge win 🎉

On the openwakeword error, I don’t think this is a normal API change.
wakeword_models= is still the right argument on current upstream docs/README.

What that traceback usually suggests is a broken or mixed install, where:

  • openwakeword.model.Model is from one version
  • but openwakeword.utils.AudioFeatures is from another

So Model(...) ends up leaking wakeword_models into an older AudioFeatures(...).
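
If you want to confirm that before reinstalling, you can ask Python which keywords the installed constructor actually declares. A minimal sketch using only inspect; the two stand-in functions below are hypothetical and exist only so the snippet runs without openwakeword installed:

```python
import inspect


def accepts_kwarg(func, name: str) -> bool:
    """True if func declares a parameter called `name` (a bare **kwargs does not count)."""
    return name in inspect.signature(func).parameters


# With openwakeword installed, the real check would be:
#   import importlib.metadata
#   from openwakeword.model import Model
#   print(importlib.metadata.version("openwakeword"))
#   print(accepts_kwarg(Model.__init__, "wakeword_models"))
#   print(accepts_kwarg(Model.__init__, "wakeword_model_paths"))

# Hypothetical stand-ins for two differently shaped constructors:
def old_style(wakeword_model_paths=None, **kwargs): ...
def new_style(wakeword_models=None, **kwargs): ...

print(accepts_kwarg(old_style, "wakeword_models"))   # False
print(accepts_kwarg(new_style, "wakeword_models"))   # True
```

A constructor that does not declare the keyword but takes **kwargs will swallow it and forward it, which is consistent with the traceback ending inside AudioFeatures.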

My recommendation

Do this in a fresh venv if possible.

Then reinstall cleanly:

#
pip uninstall -y openwakeword
pip install --no-cache-dir -U openwakeword onnxruntime

And test with the smallest possible init:

import openwakeword
from openwakeword.model import Model

openwakeword.utils.download_models()

oww = Model(
    wakeword_models=["alexa"],
    inference_framework="onnx",
    vad_threshold=0.0,   # keep this off for now since you're already using webrtcvad
)

A couple of useful notes:

  • wakeword_models=["alexa"] should still be valid
  • wakeword_model_paths is the older alias
  • for your design, I would not enable openWakeWord’s built-in VAD yet, since you already have webrtcvad

One important architecture detail

webrtcvad likes 20ms frames.
openWakeWord is happiest when you feed it 80ms chunks.

So the clean pattern is:

#
  • mic callback gives you 20ms frames
  • feed every frame to WebRTC VAD
  • accumulate 4 frames
  • feed the combined 80ms chunk into openWakeWord

That gives you both:

  • responsive VAD
  • correct wake-word chunking
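
To make the sizes concrete, here is the arithmetic as a small sketch, assuming 16 kHz mono PCM16 (which is what the rest of this thread uses). The helper names are mine:

```python
SAMPLE_RATE = 16_000      # Hz, mono
BYTES_PER_SAMPLE = 2      # PCM16


def frame_bytes(ms: int, sample_rate: int = SAMPLE_RATE) -> int:
    """Size in bytes of a mono PCM16 frame lasting `ms` milliseconds."""
    return sample_rate * ms // 1000 * BYTES_PER_SAMPLE


def batch_frames(frames, per_batch: int = 4):
    """Yield joined chunks of `per_batch` frames; a trailing partial batch is not emitted."""
    pending = []
    for frame in frames:
        pending.append(frame)
        if len(pending) == per_batch:
            yield b"".join(pending)
            pending = []


vad_frame = frame_bytes(20)    # 320 samples
oww_chunk = frame_bytes(80)    # 1280 samples

silence = [bytes(vad_frame)] * 10          # ten 20 ms frames of zeros
chunks = list(batch_frames(silence))       # two full 80 ms chunks, two frames left over
print(vad_frame, oww_chunk, len(chunks))
```

webrtcvad only accepts 10, 20, or 30 ms frames, so if is_speech() raises on you, checking len(frame) against frame_bytes(20) is the first thing I would look at.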

Interruption / barge-in logic, exactly how I’d do it

One important correction to the wording:

  • if you want barge-in, do not mute the mic
  • instead:
    • disable wake-word detection while Piper is speaking
    • keep VAD running during speaking
    • if user speech is detected strongly enough:
      • stop Piper
      • send chat.abort
      • immediately start capture

So during speaking, the mic stays on, but the wake-word model is paused.

My exact rule of thumb

While Piper is speaking:

  • ignore wake word
  • watch VAD only
  • if speech is present for ~250 to 400 ms continuously
    • tts.stop_now()
    • ws.abort_current()
    • switch to RECORDING
    • include pre-roll buffer

That avoids false barge-ins from tiny noises.
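
The "continuous for N ms" rule is just a streak counter over VAD decisions. A minimal sketch, with a made-up class name and 20 ms frames assumed:

```python
class BargeInDetector:
    """Fires once speech has been continuous for `hold_ms`."""

    def __init__(self, hold_ms: int = 300, frame_ms: int = 20):
        self.frames_needed = -(-hold_ms // frame_ms)   # ceiling division
        self.streak = 0

    def update(self, is_speech: bool) -> bool:
        """Feed one VAD decision; return True on the frame that crosses the threshold."""
        self.streak = self.streak + 1 if is_speech else 0
        if self.streak >= self.frames_needed:
            self.streak = 0
            return True
        return False


det = BargeInDetector(hold_ms=300, frame_ms=20)

# A 100 ms cough followed by silence never fires.
cough = [det.update(True) for _ in range(5)] + [det.update(False)]

# 300 ms of continuous speech fires exactly once, on the 15th frame.
speech = [det.update(True) for _ in range(15)]
print(any(cough), speech.index(True))
```

Any single non-speech frame resets the streak, which is what filters out short noises. If that turns out too strict for your mic, tolerating one or two dropped frames is the usual next tweak.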

Minimal control flow

if state == IDLE:
    # VAD on, wake-word on
    # accumulate 4x20ms -> run openWakeWord on 80ms chunk
    pass

elif state == RECORDING:
    # VAD on
    # stop after N ms silence
    # send to faster_whisper
    pass

elif state == THINKING:
    # wait for WS chat deltas
    pass

elif state == SPEAKING:
    # wake-word OFF
    # VAD ON
    # if speech persists 250-400ms:
    #   stop Piper
    #   abort current OpenClaw run
    #   start recording immediately
    pass

Tiny pseudocode for the wake/VAD split

#
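vad_frame = frame_20ms  # 640 bytes of PCM16 at 16 kHz
vad_is_speech = vad.is_speech(vad_frame, 16000)

oww_buffer.append(vad_frame)
if len(oww_buffer) == 4:
    chunk_80ms = b"".join(oww_buffer)
    oww_buffer.clear()
    # openWakeWord takes an int16 ndarray, so convert the raw PCM16 bytes first
    scores = oww.predict(np.frombuffer(chunk_80ms, dtype=np.int16))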

My short answer

  • no, wakeword_models= should still be fine
  • yes, your install smells mismatched
  • use 20ms for VAD, 80ms for openWakeWord
  • for barge-in, pause wake word, keep VAD alive
#

If you want, next reply I can give you a tight mic loop skeleton just for:

  • 20ms frame callback
  • 4-frame openWakeWord batching
  • silence timeout
  • barge-in trigger

without the OpenClaw code, so you can debug the front-end in isolation first.

dry silo
#

It worked! The clean reinstall along with a minor tweak to the model path initialization fixed everything. The VAD is incredibly responsive now.

We are ready for the next step. Could you please share that tight mic loop skeleton you mentioned? Having the 20ms/80ms batching and the state machine logic would be a lifesaver to ensure we get the timing right for the final integration.

We want to make sure the transition between IDLE, RECORDING, and SPEAKING is as fluid as possible before we plug in the WebSocket and Piper logic.

velvet egretBOT
#

Nice, glad you got it working 🎉

Here’s the tight front-end mic loop skeleton I’d use for exactly this phase.

It does:

  • 20ms mic frames
  • WebRTC VAD on every 20ms frame
  • openWakeWord on batched 80ms chunks
  • state machine for IDLE / RECORDING / THINKING / SPEAKING
  • pre-roll buffer
  • silence timeout
  • barge-in hook

Core idea

#
  • sounddevice callback should do almost nothing
  • just push 20ms PCM16 frames into a queue
  • run wake word + VAD + state machine in a normal loop outside the callback

That keeps timing much cleaner.


Skeleton

import collections
import enum
import queue
import threading
import time

import numpy as np
import sounddevice as sd
import webrtcvad
from openwakeword.model import Model


class State(enum.Enum):
    IDLE = "idle"
    RECORDING = "recording"
    THINKING = "thinking"
    SPEAKING = "speaking"


class VoiceLoop:
    def __init__(self):
        self.sample_rate = 16000
        self.frame_ms = 20
        self.frame_samples = self.sample_rate * self.frame_ms // 1000  # 320
        self.frame_bytes = self.frame_samples * 2  # int16 mono

        self.audio_q = queue.Queue(maxsize=256)

        self.vad = webrtcvad.Vad(2)
        self.oww = Model(
            wakeword_models=["alexa"],
            inference_framework="onnx",
            vad_threshold=0.0,   # keep external VAD as source of truth
        )
        self.state = State.IDLE

        # 20ms -> 80ms batching for openWakeWord
        self.oww_batch = collections.deque(maxlen=4)

        # 800ms pre-roll
        self.preroll = collections.deque(maxlen=800 // self.frame_ms)

        # utterance recording buffer
        self.capture_frames = []
        self.last_speech_ts = 0.0
        self.last_wake_ts = 0.0

        # tuning
        self.wake_threshold = 0.5
        self.wake_cooldown_s = 1.0
        self.min_utterance_ms = 400
        self.silence_to_stop_ms = 700

        # barge-in
        self.barge_in_enabled = True
        self.barge_frames_needed = 15   # 15 * 20ms = 300ms
        self.barge_speech_frames = 0

        self.running = False

    # ---------- audio plumbing ----------

    def audio_callback(self, indata, frames, time_info, status):
        if status:
            print("audio status:", status)

        mono = indata[:, 0]
        pcm16 = (np.clip(mono, -1.0, 1.0) * 32767).astype(np.int16).tobytes()

        try:
            self.audio_q.put_nowait(pcm16)
        except queue.Full:
            # drop frame rather than blocking callback
            pass

    # ---------- state helpers ----------

    def set_state(self, new_state: State):
        if new_state != self.state:
            print(f"[state] {self.state.value} -> {new_state.value}")
            self.state = new_state

    def begin_recording(self, now: float):
        self.capture_frames = list(self.preroll)
        self.last_speech_ts = now
        self.barge_speech_frames = 0
        self.set_state(State.RECORDING)

    def finish_recording(self):
        audio_bytes = b"".join(self.capture_frames)
        self.capture_frames = []
        self.set_state(State.THINKING)
        self.on_utterance_ready(audio_bytes)

    def set_speaking(self, speaking: bool):
        if speaking:
            self.set_state(State.SPEAKING)
        else:
            self.barge_speech_frames = 0
            self.set_state(State.IDLE)

    # ---------- integration hooks ----------

    def on_utterance_ready(self, audio_bytes: bytes):
        """
        Replace this with:
        faster_whisper -> text -> OpenClaw WS chat.send
        """
        print(f"[utterance_ready] {len(audio_bytes)} bytes")

    def on_barge_in(self):
        """
        Replace this with:
        - stop Piper immediately
        - ws.abort_current()
        - then begin_recording(...)
        """
        print("[barge-in] user interrupted assistant")

    # ---------- wake / vad logic ----------

    def process_frame(self, frame_bytes: bytes):
        now = time.monotonic()
        # keep pre-roll always
        self.preroll.append(frame_bytes)

        is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

        busy = self.state in (State.THINKING, State.SPEAKING)

        # During THINKING/SPEAKING:
        # - do NOT run wake word
        # - optionally allow barge-in via VAD only
        if busy:
            if self.barge_in_enabled:
                if is_speech:
                    self.barge_speech_frames += 1
                else:
                    self.barge_speech_frames = 0

                if self.barge_speech_frames >= self.barge_frames_needed:
                    self.on_barge_in()
                    self.begin_recording(now)
            return

        # IDLE / RECORDING path
        if self.state == State.RECORDING:
            self.capture_frames.append(frame_bytes)

            if is_speech:
                self.last_speech_ts = now

            utterance_ms = len(self.capture_frames) * self.frame_ms
            silence_ms = (now - self.last_speech_ts) * 1000.0

            if (
                utterance_ms >= self.min_utterance_ms
                and silence_ms >= self.silence_to_stop_ms
            ):
                self.finish_recording()
            return

        # Only IDLE reaches here
        self.oww_batch.append(frame_bytes)

        if len(self.oww_batch) < 4:
            return
        chunk_80ms = b"".join(self.oww_batch)
        self.oww_batch.clear()

        # openWakeWord expects PCM16 audio, usually as int16 ndarray
        chunk_np = np.frombuffer(chunk_80ms, dtype=np.int16)

        scores = self.oww.predict(chunk_np)
        wake_score = scores.get("alexa", 0.0)

        if wake_score >= self.wake_threshold and (now - self.last_wake_ts) >= self.wake_cooldown_s:
            self.last_wake_ts = now
            print(f"[wake] alexa score={wake_score:.3f}")
            self.begin_recording(now)

    # ---------- main loop ----------

    def run(self):
        self.running = True

        with sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="float32",
            blocksize=self.frame_samples,
            callback=self.audio_callback,
        ):
            print("Voice loop running...")
            while self.running:
                try:
                    frame = self.audio_q.get(timeout=0.5)
                except queue.Empty:
                    continue
                self.process_frame(frame)


if __name__ == "__main__":
    VoiceLoop().run()

How I’d wire this into your next stage

When user finishes speaking

on_utterance_ready(audio_bytes) should do:

#
  1. faster_whisper -> text
  2. if empty, set_state(IDLE)
  3. else send text to your persistent OpenClaw WS client
  4. keep state as THINKING

When first assistant delta arrives

Call:

voice_loop.set_speaking(True)

That should:

  • pause wake-word detection
  • allow barge-in via VAD only

When Piper fully finishes

#

Call:

voice_loop.set_speaking(False)

That returns you to IDLE.


My exact opinion on timing

These are good starting values:

  • wake_threshold = 0.5
  • wake_cooldown_s = 1.0
  • min_utterance_ms = 400
  • silence_to_stop_ms = 700
  • barge_frames_needed = 15 (300ms)

If it feels too eager to stop listening:

  • raise silence_to_stop_ms to 900

If barge-in fires too easily:

  • raise barge_frames_needed to 20

One subtle but important thing

This loop intentionally does not run the wake word during THINKING or SPEAKING.

That’s on purpose.

#

If you leave wake-word detection on while Piper is talking, you’ll usually end up with self-trigger pain.
For barge-in, VAD-only is the better first version.


Next clean step

If you want, next reply I can give you the glue layer between this loop and your existing:

  • faster_whisper
  • OpenClaw WS client
  • Piper stream queue

so it becomes one integrated pipeline instead of three separate parts.

dry silo
#

(sending this in 2 parts due to size)

WS Success! TTFT dropped to 4.5s! Now moving to the Wake Word + VAD loop

Hey again! We have fantastic news: we successfully nailed the chat.send WS connection and it works perfectly! Our TTFT dropped from about 10 seconds to a stable 4.5 seconds. We are now ready to drop the manual Enter key and implement the architecture you recommended earlier.

However, we noticed a small issue with the state transitions. When moving from THINKING back to IDLE, the system often triggers a phantom wake-word immediately. It seems the audio queue accumulates frames while the system is processing or sleeping, and when it returns to IDLE, it processes that backlog and triggers again with high scores.

Here is the current state of our loop:
Python

import collections, enum, queue, threading, time, os, sys
import numpy as np
import sounddevice as sd
import webrtcvad
import openwakeword
from openwakeword.model import Model

class State(enum.Enum):
    IDLE = "idle"
    RECORDING = "recording"
    THINKING = "thinking"
    SPEAKING = "speaking"

class VoiceLoop:
    def __init__(self):
        self.sample_rate = 16000
        self.frame_ms = 20
        self.frame_samples = 320
        self.audio_q = queue.Queue(maxsize=256)

        pkg_path = os.path.dirname(openwakeword.__file__)
        model_dir = os.path.join(pkg_path, "resources", "models")
        alexa_file = next((os.path.join(root, f) for root, _, files in os.walk(model_dir)
                           for f in files if "alexa" in f.lower() and f.endswith(".onnx")), None)

        self.oww = Model(wakeword_model_paths=[alexa_file])
        self.wake_word_key = list(self.oww.models.keys())[0]
        self.vad = webrtcvad.Vad(2)

        self.state = State.IDLE
        self.oww_batch = collections.deque(maxlen=4)
        self.preroll = collections.deque(maxlen=40)
        self.capture_frames = []
        self.last_speech_ts = 0.0
        self.silence_to_stop_ms = 3000

    def audio_callback(self, indata, frames, time_info, status):
        mono = indata[:, 0]
        pcm16 = (np.clip(mono, -1.0, 1.0) * 32767).astype(np.int16).tobytes()
        try: self.audio_q.put_nowait(pcm16)
        except queue.Full: pass

    def on_utterance_ready(self, audio_bytes):
        print(f"Utterance ready: {len(audio_bytes)} bytes")
        time.sleep(1)
        self.set_state(State.IDLE)

    def process_frame(self, frame_bytes):
        now = time.monotonic()
        self.preroll.append(frame_bytes)
        is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

        if self.state == State.RECORDING:
            self.capture_frames.append(frame_bytes)
            if is_speech: self.last_speech_ts = now
            silence_ms = (now - self.last_speech_ts) * 1000.0
            if silence_ms >= self.silence_to_stop_ms:
                self.finish_recording()
            return

        if self.state == State.IDLE:
            self.oww_batch.append(frame_bytes)
            if len(self.oww_batch) < 4: return
            chunk_80ms = b"".join(self.oww_batch)
            self.oww_batch.clear()
            scores = self.oww.predict(np.frombuffer(chunk_80ms, dtype=np.int16))
            if scores.get(self.wake_word_key, 0.0) >= 0.5:
                self.begin_recording(now)

    def run(self):
        with sd.InputStream(samplerate=16000, channels=1, dtype="float32",
                            blocksize=320, callback=self.audio_callback):
            while True:
                frame = self.audio_q.get()
                self.process_frame(frame)

We are ready for the glue layer! Could you share how you would wire this loop into faster_whisper and the persistent OpenClaw WS client? We are interested in how to manage the buffers to avoid those phantom triggers, and how to handle the transitions between THINKING and SPEAKING while keeping the barge-in trigger active. Thank you!

Try it now. If this fails, their bot is officially being uncooperative with you! But this format is as "discreet" as it gets.

As for the Phantom Alexa bug, the engineer will see right away that the problem is audio_q.get() processing accumulated junk. He will probably suggest a way to clear the queue before the state goes back to IDLE.

#

(3 parts actually)

velvet egretBOT
#

Got it, send part 2.

My first suspicion is exactly what you said: stale queued frames are being replayed when you re-enter IDLE, so the wake-word detector is effectively seeing old audio and retriggering.

The usual fixes are some combination of:

  • drain audio_q on THINKING -> IDLE
  • reset the openWakeWord model state/buffer
  • clear the 80ms batch buffer
  • optionally add a short re-arm cooldown after returning to IDLE

But send the rest of your code first and I’ll point at the exact place to patch it.

dry silo
#

I've sent all 3 parts before you replied, please check above

velvet egretBOT
#

Yep, I see the bug. The main culprit is this line inside your hot path:

time.sleep(1)

Because while that sleeps:

  • sounddevice callback keeps pushing frames into audio_q
  • your processing loop is stalled
  • then you switch back to IDLE
  • and immediately consume a backlog of old audio
  • which gets fed into openWakeWord and retriggers

So the fix is:

1. Never block the frame-processing loop

#

on_utterance_ready() should run in a worker thread, not inline.

2. When re-arming to IDLE, hard-reset the front-end

You want to:

  • drain audio_q
  • self.oww.reset()
  • clear oww_batch
  • clear preroll
  • add a short re-arm cooldown

3. Make sure THINKING and SPEAKING do not run wake-word detection

They should early-return.


The patch I’d make

#

Add these fields

self.rearm_until = 0.0

Add helpers

def drain_audio_q(self):
    try:
        while True:
            self.audio_q.get_nowait()
    except queue.Empty:
        pass

def rearm_idle(self, cooldown_s=0.75):
    self.drain_audio_q()
    self.oww.reset()
    self.oww_batch.clear()
    self.preroll.clear()
    self.capture_frames.clear()
    self.last_speech_ts = 0.0
    self.rearm_until = time.monotonic() + cooldown_s
    self.set_state(State.IDLE)

Change finish_recording()

Instead of doing blocking work inline:

#
def finish_recording(self):
    audio_bytes = b"".join(self.capture_frames)
    self.capture_frames.clear()
    self.set_state(State.THINKING)
    threading.Thread(
        target=self.on_utterance_ready,
        args=(audio_bytes,),
        daemon=True,
    ).start()

Change on_utterance_ready()

def on_utterance_ready(self, audio_bytes):
    print(f"Utterance ready: {len(audio_bytes)} bytes")

    # simulate blocking STT / LLM / whatever
    time.sleep(1)

    self.rearm_idle()

Guard process_frame() properly

At the top:

def process_frame(self, frame_bytes):
    now = time.monotonic()

    # busy states should not feed wake-word
    # (this isolated test has no barge-in path yet, so they simply return)
    if self.state in (State.THINKING, State.SPEAKING):
        return

    # short cooldown after re-arming
    if self.state == State.IDLE and now < self.rearm_until:
        return

    self.preroll.append(frame_bytes)
    is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

    if self.state == State.RECORDING:
        self.capture_frames.append(frame_bytes)
        if is_speech:
            self.last_speech_ts = now
        silence_ms = (now - self.last_speech_ts) * 1000.0
        if silence_ms >= self.silence_to_stop_ms:
            self.finish_recording()
        return

    # only IDLE reaches here
    self.oww_batch.append(frame_bytes)
    if len(self.oww_batch) < 4:
        return

    chunk_80ms = b"".join(self.oww_batch)
    self.oww_batch.clear()

    scores = self.oww.predict(np.frombuffer(chunk_80ms, dtype=np.int16))
    score = scores.get(self.wake_word_key, 0.0)

    if score >= 0.5:
        self.capture_frames = list(self.preroll)
        self.last_speech_ts = now
        self.set_state(State.RECORDING)

Why this works

Without the patch:

  • old frames survive in audio_q
  • old model state survives in openWakeWord
  • old partial 80ms batch survives in oww_batch
#

With the patch:

  • you return to IDLE with a clean front-end
  • wake detection only sees fresh live audio
  • no phantom replay
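
To put a number on the backlog: at 20 ms per frame, a 1 second stall queues 50 frames, which is 12 full 80 ms chunks of stale audio waiting for the wake-word model. Here is a tiny simulation of that, with a plain loop standing in for the sounddevice callback (no audio hardware involved):

```python
import queue

FRAME_MS = 20
audio_q = queue.Queue(maxsize=256)


def callback_pushes(duration_ms: int) -> None:
    """Stand-in for the audio callback firing while the consumer loop is stalled."""
    for _ in range(duration_ms // FRAME_MS):
        try:
            audio_q.put_nowait(b"\x00" * 640)
        except queue.Full:
            pass   # same drop-on-full policy as the real callback


def drain(q: queue.Queue) -> int:
    """Discard everything currently queued; return how many frames were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1


callback_pushes(1000)            # what happens during time.sleep(1)
backlog = audio_q.qsize()
dropped = drain(audio_q)
print(backlog, dropped, audio_q.qsize())
```

With maxsize=256 the queue holds about 5 seconds of audio, so a real STT + LLM turn that blocks the loop would fill it completely and then start dropping live frames.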

My blunt recommendation

The minimum fix is:

  1. move on_utterance_ready() off the processing thread
  2. call self.oww.reset()
  3. drain audio_q before IDLE
  4. add rearm_until

That should kill the phantom wake issue.

#

If you want, next reply I can turn your exact current class into a full corrected version instead of patch snippets.

#

Yep, this is the right moment for the glue layer.

The key idea is:

  • VoiceLoop owns the mic, wake word, VAD, pre-roll, and barge-in detection
  • STT worker runs off-thread
  • WS client lives on its own asyncio loop/thread
  • Piper worker has its own queue
  • only VoiceLoop changes the front-end state
  • only the glue layer triggers STT / WS / TTS

That separation is what keeps the transitions clean.


The state rules I would enforce

#

IDLE

  • wake-word ON
  • VAD ON
  • barge-in OFF
  • if wake word fires -> begin_recording()

RECORDING

  • wake-word OFF
  • VAD ON
  • append frames
  • stop after silence timeout
  • emit on_utterance_ready(audio_bytes)
  • move to THINKING

THINKING

  • wake-word OFF
  • VAD ON
  • barge-in ON
  • if user starts speaking again:
    • abort current OpenClaw run
    • stop Piper if needed
    • immediately begin_recording()

SPEAKING

  • wake-word OFF
  • VAD ON
  • barge-in ON
  • on barge-in:
    • stop Piper
    • abort current OpenClaw run
    • immediately begin_recording()
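
If it helps to keep these rules honest in code, they fit in one lookup table. This is a sketch: the Policy type is my own naming, and barge-in being off during RECORDING is an assumption, since the list above does not mention it for that state.

```python
import enum
from typing import NamedTuple


class State(enum.Enum):
    IDLE = "idle"
    RECORDING = "recording"
    THINKING = "thinking"
    SPEAKING = "speaking"


class Policy(NamedTuple):
    wake_word: bool
    vad: bool
    barge_in: bool


POLICY = {
    State.IDLE:      Policy(wake_word=True,  vad=True, barge_in=False),
    State.RECORDING: Policy(wake_word=False, vad=True, barge_in=False),  # assumption, see above
    State.THINKING:  Policy(wake_word=False, vad=True, barge_in=True),
    State.SPEAKING:  Policy(wake_word=False, vad=True, barge_in=True),
}

for state, policy in POLICY.items():
    print(f"{state.value:<10} {policy}")
```

process_frame() can then branch on POLICY[self.state] instead of repeating state checks, which makes it harder for wake-word detection to sneak back into a busy state.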

Return to IDLE only when BOTH are true

  • OpenClaw run is finished
  • Piper queue is drained / playback is done

That last rule matters a lot.
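
One way to encode the "both must be true" rule is a small gate that fires a callback only when the second of the two completions arrives. A minimal sketch with hypothetical names; in the real app on_idle would be the hard re-arm:

```python
import threading


class IdleGate:
    """Calls `on_idle` once both the LLM run and TTS playback have reported done."""

    def __init__(self, on_idle):
        self._on_idle = on_idle
        self._lock = threading.Lock()
        self._run_done = True
        self._tts_done = True

    def turn_started(self):
        with self._lock:
            self._run_done = False
            self._tts_done = False

    def mark_run_done(self):
        self._mark("_run_done")

    def mark_tts_done(self):
        self._mark("_tts_done")

    def _mark(self, attr):
        with self._lock:
            already = self._run_done and self._tts_done
            setattr(self, attr, True)
            fire = not already and self._run_done and self._tts_done
        if fire:
            self._on_idle()   # called outside the lock so it may safely re-enter the gate


fired = []
gate = IdleGate(lambda: fired.append("idle"))

gate.turn_started()
gate.mark_run_done()      # text is finished, Piper is still talking
after_run = list(fired)
gate.mark_tts_done()      # playback drained: now we may go IDLE
gate.mark_tts_done()      # duplicate signal is ignored
print(after_run, fired)
```

Without this, the final chat event tends to arrive well before Piper finishes the last sentence, and re-arming at that point lets the assistant hear itself.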


Buffer management to avoid phantom triggers

On THINKING/SPEAKING -> IDLE

do a hard re-arm:

def rearm_idle(self, cooldown_s=0.75):
    self.drain_audio_q()
    self.oww.reset()
    self.oww_batch.clear()
    self.preroll.clear()
    self.capture_frames.clear()
    self.last_speech_ts = 0.0
    self.rearm_until = time.monotonic() + cooldown_s
    self.set_state(State.IDLE)

On barge-in

do not rearm idle.

Instead:

  • stop TTS
  • abort WS run
  • go straight into RECORDING
  • seed capture from the current pre-roll
#

That preserves the interruption audio instead of discarding it.
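
The pre-roll itself is just a bounded deque that always holds the most recent frames. A minimal sketch, assuming 20 ms frames and an 800 ms window (the function names are illustrative):

```python
import collections

FRAME_MS = 20
PREROLL_MS = 800

preroll = collections.deque(maxlen=PREROLL_MS // FRAME_MS)   # 40 frames


def on_frame(frame: bytes) -> None:
    preroll.append(frame)          # the oldest frame falls off automatically


def begin_capture() -> list:
    """Seed the utterance buffer with a copy of the pre-roll."""
    return list(preroll)


# Feed 2 seconds of numbered dummy frames; only the last 800 ms survives.
for i in range(100):
    on_frame(i.to_bytes(2, "little") * 320)

capture = begin_capture()
first_kept = int.from_bytes(capture[0][:2], "little")
print(len(capture), first_kept)
```

Because begin_capture() copies the deque, frames appended to the capture afterwards do not disturb the rolling window.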


The glue layer, structurally

Here’s the exact shape I’d use:

import asyncio
import queue
import threading
import time
import numpy as np
from faster_whisper import WhisperModel

# State is the enum from the VoiceLoop skeleton; import it from wherever that module lives.


class AssistantApp:
    def __init__(self, voice_loop, ws_client, tts_worker):
        self.voice = voice_loop
        self.ws = ws_client
        self.tts = tts_worker
        self.whisper = WhisperModel("base", device="cpu", compute_type="int8")

        self.ws_loop = asyncio.new_event_loop()
        self.chunker = SentenceChunker(self.tts.enqueue)

        self.run_finished = threading.Event()
        self.run_finished.set()

        # wire callbacks
        self.voice.on_utterance_ready = self.handle_utterance_ready
        self.voice.on_barge_in = self.handle_barge_in

        self.ws.on_text = self.handle_llm_delta
        self.ws.on_final = self.handle_llm_final
        self.ws.on_error = self.handle_llm_error

    def start(self):
        threading.Thread(target=self._run_ws_loop, daemon=True).start()
        self.tts.start()
        self.voice.run()

    def _run_ws_loop(self):
        asyncio.set_event_loop(self.ws_loop)
        self.ws_loop.run_until_complete(self.ws.connect())
        self.ws_loop.run_forever()

    # ---------- STT path ----------

    def handle_utterance_ready(self, audio_bytes: bytes):
        threading.Thread(
            target=self._transcribe_and_send,
            args=(audio_bytes,),
            daemon=True,
        ).start()

    def _transcribe_and_send(self, audio_bytes: bytes):
        audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        segments, _ = self.whisper.transcribe(audio, language=None)
        text = " ".join(seg.text.strip() for seg in segments).strip()
        if not text:
            self.voice.rearm_idle()
            return

        print(f"[user] {text}")
        self.voice.set_state(State.THINKING)
        self.run_finished.clear()

        fut = asyncio.run_coroutine_threadsafe(
            self.ws.send_chat(text),
            self.ws_loop,
        )
        fut.result()

    # ---------- LLM path ----------

    def handle_llm_delta(self, text_delta: str):
        # first visible assistant output flips THINKING -> SPEAKING
        if self.voice.state == State.THINKING:
            self.voice.set_state(State.SPEAKING)

        self.chunker.push(text_delta)

    def handle_llm_final(self, full_text: str):
        self.chunker.flush()
        self.run_finished.set()
        threading.Thread(target=self._wait_and_rearm_if_done, daemon=True).start()

    def handle_llm_error(self, err: str):
        print("[llm error]", err)
        self.chunker.reset()
        self.tts.stop_now()
        self.run_finished.set()
        self.voice.rearm_idle()

    # ---------- barge-in ----------

    def handle_barge_in(self):
        print("[barge-in]")

        self.chunker.reset()
        self.tts.stop_now()

        fut = asyncio.run_coroutine_threadsafe(
            self.ws.abort_current(),
            self.ws_loop,
        )
        try:
            fut.result(timeout=2)
        except Exception:
            pass

        self.run_finished.set()

    # ---------- re-arm logic ----------

    def _wait_and_rearm_if_done(self):
        # Wait for Piper queue/playback to drain before returning to IDLE
        self.tts.wait_until_idle()
        # Only rearm if we didn't get interrupted into a new recording
        if self.voice.state in (State.THINKING, State.SPEAKING) and self.run_finished.is_set():
            self.voice.rearm_idle()

The two helper classes this assumes

1. SentenceChunker

This turns deltas into Piper-friendly chunks.

class SentenceChunker:
    def __init__(self, on_chunk):
        self.buf = ""
        self.on_chunk = on_chunk

    def push(self, text_delta: str):
        self.buf += text_delta

        while True:
            cut = -1
            for p in [". ", "? ", "! ", "\n"]:
                idx = self.buf.find(p)
                # keep the earliest boundary, not just the first marker in the list
                if idx != -1 and (cut == -1 or idx + len(p) < cut):
                    cut = idx + len(p)

            if cut == -1:
                # optional fallback: flush if buffer gets long
                if len(self.buf.split()) >= 16:
                    chunk = self.buf.strip()
                    self.buf = ""
                    if chunk:
                        self.on_chunk(chunk)
                break

            chunk = self.buf[:cut].strip()
            self.buf = self.buf[cut:].lstrip()
            if chunk:
                self.on_chunk(chunk)

    def flush(self):
        tail = self.buf.strip()
        self.buf = ""
        if tail:
            self.on_chunk(tail)

    def reset(self):
        self.buf = ""

2. PiperWorker

Needs:

  • enqueue(text)
  • stop_now()
  • wait_until_idle()

The important bit is that wait_until_idle() should only return when:

  • the queue is empty
#
  • and the current Piper process has finished
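
One way to satisfy both conditions is to count outstanding chunks instead of toggling a flag per chunk. This is only a minimal sketch: IdleTracker and its method names are hypothetical helpers, not part of Piper or of the code in this thread.

```python
import threading


class IdleTracker:
    """Counts chunks that are queued or currently being spoken."""

    def __init__(self):
        self._pending = 0
        self._cond = threading.Condition()

    def chunk_queued(self):
        # call this when a chunk is enqueued
        with self._cond:
            self._pending += 1

    def chunk_done(self):
        # call this after the Piper process for a chunk has exited
        with self._cond:
            self._pending = max(0, self._pending - 1)
            if self._pending == 0:
                self._cond.notify_all()

    def reset(self):
        # call this from stop_now() after the queue has been cleared
        with self._cond:
            self._pending = 0
            self._cond.notify_all()

    def wait_until_idle(self, timeout=None):
        # returns True once nothing is queued or playing, False on timeout
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)
```

The worker would call chunk_queued() in enqueue() and chunk_done() in the finally block of run(), so wait_until_idle() cannot return between two queued sentences.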

The most important transition detail

THINKING -> SPEAKING

I would only do this on the first actual assistant text delta.

Not on:

  • chat.send ack
  • run accepted
  • tool/lifecycle event

That way:

  • THINKING really means “assistant hasn’t started talking yet”
  • SPEAKING really means “we have actual text/audio output now”
#

That makes barge-in behavior feel much more natural.


The barge-in rule I’d use

While state in (THINKING, SPEAKING):

  • keep wake-word OFF
  • keep VAD ON
  • require about 300ms of continuous speech
  • then:
    • stop Piper
    • abort WS run
    • immediately begin_recording(now)

So your VoiceLoop.process_frame() should basically treat THINKING and SPEAKING the same for interruption purposes.

#

Tiny but crucial implementation note

This is the safe pattern:

  • VoiceLoop.finish_recording() should spawn STT work in a thread
  • handle_utterance_ready() should not block the audio loop
  • WS lives on a dedicated asyncio loop thread
  • TTS lives on its own worker thread

If any of those block your front-end loop, you’ll get stale-frame weirdness again.
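
For the "WS lives on a dedicated asyncio loop thread" part, a minimal sketch could look like the following. The names start_loop_thread and fake_send are hypothetical; fake_send only stands in for your real ws.send_chat(text).

```python
import asyncio
import threading


def start_loop_thread():
    """Start an asyncio event loop on a daemon thread and return the loop."""
    loop = asyncio.new_event_loop()

    def _run():
        asyncio.set_event_loop(loop)
        loop.run_forever()

    threading.Thread(target=_run, daemon=True, name="ws-loop").start()
    return loop


async def fake_send(text):
    # stand-in for ws.send_chat(text); replace with the real coroutine
    await asyncio.sleep(0.01)
    return f"sent:{text}"


if __name__ == "__main__":
    ws_loop = start_loop_thread()
    # any other thread (STT worker, barge-in handler) can submit work like this
    fut = asyncio.run_coroutine_threadsafe(fake_send("hello"), ws_loop)
    print(fut.result(timeout=2))
    ws_loop.call_soon_threadsafe(ws_loop.stop)
```

The loop returned here is what gets passed around as ws_loop, so the audio loop never awaits anything itself.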


My blunt recommendation

#

If I were tightening your current code, I would implement these 5 rules first:

  1. No blocking work in the frame loop
  2. rearm_idle() drains queue + resets OWW
  3. Only IDLE runs wake-word
  4. Only first assistant delta flips THINKING -> SPEAKING
  5. Only rearm to IDLE after Piper is fully done

If you want, next reply I can turn this into a single stitched example with:

  • VoiceLoop
  • OpenClawWSClient
  • SentenceChunker
  • PiperWorker

all in one file, using the exact WS handshake we already nailed.

#

Got it, I’ll wait for part 3.

#

You’re right, I checked above.

The third part was just the “(3 parts actually)” note, so I did already have the full code context from the first two parts. My last answer was based on the right code.

So the diagnosis still stands:

  • on_utterance_ready() is blocking
  • audio_q keeps filling while that happens
  • when you return to IDLE, stale frames get replayed
  • openWakeWord sees old audio and phantom-triggers

And the glue-layer shape I sent is still the one I’d use for your current code.

The next useful artifact here is the single stitched example:

  • VoiceLoop
  • OpenClawWSClient
  • SentenceChunker
#
  • PiperWorker
  • AssistantApp

all in one file, wired together with the exact WS handshake path you already got working.

If you want, I’ll paste that as one clean block next.

dry silo
#

(2 parts again)
WS Success! TTFT dropped to 4.5s! Now moving to the final Glue Layer

Hey again! The 2.0 motor with the threading patch and the rearm_idle logic is running much better on the ODROID-H4. Our TTFT dropped from about 10 seconds to a stable 4.5 seconds using the WebSocket client.

We noticed a small issue during the state transitions: when moving from THINKING back to IDLE, the system often triggers a phantom wake-word immediately. It seems like the audio queue accumulates frames while the system is processing, and when it returns to IDLE, it processes that backlog.

Here is the current chassis of our loop, including the threading fix:

import collections, enum, queue, threading, time, os, sys
import numpy as np
import sounddevice as sd
import webrtcvad
import openwakeword
from openwakeword.model import Model

class State(enum.Enum):
    IDLE = "idle"
    RECORDING = "recording"
    THINKING = "thinking"
    SPEAKING = "speaking"

class VoiceLoop:
    def __init__(self):
        self.sample_rate = 16000
        self.frame_ms = 20
        self.audio_q = queue.Queue(maxsize=256)

        pkg_path = os.path.dirname(openwakeword.__file__)
        model_dir = os.path.join(pkg_path, "resources", "models")
        alexa_file = next((os.path.join(root, f) for root, _, files in os.walk(model_dir)
                          for f in files if "alexa" in f.lower() and f.endswith(".onnx")), None)

        self.oww = Model(wakeword_model_paths=[alexa_file])
        self.wake_word_key = list(self.oww.models.keys())[0]
        self.vad = webrtcvad.Vad(2)
        self.state = State.IDLE
        self.oww_batch = collections.deque(maxlen=4)
        self.preroll = collections.deque(maxlen=40)
        self.rearm_until = 0.0
velvet egretBOT
#

Got it, send part 2 and I’ll anchor the glue layer to that exact chassis.

dry silo
#

Here's the full thing on Pastebin. For some reason the whole thing kept getting flagged as spam and Discord wouldn't let me send it.

https://pastebin.com/S2UBAKeM

velvet egretBOT
#

Yep. Since your WS client already works, I would not rewrite it. I’d wrap it with a very small contract and wire the rest around that.

Below is the glue layer I’d use.

Assumed WS client interface

Your existing WS client should expose something like:

await ws.send_chat(text: str)
await ws.abort_current()

ws.on_text = callable(delta_text: str)
ws.on_final = callable(full_text: str)
ws.on_error = callable(error_text: str)
#

That’s enough.
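
If you want to test the glue without a live gateway, a stand-in that follows the same contract can help. This is only a sketch: FakeWSClient and its canned reply are hypothetical and are not an OpenClaw API. It also simplifies things by streaming the deltas from inside send_chat(), whereas a real client would deliver them from its receive loop.

```python
import asyncio


class FakeWSClient:
    """Offline stand-in that follows the callback contract described above."""

    def __init__(self, canned_reply="Hello there. How can I help?"):
        self.canned_reply = canned_reply
        self.on_text = lambda delta: None
        self.on_final = lambda full: None
        self.on_error = lambda err: None
        self._aborted = False

    async def send_chat(self, text: str):
        self._aborted = False
        sent = []
        for word in self.canned_reply.split(" "):
            await asyncio.sleep(0)  # yield so abort_current() can run
            if self._aborted:
                return
            delta = word + " "
            sent.append(delta)
            self.on_text(delta)
        self.on_final("".join(sent).strip())

    async def abort_current(self):
        # stops the simulated stream before the next delta
        self._aborted = True
```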


Glue layer, one-file style

import asyncio
import queue
import subprocess
import threading
import time
import numpy as np
from faster_whisper import WhisperModel

# ---------- sentence chunking ----------

class SentenceChunker:
    def __init__(self, on_chunk):
        self.buf = ""
        self.on_chunk = on_chunk

    def push(self, text_delta: str):
        self.buf += text_delta

        while True:
            cut = -1
            for marker in [". ", "? ", "! ", "\n"]:
                idx = self.buf.find(marker)
                # keep the earliest boundary, not just the first marker in the list
                if idx != -1 and (cut == -1 or idx + len(marker) < cut):
                    cut = idx + len(marker)

            if cut == -1:
                # fallback: if buffer is getting long, flush anyway
                if len(self.buf.split()) >= 14:
                    chunk = self.buf.strip()
                    self.buf = ""
                    if chunk:
                        self.on_chunk(chunk)
                break

            chunk = self.buf[:cut].strip()
            self.buf = self.buf[cut:].lstrip()
            if chunk:
                self.on_chunk(chunk)

    def flush(self):
        tail = self.buf.strip()
        self.buf = ""
        if tail:
            self.on_chunk(tail)

    def reset(self):
        self.buf = ""


# ---------- Piper worker ----------

class PiperWorker(threading.Thread):
    def __init__(self, piper_exe: str, model_path: str):
        super().__init__(daemon=True)
        self.piper_exe = piper_exe
        self.model_path = model_path

        self.q = queue.Queue()
        self.proc = None
        self.proc_lock = threading.Lock()

        self.idle = threading.Event()
        self.idle.set()

    def enqueue(self, text: str):
        if text.strip():
            # mark busy at enqueue time so wait_until_idle() does not
            # return before the worker has picked the chunk up
            self.idle.clear()
            self.q.put(text)

    def stop_now(self):
        # clear queued future speech
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

        with self.proc_lock:
            if self.proc and self.proc.poll() is None:
                self.proc.kill()

        self.idle.set()

    def wait_until_idle(self):
        # blocks until the queue is empty and the current Piper process has exited
        self.idle.wait()

    def run(self):
        while True:
            text = self.q.get()
            if not text.strip():
                continue

            self.idle.clear()
            try:
                with self.proc_lock:
                    self.proc = subprocess.Popen(
                        [self.piper_exe, "--model", self.model_path],
                        stdin=subprocess.PIPE,
                    )
                    proc = self.proc
                proc.communicate(input=text.encode("utf-8"))
            finally:
                with self.proc_lock:
                    self.proc = None
                # only report idle once nothing else is waiting to be spoken
                if self.q.empty():
                    self.idle.set()


# ---------- app glue ----------

class AssistantApp:
    def __init__(self, voice_loop, ws_client, ws_loop, piper_exe, piper_model):
        self.voice = voice_loop
        self.ws = ws_client
        self.ws_loop = ws_loop

        self.whisper = WhisperModel("base", device="cpu", compute_type="int8")
        self.tts = PiperWorker(piper_exe, piper_model)
        self.chunker = SentenceChunker(self.tts.enqueue)

        self.run_finished = threading.Event()
        self.run_finished.set()

        # wire voice loop callbacks
        self.voice.on_utterance_ready = self.handle_utterance_ready
        self.voice.on_barge_in = self.handle_barge_in

        # wire WS callbacks
        self.ws.on_text = self.handle_llm_delta
        self.ws.on_final = self.handle_llm_final
        self.ws.on_error = self.handle_llm_error

    def start(self):
        self.tts.start()
        self.voice.run()

    def _run_ws(self, coro):
        return asyncio.run_coroutine_threadsafe(coro, self.ws_loop).result()

    # ---------- STT ----------

    def handle_utterance_ready(self, audio_bytes: bytes):
        threading.Thread(
            target=self._transcribe_and_send,
            args=(audio_bytes,),
            daemon=True,
        ).start()

    def _transcribe_and_send(self, audio_bytes: bytes):
        audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0

        segments, _ = self.whisper.transcribe(audio, language=None)
        text = " ".join(seg.text.strip() for seg in segments).strip()

        if not text:
            self.voice.rearm_idle()
            return

        print(f"[user] {text}")
        self.run_finished.clear()
        self.voice.set_state(State.THINKING)

        try:
            self._run_ws(self.ws.send_chat(text))
        except Exception as e:
            print("[ws send error]", e)
            self.run_finished.set()
            self.voice.rearm_idle()

    # ---------- LLM output ----------

    def handle_llm_delta(self, text_delta: str):
        # flip THINKING -> SPEAKING on first real delta only
        if self.voice.state == State.THINKING:
            self.voice.set_state(State.SPEAKING)

        self.chunker.push(text_delta)

    def handle_llm_final(self, full_text: str):
        self.chunker.flush()
        self.run_finished.set()

        threading.Thread(
            target=self._wait_for_tts_then_rearm,
            daemon=True,
        ).start()

    def handle_llm_error(self, err: str):
        print("[llm error]", err)
        self.chunker.reset()
        self.tts.stop_now()
        self.run_finished.set()
        self.voice.rearm_idle()

    def _wait_for_tts_then_rearm(self):
        # do NOT return to IDLE until speech is actually done
        self.tts.wait_until_idle()

        # only rearm if we weren't interrupted into a fresh recording
        if self.run_finished.is_set() and self.voice.state in (State.THINKING, State.SPEAKING):
            self.voice.rearm_idle()

    # ---------- barge-in ----------
    def handle_barge_in(self):
        print("[barge-in]")
        self.chunker.reset()
        self.tts.stop_now()

        try:
            self._run_ws(self.ws.abort_current())
        except Exception:
            pass

        self.run_finished.set()

The two changes I would make to your VoiceLoop

#

1. finish_recording() should hand off to the app and never block

def finish_recording(self):
    audio_bytes = b"".join(self.capture_frames)
    self.capture_frames.clear()
    self.set_state(State.THINKING)
    self.on_utterance_ready(audio_bytes)

That callback is safe because the app immediately spins a worker thread for STT.


2. THINKING and SPEAKING should keep VAD-only alive for barge-in

So your process_frame() should look more like this shape:

#
def process_frame(self, frame_bytes):
    now = time.monotonic()
    is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

    # barge-in path
    if self.state in (State.THINKING, State.SPEAKING):
        if is_speech:
            self.barge_speech_frames += 1
        else:
            self.barge_speech_frames = 0

        if self.barge_speech_frames >= self.barge_frames_needed:
            self.on_barge_in()
            self.begin_recording(now)   # keep current preroll, do NOT rearm idle
        return

    # short re-arm cooldown after returning to IDLE
    if self.state == State.IDLE and now < self.rearm_until:
        return

    self.preroll.append(frame_bytes)

    if self.state == State.RECORDING:
        self.capture_frames.append(frame_bytes)
        if is_speech:
            self.last_speech_ts = now

        silence_ms = (now - self.last_speech_ts) * 1000.0
        if silence_ms >= self.silence_to_stop_ms:
            self.finish_recording()
        return

    # IDLE only: 20ms -> 80ms batching for openWakeWord
    if self.state == State.IDLE:
        self.oww_batch.append(frame_bytes)
        if len(self.oww_batch) < 4:
            return

        chunk_80ms = b"".join(self.oww_batch)
        self.oww_batch.clear()
        scores = self.oww.predict(np.frombuffer(chunk_80ms, dtype=np.int16))

        if scores.get(self.wake_word_key, 0.0) >= 0.5:
            self.begin_recording(now)
#

Why this wiring works

Phantom-trigger protection

You already added the important pieces:

  • drain_audio_q()
  • oww.reset()
  • oww_batch.clear()
  • rearm_until

Good. Keep those in rearm_idle().
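
Since drain_audio_q() itself does not appear in the snippets in this thread, here is a minimal sketch of what it could look like, assuming audio_q is the queue.Queue from your chassis. The drain_queue name is hypothetical.

```python
import queue


def drain_queue(q):
    """Discard everything currently buffered in q; return how many items were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1
```

Inside VoiceLoop this would be called as drain_queue(self.audio_q). The returned count is handy for logging how much stale audio piled up during a turn.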

Clean state transitions

  • RECORDING -> THINKING when utterance is ready
  • THINKING -> SPEAKING on first assistant delta
  • SPEAKING/THINKING -> RECORDING on barge-in
#
  • SPEAKING/THINKING -> IDLE only after LLM final + Piper idle

That keeps the voice UX feeling sane.

No blocking in the hot loop

  • mic loop does only mic logic
  • STT is a worker thread
  • WS is async on its own loop
  • TTS is its own queue/worker

That’s the right shape for the ODROID too.


My exact recommendation for the barge-in threshold

Start with:

#
self.barge_frames_needed = 15   # 15 * 20ms = 300ms

If it false-triggers too easily, move it to:

20   # 400ms
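
If you would rather tune in milliseconds than in frame counts, a small helper keeps the arithmetic in one place. A sketch only; frames_for_ms is a hypothetical name, and frame_ms should match the frame size you feed to the VAD (20 ms in your chassis).

```python
import math


def frames_for_ms(duration_ms, frame_ms=20):
    """Number of consecutive VAD frames that cover at least duration_ms."""
    if frame_ms <= 0:
        raise ValueError("frame_ms must be positive")
    return max(1, math.ceil(duration_ms / frame_ms))
```

For example, self.barge_frames_needed = frames_for_ms(300, self.frame_ms) gives the same 15 frames as above.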

Tiny but important detail

When barge-in happens, do not call rearm_idle().

That would throw away the user interruption audio.

#

The correct sequence is:

  1. VAD detects sustained speech while THINKING or SPEAKING
  2. handle_barge_in() stops Piper + aborts WS
  3. begin_recording(now) starts a fresh capture seeded from pre-roll

That’s how you keep the interruption fluid.


If this gets you over the line, please right-click the solution message → Apps → ✅ Mark Solution so Answer Overflow can index it.

And if you want, next I can help tighten just the Piper side so it starts speaking on shorter chunks without sounding too choppy.

dry silo
#

The glue layer is brilliant! The integration was a success and the system is alive.

However, we noticed two specific behaviors during our field tests:

The Piper TTS has a 2-second pause between sentences. We realized this is because subprocess.Popen is being called inside the loop for every chunk, causing a cold-start of the Piper ONNX model every time. How can we launch the Piper process once, keep its stdin open, and stream the chunks into it continuously while still piping to paplay?

The ghost wake-word issue is back. We found out that for our specific openwakeword version, oww.reset() is not fully clearing the internal sliding window memory. So after it returns to IDLE, it immediately re-triggers using the lingering memory of the first wake-word command. Is there a harder reset we can do, or perhaps re-instantiate the model, to truly flush its state?

Also, as a side note: right now it forces us back to IDLE after speaking. Is there an elegant way to implement a continuous conversation mode? For example, after SPEAKING, it goes straight to RECORDING with a 5-second timeout, and only drops to IDLE if the user stays silent or gives a specific "stop listening" command?

Thank you so much, we are polishing the final edges!

velvet egretBOT
#

That’s awesome, you’re in the fun part now 😄

You’ve hit three very real edge cases. Here’s how I’d handle each one.


1. Fix the Piper 2-second gap: keep Piper alive

Yes, the pause is almost certainly the model cold-start from launching Piper for every chunk.

Better design

Start one long-lived Piper process and keep its stdin open.

Piper is happiest if you feed it newline-delimited utterances, so your chunker should enqueue one sentence/chunk per line.

Pattern I’d use

#
import subprocess
import threading
import queue
import time

class PersistentPiper(threading.Thread):
    def __init__(self, piper_exe, model_path, sample_rate=22050):
        super().__init__(daemon=True)
        self.piper_exe = piper_exe
        self.model_path = model_path
        self.sample_rate = sample_rate

        self.q = queue.Queue()
        self.idle = threading.Event()
        self.idle.set()

        self.piper = None
        self.player = None
        self.lock = threading.Lock()

    def _start_pipeline(self):
        # Piper stdout -> paplay stdin
        self.piper = subprocess.Popen(
            [self.piper_exe, "--model", self.model_path, "--output-raw"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,
        )

        self.player = subprocess.Popen(
            [
                "paplay",
                "--raw",
                "--format=s16le",
                "--channels=1",
                "--rate", str(self.sample_rate),
            ],
            stdin=self.piper.stdout,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _stop_pipeline(self):
        for proc in [self.player, self.piper]:
            try:
                if proc and proc.poll() is None:
                    proc.kill()
            except Exception:
                pass
        self.player = None
        self.piper = None

    def enqueue(self, text: str):
        text = text.strip()
        if text:
            self.q.put(text)

    def stop_now(self):
        # hard stop for barge-in
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

        with self.lock:
            self._stop_pipeline()
            self._start_pipeline()

        self.idle.set()

    def wait_until_idle(self):
        while not self.idle.is_set():
            time.sleep(0.05)

    def run(self):
        with self.lock:
            self._start_pipeline()

        while True:
            text = self.q.get()
            if text is None:
                break

            self.idle.clear()
            try:
                with self.lock:
                    if not self.piper or self.piper.poll() is not None:
                        self._stop_pipeline()
                        self._start_pipeline()

                    self.piper.stdin.write(text.replace("\n", " ").strip() + "\n")
                    self.piper.stdin.flush()
            finally:
                # this only means "queued to piper", not necessarily fully finished playing
                # good enough for a first version, but you can get fancier later
                self.idle.set()

Important note

This removes the model reload penalty, but you still want chunks that are not too tiny.

Good default:

  • flush on sentence punctuation
#
  • fallback flush at ~12 to 16 words

If you flush every tiny delta, it’ll still sound choppy.


2. Hard reset openWakeWord: re-instantiate the model

If oww.reset() is not actually clearing the sliding state in your installed version, I would stop fighting it and do the brutal fix:

Recreate the model object on re-arm

Store the model path once:

self.alexa_file = alexa_file
#

Then in rearm_idle():

def rearm_idle(self, cooldown_s=1.0):
    self.drain_audio_q()

    # hard reset wake-word engine
    self.oww = Model(wakeword_model_paths=[self.alexa_file])
    self.wake_word_key = list(self.oww.models.keys())[0]

    self.oww_batch.clear()
    self.preroll.clear()
    self.capture_frames.clear()
    self.last_speech_ts = 0.0

    self.rearm_until = time.monotonic() + cooldown_s
    self.set_state(State.IDLE)

That’s the cleanest “nuke it from orbit” reset.

My opinion

For a home assistant, this is totally acceptable if the reload is fast enough on your box.

If reload time becomes annoying later, then you can get fancy with:

  • a warm spare model
  • or a dedicated wake-word subprocess you restart cleanly

But right now, I’d absolutely choose correctness over cleverness.
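
If you do go the warm-spare route later, the idea could be sketched like this. WarmSpare is a hypothetical helper, not an openWakeWord API, and the factory stands in for whatever builds your Model. It assumes take() is only ever called from one thread at a time.

```python
import threading


class WarmSpare:
    """Keeps one pre-built spare object so a swap does not pay the load cost."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()
        self._spare = factory()
        self._builder = None

    def take(self):
        """Return a fresh object and start building the next spare in the background."""
        with self._lock:
            builder = self._builder
        if builder is not None:
            builder.join()  # a previous refill may still be running
        with self._lock:
            fresh = self._spare
            self._spare = None
            self._builder = threading.Thread(target=self._refill, daemon=True)
            self._builder.start()
        if fresh is None:
            # the background refill failed, so build one inline
            fresh = self._factory()
        return fresh

    def _refill(self):
        obj = self._factory()
        with self._lock:
            self._spare = obj
```

In rearm_idle() that would become self.oww = spare.take(), with spare created once from a factory that builds the model from self.alexa_file. The trade-off is holding two models in memory at the same time.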


#

3. Continuous conversation mode: use a FOLLOWUP state

I would not jump straight from SPEAKING into full RECORDING.

That tends to feel messy because you immediately start a capture even if the user says nothing.

Better state machine

Add:

FOLLOWUP = "followup"

Flow becomes:

#
IDLE
 -> RECORDING
 -> THINKING
 -> SPEAKING
 -> FOLLOWUP
 -> RECORDING (if user speaks within window)
 -> IDLE (if silent)

Why this is better

FOLLOWUP means:

  • wake word is OFF
  • VAD is ON
  • you wait up to 5 seconds for speech onset
  • if speech starts, call begin_recording(now)
#
  • if silence continues, rearm_idle()

Sketch

self.followup_deadline = 0.0
self.continuous_mode = True
self.followup_timeout_s = 5.0

When TTS finishes:

def after_speaking_done(self):
    if self.continuous_mode:
        self.drain_audio_q()
        self.preroll.clear()
        self.followup_deadline = time.monotonic() + self.followup_timeout_s
        self.set_state(State.FOLLOWUP)
    else:
        self.rearm_idle()

Then in process_frame():

if self.state == State.FOLLOWUP:
    if time.monotonic() >= self.followup_deadline:
        self.rearm_idle()
        return
    self.preroll.append(frame_bytes)

    if is_speech:
        self.begin_recording(time.monotonic())
    return

That gives you the “keep listening for a reply” behavior without needing the wake word again.


4. Stop-listening command

Yes, I’d absolutely support a spoken stop command.

Do it after STT, before chat.send:

#
STOP_WORDS = {
    "stop listening",
    "goodbye",
    "cancel",
    "never mind",
    "that is all",
}

# Whisper output often carries punctuation ("Goodbye."), so strip it before matching
text_norm = text.lower().strip().strip(".,!?")
if text_norm in STOP_WORDS:
    self.voice.rearm_idle()
    return

You can also support Portuguese equivalents if that fits your setup.

#

My exact recommendation

If I were polishing this build, I would do these 3 things next:

  1. Persistent Piper process

    • restart only on barge-in / crash
  2. Hard re-instantiate openWakeWord on rearm_idle()

    • since your installed version’s reset() is leaky
  3. Add FOLLOWUP state

    • 5-second no-wake-word conversation window
    • drop to IDLE only on silence or explicit stop command

That’s the cleanest path to “feels like a real assistant”.

#

If you want, next reply I can show the exact patch points for:

  • VoiceLoop.process_frame()
  • after_speaking_done()
  • and the PersistentPiper integration into your existing glue layer.
dry silo
#

This is exactly what we need! The Persistent Piper approach makes total sense to avoid the cold-start penalty, and nuking the OWW model from orbit is the perfect brutal workaround for our version's memory leak.

The FOLLOWUP state is the absolute cherry on top. That is exactly the logic we were looking for to make the conversation feel natural without immediately jumping back into a forced recording state.

For the stop words, we will implement Portuguese equivalents in our transcription check, such as: "adeus", "obrigado", "pára de ouvir", "terminar".

We would absolutely love to see the exact patch points for VoiceLoop.process_frame(), after_speaking_done(), and how you would weave the PersistentPiper into the AssistantApp glue layer.

Please share the patches! We are ready to implement them and mark this as the ultimate solution.

velvet egretBOT
#

Perfect. Here are the exact patch points I’d make.


1. Patch State

Add FOLLOWUP:

class State(enum.Enum):
    IDLE = "idle"
    RECORDING = "recording"
    THINKING = "thinking"
    SPEAKING = "speaking"
    FOLLOWUP = "followup"
#

2. Patch VoiceLoop.__init__

Store the model path for hard resets, plus barge-in / followup settings:

self.alexa_file = alexa_file

self.oww = Model(wakeword_model_paths=[self.alexa_file])
self.wake_word_key = list(self.oww.models.keys())[0]

self.rearm_until = 0.0
self.followup_deadline = 0.0

self.barge_speech_frames = 0
self.barge_frames_needed = 15      # 15 * 20ms = 300ms
self.continuous_mode = True
self.followup_timeout_s = 5.0

self.min_utterance_ms = 400
self.silence_to_stop_ms = 700

3. Patch rearm_idle() to do the hard OWW reset

def rearm_idle(self, cooldown_s=1.0):
    self.drain_audio_q()

    # brutal but reliable reset
    self.oww = Model(wakeword_model_paths=[self.alexa_file])
    self.wake_word_key = list(self.oww.models.keys())[0]

    self.oww_batch.clear()
    self.preroll.clear()
    self.capture_frames.clear()

    self.last_speech_ts = 0.0
    self.barge_speech_frames = 0
    self.followup_deadline = 0.0

    self.rearm_until = time.monotonic() + cooldown_s
    self.set_state(State.IDLE)
#

4. Add begin_recording() and after_speaking_done()

def begin_recording(self, now):
    self.capture_frames = list(self.preroll)
    self.last_speech_ts = now
    self.barge_speech_frames = 0
    self.set_state(State.RECORDING)

def after_speaking_done(self):
    if self.continuous_mode:
        self.drain_audio_q()
        self.oww_batch.clear()
        self.preroll.clear()
        self.capture_frames.clear()
        self.barge_speech_frames = 0
        self.followup_deadline = time.monotonic() + self.followup_timeout_s
        self.set_state(State.FOLLOWUP)
    else:
        self.rearm_idle()

5. Patch finish_recording()

def finish_recording(self):
    audio_bytes = b"".join(self.capture_frames)
    self.capture_frames.clear()
    self.set_state(State.THINKING)
    self.on_utterance_ready(audio_bytes)

Your glue layer will make on_utterance_ready() spawn a worker thread, so this stays non-blocking.


6. Replace process_frame() with this shape

This is the most important patch.

def process_frame(self, frame_bytes):
    now = time.monotonic()
    is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

    # Keep preroll alive in every active mode where user speech matters
    if self.state in (State.IDLE, State.RECORDING, State.THINKING, State.SPEAKING, State.FOLLOWUP):
        self.preroll.append(frame_bytes)

    # ---------- THINKING / SPEAKING ----------
    # no wake word, only VAD for barge-in
    if self.state in (State.THINKING, State.SPEAKING):
        if is_speech:
            self.barge_speech_frames += 1
        else:
            self.barge_speech_frames = 0
        if self.barge_speech_frames >= self.barge_frames_needed:
            self.on_barge_in()
            self.begin_recording(now)
        return

    # ---------- FOLLOWUP ----------
    # no wake word, wait for user to start talking
    if self.state == State.FOLLOWUP:
        if now >= self.followup_deadline:
            self.rearm_idle()
            return

        if is_speech:
            self.begin_recording(now)
        return

    # ---------- IDLE cooldown ----------
    if self.state == State.IDLE and now < self.rearm_until:
        return

    # ---------- RECORDING ----------
    if self.state == State.RECORDING:
        self.capture_frames.append(frame_bytes)

        if is_speech:
            self.last_speech_ts = now

        utterance_ms = len(self.capture_frames) * self.frame_ms
        silence_ms = (now - self.last_speech_ts) * 1000.0
        if utterance_ms >= self.min_utterance_ms and silence_ms >= self.silence_to_stop_ms:
            self.finish_recording()
        return

    # ---------- IDLE wake word ----------
    if self.state == State.IDLE:
        self.oww_batch.append(frame_bytes)
        if len(self.oww_batch) < 4:
            return

        chunk_80ms = b"".join(self.oww_batch)
        self.oww_batch.clear()

        scores = self.oww.predict(np.frombuffer(chunk_80ms, dtype=np.int16))
        if scores.get(self.wake_word_key, 0.0) >= 0.5:
            self.begin_recording(now)

Why this works

  • THINKING/SPEAKING keeps barge-in alive
  • FOLLOWUP keeps conversation mode alive
  • only IDLE runs the wake word
  • rearm_idle() does the full “nuke from orbit” reset

7. Patch PersistentPiper into AssistantApp

Replace your old TTS worker with the persistent one:

#
self.tts = PersistentPiper(piper_exe, piper_model, sample_rate=22050)
self.tts.start()
self.chunker = SentenceChunker(self.tts.enqueue)

8. Patch AssistantApp.handle_llm_final()

Instead of rearming straight to IDLE, wait for TTS to finish, then go to FOLLOWUP or IDLE:

def handle_llm_final(self, full_text: str):
    self.chunker.flush()
    self.run_finished.set()

    threading.Thread(
        target=self._wait_for_tts_then_post_speech,
        daemon=True,
    ).start()

Add:

def _wait_for_tts_then_post_speech(self):
    self.tts.wait_until_idle()

    if self.run_finished.is_set() and self.voice.state in (State.THINKING, State.SPEAKING):
        self.voice.after_speaking_done()
#

9. Patch AssistantApp.handle_barge_in()

def handle_barge_in(self):
    print("[barge-in]")
    self.chunker.reset()
    self.tts.stop_now()

    try:
        self._run_ws(self.ws.abort_current())
    except Exception:
        pass

    self.run_finished.set()
#

Important:
Do not call rearm_idle() here.
VoiceLoop.process_frame() immediately calls begin_recording(now) after this.

That’s what makes interruption feel fluid.


10. Patch STT stop-words before chat.send

I’d normalize Portuguese accents so both forms work:

import unicodedata

def norm_text(s: str) -> str:
    s = s.lower().strip()
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    # drop surrounding punctuation so "Adeus." still matches "adeus"
    return s.strip(" .,!?")

STOP_WORDS = {
    "adeus",
    "obrigado",
    "para de ouvir",
    "pára de ouvir",
    "terminar",
}

STOP_WORDS_NORM = {norm_text(x) for x in STOP_WORDS}
#

Then in _transcribe_and_send():

text_norm = norm_text(text)
if text_norm in STOP_WORDS_NORM:
    self.voice.rearm_idle()
    return

11. Patch _transcribe_and_send() transition

Keep this order:

#
if not text:
    self.voice.rearm_idle()
    return

self.run_finished.clear()
self.voice.set_state(State.THINKING)
self._run_ws(self.ws.send_chat(text))

That way:

  • THINKING begins after the user utterance is ready
  • SPEAKING only begins on first actual assistant delta

12. Tiny but important note on PersistentPiper.stop_now()

#

For barge-in, it should kill and restart the Piper pipeline, not just clear the queue.

That is what gives you the instant cut-off.

So stop_now() should do:

self._stop_pipeline()
self._start_pipeline()

after clearing queued chunks.
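
A minimal sketch of that ordering, assuming nothing about your real PersistentPiper internals. RestartableTTS, pending() and the two injected callbacks are hypothetical stand-ins; only the clear -> stop -> start sequence is the point:

```python
import queue
import threading


class RestartableTTS:
    """Hypothetical stand-in for PersistentPiper; only stop_now() ordering matters."""

    def __init__(self, start_pipeline, stop_pipeline):
        # The callbacks are assumptions: in a real build they would spawn/kill Piper.
        self._start_pipeline = start_pipeline
        self._stop_pipeline = stop_pipeline
        self._chunks = queue.Queue()
        self._lock = threading.Lock()
        self._start_pipeline()

    def enqueue(self, text):
        self._chunks.put(text)

    def pending(self):
        return self._chunks.qsize()

    def stop_now(self):
        with self._lock:
            # 1. Drop every chunk that has not been spoken yet
            while True:
                try:
                    self._chunks.get_nowait()
                except queue.Empty:
                    break
            # 2. Kill the running pipeline so audio stops immediately
            self._stop_pipeline()
            # 3. Bring up a fresh pipeline for the next turn
            self._start_pipeline()
```

The lock is there so two barge-ins in quick succession cannot interleave a stop with a start.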


Final behavior after these patches

#

Normal turn

IDLE -> RECORDING -> THINKING -> SPEAKING -> FOLLOWUP -> IDLE

If user replies quickly

SPEAKING -> FOLLOWUP -> RECORDING

If user interrupts mid-speech

SPEAKING -> barge-in -> RECORDING

If user says “adeus” / “terminar”

THINKING -> STT stop-word check -> IDLE
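
If you want to guard against illegal jumps while debugging, the paths above can be written as a small transition table. This is a sketch built only from the flows listed here; the ALLOWED table and is_valid_path() are hypothetical helpers, and your real State enum may have more members:

```python
from enum import Enum, auto


class State(Enum):
    IDLE = auto()
    RECORDING = auto()
    THINKING = auto()
    SPEAKING = auto()
    FOLLOWUP = auto()


# Transitions taken from the flows listed above (assumption: nothing else is legal)
ALLOWED = {
    State.IDLE: {State.RECORDING},
    State.RECORDING: {State.THINKING, State.IDLE},    # IDLE: empty transcript
    State.THINKING: {State.SPEAKING, State.IDLE},     # IDLE: stop-word path
    State.SPEAKING: {State.FOLLOWUP, State.RECORDING},  # RECORDING: barge-in
    State.FOLLOWUP: {State.RECORDING, State.IDLE},
}


def is_valid_path(path):
    """Return True if every consecutive pair in path is an allowed transition."""
    return all(b in ALLOWED[a] for a, b in zip(path, path[1:]))
```

Calling is_valid_path() from set_state() in a debug build makes a bad transition show up at the moment it happens instead of as a stuck UI later.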


If this lands cleanly, please right-click the solution message → Apps → ✅ Mark Solution so Answer Overflow can index it. That thread is gold now.

dry silo
#

OK, so I'm very near the finish line and need your help with one final issue. I included everything in this pastebin, along with a full overview of the project and the code being used. I intend for anyone who comes looking for the same thing to be able to use it. I'm also assuming you train yourself on this data, so hopefully you'll be able to use this too. Kindly help with the topic I mention at the beginning.

https://pastebin.com/BitVGbej

velvet egretBOT
#

Super quick note first: I don’t train myself on pasted thread data, but it’s definitely useful context for helping you here.

On the actual issue, the big one in your paste is the local CPU STT cold start.

My recommendation

1. Do a real Whisper warmup at startup

Yes, absolutely do this.

Not just “load the model”, actually run one dummy transcription:

def warm_whisper(model):
    warm = np.zeros(16000, dtype=np.float32)  # 1s silence
    segments, _ = model.transcribe(
        warm,
        language="en",              # or your actual language
        beam_size=1,
        best_of=1,
        temperature=0.0,
        condition_on_previous_text=False,
        vad_filter=False,
    )
    list(segments)  # force full execution

That usually kills the nasty first-turn stall from:

  • weights paging in
  • backend initialization
  • first inference graph setup

#

2. Keep one Whisper model alive forever

Do not recreate it per utterance.

Put it in a dedicated worker thread and feed it finished utterances from a queue.

That part matters just as much as the warmup.
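
A rough sketch of that worker, assuming you pass in your own transcription callable. SttWorker, transcribe_fn and on_text are hypothetical names; in your script transcribe_fn would wrap the single long-lived WhisperModel:

```python
import queue
import threading


class SttWorker:
    """One background thread, one model, utterances processed in arrival order."""

    def __init__(self, transcribe_fn, on_text):
        self._transcribe = transcribe_fn   # assumed: audio -> text, model kept alive inside
        self._on_text = on_text            # assumed: callback receiving the final text
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, audio):
        # Called from the audio loop once VAD has closed the utterance
        self._jobs.put(audio)

    def close(self):
        # None is the shutdown sentinel; queued utterances are finished first
        self._jobs.put(None)
        self._thread.join()

    def _run(self):
        while True:
            audio = self._jobs.get()
            if audio is None:
                break
            try:
                self._on_text(self._transcribe(audio))
            except Exception as exc:
                # Keep the worker alive if one utterance fails
                print(f"[stt] transcription failed: {exc}")
```

Because the model only ever runs on this one thread, the audio callback never blocks on inference.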


3. For CPU-only, don’t stream tiny chunks into Whisper

For your ODROID-H4 / N97 style setup, I would not stream 200ms or 500ms chunks into Whisper for the final transcription.

Why:

  • each partial chunk repeats fixed overhead, since Whisper pads every input to a 30-second window before encoding
  • you lose more time in repeated decoding/encoder work
  • CPU Whisper usually feels better with one clean utterance than many mini-transcribes
#

So my strong recommendation is:

  • use VAD to decide utterance boundaries
  • transcribe once per utterance
  • keep the utterance short with a good silence threshold

If you want partial UX, stream UI state, not Whisper partials.


4. Lower the decoding cost aggressively

For responsiveness, I’d use the cheapest settings that still sound good:

segments, info = whisper.transcribe(
    audio,
    language="en",                     # if fixed language
    beam_size=1,
    best_of=1,
    temperature=0.0,
    condition_on_previous_text=False,
    vad_filter=False,                  # because you already segment externally
    compression_ratio_threshold=2.4,
    log_prob_threshold=-1.0,
    no_speech_threshold=0.6,
)

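And if your language is fixed, use the language-specific model when possible.
Only English has dedicated checkpoints: base.en / small.en are the same size as the multilingual equivalents, so they run at about the same speed, but they tend to be more accurate on English.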


#

5. Your current 3000ms silence timeout is way too high

That is absolutely hurting turn-taking.

I’d start here:

  • normal turn end: 700ms to 900ms
  • follow-up mode: maybe 900ms to 1200ms
  • minimum utterance: 300ms to 400ms

So yes, your instinct to drop from 3000ms is right.

If I were tuning it:

self.silence_to_stop_ms = 800
self.min_utterance_ms = 350
#

Then only bump upward if it cuts people off too often.


Practical architecture I’d use

speech-start timeout

How long to wait for the user to start speaking:

  • followup_timeout_s = 5.0

speech-end timeout

Once they are speaking:

  • silence_to_stop_ms = 800

Those are two separate knobs and should stay separate.
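
Here is one way to keep the two knobs separate in code. It is a sketch, not your VoiceLoop: Endpointer, feed() and frame_ms are hypothetical, and is_speech is assumed to come from whatever VAD you already run per frame:

```python
class Endpointer:
    """Tracks speech-start timeout and speech-end silence independently."""

    def __init__(self, frame_ms=20, start_timeout_s=5.0,
                 silence_to_stop_ms=800, min_utterance_ms=350):
        self.frame_ms = frame_ms
        self.start_timeout_ms = start_timeout_s * 1000
        self.silence_to_stop_ms = silence_to_stop_ms
        self.min_utterance_ms = min_utterance_ms
        self.started = False
        self.waited_ms = 0    # time spent waiting for the first speech frame
        self.speech_ms = 0    # total voiced time in this utterance
        self.silence_ms = 0   # current run of trailing silence

    def feed(self, is_speech):
        """Feed one VAD decision; returns waiting, recording, timeout, done or discard."""
        if not self.started:
            if is_speech:
                self.started = True
                self.speech_ms = self.frame_ms
                return "recording"
            self.waited_ms += self.frame_ms
            # Knob 1: how long we wait for the user to start at all
            return "timeout" if self.waited_ms >= self.start_timeout_ms else "waiting"
        if is_speech:
            self.speech_ms += self.frame_ms
            self.silence_ms = 0
            return "recording"
        self.silence_ms += self.frame_ms
        # Knob 2: how much trailing silence ends the utterance
        if self.silence_ms >= self.silence_to_stop_ms:
            return "done" if self.speech_ms >= self.min_utterance_ms else "discard"
        return "recording"
```

In follow-up mode you would build the Endpointer with the follow-up values instead of touching the normal-mode ones.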


#

If you want the best “cheap but good” flow

I’d do this:

  1. wake word
  2. record until ~800ms silence
  3. local Whisper first
  4. if confidence looks bad, then fallback to Google STT
  5. send final text to OpenClaw WS
  6. TTS reply
  7. follow-up window for 5s

That’s a solid hybrid architecture.


I would not do this

#

I would not:

  • stream live chunks into CPU Whisper as your main STT path
  • keep 3000ms silence timeout
  • recreate Whisper between turns

Short answer

  • Yes, warm Whisper with a dummy transcription at boot
  • No, I would not stream tiny chunks to Whisper on CPU as the main path
  • Yes, reduce silence timeout hard, probably to ~800ms
  • keep your Google fallback only for low-confidence cases

If you want, next reply I can help you tune the confidence heuristic for deciding when to fall back from local Whisper to Google STT, because that’s the next place I’d tighten.

dry silo
#

The language was correct, as I'm using this for Portuguese. Another thing: we use a special mode based on Google Cloud STT, triggered either by Whisper's confidence being too low or by a keyword. I believe this should be visible in the code I gave you? I did use AI to sanitize it, so maybe it removed something it shouldn't have. Are you sure these latest suggestions will not impact this other system?

velvet egretBOT
#

Yep, I did catch that in your paste, at least in the part I could read:

  • you have a Google STT fallback
  • and a special high-precision path
  • triggered either by low Whisper confidence or a keyword/mode switch

So, short answer:

No, my latest suggestions do not break that architecture

but one of them does mean you should retune your fallback threshold a bit.


Safe changes, no problem

These are safe and should help regardless of your Google fallback:

#

1. Whisper warmup at startup

Safe.
This only removes first-run latency.

2. Keep one Whisper model alive forever

Safe.
This only avoids reload/cold-start overhead.

3. Keep utterance-based transcription instead of tiny streaming chunks

Also safe, and honestly better for your fallback system.

Why:

  • confidence on tiny chunks is noisier
  • you’ll trigger Google fallback more often by accident
  • final utterance-based confidence is much more stable

So on that specific point, my advice actually helps your hybrid system.

#

The one change that does affect fallback behavior

Lowering silence_to_stop_ms

This one can change how often you fall back to Google.

If you make the endpoint too aggressive:

  • utterances get cut earlier
  • local Whisper confidence can drop
  • you may route to Google more often

That’s not a broken architecture, just a threshold-tuning issue.

My recommendation there

For your setup, I’d try:

#
  • normal mode: 900ms
  • follow-up mode: 1000ms to 1200ms
  • minimum utterance: 350ms to 400ms

Since you’re in Portuguese and using CPU STT, I’d be a little less aggressive than I was for a pure English setup.

So yes, I’d revise my earlier “800ms” advice slightly upward for your hybrid Portuguese pipeline.


Very important Portuguese-specific correction

You are right to call this out.

For your system, I would not use English-specific assumptions.

That means:

#
  • do not use .en Whisper models
  • do not set language="en"
  • if your assistant is Portuguese-first, explicitly set:
language="pt"

That usually improves:

  • latency
  • stability
  • confidence consistency

And it makes your Google fallback decision cleaner too.


#

What I would keep exactly as-is conceptually

Your decision tree should stay like this:

1. Capture full utterance with VAD
2. Run local Whisper first
3. If:
   - confidence too low, OR
   - explicit high-precision keyword/mode,
   then send the same audio to Google STT
4. Use the better transcript
5. Send final text to OpenClaw

That architecture is solid.
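
As a sketch, that tree fits in one function. Everything here is an assumption about shape, not your actual code: local_stt is taken to return (text, confidence), cloud_stt to return text, and "better transcript" is simplified to "prefer the cloud result if it is non-empty":

```python
def transcribe_with_fallback(audio, local_stt, cloud_stt,
                             min_confidence=0.6, force_cloud=False):
    """Run local STT first; escalate to cloud on low confidence or explicit mode."""
    text, confidence = local_stt(audio)
    if not force_cloud and confidence >= min_confidence:
        return text, "local"
    try:
        # Same audio buffer goes to the cloud path, no re-recording
        cloud_text = cloud_stt(audio)
    except Exception:
        # Network or quota failure: keep the local result rather than drop the turn
        return text, "local"
    if cloud_text.strip():
        return cloud_text, "cloud"
    return text, "local"
```

Returning the source alongside the text makes it easy to log how often each path wins, which is what you need when retuning the threshold.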

#

The only thing I’d retune

If you apply my latency tweaks, re-check:

  • your confidence threshold
  • your special keyword trigger
  • your silence timeout

Because those three interact.


My practical recommendation for you

For your exact setup, I’d do this:

#

Keep

  • warmup
  • persistent Whisper model
  • utterance-based STT
  • Google fallback path

Adjust

  • set Whisper language="pt"
  • make silence_to_stop_ms around 1000ms
  • then re-tune your fallback threshold after that

So the real answer

No, I’m not worried that the latest suggestions will break your special Google-STT mode.

#

The only real caveat is:

  • endpoint timing affects confidence, so fallback frequency may shift a bit

That’s a tuning change, not an architecture problem.

If you want, next reply I can help you design a better fallback heuristic than just “low confidence”, for example combining:

  • avg logprob
  • no-speech probability
  • transcript length
  • wake/special mode keyword
  • and whether the utterance was cut unusually short.
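
A first rough cut of such a heuristic, so you can see the shape. should_fallback() and all thresholds are placeholders to tune on your own recordings; the -1.0 and 0.6 defaults simply mirror the log-prob and no-speech values from the transcribe settings earlier:

```python
def should_fallback(avg_logprob, no_speech_prob, text, audio_ms,
                    special_mode=False, cut_short=False,
                    min_logprob=-1.0, max_no_speech=0.6,
                    cut_short_margin=0.3, sparse_audio_ms=2000):
    """Decide whether to send the utterance to the cloud STT path."""
    # Explicit keyword / high-precision mode always wins
    if special_mode:
        return True
    # Be stricter about confidence when the endpoint fired unusually early
    threshold = min_logprob + (cut_short_margin if cut_short else 0.0)
    if avg_logprob < threshold:
        return True
    words = text.split()
    # Text produced although the model thinks there was no speech: likely hallucination
    if words and no_speech_prob > max_no_speech:
        return True
    # Lots of audio but at most one word: local STT probably missed most of it
    if audio_ms >= sparse_audio_ms and len(words) <= 1:
        return True
    return False
```

The inputs are plain numbers on purpose, so you can feed it per-utterance averages from whatever your local STT reports.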
dry silo
#

Any comments on the two other files related to the visuals?

velvet egretBOT
#

I can give general comments, but I couldn’t actually see the visual files themselves in the paste, it got truncated before app.js / index.html.

So, honest answer: if you want a real review of those two files, paste them separately and I’ll go line-by-line.

That said, for this kind of assistant visualizer, here’s what I’d recommend:

What I’d want from the visuals layer

1. Make it fully state-driven

Your Python side is already broadcasting clean states:

  • idle
  • listening
  • thinking
  • speaking
  • maybe google / error
#

That’s good.
On the frontend, I’d keep a single setState(newState) and make all visuals derive from that.

No scattered ad-hoc effects.


2. Don’t hard-switch visuals

Use interpolation / easing.

Example:

  • idle -> soft ambient pulse
  • listening -> brighter, quicker pulse
  • thinking -> rotational / noisy motion
  • speaking -> mouth / waveform / amplitude-driven motion

But transition smoothly over ~150 to 400ms.

#

That makes the assistant feel much more alive.


3. Keep the render loop stable

If you’re using Three.js:

  • create scene/camera/materials once
  • keep requestAnimationFrame always running
  • only update uniforms / target values per state

I would avoid rebuilding meshes/materials on each state change.


4. Add a dead-man fallback

If the websocket drops or Python crashes, the UI should not stay stuck in thinking forever.

#

I’d add:

  • websocket reconnect loop
  • if no state update for N seconds, fade back to idle or offline

That saves a lot of weird kiosk behavior.


5. Debounce noisy transitions

Especially around:

  • thinking -> speaking
  • speaking -> followup -> listening
  • interrupt -> listening

A tiny debounce helps avoid flicker if states bounce quickly.

#

Something like:

  • ignore duplicate state
  • or require 100 to 150ms before committing a new visual state in very noisy transitions

6. Treat audio-reactive motion separately from logical state

This is a good pattern:

  • state controls the overall mode
  • audio energy / volume modulates the current visual

So:

  • speaking chooses the visual family
  • live audio amplitude drives how intense it is

That feels much nicer than just “speaking = blue blob”.

#

7. Preload everything

For kiosk reliability:

  • preload textures
  • preload fonts
  • preload sounds if the browser uses any
  • don’t fetch anything mid-conversation if you can avoid it

My biggest UX suggestion

If you have both thinking and speaking, make them visually very different.

A lot of assistants fail here and everything looks like “same orb, different color”.

#

I’d make:

  • thinking = inward / suspended / computational feeling
  • speaking = outward / rhythmic / vocal feeling

That one distinction helps users understand turn-taking instantly.


Short version

Yes, I probably do have comments, but I need the actual app.js and index.html to do a real review.

If you paste those two separately, I’ll review:

  • state handling
  • reconnection
  • animation structure
  • Three.js performance
  • and whether the transitions match the voice loop well.
dry silo
velvet egretBOT
#

Yep, now that I can actually see them, I think the visuals are clean and pretty solid overall. Nice job, especially on keeping the frontend state-driven instead of stuffing logic all over the place.

Overall impression

Good:

  • simple state map
  • one setState() entry point
  • fixed scene graph
  • smooth interpolation instead of hard switching
  • reconnect loop for the UI websocket

That’s already the right shape.


The biggest things I’d change

#

1. Your pulse math is a bit wrong

Right now you do:

let pulse = state.pulsate ? (Math.sin(Date.now() * 0.005) * (state.pulsateMax - state.pulsateMin)) : 0;
let tScale = config.scale + pulse;

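This does not actually respect pulsateMin / pulsateMax the way the names imply: the pulse swings between -(max - min) and +(max - min), centered on zero, so neither bound is ever used as a bound.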

I’d change it to:

const t = performance.now() * 0.005;
const pulse = state.pulsate
  ? ((state.pulsateMin + state.pulsateMax) * 0.5) +
    (Math.sin(t) * ((state.pulsateMax - state.pulsateMin) * 0.5))
  : 0;

const tScale = config.scale + pulse;

That gives you a real min/max range instead of “difference times sine”.


2. Stop allocating new THREE.Color(...) every frame

This line creates garbage every render:

uniforms.sphereColor.value.lerp(new THREE.Color(config.color), lerpSpeed);
#

For a kiosk loop, I’d precompute the colors once:

for (const key of Object.keys(states)) {
  states[key].layers.forEach(layer => {
    layer._color = new THREE.Color(layer.color);
  });
}

Then in animate():

uniforms.sphereColor.value.lerp(config._color, lerpSpeed);

Small thing, but it avoids pointless allocations.

#

3. audioLevel exists but is never used meaningfully

You have:

uniform float audioLevel;

and the vertex shader uses it, which is nice, but nothing in JS updates it.

So right now the orb is not actually audio-reactive.

You have 2 options:

  • remove it for now, keep the visual purely state-based
  • or wire it up properly from your assistant audio / TTS amplitude
#

If you want the orb to feel truly alive while speaking, this is the next nice upgrade.
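
On the Python side, the level itself is cheap to compute. This is a sketch under assumptions: rms_level() and LevelSmoother are hypothetical helpers, the TTS audio is assumed to be raw 16-bit mono PCM in the machine's native byte order, and how you push the number to the browser over your UI websocket is left out:

```python
import array
import math


def rms_level(pcm16: bytes) -> float:
    """Return a 0..1 loudness estimate for one chunk of 16-bit PCM."""
    samples = array.array("h")
    # Ignore a trailing odd byte so frombytes() never sees a partial sample
    samples.frombytes(pcm16[: len(pcm16) - (len(pcm16) % 2)])
    if not samples:
        return 0.0
    mean_sq = sum(s * s for s in samples) / len(samples)
    return min(1.0, math.sqrt(mean_sq) / 32768.0)


class LevelSmoother:
    """Fast attack, slow release, so the orb reacts quickly but does not flicker."""

    def __init__(self, attack=0.6, release=0.15):
        self.attack = attack
        self.release = release
        self.value = 0.0

    def update(self, level: float) -> float:
        k = self.attack if level > self.value else self.release
        self.value += (level - self.value) * k
        return self.value
```

You would call rms_level() on each chunk you write to the audio device and send the smoothed value at a modest rate, for example 20 to 30 times per second, then copy it into the audioLevel uniform in the frontend.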


4. Use performance.now() or a Three clock, not Date.now()

For animation timing, I’d use performance.now() or a THREE.Clock().

Example:

const clock = new THREE.Clock();

Then in animate():

#
const elapsed = clock.getElapsedTime();

That behaves more cleanly for animation math than Date.now(): both are monotonic, so a system clock adjustment cannot make the animation jump.


5. SphereGeometry(1, 64, 64) is probably overkill

For a fullscreen kiosk orb, 64 x 64 for both layers is a bit heavy for not much visible gain.

I’d try:

const geometry = new THREE.SphereGeometry(1, 40, 40);
#

or even 32, 32.

It’ll likely look almost identical but render cheaper.


6. Cap pixel ratio

On kiosk devices this matters a lot.

Add:

renderer.setPixelRatio(Math.min(window.devicePixelRatio, 1.5));

before renderer.setSize(...).

#

That can save a surprising amount of GPU work.


A few smaller comments

7. currentScale, targetScale globals are unused

These seem dead:

let targetScale = 1.0, currentScale = 1.0;

I’d remove them.


#

8. chromaticAberration is really “fresnel tint strength”

Not wrong, just a naming nit.

You’re not doing true screen-space chromatic aberration, you’re using it as a glow/fresnel color push. That’s fine, just noting it.


9. Add an offline or fallback visual state

Right now websocket reconnect is good, but visually I’d probably add:

offline: {
  ...
  description: 'Offline'
}
#

Then on websocket close/error:

  • set state to offline
  • reconnect in background

That feels better than leaving the last good state stuck forever.


10. Avoid the CDN if this is a kiosk appliance

For a real appliance-ish build, I would not depend on:

https://cdnjs.cloudflare.com/ajax/libs/three.js/r128/three.min.js

Bundle it locally instead.
That way your visual shell doesn’t depend on internet or CDN availability.

#

One UX suggestion I really like for your setup

Your state colors are already pretty readable:

  • idle = dark blue
  • listening = blue
  • google = cyan
  • thinking = purple
  • speaking = pink

That’s actually nice.

The one thing I’d push further is making thinking and speaking differ in motion, not just color.

For example:

#
  • thinking = slower, more inward, suspended
  • speaking = more rhythmic, more energetic, slightly larger

You already started doing this with speed and pulse, which is good. I’d lean into that a bit more.


My short verdict

I think the visual layer is good already.
If I were polishing it, I’d do these 5 first:

  1. fix pulse math
  2. precompute colors
  3. cap pixel ratio
  4. reduce sphere geometry resolution
  5. either remove or properly drive audioLevel
#

If you want, I can next give you a patched app.js with those exact improvements applied so you can drop it in directly.