Compare commits

..

No commits in common. "nsfw" and "main" have entirely different histories.
nsfw ... main

20 changed files with 416 additions and 797 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 537 KiB

Binary file not shown.

Binary file not shown.

View File

@ -14,19 +14,3 @@ jobs:
python-version: 3.9
- run: pip install flake8
- run: flake8 run.py core
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up ffmpeg
uses: FedericoCarboni/setup-ffmpeg@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- run: pip install -r requirements.txt gdown
- run: gdown 13QpWFWJ37EB-nHrEOY64CEtQWY-tz7DZ
- run: python run.py -f=.github/examples/face.jpg -t=.github/examples/target.mp4 -o=.github/examples/output.mp4
- run: ffmpeg -i .github/examples/snapshot.mp4 -i .github/examples/output.mp4 -filter_complex psnr -f null -

3
.gitignore vendored
View File

@ -1,3 +0,0 @@
.idea
temp
__pycache__

View File

@ -34,28 +34,19 @@ Additional command line arguments are given below:
```
options:
-h, --help show this help message and exit
-f SOURCE_PATH, --face SOURCE_PATH
use a face image
-f SOURCE_IMG, --face SOURCE_IMG
use this face
-t TARGET_PATH, --target TARGET_PATH
replace image or video with face
-o OUTPUT_PATH, --output OUTPUT_PATH
replace this face
-o OUTPUT_FILE, --output OUTPUT_FILE
save output to this file
--gpu use gpu
--keep-fps maintain original fps
--keep-audio maintain original audio
--keep-frames keep frames directory
--many-faces swap every face in the frame
--video-encoder VIDEO_ENCODER
adjust output video encoder
--video-quality VIDEO_QUALITY
adjust output video quality
--max-memory MAX_MEMORY
maximum amount of RAM in GB to be used
--cpu-cores CPU_CORES
number of CPU cores to use
--gpu-threads GPU_THREADS
number of threads to be use for the GPU
--gpu-vendor {apple,amd,nvidia}
select your GPU vendor
--max-cores CORES_COUNT
number of cores to be use for CPU mode
```
Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode.
@ -66,7 +57,6 @@ Looking for a CLI mode? Using the -f/--face argument will make the program in cl
- [ ] Support for replacing multiple faces
## Credits
- [henryruhs](https://github.com/henryruhs): for being an irreplacable contributor to the project
- [ffmpeg](https://ffmpeg.org/): for making video related operations easy
- [deepinsight](https://github.com/deepinsight): for their [insightface](https://github.com/deepinsight/insightface) project which provided a well-made library and models.
- and all developers behind libraries used in this project.

20
core/analyser.py Normal file
View File

@ -0,0 +1,20 @@
import insightface
import core.globals
FACE_ANALYSER = None
def get_face_analyser():
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=core.globals.providers)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_face(img_data):
face = get_face_analyser().get(img_data)
try:
return sorted(face, key=lambda x: x.bbox[0])[0]
except IndexError:
return None

7
core/globals.py Normal file
View File

@ -0,0 +1,7 @@
import onnxruntime
use_gpu = False
providers = onnxruntime.get_available_providers()
if 'TensorrtExecutionProvider' in providers:
providers.remove('TensorrtExecutionProvider')

44
core/swapper.py Normal file
View File

@ -0,0 +1,44 @@
import os
from tqdm import tqdm
import cv2
import insightface
import core.globals
from core.analyser import get_face
FACE_SWAPPER = None
def get_face_swapper():
global FACE_SWAPPER
if FACE_SWAPPER is None:
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=core.globals.providers)
return FACE_SWAPPER
def process_video(source_img, frame_paths):
source_face = get_face(cv2.imread(source_img))
with tqdm(total=len(frame_paths), desc="Processing", unit="frame", dynamic_ncols=True, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]') as progress:
for frame_path in frame_paths:
frame = cv2.imread(frame_path)
try:
face = get_face(frame)
if face:
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
cv2.imwrite(frame_path, result)
progress.set_postfix(status='.', refresh=True)
else:
progress.set_postfix(status='S', refresh=True)
except Exception:
progress.set_postfix(status='E', refresh=True)
pass
progress.update(1)
def process_img(source_img, target_path, output_file):
frame = cv2.imread(target_path)
face = get_face(frame)
source_face = get_face(cv2.imread(source_img))
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
cv2.imwrite(output_file, result)
print("\n\nImage saved as:", output_file, "\n\n")

64
core/utils.py Normal file
View File

@ -0,0 +1,64 @@
import os
import shutil
sep = "/"
if os.name == "nt":
sep = "\\"
def path(string):
if sep == "\\":
return string.replace("/", "\\")
return string
def run_command(command, mode="silent"):
if mode == "debug":
return os.system(command)
return os.popen(command).read()
def detect_fps(input_path):
input_path = path(input_path)
output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read()
if "/" in output:
try:
return int(output.split(os.sep)[0]) // int(output.split(os.sep)[1]), output.removesuffix('\n')
except:
pass
return 30, 30
def set_fps(input_path, output_path, fps):
input_path, output_path = path(input_path), path(output_path)
os.system(f'ffmpeg -i "{input_path}" -filter:v fps=fps={fps} "{output_path}"')
def create_video(video_name, fps, output_dir):
output_dir = path(output_dir)
os.system(f'ffmpeg -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"')
def extract_frames(input_path, output_dir):
input_path, output_dir = path(input_path), path(output_dir)
os.system(f'ffmpeg -i "{input_path}" "{output_dir}{sep}%04d.png"')
def add_audio(output_dir, target_path, video, keep_frames, output_file):
video_name = os.path.splitext(video)[0]
save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4"
save_to_ff, output_dir_ff = path(save_to), path(output_dir)
os.system(f'ffmpeg -i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"')
if not os.path.isfile(save_to):
shutil.move(output_dir + "/output.mp4", save_to)
if not keep_frames:
shutil.rmtree(output_dir)
def is_img(path):
return path.lower().endswith(("png", "jpg", "jpeg", "bmp"))
def rreplace(s, old, new, occurrence):
li = s.rsplit(old, occurrence)
return new.join(li)

View File

@ -1,5 +1,3 @@
--extra-index-url https://download.pytorch.org/whl/cu118
numpy==1.23.5
opencv-python==4.7.0.72
onnx==1.14.0
@ -7,12 +5,9 @@ insightface==0.7.3
psutil==5.9.5
tk==0.1.0
pillow==9.5.0
torch==2.0.1+cu118
onnxruntime==1.15.0; sys_platform == 'darwin' and platform_machine != 'arm64'
onnxruntime-silicon==1.13.1; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.15.0; sys_platform != 'darwin'
tensorflow==2.13.0rc1; sys_platform == 'darwin'
tensorflow==2.12.0; sys_platform != 'darwin'
torch==2.0.1
onnxruntime-gpu==1.15.0
tensorflow==2.12.0
opennsfw2==0.10.2
protobuf==4.23.2
tqdm==4.65.0

View File

@ -1,28 +0,0 @@
from typing import Any
import insightface
import roop.globals
FACE_ANALYSER = None
def get_face_analyser() -> Any:
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=roop.globals.providers)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_one_face(image_data) -> Any:
face = get_face_analyser().get(image_data)
try:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_many_faces(image_data) -> Any:
try:
return get_face_analyser().get(image_data)
except IndexError:
return None

