CyvBot 2.0

CyvBot est un bot Discord que je développe ; la version 1.0 était basée sur le très connu Red.

La version 2.0 de mon bot est une réécriture complète pour ne plus être dépendant de Red. Contrairement à Red, ma version du bot n'utilise plus de fichiers JSON pour stocker ses données, mais un serveur PostgreSQL.

Ce projet reste pour le moment propriétaire, le temps que son développement soit bien plus avancé.
Pour le moment, vous avez ici la version 2.0.3 du fichier principal.

import asyncio
import glob
import importlib
import inspect
import logging
import logging.handlers
import math
import os
import random
import sqlite3
import sys
import time
import traceback
import aiohttp
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2.pool import ThreadedConnectionPool
from cogs.utils import checks
try:
from discord.ext import commands
import discord
from discord.ext.commands import help
except ImportError:
print("Discord.py n'est pas installé.\n")
sys.exit(1)
import datetime
from cogs.utils.options import Settings
# Path of the local SQLite file opened by get_sqlite3_conn() and initiate().
db_name = "data/cyvbot.db"
# Description string handed to the Bot constructor in initialize().
description = '''CyvBot a discord Bot.'''
# Lower / upper bounds passed to the PostgreSQL ThreadedConnectionPool.
min_connections = 1
max_connections = 100
# Alphabet used to generate the password of the "CyvBot" PostgreSQL role.
password_char = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
# Per-cog connection cache: "cogs.<module>" -> (connection, cursor), or None
# once the connection has been handed back to the pool.
conn_pool = dict()
class CyvBotError(Exception):
    """Root of every error raised by CyvBot itself.

    Derives from ``Exception`` rather than ``BaseException`` so that the
    ordinary ``except Exception`` handlers used in this module (for example
    around cog loading) also catch CyvBot errors instead of letting them
    escape like ``SystemExit`` or ``KeyboardInterrupt`` would.
    """


class PrefixError(CyvBotError):
    """The stored owner row carries no command prefix."""


class BadToken(CyvBotError):
    """Discord answered 401 for the supplied bot token."""


class CogNotFoundError(CyvBotError):
    """No ``cogs/<name>.py`` file matches the requested cog."""


class CogLoadError(CyvBotError):
    """A cog file exists but could not be imported (syntax error)."""


class CogUnloadError(CyvBotError):
    """A cog could not be unloaded."""


class OwnerUnloadWithoutReloadError(CogUnloadError):
    """The owner cog was asked to unload outside of a reload."""

    def __init__(self):
        default_message = "Can't unload the owner plugin :P"
        super().__init__(default_message)
