#event for receiving notification of the start of the stream on twitch there are no errors but it doe

1 messages · Page 1 of 1 (latest)

wicked robin
#
import os
import disnake
from disnake.ext import commands, tasks
import requests
from utils.database import DataBase

class Twitch(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.db = DataBase()  
        self.client_id = os.getenv('TWITCH_CLIENT_ID')
        self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
        self.discord_channel_id = 1177508121078398977
        self.twitch_username = None
        self.is_streaming = False
        self.stream_check.start()

    def get_stream_data(self):
        url = f"https://api.twitch.tv/helix/streams?user_login={self.twitch_username}"
        headers = {
            "Client-ID": self.client_id,
            "Authorization": f"Bearer {self.oauth_token}"
        }
        response = requests.get(url, headers=headers)
        data = response.json()
        return data

    async def send_stream_notification(self):
        twitch_users = await self.db.get_twitch_users()

        for self.twitch_username in twitch_users:
            data = self.get_stream_data()

            if "data" in data and data["data"]:
                if not self.is_streaming:
                    self.is_streaming = True
                    stream = data["data"][0]
                    stream_title = stream["title"]
                    stream_url = f"https://twitch.tv/{self.twitch_username}"
                    viewers = stream["viewer_count"]
                    avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
                    embed = disnake.Embed(title="Начался стрим!",
                                          description=f"Пользователь **{self.twitch_username}** начал стримить!",
                                          color=disnake.Color.purple())
                    embed.set_image(url=avatar_url)
                    embed.add_field(name="Название стрима", value=stream_title, inline=False)
                    embed.add_field(name="Количество зрителей", value=viewers, inline=False)
                    embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
                    await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
                else:
                    self.is_streaming = False

    @tasks.loop(seconds=60)
    async def stream_check(self):
        await self.send_stream_notification()

    @stream_check.before_loop
    async def before_stream_check(self):
        await self.bot.wait_until_ready()

def setup(bot):
    bot.add_cog(Twitch(bot))```
idle lagoonBOT
#
from disnake.ext import tasks, commands

class MyCog(commands.Cog):

    def __init__(self, bot):
        self.bot = bot
        self.index = 0
        self.printer.start()

    def cog_unload(self):
        self.printer.cancel()

    @tasks.loop(seconds=5.0)
    async def printer(self):
        print(self.index)
        self.index += 1

    @printer.before_loop
    async def before_printer(self):
        print('waiting...')
        await self.bot.wait_until_ready()

    @printer.error
    async def printer_error(self, error):
        formatted = "".join(
        traceback.format_exception(type(error), error, error.__traceback__))
        await self.bot.get_channel(channel_id).send(formatted)


def setup(bot):
    bot.add_cog(MyCog(bot))```
lean schooner
#

use error handler for ur task

wintry glade
idle lagoonBOT
#

In asynchronous programming a blocking call is essentially all the parts of the function that are not await. Do not despair however, because not all forms of blocking are bad! Using blocking calls is inevitable, but you must work to make sure that you don't excessively block functions. Remember, if you block for too long then your bot will freeze since it has not stopped the function's execution at that point to do other things.
If logging is enabled, this library will attempt to warn you that blocking is occurring with the message: Heartbeat blocked for more than N seconds. See Setting Up Logging for details on enabling logging.
A common source of blocking for too long is something like time.sleep. Don't do that. Use asyncio.sleep instead. Similar to this example:

# Bad  
time.sleep(10)  
  
# Good  
await asyncio.sleep(10)  

Another common source of blocking for too long is using HTTP requests with the famous module Requests: HTTP for Humans™. While Requests: HTTP for Humans™ is an amazing module for non-asynchronous programming, it is not a good choice for asyncio because certain requests can block the event loop too long. Instead, use the aiohttp library which is installed on the side with this library.
Consider the following example:

# Bad  
r = requests.get("http://aws.random.cat/meow")  
if r.status_code == 200:  
 json = r.json()  
 await channel.send(json["file"])  
  
# Good  
async with aiohttp.ClientSession() as session:  
 async with session.get("http://aws.random.cat/meow") as r:  
 if r.status == 200:  
 json = await r.json()  
 await channel.send(json["file"])  

wicked robin
# wintry glade use `aiohttp` instead

well, on aiohttp, what's the joke anyway doesn't work

import os
import disnake
from disnake.ext import commands, tasks
import aiohttp
from utils.database import DataBase

class Twitch(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.db = DataBase()
        self.client_id = os.getenv('TWITCH_CLIENT_ID')
        self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
        self.discord_channel_id = 1177508121078398977
        self.twitch_usernames = []  # Изменил на список для поддержки нескольких пользователей
        self.is_streaming = {}

        self.stream_check.start()

    async def get_stream_data(self, username):
        url = f"https://api.twitch.tv/helix/streams?user_login={username}"
        headers = {
            "Client-ID": self.client_id,
            "Authorization": f"Bearer {self.oauth_token}"
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                data = await response.json()
        return data

    async def send_stream_notification(self, username):
        data = await self.get_stream_data(username)

        if "data" in data and data["data"]:
            if username not in self.is_streaming or not self.is_streaming[username]:
                self.is_streaming[username] = True
                stream = data["data"][0]
                stream_title = stream["title"]
                stream_url = f"https://twitch.tv/{username}"
                viewers = stream["viewer_count"]
                avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
                embed = disnake.Embed(title="Начался стрим!",
                                      description=f"Пользователь **{username}** начал стримить!",
                                      color=disnake.Color.purple())
                embed.set_image(url=avatar_url)
                embed.add_field(name="Название стрима", value=stream_title, inline=False)
                embed.add_field(name="Количество зрителей", value=viewers, inline=False)
                embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
                await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
            else:
                self.is_streaming[username] = False

    @tasks.loop(seconds=10)
    async def stream_check(self):
        for username in self.twitch_usernames:
            await self.send_stream_notification(username)

    @stream_check.before_loop
    async def before_task(self):
        await self.bot.wait_until_ready()


def setup(bot):
    bot.add_cog(Twitch(bot))```
lean schooner
#

you just ignore my messages?

wicked robin
lean schooner
#

what method?

wicked robin
# lean schooner what method?
@tasks.loop(seconds=10)
    async def stream_check(self):
        for username in self.twitch_usernames:
            try:
                await self.send_stream_notification(username)
            except Exception as e:
                print(f"{e}")```
wicked robin
lean schooner
#

data = await response.json()
print(data)

wicked robin
#
from motor.motor_asyncio import AsyncIOMotorClient
import os

class DataBase:
    def __init__(self):
        try:
            self.cluster = AsyncIOMotorClient(os.getenv("MONGODB_LINK"))
            self.twitch = self.cluster.ark.twitch
        except Exception as e:
            print(f"Error connecting to MongoDB: {e}")

    async def add_twitch_user(self, username: str):
        try:
            await self.twitch.insert_one({"username": username})
            print(f"Added Twitch user: {username}")
        except Exception as e:
            print(f"Error adding Twitch user: {e}")

    async def get_twitch_users(self):
        try:
            cursor = self.twitch.find({})
            users = [document["username"] async for document in cursor]
            return users
        except Exception as e:
            print(f"Error getting Twitch users: {e}")
            return []

    def close_connection(self):
        try:
            self.cluster.close()
            print("Closed MongoDB connection")
        except Exception as e:
            print(f"Error closing MongoDB connection: {e}")

There are no errors in the database like

lean schooner
#

just run function separately, without any task

wicked robin
lean schooner
wicked robin
#

sorry

lean schooner
#

its ok

wicked robin
#

LOL there are zero errors

#

there are no errors but it doesn't work either

#
import os
import disnake
import aiohttp
from utils.database import DataBase
from disnake.ext import commands

class Twitch(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.db = DataBase()
        self.client_id = os.getenv('TWITCH_CLIENT_ID')
        self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
        self.discord_channel_id = 1177508121078398977
        self.twitch_usernames = []  # Изменил на список для поддержки нескольких пользователей
        self.is_streaming = {}

    async def get_stream_data(self, username):
        url = f"https://api.twitch.tv/helix/streams?user_login={username}"
        headers = {
            "Client-ID": self.client_id,
            "Authorization": f"Bearer {self.oauth_token}"
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                data = await response.json()
                print(data)
        return data

    async def send_stream_notification(self, username):
        data = await self.get_stream_data(username)

        if "data" in data and data["data"]:
            if username not in self.is_streaming or not self.is_streaming[username]:
                self.is_streaming[username] = True
                stream = data["data"][0]
                stream_title = stream["title"]
                stream_url = f"https://twitch.tv/{username}"
                viewers = stream["viewer_count"]
                avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
                embed = disnake.Embed(title="Начался стрим!",
                                      description=f"Пользователь **{username}** начал стримить!",
                                      color=disnake.Color.purple())
                embed.set_image(url=avatar_url)
                embed.add_field(name="Название стрима", value=stream_title, inline=False)
                embed.add_field(name="Количество зрителей", value=viewers, inline=False)
                embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
                await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
            else:
                self.is_streaming[username] = False

    async def check_streams(self):
        for username in self.twitch_usernames:
            try:
                await self.send_stream_notification(username)
            except Exception as e:
                print(f"{e}")

def setup(bot):
    bot.add_cog(Twitch(bot))
lean schooner
#

did u ran this function separately?

wicked robin
lean schooner
wicked robin
lean schooner
#

it works

#

how did u run the code?

wicked robin
lean schooner
#

show

wicked robin
#

I do not know why it does not work for me

lean schooner
#

show how did u ran the code

wicked robin
#
import aiohttp
import os
client_id = os.getenv('TWITCH_CLIENT_ID')
os.getenv('TWITCH_ACCESS_TOKEN')

    async def get_stream_data(self, username):
        url = f"https://api.twitch.tv/helix/streams?user_login=art1ord"
        headers = {
            "Client-ID": self.client_id,
            "Authorization": f"Bearer {self.oauth_token}"
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                data = await response.json()
        print(data)
#

something like this

lean schooner
# lean schooner are you sure?
    url = "https://api.twitch.tv/helix/streams?user_login=mande"
    headers = {
        "Client-ID": "dfhgdh",
        "Authorization": "Bearer dgfdfgd"
    }
    async with aiohttp.request("GET", url=url, headers=headers) as response:
        data = await response.json()
        print(data)
        return data```
#

0 problems here

wicked robin
#

hm

#

the question is why they don't give me anything

lean schooner
#

bcz ur function don't start never

#

lol

wicked robin
lean schooner
#

that's ok, streamer is offline

#

data : []

#

that's all?

#

do you have other questions?