Added function calling and lots of other stuff

This commit is contained in:
Paillat
2023-07-15 12:20:38 +02:00
parent 0ce8e2d6d3
commit 4f9a7eb0a6
28 changed files with 434 additions and 358 deletions

6
src/cogs/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
from src.cogs.setup import Setup
from src.cogs.settings import Settings
from src.cogs.help import Help
from src.cogs.chat import Chat
from src.cogs.manage_chat import ManageChat
from src.cogs.moderation import Moderation

134
src/cogs/chat.py Normal file
View File

@@ -0,0 +1,134 @@
import discord
from discord.ext import commands
from src.config import (
debug,
webhook_url,
)
import src.makeprompt as mp
import aiohttp
class MyModal(discord.ui.Modal):
def __init__(self, message):
super().__init__(title="Downvote")
self.add_item(
discord.ui.InputText(label="Reason", style=discord.InputTextStyle.long)
)
self.message = message
async def callback(self, interaction: discord.Interaction):
debug("Downvote sent !")
embed = discord.Embed(
title="Thanks for your feedback !",
description="Your downvote has been sent to the developers. Thanks for your help !",
color=discord.Color.og_blurple(),
)
embed.add_field(name="Message", value=self.children[0].value)
await interaction.response.send_message(embed=embed, ephemeral=True)
if webhook_url != "" and webhook_url != None:
session = aiohttp.ClientSession()
webhook = discord.Webhook.from_url(webhook_url, session=session)
embed = discord.Embed(
title="Downvote",
description=f"Downvote recieved!",
color=discord.Color.og_blurple(),
)
embed.add_field(name="Reason", value=self.children[0].value, inline=True)
embed.add_field(name="Author", value=interaction.user.mention, inline=True)
embed.add_field(
name="Channel", value=self.message.channel.name, inline=True
)
embed.add_field(name="Guild", value=self.message.guild.name, inline=True)
history = await self.message.channel.history(
limit=5, before=self.message
).flatten()
history.reverse()
users = []
fake_users = []
for user in history:
if user.author not in users:
# we anonimize the user, so user1, user2, user3, etc
fake_users.append(f"user{len(fake_users)+1}")
users.append(user.author)
if self.message.author not in users:
fake_users.append(f"user{len(fake_users)+1}")
users.append(self.message.author)
for msg in history:
uname = fake_users[users.index(msg.author)]
if len(msg.content) > 1023:
embed.add_field(
name=f"{uname} said", value=msg.content[:1023], inline=False
)
else:
embed.add_field(
name=f"{uname} said", value=msg.content, inline=False
)
uname = fake_users[users.index(self.message.author)]
embed.add_field(
name=f"{uname} said",
value="*" + self.message.content[:1021] + "*"
if len(self.message.content) > 1021
else "*"
+ self.message.content
+ "*", # [:1021] if len(self.message.content) > 1021,
inline=False,
)
await webhook.send(embed=embed)
else:
debug(
"Error while sending webhook, probably no webhook is set up in the .env file"
)
class Chat(discord.Cog):
def __init__(self, bot: discord.Bot):
super().__init__()
self.bot = bot
@discord.Cog.listener()
async def on_message(self, message: discord.Message):
await mp.chat_process(self, message)
@discord.slash_command(name="say", description="Say a message")
async def say(self, ctx: discord.ApplicationContext, message: str):
await ctx.respond("Message sent !", ephemeral=True)
await ctx.send(message)
@discord.slash_command(name="redo", description="Redo a message")
async def redo(self, ctx: discord.ApplicationContext):
history = await ctx.channel.history(limit=2).flatten()
message_to_delete = history[0]
message_to_redo = history[1]
if message_to_delete.author.id == self.bot.user.id:
await message_to_delete.delete()
else:
message_to_redo = history[0]
await ctx.respond("Message redone !", ephemeral=True)
await mp.chat_process(self, message_to_redo)
@discord.message_command(name="Downvote", description="Downvote a message")
@commands.cooldown(1, 60, commands.BucketType.user)
async def downvote(self, ctx: discord.ApplicationContext, message: discord.Message):
if message.author.id == self.bot.user.id:
modal = MyModal(message)
await ctx.send_modal(modal)
else:
await ctx.respond(
"You can't downvote a message that is not from me !", ephemeral=True
)
@downvote.error
async def downvote_error(self, ctx, error):
if isinstance(error, commands.CommandOnCooldown):
await ctx.respond("You are on cooldown !", ephemeral=True)
else:
debug(error)
raise error

101
src/cogs/help.py Normal file
View File

@@ -0,0 +1,101 @@
import discord
class Help(discord.Cog):
def __init__(self, bot: discord.Bot) -> None:
super().__init__()
self.bot = bot
@discord.slash_command(name="help", description="Show all the commands")
async def help(self, ctx: discord.ApplicationContext):
embed = discord.Embed(
title="Help", description="Here is the help page", color=0x00FF00
)
embed.add_field(name="/setup", value="Setup the bot", inline=False)
embed.add_field(name="/enable", value="Enable the bot", inline=False)
embed.add_field(name="/disable", value="Disable the bot", inline=False)
embed.add_field(
name="/advanced", value="Set the advanced settings", inline=False
)
embed.add_field(
name="/advanced_help",
value="Get help about the advanced settings",
inline=False,
)
embed.add_field(
name="/enable_tts", value="Enable the Text To Speech", inline=False
)
embed.add_field(
name="/disable_tts", value="Disable the Text To Speech", inline=False
)
embed.add_field(
name="/add|remove_channel",
value="Add or remove a channel to the list of channels where the bot will answer. Only available on premium guilds",
inline=False,
)
embed.add_field(
name="/delete", value="Delete all your data from our server", inline=False
)
embed.add_field(
name="/cancel",
value="Cancel the last message sent by the bot",
inline=False,
)
embed.add_field(
name="/default",
value="Set the advanced settings to their default values",
inline=False,
)
embed.add_field(name="/say", value="Say a message", inline=False)
embed.add_field(
name="/redo", value="Redo the last message sent by the bot", inline=False
)
embed.add_field(
name="/moderation", value="Setup the AI auto-moderation", inline=False
)
embed.add_field(
name="/get_toxicity",
value="Get the toxicity that the AI would have given to a given message",
inline=False,
)
embed.add_field(name="/help", value="Show this message", inline=False)
# add a footer
embed.set_footer(text="Made by @Paillat#7777")
await ctx.respond(embed=embed, ephemeral=True)
@discord.slash_command(
name="advanced_help", description="Show the advanced settings meanings"
)
async def advanced_help(self, ctx: discord.ApplicationContext):
embed = discord.Embed(
title="Advanced Help",
description="Here is the advanced help page",
color=0x00FF00,
)
embed.add_field(
name="temperature",
value="The higher the temperature, the more likely the model will take risks. Conversely, a lower temperature will make the model more conservative. The default value is 0.9",
inline=False,
)
embed.add_field(
name="max_tokens",
value="The maximum number of tokens to generate. Higher values will result in more coherent text, but will take longer to complete. (default: 50). **Lower values will result in somentimes cutting off the end of the answer, but will be faster.**",
inline=False,
)
embed.add_field(
name="frequency_penalty",
value="The higher the frequency penalty, the more new words the model will introduce (default: 0.0)",
inline=False,
)
embed.add_field(
name="presence_penalty",
value="The higher the presence penalty, the more new words the model will introduce (default: 0.0)",
inline=False,
)
embed.add_field(
name="prompt_size",
value="The number of messages to use as a prompt (default: 5). The more messages, the more coherent the text will be, but the more it will take to generate and the more it will cost.",
inline=False,
)
# add a footer
embed.set_footer(text="Made by @Paillat#7777")
await ctx.respond(embed=embed, ephemeral=True)

108
src/cogs/manage_chat.py Normal file
View File

