Skip to content

Commit

Permalink
Added Linux support
Browse files Browse the repository at this point in the history
  • Loading branch information
OpsecGuy authored Dec 5, 2022
1 parent 8ce17ff commit a48590b
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 74 deletions.
4 changes: 2 additions & 2 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Awesome Server Manager v1 - Python 3.11.0"""
import threading

import gui

def main():
"""
Entry Point
"""
wnd.create()
threading.Thread(target=wnd.update, daemon=True).start()
gui.threading.Thread(target=wnd.update, daemon=True).start()
wnd.run()
wnd.destroy()

Expand Down
15 changes: 8 additions & 7 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ def __init__(self) -> None:
Config initialization
"""
print('Config initialization started.')
self.config_file = 'servers.json'

if platform.system() == 'Windows':
self.config_file = 'servers.json'
self.config_path = f"{os.getcwd()}\\{self.config_file}"

if os.path.exists(self.config_path) is False:
self.create_example()
print(f'Could not find {self.config_file}! New config has been created.')

elif platform.system() == 'Linux':
print(f'System LINUX is not supported yet!')
os._exit(0)
self.config_path = f"{os.getcwd()}/{self.config_file}"
else:
print(f'Your system is not supported!')
os._exit(0)

if os.path.exists(self.config_path) is False:
self.create_example()
print(f'Could not find {self.config_file}! New config has been created.')

def create_example(self) -> bool:
"""
Expand Down
167 changes: 104 additions & 63 deletions gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
import config
import logger
import enum
import threading

class Window():
"""Manage visual interface"""
Expand All @@ -20,7 +20,7 @@ def __init__(self) -> None:
print('Window initialization started.')
self.cfg = config.Config()
self.logger = logger.Logger()
self.__version__ = '1.0.2.1'
self.__version__ = '1.0.2.2'

self.functions = {
'Test Connect': self.connect,
Expand All @@ -39,23 +39,23 @@ def create(self) -> None:
Creates window context.
"""
dpg.create_context()

with dpg.window(
label='Window',
width=400,
height=500,
no_title_bar=True,
no_resize=False,
no_resize=True,
no_move=True,
tag='w_main'):

with dpg.tab_bar(label="Menu"):
with dpg.tab(label="Main"):
with dpg.group(horizontal=True):
dpg.add_button(label='Refresh List',
tag='b_refresh',
callback=lambda: dpg.configure_item(item='servers_list',
items=self.cfg.get_servers()))
dpg.add_button(label='Refresh List',
width=100,
height=20,
tag='b_refresh',
callback=lambda: dpg.configure_item(item='servers_list',
items=self.cfg.get_servers()))
with dpg.group(horizontal=True):
dpg.add_listbox(items=self.cfg.get_servers(),
num_items=8,
Expand All @@ -65,7 +65,7 @@ def create(self) -> None:
dpg.add_text(label='IP',
show=False,
color=(0, 255, 0),
tag='ip',)
tag='ip')
dpg.add_text(label='Username',
show=False,
color=(220, 0, 40),
Expand All @@ -75,26 +75,27 @@ def create(self) -> None:
color=(220, 0, 40),
tag='password')
dpg.add_separator()
dpg.add_input_text(label='Commands File',
default_value='commands',
width=200,
tag='i_commands')
dpg.add_text(default_value='Commands File Name')
dpg.add_input_text(default_value='commands',
width=200,
tag='i_commands')

with dpg.group(horizontal=True):
dpg.add_radio_button(label='File extension',
dpg.add_radio_button(label='File Extension',
items=['.txt', '.json'],
default_value='.txt',
horizontal=True,
tag='rb_extension')
dpg.add_separator()
dpg.add_text(default_value='Chose Operation:')
dpg.add_combo(items=tuple(self.functions.keys()), tag='i_function')
dpg.add_button(label='Start', tag='b_start', callback=lambda: self.functions.get(f"{dpg.get_value('i_function')}")())
dpg.add_button(label='Update Client',
show=False,
tag='b_update',
callback=lambda: webbrowser.open(
'https://github.com/OpsecGuy/Awesome-Server-Manager/releases'
))
dpg.add_button(label='Start',
width=80,
height=20,
tag='b_start',
callback=lambda: threading.Thread(target=self.functions.get(f"{dpg.get_value('i_function')}"),
daemon=True).start())

with dpg.tab(label="Logs"):
dpg.add_input_text(label='',
readonly=True,
Expand All @@ -105,10 +106,11 @@ def create(self) -> None:
tag='i_logs_area')
with dpg.group(horizontal=True):
dpg.add_button(label='Clear Logs',
callback=self.logger.reset)
callback=self.logger.flush)

