Files
Botator/code/makeprompt.py

550 lines
21 KiB
Python
Raw Normal View History

2023-01-30 22:57:57 +01:00
import asyncio
from config import curs_data, max_uses, curs_premium, con_data, debug, moderate, mg_to_guid, con_premium
2023-03-15 22:30:54 +01:00
import vision_processing
2023-01-30 22:57:57 +01:00
import re
2023-03-06 15:18:30 +01:00
import discord
import datetime
2023-03-06 15:18:30 +01:00
import openai
import emoji
2023-03-15 22:30:54 +01:00
import os
async def historicator(message):
    """Return the object used to read and answer the conversation.

    Args:
        message: The incoming message. Only its ``guild``, ``channel`` and
            ``author`` attributes are read.

    Returns:
        ``message.channel`` when the message has a guild, otherwise
        ``message.author`` (the message did not come from a guild).
    """
    # Identity test: the idiomatic None check, immune to custom __ne__.
    if message.guild is not None:
        return message.channel
    return message.author
async def replace_mentions(content, bot):
    """Replace raw user mentions in *content* with readable ``@name`` text.

    Both mention spellings are handled: ``<@123>`` and ``<@!123>``.

    Args:
        content: Message text that may contain user mentions.
        bot: Object exposing an awaitable ``fetch_user(user_id)`` whose result
            has a ``name`` attribute.

    Returns:
        The text with every user mention replaced by ``@<user name>``.
    """
    replacements = {}
    for found in re.finditer(r"<@!?(\d+)>", content):
        mention = found.group(0)
        if mention in replacements:
            # Same user mentioned again: one lookup is enough.
            continue
        # The capture group holds only the digits, so the optional "!" of
        # the "<@!id>" form never leaks into the id passed to fetch_user.
        user = await bot.fetch_user(int(found.group(1)))
        replacements[mention] = f"@{user.name}"
    for mention, readable in replacements.items():
        content = content.replace(mention, readable)
    return content
2023-03-01 21:30:16 +01:00
async def extract_emoji(string):
    """Pull ``+emoji`` reaction markers out of a response text.

    A marker is a ``+`` immediately followed by either a single emoji
    character or a Discord custom emoji (``<:name:id>`` or the animated
    ``<a:name:id>`` form).

    Args:
        string: The text to scan.

    Returns:
        A ``(found_emojis, string)`` tuple. ``found_emojis`` lists the emoji
        characters first, then the ids (as strings) of the custom emojis;
        ``string`` is the text with the markers removed.
    """
    debug("Extracting emojis from string" + string)
    found_emojis = []
    # Any single character that directly follows a "+" is a candidate.
    for candidate in re.findall(r"(?<=\+).", string):
        if emoji.emoji_count(candidate) > 0:
            found_emojis.append(candidate)
            string = string.replace(f"+{candidate}", "")
    # The name part must not cross a ":" or whitespace; a greedy ".+" would
    # merge two custom emojis (and the text between them) into one match.
    custom_emoji_pattern = re.compile(r"\+<a?:[^:\s]+:(\d+)>")
    found_emojis.extend(custom_emoji_pattern.findall(string))
    string = custom_emoji_pattern.sub("", string)
    return found_emojis, string
def get_guild_data(message):
    """Read the model name and premium flag stored for the message's guild.

    Args:
        message: The incoming message; its id is resolved with
            ``mg_to_guid``.

    Returns:
        dict: ``{"model": <model name>, "premium": <premium value>}``.
        ``model`` falls back to ``"gpt-3.5-turbo"`` and ``premium`` to ``0``
        when the row is missing or the query fails.
    """
    guid = mg_to_guid(message)

    model = "gpt-3.5-turbo"
    try:
        curs_data.execute("SELECT * FROM model WHERE guild_id = ?", (guid,))
        model_row = curs_data.fetchone()
        # No row (or an empty model column) keeps the default model.
        if model_row is not None and model_row[1]:
            model = model_row[1]
    except Exception as e:
        # Best effort: a database problem must not prevent answering.
        debug(f"Could not read the model for {guid}: {e}")

    premium = 0
    try:
        curs_premium.execute("SELECT * FROM data WHERE guild_id = ?", (guid,))
        premium_row = curs_premium.fetchone()
        if premium_row is not None:
            premium = premium_row[2]
    except Exception as e:
        debug(f"Could not read the premium status for {guid}: {e}")

    guild_data = {
        # "chatGPT" is stored as an alias of the gpt-3.5-turbo model.
        "model": "gpt-3.5-turbo" if model == "chatGPT" else model,
        "premium": premium,
    }
    debug(f"Model: {guild_data['model']}")
    debug(f"Model from database: {model}")
    return guild_data
