forked from Neutree/netease-cached-music
-
Notifications
You must be signed in to change notification settings - Fork 0
/
decrypt.py
343 lines (303 loc) · 15.3 KB
/
decrypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#coding : utf-8
import time
import os
import re
import shutil
import sys
import glob
import eyed3
from eyed3.id3.tag import Tag, ImagesAccessor, LyricsAccessor
from eyed3.id3.frames import ImageFrame
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent, LoggingEventHandler
from selenium.webdriver.remote.remote_connection import LOGGER as seleniumLogger
import logging,threading
import filetype
from mutagen.mp4 import MP4
from configparser import ConfigParser
from chrome_Daemon import MyChromeDaemon
from loadinfo2db import MyDB, load_cache_file_info
class Netease_music:
def __init__(self, db, src_dir='', des_dir=''):
''' src_dir 保存着被网易云缓存的音乐文件(需要解密)
des_dir 保存解密后的音乐文件 (带封面 歌词 歌手信息)
'''
# 初始化工作文件夹
self.src_dir = src_dir
self.des_dir = des_dir
self.lrc_dir = os.path.join(des_dir, 'lyric') # 缓存下载的歌词文件
self.cover_dir = os.path.join(des_dir, 'cover') # 缓存歌词封面
self.msc_dir = os.path.join(des_dir, 'msc') # 缓存解密的音乐文件
self.ready_msc_dir = os.path.join(des_dir,'readyMusic') # 处理完毕的音乐文件
self.ready_msc_list_file = os.path.join(self.ready_msc_dir,".list.txt")
logging.info("Current src_dir: %s", src_dir)
logging.info("Current des_dir: %s", des_dir)
self.db = db
# 新建目标文件夹
if not os.path.exists(des_dir):
os.mkdir(des_dir)
if not os.path.exists(self.lrc_dir):
os.mkdir(self.lrc_dir)
if not os.path.exists(self.msc_dir):
os.mkdir(self.msc_dir)
if not os.path.exists(self.cover_dir):
os.mkdir(self.cover_dir)
if not os.path.exists(self.ready_msc_dir):
os.mkdir(self.ready_msc_dir)
# 找出要转换的加密文件
os.chdir(src_dir)
self.uc_files = glob.glob('*.uc') #+ glob.glob('*.uc!') # 找出符合文件名的文件
self.id_uc_mp = {} # 文件名字和id对应表
for i in self.uc_files:
self.id_uc_mp[self.getSongId(i)] = i
# 从缓存文件名中提取出歌曲的id
# 缓存文件名字 结构: 156828-128-4e5e8487039537d52d70e2e37ce85682.idx/info/uc
# uc 音频文件
# info 格式 音量
# idx 文件大小 文件已经下载的区间?
def getSongId(self, name):
id = name[:name.find('-')]
logging.debug("getSongId: %s", id)
return id
def getInfoFromWeb(self, musicId):
daemon.browser.get("https://music.163.com/#/song?id=%s" % musicId)
daemon.browser.switch_to.frame("contentFrame")
img = daemon.browser.find_element_by_xpath('//body/div[3]/div[1]/div/div/div[1]/div[1]/div[1]/div[1]/img')
title = daemon.browser.find_element_by_xpath('//body/div[3]/div[1]/div/div/div[1]/div[1]/div[2]/div[1]/div/em')
artist = daemon.browser.find_elements_by_xpath('//body/div[3]/div[1]/div/div/div[1]/div[1]/div[2]/p[1]/span/a')
album = None
try:
album = daemon.browser.find_element_by_xpath('//body/div[3]/div[1]/div/div/div[1]/div[1]/div[2]/p[2]/a')
except:
pass
imgUrl = img.get_attribute("data-src")
titleStr = title.get_attribute("innerHTML")
artistStr = [x.get_attribute("innerHTML") for x in artist]
artistStr = " / ".join(artistStr)
if album is None:
albumStr = ""
else:
albumStr = album.get_attribute("innerHTML")
# lyricStr = daemon.lyric_queue.get(True)
return [
imgUrl,
titleStr,
artistStr,
albumStr,
# lyricStr,
]
def decrypt(self, musicId):
cachePath = os.path.join(self.src_dir, self.id_uc_mp.get(musicId))
mscFilePath = os.path.join(self.msc_dir,musicId+'.mp3')
try: # from web
with open(mscFilePath,'wb') as f:
f.write(bytes(self._decrypt(cachePath)))
except Exception as e: # from file
logging.error("musicId: %s file decrypt fail: %s", musicId, e)
return None
logging.debug("musicId: %s file decrypt to %s", musicId, mscFilePath)
return mscFilePath
def _decrypt(self,cachePath):
with open(cachePath, 'rb') as f:
btay = bytearray(f.read())
for i, j in enumerate(btay):
btay[i] = j ^ 0xa3
return btay
def getAllMusic(self):
with open(self.ready_msc_list_file, "a+") as file: # 打开记录已经 查找过的歌曲记录 的文件
file.seek(0)
alreadyIds = file.readlines() # 读取所有的id
for _i, musicId in enumerate(self.id_uc_mp):
logging.info("doing: %s", musicId)
if musicId+"\n" in alreadyIds: # 已经记录过的
continue
path = self.getMusic_only_text(musicId)
if path is None:
continue;
else:
file.write("%s\n"%musicId)
alreadyIds.append("%s\n"%musicId)
# self._getAllMusic(ids, file)
# 只向音乐文件中添加 作者等信息 不添加图片节省存储空间
def getMusic_only_text(self, musicId, cache_file_info, db):
logging.info("===================================only text")
logging.info("开始转存 %s 歌曲", musicId)
# 破解id对应文件名字的 文件 返回 结果文件路径
mscFilePath = self.decrypt(musicId) # 把异或后的文件保存到msc文件夹中, 然后开始找tab信息
if mscFilePath is None:
return None;
logging.info("歌曲 转换成功")
try:
info = self.getInfoFromWeb(musicId)
except Exception as e:
logging.error("歌曲 %s web加载失败: %s", musicId, e)
e.with_traceback()
return None;
logging.info("歌曲 web 资料获取成功")
logging.info(f"对文件添加元数据:{mscFilePath}")
# 识别音频文件格式
mscFileType = filetype.guess(mscFilePath)
if mscFileType is None:
logging.error(f"歌曲文件格式识别失败!{mscFilePath}")
return None;
mscFileExtension = mscFileType.extension
if mscFileExtension == 'mp3':
# 是mp3格式:
mscFile = eyed3.load(mscFilePath)
mscFile.tag.album = info[3]
mscFile.tag.artist = info[2]
mscFile.tag.title = info[1]
mscFile.tag.user_text_frames.set("netease_url", musicId)
mscFile.tag.save(encoding="utf8", version=(2, 3, 0))
elif mscFileExtension == 'm4a' or mscFileExtension == 'mp4' :
m4aMscFile = MP4(mscFilePath)
m4aMscFile.tags['\xa9nam']=info[1]
m4aMscFile.tags['\xa9ART']=info[2]
m4aMscFile.tags['\xa9alb']=info[3]
m4aMscFile.tags['\xa9cmt']=f"netease_url:{musicId}"
m4aMscFile.save()
mscFileExtension = 'm4a'
else:
logging.error(f"歌曲文件格式:{mscFileExtension},无法添加元数据, 所以没有添加数据")
# 重命名后移动文件
# 重命名的格式不能修改 数据库模块 会从文件中提取musicId
mscFileDstPath = os.path.join(self.ready_msc_dir,
"%s - %s.%s" % (
re.sub(r"[\/\\\:\*\?\"\<\>\|]", "", info[1]),
musicId,
mscFileExtension
))
shutil.move(mscFilePath, mscFileDstPath)
logging.info(f"歌曲保存至指定目录成功{musicId},{mscFileDstPath}")
self.update_cache_info_into_db(cache_file_info, info,db)
logging.info(f"歌曲保存db成功")
logging.info("=end==================================only text")
return mscFileDstPath
def update_cache_info_into_db(self, cache_file_info, song_info, db):
[f_size,f_mtime,f_ctime,song_id, song_rate, fname] = cache_file_info
[imgUrl, titleStr, artistStr, albumStr] = song_info
cursor = db.cursor
cursor.execute("select file_size from CACHE_FILE_T where file_filename= ?", (fname,))
result = cursor.fetchall()
if len(result)!=0:
cursor.execute('delete from CACHE_FILE_T where file_filename=?', (fname,))
cursor.execute('insert into CACHE_FILE_T '+\
'(song_id,song_rate,file_size,file_createtime,file_modifytime,file_filename,title,album,artist,imgUrl) values (?,?,?,?,?,?,?,?,?,?)',
(song_id, song_rate, f_size, f_ctime,f_mtime,fname,titleStr,albumStr,artistStr, imgUrl))
db.conn.commit()
def getMusic(self, musicId):
logging.info("开始转存 %s 歌曲", musicId)
# 破解id对应文件名字的 文件 返回 结果文件路径
mscFilePath = self.decrypt(musicId) # 把异或后的文件保存到msc文件夹中, 然后开始找tab信息
logging.info("歌曲 转换成功")
try:
info = self.getInfoFromWeb(musicId)
except Exception as e:
logging.info("歌曲 %s web加载失败: %s", musicId, e)
e.with_traceback()
return None;
logging.info("歌曲 web 资料获取成功")
# print(info)
print(mscFilePath)
mscFile = eyed3.load(mscFilePath)
mscFile.tag: Tag;
mscFile.tag.images: ImagesAccessor
mscFile.tag.lyrics: LyricsAccessor
picPath = os.path.join(self.cover_dir, musicId + ".jpg")
daemon.download_pic(info[0] + "?param=500y500", picPath) # 下载图片
imageLoad = open(picPath, "rb").read()
mscFile.tag.images.set(ImageFrame.FRONT_COVER, imageLoad, "image/jpeg", u"cover_description")
# imageLoad = open(r"C:\Users\DogEgg\Pictures\Saved Pictures\back_cover.jpg", "rb").read() # 可以放多个图片,iTunes只识别第一张,foobar2000识别多个type的图片 所有的软件对歌手的头像都是联网下载的 不会看tag里面的图片集
# mscFile.tag.images.set(ImageFrame.LEAD_ARTIST, imageLoad, "image/jpeg", u"artist")
# mscFile.tag.images.set(ImageFrame.ARTIST, imageLoad, "image/jpeg", u"artist")
# mscFile.tag.lyrics.set(info[4])
mscFile.tag.album = info[3]
mscFile.tag.artist = info[2]
mscFile.tag.title = info[1]
mscFile.tag.user_text_frames.set("netease_url", musicId)
# windows 自带的播放器 Groove音乐 只支持 id3v2.3 格式的, 之前没有指定version 有些文件被保存为 v2.4的,在windows文件夹的预览中,就看不到封面图片了
mscFile.tag.save(encoding="utf8", version=(2, 3, 0))
mscFileDstPath = os.path.join(self.ready_msc_dir,
"%s - %s.mp3" % (re.sub(r"[\/\\\:\*\?\"\<\>\|]", "", info[1]), musicId))
shutil.move(mscFilePath, mscFileDstPath)
logging.info("歌曲保存至指定目录成功")
return mscFileDstPath
def start_dir_watch(self):
# TODO 没有把触发事件的文件加入 记录文件中
logging.info("start dir watch")
observer = Observer()
event_handler = self.FileModifyHandler(self)
# event_handler = LoggingEventHandler()
observer.schedule(event_handler, self.src_dir, recursive=False)
observer.start()
return observer
class FileModifyHandler(FileSystemEventHandler):
def __init__(self, context):
super().__init__()
self.lastModifyFile = None;
self.context = context
self.db = None
logging.info('FileModifyHandler init done..')
# on_modified在一个单独的进程中运行, 和init进程 不是同一个
def on_modified(self, event: FileSystemEvent):
if self.db is None:
self.db = MyDB(self.context.db.db_path)
fname = os.path.basename(event.src_path)
cursor = self.db.cursor
# 只监视uc文件
if not fname.endswith("uc"):
return;
# 获取文件基本信息
f_info = load_cache_file_info(event.src_path)
[f_size,f_mtime,f_ctime,song_id, song_rate] = f_info
f_info.append(fname)
# 查询数据库 这个文件名 对应的历史文件
cursor.execute("select file_size from CACHE_FILE_T where file_filename= ?", (fname,))
result = cursor.fetchall()
logging.info(f"db查询缓存文件名:{fname},结果个数:{len(result)}")
if f_size==0: # 文件空:不操作
logging.info(f'\tcache文件{fname}为空, 不操作')
return
# 查到历史信息 且历史文件较大:不操作
if len(result) != 0:
older_file_size = result[0][0]
logging.info(f"\t该文件数据库中存在{fname},文件大小:{older_file_size},当前大小:{f_size}")
if int(older_file_size) >= int(f_size):
return;
# (当前文件更大) 或者 (数据库中没有且文件有大小) 就重新转码
basename = os.path.basename(event.src_path)
songId = self.context.getSongId(basename)
self.context.id_uc_mp[songId] = event.src_path
self.context.getMusic_only_text(songId, f_info, self.db)
if __name__ == '__main__':
config = ConfigParser()
config.read('./decrypt.config')
srcDir = config["global"]["srcDir"]
desDir = config["global"]["desDir"]
db_filename = config["global"]["dbFilename"]
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%H:%M:%S')
seleniumLogger.setLevel(logging.WARNING)
logging = logging.getLogger("NeteaseDecrypt")
db_path = os.path.join(desDir, db_filename)
db = MyDB(db_path)
daemon = MyChromeDaemon()
# daemon.listen_ready.acquire() # 等待 ichrome的监听线程开始运行 阻塞
# daemon.browser.get("https://music.163.com/")
# handler = Netease_music("E:/workspace/netease-cached-music/cache/", "E:/workspace/netease-cached-music/dst/")
# handler = Netease_music("C:\\Users\\DogEgg\\AppData\\Local\\Netease\\CloudMusic\\Cache\\Cache",
# "E:/workspace/netease-cached-music/dst/")
# 子线程监视文件夹变动
handler = Netease_music(db, srcDir, desDir)
ob_thread = handler.start_dir_watch()
# 主线程等待 ctrl+c 信号退出
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
a = ob_thread.stop()
logging.info('key board interrupt.. exit..')
ob_thread.join()
# handler.getMusic()
# print(handler.getInfoFromWeb("65528"))