CyvBot est un bot discord que je développe, la version 1.0 était basée sur le très connu Red.
La version 2.0 de mon bot est une réécriture complète, afin de ne plus dépendre de Red. Contrairement à Red, ma version du bot n'utilise plus de fichiers JSON pour stocker ses données, mais un serveur PostgreSQL à la place.
Ce projet est pour le moment toujours propriétaire le temps que son développement soit bien plus avancé.
Pour le moment, vous avez ici la version 2.0.3 du fichier principal.
import asyncio
import datetime
import glob
import importlib
import inspect
import logging
import logging.handlers
import math
import os
import random
import secrets
import sqlite3
import sys
import time
import traceback

import aiohttp
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2.pool import ThreadedConnectionPool

try:
    from discord.ext import commands
    import discord
    from discord.ext.commands import help
except ImportError:
    print(" n'est pas installé.\n")
    sys.exit(1)

from cogs.utils import checks
from cogs.utils.options import Settings
# Path of the local SQLite file that stores the bootstrap settings
# (owner, cogs, version and postgres tables).
db_name = "data/cyvbot.db"

# Description string handed to the bot constructor.
description = "CyvBot a discord Bot."

# Lower and upper bounds given to the PostgreSQL ThreadedConnectionPool.
min_connections = 1
max_connections = 100

# Alphabet from which the generated PostgreSQL role password is drawn.
password_char = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
    "!@#$%^&*"
)

# Maps "cogs.<module>" to its (connection, cursor) pair, or to None once the
# connection has been handed back to the pool.
conn_pool = {}
class CyvBotError(Exception):
    """Root of every error raised by CyvBot itself.

    Derives from ``Exception`` (not ``BaseException``) so that the generic
    ``except Exception`` handlers used while loading and unloading cogs can
    actually intercept these errors.
    """


class PrefixError(CyvBotError):
    """No usable command prefix is stored in the owner table."""


class BadToken(CyvBotError):
    """Discord answered 401 for the supplied bot token."""


class CogNotFoundError(CyvBotError):
    """The requested cog has no matching file in the ``cogs`` folder."""


class CogLoadError(CyvBotError):
    """A cog file was found but could not be loaded."""


class CogUnloadError(CyvBotError):
    """A cog could not be unloaded."""


class OwnerUnloadWithoutReloadError(CogUnloadError):
    """Raised when the owner cog is unloaded outside of a reload."""

    def __init__(self):
        default_message = "Can't unload the owner plugin :P"
        super().__init__(default_message)
