forked from sunfkny/genshin-gacha-export
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
467 lines (398 loc) · 16.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
import json
import time
import requests
from urllib import parse
import os
import sys
from config import Config
from time import sleep
import traceback
import UIGF_converter
import gachaMetadata
gachaQueryTypeIds = gachaMetadata.gachaQueryTypeIds
gachaQueryTypeNames = gachaMetadata.gachaQueryTypeNames
gachaQueryTypeDict = gachaMetadata.gachaQueryTypeDict
def main():
print("获取抽卡记录", flush=True)
# gachaInfo = getGachaInfo()
# gachaTypes = getGachaTypes()
# gachaTypeIds = [banner["key"] for banner in gachaTypes]
# gachaTypeNames = [key["name"] for key in gachaTypes]
gachaData = {}
# gachaData["gachaType"] = gachaTypes
# gachaData["gachaInfo"] = gachaInfo
gachaData["gachaLog"] = {}
for gachaTypeId in gachaQueryTypeIds:
gachaLog = getGachaLogs(gachaTypeId, gachaQueryTypeDict)
gachaData["gachaLog"][gachaTypeId] = gachaLog
uid_flag = 1
for gachaType in gachaData["gachaLog"]:
for log in gachaData["gachaLog"][gachaType]:
if uid_flag and log["uid"]:
gachaData["uid"] = log["uid"]
uid_flag = 0
# del log["uid"]
# del log["count"]
# del log["gacha_type"]
gen_path = os.path.dirname(os.path.realpath(sys.argv[0]))
uid = gachaData["uid"]
localDataFilePath = f"{gen_path}\\gachaData-{uid}.json"
if os.path.isfile(localDataFilePath):
with open(localDataFilePath, "r", encoding="utf-8") as f:
localData = json.load(f)
mergeData = mergeDataFunc(localData, gachaData)
else:
mergeData = gachaData
mergeData["gachaType"]=gachaQueryTypeDict
print("写入JSON", end="...", flush=True)
# 抽卡报告读取 gachaData.json
with open(f"{gen_path}\\gachaData.json", "w", encoding="utf-8") as f:
json.dump(mergeData, f, ensure_ascii=False, sort_keys=False, indent=4)
# 待合并数据 gachaData-{uid}.json
with open(f"{gen_path}\\gachaData-{uid}.json", "w", encoding="utf-8") as f:
json.dump(mergeData, f, ensure_ascii=False, sort_keys=False, indent=4)
# 备份历史数据防止爆炸 gachaData-{uid}-{t}.json
t = time.strftime("%Y%m%d%H%M%S", time.localtime())
with open(f"{gen_path}\\gachaData-{uid}-{t}.json", "w", encoding="utf-8") as f:
json.dump(mergeData, f, ensure_ascii=False, sort_keys=False, indent=4)
print("OK", flush=True)
if s.getKey("FLAG_UIGF_JSON"):
print("写入UIGF JSON", end="...", flush=True)
with open(f"{gen_path}\\UIGF_gachaData-{uid}-{t}.json", "w", encoding="utf-8") as f:
UIGF_data = UIGF_converter.convert(uid)
json.dump(UIGF_data, f, ensure_ascii=False, sort_keys=False, indent=4)
print("OK", flush=True)
if s.getKey("FLAG_CLEAN"):
print("清除记录", end="...", flush=True)
gen_path = os.path.dirname(os.path.realpath(sys.argv[0]))
del_paths = [name for name in os.listdir(gen_path) if name.startswith("gacha") and (name.endswith(".csv") or name.endswith(".xlsx"))]
for del_path in del_paths:
try:
os.remove(gen_path + "\\" + del_path)
print(del_path, end=" ", flush=True)
except:
pass
print("")
if s.getKey("FLAG_WRITE_XLSX"):
import writeXLSX
writeXLSX.main()
if s.getKey("FLAG_SHOW_REPORT"):
import statisticsData
statisticsData.main()
def mergeDataFunc(localData, gachaData):
# gachaTypes = gachaData["gachaType"]
# gachaTypeIds = [banner["key"] for banner in gachaTypes]
# gachaTypeNames = [banner["name"] for banner in gachaTypes]
# gachaTypeDict = dict(zip(gachaTypeIds, gachaTypeNames))
for banner in gachaQueryTypeDict:
bannerLocal = localData["gachaLog"][banner]
bannerGet = gachaData["gachaLog"][banner]
if bannerGet == bannerLocal:
pass
else:
print("合并", gachaQueryTypeDict[banner])
flaglist = [1] * len(bannerGet)
loc = [[i["time"],i["name"]] for i in bannerLocal]
for i in range(len(bannerGet)):
gachaGet = bannerGet[i]
get = [gachaGet["time"],gachaGet["name"]]
# if gachaGet in bannerLocal:
# flaglist.append(1)
if get in loc:
pass
else:
flaglist[i] = 0
print("获取到", len(flaglist), "条记录")
tempData = []
for i in range(len(bannerGet)):
if flaglist[i] == 0:
gachaGet = bannerGet[i]
tempData.insert(0, gachaGet)
print("追加", len(tempData), "条记录")
for i in tempData:
localData["gachaLog"][banner].insert(0, i)
return localData
# def getGachaTypes():
# tmp_url = url.replace("getGachaLog", "getConfigList")
# parsed = urllib.parse.urlparse(tmp_url)
# querys = urllib.parse.parse_qsl(parsed.query)
# param_dict = dict(querys)
# param_dict["lang"] = "zh-cn"
# param = urllib.parse.urlencode(param_dict)
# path = tmp_url.split("?")[0]
# tmp_url = path + "?" + param
# r = requests.get(tmp_url)
# s = r.content.decode("utf-8")
# configList = json.loads(s)
# gachaTypeLists = []
# return configList["data"]["gacha_type_list"]
def getGachaLogs(gachaTypeId, gachaTypeDict):
size = "20"
# api限制一页最大20
gachaList = []
end_id="0"
for page in range(1, 9999):
print(f"正在获取 {gachaTypeDict[gachaTypeId]} 第 {page} 页", flush=True)
api = getApi(gachaTypeId, size, page, end_id)
r = requests.get(api)
s = r.content.decode("utf-8")
j = json.loads(s)
gacha = j["data"]["list"]
if not len(gacha):
break
for i in gacha:
gachaList.append(i)
end_id=j["data"]["list"][-1]["id"]
sleep(0.5)
return gachaList
def getApi(gachaType, size, page, end_id=""):
parsed = parse.urlparse(url)
querys = parse.parse_qsl(str(parsed.query))
param_dict = dict(querys)
param_dict["size"] = size
param_dict["gacha_type"] = gachaType
param_dict["page"] = page
param_dict["lang"] = "zh-cn"
param_dict["end_id"] = end_id
param = parse.urlencode(param_dict)
path = str(url).split("?")[0]
api = path + "?" + param
return api
def checkApi(url):
if not url:
print("url为空")
return False
if "getGachaLog" not in url:
print("错误的url,检查是否包含getGachaLog")
return False
try:
r = requests.get(url)
s = r.content.decode("utf-8")
j = json.loads(s)
except Exception as e:
print("API请求解析出错:\n", traceback.format_exc())
return False
if not j["data"]:
if j["message"] == "authkey valid error":
print("authkey错误")
else:
print("数据为空,错误代码:" + j["message"])
return False
return True
def getQueryVariable(variable):
query = str(url).split("?")[1]
vars = query.split("&")
for v in vars:
if v.split("=")[0] == variable:
return v.split("=")[1]
return ""
def getGachaInfo():
region = getQueryVariable("region")
lang = getQueryVariable("lang")
gachaInfoUrl = "https://webstatic.mihoyo.com/hk4e/gacha_info/{}/items/{}.json".format(region, lang)
r = requests.get(gachaInfoUrl)
s = r.content.decode("utf-8")
gachaInfo = json.loads(s)
return gachaInfo
def pressAnyKeyToExit(msg="执行结束,按任意键退出"):
from sys import exit
print("")
print(msg, end="...", flush=True)
try:
input()
except:
pass
exit()
def setProxy(enable, proxyIp, IgnoreIp):
import winreg
xpath = r"Software\Microsoft\Windows\CurrentVersion\Internet Settings"
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, xpath, 0, winreg.KEY_WRITE)
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, enable)
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, proxyIp)
winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, IgnoreIp)
except Exception as e:
print("设置代理出错:\n", traceback.format_exc())
finally:
None
def enableProxy():
proxyIP = "127.0.0.1:8889"
IgnoreIp = "172.*;192.*;"
setProxy(1, proxyIP, IgnoreIp)
def disableProxy():
setProxy(0, "", "")
class Addon(object):
def __init__(self):
pass
def request(self, flow):
if "mihoyo.com/event/gacha_info/api/getGachaLog" in flow.request.url:
global url
url = flow.request.url
s.setKey("url", url)
m.shutdown()
def response(self, flow):
pass
def capture():
from mitmproxy.options import Options
from mitmproxy.proxy.config import ProxyConfig
from mitmproxy.proxy.server import ProxyServer
from mitmproxy.tools.dump import DumpMaster
import asyncio
import signal
try:
options = Options(listen_host="0.0.0.0", listen_port=8889, http2=True)
config = ProxyConfig(options)
global m
m = DumpMaster(options, with_termlog=False, with_dumper=False)
m.server = ProxyServer(config)
m.addons.add(Addon())
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, getattr(m, "prompt_for_exit", m.shutdown))
loop.add_signal_handler(signal.SIGTERM, m.shutdown)
except NotImplementedError:
pass
async def wakeup():
while True:
await asyncio.sleep(0.2)
asyncio.ensure_future(wakeup())
m.run()
except KeyboardInterrupt:
print("中止抓包")
print("清除代理", end="...", flush=True)
disableProxy()
print("成功", flush=True)
pressAnyKeyToExit()
except TypeError:
pass
except Exception as e:
print("抓包模块出错:\n", traceback.format_exc())
if __name__ == "__main__":
global url
url = ""
gen_path = os.path.dirname(os.path.realpath(sys.argv[0]))
s = Config(gen_path + "\\config.json")
latest = "https://pd.zwc365.com/seturl/https://raw.githubusercontent.com/sunfkny/genshin-gacha-export/main/verison.txt"
try:
print("检查更新中...", end="", flush=True)
latestVerison = requests.get(latest).text
verison = s.getKey("verison")
if verison != latestVerison:
print(f"当前版本{verison}不是最新\n请到 https://github.com/sunfkny/genshin-gacha-export/releases 下载最新版本{latestVerison}")
except Exception:
print("检查更新失败", flush=True)
FLAG_USE_CONFIG_URL = s.getKey("FLAG_USE_CONFIG_URL")
if FLAG_USE_CONFIG_URL:
print("检查配置文件中的链接", end="...", flush=True)
url = s.getKey("url")
if checkApi(url):
print("合法")
main()
pressAnyKeyToExit()
FLAG_MANUAL_INPUT_URL = s.getKey("FLAG_MANUAL_INPUT_URL")
while FLAG_MANUAL_INPUT_URL:
try:
url = input("输入url: ")
if not checkApi(url):
continue
else:
FLAG_MANUAL_INPUT_URL = False
main()
pressAnyKeyToExit()
except:
continue
FLAG_USE_LOG_URL = s.getKey("FLAG_USE_LOG_URL")
if FLAG_USE_LOG_URL:
try:
USERPROFILE = os.environ["USERPROFILE"]
output_log_path = ""
output_log_path_cn = os.path.join(USERPROFILE, "AppData", "LocalLow", "miHoYo", "原神", "output_log.txt")
output_log_path_global = os.path.join(USERPROFILE, "AppData", "LocalLow", "miHoYo", "Genshin Impact", "output_log.txt")
if os.path.isfile(output_log_path_cn):
print("检测到国服日志文件")
output_log_path = output_log_path_cn
if os.path.isfile(output_log_path_global):
print("检测到海外服日志文件")
output_log_path = output_log_path_global
if os.path.isfile(output_log_path_cn) and os.path.isfile(output_log_path_global):
flag = True
while flag:
c = input("检测到两个日志文件,输入1选择国服,输入2选择海外服:")
if c == "1":
output_log_path = output_log_path_cn
flag = False
elif c == "2":
output_log_path = output_log_path_global
flag = False
if not os.path.isfile(output_log_path_cn) and not os.path.isfile(output_log_path_global):
print("没有检测到日志文件")
else:
# with open(output_log_path, "r", encoding="utf-8") as f:
with open(output_log_path, "r", encoding="mbcs", errors="ignore") as f:
log = f.readlines()
for line in log:
if line.startswith("OnGetWebViewPageFinish") and line.endswith("#/log\n"):
url = line.replace("OnGetWebViewPageFinish:", "").replace("\n", "")
if url == "":
print("日志文件中没有链接")
else:
spliturl = str(url).split("?")
if "webstatic-sea" in spliturl[0] or "hk4e-api-os" in spliturl[0]:
spliturl[0] = "https://hk4e-api-os.mihoyo.com/event/gacha_info/api/getGachaLog"
else:
spliturl[0] = "https://hk4e-api.mihoyo.com/event/gacha_info/api/getGachaLog"
url = "?".join(spliturl)
print("检查日志文件中的链接", end="...", flush=True)
if checkApi(url):
print("合法")
gen_path = os.path.dirname(os.path.realpath(sys.argv[0]))
configPath = os.path.join(gen_path, "config.json")
s = Config(configPath)
s.setKey("url", url)
main()
pressAnyKeyToExit()
except Exception as e:
print("日志读取模块出错:\n", traceback.format_exc())
pressAnyKeyToExit()
FLAG_USE_CAPTURE = s.getKey("FLAG_USE_CAPTURE")
if FLAG_USE_CAPTURE:
flag = True
print("开始通过抓包模式捕获链接\n注意:必须等程序运行结束或者Ctrl+C退出,不要直接关闭,否则会上不了网\n可以用解压出来的关闭代理bat脚本恢复,或者 设置 - 网络和Internet - 代理 - 使用代理服务器 - 关闭")
while flag:
try:
i = input("确定使用抓包模式吗?输入yes确认执行:")
if i == "yes":
flag = False
except KeyboardInterrupt:
print("取消抓包模式")
pressAnyKeyToExit()
except Exception as e:
print(traceback.format_exc())
continue
try:
print("设置代理", end="...", flush=True)
enableProxy()
print("成功", flush=True)
print("请打开抽卡记录页面,并翻页几次")
print("正在捕获链接", end="...", flush=True)
capture()
print("成功")
print("清除代理", end="...", flush=True)
disableProxy()
print("成功", flush=True)
print("检查链接", end="...", flush=True)
if not checkApi(url):
pressAnyKeyToExit()
print("合法")
main()
pressAnyKeyToExit()
except KeyboardInterrupt:
print("中止抓包模式")
print("清除代理", end="...", flush=True)
disableProxy()
print("成功", flush=True)
pressAnyKeyToExit()
except Exception as e:
disableProxy()
print("抓包模块出错:\n", traceback.format_exc())
pressAnyKeyToExit()