general structure #229

Open
wants to merge 14 commits into base: master
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
.idea
.idea/*
**/__pycache__
.env
gf/gf/weights/*
gf/weights/*
app/logs
!app/logs/.gitignore
app/files
!app/files/.gitignore
110 changes: 110 additions & 0 deletions app/aws_processor.py
@@ -0,0 +1,110 @@
import json
import boto3
from typing import Tuple, Any
from app.settings import AWS_CONFIG
import os
from app.utilities import now, generate_video_path, generate_final_video, generate_s3_video_arn, generate_s3_media_arn
from botocore.exceptions import ClientError
import requests
import time
# from balancer import Balancer
# import shutil

class AWSProcessor:
def __init__(self):
self.sqs_client = boto3.client(
'sqs', aws_access_key_id=AWS_CONFIG['key'], aws_secret_access_key=AWS_CONFIG['secret'], region_name=AWS_CONFIG['region'])
self.s3_client = boto3.client(
's3', aws_access_key_id=AWS_CONFIG['key'], aws_secret_access_key=AWS_CONFIG['secret'])

self.bucket = AWS_CONFIG['bucket']
self.sqs_url = AWS_CONFIG['sqs']
self.upload_bucket = AWS_CONFIG['final_upload_bucket']

def get_sqs_client(self):
return self.sqs_client

def get_s3_client(self):
return self.s3_client

def upload_logs(self, uid=None, instance_id=None):
log_general = 'app/logs/{}_{}.log'.format(now(True), instance_id)
job_log = False
if uid is not None:
job_log = "app/logs/{}.log".format(uid)
try:
self.s3_client.upload_file(log_general, self.bucket, "logs/bg-removal/{}".format(os.path.basename(log_general)))
if uid is not None and job_log:
self.s3_client.upload_file(job_log, self.bucket, "logs/bg-removal/{}".format(os.path.basename(job_log)))
except ClientError as E:
raise Exception(E)

    def upload_final_video(self, uid, final_video_local):
try:
_, extension = os.path.splitext(final_video_local)
self.s3_client.upload_file(final_video_local, self.upload_bucket, "avatars/users/{}{}".format(uid, extension), ExtraArgs={'ACL': 'public-read'})
return "https://{}.s3.amazonaws.com/avatars/users/{}{}".format(self.upload_bucket, uid, extension)
except ClientError as E:
raise Exception(E)

def delete_sqs_message(self, handler):
self.sqs_client.delete_message(
QueueUrl=self.sqs_url,
ReceiptHandle=handler
)


    def get_sqs(self, process_name) -> Tuple[Any, Any]:
        """
        Reads a single message from the AWS SQS queue used by this processor.

        Returns:
            Tuple[Any, Any]: the decoded message body and its SQS receipt handle,
            or (False, False) when no message is available.
        """
# balancer = Balancer('lipsync')
# if balancer.is_main():
# balancer.create_blocker(process_name)
# elif balancer.main_running():
# time.sleep(30)
# return False, False
# elif not balancer.can_run():
# time.sleep(30)
# return False, False

response = self.sqs_client.receive_message(QueueUrl=self.sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=2)

for message in response.get('Messages', []):
message_body = message['Body']
sqs_message_handler = message['ReceiptHandle']
# while not balancer.can_run():
# time.sleep(10)
# continue

return json.loads(message_body), sqs_message_handler

# balancer.remove_process(process_name)
return False, False

def generate_full_url(self, arn):
return "https://{}.s3.amazonaws.com/tts/{}.wav".format(self.bucket, arn)

def download_video(self, video, uid):
bucket, arn = generate_s3_media_arn(video)
_, extension = os.path.splitext(video)
video_local = generate_video_path(uid, extension)

try:
self.s3_client.download_file(bucket, arn, video_local)
except Exception as E:
print(E)
return video_local, extension

    def file_exists(self, arn):
        try:
            self.s3_client.head_object(Bucket='mltts', Key=arn)
        except ClientError:
            # A 404 means the object is missing; any other client error is also treated as "not found".
            return False
        return True
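
As a reading aid, a minimal sketch of how AWSProcessor appears intended to be used, wired to the removal() helper added in app/bg.py below; the message keys ('uid', 'video'), the process name, and the seq_chunk value are assumptions for illustration, not defined by this diff:

from app.aws_processor import AWSProcessor
from app.bg import removal

processor = AWSProcessor()

# Poll SQS once; get_sqs returns (False, False) when the queue is empty.
message, handle = processor.get_sqs('bg-removal')
if message:
    # Assumed payload shape: {"uid": ..., "video": ...} (not defined in this diff).
    video_local, extension = processor.download_video(message['video'], message['uid'])
    final_video_local = removal(message['uid'], extension, video_local, seq_chunk=1)
    url = processor.upload_final_video(message['uid'], final_video_local)
    processor.delete_sqs_message(handle)
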
104 changes: 104 additions & 0 deletions app/balancer.py
@@ -0,0 +1,104 @@
import os

from app.settings import BALANCER, BALANCER_TIMES, BALANCER_SERVER_TYPES, BALANCER_RESOURCES, SERVER_RESOURCES_TOTAL
from app.utilities import now
from os.path import exists
from datetime import datetime
import nvidia_smi
import psutil
import time


class Balancer:
def __init__(self, current_server):
self.current_server = current_server

def is_main(self):
return self.current_server == BALANCER['main_for']

def create_blocker(self, process_name):
with open('/home/ubuntu/processes/{}/{}'.format(self.current_server, process_name), 'w+') as f:
f.write(now())

def main_running(self):
main_server_processes = len(os.listdir('/home/ubuntu/processes/{}'.format(BALANCER['main_for'])))
return main_server_processes != 0

    def can_run_main(self):
        # Count the process-marker files created by every other server type.
        processes = 0
        for proc in BALANCER_SERVER_TYPES:
            if proc == self.current_server:
                continue
            processes += len(os.listdir('/home/ubuntu/processes/{}'.format(proc)))

        print("processes ", processes)
        return processes == 0

def can_run(self):
if self.is_main():
print('main check')
return self.can_run_main()
print('has time {}'.format(self.has_time()))
print('has resource {}'.format(self.has_resource()))
return self.has_time() and self.has_resource()

def remove_process(self, process_name):
process_filename = '/home/ubuntu/processes/{}/{}'.format(self.current_server, process_name)
if exists(process_filename):
os.remove(process_filename)

    def has_time(self):
        # `who -b` prints the system boot time; take the minutes field from it.
        time_info = os.popen('who -b').read()
        boot_minute = int(time_info.split(':')[-1])
        minute_now = int(datetime.now().strftime("%M"))
        # Time left before the instance reaches 55 minutes past its boot minute.
        remaining_time = boot_minute + 55 - minute_now
        return remaining_time >= BALANCER_TIMES[self.current_server]

    def has_resource(self):
        cpu_use = 0
        ram_free = 0
        gpu_free = 0
        nvidia_smi.nvmlInit()

        # Sample CPU load and free GPU/RAM memory 60 times over roughly 30 seconds.
        for i in range(60):
            cpu_use += psutil.cpu_percent()

            handle = nvidia_smi.nvmlDeviceGetHandleByIndex(0)
            info = nvidia_smi.nvmlDeviceGetMemoryInfo(handle)

            gpu_free += info.free / (1024 ** 2)  # free GPU memory, MB
            ram_free += psutil.virtual_memory().free / (1024 ** 2)  # free RAM, MB

            time.sleep(0.5)

        # Average the samples.
        gpu = int(gpu_free / 60)
        ram = int(ram_free / 60)
        cpu = int(cpu_use / 60)

        # The averaged free GPU memory and RAM must cover this server type's requirements.
        if gpu < BALANCER_RESOURCES[self.current_server]['gpu'] or ram < BALANCER_RESOURCES[self.current_server]['ram']:
            return False

        # Re-check against what the other server types' running processes are expected to leave free.
        gpu, ram, cpu = self.other_processes_status()
        if gpu < BALANCER_RESOURCES[self.current_server]['gpu'] or ram < BALANCER_RESOURCES[self.current_server]['ram']:
            return False

return True

def other_processes_status(self):
gpu_free_proc = int(SERVER_RESOURCES_TOTAL['gpu'])
ram_free_proc = int(SERVER_RESOURCES_TOTAL['ram'])
cpu_free_proc = 100

        for proc in BALANCER_SERVER_TYPES:
            if proc == self.current_server:
                continue
            # Each marker file counts as one running process of that type, padded by a 20% margin.
            gpu_free_proc -= int(len(os.listdir('/home/ubuntu/processes/{}'.format(proc)))) * 1.2 * int(BALANCER_RESOURCES[proc]['gpu'])
            ram_free_proc -= int(len(os.listdir('/home/ubuntu/processes/{}'.format(proc)))) * 1.2 * int(BALANCER_RESOURCES[proc]['ram'])
            # cpu_free_proc -= int(len(os.listdir('/home/ubuntu/processes/{}'.format(proc)))) * 1.2 * BALANCER_RESOURCES[proc]['cpu']

return gpu_free_proc, ram_free_proc, cpu_free_proc
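
The commented-out hooks in AWSProcessor.get_sqs hint at how Balancer is meant to gate work. A rough sketch of that flow, assuming those comments reflect the intended wiring; the 30-second back-off comes from them, and 'bg-removal' is a placeholder job label:

from app.balancer import Balancer
import time

def can_take_job(balancer, process_name):
    # Mirrors the commented-out checks in AWSProcessor.get_sqs.
    if balancer.is_main():
        balancer.create_blocker(process_name)  # mark the job on the main server
        return True
    if balancer.main_running() or not balancer.can_run():
        time.sleep(30)  # back off and let the caller retry
        return False
    return True

balancer = Balancer('lipsync')
if can_take_job(balancer, 'bg-removal'):
    # ... receive a message and run the job here ...
    balancer.remove_process('bg-removal')  # clear the marker once the job is done
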
27 changes: 27 additions & 0 deletions app/bg.py
@@ -0,0 +1,27 @@
import torch
from model import MattingNetwork
from inference import convert_video
from app.utilities import generate_final_video, generate_video_path
from app.settings import VIDEO_CONFIG


def removal(uid, extension, video_local, seq_chunk):
model = MattingNetwork('mobilenetv3').eval().cuda() # or "resnet50"
model.load_state_dict(torch.load('rvm_mobilenetv3.pth'))

final_video = generate_final_video(uid, extension)

convert_video(
model, # The model, can be on any device (cpu or cuda).
input_source=video_local, # A video file or an image sequence directory.
output_type='video', # Choose "video" or "png_sequence"
output_composition=final_video, # File path if video; directory path if png sequence.
# output_alpha="pha.mp4", # [Optional] Output the raw alpha prediction.
# output_foreground="fgr.mp4", # [Optional] Output the raw foreground prediction.
output_video_mbps=4, # Output video mbps. Not needed for png sequence.
downsample_ratio=None, # A hyperparameter to adjust or use None for auto.
seq_chunk=seq_chunk, # Process n frames at once for better parallelism.
)

return final_video
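
A quick usage sketch for removal(); the uid and local path are made-up placeholders, and rvm_mobilenetv3.pth is expected in the working directory per the torch.load call above:

from app.bg import removal

# Placeholder inputs; in the worker these come from the SQS message and AWSProcessor.download_video.
final_video = removal(
    uid='example-uid',
    extension='.mp4',
    video_local='app/files/example-uid.mp4',
    seq_chunk=12,  # frames processed per batch; raise or lower to fit GPU memory
)
print(final_video)
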

75 changes: 75 additions & 0 deletions app/settings.py
@@ -0,0 +1,75 @@
import os
from dotenv import load_dotenv

load_dotenv()

WEBHOOK_CONFIG = {
'key': os.getenv('API_TOKEN'),
'url': os.getenv('WEBHOOK_URL')
}

RESPONSE_CODES = {}

AWS_CONFIG = {
'key': os.getenv('AWS_KEY'),
'secret': os.getenv('AWS_SECRET'),
'bucket': os.getenv('AWS_BUCKET'),
'region': os.getenv('AWS_REGION'),
'sqs': os.getenv('AWS_SQS_URL'),
'sqs_handler': 'handler.json',
'final_upload_bucket': os.getenv('UPLOAD_BUCKET')
}

SERVER = {
'main_id': os.getenv('MAIN_ID')
}

SHUTDOWN_TIMINGS = {
'minutes': os.getenv('MINUTES'),
'intermediate': os.getenv('INTERMEDIATE')
}

BALANCER = {
'main_for': os.getenv('MAIN_FOR')
}

BALANCER_SERVER_TYPES = ['cloning', 'tts', 'lipsync', 'tortoise']

BALANCER_TIMES = {
'cloning': 420,
'tts': 3,
'lipsync': 20,
'tortoise': 10
}

BALANCER_RESOURCES = {
'cloning': {
'cpu': 100,
'ram': 3550,
'gpu': 3658
},
'tts': {
'cpu': 17,
'ram': 3335,
'gpu': 2060
},
'lipsync': {
'cpu': 100,
'ram': 22000,
'gpu': 14000
},
'tortoise': {
'cpu': 77,
'ram': 9582,
'gpu': 15200
}
}

SERVER_RESOURCES_TOTAL = {
'gpu': os.getenv('GPU_TOTAL'),
'ram': os.getenv('RAM_TOTAL')
}

VIDEO_CONFIG = {
'seq_chunk': os.getenv('SEQ_CHUNK')
}
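
One caveat: everything here comes from os.getenv, so a variable missing from .env silently becomes None, and numeric values such as SEQ_CHUNK, GPU_TOTAL and RAM_TOTAL arrive as strings. A small optional start-up check along these lines (a hypothetical helper, not part of this diff) would surface that early:

# hypothetical helper, not part of this diff
from app.settings import AWS_CONFIG, SERVER_RESOURCES_TOTAL, VIDEO_CONFIG, WEBHOOK_CONFIG


def check_settings():
    required = {
        'AWS_KEY': AWS_CONFIG['key'],
        'AWS_SECRET': AWS_CONFIG['secret'],
        'AWS_BUCKET': AWS_CONFIG['bucket'],
        'AWS_SQS_URL': AWS_CONFIG['sqs'],
        'UPLOAD_BUCKET': AWS_CONFIG['final_upload_bucket'],
        'WEBHOOK_URL': WEBHOOK_CONFIG['url'],
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise RuntimeError('Missing environment variables: {}'.format(', '.join(missing)))

    # dotenv values are strings; cast before doing arithmetic with them.
    int(SERVER_RESOURCES_TOTAL['gpu'])
    int(SERVER_RESOURCES_TOTAL['ram'])
    int(VIDEO_CONFIG['seq_chunk'])


if __name__ == '__main__':
    check_settings()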