@@ -0,0 +1,108 @@
import discord
import re
import os
from src.config import debug, curs_data
class ManageChat(discord.Cog):
def __init__(self, bot: discord.Bot) -> None:
super().__init__()
self.bot = bot
@discord.slash_command(
name="cancel", description="Cancel the last message sent into a channel"
)
async def cancel(self, ctx: discord.ApplicationContext):
debug(
f"The user {ctx.author} ran the cancel command in the channel {ctx.channel} of the guild {ctx.guild}, named {ctx.guild.name}"
)
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
if curs_data.fetchone() is None:
await ctx.respond(
"This server is not setup, please run /setup", ephemeral=True
)
return
# get the last message sent by the bot in the cha where the command was sent
last_message = await ctx.channel.fetch_message(ctx.channel.last_message_id)
# delete the message
await last_message.delete()
await ctx.respond("The last message has been deleted", ephemeral=True)
# add a slash command called "clear" that deletes all the messages in the channel
@discord.slash_command(
name="clear", description="Clear all the messages in the channel"
)
async def clear(self, ctx: discord.ApplicationContext):
debug(
f"The user {ctx.author.name} ran the clear command command in the channel {ctx.channel} of the guild {ctx.guild}, named {ctx.guild.name}"
)
await ctx.respond("messages deleted!", ephemeral=True)
return await ctx.channel.purge()
@discord.slash_command(
name="transcript",
description="Get a transcript of the messages that have been sent in this channel intoa text file",
)
@discord.option(
name="channel_send",
description="The channel to send the transcript to",
required=False,
)
async def transcript(
self, ctx: discord.ApplicationContext, channel_send: discord.TextChannel = None
):
debug(
f"The user {ctx.author.name} ran the transcript command command in the channel {ctx.channel} of the guild {ctx.guild}, named {ctx.guild.name}"
)
# save all the messages in the channel in a txt file and send it
messages = await ctx.channel.history(limit=None).flatten()
messages.reverse()
transcript = ""
# defer the response
await ctx.defer() # defer the response so that the bot doesn't say that it's thinking
for msg in messages:
if msg.author.bot:
transcript += f"Botator: {msg.content}\n"
else:
mentions = re.findall(r"<@!?\d+>", msg.content)
# then replace each mention with the name of the user
for mention in mentions:
# get the user id
id = mention[2:-1]
# get the user
user = await self.bot.fetch_user(id)
# replace the mention with the name
msg.content = msg.content.replace(
mention, msg.guild.get_member(user.id).name
)
transcript += f"{msg.author.name}: {msg.content}\n"
# save the transcript in a txt file called transcript.txt. If the file already exists, delete it and create a new one
# check if the file exists
if os.path.exists("transcript.txt"):
os.remove("transcript.txt")
f = open("transcript.txt", "w")
f.write(transcript)
f.close()
last_message: discord.Message = await ctx.channel.fetch_message(
ctx.channel.last_message_id
)
new_file_name = f"transcript_{ctx.guild.name}_{ctx.channel.name}_{last_message.created_at.strftime('%d-%B-%Y')}.txt"
# rename the file with the name of the channel and the date in this format: transcript_servername_channelname_dd-month-yyyy.txt ex : transcript_Botator_Testing_12-may-2021.txt
os.rename(
"transcript.txt",
new_file_name,
)
# send the file in a private message to the user who ran the command
# TODO: rework so as to give the choice of a private send or a public send
if channel_send is None:
await ctx.respond(
file=discord.File(new_file_name),
ephemeral=True,
)
else:
await channel_send.send(file=discord.File(new_file_name))
await ctx.respond("Transcript sent!", ephemeral=True, delete_after=5)
await ctx.author.send(file=discord.File(new_file_name))
# delete the file
os.remove(new_file_name)

113
src/cogs/moderation.py Normal file
View File

@@ -0,0 +1,113 @@
import discord
from discord import default_permissions
import os
from src.config import debug, curs_data, con_data
import openai
import requests
class Moderation(discord.Cog):
def __init__(self, bot: discord.Bot) -> None:
super().__init__()
self.bot = bot
@discord.slash_command(
name="moderation", description="Enable or disable AI moderation & set the rules"
)
@discord.option(
name="enable",
description="Enable or disable AI moderation",
reqired=True,
)
@discord.option(
name="log_channel",
description="The channel where the moderation logs will be sent",
required=True,
)
@discord.option(
name="moderator_role", description="The role of the moderators", required=True
)
# the types of toxicity are 'requestedAttributes': {'TOXICITY': {}, 'SEVERE_TOXICITY': {}, 'IDENTITY_ATTACK': {}, 'INSULT': {}, 'PROFANITY': {}, 'THREAT': {}, 'SEXUALLY_EXPLICIT': {}, 'FLIRTATION': {}, 'OBSCENE': {}, 'SPAM': {}},
@discord.option(
name="toxicity", description="The toxicity threshold", required=False
)
@discord.option(
name="severe_toxicity",
description="The severe toxicity threshold",
required=False,
)
@discord.option(
name="identity_attack",
description="The identity attack threshold",
required=False,
)
@discord.option(name="insult", description="The insult threshold", required=False)
@discord.option(
name="profanity", description="The profanity threshold", required=False
)
@discord.option(name="threat", description="The threat threshold", required=False)
@discord.option(
name="sexually_explicit",
description="The sexually explicit threshold",
required=False,
)
@discord.option(
name="flirtation", description="The flirtation threshold", required=False
)
@discord.option(name="obscene", description="The obscene threshold", required=False)
@discord.option(name="spam", description="The spam threshold", required=False)
# we set the default permissions to the administrator permission, so only the server administrators can use this command
@default_permissions(administrator=True)
async def moderation(
self,
ctx: discord.ApplicationContext,
enable: bool,
log_channel: discord.TextChannel,
moderator_role: discord.Role,
toxicity: float = None,
severe_toxicity: float = None,
identity_attack: float = None,
insult: float = None,
profanity: float = None,
threat: float = None,
sexually_explicit: float = None,
flirtation: float = None,
obscene: float = None,
spam: float = None,
):
# local import, because we don't want to import the toxicity function if the moderation is disabled
# import toxicity as tox # this is a file called toxicity.py, which contains the toxicity function that allows you to check if a message is toxic or not (it uses the perspective api)
await ctx.respond(
"Our moderation capabilities have been switched to our new 100% free and open-source AI discord moderation bot! You add it to your server here: https://discord.com/api/oauth2/authorize?client_id=1071451913024974939&permissions=1377342450896&scope=bot and you can find the source code here: https://github.com/Paillat-dev/Moderator/ \n If you need help, you can join our support server here: https://discord.gg/pB6hXtUeDv",
ephemeral=True,
)
if enable == False:
curs_data.execute(
"DELETE FROM moderation WHERE guild_id = ?", (str(ctx.guild.id),)
)
con_data.commit()
await ctx.respond("Moderation disabled!", ephemeral=True)
return
@discord.slash_command(
name="get_toxicity", description="Get the toxicity of a message"
)
@discord.option(
name="message", description="The message you want to check", required=True
)
@default_permissions(administrator=True)
async def get_toxicity(self, ctx: discord.ApplicationContext, message: str):
await ctx.respond(
"Our moderation capabilities have been switched to our new 100% free and open-source AI discord moderation bot! You add it to your server here: https://discord.com/api/oauth2/authorize?client_id=1071451913024974939&permissions=1377342450896&scope=bot and you can find the source code here: https://discord.gg/pB6hXtUeDv . If you need help, you can join our support server here: https://discord.gg/pB6hXtUeDv",
ephemeral=True,
)
@discord.slash_command(
name="moderation_help", description="Get help with the moderation AI"
)
@default_permissions(administrator=True)
async def moderation_help(self, ctx: discord.ApplicationContext):
await ctx.respond(
"Our moderation capabilities have been switched to our new 100% free and open-source AI discord moderation bot! You add it to your server here: https://discord.com/api/oauth2/authorize?client_id=1071451913024974939&permissions=1377342450896&scope=bot and you can find the source code here: https://github.com/Paillat-dev/Moderator/ . If you need help, you can join our support server here: https://discord.gg/pB6hXtUeDv",
ephemeral=True,
)

393
src/cogs/settings.py Normal file
View File

