Merge branch 'main' into nsfw

# Conflicts:
#	roop/core.py
Struchkov Mark 2023-06-05 10:37:45 +03:00
commit f11be8ee7e
Signed by: upagge
GPG Key ID: D3018BE7BA428CA6
8 changed files with 607 additions and 258 deletions


@@ -25,9 +25,8 @@ jobs:
         uses: actions/setup-python@v2
         with:
           python-version: 3.9
-      - run: pip install -r requirements.txt
-      - run: pip install gdown
-      - run: gdown 14JzEMo8ScLinvBkl7QEvYvFEi7EBXNAt
+      - run: pip install -r requirements.txt gdown
+      - run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ
       - run: ./run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
       - run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex "psnr" -f null -

README.md

@@ -40,14 +40,17 @@ options:
                         replace this face
   -o OUTPUT_FILE, --output OUTPUT_FILE
                         save output to this file
-  --gpu                 use gpu
   --keep-fps            maintain original fps
   --keep-frames         keep frames directory
+  --all-faces           swap all faces in frame
   --max-memory MAX_MEMORY
                         maximum amount of RAM in GB to be used
-  --max-cores CORES_COUNT
-                        number of cores to be use for CPU mode
-  --all-faces           swap all faces in frame
+  --cpu-cores CPU_CORES
+                        number of CPU cores to use
+  --gpu-threads GPU_THREADS
+                        number of threads to be use for the GPU
+  --gpu-vendor {apple,amd,intel,nvidia}
+                        choice your GPU vendor
 ```
 Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode.
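For example, with the new options above, a CLI invocation might look like this (illustrative file names, nvidia chosen as the vendor):

```
python run.py -f face.jpg -t target.mp4 -o swapped.mp4 --gpu-vendor nvidia --gpu-threads 8 --keep-fps
```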

requirements.txt

@@ -1,3 +1,5 @@
+--extra-index-url https://download.pytorch.org/whl/cu118
+
 numpy==1.23.5
 opencv-python==4.7.0.72
 onnx==1.14.0
@@ -5,8 +7,9 @@ insightface==0.7.3
 psutil==5.9.5
 tk==0.1.0
 pillow==9.5.0
-torch==2.0.1
-onnxruntime==1.15.0; sys_platform == 'darwin'
+torch==2.0.1+cu118
+onnxruntime==1.15.0; sys_platform == 'darwin' and platform_machine != 'arm64'
+onnxruntime-silicon==1.13.1; sys_platform == 'darwin' and platform_machine == 'arm64'
 onnxruntime-gpu==1.15.0; sys_platform != 'darwin'
 tensorflow==2.13.0rc1; sys_platform == 'darwin'
 tensorflow==2.12.0; sys_platform != 'darwin'
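Since the PyTorch CUDA 11.8 index is now declared inside requirements.txt itself, a plain `pip install -r requirements.txt` should pick up the pinned `torch==2.0.1+cu118` wheel; the explicit equivalent for that one pin would be roughly:

```
pip install --extra-index-url https://download.pytorch.org/whl/cu118 torch==2.0.1+cu118
pip install -r requirements.txt
```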

roop/core.py

@@ -1,62 +1,78 @@
 #!/usr/bin/env python3
+import os
+import sys
+# single thread doubles performance of gpu-mode - needs to be set before torch import
+if any(arg.startswith('--gpu-vendor=') for arg in sys.argv):
+    os.environ['OMP_NUM_THREADS'] = '1'
 import platform
 import signal
-import sys
 import shutil
 import glob
 import argparse
-import multiprocessing as mp
-import os
-import torch
-from pathlib import Path
-import tkinter as tk
-from tkinter import filedialog
-from opennsfw2 import predict_video_frames, predict_image
-from tkinter.filedialog import asksaveasfilename
-import webbrowser
 import psutil
+import torch
+import tensorflow
+from pathlib import Path
+import multiprocessing as mp
+from opennsfw2 import predict_video_frames, predict_image
 import cv2
-import threading
-from PIL import Image, ImageTk
 import roop.globals
-from roop.swapper import process_video, process_img
+from roop.swapper import process_video, process_img, process_faces, process_frames
 from roop.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
 from roop.analyser import get_face_single
-pool = None
-args = {}
+import roop.ui as ui
+if 'ROCMExecutionProvider' in roop.globals.providers:
+    del torch
 signal.signal(signal.SIGINT, lambda signal_number, frame: quit())
 parser = argparse.ArgumentParser()
 parser.add_argument('-f', '--face', help='use this face', dest='source_img')
 parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
 parser.add_argument('-o', '--output', help='save output to this file', dest='output_file')
-parser.add_argument('--gpu', help='use gpu', dest='gpu', action='store_true', default=False)
 parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
 parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
-parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', type=int)
-parser.add_argument('--max-cores', help='number of cores to be use for CPU mode', dest='cores_count', type=int, default=max(psutil.cpu_count() - 2, 2))
 parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False)
+parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int)
+parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1))
+parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8)
+parser.add_argument('--gpu-vendor', help='choice your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
-for name, value in vars(parser.parse_args()).items():
-    args[name] = value
+args = parser.parse_known_args()[0]
-if '--all-faces' in sys.argv or '-a' in sys.argv:
+if 'all_faces' in args:
     roop.globals.all_faces = True
+if args.cpu_cores:
+    roop.globals.cpu_cores = int(args.cpu_cores)
+# cpu thread fix for mac
+if sys.platform == 'darwin':
+    roop.globals.cpu_cores = 1
+if args.gpu_threads:
+    roop.globals.gpu_threads = int(args.gpu_threads)
+# gpu thread fix for amd
+if args.gpu_vendor == 'amd':
+    roop.globals.gpu_threads = 1
+if args.gpu_vendor:
+    roop.globals.gpu_vendor = args.gpu_vendor
+else:
+    roop.globals.providers = ['CPUExecutionProvider']
 sep = "/"
 if os.name == "nt":
     sep = "\\"
 def limit_resources():
-    if args['max_memory']:
-        memory = args['max_memory'] * 1024 * 1024 * 1024
+    # prevent tensorflow memory leak
+    gpus = tensorflow.config.experimental.list_physical_devices('GPU')
+    for gpu in gpus:
+        tensorflow.config.experimental.set_memory_growth(gpu, True)
+    if args.max_memory:
+        memory = args.max_memory * 1024 * 1024 * 1024
     if str(platform.system()).lower() == 'windows':
         import ctypes
         kernel32 = ctypes.windll.kernel32
@@ -74,147 +90,107 @@ def pre_check():
     model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
     if not os.path.isfile(model_path):
         quit('File "inswapper_128.onnx" does not exist!')
-    if '--gpu' in sys.argv:
-        NVIDIA_PROVIDERS = ['CUDAExecutionProvider', 'TensorrtExecutionProvider']
-        if len(list(set(roop.globals.providers) - set(NVIDIA_PROVIDERS))) == 1:
-            CUDA_VERSION = torch.version.cuda
-            CUDNN_VERSION = torch.backends.cudnn.version()
-            if not torch.cuda.is_available() or not CUDA_VERSION:
-                quit("You are using --gpu flag but CUDA isn't available or properly installed on your system.")
-            if CUDA_VERSION > '11.8':
-                quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
-            if CUDA_VERSION < '11.4':
-                quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
-            if CUDNN_VERSION < 8220:
-                quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
-            if CUDNN_VERSION > 8910:
-                quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
-    else:
-        roop.globals.providers = ['CPUExecutionProvider']
-    if '--all-faces' in sys.argv or '-a' in sys.argv:
-        roop.globals.all_faces = True
+    if roop.globals.gpu_vendor == 'apple':
+        if 'CoreMLExecutionProvider' not in roop.globals.providers:
+            quit("You are using --gpu=apple flag but CoreML isn't available or properly installed on your system.")
+    if roop.globals.gpu_vendor == 'amd':
+        if 'ROCMExecutionProvider' not in roop.globals.providers:
+            quit("You are using --gpu=amd flag but ROCM isn't available or properly installed on your system.")
+    if roop.globals.gpu_vendor == 'nvidia':
+        CUDA_VERSION = torch.version.cuda
+        CUDNN_VERSION = torch.backends.cudnn.version()
+        if not torch.cuda.is_available():
+            quit("You are using --gpu=nvidia flag but CUDA isn't available or properly installed on your system.")
+        if CUDA_VERSION > '11.8':
+            quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
+        if CUDA_VERSION < '11.4':
+            quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
+        if CUDNN_VERSION < 8220:
+            quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
+        if CUDNN_VERSION > 8910:
+            quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
-def start_processing():
-    frame_paths = args["frame_paths"]
-    n = len(frame_paths) // (args['cores_count'])
-    # single thread
-    if args['gpu'] or n < 2:
-        process_video(args['source_img'], args["frame_paths"])
-        return
-    # multithread if total frames to cpu cores ratio is greater than 2
-    if n > 2:
-        processes = []
-        for i in range(0, len(frame_paths), n):
-            p = pool.apply_async(process_video, args=(args['source_img'], frame_paths[i:i+n],))
-            processes.append(p)
-        for p in processes:
-            p.get()
-        pool.close()
-        pool.join()
-def preview_image(image_path):
-    img = Image.open(image_path)
-    img = img.resize((180, 180), Image.ANTIALIAS)
-    photo_img = ImageTk.PhotoImage(img)
-    left_frame = tk.Frame(window)
-    left_frame.place(x=60, y=100)
-    img_label = tk.Label(left_frame, image=photo_img)
-    img_label.image = photo_img
-    img_label.pack()
+def get_video_frame(video_path, frame_number = 1):
+    cap = cv2.VideoCapture(video_path)
+    amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
+    cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1))
+    if not cap.isOpened():
+        print("Error opening video file")
+        return
+    ret, frame = cap.read()
+    if ret:
+        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+    cap.release()
 def preview_video(video_path):
     cap = cv2.VideoCapture(video_path)
     if not cap.isOpened():
         print("Error opening video file")
-        return
+        return 0
+    amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
     ret, frame = cap.read()
     if ret:
-        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-        img = Image.fromarray(frame)
-        img = img.resize((180, 180), Image.ANTIALIAS)
-        photo_img = ImageTk.PhotoImage(img)
-        right_frame = tk.Frame(window)
-        right_frame.place(x=360, y=100)
-        img_label = tk.Label(right_frame, image=photo_img)
-        img_label.image = photo_img
-        img_label.pack()
+        frame = get_video_frame(video_path)
     cap.release()
+    return (amount_of_frames, frame)
-def select_face():
-    args['source_img'] = filedialog.askopenfilename(title="Select a face")
-    preview_image(args['source_img'])
-def select_target():
-    args['target_path'] = filedialog.askopenfilename(title="Select a target")
-    threading.Thread(target=preview_video, args=(args['target_path'],)).start()
-def toggle_fps_limit():
-    args['keep_fps'] = int(limit_fps.get() != True)
-def toggle_all_faces():
-    roop.globals.all_faces = True if all_faces.get() == 1 else False
-def toggle_keep_frames():
-    args['keep_frames'] = int(keep_frames.get())
-def save_file():
-    filename, ext = 'output.mp4', '.mp4'
-    if is_img(args['target_path']):
-        filename, ext = 'output.png', '.png'
-    args['output_file'] = asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")])
 def status(string):
+    value = "Status: " + string
     if 'cli_mode' in args:
-        print("Status: " + string)
+        print(value)
     else:
-        status_label["text"] = "Status: " + string
-        window.update()
+        ui.update_status_label(value)
-def start():
-    if not args['source_img'] or not os.path.isfile(args['source_img']):
+def process_video_multi_cores(source_img, frame_paths):
+    n = len(frame_paths) // roop.globals.cpu_cores
+    if n > 2:
+        processes = []
+        for i in range(0, len(frame_paths), n):
+            p = POOL.apply_async(process_video, args=(source_img, frame_paths[i:i + n],))
+            processes.append(p)
+        for p in processes:
+            p.get()
+        POOL.close()
+        POOL.join()
+def start(preview_callback = None):
+    if not args.source_img or not os.path.isfile(args.source_img):
         print("\n[WARNING] Please select an image containing a face.")
         return
-    elif not args['target_path'] or not os.path.isfile(args['target_path']):
+    elif not args.target_path or not os.path.isfile(args.target_path):
         print("\n[WARNING] Please select a video/image to swap face in.")
         return
-    if not args['output_file']:
-        target_path = args['target_path']
-        args['output_file'] = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
-    global pool
-    pool = mp.Pool(args['cores_count'])
-    target_path = args['target_path']
-    test_face = get_face_single(cv2.imread(args['source_img']))
+    if not args.output_file:
+        target_path = args.target_path
+        args.output_file = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
+    target_path = args.target_path
+    test_face = get_face_single(cv2.imread(args.source_img))
     if not test_face:
         print("\n[WARNING] No face detected in source image. Please try with another one.\n")
         return
     if is_img(target_path):
         if predict_image(target_path) > 0.85:
             quit()
-        process_img(args['source_img'], target_path, args['output_file'])
+        process_img(args.source_img, target_path, args.output_file)
         status("swap successful!")
         return
-    # seconds, probabilities = predict_video_frames(video_path=args['target_path'], frame_interval=100)
+    # seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100)
     # if any(probability > 0.85 for probability in probabilities):
     # quit()
     video_name_full = target_path.split("/")[-1]
     video_name = os.path.splitext(video_name_full)[0]
-    output_dir = os.path.dirname(target_path) + "/" + video_name
+    output_dir = os.path.dirname(target_path) + "/" + video_name if os.path.dirname(target_path) else video_name
     Path(output_dir).mkdir(exist_ok=True)
     status("detecting video's FPS...")
     fps, exact_fps = detect_fps(target_path)
-    if not args['keep_fps'] and fps > 30:
+    if not args.keep_fps and fps > 30:
         this_path = output_dir + "/" + video_name + ".mp4"
         set_fps(target_path, this_path, 30)
         target_path, exact_fps = this_path, 30
@@ -222,71 +198,83 @@ def start():
     shutil.copy(target_path, output_dir)
     status("extracting frames...")
     extract_frames(target_path, output_dir)
-    args['frame_paths'] = tuple(sorted(
+    args.frame_paths = tuple(sorted(
         glob.glob(output_dir + "/*.png"),
         key=lambda x: int(x.split(sep)[-1].replace(".png", ""))
     ))
     status("swapping in progress...")
-    start_processing()
+    if roop.globals.gpu_vendor is None and roop.globals.cpu_cores > 1:
+        global POOL
+        POOL = mp.Pool(roop.globals.cpu_cores)
+        process_video_multi_cores(args.source_img, args.frame_paths)
+    else:
+        process_video(args.source_img, args.frame_paths)
     status("creating video...")
     create_video(video_name, exact_fps, output_dir)
     status("adding audio...")
-    add_audio(output_dir, target_path, video_name_full, args['keep_frames'], args['output_file'])
-    save_path = args['output_file'] if args['output_file'] else output_dir + "/" + video_name + ".mp4"
+    add_audio(output_dir, target_path, video_name_full, args.keep_frames, args.output_file)
+    save_path = args.output_file if args.output_file else output_dir + "/" + video_name + ".mp4"
     print("\n\nVideo saved as:", save_path, "\n\n")
     status("swap successful!")
+def select_face_handler(path: str):
+    args.source_img = path
+def select_target_handler(path: str):
+    args.target_path = path
+    return preview_video(args.target_path)
+def toggle_all_faces_handler(value: int):
+    roop.globals.all_faces = True if value == 1 else False
+def toggle_fps_limit_handler(value: int):
+    args.keep_fps = int(value != 1)
+def toggle_keep_frames_handler(value: int):
+    args.keep_frames = value
+def save_file_handler(path: str):
+    args.output_file = path
+def create_test_preview(frame_number):
+    return process_faces(
+        get_face_single(cv2.imread(args.source_img)),
+        get_video_frame(args.target_path, frame_number)
+    )
 def run():
-    global all_faces, keep_frames, limit_fps, status_label, window
+    global all_faces, keep_frames, limit_fps
     pre_check()
     limit_resources()
-    if args['source_img']:
-        args['cli_mode'] = True
+    if args.source_img:
+        args.cli_mode = True
         start()
         quit()
-    window = tk.Tk()
-    window.geometry("600x700")
-    window.title("roop")
-    window.configure(bg="#2d3436")
-    window.resizable(width=False, height=False)
-    # Contact information
-    support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8))
-    support_link.place(x=180,y=20,width=250,height=30)
-    support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v"))
-    # Select a face button
-    face_button = tk.Button(window, text="Select a face", command=select_face, bg="#2d3436", fg="#74b9ff", highlightthickness=4, relief="flat", highlightbackground="#74b9ff", activebackground="#74b9ff", borderwidth=4)
-    face_button.place(x=60,y=320,width=180,height=80)
-    # Select a target button
-    target_button = tk.Button(window, text="Select a target", command=select_target, bg="#2d3436", fg="#74b9ff", highlightthickness=4, relief="flat", highlightbackground="#74b9ff", activebackground="#74b9ff", borderwidth=4)
-    target_button.place(x=360,y=320,width=180,height=80)
-    # All faces checkbox
-    all_faces = tk.IntVar()
-    all_faces_checkbox = tk.Checkbutton(window, anchor="w", relief="groove", activebackground="#2d3436", activeforeground="#74b9ff", selectcolor="black", text="Process all faces in frame", fg="#dfe6e9", borderwidth=0, highlightthickness=0, bg="#2d3436", variable=all_faces, command=toggle_all_faces)
-    all_faces_checkbox.place(x=60,y=500,width=240,height=31)
-    # FPS limit checkbox
-    limit_fps = tk.IntVar(None, not args['keep_fps'])
-    fps_checkbox = tk.Checkbutton(window, anchor="w", relief="groove", activebackground="#2d3436", activeforeground="#74b9ff", selectcolor="black", text="Limit FPS to 30", fg="#dfe6e9", borderwidth=0, highlightthickness=0, bg="#2d3436", variable=limit_fps, command=toggle_fps_limit)
-    fps_checkbox.place(x=60,y=475,width=240,height=31)
-    # Keep frames checkbox
-    keep_frames = tk.IntVar(None, args['keep_frames'])
-    frames_checkbox = tk.Checkbutton(window, anchor="w", relief="groove", activebackground="#2d3436", activeforeground="#74b9ff", selectcolor="black", text="Keep frames dir", fg="#dfe6e9", borderwidth=0, highlightthickness=0, bg="#2d3436", variable=keep_frames, command=toggle_keep_frames)
-    frames_checkbox.place(x=60,y=450,width=240,height=31)
-    # Start button
-    start_button = tk.Button(window, text="Start", bg="#f1c40f", relief="flat", borderwidth=0, highlightthickness=0, command=lambda: [save_file(), start()])
-    start_button.place(x=240,y=560,width=120,height=49)
-    # Status label
-    status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
-    status_label.place(x=10,y=640,width=580,height=30)
+    window = ui.init(
+        {
+            'all_faces': roop.globals.all_faces,
+            'keep_fps': args.keep_fps,
+            'keep_frames': args.keep_frames
+        },
+        select_face_handler,
+        select_target_handler,
+        toggle_all_faces_handler,
+        toggle_fps_limit_handler,
+        toggle_keep_frames_handler,
+        save_file_handler,
+        start,
+        get_video_frame,
+        create_test_preview
+    )
     window.mainloop()

roop/globals.py

@@ -1,7 +1,10 @@
 import onnxruntime
-use_gpu = False
-all_faces = False
+all_faces = None
+log_level = 'error'
+cpu_cores = None
+gpu_threads = None
+gpu_vendor = None
 providers = onnxruntime.get_available_providers()
 if 'TensorrtExecutionProvider' in providers:

roop/swapper.py

@@ -1,18 +1,22 @@
 import os
 from tqdm import tqdm
 import cv2
 import insightface
+import threading
 import roop.globals
 from roop.analyser import get_face_single, get_face_many
 FACE_SWAPPER = None
+THREAD_LOCK = threading.Lock()
 def get_face_swapper():
     global FACE_SWAPPER
-    if FACE_SWAPPER is None:
-        model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
-        FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.providers)
+    with THREAD_LOCK:
+        if FACE_SWAPPER is None:
+            model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
+            FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.providers)
     return FACE_SWAPPER
@@ -22,41 +26,57 @@ def swap_face_in_frame(source_face, target_face, frame):
     return frame
-def process_faces(source_face, frame, progress, all_faces=False):
-    if all_faces:
-        many_faces = get_face_many(frame)
+def process_faces(source_face, target_frame):
+    if roop.globals.all_faces:
+        many_faces = get_face_many(target_frame)
         if many_faces:
             for face in many_faces:
-                frame = swap_face_in_frame(source_face, face, frame)
-                progress.set_postfix(status='.', refresh=True)
-        else:
-            progress.set_postfix(status='S', refresh=True)
+                target_frame = swap_face_in_frame(source_face, face, target_frame)
     else:
-        face = get_face_single(frame)
+        face = get_face_single(target_frame)
         if face:
-            frame = swap_face_in_frame(source_face, face, frame)
-            progress.set_postfix(status='.', refresh=True)
-        else:
-            progress.set_postfix(status='S', refresh=True)
-    return frame
-def process_video(source_img, frame_paths):
+            target_frame = swap_face_in_frame(source_face, face, target_frame)
+    return target_frame
+def process_frames(source_img, frame_paths, progress=None):
     source_face = get_face_single(cv2.imread(source_img))
-    progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
-    with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress:
-        for frame_path in frame_paths:
-            frame = cv2.imread(frame_path)
-            try:
-                result = process_faces(source_face, frame, progress, roop.globals.all_faces)
-                cv2.imwrite(frame_path, result)
-            except Exception:
-                progress.set_postfix(status='E', refresh=True)
-                pass
+    for frame_path in frame_paths:
+        frame = cv2.imread(frame_path)
+        try:
+            result = process_faces(source_face, frame)
+            cv2.imwrite(frame_path, result)
+        except Exception as exception:
+            print(exception)
+            pass
+        if progress:
             progress.update(1)
+def multi_process_frame(source_img, frame_paths, progress):
+    threads = []
+    num_threads = roop.globals.gpu_threads
+    num_frames_per_thread = len(frame_paths) // num_threads
+    remaining_frames = len(frame_paths) % num_threads
+    # create thread and launch
+    start_index = 0
+    for _ in range(num_threads):
+        end_index = start_index + num_frames_per_thread
+        if remaining_frames > 0:
+            end_index += 1
+            remaining_frames -= 1
+        thread_frame_paths = frame_paths[start_index:end_index]
+        thread = threading.Thread(target=process_frames, args=(source_img, thread_frame_paths, progress))
+        threads.append(thread)
+        thread.start()
+        start_index = end_index
+    # threading
+    for thread in threads:
+        thread.join()
 def process_img(source_img, target_path, output_file):
     frame = cv2.imread(target_path)
     face = get_face_single(frame)
@@ -64,3 +84,13 @@ def process_img(source_img, target_path, output_file):
     result = get_face_swapper().get(frame, face, source_face, paste_back=True)
     cv2.imwrite(output_file, result)
     print("\n\nImage saved as:", output_file, "\n\n")
+def process_video(source_img, frame_paths):
+    do_multi = roop.globals.gpu_vendor is not None and roop.globals.gpu_threads > 1
+    progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
+    with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress:
+        if do_multi:
+            multi_process_frame(source_img, frame_paths, progress)
+        else:
+            process_frames(source_img, frame_paths, progress)

roop/ui.py (new file, 315 lines)

@@ -0,0 +1,315 @@
import tkinter as tk
from typing import Any, Callable, Tuple
from PIL import Image, ImageTk
import webbrowser
from tkinter import filedialog
from tkinter.filedialog import asksaveasfilename
import threading
from roop.utils import is_img

max_preview_size = 800

def create_preview(parent):
    global preview_image_frame, preview_frame_slider, test_button
    preview_window = tk.Toplevel(parent)
    # Override close button
    preview_window.protocol("WM_DELETE_WINDOW", hide_preview)
    preview_window.withdraw()
    preview_window.title("Preview")
    preview_window.configure(bg="red")
    preview_window.resizable(width=False, height=False)
    frame = tk.Frame(preview_window, background="#2d3436")
    frame.pack(fill='both', side='left', expand='True')
    # Preview image
    preview_image_frame = tk.Label(frame)
    preview_image_frame.pack(side='top')
    # Bottom frame
    buttons_frame = tk.Frame(frame, background="#2d3436")
    buttons_frame.pack(fill='both', side='bottom')
    current_frame = tk.IntVar()
    preview_frame_slider = tk.Scale(
        buttons_frame,
        from_=0,
        to=0,
        orient='horizontal',
        variable=current_frame
    )
    preview_frame_slider.pack(fill='both', side='left', expand='True')
    test_button = tk.Button(buttons_frame, text="Test", bg="#f1c40f", relief="flat", width=15, borderwidth=0, highlightthickness=0)
    test_button.pack(side='right', fill='y')
    return preview_window

def show_preview():
    preview.deiconify()
    preview_visible.set(True)

def hide_preview():
    preview.withdraw()
    preview_visible.set(False)

def set_preview_handler(test_handler):
    test_button.config(command = test_handler)

def init_slider(frames_count, change_handler):
    preview_frame_slider.configure(to=frames_count, command=lambda value: change_handler(preview_frame_slider.get()))
    preview_frame_slider.set(0)

def update_preview(frame):
    img = Image.fromarray(frame)
    width, height = img.size
    aspect_ratio = 1
    if width > height:
        aspect_ratio = max_preview_size / width
    else:
        aspect_ratio = max_preview_size / height
    img = img.resize(
        (
            int(width * aspect_ratio),
            int(height * aspect_ratio)
        ),
        Image.ANTIALIAS
    )
    photo_img = ImageTk.PhotoImage(img)
    preview_image_frame.configure(image=photo_img)
    preview_image_frame.image = photo_img

def select_face(select_face_handler: Callable[[str], None]):
    if select_face_handler:
        path = filedialog.askopenfilename(title="Select a face")
        preview_face(path)
        return select_face_handler(path)
    return None

def update_slider_handler(get_video_frame, video_path):
    return lambda frame_number: update_preview(get_video_frame(video_path, frame_number))

def test_preview(create_test_preview):
    frame = create_test_preview(preview_frame_slider.get())
    update_preview(frame)

def update_slider(get_video_frame, create_test_preview, video_path, frames_amount):
    init_slider(frames_amount, update_slider_handler(get_video_frame, video_path))
    set_preview_handler(lambda: preview_thread(lambda: test_preview(create_test_preview)))

def analyze_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
    path = filedialog.askopenfilename(title="Select a target")
    target_path.set(path)
    amount, frame = select_target_handler(path)
    frames_amount.set(amount)
    preview_target(frame)
    update_preview(frame)

def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
    if select_target_handler:
        analyze_target(select_target_handler, target_path, frames_amount)

def save_file(save_file_handler: Callable[[str], None], target_path: str):
    filename, ext = 'output.mp4', '.mp4'
    if is_img(target_path):
        filename, ext = 'output.png', '.png'
    if save_file_handler:
        return save_file_handler(asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")]))
    return None

def toggle_all_faces(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
    if toggle_all_faces_handler:
        return lambda: toggle_all_faces_handler(variable.get())
    return None

def toggle_fps_limit(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
    if toggle_all_faces_handler:
        return lambda: toggle_all_faces_handler(variable.get())
    return None

def toggle_keep_frames(toggle_keep_frames_handler: Callable[[int], None], variable: tk.IntVar):
    if toggle_keep_frames_handler:
        return lambda: toggle_keep_frames_handler(variable.get())
    return None

def create_button(parent, text, command):
    return tk.Button(
        parent,
        text=text,
        command=command,
        bg="#f1c40f",
        relief="flat",
        borderwidth=0,
        highlightthickness=0
    )

def create_background_button(parent, text, command):
    button = create_button(parent, text, command)
    button.configure(
        bg="#2d3436",
        fg="#74b9ff",
        highlightthickness=4,
        highlightbackground="#74b9ff",
        activebackground="#74b9ff",
        borderwidth=4
    )
    return button

def create_check(parent, text, variable, command):
    return tk.Checkbutton(
        parent,
        anchor="w",
        relief="groove",
        activebackground="#2d3436",
        activeforeground="#74b9ff",
        selectcolor="black",
        text=text,
        fg="#dfe6e9",
        borderwidth=0,
        highlightthickness=0,
        bg="#2d3436",
        variable=variable,
        command=command
    )

def preview_thread(thread_function):
    threading.Thread(target=thread_function).start()

def open_preview_window(get_video_frame, target_path):
    if preview_visible.get():
        hide_preview()
    else:
        show_preview()
        if target_path:
            frame = get_video_frame(target_path)
            update_preview(frame)

def preview_face(path):
    img = Image.open(path)
    img = img.resize((180, 180), Image.ANTIALIAS)
    photo_img = ImageTk.PhotoImage(img)
    face_label.configure(image=photo_img)
    face_label.image = photo_img

def preview_target(frame):
    img = Image.fromarray(frame)
    img = img.resize((180, 180), Image.ANTIALIAS)
    photo_img = ImageTk.PhotoImage(img)
    target_label.configure(image=photo_img)
    target_label.image = photo_img

def update_status_label(value):
    status_label["text"] = value
    window.update()

def init(
    initial_values: dict,
    select_face_handler: Callable[[str], None],
    select_target_handler: Callable[[str], Tuple[int, Any]],
    toggle_all_faces_handler: Callable[[int], None],
    toggle_fps_limit_handler: Callable[[int], None],
    toggle_keep_frames_handler: Callable[[int], None],
    save_file_handler: Callable[[str], None],
    start: Callable[[], None],
    get_video_frame: Callable[[str, int], None],
    create_test_preview: Callable[[int], Any],
):
    global window, preview, preview_visible, face_label, target_label, status_label
    window = tk.Tk()
    window.geometry("600x700")
    window.title("roop")
    window.configure(bg="#2d3436")
    window.resizable(width=False, height=False)
    preview_visible = tk.BooleanVar(window, False)
    target_path = tk.StringVar()
    frames_amount = tk.IntVar()
    # Preview window
    preview = create_preview(window)
    # Contact information
    support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8))
    support_link.place(x=180,y=20,width=250,height=30)
    support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v"))
    left_frame = tk.Frame(window)
    left_frame.place(x=60, y=100, width=180, height=180)
    face_label = tk.Label(left_frame)
    face_label.pack(fill='both', side='top', expand=True)
    right_frame = tk.Frame(window)
    right_frame.place(x=360, y=100, width=180, height=180)
    target_label = tk.Label(right_frame)
    target_label.pack(fill='both', side='top', expand=True)
    # Select a face button
    face_button = create_background_button(window, "Select a face", lambda: [
        select_face(select_face_handler)
    ])
    face_button.place(x=60,y=320,width=180,height=80)
    # Select a target button
    target_button = create_background_button(window, "Select a target", lambda: [
        select_target(select_target_handler, target_path, frames_amount),
        update_slider(get_video_frame, create_test_preview, target_path.get(), frames_amount.get())
    ])
    target_button.place(x=360,y=320,width=180,height=80)
    # All faces checkbox
    all_faces = tk.IntVar(None, initial_values['all_faces'])
    all_faces_checkbox = create_check(window, "Process all faces in frame", all_faces, toggle_all_faces(toggle_all_faces_handler, all_faces))
    all_faces_checkbox.place(x=60,y=500,width=240,height=31)
    # FPS limit checkbox
    limit_fps = tk.IntVar(None, not initial_values['keep_fps'])
    fps_checkbox = create_check(window, "Limit FPS to 30", limit_fps, toggle_fps_limit(toggle_fps_limit_handler, limit_fps))
    fps_checkbox.place(x=60,y=475,width=240,height=31)
    # Keep frames checkbox
    keep_frames = tk.IntVar(None, initial_values['keep_frames'])
    frames_checkbox = create_check(window, "Keep frames dir", keep_frames, toggle_keep_frames(toggle_keep_frames_handler, keep_frames))
    frames_checkbox.place(x=60,y=450,width=240,height=31)
    # Start button
    start_button = create_button(window, "Start", lambda: [save_file(save_file_handler, target_path.get()), preview_thread(lambda: start(update_preview))])
    start_button.place(x=170,y=560,width=120,height=49)
    # Preview button
    preview_button = create_button(window, "Preview", lambda: open_preview_window(get_video_frame, target_path.get()))
    preview_button.place(x=310,y=560,width=120,height=49)
    # Status label
    status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
    status_label.place(x=10,y=640,width=580,height=30)
    return window

roop/utils.py

@@ -1,5 +1,6 @@
 import os
 import shutil
+import roop.globals
 sep = "/"
 if os.name == "nt":
@@ -29,26 +30,33 @@ def detect_fps(input_path):
     return 30, 30
+def run_ffmpeg(args):
+    log_level = f'-loglevel {roop.globals.log_level}'
+    run_command(f'ffmpeg {log_level} {args}')
 def set_fps(input_path, output_path, fps):
     input_path, output_path = path(input_path), path(output_path)
-    os.system(f'ffmpeg -i "{input_path}" -filter:v fps=fps={fps} "{output_path}" -loglevel error')
+    run_ffmpeg(f'-i "{input_path}" -filter:v fps=fps={fps} "{output_path}"')
 def create_video(video_name, fps, output_dir):
+    hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
     output_dir = path(output_dir)
-    os.system(f'ffmpeg -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4" -loglevel error')
+    run_ffmpeg(f'{hwaccel_option} -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"')
 def extract_frames(input_path, output_dir):
+    hwaccel_option = '-hwaccel cuda' if roop.globals.gpu_vendor == 'nvidia' else ''
     input_path, output_dir = path(input_path), path(output_dir)
-    os.system(f'ffmpeg -i "{input_path}" "{output_dir}{sep}%04d.png" -loglevel error')
+    run_ffmpeg(f' {hwaccel_option} -i "{input_path}" "{output_dir}{sep}%04d.png"')
 def add_audio(output_dir, target_path, video, keep_frames, output_file):
     video_name = os.path.splitext(video)[0]
     save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4"
     save_to_ff, output_dir_ff = path(save_to), path(output_dir)
-    os.system(f'ffmpeg -i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}" -loglevel error')
+    run_ffmpeg(f'-i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"')
     if not os.path.isfile(save_to):
         shutil.move(output_dir + "/output.mp4", save_to)
     if not keep_frames: