Merge remote-tracking branch 'origin/main'
# Conflicts: # core/globals.py # core/processor.py # run.py
This commit is contained in:
16
.github/workflows/ci.yml
vendored
Normal file
16
.github/workflows/ci.yml
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
name: ci
|
||||
|
||||
on: [ push, pull_request ]
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
- name: Set up Python 3.8
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.8
|
||||
- run: pip install flake8
|
||||
- run: flake8 run.py core
|
||||
24
README.md
24
README.md
@@ -24,17 +24,23 @@ Choose a face (image with desired face) and the target image/video (image/video
|
||||
Don't touch the FPS checkbox unless you know what you are doing.
|
||||
|
||||
Additional command line arguments are given below:
|
||||
|
||||
```
|
||||
-h, --help show this help message and exit
|
||||
-f SOURCE_IMG, --face SOURCE_IMG
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-f SOURCE_IMG, --face SOURCE_IMG
|
||||
use this face
|
||||
-t TARGET_PATH, --target TARGET_PATH
|
||||
-t TARGET_PATH, --target TARGET_PATH
|
||||
replace this face
|
||||
-o OUTPUT_FILE, --output OUTPUT_FILE
|
||||
save output to this file
|
||||
--keep-fps keep original fps
|
||||
--gpu use gpu
|
||||
--keep-frames don't delete frames directory
|
||||
-o OUTPUT_FILE, --output OUTPUT_FILE
|
||||
save output to this file
|
||||
--gpu use gpu
|
||||
--keep-fps maintain original fps
|
||||
--keep-frames keep frames directory
|
||||
--max-memory MAX_MEMORY
|
||||
set max memory
|
||||
--max-cores CORES_COUNT
|
||||
set max cpu cores
|
||||
```
|
||||
|
||||
Looking for a CLI mode? Using the -f/--face argument will make the program in cli mode.
|
||||
@@ -45,7 +51,7 @@ Looking for a CLI mode? Using the -f/--face argument will make the program in cl
|
||||
- [ ] Support for replacing multiple faces
|
||||
|
||||
## Disclaimer
|
||||
Deepfake software already exist. This is just an experiment to make the existing techniques better. Users are expected to use this to learn about AI and not use it for illicit or unethical purposes. Users must get consent from the concerned people before using their face and must not hide the fact that it is a deepfake when posting content online. I am not responsible for any malicious activity done through this software, this is a purely educational project aimed at exploring AI.
|
||||
Better deepfake software than this already exist, this is just a hobby project I created to learn about AI. Users are expected to use this program for learning programming and using the software in good faith. Users must get consent from the concerned people before using their face and must not hide the fact that it is a deepfake when posting content online. I am not responsible for malicious behaviour of end-users.
|
||||
|
||||
## Credits
|
||||
- [ffmpeg](https://ffmpeg.org/): for making video related operations easy
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
import insightface
|
||||
import core.globals
|
||||
|
||||
face_analyser = insightface.app.FaceAnalysis(name='buffalo_l', providers=core.globals.providers)
|
||||
face_analyser.prepare(ctx_id=0, det_size=(640, 640))
|
||||
FACE_ANALYSER = None
|
||||
|
||||
|
||||
def get_face_analyser():
|
||||
global FACE_ANALYSER
|
||||
if FACE_ANALYSER is None:
|
||||
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=core.globals.providers)
|
||||
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
|
||||
return FACE_ANALYSER
|
||||
|
||||
|
||||
def get_face(img_data):
|
||||
analysed = face_analyser.get(img_data)
|
||||
face = get_face_analyser().get(img_data)
|
||||
try:
|
||||
return sorted(analysed, key=lambda x: x.bbox[0])[0]
|
||||
return sorted(face, key=lambda x: x.bbox[0])[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
|
||||
@@ -3,3 +3,6 @@ import onnxruntime
|
||||
use_gpu = False
|
||||
providers = onnxruntime.get_available_providers()
|
||||
all_faces = False
|
||||
|
||||
if 'TensorrtExecutionProvider' in providers:
|
||||
providers.remove('TensorrtExecutionProvider')
|
||||
|
||||
@@ -3,12 +3,16 @@ import cv2
|
||||
import insightface
|
||||
import core.globals
|
||||
from core.config import get_face
|
||||
from core.utils import rreplace
|
||||
|
||||
if os.path.isfile('inswapper_128.onnx'):
|
||||
face_swapper = insightface.model_zoo.get_model('inswapper_128.onnx', providers=core.globals.providers)
|
||||
else:
|
||||
quit('File "inswapper_128.onnx" does not exist!')
|
||||
FACE_SWAPPER = None
|
||||
|
||||
|
||||
def get_face_swapper():
|
||||
global FACE_SWAPPER
|
||||
if FACE_SWAPPER is None:
|
||||
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../inswapper_128.onnx')
|
||||
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=core.globals.providers)
|
||||
return FACE_SWAPPER
|
||||
|
||||
|
||||
def process_video(source_img, frame_paths):
|
||||
@@ -21,16 +25,15 @@ def process_video(source_img, frame_paths):
|
||||
result = frame
|
||||
for singleFace in all_faces:
|
||||
if singleFace:
|
||||
result = face_swapper.get(result, singleFace, source_face, paste_back=True)
|
||||
result = get_face_swapper().get(result, singleFace, source_face, paste_back=True)
|
||||
print('.', end='', flush=True)
|
||||
else:
|
||||
print('S', end='', flush=True)
|
||||
if result is not None:
|
||||
cv2.imwrite(frame_path, result)
|
||||
cv2.imwrite(frame_path, result)
|
||||
else:
|
||||
face = get_face(frame)
|
||||
if face:
|
||||
result = face_swapper.get(frame, face, source_face, paste_back=True)
|
||||
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
|
||||
cv2.imwrite(frame_path, result)
|
||||
print('.', end='', flush=True)
|
||||
else:
|
||||
@@ -45,6 +48,6 @@ def process_img(source_img, target_path, output_file):
|
||||
frame = cv2.imread(target_path)
|
||||
face = get_face(frame)
|
||||
source_face = get_face(cv2.imread(source_img))
|
||||
result = face_swapper.get(frame, face, source_face, paste_back=True)
|
||||
result = get_face_swapper().get(frame, face, source_face, paste_back=True)
|
||||
cv2.imwrite(output_file, result)
|
||||
print("\n\nImage saved as:", output_file, "\n\n")
|
||||
|
||||
@@ -23,10 +23,10 @@ def detect_fps(input_path):
|
||||
output = os.popen(f'ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate "{input_path}"').read()
|
||||
if "/" in output:
|
||||
try:
|
||||
return int(output.split("/")[0]) // int(output.split("/")[1])
|
||||
return int(output.split("/")[0]) // int(output.split("/")[1]), output
|
||||
except:
|
||||
pass
|
||||
return 60
|
||||
return 30, 30
|
||||
|
||||
|
||||
def set_fps(input_path, output_path, fps):
|
||||
@@ -36,7 +36,7 @@ def set_fps(input_path, output_path, fps):
|
||||
|
||||
def create_video(video_name, fps, output_dir):
|
||||
output_dir = path(output_dir)
|
||||
os.system(f'ffmpeg -framerate {fps} -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 32 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"')
|
||||
os.system(f'ffmpeg -framerate "{fps}" -i "{output_dir}{sep}%04d.png" -c:v libx264 -crf 7 -pix_fmt yuv420p -y "{output_dir}{sep}output.mp4"')
|
||||
|
||||
|
||||
def extract_frames(input_path, output_dir):
|
||||
@@ -44,14 +44,13 @@ def extract_frames(input_path, output_dir):
|
||||
os.system(f'ffmpeg -i "{input_path}" "{output_dir}{sep}%04d.png"')
|
||||
|
||||
|
||||
def add_audio(output_dir, target_path, keep_frames, output_file):
|
||||
video = target_path.split("/")[-1]
|
||||
video_name = video.split(".")[0]
|
||||
save_to = output_file if output_file else output_dir + f"/swapped-" + video_name + ".mp4"
|
||||
def add_audio(output_dir, target_path, video, keep_frames, output_file):
|
||||
video_name = os.path.splitext(video)[0]
|
||||
save_to = output_file if output_file else output_dir + "/swapped-" + video_name + ".mp4"
|
||||
save_to_ff, output_dir_ff = path(save_to), path(output_dir)
|
||||
os.system(f'ffmpeg -i "{output_dir_ff}{sep}output.mp4" -i "{output_dir_ff}{sep}{video}" -c:v copy -map 0:v:0 -map 1:a:0 -y "{save_to_ff}"')
|
||||
if not os.path.isfile(save_to):
|
||||
shutil.move(output_dir + f"/output.mp4", save_to)
|
||||
shutil.move(output_dir + "/output.mp4", save_to)
|
||||
if not keep_frames:
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@ onnx==1.14.0
|
||||
insightface==0.7.3
|
||||
psutil==5.9.5
|
||||
tk==0.1.0
|
||||
pillow==9.0.1
|
||||
pillow==9.5.0
|
||||
torch==2.0.1
|
||||
onnxruntime-gpu==1.15.0
|
||||
opennsfw2==0.10.2
|
||||
protobuf==3.20.2
|
||||
|
||||
104
run.py
104
run.py
@@ -1,32 +1,22 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import platform
|
||||
import sys
|
||||
import time
|
||||
import shutil
|
||||
import core.globals
|
||||
|
||||
if not shutil.which('ffmpeg'):
|
||||
print('ffmpeg is not installed. Read the docs: https://github.com/s0md3v/roop#installation.\n' * 10)
|
||||
quit()
|
||||
if '--gpu' not in sys.argv:
|
||||
core.globals.providers = ['CPUExecutionProvider']
|
||||
elif 'ROCMExecutionProvider' not in core.globals.providers:
|
||||
import torch
|
||||
if not torch.cuda.is_available():
|
||||
quit("You are using --gpu flag but CUDA isn't available or properly installed on your system.")
|
||||
if '--all-faces' in sys.argv or '-a' in sys.argv:
|
||||
core.globals.all_faces = True
|
||||
|
||||
|
||||
import glob
|
||||
import argparse
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import random
|
||||
from pathlib import Path
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog
|
||||
from opennsfw2 import predict_image as face_check
|
||||
from tkinter.filedialog import asksaveasfilename
|
||||
import core.globals
|
||||
from core.processor import process_video, process_img
|
||||
from core.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames
|
||||
from core.utils import is_img, detect_fps, set_fps, create_video, add_audio, extract_frames, rreplace
|
||||
from core.config import get_face
|
||||
import webbrowser
|
||||
import psutil
|
||||
@@ -34,6 +24,9 @@ import cv2
|
||||
import threading
|
||||
from PIL import Image, ImageTk
|
||||
|
||||
if 'ROCMExecutionProvider' not in core.globals.providers:
|
||||
import torch
|
||||
|
||||
pool = None
|
||||
args = {}
|
||||
|
||||
@@ -41,22 +34,68 @@ parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-f', '--face', help='use this face', dest='source_img')
|
||||
parser.add_argument('-t', '--target', help='replace this face', dest='target_path')
|
||||
parser.add_argument('-o', '--output', help='save output to this file', dest='output_file')
|
||||
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
|
||||
parser.add_argument('--gpu', help='use gpu', dest='gpu', action='store_true', default=False)
|
||||
parser.add_argument('--keep-fps', help='maintain original fps', dest='keep_fps', action='store_true', default=False)
|
||||
parser.add_argument('--keep-frames', help='keep frames directory', dest='keep_frames', action='store_true', default=False)
|
||||
parser.add_argument('--max-memory', help='set max memory', type=int)
|
||||
parser.add_argument('--max-cores', help='number of cores to use', dest='cores_count', type=int, default=max(psutil.cpu_count() - 2, 2))
|
||||
parser.add_argument('-a', '--all-faces', help='swap all faces in frame', dest='all_faces', default=False)
|
||||
|
||||
for name, value in vars(parser.parse_args()).items():
|
||||
args[name] = value
|
||||
|
||||
|
||||
sep = "/"
|
||||
if os.name == "nt":
|
||||
sep = "\\"
|
||||
|
||||
|
||||
def limit_resources():
|
||||
if args['max_memory']:
|
||||
memory = args['max_memory'] * 1024 * 1024 * 1024
|
||||
if str(platform.system()).lower() == 'windows':
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
|
||||
else:
|
||||
import resource
|
||||
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
|
||||
|
||||
|
||||
def pre_check():
|
||||
if sys.version_info < (3, 8):
|
||||
quit('Python version is not supported - please upgrade to 3.8 or higher')
|
||||
if not shutil.which('ffmpeg'):
|
||||
quit('ffmpeg is not installed!')
|
||||
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'inswapper_128.onnx')
|
||||
if not os.path.isfile(model_path):
|
||||
quit('File "inswapper_128.onnx" does not exist!')
|
||||
if '--gpu' in sys.argv:
|
||||
NVIDIA_PROVIDERS = ['CUDAExecutionProvider', 'TensorrtExecutionProvider']
|
||||
if len(list(set(core.globals.providers) - set(NVIDIA_PROVIDERS))) == 1:
|
||||
CUDA_VERSION = torch.version.cuda
|
||||
CUDNN_VERSION = torch.backends.cudnn.version()
|
||||
if not torch.cuda.is_available() or not CUDA_VERSION:
|
||||
quit("You are using --gpu flag but CUDA isn't available or properly installed on your system.")
|
||||
if CUDA_VERSION > '11.8':
|
||||
quit(f"CUDA version {CUDA_VERSION} is not supported - please downgrade to 11.8")
|
||||
if CUDA_VERSION < '11.4':
|
||||
quit(f"CUDA version {CUDA_VERSION} is not supported - please upgrade to 11.8")
|
||||
if CUDNN_VERSION < 8220:
|
||||
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please upgrade to 8.9.1")
|
||||
if CUDNN_VERSION > 8910:
|
||||
quit(f"CUDNN version {CUDNN_VERSION} is not supported - please downgrade to 8.9.1")
|
||||
else:
|
||||
core.globals.providers = ['CPUExecutionProvider']
|
||||
if '--all-faces' in sys.argv or '-a' in sys.argv:
|
||||
core.globals.all_faces = True
|
||||
|
||||
|
||||
def start_processing():
|
||||
start_time = time.time()
|
||||
threshold = len(['frame_args']) if len(args['frame_paths']) <= 10 else 10
|
||||
for i in range(threshold):
|
||||
if face_check(random.choice(args['frame_paths'])) > 0.8:
|
||||
quit("[WARNING] Unable to determine location of the face in the target. Please make sure the target isn't wearing clothes matching to their skin.")
|
||||
if args['gpu']:
|
||||
process_video(args['source_img'], args["frame_paths"])
|
||||
end_time = time.time()
|
||||
@@ -64,7 +103,7 @@ def start_processing():
|
||||
print(f"Processing time: {end_time - start_time:.2f} seconds", flush=True)
|
||||
return
|
||||
frame_paths = args["frame_paths"]
|
||||
n = len(frame_paths)//(psutil.cpu_count()-1)
|
||||
n = len(frame_paths)//(args['cores_count'])
|
||||
processes = []
|
||||
for i in range(0, len(frame_paths), n):
|
||||
p = pool.apply_async(process_video, args=(args['source_img'], frame_paths[i:i+n],))
|
||||
@@ -77,6 +116,7 @@ def start_processing():
|
||||
print(flush=True)
|
||||
print(f"Processing time: {end_time - start_time:.2f} seconds", flush=True)
|
||||
|
||||
|
||||
def preview_image(image_path):
|
||||
img = Image.open(image_path)
|
||||
img = img.resize((180, 180), Image.ANTIALIAS)
|
||||
@@ -150,41 +190,45 @@ def start():
|
||||
print("\n[WARNING] Please select a video/image to swap face in.")
|
||||
return
|
||||
if not args['output_file']:
|
||||
args['output_file'] = rreplace(args['target_path'], "/", "/swapped-", 1) if "/" in target_path else "swapped-"+target_path
|
||||
target_path = args['target_path']
|
||||
args['output_file'] = rreplace(target_path, "/", "/swapped-", 1) if "/" in target_path else "swapped-" + target_path
|
||||
global pool
|
||||
pool = mp.Pool(psutil.cpu_count()-1)
|
||||
pool = mp.Pool(args['cores_count'])
|
||||
target_path = args['target_path']
|
||||
test_face = get_face(cv2.imread(args['source_img']))
|
||||
if not test_face:
|
||||
print("\n[WARNING] No face detected in source image. Please try with another one.\n")
|
||||
return
|
||||
if is_img(target_path):
|
||||
if face_check(target_path) > 0.7:
|
||||
quit("[WARNING] Unable to determine location of the face in the target. Please make sure the target isn't wearing clothes matching to their skin.")
|
||||
process_img(args['source_img'], target_path, args['output_file'])
|
||||
status("swap successful!")
|
||||
return
|
||||
video_name = target_path.split("/")[-1].split(".")[0]
|
||||
output_dir = target_path.replace(target_path.split("/")[-1], "").rstrip("/") + "/" + video_name
|
||||
video_name_full = target_path.split("/")[-1]
|
||||
video_name = os.path.splitext(video_name_full)[0]
|
||||
output_dir = os.path.dirname(target_path) + "/" + video_name
|
||||
Path(output_dir).mkdir(exist_ok=True)
|
||||
status("detecting video's FPS...")
|
||||
fps = detect_fps(target_path)
|
||||
fps, exact_fps = detect_fps(target_path)
|
||||
if not args['keep_fps'] and fps > 30:
|
||||
this_path = output_dir + "/" + video_name + ".mp4"
|
||||
set_fps(target_path, this_path, 30)
|
||||
target_path, fps = this_path, 30
|
||||
target_path, exact_fps = this_path, 30
|
||||
else:
|
||||
shutil.copy(target_path, output_dir)
|
||||
status("extracting frames...")
|
||||
extract_frames(target_path, output_dir)
|
||||
args['frame_paths'] = tuple(sorted(
|
||||
glob.glob(output_dir + f"/*.png"),
|
||||
glob.glob(output_dir + "/*.png"),
|
||||
key=lambda x: int(x.split(sep)[-1].replace(".png", ""))
|
||||
))
|
||||
status("swapping in progress...")
|
||||
start_processing()
|
||||
status("creating video...")
|
||||
create_video(video_name, fps, output_dir)
|
||||
create_video(video_name, exact_fps, output_dir)
|
||||
status("adding audio...")
|
||||
add_audio(output_dir, target_path, args['keep_frames'], args['output_file'])
|
||||
add_audio(output_dir, target_path, video_name_full, args['keep_frames'], args['output_file'])
|
||||
save_path = args['output_file'] if args['output_file'] else output_dir + "/" + video_name + ".mp4"
|
||||
print("\n\nVideo saved as:", save_path, "\n\n")
|
||||
status("swap successful!")
|
||||
@@ -192,6 +236,10 @@ def start():
|
||||
|
||||
if __name__ == "__main__":
|
||||
global status_label, window
|
||||
|
||||
pre_check()
|
||||
limit_resources()
|
||||
|
||||
if args['source_img']:
|
||||
args['cli_mode'] = True
|
||||
start()
|
||||
|
||||
Reference in New Issue
Block a user