mirror of
https://github.com/Paillat-dev/Botator.git
synced 2026-01-02 09:16:19 +00:00
324
code/code.py
324
code/code.py
@@ -3,183 +3,181 @@ import discord # pip install pycord
|
|||||||
from discord import File, Intents # pip install pycord
|
from discord import File, Intents # pip install pycord
|
||||||
import logging # pip install logging
|
import logging # pip install logging
|
||||||
import sqlite3 # pip install sqlite3
|
import sqlite3 # pip install sqlite3
|
||||||
|
import asyncio # pip install asyncio
|
||||||
#set the debug mode to the maximum
|
#set the debug mode to the maximum
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
def debug(message):
|
def debug(message):
|
||||||
logging.info(message)
|
logging.info(message)
|
||||||
|
|
||||||
#create a database called "database.db" if the database does not exist, else connect to it
|
#create a database called "database.db" if the database does not exist, else connect to it
|
||||||
conn = sqlite3.connect('database.db')
|
conn = sqlite3.connect('data.db')
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
# Create table called "data" if it does not exist with the following columns: guild_id, channel_id, api_key, is_active
|
# Create table called "data" if it does not exist with the following columns: guild_id, channel_id, api_key, is_active, max_tokens, temperature, frequency_penalty, presence_penalty, uses_count_today, prompt_size
|
||||||
c.execute('''CREATE TABLE IF NOT EXISTS data (guild_id text, channel_id text, api_key text, is_active boolean)''')
|
c.execute('''CREATE TABLE IF NOT EXISTS data (guild_id text, channel_id text, api_key text, is_active boolean, max_tokens integer, temperature real, frequency_penalty real, presence_penalty real, uses_count_today integer, prompt_size integer)''')
|
||||||
Intents =discord.Intents.all() # enable all intents
|
Intents =discord.Intents.all() # enable all intents
|
||||||
Intents.members = True
|
Intents.members = True
|
||||||
bot = discord.Bot(intents=Intents.all())
|
bot = discord.Bot(intents=Intents.all())
|
||||||
#create a command called "setchannel"
|
#create a command called "setup" that takes 2 arguments: the channel id and the api key
|
||||||
@bot.command()
|
@bot.command()
|
||||||
async def setchannel(ctx, channel: discord.TextChannel):
|
@discord.commands.option(name="channel_id", description="The channel id", required=True)
|
||||||
# Check if the bot has the "Manage Channels" permission
|
@discord.commands.option(name="api_key", description="The api key", required=True)
|
||||||
if ctx.author.guild_permissions.manage_channels:
|
async def setup(ctx, channel: discord.TextChannel, api_key):
|
||||||
# Check if the channel is already set
|
#check if the api key is valid
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
openai.api_key = api_key
|
||||||
if c.fetchone() is None:
|
try:
|
||||||
# Insert the channel id into the database
|
openai.Completion.create(engine="davinci", prompt="Hello world", max_tokens=1)
|
||||||
c.execute("INSERT INTO data VALUES (?, ?, ?, ?)", (ctx.guild.id, channel.id, None, False))
|
except:
|
||||||
conn.commit()
|
await ctx.respond("Invalid api key", ephemeral=True)
|
||||||
await ctx.respond("Channel set!",ephemeral=True)
|
return
|
||||||
else:
|
#check if the channel is valid
|
||||||
await ctx.respond("Channel already set!",ephemeral=True)
|
if channel is None:
|
||||||
else:
|
await ctx.respond("Invalid channel id", ephemeral=True)
|
||||||
await ctx.respond("You do not have the permission to do that!",ephemeral=True)
|
return
|
||||||
#create a command called "setkey"
|
#check if the guild is already in the database
|
||||||
@bot.command()
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
async def setkey(ctx, key):
|
if c.fetchone() is not None:
|
||||||
# Check if the bot has the "Manage Channels" permission
|
await ctx.respond("This server is already setup", ephemeral=True)
|
||||||
if ctx.author.guild_permissions.manage_channels:
|
return
|
||||||
# Check if the channel is already set
|
#add the guild to the database
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
c.execute("INSERT INTO data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (ctx.guild.id, channel.id, api_key, False, 50, 0.9, 0.0, 0.0, 0, 0))
|
||||||
if c.fetchone() is not None:
|
conn.commit()
|
||||||
# Insert the api key into the database
|
await ctx.send("The guild has been added to the database")
|
||||||
c.execute("UPDATE data SET api_key = ? WHERE guild_id = ?", (key, ctx.guild.id))
|
#create a command called "enable" taht only admins can use
|
||||||
conn.commit()
|
|
||||||
await ctx.respond("Key set!",ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("Channel not set!",ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("You do not have the permission to do that!",ephemeral=True)
|
|
||||||
#create a command called "enable"
|
|
||||||
@bot.command()
|
@bot.command()
|
||||||
|
##@discord.commands.permissions(administrator=True)
|
||||||
async def enable(ctx):
|
async def enable(ctx):
|
||||||
# Check if the bot has the "Manage Channels" permission
|
#check if the guild is in the database
|
||||||
if ctx.author.guild_permissions.manage_channels:
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
# Check if the channel is already set
|
if c.fetchone() is None:
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
await ctx.respond("This server is not setup", ephemeral=True)
|
||||||
if c.fetchone() is not None:
|
return
|
||||||
# Check if the api key is already set
|
#enable the guild
|
||||||
c.execute("SELECT api_key FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
c.execute("UPDATE data SET is_active = ? WHERE guild_id = ?", (True, ctx.guild.id))
|
||||||
if c.fetchone() is not None:
|
conn.commit()
|
||||||
# Set is_active to True
|
await ctx.send("The guild has been enabled")
|
||||||
c.execute("UPDATE data SET is_active = ? WHERE guild_id = ?", (True, ctx.guild.id))
|
#create a command called "disable" that only admins can use
|
||||||
conn.commit()
|
|
||||||
await ctx.respond("Enabled!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("Key not set!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("Channel not set!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("You do not have the permission to do that!", ephemeral=True)
|
|
||||||
#create a command called "disable"
|
|
||||||
@bot.command()
|
@bot.command()
|
||||||
|
##@discord.commands.permissions(administrator=True)
|
||||||
async def disable(ctx):
|
async def disable(ctx):
|
||||||
# Check if the bot has the "Manage Channels" permission
|
#check if the guild is in the database
|
||||||
if ctx.author.guild_permissions.manage_channels:
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
# Check if the channel is already set
|
if c.fetchone() is None:
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
await ctx.respond("This server is not setup", ephemeral=True)
|
||||||
if c.fetchone() is not None:
|
return
|
||||||
# Check if the api key is already set
|
#disable the guild
|
||||||
c.execute("SELECT api_key FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
c.execute("UPDATE data SET is_active = ? WHERE guild_id = ?", (False, ctx.guild.id))
|
||||||
if c.fetchone() is not None:
|
conn.commit()
|
||||||
# Set is_active to false
|
await ctx.send("The guild has been disabled")
|
||||||
c.execute("UPDATE data SET is_active = ? WHERE guild_id = ?", (False, ctx.guild.id))
|
#create a command called "advanced" that only admins can use, wich sets the advanced settings up: max_tokens, temperature, frequency_penalty, presence_penalty, prompt_size
|
||||||
conn.commit()
|
|
||||||
await ctx.respond("Disabled!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("Key not set!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("Channel not set!", ephemeral=True)
|
|
||||||
else:
|
|
||||||
await ctx.respond("You do not have the permission to do that!", ephemeral=True)
|
|
||||||
#create a command called "delete" to delete the channel and api key from the database
|
|
||||||
@bot.command()
|
@bot.command()
|
||||||
async def delete(ctx):
|
##@discord.commands.permissions(administrator=True)
|
||||||
# Check if the bot has the "Manage Channels" permission
|
#set the first argument: max_tokens, with a default value of 150
|
||||||
if ctx.author.guild_permissions.manage_channels:
|
@discord.commands.option(name="max_tokens", description="The max tokens", required=False)
|
||||||
# Check if the channel is already set
|
#set the second argument: temperature, with a default value of 0.5
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
@discord.commands.option(name="temperature", description="The temperature", required=False)
|
||||||
if c.fetchone() is not None:
|
#set the third argument: frequency_penalty, with a default value of 0.5
|
||||||
# Delete the channel and api key from the database
|
@discord.commands.option(name="frequency_penalty", description="The frequency penalty", required=False)
|
||||||
c.execute("DELETE FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
#set the fourth argument: presence_penalty, with a default value of 0.5
|
||||||
conn.commit()
|
@discord.commands.option(name="presence_penalty", description="The presence penalty", required=False)
|
||||||
await ctx.respond("Deleted!", ephemeral=True)
|
#set the fifth argument: prompt_size, with a default value of 5
|
||||||
else:
|
@discord.commands.option(name="prompt_size", description="The number of messages to use as a prompt", required=False)
|
||||||
await ctx.respond("Channel not set!", ephemeral=True)
|
async def advanced(ctx, max_tokens=256, temperature=0.9, frequency_penalty=0, presence_penalty=0.6, prompt_size=5):
|
||||||
else:
|
#check if the guild is in the database
|
||||||
await ctx.respond("You do not have the permission to do that!", ephemeral=True)
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
#create a command called "info" to get the channel and api key from the database
|
if c.fetchone() is None:
|
||||||
|
await ctx.respond("This server is not setup, please run /setup", ephemeral=True)
|
||||||
|
return
|
||||||
|
#update the advanced settings
|
||||||
|
c.execute("UPDATE data SET max_tokens = ?, temperature = ?, frequency_penalty = ?, presence_penalty = ?, prompt_size = ? WHERE guild_id = ?", (max_tokens, temperature, frequency_penalty, presence_penalty, prompt_size, ctx.guild.id))
|
||||||
|
conn.commit()
|
||||||
|
await ctx.respond("The advanced settings have been updated", ephemeral=True)
|
||||||
|
#create a command called "delete" that only admins can use wich deletes the guild id, the api key, the channel id and the advanced settings from the database
|
||||||
@bot.command()
|
@bot.command()
|
||||||
async def info(ctx):
|
##@discord.commands.permissions(administrator=True)
|
||||||
# Check if the bot has the "administrator" permission
|
async def delete(ctx):
|
||||||
if ctx.author.guild_permissions.administrator:
|
#check if the guild is in the database
|
||||||
# Check if the channel is already set
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
if c.fetchone() is None:
|
||||||
if c.fetchone() is not None:
|
await ctx.respond("This server is not setup", ephemeral=True)
|
||||||
# Get the channel and api key from the database
|
return
|
||||||
c.execute("SELECT channel_id, api_key FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
#delete the guild from the database
|
||||||
channel_id, api_key = c.fetchone()
|
c.execute("DELETE FROM data WHERE guild_id = ?", (ctx.guild.id,))
|
||||||
await ctx.respond(f"Channel: {channel_id}, api key: {api_key}", ephemeral=True)
|
conn.commit()
|
||||||
else:
|
await ctx.send("The guild has been deleted from the database")
|
||||||
await ctx.respond("Channel not set!", ephemeral=True)
|
@bot.command()
|
||||||
else:
|
async def help(ctx):
|
||||||
await ctx.respond("You do not have the permission to do that!", ephemeral=True)
|
embed = discord.Embed(title="Help", description="Here is the help page", color=0x00ff00)
|
||||||
# when a message is sent in a channel, check if the channel is in the database for the guild, and if it is, and if it is not, check if the channel is active, and if it is, check if the user is a bot, and if it is not, send the message to openai with the 5 last messages from the channel as the prompt
|
embed.add_field(name="/setup", value="Setup the bot", inline=False)
|
||||||
|
embed.add_field(name="/enable", value="Enable the bot", inline=False)
|
||||||
|
embed.add_field(name="/disable", value="Disable the bot", inline=False)
|
||||||
|
embed.add_field(name="/advanced", value="Set the advanced settings", inline=False)
|
||||||
|
embed.add_field(name="/delete", value="Delete all your data from our server", inline=False)
|
||||||
|
embed.add_field(name="/help", value="Show this message", inline=False)
|
||||||
|
await ctx.respond(embed=embed, ephemeral=True)
|
||||||
|
#when a message is sent into a channel check if the guild is in the database and if the bot is enabled
|
||||||
@bot.event
|
@bot.event
|
||||||
async def on_message(message):
|
async def on_message(message):
|
||||||
debug(message)
|
#check if the message is from a bot
|
||||||
# Check if the channel is in the database for the guild and if the message has been sent in that channel
|
if message.author.bot:
|
||||||
|
return
|
||||||
|
#check if the guild is in the database
|
||||||
|
c.execute("SELECT * FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
|
if c.fetchone() is None:
|
||||||
|
return
|
||||||
|
#check if the bot is enabled
|
||||||
|
c.execute("SELECT is_active FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
|
if c.fetchone()[0] == False:
|
||||||
|
return
|
||||||
|
#check if the message has been sent in the channel set in the database
|
||||||
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (message.guild.id,))
|
c.execute("SELECT channel_id FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
channel = c.fetchone()
|
if str(message.channel.id) != str(c.fetchone()[0]):
|
||||||
debug(channel[0])
|
debug("The message has been sent in the wrong channel")
|
||||||
debug(message.channel.id)
|
return
|
||||||
if channel is not None and str(message.channel.id) == str(channel[0]):
|
#check if the bot hasn't been used more than 200 times in the last 24 hours (uses_count_today)
|
||||||
debug("Channel is in database")
|
c.execute("SELECT uses_count_today FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
# Check if the channel is active
|
if c.fetchone()[0] >= 200:
|
||||||
c.execute("SELECT is_active FROM data WHERE guild_id = ?", (message.guild.id,))
|
return
|
||||||
if c.fetchone() == (True,):
|
#add 1 to the uses_count_today
|
||||||
debug("Channel is active")
|
c.execute("UPDATE data SET uses_count_today = uses_count_today + 1 WHERE guild_id = ?", (message.guild.id,))
|
||||||
# Check if the user is a bot
|
#get the api key from the database
|
||||||
if not message.author.bot:
|
c.execute("SELECT api_key FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
debug("User is not a bot")
|
api_key = c.fetchone()[0]
|
||||||
# Get the api key from the database
|
#get the advanced settings from the database
|
||||||
c.execute("SELECT api_key FROM data WHERE guild_id = ?", (message.guild.id,))
|
c.execute("SELECT max_tokens, temperature, frequency_penalty, presence_penalty, prompt_size FROM data WHERE guild_id = ?", (message.guild.id,))
|
||||||
api_key = c.fetchone()[0]
|
max_tokens, temperature, frequency_penalty, presence_penalty, prompt_size = c.fetchone()
|
||||||
# Get the 5 last messages from the channel
|
messages = await message.channel.history(limit=prompt_size).flatten()
|
||||||
messages = await message.channel.history(limit=5).flatten()
|
messages.reverse()
|
||||||
# Create the prompt with the 5 last messages and the message sent by the user goint at the line after each message, adding HUMAN: before the messages taht were not sent by the bot and AI: before the messages that were sent by the bot
|
prompt = ""
|
||||||
prompt = ""
|
for msg in messages:
|
||||||
for m in messages:
|
if msg.author.bot:
|
||||||
#add at the beginning of the prompt and not at the end to have the messages in the right order
|
prompt += f"AI: {msg.content}\n"
|
||||||
if m.author.bot:
|
else:
|
||||||
prompt = f"AI: {m.content}\n" + prompt
|
prompt += f"{msg.author.display_name}: {msg.content}\n"
|
||||||
else:
|
prompt = f"This is a conversation with an AI. Only the {prompt_size} last messages are used as a prompt.\n\n" + prompt + f"\n AI:"
|
||||||
prompt = m.author.display_name + ": " + m.content + "\n" + prompt
|
#send the request to the api
|
||||||
#prompt = f"HUMAN: {m.content}\n" + prompt
|
debug("Sending request to the api")
|
||||||
#add AI: at the end of the prompt
|
debug(prompt)
|
||||||
prompt += "AI:"
|
openai.api_key = api_key
|
||||||
prompt = "This is a conversation with an AI. Only the last 5 messages are taken as a prompt. \n \n" + prompt
|
response = openai.Completion.create(
|
||||||
debug(prompt)
|
engine="text-davinci-002",
|
||||||
# prompt += f"HUMAN: {message.content}"
|
prompt=str(prompt),
|
||||||
#set the api key
|
max_tokens=int(max_tokens),
|
||||||
openai.api_key = api_key
|
top_p=1,
|
||||||
# Send the prompt to openai
|
temperature=float(temperature),
|
||||||
response = openai.Completion.create(
|
frequency_penalty=float(frequency_penalty),
|
||||||
engine="text-davinci-002",
|
presence_penalty=float(presence_penalty),
|
||||||
prompt=prompt,
|
stop=[" Human:", " AI:", "AI:", "Human:"] )
|
||||||
temperature=0.9,
|
#send the response
|
||||||
max_tokens=512,
|
if response["choices"][0] ["text"] != "":
|
||||||
top_p=1,
|
await message.channel.send(response["choices"][0]["text"])
|
||||||
frequency_penalty=0,
|
else:
|
||||||
presence_penalty=0.6,
|
await message.channel.send("The AI is not sure what to say (the response was empty)")
|
||||||
stop=["\n", " Human:", " AI:"]
|
debug("The response was empty")
|
||||||
)
|
debug("The response has been sent")
|
||||||
# Send the response to the channel
|
|
||||||
if response["choices"][0] ["text"] != "":
|
#get the message content
|
||||||
await message.channel.send(response["choices"][0]["text"])
|
# add a slash command called "say" that sends a message to the channel
|
||||||
else:
|
|
||||||
# If the response is empty, send a message saying that the response is empty
|
|
||||||
await message.channel.send("I don't know what to say (response is empty)")
|
|
||||||
# add a slash command called "say" that sends a message to the channel
|
|
||||||
@bot.slash_command()
|
@bot.slash_command()
|
||||||
async def say(ctx, message: str):
|
async def say(ctx, message: str):
|
||||||
await ctx.respond("message sent!", ephemeral=True)
|
await ctx.respond("message sent!", ephemeral=True)
|
||||||
@@ -190,6 +188,18 @@ async def clear(ctx):
|
|||||||
await ctx.respond("messages deleted!", ephemeral=True)
|
await ctx.respond("messages deleted!", ephemeral=True)
|
||||||
return await ctx.channel.purge()
|
return await ctx.channel.purge()
|
||||||
|
|
||||||
|
#at the end of the day reset the uses_count_today to 0 for all the guilds
|
||||||
|
async def reset_uses_count_today():
|
||||||
|
await bot.wait_until_ready()
|
||||||
|
while not bot.is_closed():
|
||||||
|
c.execute("UPDATE data SET uses_count_today = 0")
|
||||||
|
conn.commit()
|
||||||
|
await asyncio.sleep(86400)
|
||||||
|
# on startup run the reset_uses_count_today function
|
||||||
|
bot.loop.create_task(reset_uses_count_today())
|
||||||
|
|
||||||
#run the bot
|
#run the bot
|
||||||
# Replace the following with your bot's token
|
# Replace the following with your bot's token
|
||||||
bot.run("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
|
with open("key.txt") as f:
|
||||||
|
key = f.read()
|
||||||
|
bot.run(key)
|
||||||
Reference in New Issue
Block a user