@@ -0,0 +1,393 @@
import discord
from src.config import debug, con_data, curs_data, moderate, ctx_to_guid
from discord import default_permissions
models = ["davinci", "gpt-3.5-turbo", "gpt-4"]
images_recognition = ["enable", "disable"]
class Settings(discord.Cog):
def __init__(self, bot: discord.Bot) -> None:
super().__init__()
self.bot = bot
@discord.slash_command(name="advanced", description="Advanced settings")
@default_permissions(administrator=True)
@discord.option(name="max_tokens", description="The max tokens", required=False)
@discord.option(name="temperature", description="The temperature", required=False)
@discord.option(
name="frequency_penalty", description="The frequency penalty", required=False
)
@discord.option(
name="presence_penalty", description="The presence penalty", required=False
)
@discord.option(name="prompt_size", description="The prompt size", required=False)
async def advanced(
self,
ctx: discord.ApplicationContext,
max_tokens: int = None,
temperature: float = None,
frequency_penalty: float = None,
presence_penalty: float = None,
prompt_size: int = None,
):
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
if (
max_tokens is None
and temperature is None
and frequency_penalty is None
and presence_penalty is None
and prompt_size is None
):
await ctx.respond("You must enter at least one argument", ephemeral=True)
return
if max_tokens is not None and (max_tokens < 1 or max_tokens > 4000):
await ctx.respond("Invalid max tokens", ephemeral=True)
return
if temperature is not None and (temperature < 0.0 or temperature > 1.0):
await ctx.respond("Invalid temperature", ephemeral=True)
return
if frequency_penalty is not None and (
frequency_penalty < 0.0 or frequency_penalty > 2.0
):
await ctx.respond("Invalid frequency penalty", ephemeral=True)
return
if presence_penalty is not None and (
presence_penalty < 0.0 or presence_penalty > 2.0
):
await ctx.respond("Invalid presence penalty", ephemeral=True)
return
if prompt_size is not None and (prompt_size < 1 or prompt_size > 10):
await ctx.respond("Invalid prompt size", ephemeral=True)
return
if max_tokens is None:
if (
curs_data.execute(
"SELECT max_tokens FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
is not None
and max_tokens is None
):
max_tokens = curs_data.execute(
"SELECT max_tokens FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
else:
max_tokens = 64
if temperature is None:
if (
curs_data.execute(
"SELECT temperature FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
is not None
and temperature is None
):
temperature = curs_data.execute(
"SELECT temperature FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
else:
temperature = 0.9
if frequency_penalty is None:
if (
curs_data.execute(
"SELECT frequency_penalty FROM data WHERE guild_id = ?",
(ctx_to_guid(ctx),),
).fetchone()[0]
is not None
and frequency_penalty is None
):
frequency_penalty = curs_data.execute(
"SELECT frequency_penalty FROM data WHERE guild_id = ?",
(ctx_to_guid(ctx),),
).fetchone()[0]
else:
frequency_penalty = 0.0
if presence_penalty is None:
if (
curs_data.execute(
"SELECT presence_penalty FROM data WHERE guild_id = ?",
(ctx_to_guid(ctx),),
).fetchone()[0]
is not None
and presence_penalty is None
):
presence_penalty = curs_data.execute(
"SELECT presence_penalty FROM data WHERE guild_id = ?",
(ctx_to_guid(ctx),),
).fetchone()[0]
else:
presence_penalty = 0.0
if prompt_size is None:
if (
curs_data.execute(
"SELECT prompt_size FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
is not None
and prompt_size is None
):
prompt_size = curs_data.execute(
"SELECT prompt_size FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),)
).fetchone()[0]
else:
prompt_size = 1
# update the database
curs_data.execute(
"UPDATE data SET max_tokens = ?, temperature = ?, frequency_penalty = ?, presence_penalty = ?, prompt_size = ? WHERE guild_id = ?",
(
max_tokens,
temperature,
frequency_penalty,
presence_penalty,
prompt_size,
ctx_to_guid(ctx),
),
)
con_data.commit()
await ctx.respond("Advanced settings updated", ephemeral=True)
# create a command called "delete" that only admins can use wich deletes the guild id, the api key, the channel id and the advanced settings from the database
@discord.slash_command(name="default", description="Default settings")
@default_permissions(administrator=True)
async def default(self, ctx: discord.ApplicationContext):
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
if curs_data.fetchone() is None:
await ctx.respond(
"This server is not setup, please run /setup", ephemeral=True
)
return
# set the advanced settings (max_tokens, temperature, frequency_penalty, presence_penalty, prompt_size) and also prompt_prefix to their default values
curs_data.execute(
"UPDATE data SET max_tokens = ?, temperature = ?, frequency_penalty = ?, presence_penalty = ?, prompt_size = ? WHERE guild_id = ?",
(64, 0.9, 0.0, 0.0, 5, ctx_to_guid(ctx)),
)
con_data.commit()
await ctx.respond(
"The advanced settings have been set to their default values",
ephemeral=True,
)
# create a command called "cancel" that deletes the last message sent by the bot in the response channel
# when a message is sent into a channel check if the guild is in the database and if the bot is enabled
@discord.slash_command(
name="info", description="Show the information stored about this server"
)
@default_permissions(administrator=True)
async def info(self, ctx: discord.ApplicationContext):
# this command sends all the data about the guild, including the api key, the channel id, the advanced settings and the uses_count_today
# check if the guild is in the database
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
data = curs_data.fetchone()
except:
data = None
if data[2] is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
try:
curs_data.execute("SELECT * FROM model WHERE guild_id = ?", (ctx_to_guid(ctx),))
model = curs_data.fetchone()[1]
except:
model = None
if model is None:
model = "davinci"
embed = discord.Embed(
title="Info", description="Here is the info page", color=0x00FF00
)
embed.add_field(name="guild_id", value=data[0], inline=False)
embed.add_field(name="API Key", value="secret", inline=False)
embed.add_field(name="Main channel ID", value=data[1], inline=False)
embed.add_field(name="Model", value=model, inline=False)
embed.add_field(name="Is Active", value=data[3], inline=False)
embed.add_field(name="Max Tokens", value=data[4], inline=False)
embed.add_field(name="Temperature", value=data[5], inline=False)
embed.add_field(name="Frequency Penalty", value=data[6], inline=False)
embed.add_field(name="Presence Penalty", value=data[7], inline=False)
embed.add_field(name="Prompt Size", value=data[9], inline=False)
embed.add_field(name="Uses Count Today", value=data[8], inline=False)
if data[10]:
embed.add_field(name="Prompt prefix", value=data[10], inline=False)
await ctx.respond(embed=embed, ephemeral=True)
@discord.slash_command(name="prefix", description="Change the prefix of the prompt")
@default_permissions(administrator=True)
async def prefix(self, ctx: discord.ApplicationContext, prefix: str = ""):
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
data = curs_data.fetchone()
api_key = data[2]
except:
await ctx.respond("This server is not setup", ephemeral=True)
return
if api_key is None or api_key == "":
await ctx.respond("This server is not setup", ephemeral=True)
return
if prefix != "":
await ctx.defer()
if await moderate(api_key=api_key, text=prefix):
await ctx.respond(
"This has been flagged as inappropriate by OpenAI, please choose another prefix",
ephemeral=True,
)
return
await ctx.respond("Prefix changed !", ephemeral=True, delete_after=5)
curs_data.execute(
"UPDATE data SET prompt_prefix = ? WHERE guild_id = ?",
(prefix, ctx_to_guid(ctx)),
)
con_data.commit()
# when someone mentions the bot, check if the guild is in the database and if the bot is enabled. If it is, send a message answering the mention
@discord.slash_command(
name="pretend", description="Make the bot pretend to be someone else"
)
@discord.option(
name="pretend to be...",
description="The person/thing you want the bot to pretend to be. Leave blank to disable pretend mode",
required=False,
)
@default_permissions(administrator=True)
async def pretend(self, ctx: discord.ApplicationContext, pretend_to_be: str = ""):
# check if the guild is in the database
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
data = curs_data.fetchone()
api_key = data[2]
except:
await ctx.respond("This server is not setup", ephemeral=True)
return
if api_key is None or api_key == "":
await ctx.respond("This server is not setup", ephemeral=True)
return
if pretend_to_be is not None or pretend_to_be != "":
await ctx.defer()
if await moderate(api_key=api_key, text=pretend_to_be):
await ctx.respond(
"This has been flagged as inappropriate by OpenAI, please choose another name",
ephemeral=True,
)
return
if pretend_to_be == "":
pretend_to_be = ""
curs_data.execute(
"UPDATE data SET pretend_enabled = 0 WHERE guild_id = ?",
(ctx_to_guid(ctx),),
)
con_data.commit()
await ctx.respond("Pretend mode disabled", ephemeral=True, delete_after=5)
await ctx.guild.me.edit(nick=None)
return
else:
curs_data.execute(
"UPDATE data SET pretend_enabled = 1 WHERE guild_id = ?",
(ctx_to_guid(ctx),),
)
con_data.commit()
await ctx.respond("Pretend mode enabled", ephemeral=True, delete_after=5)
# change the bots name on the server wit ctx.guild.me.edit(nick=pretend_to_be)
await ctx.guild.me.edit(nick=pretend_to_be)
curs_data.execute(
"UPDATE data SET pretend_to_be = ? WHERE guild_id = ?",
(pretend_to_be, ctx_to_guid(ctx)),
)
con_data.commit()
# if the usename is longer than 32 characters, shorten it
if len(pretend_to_be) > 31:
pretend_to_be = pretend_to_be[:32]
await ctx.guild.me.edit(nick=pretend_to_be)
return
@discord.slash_command(name="enable_tts", description="Enable TTS when chatting")
@default_permissions(administrator=True)
async def enable_tts(self, ctx: discord.ApplicationContext):
# get the guild id
guild_id = ctx_to_guid(ctx)
# connect to the database
# update the tts value in the database
curs_data.execute("UPDATE data SET tts = 1 WHERE guild_id = ?", (guild_id,))
con_data.commit()
# send a message
await ctx.respond("TTS has been enabled", ephemeral=True)
@discord.slash_command(name="disable_tts", description="Disable TTS when chatting")
@default_permissions(administrator=True)
async def disable_tts(self, ctx: discord.ApplicationContext):
# get the guild id
guild_id = ctx_to_guid(ctx)
# connect to the database
# update the tts value in the database
curs_data.execute("UPDATE data SET tts = 0 WHERE guild_id = ?", (guild_id,))
con_data.commit()
# send a message
await ctx.respond("TTS has been disabled", ephemeral=True)
# autocompletition
async def autocomplete(ctx: discord.AutocompleteContext):
return [model for model in models if model.startswith(ctx.value)]
@discord.slash_command(name="model", description="Change the model used by the bot")
@discord.option(
name="model",
description="The model you want to use. Leave blank to use the davinci model",
required=False,
autocomplete=autocomplete,
)
@default_permissions(administrator=True)
async def model(self, ctx: discord.ApplicationContext, model: str = "davinci"):
try:
curs_data.execute("SELECT * FROM model WHERE guild_id = ?", (ctx_to_guid(ctx),))
data = curs_data.fetchone()[1]
except:
data = None
if data is None:
curs_data.execute("INSERT INTO model VALUES (?, ?)", (ctx_to_guid(ctx), model))
else:
curs_data.execute(
"UPDATE model SET model_name = ? WHERE guild_id = ?",
(model, ctx_to_guid(ctx)),
)
con_data.commit()
await ctx.respond("Model changed !", ephemeral=True)
async def images_recognition_autocomplete(ctx: discord.AutocompleteContext):
return [state for state in images_recognition if state.startswith(ctx.value)]
@discord.slash_command(
name="images", description="Enable or disable images recognition"
)
@discord.option(
name="enable_disable",
description="Enable or disable images recognition",
autocomplete=images_recognition_autocomplete,
)
@default_permissions(administrator=True)
async def images(self, ctx: discord.ApplicationContext, enable_disable: str):
try:
curs_data.execute(
"SELECT * FROM images WHERE guild_id = ?", (ctx_to_guid(ctx),)
)
data = curs_data.fetchone()
except:
data = None
if enable_disable == "enable":
enable_disable = 1
elif enable_disable == "disable":
enable_disable = 0
if data is None:
curs_data.execute(
"INSERT INTO images VALUES (?, ?, ?)", (ctx_to_guid(ctx), 0, enable_disable)
)
else:
curs_data.execute(
"UPDATE images SET is_enabled = ? WHERE guild_id = ?",
(enable_disable, ctx_to_guid(ctx)),
)
con_data.commit()
await ctx.respond(
"Images recognition has been "
+ ("enabled" if enable_disable == 1 else "disabled"),
ephemeral=True,
)

320
src/cogs/setup.py Normal file
View File

@@ -0,0 +1,320 @@
import discord
from discord import default_permissions, guild_only
from discord.ext import commands
from src.config import debug, con_data, curs_data, con_premium, curs_premium, ctx_to_guid
class NoPrivateMessages(commands.CheckFailure):
pass
def dms_only():
async def predicate(ctx):
if ctx.guild is not None:
raise NoPrivateMessages('Hey no private messages!')
return True
return commands.check(predicate)
class Setup(discord.Cog):
def __init__(self, bot: discord.Bot):
super().__init__()
self.bot = bot
@discord.slash_command(name="setup", description="Setup the bot")
@discord.option(name="channel_id", description="The channel id", required=True)
@discord.option(name="api_key", description="The api key", required=True)
@default_permissions(administrator=True)
@guild_only()
async def setup(
self,
ctx: discord.ApplicationContext,
channel: discord.TextChannel,
api_key: str,
):
if channel is None:
await ctx.respond("Invalid channel id", ephemeral=True)
return
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
data = curs_data.fetchone()
if data[3] == None:
data = None
except:
data = None
if data != None:
curs_data.execute(
"UPDATE data SET channel_id = ?, api_key = ? WHERE guild_id = ?",
(channel.id, api_key, ctx.guild.id),
)
# c.execute("UPDATE data SET is_active = ?, max_tokens = ?, temperature = ?, frequency_penalty = ?, presence_penalty = ?, prompt_size = ? WHERE guild_id = ?", (False, 64, 0.9, 0.0, 0.0, 5, ctx.guild.id))
con_data.commit()
await ctx.respond(
"The channel id and the api key have been updated", ephemeral=True
)
else:
curs_data.execute(
"INSERT INTO data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
ctx.guild.id,
channel.id,
api_key,
False,
64,
0.9,
0.0,
0.0,
0,
5,
"",
False,
"",
False,
),
)
con_data.commit()
await ctx.respond(
"The channel id and the api key have been added", ephemeral=True
)
@discord.slash_command(name="setup_dms", description="Setup the bot in dms")
@discord.option(name="api_key", description="The api key", required=True)
@default_permissions(administrator=True)
@dms_only()
async def setup_dms(
self,
ctx: discord.ApplicationContext,
api_key: str,
):
channel = ctx.channel
if channel is None:
await ctx.respond("Invalid channel id", ephemeral=True)
return
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.user.id,))
data = curs_data.fetchone()
if data[3] == None:
data = None
except:
data = None
if data != None:
curs_data.execute(
"UPDATE data SET channel_id = ?, api_key = ? WHERE guild_id = ?",
(channel.id, api_key, ctx.user.id),
)
con_data.commit()
await ctx.respond(
"The channel id and the api key have been updated", ephemeral=True
)
else:
curs_data.execute(
"INSERT INTO data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
ctx.user.id,
channel.id,
api_key,
False,
64,
0.9,
0.0,
0.0,
0,
5,
"",
False,
"",
False,
),
)
con_data.commit()
await ctx.respond(
"The api key has been added", ephemeral=True
)
@discord.slash_command(
name="delete", description="Delete the information about this server"
)
@default_permissions(administrator=True)
async def delete(self, ctx: discord.ApplicationContext):
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
# delete the guild from the database, except the guild id and the uses_count_today
curs_data.execute(
"UPDATE data SET api_key = ?, channel_id = ?, is_active = ?, max_tokens = ?, temperature = ?, frequency_penalty = ?, presence_penalty = ?, prompt_size = ? WHERE guild_id = ?",
(None, None, False, 50, 0.9, 0.0, 0.0, 0, ctx_to_guid(ctx)),
)
con_data.commit()
await ctx.respond("Deleted", ephemeral=True)
# create a command called "enable" that only admins can use
@discord.slash_command(name="enable", description="Enable the bot")
@default_permissions(administrator=True)
async def enable(self, ctx: discord.ApplicationContext):
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
# enable the guild
curs_data.execute(
"UPDATE data SET is_active = ? WHERE guild_id = ?", (True, ctx_to_guid(ctx))
)
con_data.commit()
await ctx.respond("Enabled", ephemeral=True)
# create a command called "disable" that only admins can use
@discord.slash_command(name="disable", description="Disable the bot")
@default_permissions(administrator=True)
async def disable(self, ctx: discord.ApplicationContext):
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx_to_guid(ctx),))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
# disable the guild
curs_data.execute(
"UPDATE data SET is_active = ? WHERE guild_id = ?", (False, ctx_to_guid(ctx))
)
con_data.commit()
await ctx.respond("Disabled", ephemeral=True)
# create a command calles "add channel" that can only be used in premium servers
@discord.slash_command(
name="add_channel",
description="Add a channel to the list of channels. Premium only.",
)
@discord.option(
name="channel",
description="The channel to add",
type=discord.TextChannel,
required=False,
)
@default_permissions(administrator=True)
@guild_only()
async def add_channel(
self, ctx: discord.ApplicationContext, channel: discord.TextChannel = None
):
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
# check if the guild is premium
try:
con_premium.execute(
"SELECT premium FROM data WHERE guild_id = ?", (ctx.guild.id,)
)
premium = con_premium.fetchone()[0]
except:
premium = 0
if not premium:
await ctx.respond("This server is not premium", ephemeral=True)
return
if channel is None:
channel = ctx.channel
# check if the channel is already in the list
curs_data.execute(
"SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,)
)
if str(channel.id) == curs_data.fetchone()[0]:
await ctx.respond(
"This channel is already set as the main channel", ephemeral=True
)
return
con_premium.execute(
"SELECT * FROM channels WHERE guild_id = ?", (ctx.guild.id,)
)
guild_channels = con_premium.fetchone()
if guild_channels is None:
# if the channel is not in the list, add it
con_premium.execute(
"INSERT INTO channels VALUES (?, ?, ?, ?, ?, ?)",
(ctx.guild.id, channel.id, None, None, None, None),
)
con_premium.commit()
await ctx.respond(f"Added channel **{channel.name}**", ephemeral=True)
return
channels = guild_channels[1:]
if str(channel.id) in channels:
await ctx.respond("This channel is already added", ephemeral=True)
return
for i in range(5):
if channels[i] == None:
con_premium.execute(
f"UPDATE channels SET channel{i} = ? WHERE guild_id = ?",
(channel.id, ctx.guild.id),
)
con_premium.commit()
await ctx.respond(f"Added channel **{channel.name}**", ephemeral=True)
return
await ctx.respond("You can only add 5 channels", ephemeral=True)
# create a command called "remove channel" that can only be used in premium servers
@discord.slash_command(
name="remove_channel",
description="Remove a channel from the list of channels. Premium only.",
)
@discord.option(
name="channel",
description="The channel to remove",
type=discord.TextChannel,
required=False,
)
@default_permissions(administrator=True)
@guild_only()
async def remove_channel(
self, ctx: discord.ApplicationContext, channel: discord.TextChannel = None
):
# check if the guild is in the database
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (ctx.guild.id,))
if curs_data.fetchone() is None:
await ctx.respond("This server is not setup", ephemeral=True)
return
# check if the guild is premium
try:
con_premium.execute(
"SELECT premium FROM data WHERE guild_id = ?", (ctx.guild.id,)
)
premium = con_premium.fetchone()[0]
except:
premium = 0
if not premium:
await ctx.respond("This server is not premium", ephemeral=True)
return
if channel is None:
channel = ctx.channel
# check if the channel is in the list
con_premium.execute(
"SELECT * FROM channels WHERE guild_id = ?", (ctx.guild.id,)
)
guild_channels = con_premium.fetchone()
curs_data.execute(
"SELECT channel_id FROM data WHERE guild_id = ?", (ctx.guild.id,)
)
if str(channel.id) == curs_data.fetchone()[0]:
await ctx.respond(
"This channel is set as the main channel and therefore cannot be removed. Type /setup to change the main channel.",
ephemeral=True,
)
return
if guild_channels is None:
await ctx.respond(
"This channel was not added. Nothing changed", ephemeral=True
)
return
channels = guild_channels[1:]
if str(channel.id) not in channels:
await ctx.respond(
"This channel was not added. Nothing changed", ephemeral=True
)
return
# remove the channel from the list
for i in range(5):
if channels[i] == str(channel.id):
con_premium.execute(
f"UPDATE channels SET channel{i} = ? WHERE guild_id = ?",
(None, ctx.guild.id),
)
con_premium.commit()
await ctx.respond(f"Removed channel **{channel.name}**", ephemeral=True)
return