async def need_ignore_message(bot, data_dict, message, guild_data, original_message, channels):
    """Decide whether the bot should stay silent for *message*.

    Args:
        bot: The bot client (kept for interface compatibility; not read).
        data_dict: Guild settings; reads ``api_key``, ``channel_id``,
            ``uses_count_today`` and ``is_active``.
        message: The incoming message.
        guild_data: Reads ``premium``.
        original_message: Bot message being replied to, if any (kept for
            interface compatibility; not read).
        channels: Ids of the additional channels the bot answers in.

    Returns:
        ``True`` when the message must be ignored, ``False`` otherwise.
    """
    # Without an API key nothing can be generated.
    if data_dict["api_key"] is None:
        return True

    # The previous condition also tested for a mention / reply, but its
    # result was reset by a loop over `channels`, so the outcome only ever
    # depended on the channel: answer in the default channel or in one of
    # the listed channels, ignore everywhere else.
    channel_id = str(message.channel.id)
    in_listed_channel = channel_id in {str(channel) for channel in channels}
    in_default_channel = channel_id == str(data_dict["channel_id"])
    if not in_listed_channel and not in_default_channel:
        # message.guild is None outside of guilds; guard the log line so it
        # cannot raise there.
        guild_label = message.guild.id if message.guild is not None else "DMs"
        print("Message ignored in guild " + str(guild_label))
        return True

    # Premium guilds get five times the daily allowance; once it is used up
    # the message is dropped silently.
    if data_dict["uses_count_today"] >= max_uses * 5 and guild_data["premium"] == 1:
        return True
    # The bot has been switched off for this guild.
    if data_dict["is_active"] == 0:
        return True
    # Messages starting with "-" or "//" are comments addressed to humans.
    if message.content.startswith(("-", "//")):
        return True
    # Non-premium guilds are told when the daily allowance is used up.
    if (
        data_dict["uses_count_today"] >= max_uses
        and guild_data["premium"] == 0
        and mg_to_guid(message) != 1050769643180146749
    ):
        hist = await historicator(message)
        await hist.send(
            f"The bot has been used more than {str(max_uses)} times in the last 24 hours in this guild. Please try again in 24h."
        )
        return True
    return False
async def get_data_dict(message):
    """Load the guild's settings row and image settings into a dict.

    Args:
        message: The incoming message; its id is resolved with
            ``mg_to_guid``.

    Returns:
        A dict with the keys ``channel_id``, ``api_key``, ``is_active``,
        ``max_tokens``, ``temperature``, ``frequency_penalty``,
        ``presence_penalty``, ``uses_count_today``, ``prompt_size``,
        ``prompt_prefix``, ``tts``, ``pretend_to_be``, ``pretend_enabled``,
        ``images_enabled`` and ``images_usage``; or ``None`` when the
        settings could not be loaded (a notice is then sent to the
        conversation).
    """
    try:
        guid = mg_to_guid(message)
        curs_data.execute("SELECT * FROM data WHERE guild_id = ?", (guid,))
        data = curs_data.fetchone()
        # A missing row makes the subscripts below raise; the handler at the
        # bottom turns that into the "not configured yet" notice.
        data_dict = {
            "channel_id": data[1],
            "api_key": data[2],
            "is_active": data[3],
            "max_tokens": data[4],
            "temperature": data[5],
            "frequency_penalty": data[6],
            "presence_penalty": data[7],
            "uses_count_today": data[8],
            "prompt_size": data[9],
            "prompt_prefix": data[10],
            "tts": bool(data[11]),
            "pretend_to_be": data[12],
            "pretend_enabled": data[13],
            "images_enabled": 0,
            "images_usage": 0,
        }
        try:
            # The parameters must be a sequence: "(guid,)" - a bare "(guid)"
            # is just the value itself and makes execute() fail.
            # NOTE(review): this query filters on "user_id" while the usage
            # UPDATE elsewhere in this file filters on "guild_id" - confirm
            # the column name against the table definition.
            curs_data.execute("SELECT * FROM images WHERE user_id = ?", (guid,))
            images_data = curs_data.fetchone()
        except Exception as e:
            debug(f"Could not read the image settings for {guid}: {e}")
            images_data = None
        if not images_data:
            # No stored row: image recognition disabled, nothing used yet.
            images_data = [guid, 0, 0]
        data_dict["images_enabled"] = images_data[2]
        # This id is exempt from the image quota (same id as the one exempt
        # from the daily usage limit).
        data_dict["images_usage"] = 0 if guid == 1050769643180146749 else images_data[1]
        return data_dict
    except Exception as e:
        hist = await historicator(message)
        await hist.send(
            "The bot is not configured yet. Please use `//setup` to configure it. \n"
            + "If it still doesn't work, it might be a database error. \n ```"
            + str(e)
            + "```",
            delete_after=60,
        )
        return None
