import os
import disnake
from disnake.ext import commands, tasks
import requests
from utils.database import DataBase
class Twitch(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.db = DataBase()
self.client_id = os.getenv('TWITCH_CLIENT_ID')
self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
self.discord_channel_id = 1177508121078398977
self.twitch_username = None
self.is_streaming = False
self.stream_check.start()
def get_stream_data(self):
url = f"https://api.twitch.tv/helix/streams?user_login={self.twitch_username}"
headers = {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.oauth_token}"
}
response = requests.get(url, headers=headers)
data = response.json()
return data
async def send_stream_notification(self):
twitch_users = await self.db.get_twitch_users()
for self.twitch_username in twitch_users:
data = self.get_stream_data()
if "data" in data and data["data"]:
if not self.is_streaming:
self.is_streaming = True
stream = data["data"][0]
stream_title = stream["title"]
stream_url = f"https://twitch.tv/{self.twitch_username}"
viewers = stream["viewer_count"]
avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
embed = disnake.Embed(title="Начался стрим!",
description=f"Пользователь **{self.twitch_username}** начал стримить!",
color=disnake.Color.purple())
embed.set_image(url=avatar_url)
embed.add_field(name="Название стрима", value=stream_title, inline=False)
embed.add_field(name="Количество зрителей", value=viewers, inline=False)
embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
else:
self.is_streaming = False
@tasks.loop(seconds=60)
async def stream_check(self):
await self.send_stream_notification()
@stream_check.before_loop
async def before_stream_check(self):
await self.bot.wait_until_ready()
def setup(bot):
bot.add_cog(Twitch(bot))```
#event for receiving notification of the start of the stream on twitch there are no errors but it doe
1 messages · Page 1 of 1 (latest)
from disnake.ext import tasks, commands
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.index = 0
self.printer.start()
def cog_unload(self):
self.printer.cancel()
@tasks.loop(seconds=5.0)
async def printer(self):
print(self.index)
self.index += 1
@printer.before_loop
async def before_printer(self):
print('waiting...')
await self.bot.wait_until_ready()
@printer.error
async def printer_error(self, error):
formatted = "".join(
traceback.format_exception(type(error), error, error.__traceback__))
await self.bot.get_channel(channel_id).send(formatted)
def setup(bot):
bot.add_cog(MyCog(bot))```
use error handler for ur task
dont use requests
use aiohttp instead
In asynchronous programming a blocking call is essentially all the parts of the function that are not await. Do not despair however, because not all forms of blocking are bad! Using blocking calls is inevitable, but you must work to make sure that you don't excessively block functions. Remember, if you block for too long then your bot will freeze since it has not stopped the function's execution at that point to do other things.
If logging is enabled, this library will attempt to warn you that blocking is occurring with the message: Heartbeat blocked for more than N seconds. See Setting Up Logging for details on enabling logging.
A common source of blocking for too long is something like time.sleep. Don't do that. Use asyncio.sleep instead. Similar to this example:
# Bad
time.sleep(10)
# Good
await asyncio.sleep(10)
Another common source of blocking for too long is using HTTP requests with the famous module Requests: HTTP for Humans™. While Requests: HTTP for Humans™ is an amazing module for non-asynchronous programming, it is not a good choice for asyncio because certain requests can block the event loop too long. Instead, use the aiohttp library which is installed on the side with this library.
Consider the following example:
# Bad
r = requests.get("http://aws.random.cat/meow")
if r.status_code == 200:
json = r.json()
await channel.send(json["file"])
# Good
async with aiohttp.ClientSession() as session:
async with session.get("http://aws.random.cat/meow") as r:
if r.status == 200:
json = await r.json()
await channel.send(json["file"])
well, on aiohttp, what's the joke anyway doesn't work
import os
import disnake
from disnake.ext import commands, tasks
import aiohttp
from utils.database import DataBase
class Twitch(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.db = DataBase()
self.client_id = os.getenv('TWITCH_CLIENT_ID')
self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
self.discord_channel_id = 1177508121078398977
self.twitch_usernames = [] # Изменил на список для поддержки нескольких пользователей
self.is_streaming = {}
self.stream_check.start()
async def get_stream_data(self, username):
url = f"https://api.twitch.tv/helix/streams?user_login={username}"
headers = {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.oauth_token}"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
data = await response.json()
return data
async def send_stream_notification(self, username):
data = await self.get_stream_data(username)
if "data" in data and data["data"]:
if username not in self.is_streaming or not self.is_streaming[username]:
self.is_streaming[username] = True
stream = data["data"][0]
stream_title = stream["title"]
stream_url = f"https://twitch.tv/{username}"
viewers = stream["viewer_count"]
avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
embed = disnake.Embed(title="Начался стрим!",
description=f"Пользователь **{username}** начал стримить!",
color=disnake.Color.purple())
embed.set_image(url=avatar_url)
embed.add_field(name="Название стрима", value=stream_title, inline=False)
embed.add_field(name="Количество зрителей", value=viewers, inline=False)
embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
else:
self.is_streaming[username] = False
@tasks.loop(seconds=10)
async def stream_check(self):
for username in self.twitch_usernames:
await self.send_stream_notification(username)
@stream_check.before_loop
async def before_task(self):
await self.bot.wait_until_ready()
def setup(bot):
bot.add_cog(Twitch(bot))```
@wicked robin ?
you just ignore my messages?
No, I just tried this method already and didn't find any mistakes.
what method?
@tasks.loop(seconds=10)
async def stream_check(self):
for username in self.twitch_usernames:
try:
await self.send_stream_notification(username)
except Exception as e:
print(f"{e}")```
the joke is that the code does not give an error
data = await response.json()
print(data)
still empty, lol
from motor.motor_asyncio import AsyncIOMotorClient
import os
class DataBase:
def __init__(self):
try:
self.cluster = AsyncIOMotorClient(os.getenv("MONGODB_LINK"))
self.twitch = self.cluster.ark.twitch
except Exception as e:
print(f"Error connecting to MongoDB: {e}")
async def add_twitch_user(self, username: str):
try:
await self.twitch.insert_one({"username": username})
print(f"Added Twitch user: {username}")
except Exception as e:
print(f"Error adding Twitch user: {e}")
async def get_twitch_users(self):
try:
cursor = self.twitch.find({})
users = [document["username"] async for document in cursor]
return users
except Exception as e:
print(f"Error getting Twitch users: {e}")
return []
def close_connection(self):
try:
self.cluster.close()
print("Closed MongoDB connection")
except Exception as e:
print(f"Error closing MongoDB connection: {e}")
There are no errors in the database like
just run function separately, without any task
maybe voice?
no, lol
sorry
its ok
LOL there are zero errors
there are no errors but it doesn't work either
import os
import disnake
import aiohttp
from utils.database import DataBase
from disnake.ext import commands
class Twitch(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.db = DataBase()
self.client_id = os.getenv('TWITCH_CLIENT_ID')
self.oauth_token = os.getenv('TWITCH_ACCESS_TOKEN')
self.discord_channel_id = 1177508121078398977
self.twitch_usernames = [] # Изменил на список для поддержки нескольких пользователей
self.is_streaming = {}
async def get_stream_data(self, username):
url = f"https://api.twitch.tv/helix/streams?user_login={username}"
headers = {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.oauth_token}"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
data = await response.json()
print(data)
return data
async def send_stream_notification(self, username):
data = await self.get_stream_data(username)
if "data" in data and data["data"]:
if username not in self.is_streaming or not self.is_streaming[username]:
self.is_streaming[username] = True
stream = data["data"][0]
stream_title = stream["title"]
stream_url = f"https://twitch.tv/{username}"
viewers = stream["viewer_count"]
avatar_url = stream["thumbnail_url"].replace("{width}", "1280").replace("{height}", "720")
embed = disnake.Embed(title="Начался стрим!",
description=f"Пользователь **{username}** начал стримить!",
color=disnake.Color.purple())
embed.set_image(url=avatar_url)
embed.add_field(name="Название стрима", value=stream_title, inline=False)
embed.add_field(name="Количество зрителей", value=viewers, inline=False)
embed.add_field(name="Ссылка на стрим", value=f"[Смотреть стрим]({stream_url})", inline=False)
await self.bot.get_channel(self.discord_channel_id).send(embed=embed)
else:
self.is_streaming[username] = False
async def check_streams(self):
for username in self.twitch_usernames:
try:
await self.send_stream_notification(username)
except Exception as e:
print(f"{e}")
def setup(bot):
bot.add_cog(Twitch(bot))
did u ran this function separately?
yes
are you sure?
I checked the code separately, it didn't show anything
only via aiohttp
show
I do not know why it does not work for me
show how did u ran the code
import aiohttp
import os
client_id = os.getenv('TWITCH_CLIENT_ID')
os.getenv('TWITCH_ACCESS_TOKEN')
async def get_stream_data(self, username):
url = f"https://api.twitch.tv/helix/streams?user_login=art1ord"
headers = {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.oauth_token}"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
data = await response.json()
print(data)
something like this
url = "https://api.twitch.tv/helix/streams?user_login=mande"
headers = {
"Client-ID": "dfhgdh",
"Authorization": "Bearer dgfdfgd"
}
async with aiohttp.request("GET", url=url, headers=headers) as response:
data = await response.json()
print(data)
return data```
0 problems here
bcz ur function don't start never
lol
my mistake, I threw the wrong screen, I just have 6 in the morning already