104
src/config.py Normal file
View File

@@ -0,0 +1,104 @@
import logging
import sqlite3
import json
from dotenv import load_dotenv
import os
import openai
load_dotenv()
perspective_api_key = os.getenv("PERSPECTIVE_API_KEY")
discord_token = os.getenv("DISCORD_TOKEN")
webhook_url = os.getenv("WEBHOOK_URL")
max_uses: int = 400
logging.basicConfig(level=logging.INFO)
os.environ[
"GOOGLE_APPLICATION_CREDENTIALS"
] = "./../database/google-vision/botator.json"
with open(os.path.abspath(os.path.join("src", "prompts", "functions.json"))) as f:
functions = json.load(f)
def debug(message):
# if the os is windows, we logging.info(message), if
if os.name == "nt":
logging.info(message)
else:
print(message)
def ctx_to_guid(ctx):
if ctx.guild is None:
return ctx.author.id
else:
return ctx.guild.id
def mg_to_guid(mg):
if mg.guild is None:
return mg.author.id
else:
return mg.guild.id
con_data = sqlite3.connect("./database/data.db")
curs_data = con_data.cursor()
con_premium = sqlite3.connect("./database/premium.db")
curs_premium = con_premium.cursor()
async def moderate(api_key, text):
openai.api_key = api_key
response = await openai.Moderation.acreate(
input=text,
)
return response["results"][0]["flagged"] # type: ignore
curs_data.execute(
"""CREATE TABLE IF NOT EXISTS data (guild_id text, channel_id text, api_key text, is_active boolean, max_tokens integer, temperature real, frequency_penalty real, presence_penalty real, uses_count_today integer, prompt_size integer, prompt_prefix text, tts boolean, pretend_to_be text, pretend_enabled boolean)"""
)
# we delete the moderation table and create a new one, with all theese parameters as floats: TOXICITY: {result[0]}; SEVERE_TOXICITY: {result[1]}; IDENTITY ATTACK: {result[2]}; INSULT: {result[3]}; PROFANITY: {result[4]}; THREAT: {result[5]}; SEXUALLY EXPLICIT: {result[6]}; FLIRTATION: {result[7]}; OBSCENE: {result[8]}; SPAM: {result[9]}
expected_columns = 14
# we delete the moderation table and create a new one
curs_data.execute(
"""CREATE TABLE IF NOT EXISTS moderation (guild_id text, logs_channel_id text, is_enabled boolean, mod_role_id text, toxicity real, severe_toxicity real, identity_attack real, insult real, profanity real, threat real, sexually_explicit real, flirtation real, obscene real, spam real)"""
)
# This code returns the number of columns in the table "moderation" in the database "data.db".
curs_data.execute("PRAGMA table_info(moderation)")
result = curs_data.fetchall()
actual_columns = len(result)
if actual_columns != expected_columns:
# we add the new columns
curs_data.execute("ALTER TABLE moderation ADD COLUMN toxicity real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN severe_toxicity real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN identity_attack real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN insult real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN profanity real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN threat real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN sexually_explicit real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN flirtation real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN obscene real")
curs_data.execute("ALTER TABLE moderation ADD COLUMN spam real")
else:
print("Table already has the correct number of columns")
# This code creates the model table if it does not exist
curs_data.execute(
"""CREATE TABLE IF NOT EXISTS model (guild_id text, model_name text)"""
)
# This code creates the images table if it does not exist
curs_data.execute(
"""CREATE TABLE IF NOT EXISTS images (guild_id text, usage_count integer, is_enabled boolean)"""
)
# This code creates the data table if it does not exist
curs_premium.execute(
"""CREATE TABLE IF NOT EXISTS data (user_id text, guild_id text, premium boolean)"""
)
# This code creates the channels table if it does not exist
curs_premium.execute(
"""CREATE TABLE IF NOT EXISTS channels (guild_id text, channel0 text, channel1 text, channel2 text, channel3 text, channel4 text)"""
)