View File

@ -1,12 +0,0 @@
import cv2
def get_video_frame(video_path: str, frame_number: int = 1):
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
capture.release()
if has_frame:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return None

View File

@ -1,233 +0,0 @@
#!/usr/bin/env python3
import os
import sys
# single thread doubles performance of gpu-mode - needs to be set before torch import
if any(arg.startswith('--gpu-vendor') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
import signal
import shutil
import argparse
import psutil
import torch
import tensorflow
import multiprocessing
from opennsfw2 import predict_video_frames, predict_image
import cv2
import roop.globals
import roop.ui as ui
from roop.swapper import process_video, process_image
from roop.utilities import has_image_extention, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frames_paths, restore_audio, create_temp, move_temp, clean_temp
from roop.analyser import get_one_face
if 'ROCMExecutionProvider' in roop.globals.providers:
del torch
warnings.simplefilter(action='ignore', category=FutureWarning)
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--face', help='use a face image', dest='source_path')
parser.add_argument('-t', '--target', help='replace image or video with face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_path')
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-audio', help='maintain original audio', dest='keep_audio', action='store_true', default=True)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--many-faces', help='swap every face in the frame', dest='many_faces', action='store_true', default=False)
parser.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264')
parser.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', dest='max_memory', type=int, default=suggest_max_memory())
parser.add_argument('--cpu-cores', help='number of CPU cores to use', dest='cpu_cores', type=int, default=suggest_cpu_cores())
parser.add_argument('--gpu-threads', help='number of threads to be use for the GPU', dest='gpu_threads', type=int, default=suggest_gpu_threads())
parser.add_argument('--gpu-vendor', help='select your GPU vendor', dest='gpu_vendor', choices=['apple', 'amd', 'nvidia'])
args = parser.parse_known_args()[0]
roop.globals.source_path = args.source_path
roop.globals.target_path = args.target_path
roop.globals.output_path = args.output_path
roop.globals.headless = args.source_path or args.target_path or args.output_path
roop.globals.keep_fps = args.keep_fps
roop.globals.keep_audio = args.keep_audio
roop.globals.keep_frames = args.keep_frames
roop.globals.many_faces = args.many_faces
roop.globals.video_encoder = args.video_encoder
roop.globals.video_quality = args.video_quality
roop.globals.max_memory = args.max_memory
roop.globals.cpu_cores = args.cpu_cores
roop.globals.gpu_threads = args.gpu_threads
if args.gpu_vendor:
roop.globals.gpu_vendor = args.gpu_vendor
else:
roop.globals.providers = ['CPUExecutionProvider']
def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin':
return 4
return 16
def suggest_gpu_threads() -> int:
if 'ROCMExecutionProvider' in roop.globals.providers:
return 2
return 8
def suggest_cpu_cores() -> int:
if platform.system().lower() == 'darwin':
return 2
return int(max(psutil.cpu_count() / 2, 1))
def limit_resources() -> None:
# prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
if roop.globals.max_memory:
memory = roop.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = roop.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def pre_check() -> None:
if sys.version_info < (3, 9):
quit('Python version is not supported - please upgrade to 3.9 or higher.')
if not shutil.which('ffmpeg'):
quit('ffmpeg is not installed!')
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
if not os.path.isfile(model_path):
quit('File "inswapper_128.onnx" does not exist!')
if roop.globals.gpu_vendor == 'apple':
if 'CoreMLExecutionProvider' not in roop.globals.providers:
quit('You are using --gpu=apple flag but CoreML is not available or properly installed on your system.')
if roop.globals.gpu_vendor == 'amd':
if 'ROCMExecutionProvider' not in roop.globals.providers:
quit('You are using --gpu=amd flag but ROCM is not available or properly installed on your system.')
if roop.globals.gpu_vendor == 'nvidia':
if not torch.cuda.is_available():
quit('You are using --gpu=nvidia flag but CUDA is not available or properly installed on your system.')
if torch.version.cuda > '11.8':
quit(f'CUDA version {torch.version.cuda} is not supported - please downgrade to 11.8')
if torch.version.cuda < '11.4':
quit(f'CUDA version {torch.version.cuda} is not supported - please upgrade to 11.8')
if torch.backends.cudnn.version() < 8220:
quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please upgrade to 8.9.1')
if torch.backends.cudnn.version() > 8910:
quit(f'CUDNN version { torch.backends.cudnn.version()} is not supported - please downgrade to 8.9.1')
def conditional_process_video(source_path: str, frame_paths: List[str]) -> None:
pool_amount = len(frame_paths) // roop.globals.cpu_cores
if pool_amount > 2 and roop.globals.cpu_cores > 1 and roop.globals.gpu_vendor is None:
global POOL
POOL = multiprocessing.Pool(roop.globals.cpu_cores, maxtasksperchild=1)
pools = []
for i in range(0, len(frame_paths), pool_amount):
pool = POOL.apply_async(process_video, args=(source_path, frame_paths[i:i + pool_amount], 'cpu'))
pools.append(pool)
for pool in pools:
pool.get()
POOL.close()
POOL.join()
else:
process_video(roop.globals.source_path, frame_paths, 'gpu')
def update_status(message: str) -> None:
value = 'Status: ' + message
print(value)
if not roop.globals.headless:
ui.update_status(value)
def start() -> None:
if not roop.globals.source_path or not os.path.isfile(roop.globals.source_path):
update_status('Select an image that contains a face.')
return
elif not roop.globals.target_path or not os.path.isfile(roop.globals.target_path):
update_status('Select an image or video target!')
return
test_face = get_one_face(cv2.imread(roop.globals.source_path))
if not test_face:
update_status('No face detected in source image. Please try with another one!')
return
# process image to image
if has_image_extention(roop.globals.target_path):
if predict_image(roop.globals.target_path) > 0.85:
destroy()
process_image(roop.globals.source_path, roop.globals.target_path, roop.globals.output_path)
if is_image(roop.globals.target_path):
update_status('Swapping to image succeed!')
else:
update_status('Swapping to image failed!')
return
# process image to videos
# seconds, probabilities = predict_video_frames(video_path=roop.globals.target_path, frame_interval=100)
# if any(probability > 0.85 for probability in probabilities):
# destroy()
update_status('Creating temp resources...')
create_temp(roop.globals.target_path)
update_status('Extracting frames...')
extract_frames(roop.globals.target_path)
frame_paths = get_temp_frames_paths(roop.globals.target_path)
update_status('Swapping in progress...')
conditional_process_video(roop.globals.source_path, frame_paths)
# prevent memory leak using ffmpeg with cuda
if roop.globals.gpu_vendor == 'nvidia':
torch.cuda.empty_cache()
if roop.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(roop.globals.target_path)
update_status(f'Creating video with {fps} fps...')
create_video(roop.globals.target_path, fps)
else:
update_status('Creating video with 30 fps...')
create_video(roop.globals.target_path, 30)
if roop.globals.keep_audio:
if roop.globals.keep_fps:
update_status('Restoring audio...')
else:
update_status('Restoring audio might cause issues as fps are not kept...')
restore_audio(roop.globals.target_path, roop.globals.output_path)
else:
move_temp(roop.globals.target_path, roop.globals.output_path)
clean_temp(roop.globals.target_path)
if is_video(roop.globals.target_path):
update_status('Swapping to video succeed!')
else:
update_status('Swapping to video failed!')
def destroy() -> None:
if roop.globals.target_path:
clean_temp(roop.globals.target_path)
quit()
def run() -> None:
parse_args()
pre_check()
limit_resources()
if roop.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()