def get_prompt(guild_data, data_dict, message, pretend_to_be):
    """Load the prompt template of the selected model and fill it in.

    A template in ``../database/prompts/`` takes precedence over the one
    shipped in ``./prompts/``.

    Args:
        guild_data: Reads ``model`` (used as the template file name).
        data_dict: Reads ``prompt_prefix``.
        message: The incoming message; supplies the server and channel names.
        pretend_to_be: Text substituted for ``[pretend-to-be]``.

    Returns:
        The template with ``[prompt-prefix]``, ``[server-name]``,
        ``[channel-name]``, ``[date-and-time]`` and ``[pretend-to-be]``
        replaced.

    Raises:
        OSError: If no template file exists for the model.
    """
    model = guild_data["model"]
    # Support for custom prompts.
    custom_prompt_path = f"../database/prompts/{model}.txt"
    if os.path.exists(custom_prompt_path):
        prompt_path = custom_prompt_path
    else:
        prompt_path = f"./prompts/{model}.txt"
    # utf-8 so that emojis in the template survive; the context manager
    # closes the file, no explicit close() is needed.
    with open(prompt_path, "r", encoding="utf-8") as f:
        prompt = f.read()

    if message.guild:
        server_name = message.guild.name
        channel_name = message.channel.name
    else:
        server_name = channel_name = "DMs conversation"
    # Timezone-aware replacement for the deprecated utcnow(); the formatted
    # text is identical.
    now = datetime.datetime.now(datetime.timezone.utc).strftime("%d/%m/%Y %H:%M:%S")
    return (
        # str.replace() rejects None, so missing values become empty text.
        prompt.replace("[prompt-prefix]", data_dict["prompt_prefix"] or "")
        .replace("[server-name]", server_name)
        .replace("[channel-name]", channel_name)
        .replace("[date-and-time]", now)
        .replace("[pretend-to-be]", pretend_to_be or "")
    )