class Bot(commands.AutoShardedBot):
    """Auto-sharded bot that gives each cog module its own PostgreSQL connection.

    Connections are taken from the module-level pool ``db`` and cached in the
    module-level ``conn_pool`` dict under the key ``"cogs.<module name>"``.
    """

    def __init__(self, *args, **kwargs):
        """Read the stored settings, set up logging and initialise the bot.

        The command prefix always comes from ``Settings.get_prefix()``. If an
        ``AttributeError`` is raised during set-up, the traceback is printed
        and the process exits with status 1 after a keypress.
        """
        try:
            self.uptime = datetime.datetime.utcnow()
            self.cyv_settings = Settings()
            self.cyvbot_owner = self.cyv_settings.get_owner()
            self.cyvbot_co_owner = self.cyv_settings.get_co_owner()
            self.cyvbot_version = self.cyv_settings.get_version()
            self.cyvbot_token = self.cyv_settings.get_token()
            # None = abnormal exit, True = plain shutdown, False = restart.
            self.cyvbot_shutdown_mode = None
            self.logger = set_logger()
            super().__init__(*args, command_prefix=self.cyv_settings.get_prefix(), **kwargs)
        except AttributeError:
            print(traceback.format_exc())
            input("Press any key to exit...")
            exit(1)

    @staticmethod
    def discard():
        """Hand the calling module's connection back to the pool.

        The cache key is derived from the file name of the direct caller.
        A discarded connection must not be used afterwards; only discard when
        the PostgreSQL connection is no longer needed.

        Returns:
            False when the caller has no cached connection, otherwise None.
        """
        frame = inspect.stack()[1]
        filename = os.path.splitext(os.path.basename(frame.filename))[0]
        key = "cogs." + filename
        if not conn_pool.get(key):
            return False
        db.putconn(conn_pool[key][0], close=False)
        conn_pool[key] = None

    @staticmethod
    def try_discard(cogname):
        """Hand the connection cached under ``cogname`` back to the pool.

        Args:
            cogname: Full cache key, e.g. ``"cogs.general"``.

        Returns:
            False when nothing is cached under that key, otherwise None.
        """
        if not conn_pool.get(cogname):
            return False
        db.putconn(conn_pool[cogname][0], close=False)
        conn_pool[cogname] = None

    def reset(self):
        """Probe every cached connection and discard those that fail.

        Entries that are already None also end up in the ``except`` branch,
        where ``try_discard`` is a no-op for them.

        Returns:
            Always True.
        """
        for cogs in list(conn_pool):
            try:
                conn_pool[cogs][1].execute("select 1;")
            except Exception:
                self.try_discard(cogs)
        return True

    def get_conn(self, flname=None):
        """Return the ``(connection, cursor)`` pair of the calling module.

        The module name is taken from the caller's file name; callers living
        in ``cyvbot.py`` must pass the target module name as ``flname``.

        Args:
            flname: Module name to use when the caller is ``cyvbot.py``.

        Returns:
            The cached or freshly opened ``(connection, cursor)`` tuple.

        Raises:
            TypeError: the caller is ``cyvbot.py`` and ``flname`` is missing.
            psycopg2.InternalError: opening the connection failed twice.
        """
        frame = inspect.stack()[1]
        filename = os.path.splitext(os.path.basename(frame.filename))[0]
        if filename == "cyvbot":
            filename = flname
        if filename is None:
            raise TypeError("get_conn() needs 'flname' when called from cyvbot.py")
        return self._get_conn_for(filename)

    def _get_conn_for(self, filename, retry=True):
        """Open (or reuse) the connection of ``filename``, retrying once."""
        key = "cogs." + filename
        if not conn_pool.get(key):
            conn_pool[key] = None
        if conn_pool[key]:
            return conn_pool[key]
        conn = None
        try:
            conn = db.getconn()
            cur = conn.cursor()
            # The looked-up name is a value, so it is passed as a parameter.
            cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s;",
                        (filename,))
            if len(cur.fetchall()) == 0:
                # Identifiers cannot be bound as parameters; the name comes
                # from a module file name, not from user input.
                cur.execute(f'CREATE SCHEMA {filename} AUTHORIZATION "CyvBot";'
                            f'COMMENT ON SCHEMA {filename} IS '
                            f'\'{filename}.py schema\';')
                conn.commit()
            cur.execute(f"SET search_path TO {filename};")
            cur.execute(f'SET application_name TO "CyvBot {self.cyv_settings.get_version()} - {filename}";')
            conn_pool[key] = (conn, cur)
            return conn, cur
        except psycopg2.InternalError:
            print(traceback.format_exc())
            if conn is not None:
                # Do not leak the broken connection out of the pool.
                db.putconn(conn, close=True)
            self.reset()
            if not retry:
                raise
            # Return the retried result; the original dropped it and
            # handed None back to the caller.
            return self._get_conn_for(filename, retry=False)

    async def shutdown(self, *, restart=False):
        """Log out and record how the process should exit.

        The script entry point exits with code 0 after a plain shutdown and
        with code 42 when ``restart`` is True.
        """
        self.cyvbot_shutdown_mode = not restart
        await self.logout()
def set_logger():
    """Configure and return the ``cyvbot`` logger.

    The ``cyvbot`` logger (INFO) writes to stdout and to a rotating file
    ``data/cyvbot/cyvbot.log`` (10**7 bytes, 5 backups). The ``discord``
    logger (WARNING) writes to ``data/cyvbot/discord.log``.

    The log directory is created when missing, and handlers are attached only
    once so that calling this function again does not duplicate every record.

    Returns:
        The configured ``cyvbot`` logger.
    """
    # The file handlers fail with FileNotFoundError if the folder is absent.
    os.makedirs(os.path.join("data", "cyvbot"), exist_ok=True)

    line_format = ('%(asctime)s %(levelname)s %(module)s %(funcName)s %(lineno)d: '
                   '%(message)s')
    date_format = "[%d/%m/%Y %H:%M]"

    logger = logging.getLogger("cyvbot")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        formatter = logging.Formatter(line_format, datefmt=date_format)

        fhandler = logging.handlers.RotatingFileHandler(
            filename='data/cyvbot/cyvbot.log', encoding='utf-8', mode='a',
            maxBytes=10 ** 7, backupCount=5)
        fhandler.setFormatter(formatter)

        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setFormatter(formatter)
        stdout_handler.setLevel(logging.INFO)

        logger.addHandler(fhandler)
        logger.addHandler(stdout_handler)

    dpy_logger = logging.getLogger("discord")
    dpy_logger.setLevel(logging.WARNING)
    if not any(isinstance(h, logging.FileHandler) for h in dpy_logger.handlers):
        handler = logging.FileHandler(
            filename='data/cyvbot/discord.log', encoding='utf-8', mode='a')
        handler.setFormatter(logging.Formatter(line_format, datefmt=date_format))
        dpy_logger.addHandler(handler)

    return logger