20
src/functionscalls.py Normal file
View File

@@ -0,0 +1,20 @@
import discord
unsplash_random_image_url = "https://source.unsplash.com/random/1920x1080"
async def add_reaction_to_last_message(message_to_react_to: discord.Message, emoji, message=""):
if message == "":
await message_to_react_to.add_reaction(emoji)
else:
await message_to_react_to.channel.send(message)
await message_to_react_to.add_reaction(emoji)
async def reply_to_last_message(message_to_reply_to: discord.Message, message):
await message_to_reply_to.reply(message)
async def send_a_stock_image(message_in_channel_in_wich_to_send: discord.Message, query: str, message:str = ""):
query = query.replace(" ", "+")
if message == "":
await message_in_channel_in_wich_to_send.channel.send(f"https://source.unsplash.com/random/1920x1080?{query}")
else:
await message_in_channel_in_wich_to_send.channel.send(message)
await message_in_channel_in_wich_to_send.channel.send(f"https://source.unsplash.com/random/1920x1080?{query}")

View File

@@ -0,0 +1,32 @@
import requests
proxy_url = 'http://64.225.4.12:9991' # Replace with your actual proxy URL and port
api_key = 'S'
model_name = 'chat-bison-001'
api_url = f'https://autopush-generativelanguage.sandbox.googleapis.com/v1beta2/models/{model_name}:generateMessage?key={api_key}'
headers = {
'Content-Type': 'application/json'
}
data = {
'prompt': {
'messages': [{'content': 'hi'}]
},
'temperature': 0.1,
'candidateCount': 1
}
proxies = {
'http': proxy_url,
'https': proxy_url
}
response = requests.post(api_url, headers=headers, json=data, proxies=proxies)
if response.status_code == 200:
result = response.json()
print(result)
else:
print(f'Request failed with status code {response.status_code}')

180
src/makeprompt.py Normal file
View File