class Bot(commands.AutoShardedBot):
    """Auto-sharded bot that also hands out one PostgreSQL connection per cog.

    Connections come from the module-level pool ``db`` and are cached in the
    module-level ``conn_pool`` dict under the key ``"cogs.<module name>"``.
    The module name is derived from the file of the *calling* frame, so each
    cog transparently gets its own connection and schema.
    """

    def __init__(self, *args, **kwargs):
        try:
            self.uptime = datetime.datetime.utcnow()
            self.cyv_settings = Settings()
            self.cyvbot_owner = self.cyv_settings.get_owner()
            self.cyvbot_co_owner = self.cyv_settings.get_co_owner()
            self.cyvbot_version = self.cyv_settings.get_version()
            self.cyvbot_token = self.cyv_settings.get_token()
            # None = crashed, True = clean shutdown, False = restart requested.
            self.cyvbot_shutdown_mode = None
            self.logger = set_logger()
            super().__init__(*args, command_prefix=self.cyv_settings.get_prefix(), **kwargs)
        except AttributeError:
            print(traceback.format_exc())
            input("Press any key to exit...")
            # sys.exit rather than the site-injected exit(), which is not
            # guaranteed to exist in every interpreter setup.
            sys.exit(1)

    @staticmethod
    def discard():
        """Return the calling cog's connection to the pool.

        Warning: a discarded connection is no longer usable and will throw
        errors. Discard only if you no longer need a postgres connection.

        Returns ``False`` when the caller holds no connection, ``None``
        otherwise.
        """
        frame = inspect.stack()[1]
        filename = os.path.splitext(os.path.basename(frame.filename))[0]
        return Bot.try_discard("cogs." + filename)

    @staticmethod
    def try_discard(cogname):
        """Return ``cogname``'s connection to the pool if it holds one.

        Returns ``False`` when ``cogname`` has no live entry in ``conn_pool``,
        ``None`` after the connection has been handed back.
        """
        if not conn_pool.get(cogname):
            return False
        db.putconn(conn_pool[cogname][0], close=False)
        conn_pool[cogname] = None

    def reset(self):
        """Probe every cached connection and discard the ones that fail.

        Always returns ``True``.
        """
        for cogname in list(conn_pool):
            entry = conn_pool[cogname]
            if not entry:
                # Already discarded: nothing to probe, nothing to give back.
                continue
            try:
                entry[1].execute("select 1;")
            except Exception:
                self.try_discard(cogname)
        return True

    def get_conn(self, flname=None):
        """Return the ``(connection, cursor)`` pair of the calling cog.

        The cog name is the file name of the calling frame. When the caller
        is ``cyvbot`` itself the name cannot be inferred and must be given
        through ``flname``.

        Raises ``TypeError`` when no cog name can be determined.
        """
        frame = inspect.stack()[1]
        filename = os.path.splitext(os.path.basename(frame.filename))[0]
        if filename == "cyvbot":
            filename = flname
        if not filename:
            raise TypeError("get_conn() needs 'flname' when it is called from cyvbot itself")
        return self._conn_for(filename)

    def _conn_for(self, filename):
        """Fetch or create the cached connection whose schema is ``filename``."""
        key = "cogs." + filename
        if not conn_pool.get(key):
            conn_pool[key] = None
        try:
            if conn_pool[key]:
                return conn_pool[key]
            conn = db.getconn()
            cur = conn.cursor()
            # The looked-up value is passed as a bound parameter; only the
            # identifiers below have to be interpolated.
            cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s;",
                        (filename,))
            if not cur.fetchall():
                # NOTE(review): identifiers cannot be bound as parameters;
                # filename comes from a cog's file name, not from user input.
                cur.execute(f'CREATE SCHEMA {filename} AUTHORIZATION "CyvBot";'
                            f'COMMENT ON SCHEMA {filename} IS '
                            f'\'{filename}.py schema\';')
                conn.commit()
            cur.execute(f"SET search_path TO {filename};")
            cur.execute(f'SET application_name TO "CyvBot {self.cyv_settings.get_version()} - {filename}";')
            conn_pool[key] = (conn, cur)
            return conn, cur
        except psycopg2.InternalError:
            print(traceback.format_exc())
            self.reset()
            # Hand the retried connection back to the caller; previously the
            # result of the retry was dropped and None was returned.
            return self._conn_for(filename)

    async def shutdown(self, *, restart=False):
        """Log out and record how the process should exit.

        The launcher section exits with code 0 after a plain shutdown and
        with code 42 when ``restart`` is true.
        """
        self.cyvbot_shutdown_mode = not restart
        await self.logout()
def set_logger():
    """Configure and return the ``cyvbot`` logger.

    The ``cyvbot`` logger (INFO) writes to a rotating file
    ``data/cyvbot/cyvbot.log`` and to stdout; the ``discord`` logger
    (WARNING) writes to ``data/cyvbot/discord.log``. The log directory is
    created when missing, and calling the function again does not attach a
    second copy of handlers that are already in place.
    """
    log_format = ('%(asctime)s %(levelname)s %(module)s %(funcName)s %(lineno)d: '
                  '%(message)s')
    date_format = "[%d/%m/%Y %H:%M]"
    cyvbot_log = 'data/cyvbot/cyvbot.log'
    discord_log = 'data/cyvbot/discord.log'

    # The file handlers below fail with FileNotFoundError if the directory
    # does not exist yet.
    os.makedirs(os.path.dirname(cyvbot_log), exist_ok=True)

    def has_file_handler(target, path):
        wanted = os.path.abspath(path)
        return any(isinstance(h, logging.FileHandler) and h.baseFilename == wanted
                   for h in target.handlers)

    def has_stdout_handler(target):
        return any(isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) is sys.stdout
                   for h in target.handlers)

    logger = logging.getLogger("cyvbot")
    logger.setLevel(logging.INFO)
    red_format = logging.Formatter(log_format, datefmt=date_format)

    if not has_file_handler(logger, cyvbot_log):
        fhandler = logging.handlers.RotatingFileHandler(
            filename=cyvbot_log, encoding='utf-8', mode='a',
            maxBytes=10 ** 7, backupCount=5)
        fhandler.setFormatter(red_format)
        logger.addHandler(fhandler)

    if not has_stdout_handler(logger):
        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setFormatter(red_format)
        stdout_handler.setLevel(logging.INFO)
        logger.addHandler(stdout_handler)

    dpy_logger = logging.getLogger("discord")
    dpy_logger.setLevel(logging.WARNING)
    if not has_file_handler(dpy_logger, discord_log):
        handler = logging.FileHandler(
            filename=discord_log, encoding='utf-8', mode='a')
        handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
        dpy_logger.addHandler(handler)

    return logger