def get_sqlite3_conn():
    """Open the local SQLite settings database.

    Returns:
        A ``(connection, cursor)`` tuple; closing both is up to the caller.
    """
    connection = sqlite3.connect(db_name)
    return connection, connection.cursor()
def clear_screen():
    """Clear the terminal: ``cls`` on Windows (``os.name == "nt"``), else ``clear``."""
    if os.name != "nt":
        os.system("clear")
    else:
        os.system("cls")
def create_connection_string(data, no_ssl):
    """Build a libpq keyword/value connection string.

    Every value is single-quoted with backslashes and quotes escaped, so
    empty values and values containing spaces stay attached to their keyword.

    Args:
        data: Mapping of libpq keywords (host, user, password, ...) to values.
        no_ssl: True to connect with ``sslmode=disable``; False to demand SSL
            with ``sslmode=require``.

    Returns:
        The connection string, ending with the ``sslmode`` setting.
    """
    def quote(value):
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    pairs = ' '.join(f"{key}={quote(value)}" for key, value in data.items())
    return pairs + (" sslmode=disable" if no_ssl else " sslmode=require")


def get_conn(data, no_ssl=False):
    """Create the PostgreSQL connection pool, falling back to a non-SSL link.

    SSL is tried first. When the server reports that it does not support SSL,
    the pool is created again with SSL disabled and that pool is returned.
    Any other connection error prints the traceback, waits for a keypress and
    exits with status 1.

    Args:
        data: Mapping of libpq keywords to values.
        no_ssl: True to skip SSL from the start.

    Returns:
        A ``ThreadedConnectionPool`` sized by ``min_connections`` and
        ``max_connections``.
    """
    connection_string = create_connection_string(data, no_ssl)
    try:
        return ThreadedConnectionPool(min_connections, max_connections, connection_string)
    except psycopg2.OperationalError as e:
        # Substring match: recent libpq versions prefix the message with the
        # host and port. `not no_ssl` keeps the fallback from looping.
        if not no_ssl and "server does not support SSL, but SSL was required" in str(e):
            return get_conn(data, True)
        print(traceback.format_exc())
        input("Enter any letter to exit...")
        exit(1)
def test_login(host, user, password, dbname, port):
    """Try to open a connection pool with the given credentials.

    Returns:
        Whatever ``get_conn`` returned when it is truthy, otherwise False.
    """
    credentials = dict(host=host, user=user, password=password, dbname=dbname, port=port)
    pool = get_conn(credentials)
    return pool if pool else False
def db_mode_manuel():
    """Ask for the credentials of an already provisioned ``cyvbot`` database.

    An empty host falls back to ``localhost``. An empty port falls back to
    ``5432`` silently; a non-numeric port falls back with a warning.

    Returns:
        ``(pool_or_False, (host, user, password, "cyvbot", port))``.
    """
    host_input = input("Host (default : localhost) ? ").strip()
    if host_input:
        host = host_input
    else:
        print("utilisation du host par défaut.")
        host = "localhost"
    user = input("user ? ")
    password = input("password ? ")
    port_input = input("Port (default : 5432) ? ").strip()
    if port_input.isdigit():
        port = port_input
    elif not port_input:
        # Same wording as db_mode_auto: an empty answer is not an error.
        print("utilisation du port par défaut.")
        port = "5432"
    else:
        print("port invalide, utilisation du port par défaut.")
        port = "5432"
    return test_login(host, user, password, "cyvbot", port), (host, user, password, "cyvbot", port)
def db_mode_auto():
    """Ask for the ``postgres`` superuser credentials used for provisioning.

    An empty host falls back to ``localhost``. An empty port falls back to
    ``5432`` silently; a non-numeric port falls back with a warning.

    Returns:
        ``(pool_or_False, (host, port))`` where the pool is logged in as
        ``postgres`` on the ``postgres`` database.
    """
    host_input = input("Host (default : localhost) ? ").strip()
    if host_input:
        host = host_input
    else:
        print("utilisation du host par défaut.")
        host = "localhost"
    password = input("password of postgres ? ")
    port_input = input("Port (default : 5432) ? ").strip()
    if port_input.isdigit():
        port = port_input
    elif not port_input:
        print("utilisation du port par défaut.")
        port = "5432"
    else:
        print("port invalide, utilisation du port par défaut.")
        port = "5432"
    return test_login(host, "postgres", password, "postgres", port), (host, port)