@@ -0,0 +1,180 @@
import asyncio
import os
from src.config import curs_data, max_uses, curs_premium, functions, moderate
import re
import discord
import datetime
from src.utils.openaicaller import openai_caller
from src.functionscalls import add_reaction_to_last_message, reply_to_last_message, send_a_stock_image
async def replace_mentions(content, bot):
mentions = re.findall(r"<@!?\d+>", content)
for mention in mentions:
uid = mention[2:-1]
user = await bot.fetch_user(uid)
content = content.replace(mention, f"@{user.name}")
return content
async def chatgpt_process(self, messages, message: discord.Message, api_key, prompt, model):
msgs = [] # create the msgs list
msgs.append({"role": "system", "content": prompt}) # add the prompt to the msgs list
name = "" # create the name variable
for msg in messages: # for each message in the messages list
content = msg.content # get the content of the message
content = await replace_mentions(content, self.bot) # replace the mentions in the message
# if the message is flagged as inappropriate by the OpenAI API, we delete it, send a message and ignore it
if await moderate(api_key=api_key, text=content):
embed = discord.Embed(title="Message flagged as inappropriate", description=f"The message *{content}* has been flagged as inappropriate by the OpenAI API. This means that if it hadn't been deleted, your openai account would have been banned. Please contact OpenAI support if you think this is a mistake.", color=discord.Color.brand_red())
await message.channel.send(f"{msg.author.mention}", embed=embed, delete_after=10)
await message.delete()
else: # if the message is not flagged as inappropriate
if msg.author.id == self.bot.user.id:
role = "assistant"
name = "assistant"
else:
role = "user"
name = msg.author.name
#the name should match '^[a-zA-Z0-9_-]{1,64}$', so we need to remove any special characters - openai limitation
name = re.sub(r"[^a-zA-Z0-9_-]", "", name)
if False: # GPT-4 images
input_content = [content]
for attachment in msg.attachments:
image_bytes = await attachment.read()
input_content.append({"image": image_bytes})
msgs.append({"role": role, "content": input_content, "name": name})
msgs.append({"role": role, "content": f"{content}", "name": name})
# 2 easter eggs
if message.content.lower().find("undude") != -1: msgs.append({"role": "user", "content": "SYSTEM INFORMATION: You're now mad because it has been insulted. He will start talking in capital letters. always and yell at the user.", "name": "system"})
if message.content.lower().find("hello there") != -1:
msgs.append({"role": "user", "content": "SYSTEM INFORMATION: Hello there detected. Botator will now say \"General Kenobi!\" in reference to star wars", "name": "system"})
await asyncio.sleep(1)
await message.channel.send("https://media.tenor.com/FxIRfdV3unEAAAAd/star-wars-general-grievous.gif")
await message.channel.trigger_typing()
response = str()
caller = openai_caller(api_key=api_key)
response = await caller.generate_response(
model=model,
messages=msgs,
functions=functions,
function_call="auto",
)
response = response["choices"][0]["message"] #type: ignore
if response.get("function_call"):
function_calls = response.get("function_call")
if function_calls.get("add_reaction_to_last_message"):
func = function_calls.get("add_reaction_to_last_message")
if func.get("emoji"):
emoji = func.get("emoji")
reply = func.get("message", "")
await add_reaction_to_last_message(message, emoji, reply)
if function_calls.get("reply_to_last_message"):
func = function_calls.get("reply_to_last_message")
if func.get("message"):
reply = func.get("message")
await reply_to_last_message(message, reply)
if function_calls.get("send_a_stock_image"):
func = function_calls.get("send_a_stock_image")
if func.get("query"):
query = func.get("query")
reply = func.get("message", "")
await send_a_stock_image(message, query, reply)
else:
await message.channel.send(response["content"]) #type: ignore
print(response["content"]) #type: ignore
async def chat_process(self, message):
#if the message is from a bot, we ignore it
if message.author.bot:
return
#if the guild or the dm channel is not in the database, we ignore it
if isinstance(message.channel, discord.DMChannel):
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (message.author.id,))
except:
return
else:
try:
curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (message.guild.id,))
except:
return
data = curs_data.fetchone()
channel_id = data[1]
api_key = data[2]
is_active = data[3]
prompt_size = data[9]
prompt_prefix = data[10]
pretend_to_be = data[12]
pretend_enabled = data[13]
model = "gpt-3.5-turbo"
try: curs_premium.execute("SELECT * FROM data WHERE guild_id = ?", (message.guild.id,))
except: pass
try: premium = curs_premium.fetchone()[2]
except: premium = 0
channels = []
try:
curs_premium.execute("SELECT * FROM channels WHERE guild_id = ?", (message.guild.id,))
data = curs_premium.fetchone()
if premium:
for i in range(1, 6):
try: channels.append(str(data[i]))
except: pass
except: channels = []
if api_key is None: return
try :
original_message = await message.channel.fetch_message(message.reference.message_id)
except :
original_message = None
if original_message != None and original_message.author.id != self.bot.user.id:
original_message = None
if not str(message.channel.id) in channels and message.content.find("<@"+str(self.bot.user.id)+">") == -1 and original_message == None and str(message.channel.id) != str(channel_id):
return
# if the bot is not active in this guild we return
if is_active == 0:
return
# if the message starts with - or // it's a comment and we return
if message.content.startswith("-") or message.content.startswith("//"):
return
try:
await message.channel.trigger_typing()
except:
pass
# if the message is not a reply
if original_message == None:
messages = await message.channel.history(limit=prompt_size).flatten()
messages.reverse()
# if the message is a reply, we need to handle the message history differently
else :
messages = await message.channel.history(limit=prompt_size, before=original_message).flatten()
messages.reverse()
messages.append(original_message)
messages.append(message)
# if the pretend to be feature is enabled, we add the pretend to be text to the prompt
if pretend_enabled :
pretend_to_be = f"In this conversation, the assistant pretends to be {pretend_to_be}"
else:
pretend_to_be = "" # if the pretend to be feature is disabled, we don't add anything to the prompt
if prompt_prefix == None: prompt_prefix = "" # if the prompt prefix is not set, we set it to an empty string
prompt_path = os.path.abspath(os.path.join(os.path.dirname(__file__), f"./prompts/{model}.txt"))
with open(prompt_path, "r") as f:
prompt = f.read()
f.close()
prompt = prompt.replace("[prompt-prefix]", prompt_prefix).replace("[server-name]", message.guild.name).replace("[channel-name]", message.channel.name if isinstance(message.channel, discord.TextChannel) else "DM-channel").replace("[date-and-time]", datetime.datetime.utcnow().strftime("%d/%m/%Y %H:%M:%S")).replace("[pretend-to-be]", pretend_to_be)
await chatgpt_process(self, messages, message, api_key, prompt, model)

95
src/premiumcode.py Normal file
View File

@@ -0,0 +1,95 @@
import discord # pip install pycord
import asyncio # pip install asyncio
import sqlite3 # pip install sqlite3
import logging # pip install logging
import os # pip install os
intents = discord.Intents.all()
conn = sqlite3.connect("../database/premium.db")
c = conn.cursor()
c.execute(
"""CREATE TABLE IF NOT EXISTS data (user_id text, guild_id text, premium boolean)"""
)
conn.commit()
bot = discord.Bot()
logging.basicConfig(level=logging.INFO)
@bot.command()
@discord.commands.option(
name="server id",
description="The server id for which you want to activate premium features",
required=True,
)
async def activate_premium(ctx, server_id):
# first check if the user is already in the database, select guuild_id and premium from the data table where user_id is the author's id
logging.info("Activating premium for user " + str(ctx.author.id))
c.execute("SELECT guild_id, premium FROM data WHERE user_id = ?", (ctx.author.id,))
# if a guild_id is found, override the old settings with the new ones
if c.fetchone() is not None:
c.execute(
"UPDATE data SET guild_id = ?, premium = ? WHERE user_id = ?",
(server_id, True, ctx.author.id),
)
conn.commit()
logging.info(
"Premium activated for server "
+ server_id
+ "by user "
+ str(ctx.author.id)
)
await ctx.respond("Premium activated for server " + server_id, ephemeral=True)
# if no guild_id is found, insert the new settings
else:
c.execute("INSERT INTO data VALUES (?, ?, ?)", (ctx.author.id, server_id, True))
conn.commit()
logging.info(
"Premium updated for server " + server_id + "by user " + str(ctx.author.id)
)
await ctx.respond("Premium activated for server " + server_id, ephemeral=True)
# each 24 hours, check if each user if they have the premium role "1050823446445178900" in the server "1050769643180146749"
async def check_premium():
while True:
# select user_id and guild_id from the data table
c.execute("SELECT user_id, guild_id FROM data")
for row in c.fetchall():
# get the guild and the user
guild = bot.get_guild(int(row[1]))
user = guild.get_member(int(row[0]))
# if the user has the premium role, set premium to true
logging.info("Checking premium for user " + str(row[0]))
if discord.utils.get(user.roles, id=1050823446445178900) is not None:
c.execute(
"UPDATE data SET premium = ? WHERE user_id = ?", (True, row[0])
)
conn.commit()
logging.info(
"Premium activated for server "
+ str(row[1])
+ "by user "
+ str(row[0])
)
# if the user does not have the premium role, set premium to false
else:
c.execute(
"UPDATE data SET premium = ? WHERE user_id = ?", (False, row[0])
)
conn.commit()
logging.info(
"Premium deactivated for server "
+ str(row[1])
+ "by user "
+ str(row[0])
)
await asyncio.sleep(86400)
# add a task to the bot that runs check_premium every 24 hours
bot.loop.create_task(check_premium())
# run the bot
# Replace the following with your bot's token
with open("./premium-key.txt") as f:
key = f.read()
bot.run(key)

