【Subtitle corpus】人人影视 subtitles #103
Pitfall: exporting CSV or TSV from SQL hits nasty double-quote escaping and embedded-newline problems. The current workaround is to go through ClickHouse with its MySQL-to-JSON conversion recipe: https://clickhouse.com/docs/knowledgebase/mysql-to-parquet-csv-json
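To illustrate the quoting problem (a minimal standalone sketch, not from the original dump): a field containing an escaped double quote or an embedded newline breaks naive line splitting, while a real CSV parser handles it.

```python
import csv
import io

# A TSV record whose second field contains an escaped quote and a newline.
raw = 'id\ttext\n1\t"He said ""hi""\nand left"\n'

print(raw.splitlines())  # naive splitting: 3 "lines" for 2 records

rows = list(csv.reader(io.StringIO(raw), delimiter="\t"))
print(rows)  # [['id', 'text'], ['1', 'He said "hi"\nand left']]
```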
Patched part of the pysubs2 source to avoid some errors and rescue some subtitles:

substation.py

```python
import logging
import re
import warnings
from typing import Any, Union, Optional, Dict, Tuple, List, TextIO
from .base import FormatBase
from ..ssaevent import SSAEvent
from ..ssastyle import SSAStyle
from ..common import Color, Alignment, SSA_ALIGNMENT
from ..time import make_time, ms_to_times, timestamp_to_ms, TIMESTAMP, TIMESTAMP_SHORT
from ..ssafile import SSAFile
def ass_to_ssa_alignment(i: int) -> int:
warnings.warn("ass_to_ssa_alignment function is deprecated, please use the Alignment enum", DeprecationWarning)
return SSA_ALIGNMENT[i-1]
def ssa_to_ass_alignment(i: int) -> int:
warnings.warn("ssa_to_ass_alignment function is deprecated, please use the Alignment enum", DeprecationWarning)
return SSA_ALIGNMENT.index(i) + 1
SECTION_HEADING = re.compile(
r"^.{,3}" # allow 3 chars at start of line for BOM
r"\[" # open square bracket
r"[^]]*[a-z][^]]*" # inside square brackets, at least one lowercase letter (this guards vs. uuencoded font data)
r"]" # close square bracket
)
ATTACHMENT_FILE_HEADING = re.compile(r"(fontname|filename):\s+(?P<name>\S+)")
STYLE_FORMAT_LINE = {
"ass": "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic,"
" Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment,"
" MarginL, MarginR, MarginV, Encoding",
"ssa": "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, TertiaryColour, BackColour, Bold, Italic,"
" BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, AlphaLevel, Encoding"
}
STYLE_FIELDS = {
"ass": ["fontname", "fontsize", "primarycolor", "secondarycolor", "outlinecolor", "backcolor", "bold", "italic",
"underline", "strikeout", "scalex", "scaley", "spacing", "angle", "borderstyle", "outline", "shadow",
"alignment", "marginl", "marginr", "marginv", "encoding"],
"ssa": ["fontname", "fontsize", "primarycolor", "secondarycolor", "tertiarycolor", "backcolor", "bold", "italic",
"borderstyle", "outline", "shadow", "alignment", "marginl", "marginr", "marginv", "alphalevel", "encoding"]
}
EVENT_FORMAT_LINE = {
"ass": "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
"ssa": "Format: Marked, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text"
}
EVENT_FIELDS = {
"ass": ["layer", "start", "end", "style", "name", "marginl", "marginr", "marginv", "effect", "text"],
"ssa": ["marked", "start", "end", "style", "name", "marginl", "marginr", "marginv", "effect", "text"]
}
#: Largest timestamp allowed in SubStation, ie. 9:59:59.99.
MAX_REPRESENTABLE_TIME = make_time(h=10) - 10
def color_to_ass_rgba(c: Color) -> str:
return f"&H{((c.a << 24) | (c.b << 16) | (c.g << 8) | c.r):08X}"
def color_to_ssa_rgb(c: Color) -> str:
return f"{((c.b << 16) | (c.g << 8) | c.r)}"
NON_HEX_PAT = re.compile(r"[^0-9a-fA-F]")
def rgba_to_color(s: str) -> Color:
if s[0] == '&':
x = int(re.sub(NON_HEX_PAT, "", s[2:]), base=16)
else:
x = int(s)
r = x & 0xff
g = (x >> 8) & 0xff
b = (x >> 16) & 0xff
a = (x >> 24) & 0xff
return Color(r, g, b, a)
def is_valid_field_content(s: str) -> bool:
"""
Returns True if string s can be stored in a SubStation field.
Fields are written in CSV-like manner, thus commas and/or newlines
are not acceptable in the string.
"""
return "\n" not in s and "," not in s
def parse_tags(text: str, style: SSAStyle = SSAStyle.DEFAULT_STYLE,
styles: Optional[Dict[str, SSAStyle]] = None) -> List[Tuple[str, SSAStyle]]:
"""
Split text into fragments with computed SSAStyles.
Returns list of tuples (fragment, style), where fragment is a part of text
between two brace-delimited override sequences, and style is the computed
styling of the fragment, ie. the original style modified by all override
sequences before the fragment.
Newline and non-breakable space overrides are left as-is.
Supported override tags:
- i, b, u, s
- r (with or without style name)
"""
if styles is None:
styles = {}
fragments = SSAEvent.OVERRIDE_SEQUENCE.split(text)
if len(fragments) == 1:
return [(text, style)]
def apply_overrides(all_overrides: str) -> SSAStyle:
s = style.copy()
for tag in re.findall(r"\\[ibusp][0-9]|\\r[a-zA-Z_0-9 ]*", all_overrides):
if tag == r"\r":
s = style.copy() # reset to original line style
elif tag.startswith(r"\r"):
name = tag[2:]
if name in styles:
# reset to named style
s = styles[name].copy()
else:
if "i" in tag:
s.italic = "1" in tag
elif "b" in tag:
s.bold = "1" in tag
elif "u" in tag:
s.underline = "1" in tag
elif "s" in tag:
s.strikeout = "1" in tag
elif "p" in tag:
try:
scale = int(tag[2:])
except (ValueError, IndexError):
continue
s.drawing = scale > 0
return s
overrides = SSAEvent.OVERRIDE_SEQUENCE.findall(text)
overrides_prefix_sum = ["".join(overrides[:i]) for i in range(len(overrides) + 1)]
computed_styles = map(apply_overrides, overrides_prefix_sum)
return list(zip(fragments, computed_styles))
NOTICE = "Script generated by pysubs2\nhttps://pypi.python.org/pypi/pysubs2"
class SubstationFormat(FormatBase):
"""SubStation Alpha (ASS, SSA) subtitle format implementation"""
@staticmethod
def ms_to_timestamp(requested_ms: int) -> str:
"""Convert ms to 'H:MM:SS.cc'"""
if requested_ms < 0:
requested_ms = 0
if requested_ms > MAX_REPRESENTABLE_TIME:
warnings.warn("Overflow in SubStation timestamp, clamping to MAX_REPRESENTABLE_TIME", RuntimeWarning)
requested_ms = MAX_REPRESENTABLE_TIME
# Aegisub does rounding, see https://github.com/Aegisub/Aegisub/blob/6f546951b4f004da16ce19ba638bf3eedefb9f31/libaegisub/include/libaegisub/ass/time.h#L32
round_ms = ((requested_ms + 5) - (requested_ms + 5) % 10)
h, m, s, ms = ms_to_times(round_ms)
cs = ms // 10
return f"{h:01d}:{m:02d}:{s:02d}.{cs:02d}"
@classmethod
def guess_format(cls, text: str) -> Optional[str]:
"""See :meth:`pysubs2.formats.FormatBase.guess_format()`"""
if re.search(r"V4\+ Styles", text, re.IGNORECASE):
return "ass"
elif re.search(r"V4 Styles", text, re.IGNORECASE):
return "ssa"
else:
return None
@classmethod
def from_file(cls, subs: "SSAFile", fp: TextIO, format_: str, **kwargs: Any) -> None:
"""See :meth:`pysubs2.formats.FormatBase.from_file()`"""
def string_to_field(f: str, v: str) -> Any:
# Per issue #45, we should handle the case where there is extra whitespace around the values.
# Extra whitespace is removed in non-string fields where it would break the parser otherwise,
# and in font name (where it doesn't really make sense). It is preserved in Dialogue string
# fields like Text, Name and Effect (to avoid introducing unnecessary change to parser output).
if f in {"start", "end"}:
v = v.strip()
if v.startswith("-"):
# handle negative timestamps
v = v[1:]
sign = -1
else:
sign = 1
m = TIMESTAMP.match(v)
if m is None:
m = TIMESTAMP_SHORT.match(v)
if m is None:
raise ValueError(f"Failed to parse timestamp: {v!r} {f}")
return sign * timestamp_to_ms(m.groups())
elif "color" in f:
v = v.strip()
return rgba_to_color(v)
elif f in {"bold", "underline", "italic", "strikeout"}:
return v != "0"
elif f in {"borderstyle", "encoding", "marginl", "marginr", "marginv", "layer", "alphalevel"}:
try:
return int(v)
except ValueError:
warnings.warn(f"Failed to parse {f}, using default", RuntimeWarning)
return 0
elif f in {"fontsize", "scalex", "scaley", "spacing", "angle", "outline", "shadow"}:
return float(v)
elif f == "marked":
return v.endswith("1")
elif f == "alignment":
try:
if format_ == "ass":
return Alignment(int(v))
else:
return Alignment.from_ssa_alignment(int(v))
except Exception:
warnings.warn("Failed to parse alignment, using default", RuntimeWarning)
return Alignment.BOTTOM_CENTER
elif f == "fontname":
return v.strip()
else:
return v
subs.info.clear()
subs.aegisub_project.clear()
subs.styles.clear()
subs.fonts_opaque.clear()
subs.graphics_opaque.clear()
inside_info_section = False
inside_aegisub_section = False
inside_font_section = False
inside_graphic_section = False
current_attachment_name = None
current_attachment_lines_buffer = []
current_attachment_is_font = None
for lineno, line in enumerate(fp, 1):
line = line.strip()
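# Local modification: the section/style/attachment parsing below is commented out;
# only Dialogue/Comment events are needed for corpus extraction.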
# if SECTION_HEADING.match(line):
# logging.debug("at line %d: section heading %s", lineno, line)
# inside_info_section = "Info" in line
# inside_aegisub_section = "Aegisub" in line
# inside_font_section = "Fonts" in line
# inside_graphic_section = "Graphics" in line
# elif inside_info_section or inside_aegisub_section:
# if line.startswith(";"):
# continue # skip comments
# try:
# k, v = line.split(":", 1)
# if inside_info_section:
# subs.info[k] = v.strip()
# elif inside_aegisub_section:
# subs.aegisub_project[k] = v.strip()
# except ValueError:
# pass
# elif inside_font_section or inside_graphic_section:
# m = ATTACHMENT_FILE_HEADING.match(line)
# current_attachment_is_font = inside_font_section
# if current_attachment_name and (m or not line):
# # flush last font/picture on newline or new font/picture name
# attachment_data = current_attachment_lines_buffer[:]
# if inside_font_section:
# subs.fonts_opaque[current_attachment_name] = attachment_data
# elif inside_graphic_section:
# subs.graphics_opaque[current_attachment_name] = attachment_data
# else:
# raise NotImplementedError("Bad attachment section, expected [Fonts] or [Graphics]")
# logging.debug("at line %d: finished attachment definition %s", lineno, current_attachment_name)
# current_attachment_lines_buffer.clear()
# current_attachment_name = None
# if m:
# # start new font/picture
# attachment_name = m.group("name")
# current_attachment_name = attachment_name
# elif line:
# # add non-empty line to current buffer
# current_attachment_lines_buffer.append(line)
# elif line.startswith("Style:"):
# _, rest = line.split(":", 1)
# buf = rest.strip().split(",")
# name, *raw_fields = buf
# field_dict = {f: string_to_field(f, v) for f, v in zip(STYLE_FIELDS[format_], raw_fields)}
# sty = SSAStyle(**field_dict)
# subs.styles[name] = sty
# elif line.startswith("Dialogue:") or line.startswith("Comment:"):
if line.startswith("Dialogue:") or line.startswith("Comment:"):
ev_type, rest = line.split(":", 1)
raw_fields = rest.strip().split(",", len(EVENT_FIELDS[format_])-1)
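# Local modification: parse each field independently and skip any field that
# fails, so one malformed value no longer discards the whole event.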
field_dict = {}
for f, v in zip(EVENT_FIELDS[format_], raw_fields):
try:
field_dict[f] = string_to_field(f, v)
except Exception as e:
print("[WARNING]:", e, f, v)
field_dict["type"] = ev_type
ev = SSAEvent(**field_dict)
subs.events.append(ev)
# cleanup fonts/pictures
if current_attachment_name:
# flush last font on EOF or new section w/o newline
attachment_data = current_attachment_lines_buffer[:]
if current_attachment_is_font:
subs.fonts_opaque[current_attachment_name] = attachment_data
else:
subs.graphics_opaque[current_attachment_name] = attachment_data
logging.debug("at EOF: finished attachment definition %s", current_attachment_name)
current_attachment_lines_buffer.clear()
current_attachment_name = None
@classmethod
def to_file(cls, subs: "SSAFile", fp: TextIO, format_: str, header_notice: str = NOTICE, **kwargs: Any) -> None:
"""See :meth:`pysubs2.formats.FormatBase.to_file()`"""
print("[Script Info]", file=fp)
for line in header_notice.splitlines(False):
print(";", line, file=fp)
subs.info["ScriptType"] = "v4.00+" if format_ == "ass" else "v4.00"
for k, v in subs.info.items():
print(k, v, sep=": ", file=fp)
if subs.aegisub_project:
print("\n[Aegisub Project Garbage]", file=fp)
for k, v in subs.aegisub_project.items():
print(k, v, sep=": ", file=fp)
def field_to_string(f: str, v: Any, line: Union[SSAEvent, SSAStyle]) -> str:
if f in {"start", "end"}:
return cls.ms_to_timestamp(v)
elif f == "marked":
return f"Marked={v:d}"
elif f == "alignment":
if isinstance(v, Alignment):
alignment = v
else:
warnings.warn("The 'alignment' attribute of SSAStyle should be an Alignment instance, using plain int is deprecated", DeprecationWarning)
alignment = Alignment(v)
if format_ == "ssa":
return str(alignment.to_ssa_alignment())
else:
return str(alignment.value)
elif isinstance(v, bool):
return "-1" if v else "0"
elif isinstance(v, int):
return str(v)
elif isinstance(v, float):
return str(int(v) if v.is_integer() else v)
elif isinstance(v, str):
return v
elif isinstance(v, Color):
if format_ == "ass":
return color_to_ass_rgba(v)
else:
return color_to_ssa_rgb(v)
else:
raise TypeError(f"Unexpected type when writing a SubStation field {f!r} for line {line!r}")
print("\n[V4+ Styles]" if format_ == "ass" else "\n[V4 Styles]", file=fp)
print(STYLE_FORMAT_LINE[format_], file=fp)
for name, sty in subs.styles.items():
fields = [field_to_string(f, getattr(sty, f), sty) for f in STYLE_FIELDS[format_]]
print(f"Style: {name}", *fields, sep=",", file=fp)
if subs.fonts_opaque:
print("\n[Fonts]", file=fp)
for font_name, font_lines in sorted(subs.fonts_opaque.items()):
print(f"fontname: {font_name}", file=fp)
for line in font_lines:
print(line, file=fp)
print(file=fp)
if subs.graphics_opaque:
print("\n[Graphics]", file=fp)
for picture_name, picture_lines in sorted(subs.graphics_opaque.items()):
print(f"filename: {picture_name}", file=fp)
for line in picture_lines:
print(line, file=fp)
print(file=fp)
print("\n[Events]", file=fp)
print(EVENT_FORMAT_LINE[format_], file=fp)
for ev in subs.events:
fields = [field_to_string(f, getattr(ev, f), ev) for f in EVENT_FIELDS[format_]]
print(ev.type, end=": ", file=fp)
print(*fields, sep=",", file=fp)
```
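A usage sketch ("broken.ass" is a hypothetical file name): with this patch, loading keeps going even when individual event fields are malformed; a failed field is reported and skipped instead of the whole file raising.

```python
import pysubs2

# Force the format, since the patched parser only consumes Dialogue/Comment lines.
subs = pysubs2.load("broken.ass", encoding="utf-8", format_="ass")
for ev in subs[:5]:
    print(ev.start, ev.end, ev.text)
```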
For now I wrote a script that processes ASS files, handling only subtitles that contain a bilingual pair within a single file. If subtitles ever need to be aligned across files, something like the UN parallel-corpus pipeline would probably have to be brought in, which is more work. With minor changes this script could also process SRT subtitles.

```python
import os
from queue import Empty
import pysubs2
import re
import itertools
import json
import gc
import keyboard
import random
import pickle
import copy
import datetime
from pathlib import Path
from collections import Counter
import multiprocessing as mp
KEYWORD2LANG = {
"cn": ("zh_text",),
"chs": ("zh_text",),
"简体": ("zh_text",),
"cht": ("cht_text",),
"繁体": ("cht_text",),
"en": ("en_text",),
"eng": ("en_text",),
"英文": ("en_text",),
"chi_eng": ("zh_text","en_text"),
"cnen": ("zh_text","en_text"),
"en&cn": ("zh_text","en_text"),
"chs&en": ("zh_text","en_text"),
"chs&eng": ("zh_text","en_text"),
"cht&eng": ("cht_text","en_text"),
"简体&英文": ("zh_text","en_text"),
"繁体&英文": ("cht_text", "en_text"),
}
SQL_INFO_LANG = {
"cnen": ("zh_text","en_text"),
"cn": ("zh_text",),
"en": ("en_text",),
"jp": ("ja_text",),
"tw": ("cht_text",),
}
SCANCODE2LANGTUP = {
78: ("zh_text","en_text"), # numpad +
44: ("zh_text",), # z
20: ("cht_text",), # t
18: ("en_text",), # e
74: ("cht_text", "en_text"), # numpad -
}
STYLE_FORMATTER = re.compile(r"\{\\.*?\}")
FLAT_PATH = Path(r"F:\rrwCORPUS\flat")
# ASS_PATH_CACHE_PICKLE = Path(r"F:\rrwCORPUS\ass_path.pkl")
# CHINESE_PAT = re.compile(r'[\u4e00-\u9fff\u3400-\u4DBF\u20000-\u2A6DF]')
CHINESE_PAT = re.compile(r'[\u4e00-\u9fff]')
JAPANESE_PAT = re.compile(r'[\u3040-\u309F\u30A0-\u30FF]') # hiragana and katakana only
KOREAN_PAT = re.compile(r'[\uAC00-\uD7AF\u3130-\u318F\u1100-\u11FF]')
ENCODE_TRY_ORDER = ("utf-16", "utf-8", "gbk")
DUAL_ASS_OUT_JSONL = Path(r"F:\rrwCORPUS\dual_ass.jsonl")
SUB_ENCODE_ERROR_LOG = Path(r"F:\rrwCORPUS\encode_error.log")
TEMPLATE_CORPUS = {
"文件名": "",
"it_text": "",
"zh_text": "",
"en_text": "",
"ar_text": "",
"nl_text": "",
"de_text": "",
"eo_text": "",
"fr_text": "",
"he_text": "",
"ja_text": "",
"pt_text": "",
"ru_text": "",
"es_text": "",
"sv_text": "",
"ko_text": "",
"th_text": "",
"id_text":"",
"cht_text":"",
"vi_text":"",
"扩展字段": {},
"时间": datetime.datetime.today().strftime("%Y%m%d")
}
class HARDCODED_PARAMETERS:
dual_ass_valid_dual_line_rate = 0.5 # if lines containing \N make up less than this fraction of all lines, skip the file
dual_ass_linecount_ignore_th = 10 # if there are fewer candidate bilingual lines than this, the file is not included
identify_ja_linecount_rate = 0.3 # a column is Japanese if the fraction of its lines containing kana exceeds this (takes precedence over Chinese and Korean)
identify_ko_linecount_rate = 0.3 # a column is Korean if the fraction of its lines containing hangul exceeds this (takes precedence over Chinese)
identify_zh_linecount_rate = 0.5 # a column is Chinese if the fraction of its lines containing CJK characters exceeds this
auto_delete_small_file_th = 32 # delete files whose st_size is smaller than this
def dual_handler(filepathname: Path, subs: pysubs2.ssafile.SSAFile, langtup: tuple):
r"""Handle bilingual subtitles whose two lines are separated by \N."""
subgrid = []
for line in subs:
subline = re.sub(STYLE_FORMATTER, "", line.text)
if subline.count(r'\N') != 1:
continue
subgrid.append(tuple(subline.split(r'\N')))
if len(subgrid) < HARDCODED_PARAMETERS.dual_ass_linecount_ignore_th:
return False
if len(subs) * HARDCODED_PARAMETERS.dual_ass_valid_dual_line_rate >= len(subgrid):
return False
# Only zh/ja or zh/en pairs are handled here; langtup decides between Traditional and Simplified Chinese
counters = [Counter() for _ in subgrid[0]]
line_labels = [] # one tuple per row of subgrid, with one label per column of that row
for grid in subgrid:
line_label = ["en_text" for _ in grid]
for langidx, langtext in enumerate(grid):
if CHINESE_PAT.search(langtext):
counters[langidx]["zh"] += 1
line_label[langidx] = "zh_text"
if JAPANESE_PAT.search(langtext):
counters[langidx]["ja"] += 1
line_label[langidx] = "ja_text"
if KOREAN_PAT.search(langtext):
counters[langidx]["ko"] += 1
line_label[langidx] = "ko_text"
line_labels.append(tuple(line_label))
lang_label = ["en_text" for _ in subgrid[0]]
for idx, counter in enumerate(counters):
if counter["ja"] > len(subgrid) * HARDCODED_PARAMETERS.identify_ja_linecount_rate:
lang_label[idx] = "ja_text"
elif counter["ko"] > len(subgrid) * HARDCODED_PARAMETERS.identify_ko_linecount_rate:
lang_label[idx] = "ko_text"
elif counter["zh"] > len(subgrid) * HARDCODED_PARAMETERS.identify_zh_linecount_rate:
if langtup and "cht_text" in langtup:
lang_label[idx] = "cht_text"
else:
lang_label[idx] = "zh_text"
if lang_label[0] == lang_label[1]: # both columns are the same language; discard the file
return False
results = []
# with open(DUAL_ASS_OUT_JSONL, "a", encoding="utf-8") as f:
for gidx, grid in enumerate(subgrid):
if line_labels[gidx][0] == line_labels[gidx][1]:
continue
temp = copy.deepcopy(TEMPLATE_CORPUS)
# temp['文件名'] = filepathname.namez
for lang_key, lang_text in zip(line_labels[gidx], grid):
temp[lang_key] = lang_text
temp["扩展字段"]["k"] = filepathname.name
temp["扩展字段"] = json.dumps(temp["扩展字段"], ensure_ascii=False, sort_keys=True)
tempstr = json.dumps(temp, ensure_ascii=False, sort_keys=True)
results.append(tempstr + '\n')
return ''.join(results)
# f.write('\n')
# Some jerk didn't name things properly; strip every line containing "ReLyNnA"
ReLyNnA_REPLACE_PAT = re.compile(b'^.*' + re.escape(b"ReLyNnA") + b'.*$(?:\r?\n)?', re.MULTILINE)
def dual_ass_worker(qi: mp.Queue, qo: mp.Queue, qe: mp.Queue):
while 1:
sql_line_filedir = qi.get()
if sql_line_filedir is None:
print("Got None, Exit.", mp.current_process().name)
gc.collect()
return
# print("Got", sql_line_filedir, mp.current_process().name)
for filepathname in itertools.chain(
sql_line_filedir.rglob(r"*.ssa"),
sql_line_filedir.rglob(r"*.[aA][sS][sS]")
):
filepathname: Path
if filepathname.is_dir(): # some directories end in .ass
continue
if filepathname.stat().st_size < HARDCODED_PARAMETERS.auto_delete_small_file_th:
filepathname.unlink()
print("DELETE EMPTY FILE:", filepathname)
continue
# print("R",filepathname)
# filepathname = Path(filepathname)
subs = None
prvexc = None
with open(filepathname, "rb") as f:
fc = f.read()
if ReLyNnA_REPLACE_PAT.search(fc):
with open(filepathname, "wb") as f:
f.write(re.sub(ReLyNnA_REPLACE_PAT, b"", fc))
for enc in ENCODE_TRY_ORDER:
try:
subs = pysubs2.load(filepathname, encoding=enc, format_="ass")
break
except Exception as e:
prvexc = e
# print(filepathname, enc, "got error:", e)
if not subs:
print(filepathname, "decode failed:", prvexc)
# qe.put("E",str(filepathname))
qe.put_nowait(str(filepathname))
continue
filename = filepathname.name
nameparts = filename.split('.')
for namepart in nameparts[-1::-1]:
langtup = KEYWORD2LANG.get(namepart.lower())
if langtup:
break
if not langtup:
with open(sql_line_filedir / "info_from_sql.json", "r", encoding="utf-8") as f:
lang = json.load(f)["lang"]
# print("[lang]",lang)
langtup = SQL_INFO_LANG.get(lang)
res = dual_handler(filepathname, subs, langtup)
if res: # TODO: how should monolingual files be handled?
# qo.put("R",res)
qo.put_nowait(res)
if __name__ == "__main__":
# ass_paths = []
# ass_counter = 0
# if ASS_PATH_CACHE_PICKLE.exists():
# with open(ASS_PATH_CACHE_PICKLE, "rb") as f:
# ass_paths = pickle.load(f)
# print("loaded",len(ass_paths))
# else:
# for sql_line_filedir in FLAT_PATH.iterdir():
# for filepathname in sql_line_filedir.rglob(r"*.[aA][sS][sS]"):
# ass_paths.append(filepathname)
# with open(ASS_PATH_CACHE_PICKLE, "wb") as f:
# pickle.dump(ass_paths, f)
# random.shuffle(ass_paths)
# print("ass_count",len(ass_paths)) # 205943
# DUAL_ASS_OUT_JSONL.unlink(missing_ok=True)
mgr = mp.Manager()
qi = mgr.Queue()
qo = mgr.Queue()
qe = mgr.Queue() # using mp.Queue() directly deadlocks reliably on Windows; Manager queues avoid it
ps = [
mp.Process(target=dual_ass_worker, args=(qi,qo,qe)) for _ in range(8)
]
for x in ps: x.start()
# ct = 0
for sql_line_filedir in FLAT_PATH.iterdir():
# ct += 1
# if ct >= 10000:
# break
# for idx, filepathname in enumerate(ass_paths[:10]):
# if idx % 1000 == 0:
# print("Ai", idx, filepathname)
# # print("A",filepathname)
qi.put(sql_line_filedir)
# print(sql_line_filedir)
for x in ps: qi.put(None)
gc.collect()
print("Main thread waiting...")
for i, x in enumerate(ps):
x.join()
print(i, "Join", x)
print("child process all done.")
with (open(DUAL_ASS_OUT_JSONL, "w", encoding="utf-8") as fo,
open(SUB_ENCODE_ERROR_LOG, "w", encoding="utf-8") as fe):
while 1:
try:
res = qe.get_nowait()
fe.write(res + '\n')
except Empty:
break
while 1:
try:
# typ, res = qo.get_nowait()
res = qo.get_nowait()
# if typ == "R":
# for r in res:
fo.write(res)
# else:
# fe.write(res + '\n')
except Empty:
break
# ass_counter += 1
# print("LOADING",filepathname)
# subs = None
# for enc in ASS_ENCODE_TRY_ORDER:
# try:
# subs = pysubs2.load(filepathname, encoding=enc)
# break
# except Exception as e:
# print(enc, "got error:", e)
# if not subs:
# continue
# filename = filepathname.name
# print(subs)
# # for line in subs[-25:]:
# # print(re.sub(ASS_FORMATTER, "", line.text)[:80])
# nameparts = filename.split('.')
# for namepart in nameparts[-1::-1]:
# langtup = KEYWORD2LANG.get(namepart.lower())
# if langtup:
# break
# print(langtup)
# if not langtup:
# with open(sql_line_filedir / "info_from_sql.json", "r", encoding="utf-8") as f:
# lang = json.load(f)["lang"]
# print("[lang]",lang)
# langtup = SQL_INFO_LANG.get(lang)
# dual_ass_handler(filepathname, subs, langtup)
# print(f"{ass_counter}/{len(ass_paths)} {langtup}>>>")
# while 1:
# key = keyboard.read_event()
# print(key, key.event_type, key.scan_code)
# if key.event_type == "up":
# if key.name == "esc":
# exit(0)
# if inputtup := SCANCODE2LANGTUP.get(key.scan_code):
# langtup = inputtup
# print(f"set langtup to {langtup}")
# if key.scan_code == 82:
# break
```
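A minimal sketch of the core labeling idea in dual_handler above (the sample line is invented): strip override tags, require exactly one \N, then label each half by which script it contains.

```python
import re

STYLE_FORMATTER = re.compile(r"\{\\.*?\}")
CHINESE_PAT = re.compile(r'[\u4e00-\u9fff]')
JAPANESE_PAT = re.compile(r'[\u3040-\u309F\u30A0-\u30FF]')  # kana only

line = r"{\an8}今天天气不错\NNice weather today"
text = STYLE_FORMATTER.sub("", line)
if text.count(r"\N") == 1:
    halves = text.split(r"\N")
    labels = []
    for half in halves:
        if JAPANESE_PAT.search(half):
            labels.append("ja_text")
        elif CHINESE_PAT.search(half):
            labels.append("zh_text")
        else:
            labels.append("en_text")
    print(dict(zip(labels, halves)))  # {'zh_text': '今天天气不错', 'en_text': 'Nice weather today'}
```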
Backing up the other scripts I wrote. flat.py flattens every archive into one directory; it ultimately went unused because the data in the SQL dump turned out to be meaningful:

flat.py

```python
from pathlib import Path
from collections import Counter
import os
import subprocess
input_dir = Path(r"F:\rrwCORPUS\字幕备份")
out_dir = Path(r"F:\rrwCORPUS\ex")
out_dir.mkdir(exist_ok=True)
archive_ext = {
'zip',
'rar',
'7z',
}
ignore = True # resume flag: skip files until the last processed archive (hardcoded below) is reached
if __name__ == "__main__":
ext_counter = Counter()
for filename in input_dir.rglob("*"):
if filename.is_dir():
continue
if str(filename.absolute()) == r"F:\rrwCORPUS\字幕备份\2014\0205\1dec4846ce49e78814b96dd939556192.zip":
ignore = False
if ignore:
continue
# print(filename)
ext = filename.name.split('.')[-1]
ext_counter[ext] += 1
if ext in archive_ext:
cmd = f'bz x -cp:936 -aou -y -p:letv "{filename.absolute()}" "{out_dir.absolute()}"'
print(cmd)
os.system(cmd)
# process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# # write the password to the process's stdin, followed by a newline
# stdout, stderr = process.communicate(input=f"letv\n".encode())
# # print the command output
# print(stdout.decode()) # normal output
# if stderr:
# print(stderr.decode()) # error output (if any)
print(ext_counter)
```
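A sketch (paths assumed) of a more robust way to drive the Bandizip CLI than string-built shell commands: passing an argument list to subprocess.run sidesteps the quoting problems that the f-string commands above have with spaces in file names.

```python
import subprocess
from pathlib import Path

def extract(archive: Path, dest: Path) -> int:
    """Extract one archive with Bandizip's console tool (bz), assumed to be on PATH."""
    # List-form arguments need no manual quoting; flags mirror the ones used above.
    cmd = ["bz", "x", "-cp:936", "-aoa", "-y", "-p:letv",
           str(archive.absolute()), str(dest.absolute())]
    return subprocess.run(cmd, check=False).returncode
```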
tsv_reader.py copies files into per-subtitle folders according to the SQL data and unpacks the archives; Bandizip is used as the command-line tool:

tsv_reader.py

```python
from collections import Counter
from pathlib import Path
import multiprocessing as mp
import os
import csv
import json
import shutil
ARCHIVE_EXT = {
'zip',
'rar',
'7z',
}
OSS_PATH = Path(r"F:\rrwCORPUS\b2")
OUT_PATH = Path(r"F:\rrwCORPUS\flat")
OUT_PATH.mkdir(exist_ok=True)
def copy_related_files():
sid2info = {}
nameset = set()
for filename in OSS_PATH.rglob("*"):
if filename.is_dir():
continue
fp = filename.parts[3:]
setkey = '/'.join(fp)
# print(setkey, fp)
nameset.add(setkey)
# input(">>>")
print(len(nameset))
with open("st.json", 'r', encoding='utf-8') as f:
# dr = csv.DictReader(f, delimiter='\t')
jlist = json.load(f)
for line in jlist['data']:
sid2info[str(line['id'])] = line
with open("subtitle_format_rel.csv", 'r', encoding='utf-8') as f:
dr = csv.DictReader(f, delimiter='\t')
for line in dr:
sid = line.pop('subtitleid')
if sid in sid2info:
sid2info[sid].update(line)
with open("subtitle_lang_rel.csv", 'r', encoding='utf-8') as f:
dr = csv.DictReader(f, delimiter='\t')
for line in dr:
sid = line.pop('subtitleid')
if sid in sid2info:
sid2info[sid].update(line)
with open("notfound.txt", "w", encoding='utf-8') as fnf:
for sid, j in sid2info.items():
fdir: Path = OUT_PATH / sid
fdir.mkdir(exist_ok=True)
with open(fdir / "info_from_sql.json", 'w', encoding='utf-8') as f:
json.dump(j, f, ensure_ascii=False, sort_keys=True)
if not j['file']:
continue
jfile = j['file'].removeprefix('./')
attfp: Path = OSS_PATH / jfile
if not attfp.exists(): # try to repair truncated rar file names
occ = []
subs = '/'.join(attfp.parts[3:])
print("Try correct:", subs)
for name in nameset:
if name.startswith(subs):
occ.append(name)
if len(occ) == 1:
attfp = OSS_PATH / occ[0]
try:
shutil.copy(attfp, fdir)
except FileNotFoundError:
print(sid, "Not Found:", attfp)
fnf.write(f"{sid} {attfp}\n")
except PermissionError:
print(sid, "PermissionError:", attfp)
fnf.write(f"{sid} {attfp}\n")
print(fdir)
def unpack_archive_worker(q: mp.Queue):
while 1:
cmd = q.get()
if cmd is None:
return
os.system(cmd) # os.system blocks until bz exits, so the 8 workers actually cap parallel extractions
def unpack_archive():
task_queue = mp.Queue()
ps = [
mp.Process(target=unpack_archive_worker, args=(task_queue,)) for _ in range(8)
]
for x in ps:
x.start()
ext_counter = Counter()
for filename in OUT_PATH.rglob("*"):
if filename.is_dir():
continue
ext = filename.name.split('.')[-1]
ext_counter[ext] += 1
if ext in ARCHIVE_EXT:
cmd = f'bz x -cp:936 -aoa -y -p:letv "{filename.absolute()}" "{filename.parent.absolute()}"'
print(filename)
task_queue.put(cmd)
for x in ps:
task_queue.put(None)
for x in ps:
x.join()
print(ext_counter)
# Counter({'srt': 195622, 'ass': 158819, 'json': 56134, 'zip': 32550, 'rar': 25104, 'txt': 714, 'tv': 373, 'ASS': 78, '7z': 72, 'SRT': 65, 'ssa': 50, 'doc': 30, 'DS_Store': 28, 'nfo': 26, 'jpg': 26, 'mht': 12, 'url': 8, 'png': 7, 'idx': 6, 'db': 6, 'delay': 5, '绠€浣_srt': 5, 'RAR': 3, 'DVD-RMVB-人人影视-softice_': 3, 'smi': 3, 'torrent': 3, 'Srt': 3, 'cfg': 3, 'sub': 2, 'docx': 2, '~a~': 2, 'sup': 2, 'html': 1, 'ZIP': 1, 'style': 1, 'King&_039': 1, '7': 1, 'tmp': 1, 'xls': 1, 'xlsx': 1, 'ttf': 1, 'TTF': 1})
if __name__ == "__main__":
# copy_related_files()
unpack_archive()
```
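A sketch of the truncated-name repair in copy_related_files above (names invented): when the path recorded in SQL does not exist on disk, the unique file whose relative path starts with the recorded prefix is taken to be the intended one.

```python
nameset = {"2014/0205/abc.rar", "2014/0205/abcdef.zip"}  # relative paths seen on disk
recorded = "2014/0205/abcde"                              # truncated path from the SQL row

candidates = [n for n in nameset if n.startswith(recorded)]
if len(candidates) == 1:
    print("repaired to:", candidates[0])  # repaired to: 2014/0205/abcdef.zip
```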
parse_test.py is a utility for quickly pinpointing problems and test-parsing a single file:

parse_test.py

```python
import argparse
import pysubs2
from pathlib import Path
parser = argparse.ArgumentParser(description='''
''')
parser.add_argument('input', type=str, help='The input file path', nargs='?')
parser.add_argument('-e', '--enc', type=str, help='Encoding', default='utf-16')
args = parser.parse_args()
if __name__ == "__main__":
s = pysubs2.load(args.input, encoding=args.enc)
print(s)
for i in s[:20]:
print(i)
```
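Since the corpus mixes encodings, a tolerant loader (a sketch reusing the ENCODE_TRY_ORDER idea from the dual-ASS script above) is handy when testing single files:

```python
from typing import Optional
import pysubs2

ENCODE_TRY_ORDER = ("utf-16", "utf-8", "gbk")

def load_any(path: str) -> Optional[pysubs2.SSAFile]:
    """Try each encoding in turn; return None if every attempt fails."""
    last = None
    for enc in ENCODE_TRY_ORDER:
        try:
            return pysubs2.load(path, encoding=enc)
        except Exception as e:  # decode or parse error; try the next encoding
            last = e
    print("all encodings failed:", last)
    return None
```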
人人影视 subtitles: https://pan.baidu.com/s/1TjAJl7pN2xLDyGtHiCmPyw extraction code: rrys. To be processed.