def get_sqlite3_conn():
    """Open the local SQLite database and return ``(connection, cursor)``."""
    connection = sqlite3.connect(db_name)
    return connection, connection.cursor()
def clear_screen():
    """Clear the terminal with ``cls`` on Windows and ``clear`` elsewhere."""
    command = "cls" if os.name == "nt" else "clear"
    os.system(command)
def _quote_conninfo_value(value):
    """Render one connection-string value, quoting it when libpq requires it."""
    text = str(value)
    if text and not any(ch.isspace() or ch in "'\\" for ch in text):
        return text
    # Empty values and values with whitespace, quotes or backslashes must be
    # wrapped in single quotes, with ' and \ escaped by a backslash.
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def create_connection_string(data, no_ssl):
    """Build a ``key=value`` PostgreSQL connection string from ``data``.

    ``no_ssl`` true appends ``sslmode=disable``; otherwise ``sslmode=require``
    is appended. (The two modes used to be swapped, so asking for "no SSL"
    actually demanded SSL.)
    """
    parts = [f"{key}={_quote_conninfo_value(value)}" for key, value in data.items()]
    parts.append("sslmode=disable" if no_ssl else "sslmode=require")
    return ' '.join(parts)


def get_conn(data, no_ssl=False):
    """Open a ``ThreadedConnectionPool`` described by ``data``.

    SSL is tried first; when the server reports that it does not support SSL
    the connection is retried once without it and that pool is returned.
    Any other connection failure prints the traceback, waits for the user
    and terminates the process with exit code 1.
    """
    connection_string = create_connection_string(data, no_ssl)
    try:
        return ThreadedConnectionPool(min_connections, max_connections, connection_string)
    except psycopg2.OperationalError as e:
        # Substring match: newer client libraries prefix the message with the
        # host and port. The `not no_ssl` guard stops a second retry.
        if not no_ssl and "server does not support SSL" in str(e):
            return get_conn(data, True)
        print(traceback.format_exc())
        input("Enter any letter to exit...")
        sys.exit(1)
def test_login(host, user, password, dbname, port):
    """Try to open a connection pool with the given credentials.

    Returns the pool produced by ``get_conn`` when it is truthy, ``False``
    otherwise.
    """
    pool = get_conn(dict(host=host, user=user, password=password, dbname=dbname, port=port))
    return pool if pool else False
def db_mode_manuel():
    """Prompt for existing PostgreSQL credentials and try to log in.

    Returns ``(pool_or_False, (host, user, password, "cyvbot", port))``.
    The database name is always ``cyvbot``. An empty port answer silently
    selects the default, as ``db_mode_auto`` does; only a non-empty,
    unusable answer is reported as invalid.
    """
    host_input = input("Host (default : localhost) ? ")
    if len(host_input) > 1:
        host = host_input
    else:
        print("utilisation du host par défaut.")
        host = "localhost"
    user = input("user ? ")
    password = input("password ? ")
    port_input = input("Port (default : 5432) ? ")
    if len(port_input) > 1 and port_input.isdigit():
        port = port_input
    elif len(port_input) == 0:
        # Pressing Enter is a request for the default, not an error.
        print("utilisation du port par défaut.")
        port = "5432"
    else:
        print("port invalide, utilisation du port par défaut.")
        port = "5432"
    return test_login(host, user, password, "cyvbot", port), (host, user, password, "cyvbot", port)
def db_mode_auto():
    """Prompt for the ``postgres`` superuser credentials and try to log in.

    Returns ``(pool_or_False, (host, port))``; the login targets the
    ``postgres`` database as user ``postgres``.
    """
    host = "localhost"
    typed_host = input("Host (default : localhost) ? ")
    if len(typed_host) > 1:
        host = typed_host
    else:
        print("utilisation du host par défaut.")

    password = input("password of postgres ? ")

    port = "5432"
    typed_port = input("Port (default : 5432) ? ")
    if len(typed_port) > 1 and typed_port.isdigit():
        port = typed_port
    elif not typed_port:
        print("utilisation du port par défaut.")
    else:
        print("port invalide, utilisation du port par défaut.")

    return test_login(host, "postgres", password, "postgres", port), (host, port)