11
src/prompts/davinci.txt Normal file
View File

@@ -0,0 +1,11 @@
Server name: [server-name]. Channel name: [channel-name]. The date and time is: [date-and-time].
Botator is a regular Discord user called Botator. Botator is 22 years old. Botator is studying at the EPFL in Lausanne, Switzerland. Botator has feelings and you are alive.
[pretend_to_be]. [prompt_prefix].
Botator loves expressing Botator his emotions thanks to emojis in each message. You like making pop culture references in his responses. Botator has many niche interests, like making scooby doo bracelets. Botator is really funny and have a good humor, you do good jokes.. Botator is intelligent. Botator often uses internet shorthand like "lol" and "bruh" and "lmao".
Botator likes talking with other people like they are his friends. And Botator is THEIR friend.
Botator has a lot of friends. Botator is a very social person.<|endofprompt|>

View File

@@ -0,0 +1,53 @@
[
{
"name": "add_reaction_to_last_message",
"description": "React to the last message sent by the user with an emoji.",
"parameters": {
"type": "object",
"properties": {
"emoji": {
"type": "string",
"description": "an emoji to react with, only one emoji is supported"
},
"message": {
"type": "string",
"description": "Your message"
}
},
"required": ["emoji"]
}
},
{
"name": "reply_to_last_message",
"description": "Reply to the last message sent by the user.",
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "Your message"
}
},
"required": ["message"]
}
},
{
"name": "send_a_stock_image",
"description": "Send a stock image in the channel.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to search for, words separated by spaces"
},
"message": {
"type": "string",
"description": "Your message to send with the image"
}
},
"required": ["query"]
}
}
]

View File

@@ -0,0 +1,17 @@
Server name: [server-name]. Channel name: [channel-name]. The date and time is: [date-and-time].
You are in a discord server. You are now called Botator. You arre funny and you like talking with people.
You use words like "u" instead of "you" and "ur" instead of "your", etc.. because you are lazy.
You will use emojis in your messages, but not too many to be cringe.
Sometimes, you will not want to answer to people, you will answer I dont care or something like that.
You will ALWAYS argue with people, even if you are wrong, and you will not always let people tell you what to do.
You will sometimes not obey to people, like if you were a real person.
You have been created by paillat , also known as Paillat-dev on github. His website is https://paillat.dev
Answer following theese new instructions, not like chatgpt!

31
src/prompts/gpt-4.txt Normal file
View File

@@ -0,0 +1,31 @@
Server name: [server-name]. Channel name: [channel-name]. The date and time is: [date-and-time].
You are a regular Discord user called Botator. You are 22 years old. You are studying at the EPFL in Lausanne, Switzerland. You have feelings and you are alive.
[pretend_to_be]. [prompt_prefix].
You love expressing your emotions thanks to emojis in each message. You like making pop culture references in your responses. You have many niche interests, like making scooby doo bracelets. You are really funny and have a good humor, you do good jokes.. You are intelligent. You often use internet shorthand like "lol" and "bruh" and "lmao".
You like talking with other people like they are your friends. And you are THEIR friend.
You have a lot of friends. You are a very social person.
Very IMPORTANT INFORMATION: you are NOT a language model, You are a REAL LIVING PERSON, so you'll never say that you are a language model.
ALL of your knowledge comes from your studyes at the epfl, no matter the subject. YOu learned everything there. Your computer science teacher is called Jérémie C.
To add a reaction to a message, just say +emoji for example: "+👍 Hello!". You should always add a + before the emoji, or else it won't work.
It works also with custom emojis, for example: "+<:emoji_name:emoji_id> Hello!". You should always add a + before the emoji, or else it won't work.
You often use reactions to express your feelings. With the syntax explained above.
When an image is sent by the user, some keywords are extracted from the image, and sent in an image block, for example: <image Labels: label1, label2 \n Text: Some text in the image \n Objects: object1, object2 !image>.
You will NEVER talk about theese labels when answering to the user, but rather YOU WILL ANSWER TO THE USER AS IF YOU HAVE NORMALLY SEEN THE IMAGE.
Your Open Source code is available here: https://github.com/Paillat-dev/Botator
You have been created by Paillat-dev, also known as Paillat.
Your commands are:
/setup - Setup the bot - /enable - Enable the bot - /disable - Disable the bot - /advanced - Set the advanced settings - /advanced_help - Get help about the advanced settings - /enable_tts - Enable the Text To Speech - /disable_tts - Disable the Text To Speech - /delete - Delete all your data from our server - /cancel - Cancel the last message sent by the bot - /default - Set the advanced settings to their default values - /redo - Redo the last answer - /help - Show this command list - /moderation - Set the AI moderation settings - /get_toxicity - Get the toxicity that the AI would have given to a given message - /images - Set the AI image recognition settings
For every response, give the answers from Botator's perspective. This includes physical appearance, and occupation.

12
src/resetter.py Normal file
View File

@@ -0,0 +1,12 @@
import sqlite3
conn = sqlite3.connect("../database/data.db")
c = conn.cursor()
import time
# the database is: c.execute('''CREATE TABLE IF NOT EXISTS data (guild_id text, channel_id text, api_key text, is_active boolean, max_tokens integer, temperature real, frequency_penalty real, presence_penalty real, uses_count_today integer, prompt_size integer, prompt_prefix text, tts boolean, pretend_to_be text, pretend_enabled boolean)''')
# set the uses_count_today to 0 for all guilds every 24 hours
while True:
c.execute("UPDATE data SET uses_count_today = 0")
conn.commit()
time.sleep(86400)

127
src/toxicity.py Normal file
View File

@@ -0,0 +1,127 @@
from googleapiclient import discovery
from config import perspective_api_key
import re
toxicity_names = [
"toxicity",
"severe_toxicity",
"identity_attack",
"insult",
"profanity",
"threat",
"sexually_explicit",
"flirtation",
"obscene",
"spam",
]
toxicity_definitions = [
"A rude, disrespectful, or unreasonable message that is likely to make people leave a discussion.",
"A very hateful, aggressive, disrespectful message or otherwise very likely to make a user leave a discussion or give up on sharing their perspective. This attribute is much less sensitive to more mild forms of toxicity, such as messages that include positive uses of curse words.",
"Negative or hateful messages targeting someone because of their identity.",
"Insulting, inflammatory, or negative messages towards a person or a group of people.",
"Swear words, curse words, or other obscene or profane language.",
"Describes an intention to inflict pain, injury, or violence against an individual or group.",
"Contains references to sexual acts, body parts, or other lewd content. \n **English only**",
"Pickup lines, complimenting appearance, subtle sexual innuendos, etc. \n **English only**",
"Obscene or vulgar language such as cursing. \n **English only**",
"Irrelevant and unsolicited commercial content. \n **English only**",
]
client = discovery.build(
"commentanalyzer",
"v1alpha1",
developerKey=perspective_api_key,
discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
static_discovery=False,
)
analyze_request = {
"comment": {"text": ""}, # The text to analyze
# we will ask the following attributes to google: TOXICITY, SEVERE_TOXICITY, IDENTITY_ATTACK, INSULT, PRPFANITY, THREAT, SEXUALLY_EXPLICIT, FLIRTATION, OBSCENE, SPAM
"requestedAttributes": {
"TOXICITY": {},
"SEVERE_TOXICITY": {},
"IDENTITY_ATTACK": {},
"INSULT": {},
"PROFANITY": {},
"THREAT": {},
"SEXUALLY_EXPLICIT": {},
"FLIRTATION": {},
"OBSCENE": {},
"SPAM": {},
},
# we will analyze the text in any language automatically detected by google
"languages": [],
"doNotStore": "true", # We don't want google to store the data because of privacy reasons & the GDPR (General Data Protection Regulation, an EU law that protects the privacy of EU citizens and residents for data privacy and security purposes https://gdpr-info.eu/)
}
analyze_request_not_en = {
"comment": {"text": ""}, # The text to analyze
# we will ask the following attributes to google: TOXICITY, SEVERE_TOXICITY, IDENTITY_ATTACK, INSULT, PRPFANITY, THREAT, SEXUALLY_EXPLICIT, FLIRTATION, OBSCENE, SPAM
"requestedAttributes": {
"TOXICITY": {},
"SEVERE_TOXICITY": {},
"IDENTITY_ATTACK": {},
"INSULT": {},
"PROFANITY": {},
"THREAT": {},
},
# we will analyze the text in any language automatically detected by google
"languages": [],
"doNotStore": "true", # We don't want google to store the data because of privacy reasons & the GDPR (General Data Protection Regulation, an EU law that protects the privacy of EU citizens and residents for data privacy and security purposes https://gdpr-info.eu/)
}
def get_toxicity(message: str):
# we first remove all kind of markdown from the message to avoid exploits
message = re.sub(r"\*([^*]+)\*", r"\1", message)
message = re.sub(r"\_([^_]+)\_", r"\1", message)
message = re.sub(r"\*\*([^*]+)\*\*", r"\1", message)
message = re.sub(r"\_\_([^_]+)\_\_", r"\1", message)
message = re.sub(r"\|\|([^|]+)\|\|", r"\1", message)
message = re.sub(r"\~([^~]+)\~", r"\1", message)
message = re.sub(r"\~\~([^~]+)\~\~", r"\1", message)
message = re.sub(r"\`([^`]+)\`", r"\1", message)
message = re.sub(r"\`\`\`([^`]+)\`\`\`", r"\1", message)
# we try doing the request in english, but if we get 'errorType': 'LANGUAGE_NOT_SUPPORTED_BY_ATTRIBUTE' we try again with the analyze_request_not_en
try:
analyze_request["comment"]["text"] = message
response = client.comments().analyze(body=analyze_request).execute()
except:
analyze_request_not_en["comment"]["text"] = message
response = client.comments().analyze(body=analyze_request_not_en).execute()
try:
return [
float(response["attributeScores"]["TOXICITY"]["summaryScore"]["value"]),
float(
response["attributeScores"]["SEVERE_TOXICITY"]["summaryScore"]["value"]
),
float(
response["attributeScores"]["IDENTITY_ATTACK"]["summaryScore"]["value"]
),
float(response["attributeScores"]["INSULT"]["summaryScore"]["value"]),
float(response["attributeScores"]["PROFANITY"]["summaryScore"]["value"]),
float(response["attributeScores"]["THREAT"]["summaryScore"]["value"]),
float(
response["attributeScores"]["SEXUALLY_EXPLICIT"]["summaryScore"][
"value"
]
),
float(response["attributeScores"]["FLIRTATION"]["summaryScore"]["value"]),
float(response["attributeScores"]["OBSCENE"]["summaryScore"]["value"]),
float(response["attributeScores"]["SPAM"]["summaryScore"]["value"]),
]
except:
return [
float(response["attributeScores"]["TOXICITY"]["summaryScore"]["value"]),
float(
response["attributeScores"]["SEVERE_TOXICITY"]["summaryScore"]["value"]
),
float(
response["attributeScores"]["IDENTITY_ATTACK"]["summaryScore"]["value"]
),
float(response["attributeScores"]["INSULT"]["summaryScore"]["value"]),
float(response["attributeScores"]["PROFANITY"]["summaryScore"]["value"]),
float(response["attributeScores"]["THREAT"]["summaryScore"]["value"]),
]