def _provision_postgres(admin_pool):
    """Create the ``CyvBot`` role and ``cyvbot`` database; return the role password."""
    conn = admin_pool.getconn()
    try:
        # CREATE DATABASE cannot run inside a transaction block.
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        # `secrets` instead of `random`: this string is a credential.
        password = ''.join(secrets.choice(password_char) for _ in range(20))
        # NOTE(review): the head of this statement was missing from the source
        # (only the trailing comment literal and the parameter tuple were
        # left); confirm the role attributes against the original.
        cur.execute('CREATE ROLE "CyvBot" WITH LOGIN PASSWORD %s;'
                    'COMMENT ON ROLE "CyvBot" IS '
                    "'Cyvbot postgres user.';", (password,))
        cur.execute('CREATE DATABASE cyvbot WITH OWNER = postgres ENCODING = \'UTF8\' CONNECTION '
                    'LIMIT = 100')
        cur.execute("COMMENT ON DATABASE cyvbot IS 'cyvbot''s database'")
        cur.execute("GRANT ALL ON DATABASE cyvbot TO \"CyvBot\"")
        cur.close()
    finally:
        # The administrative pool is not needed once provisioning is done.
        admin_pool.putconn(conn, close=True)
        admin_pool.closeall()
    return password


def initiate():
    """Prepare the SQLite settings store and return the PostgreSQL pool.

    Creates the ``owner``, ``cogs``, ``version`` and ``postgres`` tables when
    missing and records version 2.0.3 "dev" on first run. When PostgreSQL
    credentials are stored they are used directly. Otherwise the user chooses
    between manual mode (existing role and database) and automatic mode (the
    role and database are created through the ``postgres`` superuser); the
    resulting credentials are then saved.

    Returns:
        The connection pool logged in on the ``cyvbot`` database. In automatic
        mode this is a fresh pool opened as ``CyvBot``, not the administrative
        pool used for provisioning.
    """
    sqlite3_conn = sqlite3.connect(db_name)
    sqlite3_cur = sqlite3_conn.cursor()
    try:
        sqlite3_cur.execute("""CREATE TABLE IF NOT EXISTS owner(owner INTEGER,
        coowner INTEGER,
        token TEXT,
        prefix TEXT)""")
        sqlite3_cur.execute("CREATE TABLE IF NOT EXISTS cogs(name TEXT, cog_status BOOLEAN, UNIQUE(name));")
        sqlite3_cur.execute("CREATE TABLE IF NOT EXISTS version(major INTEGER, minor INTEGER, micro INTEGER, "
                            "releaselevel TEXT);")
        sqlite3_cur.execute('CREATE TABLE IF NOT EXISTS postgres(host TEXT, "user" TEXT, password TEXT, '
                            'dbname TEXT, port TEXT);')
        sqlite3_conn.commit()
        data = sqlite3_cur.execute('select * from postgres').fetchall()
        version_data = sqlite3_cur.execute('select * from version').fetchall()
        if len(version_data) == 0:
            sqlite3_cur.execute("INSERT INTO version(major, minor, micro, releaselevel) VALUES (?, ?, ?, ?)",
                                (2, 0, 3, "dev"))
            sqlite3_conn.commit()

        if len(data) != 0:
            row = data[0]
            sql_data = {
                "host": row[0],
                "user": row[1],
                "password": row[2],
                "dbname": row[3],
                "port": row[4]
            }
            return get_conn(sql_data)

        print("Aucune donnée postgres...\n\nNous avons besoin d'initializer ces données, merci de répondre aux demande "
              "suivante : \n")
        _db = None
        info_data = None
        while not _db:
            # Normalised once so that "Auto" or " manuel " are accepted too.
            auto_ask = input("Voulez vous une création automatique ou manuel ? (auto or manuel) ?").strip().lower()
            if auto_ask in ("manuel", "manu"):
                print("Mode manuel, merci de creer dans postgres : un utilisateur nommé `CyvBot`, une base de "
                      "données nommé `cyvbot`, et d'attribuer a la base de données `cyvbot` l'owner à `CyvBot`")
                _db, info_data = db_mode_manuel()
            elif auto_ask in ("automatique", "auto"):
                print("Mode automatique, merci de fournir les information suivante :")
                admin_pool, (host, port) = db_mode_auto()
                if admin_pool:
                    password = _provision_postgres(admin_pool)
                    info_data = (host, "CyvBot", password, "cyvbot", port)
                    # Reconnect as the bot role on its own database.
                    _db = test_login(*info_data)
            else:
                continue
            if not _db:
                clear_screen()
                print("ERREUR, merci de réessayer")

        sqlite3_cur.execute('INSERT INTO postgres(host, "user", password, dbname, port) VALUES '
                            '(?,?,?,?,?);', info_data)
        sqlite3_conn.commit()
        return _db
    finally:
        sqlite3_cur.close()
        sqlite3_conn.close()