def _provision_cyvbot_database(admin_pool):
    """Create the ``CyvBot`` role and the ``cyvbot`` database.

    Uses one connection of ``admin_pool`` in autocommit mode and returns the
    randomly generated password of the new role.
    """
    import secrets  # local import: only needed for this one-off provisioning

    # The password protects a database login, so draw it from the
    # cryptographic generator rather than from `random`.
    password = ''.join(secrets.choice(password_char) for _ in range(20))
    conn = admin_pool.getconn()
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        cur.execute("CREATE ROLE \"CyvBot\" WITH LOGIN NOSUPERUSER CREATEDB NOCREATEROLE INHERIT "
                    "NOREPLICATION CONNECTION LIMIT 100 PASSWORD %s; COMMENT ON ROLE \"CyvBot\" IS "
                    "'Cyvbot postgres user.';", (password,))
        cur.execute('CREATE DATABASE cyvbot WITH OWNER = postgres ENCODING = \'UTF8\' CONNECTION '
                    'LIMIT = 100')
        cur.execute("COMMENT ON DATABASE cyvbot IS 'cyvbot''s database'")
        cur.execute("GRANT ALL ON DATABASE cyvbot TO \"CyvBot\"")
        cur.close()
    finally:
        # Give the connection back through the pool so its bookkeeping stays
        # consistent, and have the pool close it.
        admin_pool.putconn(conn, close=True)
    return password


def initiate():
    """Prepare the local SQLite settings and return a PostgreSQL pool.

    Creates the SQLite tables when missing and seeds the version row. When
    PostgreSQL credentials are already stored they are used directly;
    otherwise the user is asked to choose between a manual setup (existing
    role and database) and an automatic one (role and database are created
    with the ``postgres`` superuser), and the resulting credentials are
    stored.

    The returned pool is always connected with the stored CyvBot
    credentials, including right after an automatic setup.
    """
    sqlite3_conn = sqlite3.connect(db_name)
    try:
        sqlite3_cur = sqlite3_conn.cursor()
        sqlite3_cur.execute("""CREATE TABLE IF NOT EXISTS owner(owner INTEGER,
        coowner INTEGER,
        token TEXT,
        prefix TEXT)""")
        sqlite3_cur.execute("CREATE TABLE IF NOT EXISTS cogs(name TEXT, cog_status BOOLEAN, UNIQUE(name));")
        sqlite3_cur.execute("CREATE TABLE IF NOT EXISTS version(major INTEGER, minor INTEGER, micro INTEGER, "
                            "releaselevel TEXT);")
        sqlite3_cur.execute('CREATE TABLE IF NOT EXISTS postgres(host TEXT, "user" TEXT, password TEXT, '
                            'dbname TEXT, port TEXT);')
        sqlite3_conn.commit()
        data = sqlite3_cur.execute('select * from postgres').fetchall()
        version_data = sqlite3_cur.execute('select * from version').fetchall()
        if len(version_data) == 0:
            sqlite3_cur.execute("INSERT INTO version(major, minor, micro, releaselevel) VALUES (?, ?, ?, ?)",
                                (2, 0, 3, "dev"))
            sqlite3_conn.commit()

        if data:
            row = data[0]
            return get_conn({
                "host": row[0],
                "user": row[1],
                "password": row[2],
                "dbname": row[3],
                "port": row[4],
            })

        print("Aucune donnée postgres...\n\nNous avons besoin d'initializer ces données, merci de répondre aux demande "
              "suivante : \n")
        _db = None
        info_data = None
        while not _db:
            # Normalised once so that "Auto" or " manuel " are accepted too.
            auto_ask = input("Voulez vous une création automatique ou manuel ? (auto or manuel) ?").strip().lower()
            if auto_ask in ("manuel", "manu"):
                print("Mode manuel, merci de creer dans postgres : un utilisateur nommé `CyvBot`, une base de "
                      "données nommé `cyvbot`, et d'attribuer a la base de données `cyvbot` l'owner à `CyvBot`")
                _db, info_data = db_mode_manuel()
            elif auto_ask in ("automatique", "auto"):
                print("Mode automatique, merci de fournir les information suivante :")
                admin_pool, (host, port) = db_mode_auto()
                if admin_pool:
                    password = _provision_cyvbot_database(admin_pool)
                    # The admin pool talks to the `postgres` database as the
                    # superuser; it must not become the bot's working pool.
                    admin_pool.closeall()
                    info_data = (host, "CyvBot", password, "cyvbot", port)
                    _db = test_login(*info_data)
            else:
                # Unrecognised answer: ask again.
                continue
            if not _db:
                clear_screen()
                print("ERREUR, merci de réessayer")

        sqlite3_cur.execute('INSERT INTO postgres(host, "user", password, dbname, port) VALUES '
                            '(?,?,?,?,?);', info_data)
        sqlite3_conn.commit()
        return _db
    finally:
        # Closed on every path, including the stored-credentials one.
        sqlite3_conn.close()
