Merge pull request #371 from s0md3v/ui-refactoring

UI refactoring
This commit is contained in:
Henry Ruhs 2023-06-07 15:46:42 +02:00 committed by GitHub
commit 1ac99abff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 326 additions and 439 deletions

View File

@ -1,10 +1,11 @@
from typing import Any
import insightface
import roop.globals
FACE_ANALYSER = None
def get_face_analyser():
def get_face_analyser() -> Any:
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers)
@ -12,16 +13,16 @@ def get_face_analyser():
return FACE_ANALYSER
def get_face_single(img_data):
face = get_face_analyser().get(img_data)
def get_one_face(image_data) -> Any:
face = get_face_analyser().get(image_data)
try:
return sorted(face, key=lambda x: x.bbox[0])[0]
except IndexError:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_face_many(img_data):
def get_many_faces(image_data) -> Any:
try:
return get_face_analyser().get(img_data)
return get_face_analyser().get(image_data)
except IndexError:
return None

12
roop/capturer.py Normal file
View File

@ -0,0 +1,12 @@
import cv2
def get_video_frame(video_path: str, frame_number: int = 1):
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return None

View File

@ -2,10 +2,11 @@
import os
import sys
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# single thread doubles performance of gpu-mode - needs to be set before torch import
if any(arg.startswith('--gpu-vendor') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
@ -20,36 +21,44 @@ from opennsfw2 import predict_video_frames, predict_image
import cv2
import roop.globals
from roop.swapper import process_video, process_img, process_faces
from roop.utilities import has_image_extention, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frames_paths, restore_audio, create_temp, move_temp, clean_temp
from roop.analyser import get_face_single
import roop.ui as ui
from roop.swapper import process_video, process_img
from roop.utilities import has_image_extention, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frames_paths, restore_audio, create_temp, move_temp, clean_temp
from roop.analyser import get_one_face
if 'ROCMExecutionProvider' in roop.globals.providers:
del torch
warnings.simplefilter(action='ignore', category=FutureWarning)
def handle_parse():
global args
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--face', help='use this face', dest='source_path')
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
parser.add_argument('-f', '--face', help='use a face image', dest='source_path')
parser.add_argument('-t', '--target', help='replace image or video with face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_path')
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-audio', help='maintain original audio', dest='keep_audio', action='store_true', default=True)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--all-faces', help='swap all faces in frame', dest='all_faces', action='store_true', default=False)
parser.add_argument('--many-faces', help='swap every face in the frame', dest='many_faces', action='store_true', default=False)
parser.add_argument('--video-quality', help='adjust video quality of output file', dest='video_quality', type=int, default=10)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int)
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=max(psutil.cpu_count() / 2, 1))
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=8)
parser.add_argument('--gpu-vendor', help='select your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'intel', 'nvidia'])
parser.add_argument('--gpu-vendor', help='select your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'nvidia'])
args = parser.parse_known_args()[0]
roop.globals.source_path = args.source_path
roop.globals.target_path = args.target_path
roop.globals.output_path = args.output_path
roop.globals.headless = args.source_path or args.target_path or args.output_path
roop.globals.keep_fps = args.keep_fps
roop.globals.keep_audio = args.keep_audio
roop.globals.keep_frames = args.keep_frames
roop.globals.all_faces = args.all_faces
roop.globals.many_faces = args.many_faces
roop.globals.video_quality = args.video_quality
if args.cpu_cores:
roop.globals.cpu_cores = int(args.cpu_cores)
@ -76,8 +85,8 @@ def limit_resources():
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
if args.max_memory:
memory = args.max_memory * 1024 * 1024 * 1024
if roop.globals.max_memory:
memory = roop.globals.max_memory * 1024 * 1024 * 1024
if str(platform.system()).lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
@ -102,58 +111,22 @@ def pre_check():
if 'ROCMExecutionProvider' not in roop.globals.providers:
quit('You are using --gpu=amd flag but ROCM is not available or properly installed on your system.')
if roop.globals.gpu_vendor == 'nvidia':
CUDA_VERSION = torch.version.cuda
CUDNN_VERSION = torch.backends.cudnn.version()
if not torch.cuda.is_available():
quit('You are using --gpu=nvidia flag but CUDA is not available or properly installed on your system.')
if CUDA_VERSION > '11.8':
quit(f'CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8')
if CUDA_VERSION < '11.4':
quit(f'CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8')
if CUDNN_VERSION < 8220:
quit(f'CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1')
if CUDNN_VERSION > 8910:
quit(f'CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1')
def get_video_frame(video_path, frame_number = 1):
cap = cv2.VideoCapture(video_path)
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
cap.set(cv2.CAP_PROP_POS_FRAMES, min(amount_of_frames, frame_number-1))
if not cap.isOpened():
status('Error opening video file')
return
ret, frame = cap.read()
if ret:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
cap.release()
def preview_video(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
status('Error opening video file')
return 0
amount_of_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
ret, frame = cap.read()
if ret:
frame = get_video_frame(video_path)
cap.release()
return (amount_of_frames, frame)
def status(message: str):
value = 'Status: ' + message
print(value)
if not roop.globals.headless:
ui.update_status_label(value)
if torch.version.cuda > '11.8':
quit(f'CUDA version {torch.version.cuda} is not supported - please downgrade to 11.8')
if torch.version.cuda < '11.4':
quit(f'CUDA version {torch.version.cuda} is not supported - please upgrade to 11.8')
if torch.backends.cudnn.version() < 8220:
quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please upgrade to 8.9.1')
if torch.backends.cudnn.version() > 8910:
quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please downgrade to 8.9.1')
def conditional_process_video(source_path: str, frame_paths: List[str]) -> None:
pool_amount = len(frame_paths) // roop.globals.cpu_cores
if pool_amount > 2 and roop.globals.cpu_cores > 1 and roop.globals.gpu_vendor is None:
status('Pool-Swapping in progress...')
update_status('Pool-Swapping in progress...')
global POOL
POOL = multiprocessing.Pool(roop.globals.cpu_cores, maxtasksperchild=1)
pools = []
@ -162,129 +135,89 @@ def conditional_process_video(source_path: str, frame_paths: List[str]) -> None:
pools.append(pool)
for pool in pools:
pool.get()
POOL.join()
POOL.close()
POOL.join()
else:
status('Swapping in progress...')
process_video(args.source_path, frame_paths)
update_status('Swapping in progress...')
process_video(roop.globals.source_path, frame_paths)
def start(preview_callback = None) -> None:
if not args.source_path or not os.path.isfile(args.source_path):
status('Please select an image containing a face.')
def update_status(message: str):
value = 'Status: ' + message
print(value)
if not roop.globals.headless:
ui.update_status(value)
def start() -> None:
if not roop.globals.source_path or not os.path.isfile(roop.globals.source_path):
update_status('Select an image that contains a face.')
return
elif not args.target_path or not os.path.isfile(args.target_path):
status('Please select a video/image target!')
elif not roop.globals.target_path or not os.path.isfile(roop.globals.target_path):
update_status('Select an image or video target!')
return
test_face = get_face_single(cv2.imread(args.source_path))
test_face = get_one_face(cv2.imread(roop.globals.source_path))
if not test_face:
status('No face detected in source image. Please try with another one!')
update_status('No face detected in source image. Please try with another one!')
return
# process image to image
if has_image_extention(args.target_path):
if predict_image(args.target_path) > 0.85:
if has_image_extention(roop.globals.target_path):
if predict_image(roop.globals.target_path) > 0.85:
destroy()
process_img(args.source_path, args.target_path, args.output_path)
if is_image(args.target_path):
status('Swapping to image succeed!')
process_img(roop.globals.source_path, roop.globals.target_path, roop.globals.output_path)
if is_image(roop.globals.target_path):
update_status('Swapping to image succeed!')
else:
status('Swapping to image failed!')
update_status('Swapping to image failed!')
return
# process image to videos
seconds, probabilities = predict_video_frames(video_path=args.target_path, frame_interval=100)
seconds, probabilities = predict_video_frames(video_path=roop.globals.target_path, frame_interval=100)
if any(probability > 0.85 for probability in probabilities):
destroy()
status('Creating temp resources...')
create_temp(args.target_path)
status('Extracting frames...')
extract_frames(args.target_path)
frame_paths = get_temp_frames_paths(args.target_path)
conditional_process_video(args.source_path, frame_paths)
update_status('Creating temp resources...')
create_temp(roop.globals.target_path)
update_status('Extracting frames...')
extract_frames(roop.globals.target_path)
frame_paths = get_temp_frames_paths(roop.globals.target_path)
conditional_process_video(roop.globals.source_path, frame_paths)
# prevent memory leak using ffmpeg with cuda
if args.gpu_vendor == 'nvidia':
if roop.globals.gpu_vendor == 'nvidia':
torch.cuda.empty_cache()
if roop.globals.keep_fps:
status('Detecting fps...')
fps = detect_fps(args.source_path)
status(f'Creating video with {fps} fps...')
create_video(args.target_path, fps)
update_status('Detecting fps...')
fps = detect_fps(roop.globals.source_path)
update_status(f'Creating video with {fps} fps...')
create_video(roop.globals.target_path, fps)
else:
status('Creating video with 30 fps...')
create_video(args.target_path, 30)
update_status('Creating video with 30 fps...')
create_video(roop.globals.target_path, 30)
if roop.globals.keep_audio:
if roop.globals.keep_fps:
status('Restoring audio...')
update_status('Restoring audio...')
else:
status('Restoring audio might cause issues as fps are not kept...')
restore_audio(args.target_path, args.output_path)
update_status('Restoring audio might cause issues as fps are not kept...')
restore_audio(roop.globals.target_path, roop.globals.output_path)
else:
move_temp(args.target_path, args.output_path)
clean_temp(args.target_path)
if is_video(args.target_path):
status('Swapping to video succeed!')
move_temp(roop.globals.target_path, roop.globals.output_path)
clean_temp(roop.globals.target_path)
if is_video(roop.globals.target_path):
update_status('Swapping to video succeed!')
else:
status('Swapping to video failed!')
def select_face_handler(path: str):
args.source_path = path
def select_target_handler(path: str):
args.target_path = path
return preview_video(args.target_path)
def toggle_all_faces_handler(value: int):
roop.globals.all_faces = True if value == 1 else False
def toggle_fps_limit_handler(value: int):
args.keep_fps = int(value != 1)
def toggle_keep_frames_handler(value: int):
args.keep_frames = value
def save_file_handler(path: str):
args.output_path = path
def create_test_preview(frame_number):
return process_faces(
get_face_single(cv2.imread(args.source_path)),
get_video_frame(args.target_path, frame_number)
)
update_status('Swapping to video failed!')
def destroy() -> None:
clean_temp(args.target_path)
if roop.globals.target_path:
clean_temp(roop.globals.target_path)
quit()
def run() -> None:
global all_faces, keep_frames, limit_fps
handle_parse()
parse_args()
pre_check()
limit_resources()
if roop.globals.headless:
start()
else:
window = ui.init(
{
'all_faces': args.all_faces,
'keep_fps': args.keep_fps,
'keep_frames': args.keep_frames
},
select_face_handler,
select_target_handler,
toggle_all_faces_handler,
toggle_fps_limit_handler,
toggle_keep_frames_handler,
save_file_handler,
start,
get_video_frame,
create_test_preview
)
window = ui.init(start, destroy)
window.mainloop()

View File

@ -1,12 +1,17 @@
import onnxruntime
source_path = None
target_path = None
output_path = None
keep_fps = None
keep_audio = None
keep_frames = None
all_faces = None
many_faces = None
video_quality = None
cpu_cores = None
gpu_threads = None
gpu_vendor = None
max_memory = None
headless = None
log_level = 'error'
providers = onnxruntime.get_available_providers()

View File

@ -5,7 +5,7 @@ import cv2
import insightface
import threading
import roop.globals
from roop.analyser import get_face_single, get_face_many
from roop.analyser import get_one_face, get_many_faces
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
@ -27,20 +27,20 @@ def swap_face_in_frame(source_face, target_face, frame):
def process_faces(source_face, target_frame):
if roop.globals.all_faces:
many_faces = get_face_many(target_frame)
if roop.globals.many_faces:
many_faces = get_many_faces(target_frame)
if many_faces:
for face in many_faces:
target_frame = swap_face_in_frame(source_face, face, target_frame)
else:
face = get_face_single(target_frame)
face = get_one_face(target_frame)
if face:
target_frame = swap_face_in_frame(source_face, face, target_frame)
return target_frame
def process_frames(source_img, frame_paths, progress=None):
source_face = get_face_single(cv2.imread(source_img))
source_face = get_one_face(cv2.imread(source_img))
for frame_path in frame_paths:
frame = cv2.imread(frame_path)
try:
@ -77,9 +77,9 @@ def multi_process_frame(source_img, frame_paths, progress):
def process_img(source_img, target_path, output_file):
frame = cv2.imread(target_path)
face = get_face_single(frame)
source_face = get_face_single(cv2.imread(source_img))
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
target_frame = get_one_face(frame)
source_face = get_one_face(cv2.imread(source_img))
result = get_face_swapper().get(frame, target_frame, source_face, paste_back=True)
cv2.imwrite(output_file, result)

View File

@ -1,303 +1,232 @@
import os
import tkinter as tk
from typing import Any, Callable, Tuple
from PIL import Image, ImageTk, ImageOps
import webbrowser
from tkinter import filedialog
from tkinter.filedialog import asksaveasfilename
import threading
from typing import Callable, Any, Tuple
from roop.utilities import is_image
import cv2
from PIL import Image, ImageTk, ImageOps
import roop.globals
from roop.analyser import get_one_face
from roop.capturer import get_video_frame
from roop.swapper import process_faces
from roop.utilities import is_image, is_video
max_preview_size = 800
PRIMARY_COLOR = '#2d3436'
SECONDARY_COLOR = '#74b9ff'
TERTIARY_COLOR = '#f1c40f'
ACCENT_COLOR = '#2ecc71'
WINDOW_HEIGHT = 700
WINDOW_WIDTH = 600
PREVIEW_HEIGHT = 700
PREVIEW_WIDTH = 1200
def create_preview(parent):
global preview_image_frame, preview_frame_slider, test_button
def init(start: Callable, destroy: Callable) -> tk.Tk:
global ROOT, PREVIEW
preview_window = tk.Toplevel(parent)
# Override close button
preview_window.protocol("WM_DELETE_WINDOW", hide_preview)
preview_window.withdraw()
preview_window.title("Preview")
preview_window.configure(bg="red")
preview_window.resizable(width=False, height=False)
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
frame = tk.Frame(preview_window, background="#2d3436")
frame.pack(fill='both', side='left', expand='True')
# Preview image
preview_image_frame = tk.Label(frame)
preview_image_frame.pack(side='top')
# Bottom frame
buttons_frame = tk.Frame(frame, background="#2d3436")
buttons_frame.pack(fill='both', side='bottom')
current_frame = tk.IntVar()
preview_frame_slider = tk.Scale(
buttons_frame,
from_=0,
to=0,
orient='horizontal',
variable=current_frame
)
preview_frame_slider.pack(fill='both', side='left', expand='True')
test_button = tk.Button(buttons_frame, text="Test", bg="#f1c40f", relief="flat", width=15, borderwidth=0, highlightthickness=0)
test_button.pack(side='right', fill='y')
return preview_window
return ROOT
def show_preview():
preview.deiconify()
preview_visible.set(True)
def create_root(start: Callable, destroy: Callable) -> tk.Tk:
global source_label, target_label, status_label
root = tk.Tk()
root.minsize(WINDOW_WIDTH, WINDOW_HEIGHT)
root.title('roop')
root.configure(bg=PRIMARY_COLOR)
root.option_add('*Font', ('Arial', 11))
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = tk.Label(root, bg=PRIMARY_COLOR)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
target_label = tk.Label(root, bg=PRIMARY_COLOR)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = create_primary_button(root, 'Select a face', lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = create_primary_button(root, 'Select a target', lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = tk.BooleanVar(value=roop.globals.keep_fps)
keep_fps_checkbox = create_checkbox(root, 'Limit to 30 fps', keep_fps_value, lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = tk.BooleanVar(value=roop.globals.keep_frames)
keep_frames_checkbox = create_checkbox(root, 'Keep frames dir', keep_frames_value, lambda: setattr(roop.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_checkbox.place(relx=0.1, rely=0.65)
keep_audio_value = tk.BooleanVar(value=roop.globals.keep_audio)
keep_audio_checkbox = create_checkbox(root, 'Keep original audio', keep_frames_value, lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_checkbox.place(relx=0.6, rely=0.6)
many_faces_value = tk.BooleanVar(value=roop.globals.many_faces)
many_faces_checkbox = create_checkbox(root, 'Replace all faces', many_faces_value, lambda: setattr(roop.globals, 'many_faces', keep_audio_value.get()))
many_faces_checkbox.place(relx=0.6, rely=0.65)
start_button = create_secondary_button(root, 'Start', lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
stop_button = create_secondary_button(root, 'Destroy', lambda: destroy())
stop_button.place(relx=0.4, rely=0.75, relwidth=0.2, relheight=0.05)
preview_button = create_secondary_button(root, 'Preview', lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.75, relwidth=0.2, relheight=0.05)
status_label = tk.Label(root, justify='center', text='Status: None', fg=ACCENT_COLOR, bg=PRIMARY_COLOR)
status_label.place(relx=0.1, rely=0.9)
return root
def hide_preview():
def create_preview(parent) -> tk.Toplevel:
global preview_label, preview_scale
preview = tk.Toplevel(parent)
preview.withdraw()
preview_visible.set(False)
preview.title('Preview')
preview.configure(bg=PRIMARY_COLOR)
preview.option_add('*Font', ('Arial', 11))
preview.minsize(PREVIEW_WIDTH, PREVIEW_HEIGHT)
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview_label = tk.Label(preview, bg=PRIMARY_COLOR)
preview_label.pack(fill='both', expand=True)
preview_scale = tk.Scale(preview, orient='horizontal', command=lambda frame_value: update_preview(int(frame_value)))
preview_scale.pack(fill='x')
return preview
def set_preview_handler(test_handler):
test_button.config(command = test_handler)
def init_slider(frames_count, change_handler):
preview_frame_slider.configure(to=frames_count, command=lambda value: change_handler(preview_frame_slider.get()))
preview_frame_slider.set(0)
def update_preview(frame):
img = Image.fromarray(frame)
img = ImageOps.contain(img, (max_preview_size, max_preview_size), Image.LANCZOS)
photo_img = ImageTk.PhotoImage(img)
preview_image_frame.configure(image=photo_img)
preview_image_frame.image = photo_img
def select_face(select_face_handler: Callable[[str], None]):
if select_face_handler:
path = filedialog.askopenfilename(title="Select a face")
preview_face(path)
return select_face_handler(path)
return None
def update_slider_handler(get_video_frame, video_path):
return lambda frame_number: update_preview(get_video_frame(video_path, frame_number))
def test_preview(create_test_preview):
frame = create_test_preview(preview_frame_slider.get())
update_preview(frame)
def update_slider(get_video_frame, create_test_preview, video_path, frames_amount):
init_slider(frames_amount, update_slider_handler(get_video_frame, video_path))
set_preview_handler(lambda: preview_thread(lambda: test_preview(create_test_preview)))
def analyze_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
path = filedialog.askopenfilename(title="Select a target")
target_path.set(path)
amount, frame = select_target_handler(path)
frames_amount.set(amount)
preview_target(frame)
update_preview(frame)
def select_target(select_target_handler: Callable[[str], Tuple[int, Any]], target_path: tk.StringVar, frames_amount: tk.IntVar):
if select_target_handler:
analyze_target(select_target_handler, target_path, frames_amount)
def save_file(save_file_handler: Callable[[str], None], target_path: str):
filename, ext = 'output.mp4', '.mp4'
if is_image(target_path):
filename, ext = 'output.png', '.png'
if save_file_handler:
return save_file_handler(asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")]))
return None
def toggle_all_faces(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_all_faces_handler:
return lambda: toggle_all_faces_handler(variable.get())
return None
def toggle_fps_limit(toggle_all_faces_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_all_faces_handler:
return lambda: toggle_all_faces_handler(variable.get())
return None
def toggle_keep_frames(toggle_keep_frames_handler: Callable[[int], None], variable: tk.IntVar):
if toggle_keep_frames_handler:
return lambda: toggle_keep_frames_handler(variable.get())
return None
def create_button(parent, text, command):
def create_primary_button(parent: Any, text: str, command: Callable) -> tk.Button:
return tk.Button(
parent,
text=text,
command=command,
bg="#f1c40f",
relief="flat",
bg=PRIMARY_COLOR,
fg=SECONDARY_COLOR,
relief='flat',
highlightthickness=4,
highlightbackground=SECONDARY_COLOR,
activebackground=SECONDARY_COLOR,
borderwidth=4
)
def create_secondary_button(parent: Any, text: str, command: Callable) -> tk.Button:
return tk.Button(
parent,
text=text,
command=command,
bg=TERTIARY_COLOR,
relief='flat',
borderwidth=0,
highlightthickness=0
)
def create_background_button(parent, text, command):
button = create_button(parent, text, command)
button.configure(
bg="#2d3436",
fg="#74b9ff",
highlightthickness=4,
highlightbackground="#74b9ff",
activebackground="#74b9ff",
borderwidth=4
)
return button
def create_check(parent, text, variable, command):
def create_checkbox(parent: Any, text: str, variable: tk.BooleanVar, command: Callable) -> tk.Checkbutton:
return tk.Checkbutton(
parent,
anchor="w",
relief="groove",
activebackground="#2d3436",
activeforeground="#74b9ff",
selectcolor="black",
text=text,
fg="#dfe6e9",
borderwidth=0,
highlightthickness=0,
bg="#2d3436",
variable=variable,
command=command
command=command,
relief='flat',
bg=PRIMARY_COLOR,
activebackground=PRIMARY_COLOR,
activeforeground=SECONDARY_COLOR,
selectcolor=PRIMARY_COLOR,
fg=SECONDARY_COLOR,
borderwidth=0,
highlightthickness=0
)
def preview_thread(thread_function):
threading.Thread(target=thread_function).start()
def update_status(text: str) -> None:
status_label['text'] = text
ROOT.update()
def open_preview_window(get_video_frame, target_path):
if preview_visible.get():
hide_preview()
def select_source_path():
source_path = filedialog.askopenfilename(title='Select an face image')
if is_image(source_path):
roop.globals.source_path = source_path
image = render_image_preview(roop.globals.source_path, (200, 200))
source_label.configure(image=image)
source_label.image = image
else:
show_preview()
if target_path:
frame = get_video_frame(target_path)
update_preview(frame)
roop.globals.source_path = None
source_label.configure(image=None)
source_label.image = None
def preview_face(path):
img = Image.open(path)
img = ImageOps.fit(img, (180, 180), Image.LANCZOS)
photo_img = ImageTk.PhotoImage(img)
face_label.configure(image=photo_img)
face_label.image = photo_img
def select_target_path():
target_path = filedialog.askopenfilename(title='Select an image or video target')
if is_image(target_path):
roop.globals.target_path = target_path
image = render_image_preview(roop.globals.target_path)
target_label.configure(image=image)
target_label.image = image
elif is_video(target_path):
roop.globals.target_path = target_path
video_frame = render_video_preview(target_path, (200, 200))
target_label.configure(image=video_frame)
target_label.image = video_frame
else:
roop.globals.target_path = None
target_label.configure(image=None)
target_label.image = None
def preview_target(frame):
img = Image.fromarray(frame)
img = ImageOps.fit(img, (180, 180), Image.LANCZOS)
photo_img = ImageTk.PhotoImage(img)
target_label.configure(image=photo_img)
target_label.image = photo_img
def select_output_path(start):
output_path = filedialog.asksaveasfilename(title='Save to output file', initialfile='output.mp4')
if output_path:
roop.globals.output_path = output_path
start()
def update_status_label(value):
status_label["text"] = value
window.update()
def render_image_preview(image_path: str, dimensions: Tuple[int, int] = None) -> ImageTk.PhotoImage:
image = Image.open(image_path)
if dimensions:
image = ImageOps.fit(image, dimensions, Image.LANCZOS)
return ImageTk.PhotoImage(image)
def init(
initial_values: dict,
select_face_handler: Callable[[str], None],
select_target_handler: Callable[[str], Tuple[int, Any]],
toggle_all_faces_handler: Callable[[int], None],
toggle_fps_limit_handler: Callable[[int], None],
toggle_keep_frames_handler: Callable[[int], None],
save_file_handler: Callable[[str], None],
start: Callable[[], None],
get_video_frame: Callable[[str, int], None],
create_test_preview: Callable[[int], Any],
):
global window, preview, preview_visible, face_label, target_label, status_label
def render_video_preview(video_path: str, dimensions: Tuple[int, int] = None, frame_number: int = 1) -> ImageTk.PhotoImage:
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if dimensions:
image = ImageOps.fit(image, dimensions, Image.LANCZOS)
return ImageTk.PhotoImage(image)
capture.release()
cv2.destroyAllWindows()
window = tk.Tk()
window.geometry("600x700")
window.title("roop")
window.configure(bg="#2d3436")
window.resizable(width=False, height=False)
preview_visible = tk.BooleanVar(window, False)
target_path = tk.StringVar()
frames_amount = tk.IntVar()
def toggle_preview() -> None:
if PREVIEW.state() == 'normal':
PREVIEW.withdraw()
else:
update_preview(1)
PREVIEW.deiconify()
# Preview window
preview = create_preview(window)
# Contact information
support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8))
support_link.place(x=180,y=20,width=250,height=30)
support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v"))
left_frame = tk.Frame(window)
left_frame.place(x=60, y=100, width=180, height=180)
face_label = tk.Label(left_frame)
face_label.pack(fill='both', side='top', expand=True)
right_frame = tk.Frame(window)
right_frame.place(x=360, y=100, width=180, height=180)
target_label = tk.Label(right_frame)
target_label.pack(fill='both', side='top', expand=True)
# Select a face button
face_button = create_background_button(window, "Select a face", lambda: [
select_face(select_face_handler)
])
face_button.place(x=60,y=320,width=180,height=80)
# Select a target button
target_button = create_background_button(window, "Select a target", lambda: [
select_target(select_target_handler, target_path, frames_amount),
update_slider(get_video_frame, create_test_preview, target_path.get(), frames_amount.get())
])
target_button.place(x=360,y=320,width=180,height=80)
# All faces checkbox
all_faces = tk.IntVar(None, initial_values['all_faces'])
all_faces_checkbox = create_check(window, "Process all faces in frame", all_faces, toggle_all_faces(toggle_all_faces_handler, all_faces))
all_faces_checkbox.place(x=60,y=500,width=240,height=31)
# FPS limit checkbox
limit_fps = tk.IntVar(None, not initial_values['keep_fps'])
fps_checkbox = create_check(window, "Limit FPS to 30", limit_fps, toggle_fps_limit(toggle_fps_limit_handler, limit_fps))
fps_checkbox.place(x=60,y=475,width=240,height=31)
# Keep frames checkbox
keep_frames = tk.IntVar(None, initial_values['keep_frames'])
frames_checkbox = create_check(window, "Keep frames dir", keep_frames, toggle_keep_frames(toggle_keep_frames_handler, keep_frames))
frames_checkbox.place(x=60,y=450,width=240,height=31)
# Start button
start_button = create_button(window, "Start", lambda: [save_file(save_file_handler, target_path.get()), preview_thread(lambda: start(update_preview))])
start_button.place(x=170,y=560,width=120,height=49)
# Preview button
preview_button = create_button(window, "Preview", lambda: open_preview_window(get_video_frame, target_path.get()))
preview_button.place(x=310,y=560,width=120,height=49)
# Status label
status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
status_label.place(x=10,y=640,width=580,height=30)
return window
def update_preview(frame_number: int) -> None:
if roop.globals.source_path and roop.globals.target_path and frame_number:
video_frame = process_faces(
get_one_face(cv2.imread(roop.globals.source_path)),
get_video_frame(roop.globals.target_path, frame_number)
)
img = Image.fromarray(video_frame)
img = ImageOps.contain(img, (PREVIEW_WIDTH, PREVIEW_HEIGHT), Image.LANCZOS)
img = ImageTk.PhotoImage(img)
preview_label.configure(image=img)
preview_label.image = img

View File

@ -5,6 +5,8 @@ import subprocess
from pathlib import Path
from typing import List, Any
import cv2
import roop.globals
from PIL import Image
@ -33,7 +35,7 @@ def extract_frames(target_path: str) -> None:
def create_video(target_path: str, fps: int) -> None:
run_ffmpeg(['-i', get_temp_directory_path(target_path) + os.sep + '%04d.png', '-framerate', str(fps), '-c:v', 'libx264', '-crf', '7', '-pix_fmt', 'yuv420p', '-y', get_temp_file_path(target_path)])
run_ffmpeg(['-i', get_temp_directory_path(target_path) + os.sep + '%04d.png', '-framerate', str(fps), '-c:v', 'libx264', '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-y', get_temp_file_path(target_path)])
def restore_audio(target_path: str, output_path: str) -> None:
@ -65,18 +67,19 @@ def move_temp(target_path: str, output_path: str) -> None:
def clean_temp(target_path: str) -> None:
if not roop.globals.keep_frames:
shutil.rmtree(get_temp_directory_path(target_path))
temp_directory_path = get_temp_directory_path(target_path)
if not roop.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
def has_image_extention(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg', 'bmp'))
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
def is_image(path: str) -> bool:
if os.path.isfile(path):
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
try:
image = Image.open(path)
image = Image.open(image_path)
image.verify()
return True
except Exception:
@ -84,10 +87,14 @@ def is_image(path: str) -> bool:
return False
def is_video(path: str) -> bool:
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
try:
run_ffmpeg(['-v', 'error', '-i', path, '-f', 'null', '-'])
return True
except subprocess.CalledProcessError:
capture = cv2.VideoCapture(video_path)
if capture.isOpened():
is_video, _ = capture.read()
capture.release()
return is_video
except Exception:
pass
return False