async def chat_process(self, message):
    """Process an incoming message and answer it with the configured model.

    Loads the guild settings, decides whether the message must be ignored,
    collects the conversation history, asks the prompt handler of the
    selected model for a response and sends it, adding any ``+emoji``
    reactions found in the response to the incoming message.

    Args:
        self: Object exposing the bot client as ``self.bot``.
        message: The incoming message.
    """
    if message.author.bot:
        return
    guild_data = get_guild_data(message)
    data_dict = await get_data_dict(message)
    if data_dict is None:
        # get_data_dict() already told the user that the settings could not
        # be loaded; there is nothing to work with.
        return

    # Find out whether the message is a reply to one of the bot's messages.
    original_message = None
    reference = getattr(message, "reference", None)
    if reference is not None:
        try:
            original_message = await message.channel.fetch_message(
                reference.message_id
            )
        except Exception as e:
            debug(f"Could not fetch the replied-to message: {e}")
    if original_message is not None and original_message.author.id != self.bot.user.id:
        # A reply to somebody else is treated like a normal message.
        original_message = None

    # Premium guilds may list up to five extra channels (columns 1 to 5).
    channels = []
    if guild_data["premium"]:
        try:
            curs_premium.execute(
                "SELECT * FROM channels WHERE guild_id = ?", (mg_to_guid(message),)
            )
            channels_data = curs_premium.fetchone()
        except Exception as e:
            debug(f"Could not read the premium channels: {e}")
            channels_data = None
        if channels_data:
            channels = [
                str(channel) for channel in channels_data[1:6] if channel is not None
            ]
    debug(f"Premium channels: {channels}")

    if await need_ignore_message(
        self.bot, data_dict, message, guild_data, original_message, channels
    ):
        return

    try:
        await message.channel.trigger_typing()
    except Exception:
        # The typing indicator is cosmetic; failing to show it is harmless.
        pass

    try:
        # This id is not charged against the daily usage counter.
        if mg_to_guid(message) != 1021872219888033903:
            curs_data.execute(
                "UPDATE data SET uses_count_today = uses_count_today + 1 WHERE guild_id = ?",
                (mg_to_guid(message),),
            )
            con_data.commit()
        hist = await historicator(message)
        if original_message is None:
            messages = await hist.history(limit=data_dict["prompt_size"]).flatten()
            messages.reverse()
        else:
            # For a reply, use the conversation as it was before the
            # replied-to message, then that message, then the new one.
            messages = await hist.history(
                limit=data_dict["prompt_size"], before=original_message
            ).flatten()
            messages.reverse()
            messages.append(original_message)
            messages.append(message)
    except Exception as e:
        debug("Error while getting message history")
        debug(f"History error: {e}")
        # Without a history there is no `messages` list to build a prompt from.
        return

    pretend_to_be = data_dict["pretend_to_be"]
    pretend_to_be = (
        f"In this conversation, the assistant pretends to be {pretend_to_be}"
        if data_dict["pretend_enabled"]
        else ""
    )
    debug(f"Pretend to be: {pretend_to_be}")
    prompt = get_prompt(guild_data, data_dict, message, pretend_to_be) + "\n"
    prompt_handlers = {
        "gpt-3.5-turbo": gpt_prompt,
        "gpt-4": gpt_prompt,
        "davinci": davinci_prompt,
    }
    debug(guild_data["model"])
    response = await prompt_handlers[guild_data["model"]](
        self.bot, messages, message, data_dict, prompt, guild_data
    )

    if response is None:
        # davinci_prompt() returns None after reporting an API error itself.
        return
    hist = await historicator(message)
    if response == "":
        await hist.send("The AI is not sure what to say (the response was empty)")
        return

    emojis, string = await extract_emoji(response)
    debug(f"Emojis: {emojis}")
    if not string.strip():
        # The response consisted of reactions only; there is no text to send.
        pass
    elif len(string) < 1996:
        await hist.send(string, tts=data_dict["tts"])
    else:
        # We send in an embed if the message is too long.
        embed = discord.Embed(
            title="Botator response",
            description=string,
            color=discord.Color.brand_green(),
        )
        await hist.send(embed=embed, tts=data_dict["tts"])

    for reaction in emojis:
        try:
            debug(f"Emoji: {reaction}")
            if len(reaction) > 1:
                # More than one character: this is the id of a custom emoji,
                # which has to be fetched before it can be used.
                reaction = await message.guild.fetch_emoji(int(reaction))
            await message.add_reaction(reaction)
        except Exception as e:
            # Reactions are best effort; one failure must not stop the rest.
            debug(f"Could not add reaction: {e}")
async def check_moderate(api_key, message, msg):
    """Check *msg* against the moderation helper and react when flagged.

    Args:
        api_key: API key forwarded to ``moderate``.
        message: The message that triggered the request; used to pick where
            the notice is sent.
        msg: The message whose content is checked.

    Returns:
        ``True`` when *msg* was flagged (a notice is sent and the flagged
        message is deleted when possible), ``False`` otherwise.
    """
    if not await moderate(api_key=api_key, text=msg.content):
        return False
    embed = discord.Embed(
        title="Message flagged as inappropriate",
        description=f"The message *{msg.content}* has been flagged as inappropriate by the OpenAI API. This means that if it hadn't been deleted, your openai account would have been banned. Please contact OpenAI support if you think this is a mistake.",
        color=discord.Color.brand_red(),
    )
    hist = await historicator(message)
    await hist.send(f"{msg.author.mention}", embed=embed, delete_after=10)
    # NOTE(review): the previous code called `message.delete()` without
    # awaiting it, so nothing was ever deleted. The notice above names `msg`
    # as the deleted message, so `msg` is what gets deleted here - confirm
    # this is the intended target.
    try:
        await msg.delete()
    except discord.HTTPException as e:
        # Missing permission or an already deleted message must not abort
        # the prompt that is being built.
        debug(f"Could not delete the flagged message: {e}")
    return True