def get_prefix():
    """Return the stored command prefix, migrating the owner table if needed.

    The first non-NULL ``prefix`` value found in the ``owner`` table is
    returned. When no row carries a prefix (or the table predates the
    ``prefix`` column), the table is rebuilt once with every existing row
    given the default prefix ``"/"``.

    Returns:
        The stored prefix, or ``"/"`` when the default had to be applied.
    """
    default_prefix = "/"
    conn, cur = get_sqlite3_conn()
    try:
        try:
            owner_table = cur.execute("SELECT * FROM owner").fetchall()
            for item in owner_table:
                if item[3] is not None:
                    return item[3]
            raise PrefixError
        except (IndexError, PrefixError):
            print("No prefix in data, migrating. base prefix is : /")
            owner_table = cur.execute("SELECT * FROM owner").fetchall()
            if owner_table:
                # Rebuild the table once, then put every row back; dropping it
                # inside the row loop would keep only the last row.
                cur.execute("DROP TABLE owner")
                cur.execute("""CREATE TABLE IF NOT EXISTS owner(owner INTEGER,
                coowner INTEGER,
                token TEXT,
                prefix TEXT)""")
                cur.executemany('INSERT INTO owner(owner, coowner, token, prefix) VALUES (?, ?, ?, ?);',
                                [(item[0], item[1], item[2], default_prefix) for item in owner_table])
            conn.commit()
            return default_prefix
    finally:
        # Also runs on the early return above.
        cur.close()
        conn.close()
def get_token():
    """Return the first row of the ``owner`` table as a list.

    The row holds ``[owner, coowner, token, prefix]``.

    Returns:
        The row as a list, or an empty list when the table has no rows.
    """
    conn, cur = get_sqlite3_conn()
    try:
        first_row = cur.execute("SELECT * FROM owner").fetchone()
    finally:
        cur.close()
        conn.close()
    return list(first_row) if first_row is not None else []
def check_folders():
    """Create the folders the bot needs, relative to the working directory.

    ``data/cyvbot`` is included because the log files are written there.
    """
    folders = ("data", "data/cyvbot", "cogs", "cogs/utils")
    for folder in folders:
        if not os.path.exists(folder):
            print("Creating " + folder + " folder...")
            os.makedirs(folder, exist_ok=True)
def sql_to_dict(_, cur):
    """Read the SQLite ``cogs`` table into a ``{name: enabled}`` dict.

    Args:
        _: Unused (callers pass the connection).
        cur: SQLite cursor used for the query.

    Returns:
        A dict mapping the first column of each row to False when the second
        column equals 0, and to True otherwise.
    """
    rows = cur.execute("select * from cogs").fetchall()
    return {row[0]: not (row[1] == 0) for row in rows}
def list_cogs():
    """List every ``cogs/*.py`` file as a dotted name such as ``cogs.general``."""
    dotted_names = []
    for path in glob.glob("cogs/*.py"):
        stem, _extension = os.path.splitext(os.path.basename(path))
        dotted_names.append("cogs." + stem)
    return dotted_names
def does_cogfile_exist(module):
    """Tell whether ``module`` matches a file in the ``cogs`` folder.

    Args:
        module: Cog name, with or without the leading ``"cogs."``.

    Returns:
        True when ``cogs/<name>.py`` is listed by ``list_cogs``.
    """
    # Prefix test: a name that merely contains "cogs." is still unqualified.
    if not module.startswith("cogs."):
        module = "cogs." + module
    return module in list_cogs()
def load_cog(bot, cogname):
    """Import (and reload) a cog module, then register it on the bot.

    Args:
        bot: Bot on which ``load_extension`` is called.
        cogname: Dotted module name of the cog.

    Raises:
        CogNotFoundError: no matching file exists in the ``cogs`` folder.
        CogLoadError: the module contains a syntax error (the original
            ``SyntaxError`` is kept as ``__cause__``).
        Exception: any other error propagates unchanged.
    """
    if not does_cogfile_exist(cogname):
        raise CogNotFoundError(cogname)
    try:
        mod_obj = importlib.import_module(cogname)
        # Reload so that edits made since a previous import are picked up.
        importlib.reload(mod_obj)
        bot.load_extension(mod_obj.__name__)
    except SyntaxError as e:
        raise CogLoadError(*e.args) from e
def unload_cog(bot, cogname, reloading=False):
    """Unload a cog and hand its PostgreSQL connection back to the pool.

    Args:
        bot: Bot on which ``unload_extension`` and ``try_discard`` are called.
        cogname: Dotted module name of the cog.
        reloading: True when the unload is part of a reload, which is the
            only case where ``cogs.cyvax`` may be unloaded.

    Raises:
        OwnerUnloadWithoutReloadError: ``cogs.cyvax`` outside of a reload.
        CogUnloadError: unloading failed (original error kept as ``__cause__``).
    """
    if not reloading and cogname == "cogs.cyvax":
        raise OwnerUnloadWithoutReloadError
    try:
        bot.unload_extension(cogname)
        bot.try_discard(cogname)
    except Exception as e:
        raise CogUnloadError from e
