-
Notifications
You must be signed in to change notification settings - Fork 77
/
multipleValueModeWrite.txt
72 lines (61 loc) · 2.66 KB
/
multipleValueModeWrite.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Create the composite-partitioned database `dbName` and an empty partitioned
// table `tableName` inside it.
//   dbName     - database path (e.g. a "dfs://..." URL)
//   tableName  - name of the partitioned table to create
//   ps1        - partition scheme for the first level (VALUE partition)
//   ps2        - partition scheme for the second level (RANGE partition)
//   numMetrics - number of metric columns, named tag1..tag<numMetrics>
// The table has columns id (INT), datetime (DATETIME) and numMetrics FLOAT
// columns; it is partitioned on `datetime` (first level) and `id` (second level).
def createDatabase(dbName,tableName, ps1, ps2, numMetrics){
	m = "tag" + string(1..numMetrics)
	// One FLOAT type per metric column: the type list must be exactly as long
	// as the name list, so it is sized by numMetrics rather than a fixed 50.
	schema = table(1:0, `id`datetime join m, [INT,DATETIME] join take(FLOAT, numMetrics))
	db1 = database("",VALUE,ps1)
	db2 = database("",RANGE,ps2)
	db = database(dbName,COMPO,[db1,db2])
	db.createPartitionedTable(schema,tableName,`datetime`id)
}
// Build one day's worth of simulated records for the given machine ids.
//   day        - the day to generate; records start at that day's datetime value
//   id         - vector of machine ids
//   freqPerDay - number of records per machine
//   numMetrics - number of random metric columns (tag1..tag<numMetrics>)
// Returns an in-memory table with columns idVec, ts, tag1..tag<numMetrics>,
// holding freqPerDay consecutive rows per id (ids in the order given).
def generate1DayData(day,id,freqPerDay,numMetrics){
	machineCount = size(id)
	numRecords = freqPerDay * machineCount

	// Each id occupies one contiguous run of freqPerDay rows.
	machineIds = array(INT, numRecords)
	for(k in 0:machineCount){
		runStart = k * freqPerDay
		machineIds[runStart : (runStart + freqPerDay)] = id[k]
	}

	// Timestamps startTime, startTime+1, ... repeated cyclically for every id.
	startTime = datetime(day)
	dayTicks = startTime + (0..(freqPerDay-1))
	t = table(machineIds as idVec, take(dayTicks, numRecords) as ts)

	// Append one column of random values in [0, 1.0) per metric.
	metricNames = "tag" + string(1..numMetrics)
	for(k in 0:numMetrics){
		t[metricNames[k]] = rand(1.0, numRecords)
	}
	return t
}
// Generate and append `days` days of data for the machines in `id`, writing
// at most numMachinesPerPartition machines per append.
//   id                      - vector of machine ids handled by this writer
//   startDay                - first day to generate
//   days                    - number of consecutive days to generate
//   freqPerDay              - records per machine per day
//   numMachinesPerPartition - number of machines written in one batch
//   numMetrics              - number of metric columns per record
//   dbName, tableName       - target database path and table name
def singleThreadWriting(id, startDay, days, freqPerDay, numMachinesPerPartition,numMetrics,dbName,tableName){
	t = loadTable(dbName,tableName)
	idSize = size(id)
	// Nothing to write; also keeps the do-while below from running once on
	// an empty id vector.
	if(idSize == 0) return
	for(d in 0:days){
		index = 0
		do{
			// The last batch may be shorter than numMachinesPerPartition;
			// clamp its end to the size of id instead of assuming batches of 10.
			batchEnd = index + numMachinesPerPartition
			if(batchEnd > idSize) batchEnd = idSize
			t.append!(generate1DayData(startDay + d, id[index:batchEnd], freqPerDay, numMetrics))
			index += numMachinesPerPartition
		}while (index < idSize)
	}
}
// Split `id` into chunks and write each chunk with singleThreadWriting in
// parallel via ploop.
//   id                      - vector of all machine ids to write
//   startDay, days          - first day and number of days to generate
//   freqPerDay              - records per machine per day
//   numMachinesPerPartition - number of machines per write batch
//   numMetrics              - number of metric columns per record
//   threads                 - desired number of parallel writers
//   dbName, tableName       - target database path and table name
def multipleThreadWriting(id, startDay, days,freqPerDay, numMachinesPerPartition,numMetrics, threads,dbName,tableName) {
	// Ids per chunk: size(id)/threads rounded UP to a whole multiple of
	// numMachinesPerPartition, so every chunk consists of complete batches
	// (presumably so parallel writers never share an id-range partition --
	// confirm against the partition scheme passed to createDatabase).
	idCountPerThread = ceil(id.size() \ threads / numMachinesPerPartition) * numMachinesPerPartition
	ploop(singleThreadWriting{,startDay, days,freqPerDay, numMachinesPerPartition,numMetrics,dbName,tableName}, id.cut(idCountPerThread))
}
// Recreate the demo database from scratch and submit a background job that
// fills it with simulated data.
//   id                      - vector of machine ids to write
//   startDay, days          - first day and number of days to generate
//   ps1, ps2                - first- and second-level partition schemes
//   freqPerDay              - records per machine per day
//   numMachinesPerPartition - number of machines per write batch
//   threads                 - 1 submits the single-threaded writer, any other
//                             value submits the multi-threaded writer
def mainJob(id,startDay,days, ps1, ps2,freqPerDay, numMachinesPerPartition,threads) {
	dbPath = "dfs://mvmDemo"
	tbName = "machines"
	metricCount = 50

	// Always start from an empty database.
	if(existsDatabase(dbPath)){
		dropDatabase(dbPath)
	}
	createDatabase(dbPath, tbName, ps1, ps2, metricCount)

	// The writer is passed to submitJob with every argument already bound.
	if(threads == 1){
		submitJob("submit_singleThreadWriting", "write data", singleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, metricCount, dbPath, tbName})
	}
	else{
		submitJob("submit_multiThreadWriting", "write data", multipleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, metricCount, threads, dbPath, tbName})
	}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Driver: log in, set the simulation parameters, and launch the write job.
login("admin","123456")
freqPerDay=86400
numMachines=100
id=1..numMachines // machine (device) ids
startDay=2020.09.01 // first day to write
days=5 // number of days of data to write
threads = 20 // number of threads writing concurrently
numMachinesPerPartition=10
// First-level partition scheme: one value per day.
ps1=2020.09.01..2020.12.31
// Second-level partition scheme: id range boundaries 1, 1+n, 1+2n, ...
// The number of ranges is rounded UP (integer ceiling division) so that the
// highest ids are still covered when numMachines is not a multiple of
// numMachinesPerPartition.
numIdPartitions=(numMachines + numMachinesPerPartition - 1) / numMachinesPerPartition
ps2=numMachinesPerPartition*(0..numIdPartitions)+1
mainJob(id, startDay, days, ps1, ps2, freqPerDay, numMachinesPerPartition, threads)