Merge remote-tracking branch 'origin/next' into nsfw

# Conflicts:
#	roop/core.py
This commit is contained in:
2023-06-09 14:00:00 +03:00
11 changed files with 563 additions and 610 deletions

View File

@@ -27,6 +27,6 @@ jobs:
python-version: 3.9 python-version: 3.9
- run: pip install -r requirements.txt gdown - run: pip install -r requirements.txt gdown
- run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ - run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ
- run: ./run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4 - run: python run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
- run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex "psnr" -f null - - run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex psnr -f null -

2
.gitignore vendored
View File

@@ -1,3 +1,3 @@
.idea .idea
temp
__pycache__ __pycache__
*.onnx

View File

@@ -34,23 +34,28 @@ Additional command line arguments are given below:
``` ```
options: options:
-h, --help show this help message and exit -h, --help show this help message and exit
-f SOURCE_IMG, --face SOURCE_IMG -f SOURCE_PATH, --face SOURCE_PATH
use this face use a face image
-t TARGET_PATH, --target TARGET_PATH -t TARGET_PATH, --target TARGET_PATH
replace this face replace image or video with face
-o OUTPUT_FILE, --output OUTPUT_FILE -o OUTPUT_PATH, --output OUTPUT_PATH
save output to this file save output to this file
--keep-fps maintain original fps --keep-fps maintain original fps
--keep-audio maintain original audio
--keep-frames keep frames directory --keep-frames keep frames directory
--all-faces swap all faces in frame --many-faces swap every face in the frame
--video-encoder VIDEO_ENCODER
adjust output video encoder
--video-quality VIDEO_QUALITY
adjust output video quality
--max-memory MAX_MEMORY --max-memory MAX_MEMORY
maximum amount of RAM in GB to be used maximum amount of RAM in GB to be used
--cpu-cores CPU_CORES --cpu-cores CPU_CORES
number of CPU cores to use number of CPU cores to use
--gpu-threads GPU_THREADS --gpu-threads GPU_THREADS
number of threads to be use for the GPU number of threads to be use for the GPU
--gpu-vendor {apple,amd,intel,nvidia} --gpu-vendor {apple,amd,nvidia}
choice your GPU vendor select your GPU vendor
``` ```
Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode. Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode.

View File

@@ -1,10 +1,11 @@
from typing import Any
import insightface import insightface
import roop.globals import roop.globals
FACE_ANALYSER = None FACE_ANALYSER = None
def get_face_analyser(): def get_face_analyser() -> Any:
global FACE_ANALYSER global FACE_ANALYSER
if FACE_ANALYSER is None: if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers) FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers)
@@ -12,16 +13,16 @@ def get_face_analyser():
return FACE_ANALYSER return FACE_ANALYSER
def get_face_single(img_data): def get_one_face(image_data) -> Any:
face = get_face_analyser().get(img_data) face = get_face_analyser().get(image_data)
try: try:
return sorted(face, key=lambda x: x.bbox[0])[0] return min(face, key=lambda x: x.bbox[0])
except IndexError: except ValueError:
return None return None
def get_face_many(img_data): def get_many_faces(image_data) -> Any:
try: try:
return get_face_analyser().get(img_data) return get_face_analyser().get(image_data)
except IndexError: except IndexError:
return None return None

12
roop/capturer.py Normal file
View File

@@ -0,0 +1,12 @@
import cv2
def get_video_frame(video_path: str, frame_number: int = 1):
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return None

View File