View File

@ -1,21 +0,0 @@
import onnxruntime
source_path = None
target_path = None
output_path = None
keep_fps = None
keep_audio = None
keep_frames = None
many_faces = None
video_encoder = None
video_quality = None
max_memory = None
cpu_cores = None
gpu_threads = None
gpu_vendor = None
headless = None
log_level = 'error'
providers = onnxruntime.get_available_providers()
if 'TensorrtExecutionProvider' in providers:
providers.remove('TensorrtExecutionProvider')

View File

@ -1,95 +0,0 @@
import os
from typing import Any
from tqdm import tqdm
import cv2
import insightface
import threading
import roop.globals
from roop.analyser import get_one_face, get_many_faces
FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
def get_face_swapper() -> None:
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=roop.globals.providers)
return FACE_SWAPPER
def swap_face_in_frame(source_face: Any, target_face: Any, frame: Any) -> None:
if target_face:
return get_face_swapper().get(frame, target_face, source_face, paste_back=True)
return frame
def process_faces(source_face: Any, target_frame: Any) -> Any:
if roop.globals.many_faces:
many_faces = get_many_faces(target_frame)
if many_faces:
for face in many_faces:
target_frame = swap_face_in_frame(source_face, face, target_frame)
else:
face = get_one_face(target_frame)
if face:
target_frame = swap_face_in_frame(source_face, face, target_frame)
return target_frame
def process_frames(source_path: str, frame_paths: [str], progress=None) -> None:
source_face = get_one_face(cv2.imread(source_path))
for frame_path in frame_paths:
frame = cv2.imread(frame_path)
try:
result = process_faces(source_face, frame)
cv2.imwrite(frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
def multi_process_frame(source_img, frame_paths, progress) -> None:
threads = []
frames_per_thread = len(frame_paths) // roop.globals.gpu_threads
remaining_frames = len(frame_paths) % roop.globals.gpu_threads
start_index = 0
# create threads by frames
for _ in range(roop.globals.gpu_threads):
end_index = start_index + frames_per_thread
if remaining_frames > 0:
end_index += 1
remaining_frames -= 1
thread_frame_paths = frame_paths[start_index:end_index]
thread = threading.Thread(target=process_frames, args=(source_img, thread_frame_paths, progress))
threads.append(thread)
thread.start()
start_index = end_index
# join threads
for thread in threads:
thread.join()
def process_image(source_path: str, target_path: str, output_file) -> None:
frame = cv2.imread(target_path)
target_frame = get_one_face(frame)
source_face = get_one_face(cv2.imread(source_path))
result = get_face_swapper().get(frame, target_frame, source_face, paste_back=True)
cv2.imwrite(output_file, result)
def process_video(source_path: str, frame_paths: [str], mode: str) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
if mode == 'cpu':
progress.set_postfix({'mode': mode, 'cores': roop.globals.cpu_cores, 'memory': roop.globals.max_memory})
process_frames(source_path, frame_paths, progress)
elif mode == 'gpu':
progress.set_postfix({'mode': mode, 'threads': roop.globals.gpu_threads, 'memory': roop.globals.max_memory})
multi_process_frame(source_path, frame_paths, progress)

View File

@ -1,246 +0,0 @@
import os
import tkinter as tk
from tkinter import filedialog
from typing import Callable, Any, Tuple
import cv2
from PIL import Image, ImageTk, ImageOps
import roop.globals
from roop.analyser import get_one_face
from roop.capturer import get_video_frame
from roop.swapper import process_faces
from roop.utilities import is_image, is_video
PRIMARY_COLOR = '#2d3436'
SECONDARY_COLOR = '#74b9ff'
TERTIARY_COLOR = '#f1c40f'
ACCENT_COLOR = '#2ecc71'
WINDOW_HEIGHT = 700
WINDOW_WIDTH = 600
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
RECENT_DIRECTORY_OUTPUT = None
def init(start: Callable, destroy: Callable) -> tk.Tk:
global ROOT, PREVIEW
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
return ROOT
def create_root(start: Callable, destroy: Callable) -> tk.Tk:
global source_label, target_label, status_label
root = tk.Tk()
root.minsize(WINDOW_WIDTH, WINDOW_HEIGHT)
root.title('roop')
root.configure(bg=PRIMARY_COLOR)
root.option_add('*Font', ('Arial', 11))
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = tk.Label(root, bg=PRIMARY_COLOR)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25)
target_label = tk.Label(root, bg=PRIMARY_COLOR)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = create_primary_button(root, 'Select a face', lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = create_primary_button(root, 'Select a target', lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = tk.BooleanVar(value=roop.globals.keep_fps)
keep_fps_checkbox = create_checkbox(root, 'Limit to 30 fps', keep_fps_value, lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps))
keep_fps_checkbox.place(relx=0.1, rely=0.6)
keep_frames_value = tk.BooleanVar(value=roop.globals.keep_frames)
keep_frames_checkbox = create_checkbox(root, 'Keep frames dir', keep_frames_value, lambda: setattr(roop.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_checkbox.place(relx=0.1, rely=0.65)
keep_audio_value = tk.BooleanVar(value=roop.globals.keep_audio)
keep_audio_checkbox = create_checkbox(root, 'Keep original audio', keep_audio_value, lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_checkbox.place(relx=0.6, rely=0.6)
many_faces_value = tk.BooleanVar(value=roop.globals.many_faces)
many_faces_checkbox = create_checkbox(root, 'Replace all faces', many_faces_value, lambda: setattr(roop.globals, 'many_faces', many_faces_value.get()))
many_faces_checkbox.place(relx=0.6, rely=0.65)
start_button = create_secondary_button(root, 'Start', lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
stop_button = create_secondary_button(root, 'Destroy', lambda: destroy())
stop_button.place(relx=0.4, rely=0.75, relwidth=0.2, relheight=0.05)
preview_button = create_secondary_button(root, 'Preview', lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.75, relwidth=0.2, relheight=0.05)
status_label = tk.Label(root, justify='center', text='Status: None', fg=ACCENT_COLOR, bg=PRIMARY_COLOR)
status_label.place(relx=0.1, rely=0.9)
return root
def create_preview(parent) -> tk.Toplevel:
global preview_label, preview_scale
preview = tk.Toplevel(parent)
preview.withdraw()
preview.title('Preview')
preview.configure(bg=PRIMARY_COLOR)
preview.option_add('*Font', ('Arial', 11))
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.resizable(width=False, height=False)
preview_label = tk.Label(preview, bg=PRIMARY_COLOR)
preview_label.pack(fill='both', expand=True)
preview_scale = tk.Scale(preview, orient='horizontal', command=lambda frame_value: update_preview(int(frame_value)))
preview_scale.pack(fill='x')
return preview
def create_primary_button(parent: Any, text: str, command: Callable) -> tk.Button:
return tk.Button(
parent,
text=text,
command=command,
bg=PRIMARY_COLOR,
fg=SECONDARY_COLOR,
relief='flat',
highlightthickness=4,
highlightbackground=SECONDARY_COLOR,
activebackground=SECONDARY_COLOR,
borderwidth=4
)
def create_secondary_button(parent: Any, text: str, command: Callable) -> tk.Button:
return tk.Button(
parent,
text=text,
command=command,
bg=TERTIARY_COLOR,
relief='flat',
borderwidth=0,
highlightthickness=0
)
def create_checkbox(parent: Any, text: str, variable: tk.BooleanVar, command: Callable) -> tk.Checkbutton:
return tk.Checkbutton(
parent,
text=text,
variable=variable,
command=command,
relief='flat',
bg=PRIMARY_COLOR,
activebackground=PRIMARY_COLOR,
activeforeground=SECONDARY_COLOR,
selectcolor=PRIMARY_COLOR,
fg=SECONDARY_COLOR,
borderwidth=0,
highlightthickness=0
)
def update_status(text: str) -> None:
status_label['text'] = text
ROOT.update()
def select_source_path():
global RECENT_DIRECTORY_SOURCE
source_path = filedialog.askopenfilename(title='Select an face image', initialdir=RECENT_DIRECTORY_SOURCE)
if is_image(source_path):
roop.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(roop.globals.source_path)
image = render_image_preview(roop.globals.source_path, (200, 200))
source_label.configure(image=image)
source_label.image = image
else:
roop.globals.source_path = None
source_label.configure(image=None)
source_label.image = None
def select_target_path():
global RECENT_DIRECTORY_TARGET
target_path = filedialog.askopenfilename(title='Select an image or video target', initialdir=RECENT_DIRECTORY_TARGET)
if is_image(target_path):
roop.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
image = render_image_preview(roop.globals.target_path)
target_label.configure(image=image)
target_label.image = image
elif is_video(target_path):
roop.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(roop.globals.target_path)
video_frame = render_video_preview(target_path, (200, 200))
target_label.configure(image=video_frame)
target_label.image = video_frame
else:
roop.globals.target_path = None
target_label.configure(image=None)
target_label.image = None
def select_output_path(start):
global RECENT_DIRECTORY_OUTPUT
if is_image(roop.globals.target_path):
output_path = filedialog.asksaveasfilename(title='Save image output', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(roop.globals.target_path):
output_path = filedialog.asksaveasfilename(title='Save video output', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
if output_path:
roop.globals.output_path = output_path
RECENT_DIRECTORY_OUTPUT = os.path.dirname(roop.globals.output_path)
start()
def render_image_preview(image_path: str, dimensions: Tuple[int, int] = None) -> ImageTk.PhotoImage:
image = Image.open(image_path)
if dimensions:
image = ImageOps.fit(image, dimensions, Image.LANCZOS)
return ImageTk.PhotoImage(image)
def render_video_preview(video_path: str, dimensions: Tuple[int, int] = None, frame_number: int = 1) -> ImageTk.PhotoImage:
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if dimensions:
image = ImageOps.fit(image, dimensions, Image.LANCZOS)
return ImageTk.PhotoImage(image)
capture.release()
cv2.destroyAllWindows()
def toggle_preview() -> None:
if PREVIEW.state() == 'normal':
PREVIEW.withdraw()
else:
update_preview(1)
PREVIEW.deiconify()
def update_preview(frame_number: int) -> None:
if roop.globals.source_path and roop.globals.target_path and frame_number:
video_frame = process_faces(
get_one_face(cv2.imread(roop.globals.source_path)),
get_video_frame(roop.globals.target_path, frame_number)
)
image = Image.fromarray(video_frame)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ImageTk.PhotoImage(image)
preview_label.configure(image=image)
preview_label.image = image

View File

@ -1,114 +0,0 @@
import glob
import os
import shutil
import subprocess
from pathlib import Path
from typing import List
import cv2
from PIL import Image
import roop.globals
TEMP_FILE = 'temp.mp4'
TEMP_DIRECTORY = 'temp'
def run_ffmpeg(args: List) -> None:
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', roop.globals.log_level]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
except Exception:
pass
def detect_fps(target_path: str) -> int:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
output = subprocess.check_output(command).decode().strip()
try:
return int(eval(output))
except Exception:
pass
return 30
def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-i', target_path, os.path.join(temp_directory_path, '%04d.png')])
def create_video(target_path: str, fps: int) -> None:
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-i', os.path.join(temp_directory_path, '%04d.png'), '-framerate', str(fps), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-y', get_temp_file_path(target_path)])
def restore_audio(target_path: str, output_path: str) -> None:
temp_file_path = get_temp_file_path(target_path)
run_ffmpeg(['-i', temp_file_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
if not os.path.isfile(output_path):
move_temp(target_path, output_path)
def get_temp_frames_paths(target_path: str) -> List:
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob(os.path.join(temp_directory_path, '*.png'))
def get_temp_directory_path(target_path: str) -> str:
filename, _ = os.path.splitext(os.path.basename(target_path))
target_name = os.path.dirname(target_path)
return os.path.join(target_name, TEMP_DIRECTORY, filename)
def get_temp_file_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, TEMP_FILE)
def create_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
temp_file_path = get_temp_file_path(target_path)
if os.path.isfile(temp_file_path):
shutil.move(temp_file_path, output_path)
def clean_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
parent_directory_name = os.path.basename(parent_directory_path)
if not roop.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
if not os.listdir(parent_directory_path) and parent_directory_name == TEMP_DIRECTORY:
os.rmdir(parent_directory_path)
def has_image_extention(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
try:
image = Image.open(image_path)
image.verify()
return True
except Exception:
pass
return False
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
try:
capture = cv2.VideoCapture(video_path)
if capture.isOpened():
is_video, _ = capture.read()
capture.release()
return is_video
except Exception:
pass
return False

273
run.py
View File

@ -1,6 +1,273 @@
#!/usr/bin/env python3
from roop import core
import platform
import signal
import sys
import shutil
import glob
import argparse
import multiprocessing as mp
import os
import torch
from pathlib import Path
import tkinter as tk
from tkinter import filedialog
from opennsfw2 import predict_video_frames, predict_image
from tkinter.filedialog import asksaveasfilename
import webbrowser
import psutil
import cv2
import threading
from PIL import Image, ImageTk
import core.globals
from core.swapper import process_video, process_img
from core.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
from core.analyser import get_face
if __name__ == '__main__':
core.run()
if 'ROCMExecutionProvider' in core.globals.providers:
del torch
pool = None
args = {}
signal.signal(signal.SIGINT, lambda signal_number, frame: quit())
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--face', help='use this face', dest='source_img')
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
parser.add_argument('-o', '--output', help='save output to this file', dest='output_file')
parser.add_argument('--gpu', help='use gpu', dest='gpu', action='store_true', default=False)
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
parser.add_argument('--max-memory', help='maximum amount of RAM in GB to be used', type=int)
parser.add_argument('--max-cores', help='number of cores to be use for CPU mode', dest='cores_count', type=int, default=max(psutil.cpu_count() - 2, 2))
for name, value in vars(parser.parse_args()).items():
args[name] = value
sep = "/"
if os.name == "nt":
sep = "\\"
def limit_resources():
if args['max_memory']:
memory = args['max_memory'] * 1024 * 1024 * 1024
if str(platform.system()).lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def pre_check():
if sys.version_info < (3, 9):
quit('Python version is not supported - please upgrade to 3.9 or higher')
if not shutil.which('ffmpeg'):
quit('ffmpeg is not installed!')
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'inswapper_128.onnx')
if not os.path.isfile(model_path):
quit('File "inswapper_128.onnx" does not exist!')
if '--gpu' in sys.argv:
if 'ROCMExecutionProvider' not in core.globals.providers:
CUDA_VERSION = torch.version.cuda
CUDNN_VERSION = torch.backends.cudnn.version()
if not torch.cuda.is_available() or not CUDA_VERSION:
quit("You are using --gpu flag but CUDA isn't available or properly installed on your system.")
if CUDA_VERSION > '11.8':
quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
if CUDA_VERSION < '11.4':
quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
if CUDNN_VERSION < 8220:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
if CUDNN_VERSION > 8910:
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
else:
core.globals.providers = ['CPUExecutionProvider']
def start_processing():
if args['gpu']:
process_video(args['source_img'], args["frame_paths"])
return
frame_paths = args["frame_paths"]
n = len(frame_paths)//(args['cores_count'])
processes = []
for i in range(0, len(frame_paths), n):
p = pool.apply_async(process_video, args=(args['source_img'], frame_paths[i:i+n],))
processes.append(p)
for p in processes:
p.get()
pool.close()
pool.join()
def preview_image(image_path):
img = Image.open(image_path)
img = img.resize((180, 180), Image.ANTIALIAS)
photo_img = ImageTk.PhotoImage(img)
left_frame = tk.Frame(window)
left_frame.place(x=60, y=100)
img_label = tk.Label(left_frame, image=photo_img)
img_label.image = photo_img
img_label.pack()
def preview_video(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error opening video file")
return
ret, frame = cap.read()
if ret:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame)
img = img.resize((180, 180), Image.ANTIALIAS)
photo_img = ImageTk.PhotoImage(img)
right_frame = tk.Frame(window)
right_frame.place(x=360, y=100)
img_label = tk.Label(right_frame, image=photo_img)
img_label.image = photo_img
img_label.pack()
cap.release()
def select_face():
args['source_img'] = filedialog.askopenfilename(title="Select a face")
preview_image(args['source_img'])
def select_target():
args['target_path'] = filedialog.askopenfilename(title="Select a target")
threading.Thread(target=preview_video, args=(args['target_path'],)).start()
def toggle_fps_limit():
args['keep_fps'] = limit_fps.get() != True
def toggle_keep_frames():
args['keep_frames'] = keep_frames.get() != True
def save_file():
filename, ext = 'output.mp4', '.mp4'
if is_img(args['target_path']):
filename, ext = 'output.png', '.png'
args['output_file'] = asksaveasfilename(initialfile=filename, defaultextension=ext, filetypes=[("All Files","*.*"),("Videos","*.mp4")])
def status(string):
if 'cli_mode' in args:
print("Status: " + string)
else:
status_label["text"] = "Status: " + string
window.update()
def start():
if not args['source_img'] or not os.path.isfile(args['source_img']):
print("\n[WARNING] Please select an image containing a face.")
return
elif not args['target_path'] or not os.path.isfile(args['target_path']):
print("\n[WARNING] Please select a video/image to swap face in.")
return
if not args['output_file']:
target_path = args['target_path']
args['output_file'] = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
global pool
pool = mp.Pool(args['cores_count'])
target_path = args['target_path']
test_face = get_face(cv2.imread(args['source_img']))
if not test_face:
print("\n[WARNING] No face detected in source image. Please try with another one.\n")
return
if is_img(target_path):
if predict_image(target_path) > 0.7:
quit()
process_img(args['source_img'], target_path, args['output_file'])
status("swap successful!")
return
seconds, probabilities = predict_video_frames(video_path=args['target_path'], frame_interval=100)
if any(probability > 0.7 for probability in probabilities):
quit()
video_name_full = target_path.split("/")[-1]
video_name = os.path.splitext(video_name_full)[0]
output_dir = os.path.dirname(target_path) + "/" + video_name
Path(output_dir).mkdir(exist_ok=True)
status("detecting video's FPS...")
fps, exact_fps = detect_fps(target_path)
if not args['keep_fps'] and fps > 30:
this_path = output_dir + "/" + video_name + ".mp4"
set_fps(target_path, this_path, 30)
target_path, exact_fps = this_path, 30
else:
shutil.copy(target_path, output_dir)
status("extracting frames...")
extract_frames(target_path, output_dir)
args['frame_paths'] = tuple(sorted(
glob.glob(output_dir + "/*.png"),
key=lambda x: int(x.split(sep)[-1].replace(".png", ""))
))
status("swapping in progress...")
start_processing()
status("creating video...")
create_video(video_name, exact_fps, output_dir)
status("adding audio...")
add_audio(output_dir, target_path, video_name_full, args['keep_frames'], args['output_file'])
save_path = args['output_file'] if args['output_file'] else output_dir + "/" + video_name + ".mp4"
print("\n\nVideo saved as:", save_path, "\n\n")
status("swap successful!")
if __name__ == "__main__":
global status_label, window
pre_check()
limit_resources()
if args['source_img']:
args['cli_mode'] = True
start()
quit()
window = tk.Tk()
window.geometry("600x700")
window.title("roop")
window.configure(bg="#2d3436")
window.resizable(width=False, height=False)
# Contact information
support_link = tk.Label(window, text="Donate to project <3", fg="#fd79a8", bg="#2d3436", cursor="hand2", font=("Arial", 8))
support_link.place(x=180,y=20,width=250,height=30)
support_link.bind("<Button-1>", lambda e: webbrowser.open("https://github.com/sponsors/s0md3v"))
# Select a face button
face_button = tk.Button(window, text="Select a face", command=select_face, bg="#2d3436", fg="#74b9ff", highlightthickness=4, relief="flat", highlightbackground="#74b9ff", activebackground="#74b9ff", borderwidth=4)
face_button.place(x=60,y=320,width=180,height=80)
# Select a target button
target_button = tk.Button(window, text="Select a target", command=select_target, bg="#2d3436", fg="#74b9ff", highlightthickness=4, relief="flat", highlightbackground="#74b9ff", activebackground="#74b9ff", borderwidth=4)
target_button.place(x=360,y=320,width=180,height=80)
# FPS limit checkbox
limit_fps = tk.IntVar()
fps_checkbox = tk.Checkbutton(window, relief="groove", activebackground="#2d3436", activeforeground="#74b9ff", selectcolor="black", text="Limit FPS to 30", fg="#dfe6e9", borderwidth=0, highlightthickness=0, bg="#2d3436", variable=limit_fps, command=toggle_fps_limit)
fps_checkbox.place(x=30,y=500,width=240,height=31)
fps_checkbox.select()
# Keep frames checkbox
keep_frames = tk.IntVar()
frames_checkbox = tk.Checkbutton(window, relief="groove", activebackground="#2d3436", activeforeground="#74b9ff", selectcolor="black", text="Keep frames dir", fg="#dfe6e9", borderwidth=0, highlightthickness=0, bg="#2d3436", variable=keep_frames, command=toggle_keep_frames)
frames_checkbox.place(x=37,y=450,width=240,height=31)
# Start button
start_button = tk.Button(window, text="Start", bg="#f1c40f", relief="flat", borderwidth=0, highlightthickness=0, command=lambda: [save_file(), start()])
start_button.place(x=240,y=560,width=120,height=49)
# Status label
status_label = tk.Label(window, width=580, justify="center", text="Status: waiting for input...", fg="#2ecc71", bg="#2d3436")
status_label.place(x=10,y=640,width=580,height=30)
window.mainloop()