Merge pull request #364 from s0md3v/utilities-and-start-refactoring
Utilities and start refactoring
This commit is contained in:
commit
80aec1cb3a
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -28,3 +28,5 @@ jobs:
|
||||
- run: pip install -r requirements.txt gdown
|
||||
- run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ
|
||||
- run: python run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
|
||||
- run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex psnr -f null -
|
||||
|
||||
|
181
roop/core.py
181
roop/core.py
@ -2,47 +2,53 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
|
||||
# single thread doubles performance of gpu-mode - needs to be set before torch import
|
||||
if any(arg.startswith('--gpu-vendor') for arg in sys.argv):
|
||||
os.environ['OMP_NUM_THREADS'] = '1'
|
||||
import warnings
|
||||
from typing import List
|
||||
import platform
|
||||
import signal
|
||||
import shutil
|
||||
import glob
|
||||
import argparse
|
||||
import psutil
|
||||
import torch
|
||||
import tensorflow
|
||||
from pathlib import Path
|
||||
import multiprocessing as mp
|
||||
import multiprocessing
|
||||
from opennsfw2 import predict_video_frames, predict_image
|
||||
import cv2
|
||||
|
||||
import roop.globals
|
||||
from roop.swapper import process_video, process_img, process_faces, process_frames
|
||||
from roop.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames
|
||||
from roop.swapper import process_video, process_img, process_faces
|
||||
from roop.utilities import has_image_extention, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frames_paths, restore_audio, create_temp, move_temp, clean_temp
|
||||
from roop.analyser import get_face_single
|
||||
import roop.ui as ui
|
||||
|
||||
warnings.simplefilter(action='ignore', category=FutureWarning)
|
||||
|
||||
def handle_parse():
|
||||
global args
|
||||
signal.signal(signal.SIGINT, lambda signal_number, frame: quit())
|
||||
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-f', '--face', help='use this face', dest='source_target')
|
||||
parser.add_argument('-f', '--face', help='use this face', dest='source_path')
|
||||
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
|
||||
parser.add_argument('-o', '--output', help='save output to this file', dest='output_path')
|
||||
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
|
||||
parser.add_argument('--keep-audio', help='maintain original audio', dest='keep_audio', action='store_true', default=True)
|
||||
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
|
||||
parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False)
|
||||
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int)
|
||||
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1))
|
||||
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8)
|
||||
parser.add_argument('--gpu-vendor', help='choice your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
|
||||
parser.add_argument('--gpu-vendor', help='select your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
|
||||
|
||||
args = parser.parse_known_args()[0]
|
||||
|
||||
roop.globals.headless = args.source_target or args.target_path or args.output_path
|
||||
roop.globals.headless = args.source_path or args.target_path or args.output_path
|
||||
roop.globals.keep_fps = args.keep_fps
|
||||
roop.globals.keep_audio = args.keep_audio
|
||||
roop.globals.keep_frames = args.keep_frames
|
||||
roop.globals.all_faces = args.all_faces
|
||||
|
||||
if args.cpu_cores:
|
||||
@ -83,7 +89,7 @@ def limit_resources():
|
||||
|
||||
def pre_check():
|
||||
if sys.version_info < (3, 9):
|
||||
quit('Python version is not supported - please upgrade to 3.9 or higher')
|
||||
quit('Python version is not supported - please upgrade to 3.9 or higher.')
|
||||
if not shutil.which('ffmpeg'):
|
||||
quit('ffmpeg is not installed!')
|
||||
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
|
||||
@ -91,23 +97,23 @@ def pre_check():
|
||||
quit('File "inswapper_128.onnx" does not exist!')
|
||||
if roop.globals.gpu_vendor == 'apple':
|
||||
if 'CoreMLExecutionProvider' not in roop.globals.providers:
|
||||
quit("You are using --gpu=apple flag but CoreML isn't available or properly installed on your system.")
|
||||
quit('You are using --gpu=apple flag but CoreML is not available or properly installed on your system.')
|
||||
if roop.globals.gpu_vendor == 'amd':
|
||||
if 'ROCMExecutionProvider' not in roop.globals.providers:
|
||||
quit("You are using --gpu=amd flag but ROCM isn't available or properly installed on your system.")
|
||||
quit('You are using --gpu=amd flag but ROCM is not available or properly installed on your system.')
|
||||
if roop.globals.gpu_vendor == 'nvidia':
|
||||
CUDA_VERSION = torch.version.cuda
|
||||
CUDNN_VERSION = torch.backends.cudnn.version()
|
||||
if not torch.cuda.is_available():
|
||||
quit("You are using --gpu=nvidia flag but CUDA isn't available or properly installed on your system.")
|
||||
quit('You are using --gpu=nvidia flag but CUDA is not available or properly installed on your system.')
|
||||
if CUDA_VERSION > '11.8':
|
||||
quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
|
||||
quit(f'CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8')
|
||||
if CUDA_VERSION < '11.4':
|
||||
quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
|
||||
quit(f'CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8')
|
||||
if CUDNN_VERSION < 8220:
|
||||
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
|
||||
quit(f'CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1')
|
||||
if CUDNN_VERSION > 8910:
|
||||
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
|
||||
quit(f'CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1')
|
||||
|
||||
|
||||
def get_video_frame(video_path, frame_number = 1):
|
||||
@ -115,19 +121,18 @@ def get_video_frame(video_path, frame_number = 1):
|
||||
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1))
|
||||
if not cap.isOpened():
|
||||
print("Error opening video file")
|
||||
status('Error opening video file')
|
||||
return
|
||||
ret, frame = cap.read()
|
||||
if ret:
|
||||
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
cap.release()
|
||||
|
||||
|
||||
def preview_video(video_path):
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
if not cap.isOpened():
|
||||
print("Error opening video file")
|
||||
status('Error opening video file')
|
||||
return 0
|
||||
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
|
||||
ret, frame = cap.read()
|
||||
@ -138,86 +143,91 @@ def preview_video(video_path):
|
||||
return (amount_of_frames, frame)
|
||||
|
||||
|
||||
def status(string):
|
||||
value = "Status: " + string
|
||||
def status(message: str):
|
||||
value = 'Status: ' + message
|
||||
print(value)
|
||||
if not roop.globals.headless:
|
||||
ui.update_status_label(value)
|
||||
|
||||
|
||||
def process_video_multi_cores(source_target, frame_paths):
|
||||
n = len(frame_paths) // roop.globals.cpu_cores
|
||||
if n > 2:
|
||||
processes = []
|
||||
for i in range(0, len(frame_paths), n):
|
||||
p = POOL.apply_async(process_video, args=(source_target, frame_paths[i:i + n],))
|
||||
processes.append(p)
|
||||
for p in processes:
|
||||
p.get()
|
||||
POOL.close()
|
||||
def conditional_process_video(source_path: str, frame_paths: List[str]) -> None:
|
||||
pool_amount = len(frame_paths) // roop.globals.cpu_cores
|
||||
if pool_amount > 2 and roop.globals.cpu_cores > 1 and roop.globals.gpu_vendor is None:
|
||||
status('Pool-Swapping in progress...')
|
||||
global POOL
|
||||
POOL = multiprocessing.Pool(roop.globals.cpu_cores, maxtasksperchild=1)
|
||||
pools = []
|
||||
for i in range(0, len(frame_paths), pool_amount):
|
||||
pool = POOL.apply_async(process_video, args=(source_path, frame_paths[i:i + pool_amount]))
|
||||
pools.append(pool)
|
||||
for pool in pools:
|
||||
pool.get()
|
||||
POOL.join()
|
||||
POOL.close()
|
||||
else:
|
||||
status('Swapping in progress...')
|
||||
process_video(args.source_path, frame_paths)
|
||||
|
||||
|
||||
def start(preview_callback = None):
|
||||
if not args.source_target or not os.path.isfile(args.source_target):
|
||||
print("\n[WARNING] Please select an image containing a face.")
|
||||
def start(preview_callback = None) -> None:
|
||||
if not args.source_path or not os.path.isfile(args.source_path):
|
||||
status('Please select an image containing a face.')
|
||||
return
|
||||
elif not args.target_path or not os.path.isfile(args.target_path):
|
||||
print("\n[WARNING] Please select a video/image to swap face in.")
|
||||
status('Please select a video/image target!')
|
||||
return
|
||||
target_path = args.target_path
|
||||
test_face = get_face_single(cv2.imread(args.source_target))
|
||||
test_face = get_face_single(cv2.imread(args.source_path))
|
||||
if not test_face:
|
||||
print("\n[WARNING] No face detected in source image. Please try with another one.\n")
|
||||
status('No face detected in source image. Please try with another one!')
|
||||
return
|
||||
if is_img(target_path):
|
||||
if predict_image(target_path) > 0.85:
|
||||
quit()
|
||||
process_img(args.source_target, target_path, args.output_path)
|
||||
status("swap successful!")
|
||||
# process image to image
|
||||
if has_image_extention(args.target_path):
|
||||
if predict_image(args.target_path) > 0.85:
|
||||
destroy()
|
||||
process_img(args.source_path, args.target_path, args.output_path)
|
||||
if is_image(args.target_path):
|
||||
status('Swapping to image succeed!')
|
||||
else:
|
||||
status('Swapping to image failed!')
|
||||
return
|
||||
# process image to videos
|
||||
seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100)
|
||||
if any(probability > 0.85 for probability in probabilities):
|
||||
quit()
|
||||
video_name_full = target_path.split(os.sep)[-1]
|
||||
video_name = os.path.splitext(video_name_full)[0]
|
||||
output_dir = os.path.dirname(target_path) + os.sep + video_name if os.path.dirname(target_path) else video_name
|
||||
Path(output_dir).mkdir(exist_ok=True)
|
||||
status("detecting video's FPS...")
|
||||
fps, exact_fps = detect_fps(target_path)
|
||||
if not args.keep_fps and fps > 30:
|
||||
this_path = output_dir + os.sep + video_name + ".mp4"
|
||||
set_fps(target_path, this_path, 30)
|
||||
target_path, exact_fps = this_path, 30
|
||||
else:
|
||||
shutil.copy(target_path, output_dir)
|
||||
status("extracting frames...")
|
||||
extract_frames(target_path, output_dir)
|
||||
args.frame_paths = tuple(sorted(
|
||||
glob.glob(output_dir + "/*.png"),
|
||||
key=lambda x: int(x.split(os.sep)[-1].replace(".png", ""))
|
||||
))
|
||||
status("swapping in progress...")
|
||||
if roop.globals.gpu_vendor is None and roop.globals.cpu_cores > 1:
|
||||
global POOL
|
||||
POOL = mp.Pool(roop.globals.cpu_cores)
|
||||
process_video_multi_cores(args.source_target, args.frame_paths)
|
||||
else:
|
||||
process_video(args.source_target, args.frame_paths)
|
||||
# prevent out of memory while using ffmpeg with cuda
|
||||
destroy()
|
||||
status('Creating temp resources...')
|
||||
create_temp(args.target_path)
|
||||
status('Extracting frames...')
|
||||
extract_frames(args.target_path)
|
||||
frame_paths = get_temp_frames_paths(args.target_path)
|
||||
conditional_process_video(args.source_path, frame_paths)
|
||||
# prevent memory leak using ffmpeg with cuda
|
||||
if args.gpu_vendor == 'nvidia':
|
||||
torch.cuda.empty_cache()
|
||||
status("creating video...")
|
||||
create_video(video_name, exact_fps, output_dir)
|
||||
status("adding audio...")
|
||||
add_audio(output_dir, target_path, video_name_full, args.keep_frames, args.output_path)
|
||||
save_path = args.output_path if args.output_path else output_dir + os.sep + video_name + ".mp4"
|
||||
print("\n\nVideo saved as:", save_path, "\n\n")
|
||||
status("swap successful!")
|
||||
if roop.globals.keep_fps:
|
||||
status('Detecting fps...')
|
||||
fps = detect_fps(args.source_path)
|
||||
status(f'Creating video with {fps} fps...')
|
||||
create_video(args.target_path, fps)
|
||||
else:
|
||||
status('Creating video with 30 fps...')
|
||||
create_video(args.target_path, 30)
|
||||
if roop.globals.keep_audio:
|
||||
if roop.globals.keep_fps:
|
||||
status('Restoring audio...')
|
||||
else:
|
||||
status('Restoring audio might cause issues as fps are not kept...')
|
||||
restore_audio(args.target_path, args.output_path)
|
||||
else:
|
||||
move_temp(args.target_path, args.output_path)
|
||||
clean_temp(args.target_path)
|
||||
if is_video(args.target_path):
|
||||
status('Swapping to video succeed!')
|
||||
else:
|
||||
status('Swapping to video failed!')
|
||||
|
||||
|
||||
def select_face_handler(path: str):
|
||||
args.source_target = path
|
||||
args.source_path = path
|
||||
|
||||
|
||||
def select_target_handler(path: str):
|
||||
@ -243,22 +253,27 @@ def save_file_handler(path: str):
|
||||
|
||||
def create_test_preview(frame_number):
|
||||
return process_faces(
|
||||
get_face_single(cv2.imread(args.source_target)),
|
||||
get_face_single(cv2.imread(args.source_path)),
|
||||
get_video_frame(args.target_path, frame_number)
|
||||
)
|
||||
|
||||
|
||||
def run():
|
||||
def destroy() -> None:
|
||||
clean_temp(args.target_path)
|
||||
quit()
|
||||
|
||||
|
||||
def run() -> None:
|
||||
global all_faces, keep_frames, limit_fps
|
||||
handle_parse()
|
||||
pre_check()
|
||||
limit_resources()
|
||||
if roop.globals.headless:
|
||||
start()
|
||||
quit()
|
||||
else:
|
||||
window = ui.init(
|
||||
{
|
||||
'all_faces': roop.globals.all_faces,
|
||||
'all_faces': args.all_faces,
|
||||
'keep_fps': args.keep_fps,
|
||||
'keep_frames': args.keep_frames
|
||||
},
|
||||
|
@ -1,11 +1,14 @@
|
||||
import onnxruntime
|
||||
|
||||
keep_fps = None
|
||||
keep_audio = None
|
||||
keep_frames = None
|
||||
all_faces = None
|
||||
log_level = 'error'
|
||||
cpu_cores = None
|
||||
gpu_threads = None
|
||||
gpu_vendor = None
|
||||
headless = None
|
||||
log_level = 'error'
|
||||
providers = onnxruntime.get_available_providers()
|
||||
|
||||
if 'TensorrtExecutionProvider' in providers:
|
||||
|
@ -58,9 +58,8 @@ def multi_process_frame(source_img, frame_paths, progress):
|
||||
num_threads = roop.globals.gpu_threads
|
||||
num_frames_per_thread = len(frame_paths) // num_threads
|
||||
remaining_frames = len(frame_paths) % num_threads
|
||||
|
||||
# create thread and launch
|
||||
start_index = 0
|
||||
# create threads by frames
|
||||
for _ in range(num_threads):
|
||||
end_index = start_index + num_frames_per_thread
|
||||
if remaining_frames > 0:
|
||||
@ -71,8 +70,7 @@ def multi_process_frame(source_img, frame_paths, progress):
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
start_index = end_index
|
||||
|
||||
# threading
|
||||
# join threads
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
@ -83,13 +81,13 @@ def process_img(source_img, target_path, output_file):
|
||||
source_face = get_face_single(cv2.imread(source_img))
|
||||
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
|
||||
cv2.imwrite(output_file, result)
|
||||
print("\n\nImage saved as:", output_file, "\n\n")
|
||||
|
||||
|
||||
def process_video(source_img, frame_paths):
|
||||
do_multi = roop.globals.gpu_vendor is not None and roop.globals.gpu_threads > 1
|
||||
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
||||
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
||||
total = len(frame_paths)
|
||||
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
||||
if do_multi:
|
||||
multi_process_frame(source_img, frame_paths, progress)
|
||||
else:
|
||||
|
@ -6,7 +6,7 @@ from tkinter import filedialog
|
||||
from tkinter.filedialog import asksaveasfilename
|
||||
import threading
|
||||
|
||||
from roop.utils import is_img
|
||||
from roop.utilities import is_image
|
||||
|
||||
max_preview_size = 800
|
||||
|
||||
@ -114,7 +114,7 @@ def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], targe
|
||||
def save_file(save_file_handler: Callable[[str], None], target_path: str):
|
||||
filename, ext = 'output.mp4', '.mp4'
|
||||
|
||||
if is_img(target_path):
|
||||
if is_image(target_path):
|
||||
filename, ext = 'output.png', '.png'
|
||||
|
||||
if save_file_handler:
|
||||
|
93
roop/utilities.py
Normal file
93
roop/utilities.py
Normal file
@ -0,0 +1,93 @@
|
||||
import glob
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Any
|
||||
|
||||
import roop.globals
|
||||
from PIL import Image
|
||||
|
||||
|
||||
def run_ffmpeg(args: List) -> None:
|
||||
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', roop.globals.log_level]
|
||||
commands.extend(args)
|
||||
try:
|
||||
subprocess.check_output(commands, stderr=subprocess.STDOUT)
|
||||
except Exception as exception:
|
||||
pass
|
||||
|
||||
|
||||
def detect_fps(source_path: str) -> int:
|
||||
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', source_path]
|
||||
output = subprocess.check_output(command).decode().strip()
|
||||
try:
|
||||
return int(eval(output))
|
||||
except Exception:
|
||||
pass
|
||||
return 30
|
||||
|
||||
|
||||
def extract_frames(target_path: str) -> None:
|
||||
run_ffmpeg(['-i', target_path, get_temp_directory_path(target_path) + os.sep + '%04d.png'])
|
||||
|
||||
|
||||
def create_video(target_path: str, fps: int) -> None:
|
||||
run_ffmpeg(['-i', get_temp_directory_path(target_path) + os.sep + '%04d.png', '-framerate', str(fps), '-c:v', 'libx264', '-crf', '7', '-pix_fmt', 'yuv420p', '-y', get_temp_file_path(target_path)])
|
||||
|
||||
|
||||
def restore_audio(target_path: str, output_path: str) -> None:
|
||||
run_ffmpeg(['-i', get_temp_file_path(target_path), '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
|
||||
if not os.path.isfile(output_path):
|
||||
move_temp(target_path, output_path)
|
||||
|
||||
|
||||
def get_temp_frames_paths(target_path: str) -> List:
|
||||
return glob.glob(get_temp_directory_path(target_path) + os.sep + '*.png')
|
||||
|
||||
|
||||
def get_temp_directory_path(target_path: str) -> str:
|
||||
return os.path.dirname(target_path) + os.sep + 'temp'
|
||||
|
||||
|
||||
def get_temp_file_path(target_path: str) -> str:
|
||||
return get_temp_directory_path(target_path) + os.sep + 'temp.mp4'
|
||||
|
||||
|
||||
def create_temp(target_path: str) -> None:
|
||||
Path(get_temp_directory_path(target_path)).mkdir(exist_ok=True)
|
||||
|
||||
|
||||
def move_temp(target_path: str, output_path: str) -> None:
|
||||
temp_file_path = get_temp_file_path(target_path)
|
||||
if os.path.isfile(temp_file_path):
|
||||
shutil.move(temp_file_path, output_path)
|
||||
|
||||
|
||||
def clean_temp(target_path: str) -> None:
|
||||
if not roop.globals.keep_frames:
|
||||
shutil.rmtree(get_temp_directory_path(target_path))
|
||||
|
||||
|
||||
def has_image_extention(image_path: str) -> bool:
|
||||
return image_path.lower().endswith(('png', 'jpg', 'jpeg', 'bmp'))
|
||||
|
||||
|
||||
def is_image(path: str) -> bool:
|
||||
if os.path.isfile(path):
|
||||
try:
|
||||
image = Image.open(path)
|
||||
image.verify()
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def is_video(path: str) -> bool:
|
||||
try:
|
||||
run_ffmpeg(['-v', 'error', '-i', path, '-f', 'null', '-'])
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
return False
|
@ -1,49 +0,0 @@
|
||||
import os
|
||||
import shutil
|
||||
import roop.globals
|
||||
|
||||
|
||||
def run_command(command, mode="silent"):
|
||||
if mode == "debug":
|
||||
return os.system(command)
|
||||
return os.popen(command).read()
|
||||
|
||||
|
||||
def detect_fps(input_path):
|
||||
output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read()
|
||||
if "/" in output:
|
||||
try:
|
||||
return int(output.split("/")[0]) // int(output.split("/")[1].strip()), output.strip()
|
||||
except:
|
||||
pass
|
||||
return 30, 30
|
||||
|
||||
|
||||
def run_ffmpeg(args):
|
||||
run_command(f'ffmpeg -hide_banner -hwaccel auto -loglevel {roop.globals.log_level} {args}')
|
||||
|
||||
|
||||
def set_fps(input_path, output_path, fps):
|
||||
run_ffmpeg(f'-i "{input_path}" -filter:v fps=fps={fps} "{output_path}"')
|
||||
|
||||
|
||||
def create_video(video_name, fps, output_dir):
|
||||
run_ffmpeg(f'-framerate "{fps}" -i "{output_dir}{os.sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{os.sep}output.mp4"')
|
||||
|
||||
|
||||
def extract_frames(input_path, output_dir):
|
||||
run_ffmpeg(f'-i "{input_path}" "{output_dir}{os.sep}%04d.png"')
|
||||
|
||||
|
||||
def add_audio(output_dir, target_path, video, keep_frames, output_file):
|
||||
video_name = os.path.splitext(video)[0]
|
||||
save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4"
|
||||
run_ffmpeg(f'-i "{output_dir}{os.sep}output.mp4" -i "{output_dir}{os.sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to}"')
|
||||
if not os.path.isfile(save_to):
|
||||
shutil.move(output_dir + "/output.mp4", save_to)
|
||||
if not keep_frames:
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
|
||||
def is_img(path):
|
||||
return path.lower().endswith(("png", "jpg", "jpeg", "bmp"))
|
Loading…
Reference in New Issue
Block a user