@@ -5,75 +5,100 @@ import sys
# single thread doubles performance of gpu-mode - needs to be set before torch import # single thread doubles performance of gpu-mode - needs to be set before torch import
if any(arg.startswith('--gpu-vendor') for arg in sys.argv): if any(arg.startswith('--gpu-vendor') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1' os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform import platform
import signal import signal
import shutil import shutil
import glob
import argparse import argparse
import psutil import psutil
import torch import torch
import tensorflow import tensorflow
from pathlib import Path import multiprocessing
import multiprocessing as mp
from opennsfw2 import predict_video_frames, predict_image from opennsfw2 import predict_video_frames, predict_image
import cv2 import cv2
import roop.globals import roop.globals
from roop.swapper import process_video, process_img, process_faces, process_frames
from roop.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
from roop.analyser import get_face_single
import roop.ui as ui import roop.ui as ui
from roop.swapper import process_video, process_image
from roop.utilities import has_image_extention, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frames_paths, restore_audio, create_temp, move_temp, clean_temp
from roop.analyser import get_one_face
signal.signal(signal.SIGINT, lambda signal_number, frame: quit()) if 'ROCMExecutionProvider' in roop.globals.providers:
parser = argparse.ArgumentParser() del torch
parser.add_argument('-f', '--face', help='use this face', dest='source_img')
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_file')
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int)
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1))
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8)
parser.add_argument('--gpu-vendor', help='choice your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
args = parser.parse_known_args()[0] warnings.simplefilter(action='ignore', category=FutureWarning)
if 'all_faces' in args:
roop.globals.all_faces = True
if args.cpu_cores: def parse_args() -> None:
roop.globals.cpu_cores = int(args.cpu_cores) signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--face', help='use a face image', dest='source_path')
parser.add_argument('-t', '--target', help='replace image or video with face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_path')
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-audio', help='maintain original audio', dest='keep_audio', action='store_true', default=True)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--many-faces', help='swap every face in the frame', dest='many_faces', action='store_true', default=False)
parser.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264')
parser.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int, default=suggest_max_memory())
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=suggest_cpu_cores())
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=suggest_gpu_threads())
parser.add_argument('--gpu-vendor', help='select your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'nvidia'])
# cpu thread fix for mac args = parser.parse_known_args()[0]
if sys.platform == 'darwin':
roop.globals.cpu_cores = 1
if args.gpu_threads: roop.globals.source_path = args.source_path
roop.globals.gpu_threads = int(args.gpu_threads) roop.globals.target_path = args.target_path
roop.globals.output_path = args.output_path
roop.globals.headless = args.source_path or args.target_path or args.output_path
roop.globals.keep_fps = args.keep_fps
roop.globals.keep_audio = args.keep_audio
roop.globals.keep_frames = args.keep_frames
roop.globals.many_faces = args.many_faces
roop.globals.video_encoder = args.video_encoder
roop.globals.video_quality = args.video_quality
roop.globals.max_memory = args.max_memory
roop.globals.cpu_cores = args.cpu_cores
roop.globals.gpu_threads = args.gpu_threads
# gpu thread fix for amd if args.gpu_vendor:
if args.gpu_vendor == 'amd':
roop.globals.gpu_threads = 1
if args.gpu_vendor:
roop.globals.gpu_vendor = args.gpu_vendor roop.globals.gpu_vendor = args.gpu_vendor
else: else:
roop.globals.providers = ['CPUExecutionProvider'] roop.globals.providers = ['CPUExecutionProvider']
sep = "/"
if os.name == "nt": def suggest_max_memory() -> int:
sep = "\\" if platform.system().lower() == 'darwin':
return 4
return 16
def limit_resources(): def suggest_gpu_threads() -> int:
if 'ROCMExecutionProvider' in roop.globals.providers:
return 2
return 8
def suggest_cpu_cores() -> int:
if platform.system().lower() == 'darwin':
return 2
return int(max(psutil.cpu_count() / 2, 1))
def limit_resources() -> None:
# prevent tensorflow memory leak # prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU') gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus: for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True) tensorflow.config.experimental.set_memory_growth(gpu, True)
if args.max_memory: if roop.globals.max_memory:
memory = args.max_memory * 1024 * 1024 * 1024 memory = roop.globals.max_memory * 1024 ** 3
if str(platform.system()).lower() == 'windows': if platform.system().lower() == 'darwin':
memory = roop.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
import ctypes import ctypes
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
@@ -82,9 +107,9 @@ def limit_resources():
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def pre_check(): def pre_check() -> None:
if sys.version_info < (3, 9): if sys.version_info < (3, 9):
quit('Python version is not supported - please upgrade to 3.9 or higher') quit('Python version is not supported - please upgrade to 3.9 or higher.')
if not shutil.which('ffmpeg'): if not shutil.which('ffmpeg'):
quit('ffmpeg is not installed!') quit('ffmpeg is not installed!')
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx') model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
@@ -92,189 +117,117 @@ def pre_check():
quit('File "inswapper_128.onnx" does not exist!') quit('File "inswapper_128.onnx" does not exist!')
if roop.globals.gpu_vendor == 'apple': if roop.globals.gpu_vendor == 'apple':
if 'CoreMLExecutionProvider' not in roop.globals.providers: if 'CoreMLExecutionProvider' not in roop.globals.providers:
quit("You are using --gpu=apple flag but CoreML isn't available or properly installed on your system.") quit('You are using --gpu=apple flag but CoreML is not available or properly installed on your system.')
if roop.globals.gpu_vendor == 'amd': if roop.globals.gpu_vendor == 'amd':
if 'ROCMExecutionProvider' not in roop.globals.providers: if 'ROCMExecutionProvider' not in roop.globals.providers:
quit("You are using --gpu=amd flag but ROCM isn't available or properly installed on your system.") quit('You are using --gpu=amd flag but ROCM is not available or properly installed on your system.')
if roop.globals.gpu_vendor == 'nvidia': if roop.globals.gpu_vendor == 'nvidia':
CUDA_VERSION = torch.version.cuda
CUDNN_VERSION = torch.backends.cudnn.version()
if not torch.cuda.is_available(): if not torch.cuda.is_available():
quit("You are using --gpu=nvidia flag but CUDA isn't available or properly installed on your system.") quit('You are using --gpu=nvidia flag but CUDA is not available or properly installed on your system.')
if CUDA_VERSION > '11.8': if torch.version.cuda > '11.8':
quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8") quit(f'CUDA version {torch.version.cuda} is not supported - please downgrade to 11.8')
if CUDA_VERSION < '11.4': if torch.version.cuda < '11.4':
quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8") quit(f'CUDA version {torch.version.cuda} is not supported - please upgrade to 11.8')
if CUDNN_VERSION < 8220: if torch.backends.cudnn.version() < 8220:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1") quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please upgrade to 8.9.1')
if CUDNN_VERSION > 8910: if torch.backends.cudnn.version() > 8910:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1") quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please downgrade to 8.9.1')
def get_video_frame(video_path, frame_number = 1): def conditional_process_video(source_path: str, frame_paths: List[str]) -> None:
cap = cv2.VideoCapture(video_path) pool_amount = len(frame_paths) // roop.globals.cpu_cores
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) if pool_amount > 2 and roop.globals.cpu_cores > 1 and roop.globals.gpu_vendor is None:
cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1)) global POOL
if not cap.isOpened(): POOL = multiprocessing.Pool(roop.globals.cpu_cores, maxtasksperchild=1)
print("Error opening video file") pools = []
return for i in range(0, len(frame_paths), pool_amount):
ret, frame = cap.read() pool = POOL.apply_async(process_video, args=(source_path, frame_paths[i:i + pool_amount], 'cpu'))
if ret: pools.append(pool)
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for pool in pools:
pool.get()
cap.release()
def preview_video(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error opening video file")
return 0
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
ret, frame = cap.read()
if ret:
frame = get_video_frame(video_path)
cap.release()
return (amount_of_frames, frame)
def status(string):
value = "Status: " + string
if 'cli_mode' in args:
print(value)
else:
ui.update_status_label(value)
def process_video_multi_cores(source_img, frame_paths):
n = len(frame_paths) // roop.globals.cpu_cores
if n > 2:
processes = []
for i in range(0, len(frame_paths), n):
p = POOL.apply_async(process_video, args=(source_img, frame_paths[i:i + n],))
processes.append(p)
for p in processes:
p.get()
POOL.close() POOL.close()
POOL.join() POOL.join()
else:
process_video(roop.globals.source_path, frame_paths, 'gpu')
def start(preview_callback = None): def update_status(message: str) -> None:
if not args.source_img or not os.path.isfile(args.source_img): value = 'Status: ' + message
print("\n[WARNING] Please select an image containing a face.") print(value)
if not roop.globals.headless:
ui.update_status(value)
def start() -> None:
if not roop.globals.source_path or not os.path.isfile(roop.globals.source_path):
update_status('Select an image that contains a face.')
return return
elif not args.target_path or not os.path.isfile(args.target_path): elif not roop.globals.target_path or not os.path.isfile(roop.globals.target_path):
print("\n[WARNING] Please select a video/image to swap face in.") update_status('Select an image or video target!')
return return
if not args.output_file: test_face = get_one_face(cv2.imread(roop.globals.source_path))
target_path = args.target_path
args.output_file = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
target_path = args.target_path
test_face = get_face_single(cv2.imread(args.source_img))
if not test_face: if not test_face:
print("\n[WARNING] No face detected in source image. Please try with another one.\n") update_status('No face detected in source image. Please try with another one!')
return return
if is_img(target_path): # process image to image
if predict_image(target_path) > 0.85: if has_image_extention(roop.globals.target_path):
quit() if predict_image(roop.globals.target_path) > 0.85:
process_img(args.source_img, target_path, args.output_file) destroy()
status("swap successful!") process_image(roop.globals.source_path, roop.globals.target_path, roop.globals.output_path)
if is_image(roop.globals.target_path):
update_status('Swapping to image succeed!')
else:
update_status('Swapping to image failed!')
return return
# seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100) # process image to videos
# seconds, probabilities = predict_video_frames(video_path=roop.globals.target_path, frame_interval=100)
# if any(probability > 0.85 for probability in probabilities): # if any(probability > 0.85 for probability in probabilities):
# quit() # destroy()
video_name_full = target_path.split("/")[-1] update_status('Creating temp resources...')
video_name = os.path.splitext(video_name_full)[0] create_temp(roop.globals.target_path)
output_dir = os.path.dirname(target_path) + "/" + video_name if os.path.dirname(target_path) else video_name update_status('Extracting frames...')
Path(output_dir).mkdir(exist_ok=True) extract_frames(roop.globals.target_path)
status("detecting video's FPS...") frame_paths = get_temp_frames_paths(roop.globals.target_path)
fps, exact_fps = detect_fps(target_path) update_status('Swapping in progress...')
if not args.keep_fps and fps > 30: conditional_process_video(roop.globals.source_path, frame_paths)
this_path = output_dir + "/" + video_name + ".mp4" # prevent memory leak using ffmpeg with cuda
set_fps(target_path, this_path, 30) if roop.globals.gpu_vendor == 'nvidia':
target_path, exact_fps = this_path, 30 torch.cuda.empty_cache()
if roop.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(roop.globals.target_path)
update_status(f'Creating video with {fps} fps...')
create_video(roop.globals.target_path, fps)
else: else:
shutil.copy(target_path, output_dir) update_status('Creating video with 30 fps...')
status("extracting frames...") create_video(roop.globals.target_path, 30)
extract_frames(target_path, output_dir) if roop.globals.keep_audio:
args.frame_paths = tuple(sorted( if roop.globals.keep_fps:
glob.glob(output_dir + "/*.png"), update_status('Restoring audio...')
key=lambda x: int(x.split(sep)[-1].replace(".png", ""))
))
status("swapping in progress...")
if roop.globals.gpu_vendor is None and roop.globals.cpu_cores > 1:
global POOL
POOL = mp.Pool(roop.globals.cpu_cores)
process_video_multi_cores(args.source_img, args.frame_paths)
else: else:
process_video(args.source_img, args.frame_paths) update_status('Restoring audio might cause issues as fps are not kept...')
status("creating video...") restore_audio(roop.globals.target_path, roop.globals.output_path)
create_video(video_name, exact_fps, output_dir) else:
status("adding audio...") move_temp(roop.globals.target_path, roop.globals.output_path)
add_audio(output_dir, target_path, video_name_full, args.keep_frames, args.output_file) clean_temp(roop.globals.target_path)
save_path = args.output_file if args.output_file else output_dir + "/" + video_name + ".mp4" if is_video(roop.globals.target_path):
print("\n\nVideo saved as:", save_path, "\n\n") update_status('Swapping to video succeed!')
status("swap successful!") else:
update_status('Swapping to video failed!')
def select_face_handler(path: str): def destroy() -> None:
args.source_img = path if roop.globals.target_path:
clean_temp(roop.globals.target_path)
quit()
def select_target_handler(path: str): def run() -> None:
args.target_path = path parse_args()
return preview_video(args.target_path)
def toggle_all_faces_handler(value: int):
roop.globals.all_faces = True if value == 1 else False
def toggle_fps_limit_handler(value: int):
args.keep_fps = int(value != 1)
def toggle_keep_frames_handler(value: int):
args.keep_frames = value
def save_file_handler(path: str):
args.output_file = path
def create_test_preview(frame_number):
return process_faces(
get_face_single(cv2.imread(args.source_img)),
get_video_frame(args.target_path, frame_number)
)
def run():
global all_faces, keep_frames, limit_fps
pre_check() pre_check()
limit_resources() limit_resources()
if args.source_img: if roop.globals.headless:
args.cli_mode = True
start() start()
quit() else:
window = ui.init(start, destroy)
window = ui.init(
{
'all_faces': roop.globals.all_faces,
'keep_fps': args.keep_fps,
'keep_frames': args.keep_frames
},
select_face_handler,
select_target_handler,
toggle_all_faces_handler,
toggle_fps_limit_handler,
toggle_keep_frames_handler,
save_file_handler,
start,
get_video_frame,
create_test_preview
)
window.mainloop() window.mainloop()

View File

@@ -1,10 +1,20 @@
import onnxruntime import onnxruntime
all_faces = None source_path = None
log_level = 'error' target_path = None
output_path = None
keep_fps = None
keep_audio = None
keep_frames = None
many_faces = None
video_encoder = None
video_quality = None
max_memory = None
cpu_cores = None cpu_cores = None
gpu_threads = None gpu_threads = None
gpu_vendor = None gpu_vendor = None
headless = None
log_level = 'error'
providers = onnxruntime.get_available_providers() providers = onnxruntime.get_available_providers()
if 'TensorrtExecutionProvider' in providers: if 'TensorrtExecutionProvider' in providers:

View File

@@ -1,17 +1,18 @@
import os import os
from typing import Any
from tqdm import tqdm from tqdm import tqdm
import cv2 import cv2
import insightface import insightface
import threading import threading
import roop.globals import roop.globals
from roop.analyser import get_face_single, get_face_many from roop.analyser import get_one_face, get_many_faces
FACE_SWAPPER = None FACE_SWAPPER = None
THREAD_LOCK = threading.Lock() THREAD_LOCK = threading.Lock()
def get_face_swapper(): def get_face_swapper() -> None:
global FACE_SWAPPER global FACE_SWAPPER
with THREAD_LOCK: with THREAD_LOCK:
if FACE_SWAPPER is None: if FACE_SWAPPER is None:
@@ -20,27 +21,27 @@ def get_face_swapper():
return FACE_SWAPPER return FACE_SWAPPER
def swap_face_in_frame(source_face, target_face, frame): def swap_face_in_frame(source_face: Any, target_face: Any, frame: Any) -> None:
if target_face: if target_face:
return get_face_swapper().get(frame, target_face, source_face, paste_back=True) return get_face_swapper().get(frame, target_face, source_face, paste_back=True)
return frame return frame
def process_faces(source_face, target_frame): def process_faces(source_face: Any, target_frame: Any) -> Any:
if roop.globals.all_faces: if roop.globals.many_faces:
many_faces = get_face_many(target_frame) many_faces = get_many_faces(target_frame)
if many_faces: if many_faces:
for face in many_faces: for face in many_faces:
target_frame = swap_face_in_frame(source_face, face, target_frame) target_frame = swap_face_in_frame(source_face, face, target_frame)
else: else:
face = get_face_single(target_frame) face = get_one_face(target_frame)
if face: if face:
target_frame = swap_face_in_frame(source_face, face, target_frame) target_frame = swap_face_in_frame(source_face, face, target_frame)
return target_frame return target_frame
def process_frames(source_img, frame_paths, progress=None): def process_frames(source_path: str, frame_paths: [str], progress=None) -> None:
source_face = get_face_single(cv2.imread(source_img)) source_face = get_one_face(cv2.imread(source_path))
for frame_path in frame_paths: for frame_path in frame_paths:
frame = cv2.imread(frame_path) frame = cv2.imread(frame_path)
try: try:
@@ -53,16 +54,14 @@ def process_frames(source_img, frame_paths, progress=None):
progress.update(1) progress.update(1)
def multi_process_frame(source_img, frame_paths, progress): def multi_process_frame(source_img, frame_paths, progress) -> None:
threads = [] threads = []
num_threads = roop.globals.gpu_threads frames_per_thread = len(frame_paths) // roop.globals.gpu_threads
num_frames_per_thread = len(frame_paths) // num_threads remaining_frames = len(frame_paths) % roop.globals.gpu_threads
remaining_frames = len(frame_paths) % num_threads
# create thread and launch
start_index = 0 start_index = 0
for _ in range(num_threads): # create threads by frames
end_index = start_index + num_frames_per_thread for _ in range(roop.globals.gpu_threads):
end_index = start_index + frames_per_thread
if remaining_frames > 0: if remaining_frames > 0:
end_index += 1 end_index += 1
remaining_frames -= 1 remaining_frames -= 1
@@ -71,26 +70,26 @@ def multi_process_frame(source_img, frame_paths, progress):
threads.append(thread) threads.append(thread)
thread.start() thread.start()
start_index = end_index start_index = end_index
# join threads
# threading
for thread in threads: for thread in threads:
thread.join() thread.join()
def process_img(source_img, target_path, output_file): def process_image(source_path: str, target_path: str, output_file) -> None:
frame = cv2.imread(target_path) frame = cv2.imread(target_path)
face = get_face_single(frame) target_frame = get_one_face(frame)
source_face = get_face_single(cv2.imread(source_img)) source_face = get_one_face(cv2.imread(source_path))
result = get_face_swapper().get(frame, face, source_face, paste_back=True) result = get_face_swapper().get(frame, target_frame, source_face, paste_back=True)
cv2.imwrite(output_file, result) cv2.imwrite(output_file, result)
print("\n\nImage saved as:", output_file, "\n\n")
def process_video(source_img, frame_paths): def process_video(source_path: str, frame_paths: [str], mode: str) -> None:
do_multi = roop.globals.gpu_vendor is not None and roop.globals.gpu_threads > 1
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress: total = len(frame_paths)
if do_multi: with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
multi_process_frame(source_img, frame_paths, progress) if mode == 'cpu':
else: progress.set_postfix({'mode': mode, 'cores': roop.globals.cpu_cores, 'memory': roop.globals.max_memory})
process_frames(source_img, frame_paths, progress) process_frames(source_path, frame_paths, progress)
elif mode == 'gpu':
progress.set_postfix({'mode': mode, 'threads': roop.globals.gpu_threads, 'memory': roop.globals.max_memory})
multi_process_frame(source_path, frame_paths, progress)

View File

@@ -1,315 +1,246 @@
import os
import tkinter as tk import tkinter as tk
from typing import Any, Callable, Tuple
from PIL import Image, ImageTk
import webbrowser
from tkinter import filedialog from tkinter import filedialog
from tkinter.filedialog import asksaveasfilename from typing import Callable, Any, Tuple
import threading
from roop.utils import is_img import cv2
from PIL import Image, ImageTk, ImageOps
max_preview_size = 800 import roop.globals
from roop.analyser import get_one_face
from roop.capturer import get_video_frame
from roop.swapper import process_faces
from roop.utilities import is_image, is_video
PRIMARY_COLOR = '#2d3436'
SECONDARY_COLOR = '#74b9ff'
TERTIARY_COLOR = '#f1c40f'
ACCENT_COLOR = '#2ecc71'
WINDOW_HEIGHT = 700
WINDOW_WIDTH = 600
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
RECENT_DIRECTORY_OUTPUT = None
def create_preview(parent): def init(start: Callable, destroy: Callable) -> tk.Tk:
global preview_image_frame, preview_frame_slider, test_button global ROOT, PREVIEW
preview_window = tk.Toplevel(parent) ROOT = create_root(start, destroy)
# Override close button PREVIEW = create_preview(ROOT)
preview_window.protocol("WM_DELETE_WINDOW", hide_preview)
preview_window.withdraw()
preview_window.title("Preview")
preview_window.configure(bg="red")
preview_window.resizable(width=False, height=False)
frame = tk.Frame(preview_window, background="#2d3436") return ROOT
frame.pack(fill='both', side='left', expand='True')
# Preview image
preview_image_frame = tk.Label(frame)
preview_image_frame.pack(side='top')
# Bottom frame
buttons_frame = tk.Frame(frame, background="#2d3436")
buttons_frame.pack(fill='both', side='bottom')
current_frame = tk.IntVar()
preview_frame_slider = tk.Scale(
buttons_frame,
from_=0,
to=0,
orient='horizontal',
variable=current_frame
)
preview_frame_slider.pack(fill='both', side='left', expand='True')
test_button = tk.Button(buttons_frame, text="Test", bg="#f1c40f", relief="flat", width=15, borderwidth=0, highlightthickness=0)
test_button.pack(side='right', fill='y')
return preview_window
def show_preview(): def create_root(start: Callable, destroy: Callable) -> tk.Tk:
preview.deiconify() global source_label, target_label, status_label
preview_visible.set(True)
root = tk.Tk()
root.minsize(WINDOW_WIDTH, WINDOW_HEIGHT)
root.title('roop')
root.configure(bg=PRIMARY_COLOR)
root.option_add('*Font', ('Arial', 11))
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = tk.Label(root, bg=PRIMARY_COLOR)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
target_label = tk.Label(root, bg=PRIMARY_COLOR)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = create_primary_button(root, 'Select a face', lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = create_primary_button(root, 'Select a target', lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = tk.BooleanVar(value=roop.globals.keep_fps)
keep_fps_checkbox = create_checkbox(root, 'Limit to 30 fps', keep_fps_value, lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = tk.BooleanVar(value=roop.globals.keep_frames)
keep_frames_checkbox = create_checkbox(root, 'Keep frames dir', keep_frames_value, lambda: setattr(roop.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_checkbox.place(relx=0.1, rely=0.65)
keep_audio_value = tk.BooleanVar(value=roop.globals.keep_audio)
keep_audio_checkbox = create_checkbox(root, 'Keep original audio', keep_audio_value, lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_checkbox.place(relx=0.6, rely=0.6)
many_faces_value = tk.BooleanVar(value=roop.globals.many_faces)
many_faces_checkbox = create_checkbox(root, 'Replace all faces', many_faces_value, lambda: setattr(roop.globals, 'many_faces', many_faces_value.get()))
many_faces_checkbox.place(relx=0.6, rely=0.65)
start_button = create_secondary_button(root, 'Start', lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
stop_button = create_secondary_button(root, 'Destroy', lambda: destroy())
stop_button.place(relx=0.4, rely=0.75, relwidth=0.2, relheight=0.05)
preview_button = create_secondary_button(root, 'Preview', lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.75, relwidth=0.2, relheight=0.05)
status_label = tk.Label(root, justify='center', text='Status: None', fg=ACCENT_COLOR, bg=PRIMARY_COLOR)
status_label.place(relx=0.1, rely=0.9)
return root
def hide_preview(): def create_preview(parent) -> tk.Toplevel:
global preview_label, preview_scale
preview = tk.Toplevel(parent)
preview.withdraw() preview.withdraw()
preview_visible.set(False) preview.title('Preview')
preview.configure(bg=PRIMARY_COLOR)
preview.option_add('*Font', ('Arial', 11))
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview_label = tk.Label(preview, bg=PRIMARY_COLOR)
preview_label.pack(fill='both', expand=True)
preview_scale = tk.Scale(preview, orient='horizontal', command=lambda frame_value: update_preview(int(frame_value)))
preview_scale.pack(fill='x')
return preview
def set_preview_handler(test_handler): def create_primary_button(parent: Any, text: str, command: Callable) -> tk.Button:
test_button.config(command = test_handler)
def init_slider(frames_count, change_handler):
    """Set the preview slider's upper bound, hook the change handler, reset to 0."""
    # The callback ignores the raw scale value and re-reads the slider itself.
    preview_frame_slider.configure(
        to=frames_count,
        command=lambda _value: change_handler(preview_frame_slider.get())
    )
    preview_frame_slider.set(0)
def update_preview(frame):
    """Render a raw frame (array-like) into the preview label, scaled to fit.

    The image is resized so its longer edge equals max_preview_size while
    preserving aspect ratio, then swapped into preview_image_frame.
    """
    img = Image.fromarray(frame)
    width, height = img.size
    # Scale by the longer edge; replaces the previous if/else plus the dead
    # aspect_ratio = 1 initialiser.
    scale = max_preview_size / max(width, height)
    # Image.ANTIALIAS was deprecated and removed in Pillow 10;
    # LANCZOS is the identical filter under its current name.
    img = img.resize(
        (
            int(width * scale),
            int(height * scale)
        ),
        Image.LANCZOS
    )
    photo_img = ImageTk.PhotoImage(img)
    preview_image_frame.configure(image=photo_img)
    # Keep a reference on the widget so Tk doesn't garbage-collect the image.
    preview_image_frame.image = photo_img
def select_face(select_face_handler: Callable[[str], None]):
    """Prompt for a face image, preview it, and forward the path to the handler.

    Returns None when no handler was supplied.
    """
    if not select_face_handler:
        return None
    path = filedialog.askopenfilename(title="Select a face")
    preview_face(path)
    return select_face_handler(path)
def update_slider_handler(get_video_frame, video_path):
    """Build a slider callback that previews the requested frame of video_path."""
    def handle(frame_number):
        return update_preview(get_video_frame(video_path, frame_number))
    return handle
def test_preview(create_test_preview):
    """Render a test frame at the current slider position into the preview."""
    current_frame = preview_frame_slider.get()
    update_preview(create_test_preview(current_frame))
def update_slider(get_video_frame, create_test_preview, video_path, frames_amount):
    """Wire the slider to the video frames and the test button to a preview run."""
    frame_handler = update_slider_handler(get_video_frame, video_path)
    init_slider(frames_amount, frame_handler)
    set_preview_handler(
        lambda: preview_thread(lambda: test_preview(create_test_preview))
    )
def analyze_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
    """Ask for a target file, record its path and frame count, and preview it."""
    chosen_path = filedialog.askopenfilename(title="Select a target")
    target_path.set(chosen_path)
    total_frames, first_frame = select_target_handler(chosen_path)
    frames_amount.set(total_frames)
    preview_target(first_frame)
    update_preview(first_frame)
def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
    """Run target analysis only when a handler was supplied."""
    if not select_target_handler:
        return
    analyze_target(select_target_handler, target_path, frames_amount)
def save_file(save_file_handler: Callable[[str], None], target_path: str):
    """Ask where to store the output and hand the chosen path to the handler.

    Defaults to a .png name for image targets, .mp4 for everything else.
    Returns None when no handler was supplied.
    """
    if is_img(target_path):
        filename, ext = 'output.png', '.png'
    else:
        filename, ext = 'output.mp4', '.mp4'
    if not save_file_handler:
        return None
    chosen = asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")])
    return save_file_handler(chosen)
def toggle_all_faces(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
    """Return a thunk that reports the checkbox state, or None without a handler."""
    if not toggle_all_faces_handler:
        return None
    def on_toggle():
        return toggle_all_faces_handler(variable.get())
    return on_toggle
def toggle_fps_limit(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
    """Return a thunk forwarding the FPS-limit checkbox state to its handler.

    NOTE(review): the parameter is (mis)named toggle_all_faces_handler,
    apparently copy-pasted; kept unchanged so keyword callers still work.
    """
    if not toggle_all_faces_handler:
        return None
    def on_toggle():
        return toggle_all_faces_handler(variable.get())
    return on_toggle
def toggle_keep_frames(toggle_keep_frames_handler: Callable[[int], None], variable: tk.IntVar):
    """Return a thunk forwarding the keep-frames checkbox state to its handler."""
    if not toggle_keep_frames_handler:
        return None
    def on_toggle():
        return toggle_keep_frames_handler(variable.get())
    return on_toggle
def create_button(parent, text, command):
return tk.Button( return tk.Button(
parent, parent,
text=text, text=text,
command=command, command=command,
bg="#f1c40f", bg=PRIMARY_COLOR,
relief="flat", fg=SECONDARY_COLOR,
relief='flat',
highlightthickness=4,
highlightbackground=SECONDARY_COLOR,
activebackground=SECONDARY_COLOR,
borderwidth=4
)
def create_secondary_button(parent: Any, text: str, command: Callable) -> tk.Button:
return tk.Button(
parent,
text=text,
command=command,
bg=TERTIARY_COLOR,
relief='flat',
borderwidth=0, borderwidth=0,
highlightthickness=0 highlightthickness=0
) )
def create_background_button(parent, text, command): def create_checkbox(parent: Any, text: str, variable: tk.BooleanVar, command: Callable) -> tk.Checkbutton:
button = create_button(parent, text, command)
button.configure(
bg="#2d3436",
fg="#74b9ff",
highlightthickness=4,
highlightbackground="#74b9ff",
activebackground="#74b9ff",
borderwidth=4
)
return button
def create_check(parent, text, variable, command):
return tk.Checkbutton( return tk.Checkbutton(
parent, parent,
anchor="w",
relief="groove",
activebackground="#2d3436",
activeforeground="#74b9ff",
selectcolor="black",
text=text, text=text,
fg="#dfe6e9",
borderwidth=0,
highlightthickness=0,
bg="#2d3436",
variable=variable, variable=variable,
command=command command=command,
relief='flat',
bg=PRIMARY_COLOR,
activebackground=PRIMARY_COLOR,
activeforeground=SECONDARY_COLOR,
selectcolor=PRIMARY_COLOR,
fg=SECONDARY_COLOR,
borderwidth=0,
highlightthickness=0
) )
def preview_thread(thread_function): def update_status(text: str) -> None:
threading.Thread(target=thread_function).start() status_label['text'] = text
ROOT.update()
def open_preview_window(get_video_frame, target_path): def select_source_path():
if preview_visible.get(): global RECENT_DIRECTORY_SOURCE
hide_preview() source_path = filedialog.askopenfilename(title='Select an face image', initialdir=RECENT_DIRECTORY_SOURCE)
if is_image(source_path):
roop.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(roop.globals.source_path)
image = render_image_preview(roop.globals.source_path, (200, 200))
source_label.configure(image=image)
source_label.image = image
else: else:
show_preview() roop.globals.source_path = None
if target_path: source_label.configure(image=None)
frame = get_video_frame(target_path) source_label.image = None
update_preview(frame)
def preview_face(path): def select_target_path():
img = Image.open(path) global RECENT_DIRECTORY_TARGET
img = img.resize((180, 180), Image.ANTIALIAS) target_path = filedialog.askopenfilename(title='Select an image or video target', initialdir=RECENT_DIRECTORY_TARGET)
photo_img = ImageTk.PhotoImage(img) if is_image(target_path):
face_label.configure(image=photo_img) roop.globals.target_path = target_path
face_label.image = photo_img RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
image = render_image_preview(roop.globals.target_path)
target_label.configure(image=image)
target_label.image = image
elif is_video(target_path):
roop.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
video_frame = render_video_preview(target_path, (200, 200))
target_label.configure(image=video_frame)
target_label.image = video_frame
else:
roop.globals.target_path = None
target_label.configure(image=None)
target_label.image = None
def preview_target(frame): def select_output_path(start):
img = Image.fromarray(frame) global RECENT_DIRECTORY_OUTPUT
img = img.resize((180, 180), Image.ANTIALIAS) if is_image(roop.globals.target_path):
photo_img = ImageTk.PhotoImage(img) output_path = filedialog.asksaveasfilename(title='Save image output', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
target_label.configure(image=photo_img) elif is_video(roop.globals.target_path):
target_label.image = photo_img output_path = filedialog.asksaveasfilename(title='Save video output', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
if output_path:
roop.globals.output_path = output_path
RECENT_DIRECTORY_OUTPUT = os.path.dirname(roop.globals.output_path)
start()
def update_status_label(value): def render_image_preview(image_path: str, dimensions: Tuple[int, int] = None) -> ImageTk.PhotoImage:
status_label["text"] = value image = Image.open(image_path)
window.update() if dimensions:
image = ImageOps.fit(image, dimensions, Image.LANCZOS)
return ImageTk.PhotoImage(image)
def init( def render_video_preview(video_path: str, dimensions: Tuple[int, int] = None, frame_number: int = 1) -> ImageTk.PhotoImage:
initial_values: dict, capture = cv2.VideoCapture(video_path)
select_face_handler: Callable[[str], None], if frame_number:
select_target_handler: Callable[[str], Tuple[int, Any]], capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
toggle_all_faces_handler: Callable[[int], None], has_frame, frame = capture.read()
toggle_fps_limit_handler: Callable[[int], None], if has_frame:
toggle_keep_frames_handler: Callable[[int], None], image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
save_file_handler: Callable[[str], None], if dimensions:
start: Callable[[], None], image = ImageOps.fit(image, dimensions, Image.LANCZOS)
get_video_frame: Callable[[str, int], None], return ImageTk.PhotoImage(image)
create_test_preview: Callable[[int], Any], capture.release()
): cv2.destroyAllWindows()
global window, preview, preview_visible, face_label, target_label, status_label
window = tk.Tk()
window.geometry("600x700")
window.title("roop")
window.configure(bg="#2d3436")
window.resizable(width=False, height=False)
preview_visible = tk.BooleanVar(window, False) def toggle_preview() -> None:
target_path = tk.StringVar() if PREVIEW.state() == 'normal':
frames_amount = tk.IntVar() PREVIEW.withdraw()
else:
update_preview(1)
PREVIEW.deiconify()
# Preview window
preview = create_preview(window)
# Contact information def update_preview(frame_number: int) -> None:
support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8)) if roop.globals.source_path and roop.globals.target_path and frame_number:
support_link.place(x=180,y=20,width=250,height=30) video_frame = process_faces(
support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v")) get_one_face(cv2.imread(roop.globals.source_path)),
get_video_frame(roop.globals.target_path, frame_number)
left_frame = tk.Frame(window) )
left_frame.place(x=60, y=100, width=180, height=180) image = Image.fromarray(video_frame)
face_label = tk.Label(left_frame) image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
face_label.pack(fill='both', side='top', expand=True) image = ImageTk.PhotoImage(image)
preview_label.configure(image=image)
right_frame = tk.Frame(window) preview_label.image = image
right_frame.place(x=360, y=100, width=180, height=180)
target_label = tk.Label(right_frame)
target_label.pack(fill='both', side='top', expand=True)
# Select a face button
face_button = create_background_button(window, "Select a face", lambda: [
select_face(select_face_handler)
])
face_button.place(x=60,y=320,width=180,height=80)
# Select a target button
target_button = create_background_button(window, "Select a target", lambda: [
select_target(select_target_handler, target_path, frames_amount),
update_slider(get_video_frame, create_test_preview, target_path.get(), frames_amount.get())
])
target_button.place(x=360,y=320,width=180,height=80)
# All faces checkbox
all_faces = tk.IntVar(None, initial_values['all_faces'])
all_faces_checkbox = create_check(window, "Process all faces in frame", all_faces, toggle_all_faces(toggle_all_faces_handler, all_faces))
all_faces_checkbox.place(x=60,y=500,width=240,height=31)
# FPS limit checkbox
limit_fps = tk.IntVar(None, not initial_values['keep_fps'])
fps_checkbox = create_check(window, "Limit FPS to 30", limit_fps, toggle_fps_limit(toggle_fps_limit_handler, limit_fps))
fps_checkbox.place(x=60,y=475,width=240,height=31)
# Keep frames checkbox
keep_frames = tk.IntVar(None, initial_values['keep_frames'])
frames_checkbox = create_check(window, "Keep frames dir", keep_frames, toggle_keep_frames(toggle_keep_frames_handler, keep_frames))
frames_checkbox.place(x=60,y=450,width=240,height=31)
# Start button
start_button = create_button(window, "Start", lambda: [save_file(save_file_handler, target_path.get()), preview_thread(lambda: start(update_preview))])
start_button.place(x=170,y=560,width=120,height=49)
# Preview button
preview_button = create_button(window, "Preview", lambda: open_preview_window(get_video_frame, target_path.get()))
preview_button.place(x=310,y=560,width=120,height=49)
# Status label
status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
status_label.place(x=10,y=640,width=580,height=30)
return window

114
roop/utilities.py Normal file
View File

@@ -0,0 +1,114 @@
import glob
import os
import shutil
import subprocess
from pathlib import Path
from typing import List
import cv2
from PIL import Image
import roop.globals
# Name of the intermediate (silent) video written inside the temp directory.
TEMP_FILE = 'temp.mp4'
# Per-target working directory created beside the target file.
TEMP_DIRECTORY = 'temp'
def run_ffmpeg(args: List) -> None:
    """Invoke ffmpeg with common flags plus *args*; failures are swallowed.

    Best-effort by design: any ffmpeg error (or a missing binary) is
    ignored so callers can fall back gracefully.
    """
    commands = [
        'ffmpeg',
        '-hide_banner',
        '-hwaccel', 'auto',
        '-loglevel', roop.globals.log_level,
        *args
    ]
    try:
        subprocess.check_output(commands, stderr=subprocess.STDOUT)
    except Exception:
        pass
def detect_fps(target_path: str) -> int:
    """Return the frame rate of the first video stream in *target_path*.

    Falls back to 30 when ffprobe is unavailable, fails, or prints
    something unparsable.
    """
    command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
    try:
        output = subprocess.check_output(command).decode().strip()
        # r_frame_rate is printed as a fraction such as "30000/1001".
        # Parse it directly instead of eval()-ing untrusted tool output.
        numerator, _, denominator = output.partition('/')
        if denominator:
            return int(int(numerator) / int(denominator))
        return int(numerator)
    except Exception:
        pass
    return 30
def extract_frames(target_path: str) -> None:
    """Dump every frame of the target video as numbered PNGs in its temp dir."""
    output_pattern = os.path.join(get_temp_directory_path(target_path), '%04d.png')
    run_ffmpeg(['-i', target_path, output_pattern])
def create_video(target_path: str, fps: int) -> None:
    """Re-encode the extracted PNG frames into the temp video at *fps*.

    -framerate is an input (image2 demuxer) option, so it must come before
    -i; placed after the input it does not control the sequence's rate.
    """
    temp_directory_path = get_temp_directory_path(target_path)
    run_ffmpeg(['-framerate', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-y', get_temp_file_path(target_path)])
def restore_audio(target_path: str, output_path: str) -> None:
    """Mux the original audio track of the target onto the processed video.

    If ffmpeg produced no output file, fall back to moving the silent temp
    video into place so the user still gets a result.
    """
    temp_file_path = get_temp_file_path(target_path)
    run_ffmpeg([
        '-i', temp_file_path,
        '-i', target_path,
        '-c:v', 'copy',
        '-map', '0:v:0',
        '-map', '1:a:0',
        '-y', output_path
    ])
    if not os.path.isfile(output_path):
        move_temp(target_path, output_path)
def get_temp_frames_paths(target_path: str) -> List:
    """List every extracted PNG frame for the target (unordered glob result)."""
    pattern = os.path.join(get_temp_directory_path(target_path), '*.png')
    return glob.glob(pattern)
def get_temp_directory_path(target_path: str) -> str:
    """Map a target file to its frame directory: <dir>/temp/<basename-no-ext>."""
    directory = os.path.dirname(target_path)
    filename, _ = os.path.splitext(os.path.basename(target_path))
    return os.path.join(directory, TEMP_DIRECTORY, filename)
def get_temp_file_path(target_path: str) -> str:
    """Return the path of the intermediate video inside the temp directory."""
    return os.path.join(get_temp_directory_path(target_path), TEMP_FILE)
def create_temp(target_path: str) -> None:
    """Create the temp frame directory (and any parents) if it is missing."""
    Path(get_temp_directory_path(target_path)).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
    """Move the intermediate video to *output_path* if it was produced."""
    temp_file_path = get_temp_file_path(target_path)
    if not os.path.isfile(temp_file_path):
        return
    shutil.move(temp_file_path, output_path)
def clean_temp(target_path: str) -> None:
    """Delete the target's frame directory unless --keep-frames was requested.

    Also removes the shared parent 'temp' directory once it is empty.
    """
    temp_directory_path = get_temp_directory_path(target_path)
    parent_directory_path = os.path.dirname(temp_directory_path)
    if not roop.globals.keep_frames and os.path.isdir(temp_directory_path):
        shutil.rmtree(temp_directory_path)
    # listdir is evaluated first, matching the original's behavior when the
    # parent directory itself is missing.
    if not os.listdir(parent_directory_path) and os.path.basename(parent_directory_path) == TEMP_DIRECTORY:
        os.rmdir(parent_directory_path)
def has_image_extention(image_path: str) -> bool:
    """Cheap, case-insensitive check for an image file extension.

    The dot is part of the match: without it, any path merely *ending* in
    the letters 'png'/'jpg'/'jpeg' (e.g. 'dumpng') was wrongly accepted.
    (Name typo kept for compatibility with existing callers.)
    """
    return image_path.lower().endswith(('.png', '.jpg', '.jpeg'))
def is_image(image_path: str) -> bool:
    """True when *image_path* is an existing file that PIL verifies as an image."""
    if not image_path or not os.path.isfile(image_path):
        return False
    try:
        Image.open(image_path).verify()
        return True
    except Exception:
        return False
def is_video(video_path: str) -> bool:
    """True when OpenCV can open *video_path* and read at least one frame."""
    if not video_path or not os.path.isfile(video_path):
        return False
    try:
        capture = cv2.VideoCapture(video_path)
        if not capture.isOpened():
            return False
        has_frame, _ = capture.read()
        capture.release()
        return has_frame
    except Exception:
        return False

View File

@@ -1,72 +0,0 @@
import os
import shutil
import roop.globals
# Platform path separator: backslash on Windows, forward slash elsewhere.
sep = "\\" if os.name == "nt" else "/"


def path(string):
    """Normalize forward slashes to the platform separator (Windows only)."""
    if sep != "\\":
        return string
    return string.replace("/", "\\")
def run_command(command, mode="silent"):
    """Run a shell command; return its exit status in debug mode, else stdout.

    NOTE(review): the command string reaches the shell unquoted — callers
    must not feed it untrusted input.
    """
    if mode == "debug":
        return os.system(command)
    stream = os.popen(command)
    output = stream.read()
    return output
def detect_fps(input_path):
    """Probe *input_path* with ffprobe; return (fps_int, raw_rate_string).

    Falls back to (30, 30) when ffprobe is unavailable or its output is not
    a parsable 'numerator/denominator' fraction.
    """
    input_path = path(input_path)
    output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read()
    if "/" in output:
        # Bare 'except:' also trapped KeyboardInterrupt/SystemExit; narrowed
        # to the errors fraction parsing can actually raise.
        try:
            numerator, denominator = output.split("/")[:2]
            return int(numerator) // int(denominator.strip()), output.strip()
        except (ValueError, ZeroDivisionError):
            pass
    return 30, 30
def run_ffmpeg(args):
    """Run ffmpeg through the shell with the configured log level."""
    log_level = f'-loglevel {roop.globals.log_level}'
    run_command(f'ffmpeg {log_level} {args}')
def set_fps(input_path, output_path, fps):
    """Re-encode *input_path* to *output_path* at a fixed frame rate."""
    input_path = path(input_path)
    output_path = path(output_path)
    run_ffmpeg(f'-i "{input_path}" -filter:v fps=fps={fps} "{output_path}"')
def create_video(video_name, fps, output_dir):
    """Assemble the numbered PNG frames in *output_dir* into output.mp4 at *fps*."""
    # CUDA decode acceleration only when an NVIDIA GPU was selected.
    hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
    output_dir = path(output_dir)
    frames_pattern = f'{output_dir}{sep}%04d.png'
    destination = f'{output_dir}{sep}output.mp4'
    run_ffmpeg(f'{hwaccel_option} -framerate "{fps}" -i "{frames_pattern}" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{destination}"')
def extract_frames(input_path, output_dir):
    """Split *input_path* into numbered PNG frames inside *output_dir*."""
    # CUDA decode acceleration only when an NVIDIA GPU was selected.
    hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
    input_path = path(input_path)
    output_dir = path(output_dir)
    run_ffmpeg(f' {hwaccel_option} -i "{input_path}" "{output_dir}{sep}%04d.png"')
def add_audio(output_dir, target_path, video, keep_frames, output_file):
    """Copy the original audio track onto the swapped video and clean up.

    Falls back to moving the silent output.mp4 into place when ffmpeg
    produced nothing; removes the frames directory unless *keep_frames*.
    """
    video_name = os.path.splitext(video)[0]
    default_destination = output_dir + "/swapped-" + video_name + ".mp4"
    save_to = output_file if output_file else default_destination
    save_to_ff = path(save_to)
    output_dir_ff = path(output_dir)
    run_ffmpeg(f'-i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"')
    if not os.path.isfile(save_to):
        shutil.move(output_dir + "/output.mp4", save_to)
    if not keep_frames:
        shutil.rmtree(output_dir)
def is_img(path):
    """Case-insensitive check for an image file extension.

    The dot is part of the match: previously any name merely *ending* in
    the letters 'png'/'jpg'/'jpeg'/'bmp' (e.g. 'dumpbmp') was accepted.
    """
    return path.lower().endswith((".png", ".jpg", ".jpeg", ".bmp"))
def rreplace(s, old, new, occurrence):
    """Replace up to *occurrence* right-most instances of *old* with *new*."""
    return new.join(s.rsplit(old, occurrence))