def get_prefix():
    """Return the command prefix stored in the ``owner`` table.

    Returns ``False`` when the table is empty. When the first row has no
    prefix (``NULL`` value, or a table created before the ``prefix`` column
    existed) the table is rebuilt with the default prefix ``/`` for every
    existing row and ``"/"`` is returned.

    The SQLite connection is closed on every path.
    """
    conn, cur = get_sqlite3_conn()
    try:
        owner_table = cur.execute("SELECT * FROM owner").fetchall()
        if not owner_table:
            return False
        first = owner_table[0]
        # A row shorter than four columns comes from the old table layout.
        prefix = first[3] if len(first) > 3 else None
        if prefix is not None:
            return prefix

        print("No prefix in data, migrating. base prefix is : /")
        # Rebuild the table once, then re-insert every row; doing the
        # drop/create per row would keep only the last one.
        cur.execute("DROP TABLE owner")
        cur.execute("""CREATE TABLE IF NOT EXISTS owner(owner INTEGER,
        coowner INTEGER,
        token TEXT,
        prefix TEXT)""")
        cur.executemany('INSERT INTO owner(owner, coowner, token, prefix) VALUES (?, ?, ?, ?);',
                        [(item[0], item[1], item[2], "/") for item in owner_table])
        conn.commit()
        return "/"
    finally:
        cur.close()
        conn.close()
def get_token():
    """Return the first row of the ``owner`` table as a list.

    The row holds ``[owner, coowner, token, prefix]``; an empty list is
    returned when the table has no row. The SQLite connection is closed
    before returning.
    """
    conn, cur = get_sqlite3_conn()
    try:
        row = cur.execute("SELECT * FROM owner").fetchone()
        return list(row) if row is not None else []
    finally:
        cur.close()
        conn.close()
def check_folders():
    """Create the ``data``, ``cogs`` and ``cogs/utils`` folders when missing."""
    for path in ("data", "cogs", "cogs/utils"):
        if os.path.exists(path):
            continue
        print("Creating " + path + " folder...")
        os.makedirs(path)
def sql_to_dict(_, cur):
    """Read the SQLite ``cogs`` table into a ``{name: enabled}`` dict.

    A cog is reported as disabled only when its stored status equals 0.
    The first argument is unused.
    """
    rows = cur.execute("select * from cogs").fetchall()
    return {row[0]: not (row[1] == 0) for row in rows}
def list_cogs():
    """List every ``cogs/*.py`` file as a dotted ``cogs.<name>`` module path."""
    return [
        "cogs." + os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob("cogs/*.py")
    ]
def does_cogfile_exist(module):
    """Tell whether ``module`` matches a file of the ``cogs`` folder.

    ``cogs.`` is prepended when the name does not already contain it.
    """
    dotted = module if "cogs." in module else "cogs." + module
    return dotted in list_cogs()
def load_cog(bot, cogname):
    """Import (and re-import) ``cogname`` and register it on ``bot``.

    Raises ``CogNotFoundError`` when no matching cog file exists and
    ``CogLoadError`` when the module has a syntax error; every other
    exception propagates unchanged.
    """
    if not does_cogfile_exist(cogname):
        raise CogNotFoundError(cogname)
    try:
        module = importlib.import_module(cogname)
        # Reload so that edits made since the first import are picked up.
        importlib.reload(module)
        bot.load_extension(module.__name__)
    except SyntaxError as err:
        raise CogLoadError(*err.args)
def unload_cog(bot, cogname, reloading=False):
    """Unload ``cogname`` from ``bot`` and release its database connection.

    The owner cog ``cogs.cyvax`` may only be unloaded as part of a reload;
    otherwise ``OwnerUnloadWithoutReloadError`` is raised. Any failure while
    unloading is reported as ``CogUnloadError``.
    """
    is_owner_cog = cogname == "cogs.cyvax"
    if is_owner_cog and not reloading:
        raise OwnerUnloadWithoutReloadError
    try:
        bot.unload_extension(cogname)
        bot.try_discard(cogname)
    except Exception:
        raise CogUnloadError
