#Can't setup reaction for trigger in postgresql

1 messages · Page 1 of 1 (latest)

oak pikeBOT
#
Notes for Can't setup reaction for trigger in postgresql
At your assistance

@marble sphinx

No Response?

If no response in a reasonable time, ping @Member.

Closing

To close, type !solve or byte solve.

MCVE

Please include an MCVE so that we can reproduce your issue locally.

marble sphinx
#

There is code without file:

import asyncio
import json
import logging

from asyncpg import Connection as AsyncpgConnection
from sqlalchemy import PoolProxiedConnection, text
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from infra.db.triggers.buffer import NewsBuffer

logger = logging.getLogger(__name__)


class NewsListener:
    def __init__(self, engine: AsyncEngine, buffer: NewsBuffer):
        self.engine = engine
        self.buffer = buffer
        self._running: bool = False
        self._listener_task: asyncio.Task = None
        self._driver_conn: AsyncpgConnection = None

    async def start(self):
        self._running = True
        logger.info("🔗 Спроба підключення до БД...")
        logger.info("🛠 Вхід y _listen_loop")

        print(123)
        self._listener_task = asyncio.create_task(self._listen_loop())

    async def _listen_loop(self):
        while self._running:
            try:
                print("Entering _ensure_trigger_exists...")
                await self._ensure_trigger_exists()
                """
                Afer this await i dont see my others prints in terminal, so i dont know why
                
                
                """
                print(self.engine)
                print("123")
                logger.info("📡 Викликаю listen_notifications...")

            except Exception as e:
                logger.error(f"❌ Помилка підключення: {e}")
                await asyncio.sleep(5)  # чекаємо перед наступною спробою

            finally:
                # Не встановлюємо _running = False повністю
                await asyncio.sleep(1)
#

    async def listen_notifications(self, conn):
        logger.info("------------------- 🔔 Запуск прослуховування змін...")
        try:
            raw_conn: PoolProxiedConnection = await conn.get_raw_connection()
            _driver_conn: AsyncpgConnection = raw_conn.driver_connection

            await _driver_conn.add_listener(
                "news_changes", self._handle_notification
            )
            logger.info("🔔 Прослуховування запущено!")

        except Exception as e:
            logger.error(f"❌ Помилка прослуховування: {e}")

    async def _handle_notification(self, *args):
        """Обробляє вхідні сповіщення"""
        try:
            # args: (connection, pid, channel, payload)
            payload = json.loads(args[3])
            logging.info(f"Received notification: {payload}")
        except Exception as e:
            logging.error(f"Notification handling error: {e}")
#
    async def _ensure_trigger_exists(self):
        """Створює тригер, якщо він ще не існує"""
        try:
            async with self.engine.connect() as conn:
                print("✅ Підключення до БД встановлено")

                print("🔍 Перевірка наявності функції notify_news_change...")
                result = await conn.execute(
                    text(
                        "SELECT 1 FROM pg_proc WHERE proname = 'notify_news_change'"  # noqa: E501
                    )
                )
                print("✅ Запит SELECT для функції виконано")

                func_exists = result.scalar()
                print(f"📢 Функція існує? {func_exists}")

                if not func_exists:
                    print("🛠 Створюю функцію notify_news_change...")
#
                    await conn.execute(
                        text("""
                        CREATE OR REPLACE FUNCTION notify_news_change()
                        RETURNS TRIGGER AS $$
                        BEGIN
                            PERFORM pg_notify(
                                'news_changes',
                                jsonb_build_object(
                                    'operation', TG_OP,
                                    'data', CASE
                                        WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
                                        ELSE row_to_json(NEW)
                                    END
                                )::text
                            );
                            RETURN NULL;
                        END;
                        $$ LANGUAGE plpgsql;
                    """)
                    )
                    print("✅ Функція notify_news_change створена")
#

                print("🔍 Перевірка наявності тригера...")
                result = await conn.execute(
                    text(
                        "SELECT 1 FROM pg_trigger WHERE tgname = 'news_change_trigger'"  # noqa: E501
                    )
                )
                print("✅ Запит SELECT для тригера виконано")

                trigger_exists = result.scalar()
                print(f"📢 Тригер існує? {trigger_exists}")

                if not trigger_exists:
                    print("🛠 Створюю тригер news_change_trigger...")
                    await conn.execute(
                        text("""
                        CREATE TRIGGER news_change_trigger
                        AFTER INSERT OR UPDATE OR DELETE ON news
                        FOR EACH ROW EXECUTE FUNCTION notify_news_change();
                    """)
                    )
                    print("✅ Тригер news_change_trigger створений")

                print("✅ Успішне створення тригера!")
                await conn.commit()
                """
                And after this commit i dont see my print outside of context manager
                """
            print("✅ Коміт виконано")
        except Exception as e:
            print(f"❌ Помилка створення тригера: {e}")
#

    async def stop(self):
        logger.info("🛠 Вихід з _listen_loop")
        self._running = False

        if self._driver_conn:
            try:
                await self._driver_conn.remove_listener(
                    "news_changes", self._handle_notification
                )
            except Exception as e:
                logger.error(f"Error removing listener: {e}")

        if self._listener_task:
            self._listener_task.cancel()
            try:
                await self._listener_task
            except asyncio.CancelledError:
                pass

        logger.info("Listener stopped successfully")

#

Logs:

🔍 Перевірка наявності функції notify_news_change...
2025-03-25 18:51:06,397 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-25 18:51:06,397 INFO sqlalchemy.engine.Engine SELECT 1 FROM pg_proc WHERE proname = 'notify_news_change'
2025-03-25 18:51:06,397 INFO sqlalchemy.engine.Engine [cached since 402.3s ago] ()
✅ Запит SELECT для функції виконано
📢 Функція існує? 1
🔍 Перевірка наявності тригера...
2025-03-25 18:51:06,399 INFO sqlalchemy.engine.Engine SELECT 1 FROM pg_trigger WHERE tgname = 'news_change_trigger'
2025-03-25 18:51:06,399 INFO sqlalchemy.engine.Engine [cached since 402.3s ago] ()
✅ Запит SELECT для тригера виконано
📢 Тригер існує? 1
✅ Успішне створення тригера!
2025-03-25 18:51:06,401 INFO sqlalchemy.engine.Engine COMMIT

lean tulip
#

have you tested the trigger outside of python?

lean tulip
#

Verify your trigger actually works in something like datagrip if it's not working.

Also, trhy moving that last print statement into a finally block to see if it makes a difference:

        finally:
            print("✅ Коміт виконано")
#

Also, what does and the code doesn't work anymore. mean exectly?

#

meaning you don't git that conditional block? or you get an error