I mean its the biggest commit I ever did (did git add * cause I didn't want to explain all of this It's already so complicated)

This commit is contained in:
Paillat
2023-06-25 16:12:23 +02:00
parent 09c98a460b
commit e676d5851e
14 changed files with 568 additions and 123 deletions

2
.gitignore vendored
View File

@@ -119,6 +119,7 @@ venv/
ENV/
env.bak/
venv.bak/
env.yaml
# Spyder project settings
.spyderproject
@@ -160,3 +161,4 @@ test/
ideas/
montageTEMP_MPY_wvf_snd.mp3
marp.exe
channels/

148
classes/channel.py Normal file
View File

@@ -0,0 +1,148 @@
import yaml
import json
import os
import asyncio
from utils.openaicaller import openai
from utils.normalize_file import normalize_file as nf
from utils.config import bcolors
from utils.misc import clear_screen, open_explorer_here, realbcolors, printm
from classes.video import Video
desc_prompt = """Write an engaging and humorous description for the YouTube channel "[name]" that will be placed on the channel's about page. The description should aim to entice viewers to subscribe to the channel. It should be creative, captivating, and slightly longer, but still within a reasonable length. Try to keep it under 1,000 characters to ensure it remains concise and attention-grabbing. Make sure to incorporate the channel's subject, which is "[subject]" Remember that this description will not be part of the channel's videos.
Answer exclusively with the description, and no other form of greeting or salutation, like "Sure, I'll do that!" or "Here's the description:".
"""
ideas_prompt = """You will generate a list of ideas of videos about [subject]. You will suggest topics for videos that will be created and posted on YouTube.
You will output the list of ideas in a json format. The following fields will be included:
- title
- description
Here is an example of the output:
```
[
{
"title": "TITLE OF THE VIDEO",
"description": "A video about something. Concept1 and concept2 are explained. Concept3 is also explained a bit. We also talk about this and this.
},
{
"title": "TITLE OF THE VIDEO",
"description": "A video about something. In this video we will create a thing. We will also see how that is possible. We will also talk about this and this.
},
{
"title": "TITLE OF THE VIDEO",
"description": "A video about something. We will create the following project, from a to z. We will see how to do this and this. We will also talk about this and this.
},
{
"title": "TITLE OF THE VIDEO",
"description": "A video about the story of how John Doe created a thing. We will see how he did it. We will also talk about this and this.
},
]
```
You will not answer anything else in your message. Your answer will only be the json output, without any other text. This is very important. no codeblock, nothing like "Here are .....". Just the json. You will generate 10 ideas. You will never repeat yourself.
Here are the existing ideas wich you should not repeat again.
[existing ideas]
"""
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
openai.set_api_key(os.getenv("OPENAI_API_KEY"))
class Channel:
def __init__(self) -> None:
pass
async def create(self):
printm("We assume that you have a YouTube channel. If you don't, please create one before continuing.")
input("Press enter when you are ready.")
printm("Great, let's go!")
self.name = input(f"First, please tell me the name of your YouTube channel. Press enter when you are done:{bcolors.BOLD}{bcolors.OKCYAN} ")
printm(f"{bcolors.ENDC}", end="")
self.path = os.path.join("channels", f"{self.name}")
self.path = os.path.abspath(self.path)
if os.path.exists(self.path):
raise FileExistsError("Channel already exists")
os.makedirs(self.path)
printm(f"Great! {bcolors.BOLD}{self.name}{bcolors.ENDC} is a great name for a YouTube channel!")
self.subject = input(f"Now, please tell me the subject of your YouTube channel. Press enter when you are done:{bcolors.BOLD}{bcolors.OKCYAN} ")
printm(f"{bcolors.ENDC}", end="") # we use end="" to avoid a new line
printm(f"Great! {bcolors.BOLD}{self.subject}{bcolors.ENDC} is a great subject for a YouTube channel!")
printm("Now, I will generate a description for your channel. Please wait...")
response = await openai.generate_response(model="gpt-3.5-turbo", messages=[{'role':'user', 'content': desc_prompt.replace("[name]", self.name).replace("[subject]", self.subject)}], max_tokens=100, temperature=0.7, top_p=1, frequency_penalty=0, presence_penalty=0.6, stop=["\n\n"])
self.description = response['choices'][0]['message']['content'] # type: ignore
printm(f"Great! Here is the description I generated for your channel: \n**{self.description}**")
printm(f"Now, please paste all the needed file(s) in the folder that will open. Press enter to open the folder.")
input()
open_explorer_here(self.path)
with open(f"{self.path}/description.txt", "w") as f:
f.write(self.description)
f.close()
self.data = {
"name": self.name,
"subject": self.subject,
"description": self.description,
"path": self.path,
}
await self.generate_ideas()
with open(f"{self.path}/channel.yaml", "w") as f:
yaml.dump(self.data, f)
f.close()
input(f"You can sleep now if you are tired. Press enter when you are awake and ready to continue {bcolors.BOLD}{bcolors.OKCYAN}:{bcolors.ENDC}{bcolors.BOLD}{bcolors.WARNING}){bcolors.ENDC}")
clear_screen()
return self.data
async def load(self, name):
self.name = name
self.path = f"channels/{name}"
if not os.path.exists(self.path):
raise FileNotFoundError("Channel not found")
with open(f"{self.path}/channel.yaml", "r") as f:
self.data = yaml.load(f, Loader=yaml.FullLoader)
f.close()
self.ideas = []
if os.path.exists(os.path.join(self.path, "ideas.json")):
with open(os.path.join(self.path, "ideas.json"), "r") as f:
self.ideas = json.load(f)
f.close()
self.name = self.data['name']
self.subject = self.data['subject']
self.description = self.data['description']
self.path = self.data['path']
return self.data
async def generate_ideas(self):
if not os.path.exists(os.path.join(self.path, "ideas.json")):
ideas = []
else:
with open(os.path.join(self.path, "ideas.json"), "r") as f:
ideas = json.load(f)
f.close()
response = await openai.generate_response(model="gpt-3.5-turbo", messages=[{'role':'user', 'content': ideas_prompt.replace("[subject]", self.subject).replace("[existing ideas]", "\n".join([f"- {idea['title']}" for idea in ideas]))}])
string_new_ideas = response['choices'][0]['message']['content'] # type: ignore
new_ideas = json.loads(string_new_ideas)
ideas += new_ideas
with open(os.path.join(self.path, "ideas.json"), "w") as f:
json.dump(ideas, f, indent=4)
f.close()
self.ideas = ideas
return ideas
async def generate_video(self, idea):
#get the idea object from self.ideas
if not idea in self.ideas:
raise ValueError("Idea not found")
if not os.path.exists(os.path.join(self.path, "videos")):
os.makedirs(os.path.join(self.path, "videos"))
self.video = Video(idea, self)
await self.video.generate()
return self.video
if __name__ == "__main__":
async def main():
channl = Channel()
await channl.create()
asyncio.run(main())