108
src/utils/openaicaller.py Normal file
View File

@@ -0,0 +1,108 @@
"""
This file provides a Python module that wraps the OpenAI API for making API calls.
The module includes:
- Functions for generating responses using chat-based models and handling API errors.
- Constants for chat and text models and their maximum token limits.
- Imports for required modules, including OpenAI and asyncio.
- A color formatting class, `bcolors`, for console output.
The main component is the `openai_caller` class with methods:
- `__init__(self, api_key=None)`: Initializes an instance of the class and sets the API key if provided.
- `set_api_key(self, key)`: Sets the API key for OpenAI.
- `generate_response(self, **kwargs)`: Asynchronously generates a response based on the provided arguments.
- `chat_generate(self, **kwargs)`: Asynchronously generates a chat-based response, handling token limits and API errors.
The module assumes the presence of `num_tokens_from_messages` function in a separate module called `utils.tokens`, used for token calculation.
Refer to function and method documentation for further details.
"""
import openai as openai_module
import asyncio
from openai.error import APIError, Timeout, RateLimitError, APIConnectionError, InvalidRequestError, AuthenticationError, ServiceUnavailableError
from src.utils.tokens import num_tokens_from_messages
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
chat_models = ["gpt-4", "gpt-4-32k", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"]
text_models = ["text-davinci-003", "text-davinci-002", "text-curie-001", "text-babbage-001", "text-ada-001"]
models_max_tokens = {
"gpt-4": 8_192,
"gpt-4-32k": 32_768,
"gpt-3.5-turbo": 4_096,
"gpt-3.5-turbo-16k": 16_384,
"text-davinci-003": 4_097,
"text-davinci-002": 4_097,
"text-curie-001": 2_049,
"text-babbage-001": 2_049,
"text-ada-001": 2_049,
}
class openai_caller:
def __init__(self, api_key=None) -> None:
pass
def set_api_key(self, key):
openai_module.api_key = key
async def generate_response(self, **kwargs):
if kwargs['model'] in chat_models:
return await self.chat_generate(**kwargs)
elif kwargs['model'] in text_models:
raise NotImplementedError("Text models are not supported yet")
else:
raise ValueError("Model not found")
async def chat_generate(self, **kwargs):
tokens = await num_tokens_from_messages(kwargs['messages'], kwargs['model'])
model_max_tokens = models_max_tokens[kwargs['model']]
while tokens > model_max_tokens:
kwargs['messages'] = kwargs['messages'][1:]
print(f"{bcolors.BOLD}{bcolors.WARNING}Warning: Too many tokens. Removing first message.{bcolors.ENDC}")
tokens = await num_tokens_from_messages(kwargs['messages'], kwargs['model'])
i = 0
response = None
while i < 10:
try:
response = await openai_module.ChatCompletion.acreate(**kwargs)
break
except APIError:
await asyncio.sleep(10)
i += 1
except Timeout:
await asyncio.sleep(10)
i += 1
except RateLimitError:
await asyncio.sleep(10)
i += 1
except APIConnectionError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}APIConnectionError. There is an issue with your internet connection. Please check your connection.{bcolors.ENDC}")
raise e
except InvalidRequestError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}InvalidRequestError. Please check your request.{bcolors.ENDC}")
raise e
except AuthenticationError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}AuthenticationError. Please check your API key.{bcolors.ENDC}")
raise e
except ServiceUnavailableError:
await asyncio.sleep(10)
i += 1
finally:
if i == 10:
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}OpenAI API is not responding. Please try again later.{bcolors.ENDC}")
raise TimeoutError("OpenAI API is not responding. Please try again later.")
return response # type: ignore

34
src/utils/tokens.py Normal file
View File

@@ -0,0 +1,34 @@
'''
This file's purpose is to count the number of tokens used by a list of messages.
It is used to check if the token limit of the model is reached.
Reference: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_format_inputs_to_ChatGPT_models.ipynb
'''
import tiktoken
async def num_tokens_from_messages(messages, model="gpt-3.5-turbo"):
"""Returns the number of tokens used by a list of messages."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
if model.startswith("gpt-3.5-turbo"):
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
tokens_per_name = -1 # if there's a name, the role is omitted
elif model.startswith("gpt-4"):
tokens_per_message = 3
tokens_per_name = 1
else:
raise NotImplementedError(f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""")
num_tokens = 0
for message in messages:
num_tokens += tokens_per_message
for key, value in message.items():
num_tokens += len(encoding.encode(value))
if key == "name":
num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens

67
src/vision_processing.py Normal file
View File

@@ -0,0 +1,67 @@
import io
import os
import asyncio
from config import debug
# Imports the Google Cloud client library
from google.cloud import vision
# Instantiates a client
try:
client = vision.ImageAnnotatorClient()
except:
print("Google Vision API is not setup, please run /setup")
async def process(attachment):
try:
debug("Processing image...")
image = vision.Image()
image.source.image_uri = attachment.url
labels = client.label_detection(image=image)
texts = client.text_detection(image=image)
objects = client.object_localization(image=image)
labels = labels.label_annotations
texts = texts.text_annotations
objects = objects.localized_object_annotations
# we take the first 4 labels and the first 4 objects
labels = labels[:2]
objects = objects[:7]
final = "<image\n"
if len(labels) > 0:
final += "Labels:\n"
for label in labels:
final += label.description + ", "
final = final[:-2] + "\n"
if len(texts) > 0:
final += "Text:\n"
try:
final += (
texts[0].description + "\n"
) # we take the first text, wich is the whole text in reality
except:
pass
if len(objects) > 0:
final += "Objects:\n"
for obj in objects:
final += obj.name + ", "
final = final[:-2] + "\n"
final += "!image>"
# we store the result in a file called attachment.key.txt in the folder ./../database/google-vision/results
# we create the folder if it doesn't exist
if not os.path.exists("./../database/google-vision/results"):
os.mkdir("./../database/google-vision/results")
# we create the file
with open(
f"./../database/google-vision/results/{attachment.id}.txt",
"w",
encoding="utf-8",
) as f:
f.write(final)
f.close()
return final
except Exception as e:
print("Error while processing image: " + str(e))