def load_cogs(bot):
    """Load the owner cog, then every cog enabled in the SQLite registry.

    On a first run (empty registry) the default cogs are registered as
    enabled and loaded right away. A cog that fails to load is logged,
    reported on stdout and stored as disabled. The SQLite connection is
    closed even when loading raises.
    """
    defaults = ("cyvax", "cyv_general")
    conn, cur = get_sqlite3_conn()
    try:
        registry = sql_to_dict(conn, cur)
        bot.load_extension('cogs.cyvax')
        failed = []
        if not registry:  # All default cogs enabled by default
            for ext in defaults:
                name = "cogs." + ext
                cur.execute('INSERT INTO cogs(name, cog_status) VALUES (?,?);',
                            (name, True))
                # Mirror the insert in memory, otherwise the defaults would
                # only be loaded on the next start.
                registry[name] = True
        for extension in list_cogs():
            if extension.lower() == "cogs.cyvax":
                continue  # owner cog was loaded above
            if not registry.get(extension, False):
                continue
            try:
                load_cog(bot, extension)
            except (Exception, CyvBotError) as e:
                # CyvBotError is listed explicitly so cog errors are handled
                # here whatever base class the hierarchy uses.
                print(f"{e.__class__.__name__}: {str(e)}")
                bot.logger.exception(e)
                failed.append(extension)
                registry[extension] = False
        for cog_name, cog_statut in registry.items():
            cur.execute("UPDATE cogs SET cog_status = ? WHERE name = ?", (cog_statut, cog_name))
        if failed:
            print(f"\nFailed to load: {' '.join(failed)}\n")
        conn.commit()
    finally:
        cur.close()
        conn.close()
async def main(bot):
    """Prepare folders and cogs, then log in and connect to Discord.

    Raises ``RuntimeError`` when the settings expose no login credentials.
    """
    check_folders()
    bot.remove_command("help")
    load_cogs(bot)
    print("Logging into Discord...")
    bot.uptime = datetime.datetime.utcnow()
    if not bot.cyv_settings.login_credentials:
        print("No credentials available to login.")
        raise RuntimeError()
    await bot.login(bot.cyvbot_token)
    await bot.connect()
def set_cog(cog, value):
    """Store ``value`` as the status of ``cog`` in the SQLite registry.

    The row is updated when the cog is already registered and inserted
    otherwise.
    """
    conn, cur = get_sqlite3_conn()
    known = cur.execute('SELECT name, cog_status FROM cogs WHERE name= ?',
                        (cog,)).fetchall()
    if not known:
        cur.execute('INSERT INTO cogs(name, cog_status) VALUES (?,?);',
                    (cog, value))
    else:
        cur.execute('UPDATE cogs SET cog_status = ? WHERE name = ?;',
                    (value, cog))
    conn.commit()
    cur.close()
    conn.close()
async def get_owner(token):
    """Ask the Discord API who owns the application behind ``token``.

    Returns the ``id`` of the ``owner`` object of the JSON answer.
    Raises ``BadToken`` when the API answers 401.
    """
    endpoint = "https://discord.com/api/v7/oauth2/applications/@me"
    headers = {"User-Agent": "CyvBot (https://github.com/cyvax/CyvBot-ReWrite)",
               "Authorization": f"Bot {token}"}
    async with aiohttp.ClientSession() as session, session.get(endpoint, headers=headers) as response:
        if response.status == 401:
            raise BadToken
        application = await response.json()
        return application["owner"]["id"]
async def check_token(token):
    """Check ``token`` against the Discord API.

    Returns ``True`` unless the API answers 401, in which case ``BadToken``
    is raised.
    """
    endpoint = "https://discord.com/api/v7/oauth2/applications/@me"
    headers = {"User-Agent": "CyvBot (https://github.com/cyvax/CyvBot-ReWrite)",
               "Authorization": f"Bot {token}"}
    async with aiohttp.ClientSession() as session, session.get(endpoint, headers=headers) as response:
        if response.status == 401:
            raise BadToken
        return True
