From 6c58f9700c233b5279177492586ed6da08a1fb3c Mon Sep 17 00:00:00 2001 From: DDSRem <73049927+DDSRem@users.noreply.github.com> Date: Sat, 28 Dec 2024 10:57:35 +0800 Subject: [PATCH] fix: code pylint formatting --- aliyuntvtoken_connector/main.py | 13 ++-- glue_python/aliyuntoken/aliyuntoken.py | 65 +++++++++-------- glue_python/aliyuntoken/aliyuntoken_nn.ci.py | 65 +++++++++-------- glue_python/aliyuntoken/aliyuntoken_vercel.py | 73 ++++++++++--------- 4 files changed, 116 insertions(+), 100 deletions(-) diff --git a/aliyuntvtoken_connector/main.py b/aliyuntvtoken_connector/main.py index 58c238be9f..c46fdffcc8 100644 --- a/aliyuntvtoken_connector/main.py +++ b/aliyuntvtoken_connector/main.py @@ -1,13 +1,14 @@ -from flask import Flask, request, Response -from Crypto.Cipher import AES -from Crypto.Util.Padding import unpad import json import base64 -import requests import uuid import hashlib import random +import requests +from flask import Flask, request, Response +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad + app = Flask(__name__) headers = { @@ -71,11 +72,11 @@ def oauth_token(): "refresh_token": refresh_token } - timestamp = str(requests.get('http://api.extscreen.com/timestamp').json()['data']['timestamp']) + timestamp = str(requests.get('http://api.extscreen.com/timestamp', timeout=10).json()['data']['timestamp']) unique_id = uuid.uuid4().hex wifimac = str(random.randint(10**11, 10**12 - 1)) - resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}) # noqa: E501 + resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}, timeout=10) # noqa: E501 if resp.status_code == 200: resp_data = resp.json() ciphertext = resp_data["data"]["ciphertext"] diff --git a/glue_python/aliyuntoken/aliyuntoken.py b/glue_python/aliyuntoken/aliyuntoken.py index 5f5bbe409d..dbac1ad42c 100644 --- a/glue_python/aliyuntoken/aliyuntoken.py +++ b/glue_python/aliyuntoken/aliyuntoken.py @@ -2,24 +2,25 @@ import json import base64 -import requests import time import logging import os import threading import sys -import qrcode import argparse + +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' headers = { @@ -39,24 +40,28 @@ } -def poll_qrcode_status(data, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.post('https://api.xhofe.top/alist/ali/ck', json=data, headers=headers) - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': - h = re_data['content']['data']['bizExt'] + _re = requests.post('https://api.xhofe.top/alist/ali/ck', json=_data, headers=headers, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': + h = _re_data['content']['data']['bizExt'] c = json.loads(base64.b64decode(h).decode('gbk')) refresh_token = c['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break else: if log_print: @@ -71,14 +76,14 @@ def index(): @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -86,21 +91,21 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') re_count = 0 while True: - re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers) + re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers, timeout=10) if re.status_code == 200: re_data = json.loads(re.content) t = str(re_data['content']['data']['t']) @@ -111,8 +116,8 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) @@ -127,13 +132,13 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(data, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntoken/aliyuntoken_nn.ci.py b/glue_python/aliyuntoken/aliyuntoken_nn.ci.py index d2dcaf262d..858fbde79a 100644 --- a/glue_python/aliyuntoken/aliyuntoken_nn.ci.py +++ b/glue_python/aliyuntoken/aliyuntoken_nn.ci.py @@ -2,44 +2,49 @@ import json import base64 -import requests import time import logging import os import threading import sys -import qrcode import argparse + +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' -def poll_qrcode_status(data, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=data) - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': - h = re_data['content']['data']['bizExt'] + _re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=_data, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': + h = _re_data['content']['data']['bizExt'] c = json.loads(base64.b64decode(h).decode('gbk')) refresh_token = c['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break else: if log_print: @@ -54,14 +59,14 @@ def index(): @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -69,21 +74,21 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') re_count = 0 while True: - re = requests.get('https://api-cf.nn.ci/alist/ali/qr') + re = requests.get('https://api-cf.nn.ci/alist/ali/qr', timeout=10) if re.status_code == 200: re_data = json.loads(re.content) t = str(re_data['content']['data']['t']) @@ -94,8 +99,8 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) @@ -110,13 +115,13 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(data, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntoken/aliyuntoken_vercel.py b/glue_python/aliyuntoken/aliyuntoken_vercel.py index d0c7c61195..5160a410ee 100644 --- a/glue_python/aliyuntoken/aliyuntoken_vercel.py +++ b/glue_python/aliyuntoken/aliyuntoken_vercel.py @@ -1,48 +1,53 @@ #!/usr/local/bin/python3 import json -import requests import time import logging import os import threading import sys import argparse + +import requests import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' -def poll_qrcode_status(data, log_print): - global last_status - ck = str(data['ck']) - t = str(data['t']) +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS + _ck = str(_data['ck']) + _t = str(_data['t']) while True: - re = requests.get(f'https://aliyuntoken.vercel.app/api/state-query?ck={ck}&t={t}') - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['data']['qrCodeStatus'] == 'CONFIRMED': - refresh_token = re_data['data']['bizExt']['pds_login_result']['refreshToken'] + _re = requests.get(f'https://aliyuntoken.vercel.app/api/state-query?ck={_ck}&t={_t}', timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['data']['qrCodeStatus'] == 'CONFIRMED': + refresh_token = _re_data['data']['bizExt']['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break - elif re_data['data']['qrCodeStatus'] == 'EXPIRED': + elif _re_data['data']['qrCodeStatus'] == 'EXPIRED': logging.error('二维码无效或已过期!') - last_status = 2 + LAST_STATUS = 2 break else: if log_print: @@ -57,14 +62,14 @@ def index(): @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -72,21 +77,21 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') re_count = 0 while True: - re = requests.get('https://aliyuntoken.vercel.app/api/generate') + re = requests.get('https://aliyuntoken.vercel.app/api/generate', timeout=10) if re.status_code == 200: re_data = json.loads(re.content) t = str(re_data['t']) @@ -97,8 +102,8 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) @@ -112,15 +117,15 @@ def shutdown(): elif args.qrcode_mode == 'shell': threading.Thread(target=poll_qrcode_status, args=(data, False)).start() qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1 and last_status != 2: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) - if last_status == 2: + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1)