From 7fbd295c876b1f7de29ae354e8b31e7dcc1e7844 Mon Sep 17 00:00:00 2001 From: nin0dev Date: Sat, 22 Jun 2024 15:26:47 -0400 Subject: [PATCH 1/3] Fixed f string error --- videobundler/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/videobundler/main.py b/videobundler/main.py index a8d662f2..96a22327 100644 --- a/videobundler/main.py +++ b/videobundler/main.py @@ -44,8 +44,8 @@ def get_merged_video(): audio_itag = request.args.get("audio_itag") video_itag = request.args.get("video_itag") # Download both audio and video - subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True) - subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True) + subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv('PROXY_URL')}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True) + subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv('PROXY_URL')}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True) # Merge both files subprocess.run(f"ffmpeg -i {pwd}/{job_id}.m4a -i {pwd}/{job_id}.mp4 -c copy {pwd}/output.{job_id}.mp4", shell=True, check=True) thread = Thread(target=autodelete, args = (job_id, )) @@ -61,4 +61,4 @@ def get_merged_video(): if __name__ == "__main__": from waitress import serve - serve(app, host="0.0.0.0", port=os.getenv("PORT")) \ No newline at end of file + serve(app, host="0.0.0.0", port=os.getenv("PORT")) From bbff542938d171e0f69eb8c7d6f93caa3d334bd6 Mon Sep 17 00:00:00 2001 From: nin0dev Date: Sat, 22 Jun 2024 18:02:30 -0400 Subject: [PATCH 2/3] Added concurrency --- videobundler/.env.example | 1 + videobundler/README.md | 4 +- videobundler/main.py | 130 +++++++++++++++++++++++++++----------- 3 files changed, 98 insertions(+), 37 deletions(-) diff --git a/videobundler/.env.example b/videobundler/.env.example index e226afe4..9a377018 100644 --- a/videobundler/.env.example +++ b/videobundler/.env.example @@ -1,4 +1,5 @@ TIME_BEFORE_DELETE=30 +INACTIVE_TIME_BEFORE_DELETE=3600 PORT=45872 # DO NOT PUT A / AT THE END OF THE URL PROXY_URL=https://eu-proxy.poketube.fun \ No newline at end of file diff --git a/videobundler/README.md b/videobundler/README.md index c1ea44a4..3b317448 100644 --- a/videobundler/README.md +++ b/videobundler/README.md @@ -22,4 +22,6 @@ Takes 2 input streams, downloads them, and spits out a combined file. ## Endpoints - `/`: Will return `{success:true}` if alive. -- `/get_merged_video?id=VIDEO_ID&audio_itag=AUDIO_ITAG&video_itag=VIDEO_ITAG`: Returns a merged video. ID is the youtube video ID, and itags are self explanatory. +- `/merge?id=VIDEO_ID&audio_itag=AUDIO_ITAG&video_itag=VIDEO_ITAG`: Starts the merging process. ID is the youtube video ID, and itags are self explanatory. As a response, you will get a job ID that you will be able to use in future requests to query the video or its status. When this process is finished, the inactive autodelete counter will start, which will allow you to fetch the video until the countdown is over. +- `/get?id=JOB_ID`: Queries a merged video and sends it to you. If the video is successfully and fully merged you will get a 200 response with a video. However, if it isn't finished, you will get a `success: false` 404 response. If the video indeed exists and is sent to you, the get autodelete counter will start, which will allow you to fetch it until this countdown is over. +- `/check?id=JOB_ID`: Queries a merged video's status. If the video is successfully and fully merged you will get a 200 response with `success:true`. However, if it isn't finished, you will get a `success: false` 404 response. Useful if you want to poll the status without triggering the get autodelete counter. \ No newline at end of file diff --git a/videobundler/main.py b/videobundler/main.py index 96a22327..44c2105f 100644 --- a/videobundler/main.py +++ b/videobundler/main.py @@ -17,48 +17,106 @@ load_dotenv() app = Flask(__name__) def autodelete(job_id: str): - sleep(os.getenv("TIME_BEFORE_DELETE")) - os.remove(f"{job_id}.mp4") - os.remove(f"{job_id}.m4a") - os.remove(f"output.{job_id}.mp4") + f = open(f"pendingDelete.{job_id}", "a") + f.write(":3") + f.close() + sleep(int(os.getenv("TIME_BEFORE_DELETE"))) + try: + os.remove(f"done.{job_id}") + os.remove(f"{job_id}.mp4") + os.remove(f"{job_id}.m4a") + os.remove(f"output.{job_id}.mp4") + os.remove(f"pendingDelete.{job_id}") + except Exception: + _ = 0 + +def inactive_autodelete(job_id: str): + pwd = os.getcwd() + sleep(int(os.getenv("INACTIVE_TIME_BEFORE_DELETE"))) + if not os.path.isfile(f"{pwd}/done.{job_id}"): + return + try: + os.remove(f"done.{job_id}") + os.remove(f"{job_id}.mp4") + os.remove(f"{job_id}.m4a") + os.remove(f"output.{job_id}.mp4") + os.remove(f"pendingDelete.{job_id}") + except Exception: + _ = 0 def get_random_string(length): - # choose from all lowercase letter - letters = string.ascii_lowercase - result_str = "".join(random.choice(letters) for i in range(length)) - return result_str + # choose from all lowercase letter + letters = string.ascii_lowercase + result_str = "".join(random.choice(letters) for i in range(length)) + return result_str + +def merge_video(job_id: str, video_id: str, audio_itag: str, video_itag: str): + pwd = os.getcwd() + # Download both audio and video + subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True) + subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True) + # Merge both files + subprocess.run(f"ffmpeg -i {pwd}/{job_id}.m4a -i {pwd}/{job_id}.mp4 -c copy {pwd}/output.{job_id}.mp4", shell=True, check=True) + f = open(f"done.{job_id}", "a") + f.write(":3") + f.close() + thread = Thread(target=inactive_autodelete, args = (job_id, )) + thread.start() @app.route("/") def ping(): - return json.loads(""" - { - "success": true - } - """) + return json.loads(""" + { + "success": true + } + """) -@app.route("/get_merged_video") -def get_merged_video(): - pwd = os.getcwd() - video_id = request.args.get("id") - job_id = get_random_string(10) - audio_itag = request.args.get("audio_itag") - video_itag = request.args.get("video_itag") - # Download both audio and video - subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv('PROXY_URL')}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True) - subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv('PROXY_URL')}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True) - # Merge both files - subprocess.run(f"ffmpeg -i {pwd}/{job_id}.m4a -i {pwd}/{job_id}.mp4 -c copy {pwd}/output.{job_id}.mp4", shell=True, check=True) - thread = Thread(target=autodelete, args = (job_id, )) - thread.start() - with open(f"output.{job_id}.mp4", "rb") as bytes: - return send_file( - io.BytesIO(bytes.read()), - mimetype="video/mp4", - download_name=f"output.{job_id}.mp4", - as_attachment=True - ) +@app.route("/merge") +def merge(): + job_id = get_random_string(10) + thread = Thread(target=merge_video, args = (job_id, request.args.get("id"), request.args.get("audio_itag"), request.args.get("video_itag"))) + thread.start() + return json.loads('{"success":true,"job_id":"' + job_id + '"}') + +@app.route("/get") +def get(): + pwd = os.getcwd() + job_id = request.args.get("job_id") + if os.path.isfile(f"{pwd}/done.{job_id}"): + if not os.path.isfile(f"{pwd}/pendingDelete.{job_id}"): + thread = Thread(target=autodelete, args = (job_id, )) + thread.start() + with open(f"output.{job_id}.mp4", "rb") as bytes: + return send_file( + io.BytesIO(bytes.read()), + mimetype="video/mp4", + download_name=f"output.{job_id}.mp4", + as_attachment=True + ) + return json.loads('{"success":false}'), 404 + +@app.route("/check") +def check(): + pwd = os.getcwd() + job_id = request.args.get("job_id") + if os.path.isfile(f"{pwd}/done.{job_id}"): + return json.loads('{"success":true}') + return json.loads('{"success":false}'), 404 if __name__ == "__main__": - from waitress import serve - serve(app, host="0.0.0.0", port=os.getenv("PORT")) + from waitress import serve + serve(app, host="0.0.0.0", port=os.getenv("PORT")) + +#with open(f"output.{job_id}.mp4", "rb") as bytes: + #return send_file( + # io.BytesIO(bytes.read()), + # mimetype="video/mp4", + # download_name=f"output.{job_id}.mp4", + # as_attachment=True + # ) +# +# +# +# +# From 70243b5b76d5d0281f5459b9cfddd636c8cd1ef0 Mon Sep 17 00:00:00 2001 From: nin0dev Date: Sun, 23 Jun 2024 12:21:01 -0400 Subject: [PATCH 3/3] Updated videobundler to add concurrency and caching --- videobundler/README.md | 5 +- videobundler/main.py | 159 ++++++++++++++--------------------------- 2 files changed, 56 insertions(+), 108 deletions(-) diff --git a/videobundler/README.md b/videobundler/README.md index 3b317448..54d24caf 100644 --- a/videobundler/README.md +++ b/videobundler/README.md @@ -22,6 +22,5 @@ Takes 2 input streams, downloads them, and spits out a combined file. ## Endpoints - `/`: Will return `{success:true}` if alive. -- `/merge?id=VIDEO_ID&audio_itag=AUDIO_ITAG&video_itag=VIDEO_ITAG`: Starts the merging process. ID is the youtube video ID, and itags are self explanatory. As a response, you will get a job ID that you will be able to use in future requests to query the video or its status. When this process is finished, the inactive autodelete counter will start, which will allow you to fetch the video until the countdown is over. -- `/get?id=JOB_ID`: Queries a merged video and sends it to you. If the video is successfully and fully merged you will get a 200 response with a video. However, if it isn't finished, you will get a `success: false` 404 response. If the video indeed exists and is sent to you, the get autodelete counter will start, which will allow you to fetch it until this countdown is over. -- `/check?id=JOB_ID`: Queries a merged video's status. If the video is successfully and fully merged you will get a 200 response with `success:true`. However, if it isn't finished, you will get a `success: false` 404 response. Useful if you want to poll the status without triggering the get autodelete counter. \ No newline at end of file +- `/[ANYTHING]?id=VIDEO_ID&audio_itag=AUDIO_ITAG&video_itag=VIDEO_ITAG`: Starts the merging process. ID is the youtube video ID, and itags are self explanatory. As a response, you will get a job ID that you will be able to use in future requests to query the video or its status. When this process is finished, the inactive autodelete counter will start, which will allow you to fetch the video until the countdown is over. +> Replace `[ANYTHING]` with absolutely anything, however it has to be unique to the request. Preferably use an UUID diff --git a/videobundler/main.py b/videobundler/main.py index 44c2105f..236c1c1b 100644 --- a/videobundler/main.py +++ b/videobundler/main.py @@ -1,48 +1,13 @@ -from datetime import datetime -from dotenv import load_dotenv -from flask import Flask, request, Response, send_file -from threading import Thread -from time import sleep - -import io -import json +import asyncio +import aiohttp +from aiohttp import web +import string import os import random -import string import subprocess -import uuid -load_dotenv() - -app = Flask(__name__) - -def autodelete(job_id: str): - f = open(f"pendingDelete.{job_id}", "a") - f.write(":3") - f.close() - sleep(int(os.getenv("TIME_BEFORE_DELETE"))) - try: - os.remove(f"done.{job_id}") - os.remove(f"{job_id}.mp4") - os.remove(f"{job_id}.m4a") - os.remove(f"output.{job_id}.mp4") - os.remove(f"pendingDelete.{job_id}") - except Exception: - _ = 0 - -def inactive_autodelete(job_id: str): - pwd = os.getcwd() - sleep(int(os.getenv("INACTIVE_TIME_BEFORE_DELETE"))) - if not os.path.isfile(f"{pwd}/done.{job_id}"): - return - try: - os.remove(f"done.{job_id}") - os.remove(f"{job_id}.mp4") - os.remove(f"{job_id}.m4a") - os.remove(f"output.{job_id}.mp4") - os.remove(f"pendingDelete.{job_id}") - except Exception: - _ = 0 +app = web.Application() +app.router._frozen = False def get_random_string(length): # choose from all lowercase letter @@ -50,73 +15,57 @@ def get_random_string(length): result_str = "".join(random.choice(letters) for i in range(length)) return result_str -def merge_video(job_id: str, video_id: str, audio_itag: str, video_itag: str): - pwd = os.getcwd() - # Download both audio and video - subprocess.run(["wget", f"-O{job_id}.m4a", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={audio_itag}&local=true"], check=True) - subprocess.run(["wget", f"-O{job_id}.mp4", f"{os.getenv("PROXY_URL")}/latest_version?id={video_id}&itag={video_itag}&local=true"], check=True) - # Merge both files - subprocess.run(f"ffmpeg -i {pwd}/{job_id}.m4a -i {pwd}/{job_id}.mp4 -c copy {pwd}/output.{job_id}.mp4", shell=True, check=True) +async def merge(request): + # register params + try: + job_id = request.rel_url.query["id"] + video_id: str = request.rel_url.query["id"] + audio_itag: str = request.rel_url.query["audio_itag"] + video_itag: str = request.rel_url.query["video_itag"] + except: + # no one gives a fuck + _ = 0 + # validate + if " " in video_id or len(video_id) > 11: + print(f"Video {video_id} flagged as invalid, dropping request") + return + if not audio_itag.isdigit(): + print(f"Audio itag {audio_itag} flagged as invalid, dropping request") + return + if not video_itag.isdigit(): + print(f"Video itag {video_itag} flagged as invalid, dropping request") + return + if os.path.isfile(f"done.{job_id}"): + return web.FileResponse( + path=f"output.{job_id}.mp4" + ) + proc_audio = await asyncio.create_subprocess_shell( + f"wget -O{job_id}.m4a \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={audio_itag}&local=true\"", + ) + proc_video = await asyncio.create_subprocess_shell( + f"wget -O{job_id}.mp4 \"https://eu-proxy.poketube.fun/latest_version?id={video_id}&itag={video_itag}&local=true\"" + ) + await asyncio.gather(proc_audio.wait(), proc_video.wait()) + proc_ffmpeg = await asyncio.create_subprocess_shell( + f"ffmpeg -i {job_id}.m4a -i {job_id}.mp4 -c copy output.{job_id}.mp4" + ) + await proc_ffmpeg.wait() f = open(f"done.{job_id}", "a") f.write(":3") f.close() - thread = Thread(target=inactive_autodelete, args = (job_id, )) - thread.start() + return web.FileResponse( + path=f"output.{job_id}.mp4" + ) -@app.route("/") -def ping(): - return json.loads(""" - { - "success": true - } - """) +async def ping(request): + return web.Response(body='{"success": true}', content_type="application/json") -@app.route("/merge") -def merge(): - job_id = get_random_string(10) - thread = Thread(target=merge_video, args = (job_id, request.args.get("id"), request.args.get("audio_itag"), request.args.get("video_itag"))) - thread.start() - return json.loads('{"success":true,"job_id":"' + job_id + '"}') +async def init_app(): + app.router.add_get("/{id:.+}", merge) + app.router.add_get("/", ping) + return app -@app.route("/get") -def get(): - pwd = os.getcwd() - job_id = request.args.get("job_id") - if os.path.isfile(f"{pwd}/done.{job_id}"): - if not os.path.isfile(f"{pwd}/pendingDelete.{job_id}"): - thread = Thread(target=autodelete, args = (job_id, )) - thread.start() - with open(f"output.{job_id}.mp4", "rb") as bytes: - return send_file( - io.BytesIO(bytes.read()), - mimetype="video/mp4", - download_name=f"output.{job_id}.mp4", - as_attachment=True - ) - return json.loads('{"success":false}'), 404 - -@app.route("/check") -def check(): - pwd = os.getcwd() - job_id = request.args.get("job_id") - if os.path.isfile(f"{pwd}/done.{job_id}"): - return json.loads('{"success":true}') - return json.loads('{"success":false}'), 404 - - -if __name__ == "__main__": - from waitress import serve - serve(app, host="0.0.0.0", port=os.getenv("PORT")) - -#with open(f"output.{job_id}.mp4", "rb") as bytes: - #return send_file( - # io.BytesIO(bytes.read()), - # mimetype="video/mp4", - # download_name=f"output.{job_id}.mp4", - # as_attachment=True - # ) -# -# -# -# -# +if __name__ == '__main__': + loop = asyncio.get_event_loop() + app = loop.run_until_complete(init_app()) + web.run_app(app, port=3030) \ No newline at end of file