with dpg.tab(label="Settings"):
dpg.add_checkbox(label='Show Server Info', tag='c_show_server_info')
dpg.add_input_float(label='Server timeout',
dpg.add_input_float(label='Server Timeout',
default_value=3.0,
min_value=1.0,
max_value=15.0,
Expand All @@ -118,6 +120,14 @@ def create(self) -> None:
width=200,
tag='i_server_timeout')

dpg.add_button(label='Check For Updates',
width=140,
height=20,
tag='b_update',
callback=lambda: webbrowser.open(
'https://github.com/OpsecGuy/Awesome-Server-Manager/releases'
))

# Tooltips area
with dpg.tooltip(parent='c_show_server_info'):
dpg.add_text('Shows/Hides server info in Main tab.\nip\nlogin\npassword\n')
Expand All @@ -130,25 +140,40 @@ def update(self) -> None:
"""
Keeps GUI updated when changes are done.
"""
last_changed = False
last_changed = ''
while True:
cmd_file_win = f"{os.getcwd()}\\{dpg.get_value('i_commands') + dpg.get_value('rb_extension')}"
cmd_file_lin = f"{os.getcwd()}/{dpg.get_value('i_commands') + dpg.get_value('rb_extension')}"

# Check if config file exists
if os.path.exists(self.cfg.config_path) is False:
self.logger.log(f'Could not find {self.cfg.config_file}.\n\
New config has been created.')
self.cfg.create_example()

# Check if command file exists
if config.platform.system() == 'Windows':
if os.path.exists(cmd_file_win) is False:
dpg.configure_item(item='text_color', value=(255, 0, 0))
else:
dpg.configure_item(item='text_color', value=(0, 255, 0))

elif config.platform.system() == 'Linux':
if os.path.exists(cmd_file_lin) is False:
dpg.configure_item(item='text_color', value=(255, 0, 0))
else:
dpg.configure_item(item='text_color', value=(0, 255, 0))

# TO:DO - Call it once if server has changed
if dpg.get_value('c_show_server_info') == True:
last_changed = True
if last_changed == True:
self.show_context()
dpg.set_value('ip', self.cfg.get_value(dpg.get_value('servers_list'), 'IP'))
dpg.set_value('username', self.cfg.get_value(dpg.get_value('servers_list'), 'username'))
dpg.set_value('password', self.cfg.get_value(dpg.get_value('servers_list'), 'password'))
else:
last_changed = False
self.hide_context()

# Static
self.show_server_info()
dpg.set_value('ip', self.cfg.get_value(dpg.get_value('servers_list'), 'IP'))
dpg.set_value('username', self.cfg.get_value(dpg.get_value('servers_list'), 'username'))
dpg.set_value('password', self.cfg.get_value(dpg.get_value('servers_list'), 'password'))
elif dpg.get_value('c_show_server_info') == False:
self.hide_server_info()

# Static updates
dpg.set_value('i_logs_area', '\n'.join(self.logger.logs_buffer))

# Resizable
Expand All @@ -157,45 +182,56 @@ def update(self) -> None:
dpg.configure_item(item='w_main', width=vp_width - 5)
dpg.configure_item(item='w_main', height=vp_height - 5)
dpg.configure_item(item='i_logs_area', width=vp_width - 30)

# Version check
if self.get_current_version() != self.__version__:
dpg.configure_item('b_update', show=True)

time.sleep(0.001)

def run(self) -> None:
"""
Execute/Start window thread.
"""
self.custom_themes()
dpg.create_viewport(
title=f'Awesome Server Manager {self.__version__}',
height=500,
width=400,
resizable=True,
vsync=True)

dpg.bind_theme('base_theme')
dpg.bind_item_theme('i_commands', 'i_commands_file')
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()

def custom_themes(self) -> None:
dpg.add_theme(tag='base_theme')
dpg.add_theme(tag='i_commands_file')

with dpg.theme_component(parent='base_theme'):
dpg.add_theme_style(dpg.mvStyleVar_WindowTitleAlign, 0.50, 0.50)
dpg.add_theme_color(dpg.mvThemeCol_CheckMark, (0, 255, 0))
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive,(0, 142, 100, 130))

with dpg.theme_component(parent='i_commands_file'):
dpg.add_theme_color(dpg.mvThemeCol_Text, (255, 0, 0, 255), tag='text_color')

def destroy(self) -> None:
"""
Destroys window context.
"""
dpg.destroy_context()

def hide_context(self) -> None:
def hide_server_info(self) -> None:
"""
Hides given context.
Hides server info.
"""
dpg.hide_item('ip')
dpg.hide_item('username')
dpg.hide_item('password')

