-
Notifications
You must be signed in to change notification settings - Fork 54
/
收获服务器.py
174 lines (150 loc) · 5.47 KB
/
收获服务器.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import os
import random
import logging
import threading
from functools import lru_cache
from typing import Tuple
import flask
import prometheus_client
from 打点 import tqdm, tqdm面板, 直方图打点, 打点
from 信息 import 荣
from 类 import 阵
from 存储 import 索引空间
from utils import netloc, json_loads, 小清洗, 好ThreadPoolExecutor
from 配置 import 单键最多url, 单键最多相同域名url, 存储位置, 大清洗行数, 新增键需url数
logging.getLogger('werkzeug').setLevel(logging.ERROR)
app = flask.Flask(__name__)
面板 = tqdm面板(['内存键数', '收到请求数', '丢弃行数', '内存行数', '小清洗次数', '大清洗索引数'])
面板['内存行数'].total = 大清洗行数
索引行数打点 = 直方图打点('索引行数', [round(2**((i+1)*0.5)) for i in range(32)])
词抽样 = 打点('词抽样', labelnames=['word', 'f'])
索引增加行数打点 = 直方图打点('索引增加行数打点', [-1, 0, 1, 3, 7, 12, 19, 31, 48, 73, 112, 169, 256, 386, 580, 872, 1310])
prometheus_client.start_http_server(14951)
df = 索引空间(存储位置/'键')
临时df = {}
偏执 = 0
大清 = threading.Lock()
def 消重(q: 阵) -> 阵:
qq = []
有 = set()
for v, url in q:
if url in 有:
continue
有.add(url)
qq.append((v, url))
return qq
def 降解(q: 阵) -> 阵:
qq = []
有 = set()
for v, url in sorted(q, key=lambda x: -len(x[1])):
k = url
if k.startswith('https://'):
k = k[8:]
elif k.startswith('http://'):
k = k[7:]
if k.endswith('/'):
k = k[:-1]
if k in 有:
continue
有.add(k)
qq.append((v, url))
return qq
@lru_cache(maxsize=100_0000)
def 低(k: str) -> float:
l = df.get(k, [])
if len(l) < 单键最多url:
return -1
else:
return min(0.05, sorted([v for v, url in l], reverse=True)[单键最多url-1])
@app.route('/l', methods=['POST'])
def l():
global 偏执
文件名, kvs = json_loads(flask.request.data)
文件名 = 文件名.replace('\n', '')
netloc(文件名)
面板['收到请求数'].update(1)
for k, v in kvs:
if k not in 临时df:
临时df[k] = []
dfk = 临时df[k]
if len(dfk) > 15 and v < 低(k):
面板['丢弃行数'].update(1)
continue
dfk.append((v, 文件名))
面板['内存键数'].n = len(临时df)
面板['内存键数'].refresh()
大清.acquire()
偏执 += 1
if (偏执+1) % 1000 == 0:
内存行数 = sum([len(临时df[k]) for k in [*临时df]])
面板['内存行数'].update(内存行数-面板['内存行数'].n)
if 内存行数 > 大清洗行数:
偏执 = 0
大清洗()
os._exit(0)
大清.release()
return 'ok'
def 洗(item) -> Tuple[int, int, str]:
k, v = item
原v = df.get(k, [])
if not 原v:
if len(v) < 新增键需url数:
return 0, 0, '丢弃'
z = 消重(tuple(v) + tuple(原v))
if random.random() < 0.02:
z = 降解(z)
if len(z) > 单键最多url*1.1 or random.random() < 0.02:
面板['小清洗次数'].update(1)
zt = 小清洗(sorted(z, key=lambda x: x[0] * (1 + 荣(x[1])), reverse=True), 单键最多相同域名url)
z = zt[:单键最多url]
if len(z) > 30 and random.random() < 0.2:
z = sorted(z, reverse=True, key=lambda x: x[1]) # 让压缩算法高兴
df[k] = z
diff = len(z) - len(原v)
状态 = '?'
if not 原v:
状态 = '新增'
elif diff > 0:
状态 = '变长'
elif diff == 0:
状态 = '不变'
elif diff < 0:
状态 = '变短'
return len(z), diff, 状态
def 大清洗():
global 临时df
pool = 好ThreadPoolExecutor(max_workers=8)
try:
_临时df = 临时df
临时df = {}
状态统计 = {}
状态值统计 = {}
items = [*_临时df.items()]
random.shuffle(items)
for l, i, 状态 in tqdm(pool.map(洗, items), ncols=70, desc='大清洗', total=len(_临时df)):
状态统计[状态] = 状态统计.get(状态, 0) + 1
状态值统计[状态] = 状态值统计.get(状态, 0) + i
面板['大清洗索引数'].update(i)
if 状态 != '丢弃':
索引行数打点.observe(l)
索引增加行数打点.observe(i)
print(f'\n\n\n\n大清洗好了。\n增加了{sum(状态值统计.values())}行。\n键状态: {状态统计}\n键状态值: {状态值统计}')
except Exception as e:
print('完蛋了!')
logging.exception(e)
while True:
...
def _抽样检查(word: str):
s = df.get(word, [])
词抽样.labels(word=word, f='长度').set(len(s))
词抽样.labels(word=word, f='繁荣长度').set(len([i for i in s if 荣(i[1])]))
zt = 小清洗(sorted(s, key=lambda x: x[0] * (1 + 荣(x[1])), reverse=True), 单键最多相同域名url)
if zt:
小 = zt[:单键最多url][-1]
词抽样.labels(word=word, f='总能量').set(sum([i[0] * (1 + 荣(i[1])) for i in zt]))
词抽样.labels(word=word, f='最小').set(小[0])
词抽样.labels(word=word, f='最小能量').set(小[0] * (1 + 荣(小[1])))
if __name__ == '__main__':
for word in ['我', '的', '世界', 'minecraft', 'python', 'github', 'atri', '砂砾', '冰雹', '岩浆', '光谱']:
_抽样检查(word)
app.run()