66
classes/video.py Normal file
View File

@@ -0,0 +1,66 @@
import os
import json
import yaml
from utils.openaicaller import openai
from utils.normalize_file import normalize_file as nf
from utils.config import bcolors
from utils.misc import clear_screen, open_explorer_here, realbcolors, printm
from utils.uploader import upload_video
from generators.script import generate_script
from generators.montage import mount, prepare
from generators.thumbnail import generate_thumbnail
class Video:
def __init__(self, idea, parent):
self.parent = parent # The parent class, which is a Channel class
self.id = None
self.url = None
self.script = None
self.path = None
self.idea = idea
self.title = self.idea['title']
self.description = self.idea['description']
self.metadata = None
async def generate(self):
normalized_title = await nf(self.idea['title'])
self.path = os.path.join(self.parent.path, "videos", normalized_title)
if not os.path.exists( self.path):
os.makedirs( self.path)
script = None
if os.path.exists(os.path.join( self.path, "script.json")):
printm("Video script already exists. Do you want to overwrite it ?")
if input("y/N") == "y":
os.remove(os.path.join( self.path, "script.json"))
if not os.path.exists(os.path.join( self.path, "script.json")):
script = await generate_script(self.idea['title'], self.idea['description'])
with open(os.path.join( self.path, "script.json"), "w") as f:
json.dump(json.loads(script), f)
f.close()
else:
with open(os.path.join( self.path, "script.json"), "r") as f:
script = json.load(f)
f.close()
await prepare( self.path)
credits = await mount(self.path, script)
self.metadata = {
"title": self.idea['title'],
"description": self.idea['description'] + "\n\n" + credits,
}
await generate_thumbnail( self.path, self.idea['title'], self.idea['description'])
videoid = await upload_video( self.path, self.idea['title'], self.metadata['description'], 28, "", "private", self.path)
printm(f"Your video is ready! You can find it in { self.path}")
video_meta_file = {
"title": self.idea['title'],
"description": self.metadata['description'],
"id": videoid,
"path": self.path,
"url": f"https://www.youtube.com/watch?v={videoid}",
}
with open(os.path.join( self.path, "video.yaml"), "w") as f:
yaml.dump(video_meta_file, f)
f.close()
return video_meta_file