def show_context(self) -> None:
def show_server_info(self) -> None:
"""
Force visibility of wanted context.
Shows server info.
"""
dpg.show_item('ip')
dpg.show_item('username')
Expand Down Expand Up @@ -261,65 +297,70 @@ def get_current_version(self):
Returns:
str: current version
"""
return requests.get(url='https://raw.githubusercontent.com/OpsecGuy/Awesome-Server-Manager/main/version',
headers={'Cache-Control': 'no-cache', 'Pragma': 'no-cache'},
timeout=5.0).text.replace('\n', '')
try:
return requests.get(url='https://raw.githubusercontent.com/OpsecGuy/Awesome-Server-Manager/main/version',
headers={'Cache-Control': 'no-cache', 'Pragma': 'no-cache'},
timeout=5.0).text.replace('\n', '')
except requests.ReadTimeout as err:
pass

def connect(self) -> bool:
"""
Check if it's possible to connect to the server.\n
Error log is printed out in GUI.
"""
server_ip = self.cfg.get_value(dpg.get_value("servers_list"), "IP")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.is_valid(protection='light'):
self.logger.log(f'[CONNECT] Connecting to {self.cfg.get_value(dpg.get_value("servers_list"), "IP")}')
self.logger.log(f'[CONNECT] Connecting to {server_ip}')
try:
client.connect(hostname=self.cfg.get_value(dpg.get_value("servers_list"), "IP"),
client.connect(hostname=server_ip,
port=int(self.cfg.get_value(dpg.get_value('servers_list'), 'port')),
username=self.cfg.get_value(dpg.get_value("servers_list"), "username"),
password=self.cfg.get_value(dpg.get_value("servers_list"), "password"),
timeout=dpg.get_value('i_server_timeout'))
client.close()
self.logger.log('[CONNECT] Task Completed!')
self.logger.log(f'[{server_ip}] Task Completed!')
except socket.timeout:
self.logger.log('[CONNECT] Fail: Server Timed out!')
self.logger.log(f'[{server_ip}] Fail: Server Timed out!')
return False

def execute_cmd(self) -> bool:
"""
Executes preapered commands to be executed on the server.\n
Creates log file with format: *ip*.txt.\n
If the same server has already log file, old one gonna be flushed.
Executes prepared commands to be used on the server.\n
Creates log file in format: *ip*.txt.\n
If the same server has already log file, previous logs will be flushed.
"""
server_ip = self.cfg.get_value(dpg.get_value("servers_list"), "IP")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.is_valid(protection='full'):
self.logger.log(f'[EXECUTE] Connecting to {self.cfg.get_value(dpg.get_value("servers_list"), "IP")}')
self.logger.log(f'[EXECUTE] Connecting to {server_ip}')
try:
client.connect(hostname=self.cfg.get_value(dpg.get_value("servers_list"), "IP"),
client.connect(hostname=server_ip,
port=int(self.cfg.get_value(dpg.get_value('servers_list'), 'port')),
username=self.cfg.get_value(dpg.get_value("servers_list"), "username"),
password=self.cfg.get_value(dpg.get_value("servers_list"), "password"),
timeout=dpg.get_value('i_server_timeout'))
except socket.timeout:
self.logger.log('[EXECUTE] Fail: Server Timed out!')
self.logger.log(f'[{server_ip}] Fail: Server Timed out!')
return

try:
with open(f'log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt', 'w+', encoding='utf-8') as log_file:
self.logger.log(f'[EXECUTE] Writing server logs to log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt')
with open(f'log_{server_ip}.txt', 'w+', encoding='utf-8') as log_file:
self.logger.log(f'[{server_ip}] Writing server logs to log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt')
try:
stdin, stdout, stderr = client.exec_command(self.parse_command())
self.logger.log('[EXECUTE] Executing commands...')
self.logger.log(f'[{server_ip}] Executing commands. Please wait...')
except paramiko.SSHException:
self.logger.log('[EXECUTE] Failed to execute commands!')
self.logger.log(f'[{server_ip}] Failed to execute commands!')
return

for output in iter(stdout.readline, ''):
log_file.writelines(output)
except OSError:
self.logger.log('[EXECUTE] Failed open/write to the log file!')
self.logger.log(f'[{server_ip}] Failed open/write to the log file!')

client.close()
self.logger.log('[EXECUTE] Task Completed!')
self.logger.log(f'[{server_ip}] Task Completed!')
4 changes: 2 additions & 2 deletions logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def log(self, text: str) -> str:
text = f"{datetime.datetime.now().strftime('%H:%M:%S')} {text}"
return self.logs_buffer.append(text)

def reset(self) -> None:
def flush(self) -> None:
"""
Reset log buffer.
Flush log buffer.
"""
self.logs_buffer.clear()

0 comments on commit a48590b

Please sign in to comment.