def load_cogs(bot):
    """Load the owner cog plus every cog enabled in the SQLite registry.

    ``cogs.cyvax`` is always loaded. On first run (empty registry) the default
    cogs are recorded as enabled and loaded in the same run. A cog that fails
    to load is logged, reported on stdout and stored as disabled.

    Args:
        bot: Bot that receives the extensions; its ``logger`` is used for
            failures.
    """
    defaults = ("cyvax", "cyv_general")
    conn, cur = get_sqlite3_conn()
    try:
        registry = sql_to_dict(conn, cur)
        bot.load_extension('cogs.cyvax')
        failed = []
        if not registry:  # All default cogs enabled by default
            for ext in defaults:
                name = "cogs." + ext
                cur.execute('INSERT INTO cogs(name, cog_status) VALUES (?,?);',
                            (name, True))
                # Mirror the insert so the defaults are loaded right away.
                registry[name] = True
        for extension in list_cogs():
            if extension.lower() == "cogs.cyvax":
                continue
            if not registry.get(extension, False):
                continue
            try:
                load_cog(bot, extension)
            # CyvBotError is listed explicitly so that cog errors are handled
            # here whatever base class the hierarchy uses.
            except (Exception, CyvBotError) as e:
                print(f"{e.__class__.__name__}: {str(e)}")
                bot.logger.exception(e)
                failed.append(extension)
                registry[extension] = False
        for cog_name, cog_statut in registry.items():
            cur.execute("UPDATE cogs SET cog_status = ? WHERE name = ?", (cog_statut, cog_name))
        if failed:
            print(f"\nFailed to load: {' '.join(failed)}\n")
        conn.commit()
    finally:
        cur.close()
        conn.close()
async def main(bot):
    """Prepare the bot, log in to Discord and run until disconnected.

    Raises:
        RuntimeError: ``bot.cyv_settings.login_credentials`` is falsy.
    """
    check_folders()
    bot.remove_command("help")
    load_cogs(bot)
    print("Logging into Discord...")
    bot.uptime = datetime.datetime.utcnow()
    if not bot.cyv_settings.login_credentials:
        print("No credentials available to login.")
        raise RuntimeError()
    await bot.login(bot.cyvbot_token)
    await bot.connect()
def set_cog(cog, value):
    """Store the enabled flag of ``cog``, inserting its row when missing.

    Args:
        cog: Name stored in the ``name`` column of the ``cogs`` table.
        value: Flag stored in the ``cog_status`` column.
    """
    conn, cur = get_sqlite3_conn()
    known_rows = cur.execute('SELECT name, cog_status FROM cogs WHERE name= ?',
                             (cog,)).fetchall()
    if not known_rows:
        statement = 'INSERT INTO cogs(name, cog_status) VALUES (?,?);'
        parameters = (cog, value)
    else:
        statement = 'UPDATE cogs SET cog_status = ? WHERE name = ?;'
        parameters = (value, cog)
    cur.execute(statement, parameters)
    conn.commit()
    cur.close()
    conn.close()
async def get_owner(token):
    """Fetch the id of the user owning the bot application.

    Args:
        token: Bot token sent as ``Authorization: Bot <token>``.

    Returns:
        The ``owner.id`` field of the JSON answer.

    Raises:
        BadToken: Discord answered 401.
        aiohttp.ClientResponseError: any other error status.
    """
    # NOTE(review): the URL was blank in the source. This endpoint is the one
    # whose answer carries an "owner" object; confirm it matches the original.
    url = "https://discord.com/api/oauth2/applications/@me"
    # NOTE(review): the User-Agent value looks truncated; kept as found.
    header = {"User-Agent": "CyvBot (",
              "Authorization": f"Bot {token}"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=header) as req:
            if req.status == 401:
                raise BadToken
            # Fail loudly instead of hitting a KeyError on an error payload.
            req.raise_for_status()
            cyv_bot = await req.json()
    return cyv_bot["owner"]["id"]
async def check_token(token):
    """Check a bot token against the Discord API.

    Args:
        token: Bot token sent as ``Authorization: Bot <token>``.

    Returns:
        True when Discord did not answer 401.

    Raises:
        BadToken: Discord answered 401.
    """
    # NOTE(review): the URL was blank in the source; any endpoint requiring
    # bot authentication answers 401 for a bad token. Confirm against the
    # original.
    url = "https://discord.com/api/oauth2/applications/@me"
    # NOTE(review): the User-Agent value looks truncated; kept as found.
    header = {"User-Agent": "CyvBot (",
              "Authorization": f"Bot {token}"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=header) as req:
            if req.status == 401:
                raise BadToken
    return True