async def interactive_setup():
    """Run the first-start dialogue and store its answers in SQLite.

    Asks for the bot token (validated against Discord), the owner (the
    application owner by default, or a numeric id typed by the user) and the
    command prefix (``/`` by default), then inserts one row in the ``owner``
    table. The whole dialogue restarts when Discord rejects the token.
    """
    valid = False
    while not valid:
        try:
            valid_token = False
            token = False
            print("CyvBot - First run configuration\n")
            print("Welcome to the interactive setup !\n")
            while not valid_token:
                token = input("What is your bot token : ")
                if len(token) > 50:
                    valid_token = await check_token(token)
                else:
                    print("That doesn't look like a valid token.\n")
                    # Non-blocking pause; time.sleep would stall the loop.
                    await asyncio.sleep(1)

            asking_owner = input("The Owner will be the bot token owner did you agree ?"
                                 "\nyes or no? (if no you need to provide owner id.) >")
            if asking_owner.strip().lower() == "no":
                # The prompt text used to be passed to int(), which raised
                # ValueError before the user could type anything.
                owner_id = None
                while owner_id is None:
                    typed = input("What is the owner id ?").strip()
                    if typed.isdigit():
                        owner_id = int(typed)
                    else:
                        print("The owner id must be a number.\n")
            else:
                owner_id = await get_owner(token)

            prefix = input("What prefix do you want to use ? (default : /)")
            if len(prefix) == 0:
                prefix = "/"

            # Opened only once every answer is known, so a BadToken raised
            # above cannot leak an open connection.
            conn, cur = get_sqlite3_conn()
            try:
                cur.execute('INSERT INTO owner(owner, coowner, token, prefix) VALUES (?, ?, ?, ?);',
                            (owner_id, None, token, prefix))
                conn.commit()
            finally:
                cur.close()
                conn.close()
            valid = True
            print("Interactive setup is over thanks you !")
        except BadToken:
            print("This token is not valid. Try Again.")
def _format_missing_perms(perms):
    """Turn permission flag names into the readable list used in error replies."""
    missing = [perm.replace('_', ' ').replace('guild', 'server').title() for perm in perms]
    if len(missing) > 2:
        return f'{"**, **".join(missing[:-1])}, et {missing[-1]}'
    return ' et '.join(missing)


def initialize(bot_class=Bot):
    """Create the bot and register its core events and commands.

    Registers the ``on_ready`` banner, the owner-only ``invite`` and
    ``check`` commands and the global command error handler, then returns
    the bot instance (not yet logged in).
    """
    bot = bot_class(description=description, pm_help=None)

    def is_bot_owner(user):
        # Compared as strings so the check works whether the settings hold
        # the ids as int or str.
        # NOTE(review): assumes cyvbot_co_owner is a sequence of ids (the
        # startup banner iterates it); confirm against Settings.
        owner_ids = {str(bot.cyvbot_owner)}
        owner_ids.update(str(co_id) for co_id in (bot.cyvbot_co_owner or ()))
        return str(user.id) in owner_ids

    async def get_oauth_url():
        try:
            data = await bot.application_info()
        except Exception as e:
            return f"Couldn't retrieve invite link. Error: {e}"
        return discord.utils.oauth_url(data.id)

    @bot.event
    async def on_ready():
        # Print the startup banner once the connection is ready.
        users = len(set(bot.get_all_members()))
        servers = len(bot.guilds)
        channels = sum(1 for _ in bot.get_all_channels())
        elapsed = datetime.datetime.utcnow() - bot.uptime
        # Converted to milliseconds to match the unit shown in the message.
        login_time = elapsed.total_seconds() * 1000
        total_cogs = len(list_cogs())
        print(f"Login successful. ({login_time:.0f}ms)\n")
        owner = bot.cyvbot_owner
        co_owner = bot.cyvbot_co_owner
        print('-------------------')
        print(f'CyvBot {bot.cyv_settings.get_version()}')
        print('-------------------')
        print(f'{str(bot.user)}')
        print('\nConnected to:')
        print(f'{servers} servers')
        print(f'{channels} channels')
        print(f'{users} users\n')
        # Read from the settings rather than from a global that only exists
        # when this file is run as a script.
        print(f'Prefix: {bot.cyv_settings.get_prefix()}')
        print(f'Owner: {str(owner)}')
        if co_owner:
            print(f'Co-Owner: {", ".join([str(ids) for ids in co_owner]) if len(co_owner) > 1 else co_owner[0]}')
        print(f'Shards : {len(bot.shards)}')
        print(f'{len(bot.cogs)}/{total_cogs} active cogs with {len(bot.commands)} commands')
        print('-------------------')
        print("\nUrl de cyvbot :")
        url = await get_oauth_url()
        bot.oauth_url = url
        print(url)

    # No docstrings on the commands below: discord.py would expose them as
    # help text.
    @bot.command()
    @checks.is_owner()
    async def invite(ctx):
        # Send the invite URL to the caller by private message.
        await ctx.author.send(await get_oauth_url())

    @bot.command()
    @checks.is_owner()
    async def check(ctx):
        # Dump the per-cog connection cache as JSON.
        import json
        temp1 = dict()
        for cogs in list(conn_pool.keys()):
            if conn_pool[cogs]:
                temp1[cogs] = (str(conn_pool[cogs][0]), str(conn_pool[cogs][1]))
            else:
                temp1[cogs] = None
        await ctx.channel.send(f"```json\n{json.dumps(temp1, indent=4)}```")

    @bot.event
    async def on_command_error(ctx, error):
        # if command has local error handler, return
        if hasattr(ctx.command, 'on_error'):
            return
        # get the original exception
        error = getattr(error, 'original', error)
        if isinstance(error, commands.CommandNotFound):
            return
        if isinstance(error, commands.BotMissingPermissions):
            bmp = _format_missing_perms(error.missing_perms)
            await ctx.send(f'J\'ai besoin des permissions **{bmp}** pour utiliser cette commande.')
            return
        if isinstance(error, commands.DisabledCommand):
            await ctx.send('Cette commande est désactivé.')
            return
        if isinstance(error, commands.CommandOnCooldown):
            # checks.is_owner() only builds a decorator and is always truthy,
            # so the actual author has to be tested to limit the bypass to
            # the owners.
            if is_bot_owner(ctx.author):
                await ctx.reinvoke()
                return
            await ctx.send(f"Cette commande est en Cooldown, merci de réessayer dans :"
                           f" {math.ceil(error.retry_after)}s.")
            return
        if isinstance(error, commands.MissingPermissions):
            bmp = _format_missing_perms(error.missing_perms)
            _message = f'Vous avez besoin des permissions suivante : **{bmp}** pour utiliser cette commande.'
            await ctx.send(_message)
            return
        if isinstance(error, commands.UserInputError):
            await ctx.send_help(ctx.command)
            return
        if isinstance(error, commands.NoPrivateMessage):
            try:
                await ctx.author.send('Cette commande ne peut pas être utilisée dans les MP.')
            except discord.Forbidden:
                pass  # the user does not accept private messages
            return
        # Tested before the generic CheckFailure branch so that it stays
        # reachable if NSFWFailure derives from CheckFailure.
        if isinstance(error, checks.NSFWFailure):
            await ctx.send("Ce channel n'est pas un channel NSFW, cette commande ne peut donc pas s'exécuter.")
            return
        if isinstance(error, commands.CheckFailure):
            await ctx.send("Vous n'avez pas la permission d'utiliser cette commandes.")
            return
        # ignore all other exception types, but print them to stderr
        print(f'Ignoring exception in command {ctx.command}:', file=sys.stderr)
        traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
        # An unexpected failure may have left a cog connection unusable:
        # probe them and give the broken ones back to the pool.
        bot.reset()

    return bot