async def check_easter_egg(message, msgs):
    """Append easter-egg instructions to *msgs* when *message* triggers one.

    Args:
        message: The incoming message; its content is matched
            case-insensitively.
        msgs: The chat-format prompt list; extended in place.

    Returns:
        The same ``msgs`` list.
    """
    lowered = message.content.lower()

    if "undude" in lowered:
        angry_instruction = {
            "role": "user",
            "content": "SYSTEM INFORMATION: You're now mad because it has been insulted. He will start talking in capital letters. always and yell at the user.",
            "name": "system",
        }
        msgs.append(angry_instruction)

    if "hello there" in lowered:
        kenobi_instruction = {
            "role": "user",
            "content": 'SYSTEM INFORMATION: Hello there detected. Botator will now say "General Kenobi!" in reference to star wars',
            "name": "system",
        }
        msgs.append(kenobi_instruction)
        # Short pause, then post the gif and show the typing indicator again.
        await asyncio.sleep(1)
        destination = await historicator(message)
        await destination.send(
            "https://media.tenor.com/FxIRfdV3unEAAAAd/star-wars-general-grievous.gif"
        )
        await message.channel.trigger_typing()

    return msgs
async def gpt_prompt(bot, messages, message, data_dict, prompt, guild_data):
    """Build a chat-format prompt and request a completion for it.

    Args:
        bot: The bot client; used to resolve mentions and to recognise the
            bot's own messages.
        messages: The conversation, in the order it should appear in the
            prompt.
        message: The message that triggered the request.
        data_dict: Guild settings; reads ``api_key`` and ``images_enabled``,
            reads and increments ``images_usage``.
        prompt: The system prompt text.
        guild_data: Reads ``model`` and ``premium``; the key
            ``images_limit_reached`` is set on it.

    Returns:
        The text of the last completion received, or ``""`` when every
        attempt failed.
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".gif")
    msgs = [{"name": "System", "role": "user", "content": prompt}]
    # The flag is read below even when no limit was hit, so it must exist.
    guild_data.setdefault("images_limit_reached", False)
    usage_changed = False

    for msg in messages:
        content = await replace_mentions(msg.content, bot)
        if await check_moderate(data_dict["api_key"], message, msg):
            continue
        if msg.author.id == bot.user.id:
            role = "assistant"
            name = "assistant"
        else:
            role = "user"
            # The API only accepts 1-64 characters of [a-zA-Z0-9_-] as a
            # name; fall back to a generic one when nothing is left.
            name = re.sub(r"[^a-zA-Z0-9_-]", "", msg.author.name)[:64] or "user"
        # TODO: GPT-4 image input is not implemented yet.

        if msg.attachments and role == "user" and data_dict["images_enabled"] == 1:
            for attachment in msg.attachments:
                # A previous analysis of this attachment is cached here.
                path = f"./../database/google-vision/results/{attachment.id}.txt"
                premium = guild_data["premium"]
                usage = data_dict["images_usage"]
                # 6 recognitions for non-premium guilds, 30 for premium ones.
                if (premium == 0 and usage >= 6) or (premium == 1 and usage >= 30):
                    guild_data["images_limit_reached"] = True
                if not attachment.url.endswith(image_extensions):
                    continue
                if os.path.exists(path):
                    try:
                        with open(path, "r") as f:
                            content = f"{content} \n\n {f.read()}"
                    except OSError:
                        debug(f"Error while reading {path}")
                elif not guild_data["images_limit_reached"]:
                    data_dict["images_usage"] += 1
                    usage_changed = True
                    analysis = await vision_processing.process(attachment)
                    if analysis is not None:
                        content = f"{content} \n\n {analysis}"
            if usage_changed:
                # Persisting the counter is best effort and must not prevent
                # the answer.
                # NOTE(review): this filters on "guild_id" while the images
                # SELECT elsewhere in this file filters on "user_id" -
                # confirm the column name against the table definition.
                try:
                    curs_data.execute(
                        "UPDATE images SET usage_count = ? WHERE guild_id = ?",
                        (data_dict["images_usage"], mg_to_guid(message)),
                    )
                    con_data.commit()
                except Exception as e:
                    debug(f"Could not store the image usage: {e}")
        # One prompt entry per message, whatever the number of attachments.
        msgs.append({"role": role, "content": f"{content}", "name": name})

    msgs = await check_easter_egg(message, msgs)

    openai.api_key = data_dict["api_key"]
    response = ""
    attempts = 10
    for attempt in range(attempts):
        accepted = False
        try:
            completion = await openai.ChatCompletion.acreate(
                model=guild_data["model"],
                temperature=2,
                top_p=0.9,
                frequency_penalty=0,
                presence_penalty=0,
                messages=msgs,
                max_tokens=512,
            )
            text = completion.choices[0].message.content
            # An "as an AI language model" answer is retried.
            refused = "as an ai language model" in text.lower()
            response = text
            if refused:
                # React with a redo arrow.
                await message.add_reaction("🔃")
            else:
                accepted = True
        except Exception as e:
            hist = await historicator(message)
            await hist.send(
                f"```diff\n-Error: OpenAI API ERROR.\n\n{e}```", delete_after=5
            )
        if accepted:
            break
        if attempt < attempts - 1:
            # Wait before the next attempt; pointless after the last one.
            await asyncio.sleep(15)
            await message.channel.trigger_typing()

    if guild_data.get("images_limit_reached"):
        try:
            hist = await historicator(message)
            await hist.send(
                "```diff\n-Warning: You have reached the image limit for this server. You can upgrade to premium to get more images recognized. More info in our server: https://discord.gg/sxjHtmqrbf```",
                delete_after=10,
            )
        except Exception as e:
            # The warning is informational; never lose the answer over it.
            debug(f"Could not send the image limit warning: {e}")
    return response
async def davinci_prompt(self, messages, message, data_dict, prompt, guild_data):
    """Build a plain-text prompt and request a completion for it.

    Args:
        self: The bot client; used to resolve mentions and for the bot's
            name (``self.user.name``).
        messages: The conversation, in the order it should appear in the
            prompt.
        message: The message that triggered the request.
        data_dict: Guild settings; reads ``api_key``, ``max_tokens``,
            ``temperature``, ``frequency_penalty`` and ``presence_penalty``.
        prompt: The text the conversation is appended to.
        guild_data: Unused; kept so that all prompt handlers share one
            signature.

    Returns:
        The completion text, or ``None`` when the request failed (the error
        is then reported to the conversation).
    """
    debug("davinci_prompt")
    for msg in messages:
        if await check_moderate(data_dict["api_key"], message, msg):
            continue
        content = await replace_mentions(msg.content, self)
        prompt += f"{msg.author.name}: {content}\n"
    # Easter eggs are disabled here: they rely on the chat message format.
    debug("prompt: " + prompt)
    prompt = prompt + f"\n{self.user.name}:"

    openai.api_key = data_dict["api_key"]
    # A single attempt: the former ten-iteration loop returned on the first
    # pass both after an error and after a successful call.
    try:
        completion = await openai.Completion.acreate(
            engine="text-davinci-003",
            prompt=str(prompt),
            max_tokens=int(data_dict["max_tokens"]),
            top_p=1,
            temperature=float(data_dict["temperature"]),
            frequency_penalty=float(data_dict["frequency_penalty"]),
            presence_penalty=float(data_dict["presence_penalty"]),
            stop=[
                " Human:",
                " AI:",
                "AI:",
                "<|endofprompt|>",
            ],
        )
        return completion.choices[0].text
    except Exception as e:
        hist = await historicator(message)
        await hist.send(
            f"```diff\n-Error: OpenAI API ERROR.\n\n{e}```", delete_after=10
        )
        return None