- refactored swapper.py to optimized code logic
- refactored "get_face" and "get_all_faces" to "get_face_single" and "get_face_many" respectively - moved all global booleans to top of file
This commit is contained in:
parent
b155a2cb36
commit
d17a440cd6
@ -12,7 +12,7 @@ def get_face_analyser():
|
||||
return FACE_ANALYSER
|
||||
|
||||
|
||||
def get_face(img_data):
|
||||
def get_face_single(img_data):
|
||||
face = get_face_analyser().get(img_data)
|
||||
try:
|
||||
return sorted(face, key=lambda x: x.bbox[0])[0]
|
||||
@ -20,7 +20,7 @@ def get_face(img_data):
|
||||
return None
|
||||
|
||||
|
||||
def get_all_faces(img_data):
|
||||
def get_face_many(img_data):
|
||||
try:
|
||||
return get_face_analyser().get(img_data)
|
||||
except IndexError:
|
||||
|
@ -1,8 +1,8 @@
|
||||
import onnxruntime
|
||||
|
||||
use_gpu = False
|
||||
providers = onnxruntime.get_available_providers()
|
||||
all_faces = False
|
||||
providers = onnxruntime.get_available_providers()
|
||||
|
||||
if 'TensorrtExecutionProvider' in providers:
|
||||
providers.remove('TensorrtExecutionProvider')
|
||||
|
@ -3,7 +3,7 @@ from tqdm import tqdm
|
||||
import cv2
|
||||
import insightface
|
||||
import core.globals
|
||||
from core.analyser import get_face, get_all_faces
|
||||
from core.analyser import get_face_single, get_face_many
|
||||
|
||||
FACE_SWAPPER = None
|
||||
|
||||
@ -16,34 +16,41 @@ def get_face_swapper():
|
||||
return FACE_SWAPPER
|
||||
|
||||
|
||||
def swap_face_in_frame(source_face, target_face, frame):
|
||||
if target_face:
|
||||
return get_face_swapper().get(frame, target_face, source_face, paste_back=True)
|
||||
return frame
|
||||
|
||||
|
||||
def process_faces(source_face, frame, progress, all_faces=True):
|
||||
if all_faces:
|
||||
many_faces = get_face_many(frame)
|
||||
if many_faces:
|
||||
for face in many_faces:
|
||||
frame = swap_face_in_frame(source_face, face, frame)
|
||||
progress.set_postfix(status='.', refresh=True)
|
||||
else:
|
||||
progress.set_postfix(status='S', refresh=True)
|
||||
else:
|
||||
face = get_face_single(frame)
|
||||
if face:
|
||||
frame = swap_face_in_frame(source_face, face, frame)
|
||||
progress.set_postfix(status='.', refresh=True)
|
||||
else:
|
||||
progress.set_postfix(status='S', refresh=True)
|
||||
return frame
|
||||
|
||||
|
||||
def process_video(source_img, frame_paths):
|
||||
source_face = get_face(cv2.imread(source_img))
|
||||
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]') as progress:
|
||||
source_face = get_face_single(cv2.imread(source_img))
|
||||
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
||||
|
||||
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
||||
for frame_path in frame_paths:
|
||||
frame = cv2.imread(frame_path)
|
||||
swapper = get_face_swapper()
|
||||
try:
|
||||
if core.globals.all_faces:
|
||||
all_faces = get_all_faces(frame)
|
||||
result = frame
|
||||
if len(all_faces) > 0:
|
||||
for singleFace in all_faces:
|
||||
if singleFace:
|
||||
result = swapper.get(result, singleFace, source_face, paste_back=True)
|
||||
progress.set_postfix(status='.', refresh=True)
|
||||
else:
|
||||
progress.set_postfix(status='S', refresh=True)
|
||||
else:
|
||||
progress.set_postfix(status='S', refresh=True)
|
||||
cv2.imwrite(frame_path, result)
|
||||
else:
|
||||
face = get_face(frame)
|
||||
if face:
|
||||
result = swapper.get(frame, face, source_face, paste_back=True)
|
||||
cv2.imwrite(frame_path, result)
|
||||
progress.set_postfix(status='.', refresh=True)
|
||||
else:
|
||||
progress.set_postfix(status='S', refresh=True)
|
||||
result = process_faces(source_face, frame, progress, core.globals.all_faces)
|
||||
cv2.imwrite(frame_path, result)
|
||||
except Exception:
|
||||
progress.set_postfix(status='E', refresh=True)
|
||||
pass
|
||||
@ -52,8 +59,8 @@ def process_video(source_img, frame_paths):
|
||||
|
||||
def process_img(source_img, target_path, output_file):
|
||||
frame = cv2.imread(target_path)
|
||||
face = get_face(frame)
|
||||
source_face = get_face(cv2.imread(source_img))
|
||||
face = get_face_single(frame)
|
||||
source_face = get_face_single(cv2.imread(source_img))
|
||||
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
|
||||
cv2.imwrite(output_file, result)
|
||||
print("\n\nImage saved as:", output_file, "\n\n")
|
||||
|
4
run.py
4
run.py
@ -22,7 +22,7 @@ from PIL import Image, ImageTk
|
||||
import core.globals
|
||||
from core.swapper import process_video, process_img
|
||||
from core.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
|
||||
from core.analyser import get_face
|
||||
from core.analyser import get_face_single
|
||||
|
||||
if 'ROCMExecutionProvider' in core.globals.providers:
|
||||
del torch
|
||||
@ -188,7 +188,7 @@ def start():
|
||||
global pool
|
||||
pool = mp.Pool(args['cores_count'])
|
||||
target_path = args['target_path']
|
||||
test_face = get_face(cv2.imread(args['source_img']))
|
||||
test_face = get_face_single(cv2.imread(args['source_img']))
|
||||
if not test_face:
|
||||
print("\n[WARNING] No face detected in source image. Please try with another one.\n")
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user