BIN
database/bot_database.db Normal file

Binary file not shown.

View File

@@ -1,10 +1,6 @@
import openai
import os
import json
from dotenv import load_dotenv
load_dotenv()
from utils.openaicaller import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
with open('prompts/ideas.txt') as f:
prompt = f.read()
f.close()
@@ -25,13 +21,13 @@ async def generate_ideas(path, subject):
exuisting_ideas += f"{idea['title']}\n"
prmpt = prmpt.replace('[existing ideas]', exuisting_ideas)
print(prmpt)
response = await openai.ChatCompletion.acreate(
response = await openai.generate_response(
model="gpt-3.5-turbo",
messages=[
{"role":"user","content":prmpt},
],
)
json_in_str= response['choices'][0]['message']['content']
json_in_str= response['choices'][0]['message']['content'] # type: ignore
json_obj = json.loads(json_in_str)
for idea in json_obj:
ides_json.append(idea)

View File

@@ -2,22 +2,22 @@ import json
import os
import requests
import pysrt
import deepl
import random
from generators.speak import generate_voice, voices
from moviepy.video.VideoClip import ImageClip
from moviepy.editor import VideoFileClip, concatenate_videoclips, CompositeAudioClip, concatenate_audioclips
from moviepy.editor import concatenate_videoclips, CompositeAudioClip, concatenate_audioclips
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.audio.fx.all import volumex, audio_fadein, audio_fadeout # type: ignore
from dotenv import load_dotenv
load_dotenv()
unsplash_access = os.getenv("UNSPLASH_ACCESS_KEY") or "UNSPLASH_ACCESS_KEY"
unsplash_url = "https://api.unsplash.com/photos/random/?client_id=" + unsplash_access + "&query="
deepl_access = os.getenv("DEEPL_ACCESS_KEY") or "DEEPL_ACCESS_KEY"
translator = deepl.Translator(deepl_access)
from utils.misc import getenv
def prepare(path):
unsplash_access = getenv("unsplash_access_key")
if not unsplash_access:
raise Exception("UNSPLASH_ACCESS_KEY is not set in .env file")
unsplash_url = "https://api.unsplash.com/photos/random/?client_id=" + unsplash_access + "&query="
async def prepare(path):
with open(path + "/script.json", 'r', encoding='utf-8') as f:
script = json.load(f)
f.close()
@@ -94,7 +94,7 @@ def subs(length, total, text, srt, index):
srt.append(sub)
return srt
def mount(path, script):
async def mount(path, script):
if not os.path.exists(path + "/montage.mp4"):
num_slides = len(os.listdir(path + "/audio"))
clips = []

View File

@@ -1,20 +1,20 @@
import os
import openai
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
from utils.openaicaller import openai
with open('prompts/script.txt') as f:
global_prompt = f.read()
f.close()
async def generate_script(title, description):
with open('prompts/script.txt') as f:
prompt = f.read()
f.close()
prompt = global_prompt
prompt = prompt.replace("[title]", title)
prompt = prompt.replace("[description]", description)
response = await openai.ChatCompletion.acreate(
'''response = await openai.ChatCompletion.acreate(
model="gpt-4",
messages=[
{"role":"user","content":prompt}
],
)
return response['choices'][0]['message']['content']
)''' # Deprecated. Use openaicaller.py instead
response = await openai.generate_response(model="gpt-4", messages=[{'role':'user', 'content': prompt}])
return response['choices'][0]['message']['content'] # type: ignore

View File

@@ -1,13 +1,10 @@
import openai
import random
import os
from PIL import Image, ImageDraw, ImageFont
import random
from dotenv import load_dotenv
from PIL import Image
load_dotenv()
from PIL import Image, ImageDraw, ImageFont
openai.api_key = os.getenv("OPENAI_API_KEY")
from utils.openaicaller import openai
'''
Putpose of this file is to generate a miniature of the video.
It has a function that takes a path, title, and description and generates a miniature.
@@ -30,11 +27,12 @@ Answer without anything else, just with the 2 textes. Answer with text1 on the f
Here is the title of the video: [TITLE]
Here is the description of the video: [DESCRIPTION]'''
def rand_gradient(image):
async def rand_gradient(image):
randr = random.SystemRandom().randint(1, 20)
randg = random.SystemRandom().randint(1, 20)
randb = random.SystemRandom().randint(1, 20)
textcolor1 = [0, 0, 0]
textcolor2 = [0, 0, 0]
for i in range(image.size[0]):
for j in range(image.size[1]):
colors = [i//randr, j//randg, i//randb]
@@ -47,20 +45,19 @@ def rand_gradient(image):
image.putpixel((i,j), (colors[0], colors[1], colors[2]))
return image, textcolor1, textcolor2
def generate_miniature(path, title, description):
async def generate_thumbnail(path, title, description):
prmpt = prompt.replace("[TITLE]", title).replace("[DESCRIPTION]", description)
response = openai.ChatCompletion.create(
response = openai.generate_response(
model="gpt-4",
messages=[
{"role":"user","content":prmpt},
],
)
response['choices'][0]['message']['content']
text1 = response['choices'][0]['message']['content'].split("\n")[0]
text2 = response['choices'][0]['message']['content'].split("\n")[1]
generate_image(path, text1, text2)
text1 = response['choices'][0]['message']['content'].split("\n")[0] # type: ignore
text2 = response['choices'][0]['message']['content'].split("\n")[1] # type: ignore
await generate_image(path, text1, text2)
def generate_image(path, text1, text2):
async def generate_image(path, text1, text2):
path_to_bcg = path.split("/")[:-1]
path_to_bcg = "/".join(path_to_bcg)
print(path_to_bcg)
@@ -71,7 +68,7 @@ def generate_image(path, text1, text2):
exit()
bcg = Image.open(f"{path_to_bcg}/bcg.png")
img = Image.new('RGBA', (1920, 1080))
img, textcolor1, textcolor2 = rand_gradient(img)
img, textcolor1, textcolor2 = await rand_gradient(img)
draw = ImageDraw.Draw(img)
font1 = ImageFont.truetype("./Sigmar-Regular.ttf", 200)
font2 = ImageFont.truetype("./Sigmar-Regular.ttf", 200)

126
main.py
View File

@@ -1,79 +1,71 @@
import os
import json
import asyncio
import logging
import yaml
from generators.ideas import generate_ideas
from generators.script import generate_script
from generators.montage import mount, prepare
from generators.miniature import generate_miniature
from generators.uploader import upload_video
from classes.channel import Channel
from utils.config import loadingmessage, bcolors
from utils.misc import clear_screen, printm, getenv
from utils.openaicaller import openai
logging.basicConfig(level=logging.INFO)
async def main():
if not os.path.exists('videos'): os.makedirs('videos')
with open('env/subjects.txt', 'r', encoding='utf-8') as f:
subjects = f.read().splitlines()
printm("Loading...")
await asyncio.sleep(1)
clear_screen()
printm(loadingmessage)
await asyncio.sleep(4)
clear_screen()
await asyncio.sleep(1)
printm("Welcome in FABLE, the Film and Artistic Bot for Lively Entertainment!")
await asyncio.sleep(1)
printm(f"This program will generate for you complete {bcolors.FAIL}{bcolors.BOLD}YouTube{bcolors.ENDC} videos, as well as uploading them to YouTube.")
if not os.path.exists('env.yaml'):
printm("It looks like you don't have an OpenAI API key yet. Please paste it here:")
openai_api_key = input("Paste the key here: ")
openai.set_api_key(openai_api_key)
printm("Please also paste your unsplash access key here:")
unsplash_access_key = input("Paste the key here: ")
env_file = {
"openai_api_key": openai_api_key,
"unsplash_access_key": unsplash_access_key
}
with open('env.yaml', 'w') as f:
yaml.dump(env_file, f)
f.close()
for i in range(len(subjects)):
print(str(i) + ". " + subjects[i])
subject = int(input("Which subject do you want to generate ideas for? (enter the number): "))
subject = subjects[subject]
subjectdirpath = "videos/" + subject[:25].replace(" ", "_").replace(":", "")
if not os.path.exists(subjectdirpath):
os.makedirs(subjectdirpath)
input("It looks like it is the first time you are generating ideas for this subject. The requiered folder has been created. Press enter to continue.")
input("Please put all the requiered google credentials files in that folder. Press enter to continue.")
input("Please put a file called bcg.png in that folder. It will be used as the background of the thumbnails. Press enter to continue.")
if input("Do you want to generate new ideas? (y/N): ") == "y":
await generate_ideas(subjectdirpath, subject)
with open(subjectdirpath + '/ideas.json', 'r', encoding='utf-8') as f:
ideas = json.load(f)
f.close()
existing = []
new = []
for i in ideas:
if os.path.exists(subjectdirpath + "/" + i['title'][:25].replace(" ", "_").replace(":", "") + "/script.json"):
existing.append(i)
else:
new.append(i)
print("Existing ideas:")
for i in range(len(existing)):
print(str(i) + ". " + existing[i]['title'])
print("New ideas:")
for i in range(len(new)):
print(str(i + len(existing)) + ". " + new[i]['title'])
idea = int(input("Which idea do you want to generate a script for? (enter the number): "))
if idea < len(existing):
idea = existing[idea]
openai_api_key = getenv('openai_api_key')
openai.set_api_key(openai_api_key)
channels = os.listdir('channels')
if len(channels) == 0:
printm("It looks like you don't have any channels yet. Let's create one!")
channel = Channel()
await channel.create()
else:
idea = new[idea - len(existing)]
title = idea['title']
title = title[:25]
i = 0
path = subjectdirpath + "/" + title
path = path.replace(" ", "_").replace(":", "")
if not os.path.exists(path + "/script.json"):
script = await generate_script(idea['title'], idea['description'])
if os.path.exists(path) and os.path.exists(path + "/script.json"):
if input("There is already a script for this idea. Do you want to overwrite it? (y/n)") != "y":
print("Exiting...")
exit(1)
if not os.path.exists(path): os.makedirs(path)
with open(path + "/script.json", 'w', encoding='utf-8') as f:
f.write(script)
f.close()
script = prepare(path)
credits = mount(path, script)
description = f"{idea['description']}\n\nMusic credits: {credits}"
if credits != None:
with open(path + "/meta.txt", 'w', encoding='utf-8') as f:
f.write(description)
f.close()
generate_miniature(path, title=idea['title'], description=idea['description'])
upload_video(path, idea['title'], description, 28, "", "private", subjectdirpath)
print(f"Your video is ready! You can find it in {path}.")
printm("Here are your channels:")
for i, channel in enumerate(channels):
printm(f"{i+1}. {channel}")
printm(f"{len(channels)+1}. Create a new channel")
index = input("Which channel do you want to use : ")
if index == str(len(channels)+1):
channel = Channel()
await channel.create()
else:
channel_name = channels[int(index)-1]
channel = Channel()
await channel.load(channel_name)
printm("Now, let's create a video!")
printm("Here are all the ideas you have:")
for i, idea in enumerate(channel.ideas):
printm(f"{i+1}. {idea['title']}")
index = input("Which idea do you want to create a video for : ")
idea = channel.ideas[int(index)-1]
video = await channel.generate_video(idea)
printm("Done!")
printm("Here is the video:")
printm(video.url)
if __name__ == "__main__":
asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

35
utils/config.py Normal file
View File

@@ -0,0 +1,35 @@
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
loadingmessage = f"""{bcolors.OKGREEN}
▄████████ ▄████████ ▀█████████▄ ▄█ ▄████████
███ ███ ███ ███ ███ ███ ███ ███ ███
███ █▀ ███ ███ ███ ███ ███ ███ █▀
▄███▄▄▄ ███ ███ ▄███▄▄▄██▀ ███ ▄███▄▄▄
▀▀███▀▀▀ ▀███████████ ▀▀███▀▀▀██▄ ███ ▀▀███▀▀▀
███ ███ ███ ███ ██▄ ███ ███ █▄
███ ███ ███ ███ ███ ███▌ ▄ ███ ███
███ ███ █▀ ▄█████████▀ █████▄▄██ ██████████
{bcolors.ENDC}{bcolors.OKBLUE}Film and Artistic Bot for Lively Entertainment{bcolors.ENDC}
{bcolors.OKCYAN}
Made with 💖 by: *@paillat-dev*
https://paillat.dev
Thanks to *@Code7G* for the suggestions!
Thanks to *ChatGPT* for the name suggestions!
{bcolors.ENDC}
"""
if __name__ == '__main__':
print(loadingmessage)

89
utils/misc.py Normal file
View File

@@ -0,0 +1,89 @@
import os
import yaml
clear_screen = lambda: os.system('cls' if os.name == 'nt' else 'clear')
open_explorer_here = lambda path: os.system(f'explorer.exe "{path}"' if os.name == 'nt' else f'open "{path}"')
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class realbcolors:
PURPLE = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def printm(*args, **kwargs):
result = ''
underline_counter = 0
bold_counter = 0
sep = kwargs.get('sep', ' ')
text = sep.join([str(arg) for arg in args])
i = 0
while i < len(text):
if text[i:].startswith('$***'):
result += text[i:i+4].replace('$', '')
i += 4
continue
elif text[i:].startswith('$**'):
result += text[i:i+3].replace('$', '')
i += 3
continue
elif text[i:].startswith('$*'):
result += text[i:i+2].replace('$', '')
i += 2
continue
elif text[i:].startswith('***'):
if bold_counter % 2 == 0 and underline_counter % 2 == 0:
result += bcolors.BOLD + bcolors.UNDERLINE
elif bold_counter % 2 == 0:
result += bcolors.BOLD
elif underline_counter % 2 == 0:
result += bcolors.UNDERLINE
else:
result += bcolors.ENDC
i += 3
bold_counter += 1
underline_counter += 1
continue
elif text[i:].startswith('**'):
if bold_counter % 2 == 0:
result += bcolors.BOLD
else:
result += bcolors.ENDC
i += 2
bold_counter += 1
continue
elif text[i:].startswith('*'):
if underline_counter % 2 == 0:
result += bcolors.UNDERLINE
else:
result += bcolors.ENDC
i += 1
underline_counter += 1
continue
result += text[i]
i += 1
result += bcolors.ENDC # Ensure the formatting is reset at the end
print(text, **kwargs)
def getenv(var, default=None):
with open('env.yaml', 'r') as f:
env = yaml.safe_load(f)
return env.get(var, default)

8
utils/normalize_file.py Normal file
View File

@@ -0,0 +1,8 @@
import re
async def normalize_file(filename):
filename = re.sub(r'[<>:"|?*\s]', '_', filename)
#also shorten the filename if it's too long
if len(filename) > 30:
filename = filename[:27] + "___"
return filename

110
utils/openaicaller.py Normal file
View File

@@ -0,0 +1,110 @@
"""
This file provides a Python module that wraps the OpenAI API for making API calls.
The module includes:
- Functions for generating responses using chat-based models and handling API errors.
- Constants for chat and text models and their maximum token limits.
- Imports for required modules, including OpenAI and asyncio.
- A color formatting class, `bcolors`, for console output.
The main component is the `openai_caller` class with methods:
- `__init__(self, api_key=None)`: Initializes an instance of the class and sets the API key if provided.
- `set_api_key(self, key)`: Sets the API key for OpenAI.
- `generate_response(self, **kwargs)`: Asynchronously generates a response based on the provided arguments.
- `chat_generate(self, **kwargs)`: Asynchronously generates a chat-based response, handling token limits and API errors.
The module assumes the presence of `num_tokens_from_messages` function in a separate module called `utils.tokens`, used for token calculation.
Refer to function and method documentation for further details.
"""
import openai as openai_module
import asyncio
from openai.error import APIError, Timeout, RateLimitError, APIConnectionError, InvalidRequestError, AuthenticationError, ServiceUnavailableError
from utils.tokens import num_tokens_from_messages
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
chat_models = ["gpt-4", "gpt-4-32k", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"]
text_models = ["text-davinci-003", "text-davinci-002", "text-curie-001", "text-babbage-001", "text-ada-001"]
models_max_tokens = {
"gpt-4": 8_192,
"gpt-4-32k": 32_768,
"gpt-3.5-turbo": 4_096,
"gpt-3.5-turbo-16k": 16_384,
"text-davinci-003": 4_097,
"text-davinci-002": 4_097,
"text-curie-001": 2_049,
"text-babbage-001": 2_049,
"text-ada-001": 2_049,
}
class openai_caller:
def __init__(self, api_key=None) -> None:
pass
def set_api_key(self, key):
openai_module.api_key = key
async def generate_response(self, **kwargs):
if kwargs['model'] in chat_models:
return await self.chat_generate(**kwargs)
elif kwargs['model'] in text_models:
raise NotImplementedError("Text models are not supported yet")
else:
raise ValueError("Model not found")
async def chat_generate(self, **kwargs):
tokens = await num_tokens_from_messages(kwargs['messages'], kwargs['model'])
model_max_tokens = models_max_tokens[kwargs['model']]
while tokens > model_max_tokens:
kwargs['messages'] = kwargs['messages'][1:]
print(f"{bcolors.BOLD}{bcolors.WARNING}Warning: Too many tokens. Removing first message.{bcolors.ENDC}")
tokens = await num_tokens_from_messages(kwargs['messages'], kwargs['model'])
i = 0
response = None
while i < 10:
try:
response = await openai_module.ChatCompletion.acreate(**kwargs)
break
except APIError:
await asyncio.sleep(10)
i += 1
except Timeout:
await asyncio.sleep(10)
i += 1
except RateLimitError:
await asyncio.sleep(10)
i += 1
except APIConnectionError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}APIConnectionError. There is an issue with your internet connection. Please check your connection.{bcolors.ENDC}")
raise e
except InvalidRequestError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}InvalidRequestError. Please check your request.{bcolors.ENDC}")
raise e
except AuthenticationError as e:
print(e)
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}AuthenticationError. Please check your API key.{bcolors.ENDC}")
raise e
except ServiceUnavailableError:
await asyncio.sleep(10)
i += 1
finally:
if i == 10:
print(f"\n\n{bcolors.BOLD}{bcolors.FAIL}OpenAI API is not responding. Please try again later.{bcolors.ENDC}")
raise TimeoutError
return response
openai = openai_caller()

View File

@@ -7,6 +7,7 @@ import os
import random
import time
import json
import asyncio
import google.oauth2.credentials
import google_auth_oauthlib.flow
@@ -37,7 +38,7 @@ VALID_PRIVACY_STATUSES = ('public', 'private', 'unlisted')
# Authorize the request and store authorization credentials.
def get_authenticated_service(credentialsPath=""):
async def get_authenticated_service(credentialsPath=""):
CLIENT_SECRETS_FILE=f'{credentialsPath}/client_secret.json'
if os.path.exists(f'{credentialsPath}/credentials.json'):
with open(f'{credentialsPath}/credentials.json') as json_file:
@@ -59,7 +60,7 @@ def get_authenticated_service(credentialsPath=""):
return build(API_SERVICE_NAME, API_VERSION, credentials=credentials)
def initialize_upload(youtube, options):
async def initialize_upload(youtube, options):
tags = None
if options['keywords']:
tags = options['keywords'].split(',')
@@ -89,7 +90,7 @@ def initialize_upload(youtube, options):
return videoid
def resumable_upload(request):
async def resumable_upload(request):
response = None
error = None
retry = 0
@@ -122,9 +123,9 @@ def resumable_upload(request):
max_sleep = 2 ** retry
sleep_seconds = random.random() * max_sleep
print('Sleeping %f seconds and then retrying...' % sleep_seconds)
time.sleep(sleep_seconds)
await asyncio.sleep(sleep_seconds)
def upload_video(path, title, description, category, keywords, privacyStatus='private', credentials_path=""):
async def upload_video(path, title, description, category, keywords, privacyStatus='private', credentials_path=""):
options = {
'file': path +"/montage.mp4",
'title': title,
@@ -133,16 +134,17 @@ def upload_video(path, title, description, category, keywords, privacyStatus='pr
'keywords': keywords,
'privacyStatus': privacyStatus
}
youtube = get_authenticated_service(credentials_path)
youtube = await get_authenticated_service(credentials_path)
try:
videoid = initialize_upload(youtube, options)
videoid = await initialize_upload(youtube, options)
await upload_thumbnail(videoid, path + "/miniature.png", credentials_path)
return videoid
except HttpError as e:
print('An HTTP error %d occurred:\n%s' % (e.resp.status, e.content))
upload_thumbnail(videoid, path + "/miniature.png", credentials_path)
def upload_thumbnail(video_id, file, credentials_path=""):
async def upload_thumbnail(video_id, file, credentials_path=""):
youtube = get_authenticated_service(credentials_path)
youtube.thumbnails().set(
youtube.thumbnails().set( # type: ignore
videoId=video_id,
media_body=file
).execute()