Add typing to swapper

This commit is contained in:
henryruhs 2023-06-09 12:39:50 +02:00
parent 27a76eea60
commit 1a7db694cf

View File

@ -1,4 +1,6 @@
import os import os
from typing import Any
from tqdm import tqdm from tqdm import tqdm
import cv2 import cv2
import insightface import insightface
@ -10,7 +12,7 @@ FACE_SWAPPER = None
THREAD_LOCK = threading.Lock() THREAD_LOCK = threading.Lock()
def get_face_swapper(): def get_face_swapper() -> None:
global FACE_SWAPPER global FACE_SWAPPER
with THREAD_LOCK: with THREAD_LOCK:
if FACE_SWAPPER is None: if FACE_SWAPPER is None:
@ -19,13 +21,13 @@ def get_face_swapper():
return FACE_SWAPPER return FACE_SWAPPER
def swap_face_in_frame(source_face, target_face, frame): def swap_face_in_frame(source_face: Any, target_face: Any, frame: Any) -> None:
if target_face: if target_face:
return get_face_swapper().get(frame, target_face, source_face, paste_back=True) return get_face_swapper().get(frame, target_face, source_face, paste_back=True)
return frame return frame
def process_faces(source_face, target_frame): def process_faces(source_face: Any, target_frame: Any) -> Any:
if roop.globals.many_faces: if roop.globals.many_faces:
many_faces = get_many_faces(target_frame) many_faces = get_many_faces(target_frame)
if many_faces: if many_faces:
@ -38,8 +40,8 @@ def process_faces(source_face, target_frame):
return target_frame return target_frame
def process_frames(source_img, frame_paths, progress=None): def process_frames(source_path: str, frame_paths: [str], progress=None) -> None:
source_face = get_one_face(cv2.imread(source_img)) source_face = get_one_face(cv2.imread(source_path))
for frame_path in frame_paths: for frame_path in frame_paths:
frame = cv2.imread(frame_path) frame = cv2.imread(frame_path)
try: try:
@ -52,15 +54,14 @@ def process_frames(source_img, frame_paths, progress=None):
progress.update(1) progress.update(1)
def multi_process_frame(source_img, frame_paths, progress): def multi_process_frame(source_img, frame_paths, progress) -> None:
threads = [] threads = []
num_threads = roop.globals.gpu_threads frames_per_thread = len(frame_paths) // roop.globals.gpu_threads
num_frames_per_thread = len(frame_paths) // num_threads remaining_frames = len(frame_paths) % roop.globals.gpu_threads
remaining_frames = len(frame_paths) % num_threads
start_index = 0 start_index = 0
# create threads by frames # create threads by frames
for _ in range(num_threads): for _ in range(roop.globals.gpu_threads):
end_index = start_index + num_frames_per_thread end_index = start_index + frames_per_thread
if remaining_frames > 0: if remaining_frames > 0:
end_index += 1 end_index += 1
remaining_frames -= 1 remaining_frames -= 1
@ -74,15 +75,15 @@ def multi_process_frame(source_img, frame_paths, progress):
thread.join() thread.join()
def process_image(source_img, target_path, output_file): def process_image(source_path: str, target_path: str, output_file) -> None:
frame = cv2.imread(target_path) frame = cv2.imread(target_path)
target_frame = get_one_face(frame) target_frame = get_one_face(frame)
source_face = get_one_face(cv2.imread(source_img)) source_face = get_one_face(cv2.imread(source_path))
result = get_face_swapper().get(frame, target_frame, source_face, paste_back=True) result = get_face_swapper().get(frame, target_frame, source_face, paste_back=True)
cv2.imwrite(output_file, result) cv2.imwrite(output_file, result)
def process_video(source_path, frame_paths, mode: str): def process_video(source_path: str, frame_paths: [str], mode: str) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths) total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: