Skip to content

Commit

Permalink
fix: code pylint formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
DDSRem committed Dec 28, 2024
1 parent 3db4419 commit 6c58f97
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 100 deletions.
13 changes: 7 additions & 6 deletions aliyuntvtoken_connector/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from flask import Flask, request, Response
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import json
import base64
import requests
import uuid
import hashlib
import random

import requests
from flask import Flask, request, Response
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad


app = Flask(__name__)
headers = {
Expand Down Expand Up @@ -71,11 +72,11 @@ def oauth_token():
"refresh_token": refresh_token
}

timestamp = str(requests.get('http://api.extscreen.com/timestamp').json()['data']['timestamp'])
timestamp = str(requests.get('http://api.extscreen.com/timestamp', timeout=10).json()['data']['timestamp'])
unique_id = uuid.uuid4().hex
wifimac = str(random.randint(10**11, 10**12 - 1))

resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}) # noqa: E501
resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}, timeout=10) # noqa: E501
if resp.status_code == 200:
resp_data = resp.json()
ciphertext = resp_data["data"]["ciphertext"]
Expand Down
65 changes: 35 additions & 30 deletions glue_python/aliyuntoken/aliyuntoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,25 @@

import json
import base64
import requests
import time
import logging
import os
import threading
import sys
import qrcode
import argparse

import requests
import qrcode
from flask import Flask, send_file, render_template, jsonify


app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
last_status = 0
LAST_STATUS = 0
if sys.platform.startswith('win32'):
qrcode_dir = 'qrcode.png'
QRCODE_DIR = 'qrcode.png'
else:
qrcode_dir= '/aliyuntoken/qrcode.png'
QRCODE_DIR= '/aliyuntoken/qrcode.png'


headers = {
Expand All @@ -39,24 +40,28 @@
}


def poll_qrcode_status(data, log_print):
global last_status
# pylint: disable=W0603
def poll_qrcode_status(_data, log_print):
"""
循环等待扫码
"""
global LAST_STATUS
while True:
re = requests.post('https://api.xhofe.top/alist/ali/ck', json=data, headers=headers)
if re.status_code == 200:
re_data = json.loads(re.text)
if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED':
h = re_data['content']['data']['bizExt']
_re = requests.post('https://api.xhofe.top/alist/ali/ck', json=_data, headers=headers, timeout=10)
if _re.status_code == 200:
_re_data = json.loads(_re.text)
if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED':
h = _re_data['content']['data']['bizExt']
c = json.loads(base64.b64decode(h).decode('gbk'))
refresh_token = c['pds_login_result']['refreshToken']
if sys.platform.startswith('win32'):
with open('mytoken.txt', 'w') as f:
with open('mytoken.txt', 'w', encoding='utf-8') as f:
f.write(refresh_token)
else:
with open('/data/mytoken.txt', 'w') as f:
with open('/data/mytoken.txt', 'w', encoding='utf-8') as f:
f.write(refresh_token)
logging.info('扫码成功, refresh_token 已写入文件!')
last_status = 1
LAST_STATUS = 1
break
else:
if log_print:
Expand All @@ -71,36 +76,36 @@ def index():

@app.route('/image')
def serve_image():
return send_file(qrcode_dir, mimetype='image/png')
return send_file(QRCODE_DIR, mimetype='image/png')


@app.route('/status')
def status():
if last_status == 1:
if LAST_STATUS == 1:
return jsonify({'status': 'success'})
elif last_status == 2:
elif LAST_STATUS == 2:
return jsonify({'status': 'failure'})
else:
return jsonify({'status': 'unknown'})


@app.route('/shutdown_server', methods=['GET'])
def shutdown():
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(0)


if __name__ == '__main__':
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
parser = argparse.ArgumentParser(description='AliyunPan Refresh Token')
parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式')
args = parser.parse_args()
logging.info('二维码生成中...')
re_count = 0
while True:
re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers)
re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers, timeout=10)
if re.status_code == 200:
re_data = json.loads(re.content)
t = str(re_data['content']['data']['t'])
Expand All @@ -111,8 +116,8 @@ def shutdown():
qr.add_data(codeContent)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save(qrcode_dir)
if os.path.isfile(qrcode_dir):
img.save(QRCODE_DIR)
if os.path.isfile(QRCODE_DIR):
logging.info('二维码生成完成!')
break
time.sleep(1)
Expand All @@ -127,13 +132,13 @@ def shutdown():
threading.Thread(target=poll_qrcode_status, args=(data, False)).start()
logging.info('请打开阿里云盘扫描此二维码!')
qr.print_ascii(invert=True, tty=sys.stdout.isatty())
while last_status != 1:
while LAST_STATUS != 1:
time.sleep(1)
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(0)
else:
logging.error('未知的扫码模式')
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(1)
65 changes: 35 additions & 30 deletions glue_python/aliyuntoken/aliyuntoken_nn.ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,49 @@

