From a48590b9679da3e1c47b85afd41f992b6bd101b7 Mon Sep 17 00:00:00 2001 From: OpsecGuy <30886720+OpsecGuy@users.noreply.github.com> Date: Mon, 5 Dec 2022 22:29:15 +0100 Subject: [PATCH] Added Linux support --- app.py | 4 +- config.py | 15 ++--- gui.py | 167 ++++++++++++++++++++++++++++++++++-------------------- logger.py | 4 +- 4 files changed, 116 insertions(+), 74 deletions(-) diff --git a/app.py b/app.py index fa0e3b7..3c00a87 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,5 @@ """Awesome Server Manager v1 - Python 3.11.0""" -import threading + import gui def main(): @@ -7,7 +7,7 @@ def main(): Entry Point """ wnd.create() - threading.Thread(target=wnd.update, daemon=True).start() + gui.threading.Thread(target=wnd.update, daemon=True).start() wnd.run() wnd.destroy() diff --git a/config.py b/config.py index f92320a..fd28606 100644 --- a/config.py +++ b/config.py @@ -26,19 +26,20 @@ def __init__(self) -> None: Config initialization """ print('Config initialization started.') + self.config_file = 'servers.json' + if platform.system() == 'Windows': - self.config_file = 'servers.json' self.config_path = f"{os.getcwd()}\\{self.config_file}" - - if os.path.exists(self.config_path) is False: - self.create_example() - print(f'Could not find {self.config_file}! New config has been created.') + elif platform.system() == 'Linux': - print(f'System LINUX is not supported yet!') - os._exit(0) + self.config_path = f"{os.getcwd()}/{self.config_file}" else: print(f'Your system is not supported!') os._exit(0) + + if os.path.exists(self.config_path) is False: + self.create_example() + print(f'Could not find {self.config_file}! New config has been created.') def create_example(self) -> bool: """ diff --git a/gui.py b/gui.py index 4dcaf25..b6fd937 100644 --- a/gui.py +++ b/gui.py @@ -8,7 +8,7 @@ import requests import config import logger -import enum +import threading class Window(): """Manage visual interface""" @@ -20,7 +20,7 @@ def __init__(self) -> None: print('Window initialization started.') self.cfg = config.Config() self.logger = logger.Logger() - self.__version__ = '1.0.2.1' + self.__version__ = '1.0.2.2' self.functions = { 'Test Connect': self.connect, @@ -39,23 +39,23 @@ def create(self) -> None: Creates window context. """ dpg.create_context() - with dpg.window( label='Window', width=400, height=500, no_title_bar=True, - no_resize=False, + no_resize=True, no_move=True, tag='w_main'): with dpg.tab_bar(label="Menu"): with dpg.tab(label="Main"): - with dpg.group(horizontal=True): - dpg.add_button(label='Refresh List', - tag='b_refresh', - callback=lambda: dpg.configure_item(item='servers_list', - items=self.cfg.get_servers())) + dpg.add_button(label='Refresh List', + width=100, + height=20, + tag='b_refresh', + callback=lambda: dpg.configure_item(item='servers_list', + items=self.cfg.get_servers())) with dpg.group(horizontal=True): dpg.add_listbox(items=self.cfg.get_servers(), num_items=8, @@ -65,7 +65,7 @@ def create(self) -> None: dpg.add_text(label='IP', show=False, color=(0, 255, 0), - tag='ip',) + tag='ip') dpg.add_text(label='Username', show=False, color=(220, 0, 40), @@ -75,26 +75,27 @@ def create(self) -> None: color=(220, 0, 40), tag='password') dpg.add_separator() - dpg.add_input_text(label='Commands File', - default_value='commands', - width=200, - tag='i_commands') + dpg.add_text(default_value='Commands File Name') + dpg.add_input_text(default_value='commands', + width=200, + tag='i_commands') with dpg.group(horizontal=True): - dpg.add_radio_button(label='File extension', + dpg.add_radio_button(label='File Extension', items=['.txt', '.json'], default_value='.txt', horizontal=True, tag='rb_extension') dpg.add_separator() + dpg.add_text(default_value='Chose Operation:') dpg.add_combo(items=tuple(self.functions.keys()), tag='i_function') - dpg.add_button(label='Start', tag='b_start', callback=lambda: self.functions.get(f"{dpg.get_value('i_function')}")()) - dpg.add_button(label='Update Client', - show=False, - tag='b_update', - callback=lambda: webbrowser.open( - 'https://github.com/OpsecGuy/Awesome-Server-Manager/releases' - )) + dpg.add_button(label='Start', + width=80, + height=20, + tag='b_start', + callback=lambda: threading.Thread(target=self.functions.get(f"{dpg.get_value('i_function')}"), + daemon=True).start()) + with dpg.tab(label="Logs"): dpg.add_input_text(label='', readonly=True, @@ -105,10 +106,11 @@ def create(self) -> None: tag='i_logs_area') with dpg.group(horizontal=True): dpg.add_button(label='Clear Logs', - callback=self.logger.reset) + callback=self.logger.flush) + with dpg.tab(label="Settings"): dpg.add_checkbox(label='Show Server Info', tag='c_show_server_info') - dpg.add_input_float(label='Server timeout', + dpg.add_input_float(label='Server Timeout', default_value=3.0, min_value=1.0, max_value=15.0, @@ -118,6 +120,14 @@ def create(self) -> None: width=200, tag='i_server_timeout') + dpg.add_button(label='Check For Updates', + width=140, + height=20, + tag='b_update', + callback=lambda: webbrowser.open( + 'https://github.com/OpsecGuy/Awesome-Server-Manager/releases' + )) + # Tooltips area with dpg.tooltip(parent='c_show_server_info'): dpg.add_text('Shows/Hides server info in Main tab.\nip\nlogin\npassword\n') @@ -130,25 +140,40 @@ def update(self) -> None: """ Keeps GUI updated when changes are done. """ - last_changed = False + last_changed = '' while True: + cmd_file_win = f"{os.getcwd()}\\{dpg.get_value('i_commands') + dpg.get_value('rb_extension')}" + cmd_file_lin = f"{os.getcwd()}/{dpg.get_value('i_commands') + dpg.get_value('rb_extension')}" + + # Check if config file exists if os.path.exists(self.cfg.config_path) is False: self.logger.log(f'Could not find {self.cfg.config_file}.\n\ New config has been created.') self.cfg.create_example() + # Check if command file exists + if config.platform.system() == 'Windows': + if os.path.exists(cmd_file_win) is False: + dpg.configure_item(item='text_color', value=(255, 0, 0)) + else: + dpg.configure_item(item='text_color', value=(0, 255, 0)) + + elif config.platform.system() == 'Linux': + if os.path.exists(cmd_file_lin) is False: + dpg.configure_item(item='text_color', value=(255, 0, 0)) + else: + dpg.configure_item(item='text_color', value=(0, 255, 0)) + + # TO:DO - Call it once if server has changed if dpg.get_value('c_show_server_info') == True: - last_changed = True - if last_changed == True: - self.show_context() - dpg.set_value('ip', self.cfg.get_value(dpg.get_value('servers_list'), 'IP')) - dpg.set_value('username', self.cfg.get_value(dpg.get_value('servers_list'), 'username')) - dpg.set_value('password', self.cfg.get_value(dpg.get_value('servers_list'), 'password')) - else: - last_changed = False - self.hide_context() - - # Static + self.show_server_info() + dpg.set_value('ip', self.cfg.get_value(dpg.get_value('servers_list'), 'IP')) + dpg.set_value('username', self.cfg.get_value(dpg.get_value('servers_list'), 'username')) + dpg.set_value('password', self.cfg.get_value(dpg.get_value('servers_list'), 'password')) + elif dpg.get_value('c_show_server_info') == False: + self.hide_server_info() + + # Static updates dpg.set_value('i_logs_area', '\n'.join(self.logger.logs_buffer)) # Resizable @@ -157,10 +182,6 @@ def update(self) -> None: dpg.configure_item(item='w_main', width=vp_width - 5) dpg.configure_item(item='w_main', height=vp_height - 5) dpg.configure_item(item='i_logs_area', width=vp_width - 30) - - # Version check - if self.get_current_version() != self.__version__: - dpg.configure_item('b_update', show=True) time.sleep(0.001) @@ -168,6 +189,7 @@ def run(self) -> None: """ Execute/Start window thread. """ + self.custom_themes() dpg.create_viewport( title=f'Awesome Server Manager {self.__version__}', height=500, @@ -175,27 +197,41 @@ def run(self) -> None: resizable=True, vsync=True) + dpg.bind_theme('base_theme') + dpg.bind_item_theme('i_commands', 'i_commands_file') dpg.setup_dearpygui() dpg.show_viewport() dpg.start_dearpygui() + def custom_themes(self) -> None: + dpg.add_theme(tag='base_theme') + dpg.add_theme(tag='i_commands_file') + + with dpg.theme_component(parent='base_theme'): + dpg.add_theme_style(dpg.mvStyleVar_WindowTitleAlign, 0.50, 0.50) + dpg.add_theme_color(dpg.mvThemeCol_CheckMark, (0, 255, 0)) + dpg.add_theme_color(dpg.mvThemeCol_ButtonActive,(0, 142, 100, 130)) + + with dpg.theme_component(parent='i_commands_file'): + dpg.add_theme_color(dpg.mvThemeCol_Text, (255, 0, 0, 255), tag='text_color') + def destroy(self) -> None: """ Destroys window context. """ dpg.destroy_context() - def hide_context(self) -> None: + def hide_server_info(self) -> None: """ - Hides given context. + Hides server info. """ dpg.hide_item('ip') dpg.hide_item('username') dpg.hide_item('password') - def show_context(self) -> None: + def show_server_info(self) -> None: """ - Force visibility of wanted context. + Shows server info. """ dpg.show_item('ip') dpg.show_item('username') @@ -261,65 +297,70 @@ def get_current_version(self): Returns: str: current version """ - return requests.get(url='https://raw.githubusercontent.com/OpsecGuy/Awesome-Server-Manager/main/version', - headers={'Cache-Control': 'no-cache', 'Pragma': 'no-cache'}, - timeout=5.0).text.replace('\n', '') + try: + return requests.get(url='https://raw.githubusercontent.com/OpsecGuy/Awesome-Server-Manager/main/version', + headers={'Cache-Control': 'no-cache', 'Pragma': 'no-cache'}, + timeout=5.0).text.replace('\n', '') + except requests.ReadTimeout as err: + pass def connect(self) -> bool: """ Check if it's possible to connect to the server.\n Error log is printed out in GUI. """ + server_ip = self.cfg.get_value(dpg.get_value("servers_list"), "IP") client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.is_valid(protection='light'): - self.logger.log(f'[CONNECT] Connecting to {self.cfg.get_value(dpg.get_value("servers_list"), "IP")}') + self.logger.log(f'[CONNECT] Connecting to {server_ip}') try: - client.connect(hostname=self.cfg.get_value(dpg.get_value("servers_list"), "IP"), + client.connect(hostname=server_ip, port=int(self.cfg.get_value(dpg.get_value('servers_list'), 'port')), username=self.cfg.get_value(dpg.get_value("servers_list"), "username"), password=self.cfg.get_value(dpg.get_value("servers_list"), "password"), timeout=dpg.get_value('i_server_timeout')) client.close() - self.logger.log('[CONNECT] Task Completed!') + self.logger.log(f'[{server_ip}] Task Completed!') except socket.timeout: - self.logger.log('[CONNECT] Fail: Server Timed out!') + self.logger.log(f'[{server_ip}] Fail: Server Timed out!') return False def execute_cmd(self) -> bool: """ - Executes preapered commands to be executed on the server.\n - Creates log file with format: *ip*.txt.\n - If the same server has already log file, old one gonna be flushed. + Executes prepared commands to be used on the server.\n + Creates log file in format: *ip*.txt.\n + If the same server has already log file, previous logs will be flushed. """ + server_ip = self.cfg.get_value(dpg.get_value("servers_list"), "IP") client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.is_valid(protection='full'): - self.logger.log(f'[EXECUTE] Connecting to {self.cfg.get_value(dpg.get_value("servers_list"), "IP")}') + self.logger.log(f'[EXECUTE] Connecting to {server_ip}') try: - client.connect(hostname=self.cfg.get_value(dpg.get_value("servers_list"), "IP"), + client.connect(hostname=server_ip, port=int(self.cfg.get_value(dpg.get_value('servers_list'), 'port')), username=self.cfg.get_value(dpg.get_value("servers_list"), "username"), password=self.cfg.get_value(dpg.get_value("servers_list"), "password"), timeout=dpg.get_value('i_server_timeout')) except socket.timeout: - self.logger.log('[EXECUTE] Fail: Server Timed out!') + self.logger.log(f'[{server_ip}] Fail: Server Timed out!') return try: - with open(f'log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt', 'w+', encoding='utf-8') as log_file: - self.logger.log(f'[EXECUTE] Writing server logs to log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt') + with open(f'log_{server_ip}.txt', 'w+', encoding='utf-8') as log_file: + self.logger.log(f'[{server_ip}] Writing server logs to log_{self.cfg.get_value(dpg.get_value("servers_list"), "IP")}.txt') try: stdin, stdout, stderr = client.exec_command(self.parse_command()) - self.logger.log('[EXECUTE] Executing commands...') + self.logger.log(f'[{server_ip}] Executing commands. Please wait...') except paramiko.SSHException: - self.logger.log('[EXECUTE] Failed to execute commands!') + self.logger.log(f'[{server_ip}] Failed to execute commands!') return for output in iter(stdout.readline, ''): log_file.writelines(output) except OSError: - self.logger.log('[EXECUTE] Failed open/write to the log file!') + self.logger.log(f'[{server_ip}] Failed open/write to the log file!') client.close() - self.logger.log('[EXECUTE] Task Completed!') + self.logger.log(f'[{server_ip}] Task Completed!') diff --git a/logger.py b/logger.py index 173ac27..d5b8db5 100644 --- a/logger.py +++ b/logger.py @@ -23,8 +23,8 @@ def log(self, text: str) -> str: text = f"{datetime.datetime.now().strftime('%H:%M:%S')} {text}" return self.logs_buffer.append(text) - def reset(self) -> None: + def flush(self) -> None: """ - Reset log buffer. + Flush log buffer. """ self.logs_buffer.clear()