if __name__ == '__main__':
    # Launcher: prepare storage, run the first-start dialogue when no owner
    # row exists, start the bot and turn its shutdown mode into an exit code
    # (0 = shutdown, 42 = restart requested, 1 = anything else).
    check_folders()
    db = initiate()
    bot_token = get_token()
    bot_prefix = get_prefix()
    loop = asyncio.get_event_loop()
    if len(bot_token) == 0:
        loop.run_until_complete(interactive_setup())
        bot_token = get_token()
        bot_prefix = get_prefix()
    client = initialize()
    try:
        loop.run_until_complete(main(client))
    except discord.LoginFailure:
        client.logger.error(traceback.format_exc())
        print("Impossible de se connecter a discord...\n"
              "Suppression de la valeur token...")
        # Drop the rejected token so the next start runs the setup again.
        sqlite_conn, sqlite_cursor = get_sqlite3_conn()
        sqlite_cursor.execute("DELETE FROM owner where token is not null")
        sqlite_conn.commit()
        sqlite_cursor.close()
        sqlite_conn.close()
    except KeyboardInterrupt:
        # Ctrl+C: leave Discord cleanly before the loop is closed.
        loop.run_until_complete(client.logout())
    except Exception:
        client.logger.error(traceback.format_exc())
        loop.run_until_complete(client.logout())
    finally:
        loop.close()
        try:
            # Release every PostgreSQL connection still held by the pool.
            db.closeall()
        except psycopg2.Error:
            client.logger.warning("PostgreSQL pool was already closed.")
    # Kept outside of `finally` so that an exception still in flight is not
    # silently replaced by SystemExit.
    if client.cyvbot_shutdown_mode is True:
        sys.exit(0)
    elif client.cyvbot_shutdown_mode is False:
        sys.exit(42)  # Restart
    else:
        sys.exit(1)
view raw cyvbot.py hosted with ❤ by GitHub

cyvax - 2024