async def interactive_setup():
    """Run the first-start dialogue and store its answers in SQLite.

    Asks for the bot token (validated through ``check_token``), the owner
    (the token's owner by default, or an id typed by the user) and the command
    prefix (``/`` by default), then inserts one row in the ``owner`` table.
    The whole dialogue restarts when Discord rejects the token.
    """
    valid = False
    while not valid:
        try:
            valid_token = False
            token = False
            print("CyvBot - First run configuration\n")
            print("Welcome to the interactive setup !\n")
            while not valid_token:
                token = input("What is your bot token : ")
                if len(token) > 50:
                    valid_token = await check_token(token)
                else:
                    print("That doesn't look like a valid token.\n")
                    # Non-blocking pause: this runs inside the event loop.
                    await asyncio.sleep(1)
            asking_owner = input("The Owner will be the bot token owner did you agree ?"
                                 "\nyes or no? (if no you need to provide owner id.) >")
            if asking_owner.strip().lower() == "no":
                owner_id = None
                while owner_id is None:
                    # Read first, convert second; converting the prompt text
                    # itself always raised ValueError.
                    answer = input("What is the owner id ?").strip()
                    if answer.isdigit():
                        owner_id = int(answer)
                    else:
                        print("That doesn't look like a valid id.\n")
            else:
                owner_id = await get_owner(token)
            prefix = input("What prefix do you want to use ? (default : /)")
            if len(prefix) == 0:
                prefix = "/"
            conn, cur = get_sqlite3_conn()
            try:
                cur.execute('INSERT INTO owner(owner, coowner, token, prefix) VALUES (?, ?, ?, ?);',
                            (owner_id, None, token, prefix))
                conn.commit()
            finally:
                cur.close()
                conn.close()
            valid = True
            print("Interactive setup is over thanks you !")
        except BadToken:
            print("This token is not valid. Try Again.")