import json
import base64
import requests
import time
import logging
import os
import threading
import sys
import qrcode
import argparse

import requests
import qrcode
from flask import Flask, send_file, render_template, jsonify


app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
last_status = 0
LAST_STATUS = 0
if sys.platform.startswith('win32'):
qrcode_dir = 'qrcode.png'
QRCODE_DIR = 'qrcode.png'
else:
qrcode_dir= '/aliyuntoken/qrcode.png'
QRCODE_DIR= '/aliyuntoken/qrcode.png'


def poll_qrcode_status(data, log_print):
global last_status
# pylint: disable=W0603
def poll_qrcode_status(_data, log_print):
"""
循环等待扫码
"""
global LAST_STATUS
while True:
re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=data)
if re.status_code == 200:
re_data = json.loads(re.text)
if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED':
h = re_data['content']['data']['bizExt']
_re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=_data, timeout=10)
if _re.status_code == 200:
_re_data = json.loads(_re.text)
if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED':
h = _re_data['content']['data']['bizExt']
c = json.loads(base64.b64decode(h).decode('gbk'))
refresh_token = c['pds_login_result']['refreshToken']
if sys.platform.startswith('win32'):
with open('mytoken.txt', 'w') as f:
with open('mytoken.txt', 'w', encoding='utf-8') as f:
f.write(refresh_token)
else:
with open('/data/mytoken.txt', 'w') as f:
with open('/data/mytoken.txt', 'w', encoding='utf-8') as f:
f.write(refresh_token)
logging.info('扫码成功, refresh_token 已写入文件!')
last_status = 1
LAST_STATUS = 1
break
else:
if log_print:
Expand All @@ -54,36 +59,36 @@ def index():

@app.route('/image')
def serve_image():
return send_file(qrcode_dir, mimetype='image/png')
return send_file(QRCODE_DIR, mimetype='image/png')


@app.route('/status')
def status():
if last_status == 1:
if LAST_STATUS == 1:
return jsonify({'status': 'success'})
elif last_status == 2:
elif LAST_STATUS == 2:
return jsonify({'status': 'failure'})
else:
return jsonify({'status': 'unknown'})


@app.route('/shutdown_server', methods=['GET'])
def shutdown():
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(0)


if __name__ == '__main__':
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
parser = argparse.ArgumentParser(description='AliyunPan Refresh Token')
parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式')
args = parser.parse_args()
logging.info('二维码生成中...')
re_count = 0
while True:
re = requests.get('https://api-cf.nn.ci/alist/ali/qr')
re = requests.get('https://api-cf.nn.ci/alist/ali/qr', timeout=10)
if re.status_code == 200:
re_data = json.loads(re.content)
t = str(re_data['content']['data']['t'])
Expand All @@ -94,8 +99,8 @@ def shutdown():
qr.add_data(codeContent)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save(qrcode_dir)
if os.path.isfile(qrcode_dir):
img.save(QRCODE_DIR)
if os.path.isfile(QRCODE_DIR):
logging.info('二维码生成完成!')
break
time.sleep(1)
Expand All @@ -110,13 +115,13 @@ def shutdown():
threading.Thread(target=poll_qrcode_status, args=(data, False)).start()
logging.info('请打开阿里云盘扫描此二维码!')
qr.print_ascii(invert=True, tty=sys.stdout.isatty())
while last_status != 1:
while LAST_STATUS != 1:
time.sleep(1)
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(0)
else:
logging.error('未知的扫码模式')
if os.path.isfile(qrcode_dir):
os.remove(qrcode_dir)
if os.path.isfile(QRCODE_DIR):
os.remove(QRCODE_DIR)
os._exit(1)
Loading

0 comments on commit 6c58f97

Please sign in to comment.