def initialize(bot_class=Bot):
    """Create the bot and attach its core events and owner-only commands.

    Registers ``on_ready`` (start-up banner), ``on_command_error`` (user
    facing error messages) and the ``invite`` and ``check`` commands.

    Args:
        bot_class: Class to instantiate; called with ``description`` and
            ``pm_help=None``.

    Returns:
        The configured bot instance, not yet logged in.
    """
    bot = bot_class(description=description, pm_help=None)

    def is_bot_owner(user):
        """Tell whether ``user`` is the stored owner or one of the co-owners."""
        # Compared as strings: the stored ids may be int or str.
        # NOTE(review): assumes cyvbot_co_owner is a sequence of ids, as the
        # start-up banner treats it; confirm against Settings.
        owner_ids = {str(bot.cyvbot_owner)}
        owner_ids.update(str(co_id) for co_id in (bot.cyvbot_co_owner or ()))
        return str(user.id) in owner_ids

    def format_permissions(perms):
        """Turn permission identifiers into the readable list used in replies."""
        missing = [perm.replace('_', ' ').replace('guild', 'server').title() for perm in perms]
        if len(missing) > 2:
            return f'{"**, **".join(missing[:-1])}, et {missing[-1]}'
        return ' et '.join(missing)

    async def get_oauth_url():
        """Return the bot's invite URL, or an error text when it cannot be fetched."""
        try:
            data = await bot.application_info()
        except Exception as e:
            return f"Couldn't retrieve invite link. Error: {e}"
        # NOTE(review): the argument was cut off in the source; the
        # application id is the value oauth_url() requires.
        return discord.utils.oauth_url(data.id)

    @bot.event
    async def on_ready():
        users = len(set(bot.get_all_members()))
        servers = len(bot.guilds)
        channels = len([c for c in bot.get_all_channels()])
        # Converted to milliseconds to match the unit printed below.
        login_time = (datetime.datetime.utcnow() - bot.uptime).total_seconds() * 1000
        total_cogs = len(list_cogs())
        print(f"Login successful. ({login_time:.0f}ms)\n")
        owner = bot.cyvbot_owner
        co_owner = bot.cyvbot_co_owner
        print('-------------------')
        print(f'CyvBot {bot.cyv_settings.get_version()}')
        print('-------------------')
        print(f'{str(bot.user)}')
        print('\nConnected to:')
        print(f'{servers} servers')
        print(f'{channels} channels')
        print(f'{users} users\n')
        # Read from the settings rather than from a global that only exists
        # when this file is run as a script.
        print(f'Prefix: {bot.cyv_settings.get_prefix()}')
        print(f'Owner: {str(owner)}')
        if co_owner:
            print(f'Co-Owner: {", ".join([str(ids) for ids in co_owner]) if len(co_owner) > 1 else co_owner[0]}')
        print(f'Shards : {len(bot.shards)}')
        print(f'{len(bot.cogs)}/{total_cogs} active cogs with {len(bot.commands)} commands')
        print('-------------------')
        print("\nUrl de cyvbot :")
        url = await get_oauth_url()
        bot.oauth_url = url
        print(url)

    @bot.command()
    @checks.is_owner()
    async def invite(ctx):
        await ctx.send(await get_oauth_url())

    @bot.command()
    @checks.is_owner()
    async def check(ctx):
        import json
        temp1 = dict()
        for cogs in list(conn_pool.keys()):
            if conn_pool[cogs]:
                temp1[cogs] = (str(conn_pool[cogs][0]), str(conn_pool[cogs][1]))
            else:
                temp1[cogs] = None
        await ctx.send(f"```json\n{json.dumps(temp1, indent=4)}```")

    @bot.event
    async def on_command_error(ctx, error):
        # if command has local error handler, return
        if hasattr(ctx.command, 'on_error'):
            return
        # get the original exception
        error = getattr(error, 'original', error)
        if isinstance(error, commands.CommandNotFound):
            return
        if isinstance(error, commands.BotMissingPermissions):
            bmp = format_permissions(error.missing_perms)
            await ctx.send(f'J\'ai besoin des permissions **{bmp}** pour utiliser cette commande.')
            return
        if isinstance(error, commands.DisabledCommand):
            await ctx.send('Cette commande est désactivé.')
            return
        if isinstance(error, commands.CommandOnCooldown):
            # checks.is_owner() builds a decorator, which is always truthy;
            # test the author instead so only owners skip cooldowns.
            if is_bot_owner(ctx.author):
                await ctx.reinvoke()
                return
            await ctx.send(f"Cette commande est en Cooldown, merci de réessayer dans :"
                           f" {math.ceil(error.retry_after)}s.")
            return
        if isinstance(error, commands.MissingPermissions):
            bmp = format_permissions(error.missing_perms)
            _message = f'Vous avez besoin des permissions suivante : **{bmp}** pour utiliser cette commande.'
            await ctx.send(_message)
            return
        if isinstance(error, commands.UserInputError):
            await ctx.send_help(ctx.command)
            return
        if isinstance(error, commands.NoPrivateMessage):
            try:
                # NOTE(review): the call target was cut off in the source;
                # replying to the author is assumed.
                await ctx.author.send('Cette commande ne peut pas être utilisée dans les MP.')
            except discord.Forbidden:
                pass
            return
        # Tested before the generic CheckFailure branch, which would shadow
        # it if NSFWFailure derives from CheckFailure.
        if isinstance(error, checks.NSFWFailure):
            await ctx.send("Ce channel n'est pas un channel NSFW, cette commande ne peut donc pas s'exécuter.")
            return
        if isinstance(error, commands.CheckFailure):
            await ctx.send("Vous n'avez pas la permission d'utiliser cette commandes.")
            return
        # ignore all other exception types, but print them to stderr
        print(f'Ignoring exception in command {ctx.command}:', file=sys.stderr)
        traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
        bot.reset()

    return bot
if __name__ == '__main__':
    # Script entry point: bootstrap storage, run the first-start dialogue when
    # no owner row exists, then run the bot until it stops.
    check_folders()
    db = initiate()
    bot_token = get_token()
    bot_prefix = get_prefix()
    loop = asyncio.get_event_loop()
    if not bot_token:
        loop.run_until_complete(interactive_setup())
        bot_token = get_token()
        bot_prefix = get_prefix()
    client = initialize()
    try:
        loop.run_until_complete(main(client))
    except discord.LoginFailure:
        client.logger.error(traceback.format_exc())
        print("Impossible de se connecter a discord...\n"
              "Suppression de la valeur token...")
        # Drop the stored token so the setup dialogue runs on the next start.
        sqlite_conn, sqlite_cursor = get_sqlite3_conn()
        sqlite_cursor.execute("DELETE FROM owner where token is not null")
        sqlite_conn.commit()
        sqlite_cursor.close()
        sqlite_conn.close()
    except Exception:
        client.logger.error(traceback.format_exc())
        loop.run_until_complete(client.logout())
    finally:
        loop.close()
        # True -> plain shutdown (0), False -> restart (42), None -> failure (1).
        exit_codes = {True: 0, False: 42}
        exit(exit_codes.get(client.cyvbot_shutdown_mode, 1))
cyvax - 2024