Skip to content

Commit

Permalink
Merge pull request #110 from r3inbowari/add/multi
Browse files Browse the repository at this point in the history
feat: test using multi server
  • Loading branch information
r3inbowari authored Feb 25, 2023
2 parents 4a107ff + 281eaba commit 4d46617
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 43 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Flags:
eg: --proxy=http://10.20.0.101:7890
--source Bind a source interface for the speedtest.
eg: --source=10.20.0.101
-m --multi Enable multi mode.
-t --thread Set the number of speedtest threads.
--version Show application version.
```

Expand Down
75 changes: 70 additions & 5 deletions speedtest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
Expand All @@ -13,14 +14,16 @@ import (
var (
showList = kingpin.Flag("list", "Show available speedtest.net servers.").Short('l').Bool()
serverIds = kingpin.Flag("server", "Select server id to run speedtest.").Short('s').Ints()
customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net").String()
customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net.").String()
savingMode = kingpin.Flag("saving-mode", "Using less memory (≒10MB), though low accuracy (especially > 30Mbps).").Bool()
jsonOutput = kingpin.Flag("json", "Output results in json format").Bool()
location = kingpin.Flag("location", "Change the location with a precise coordinate. Format: lat,lon").String()
city = kingpin.Flag("city", "Change the location with a predefined city label.").String()
showCityList = kingpin.Flag("city-list", "List all predefined city labels.").Bool()
proxy = kingpin.Flag("proxy", "Set a proxy(http(s) or socks) for the speedtest.").String()
source = kingpin.Flag("source", "Bind a source interface for the speedtest.").String()
multi = kingpin.Flag("multi", "Enable multi mode.").Short('m').Bool()
thread = kingpin.Flag("thread", "Set the number of speedtest threads.").Short('t').Int()
)

type fullOutput struct {
Expand All @@ -36,6 +39,7 @@ func main() {
kingpin.Parse()

var speedtestClient = speedtest.New()
speedtestClient.SetNThread(*thread)

if len(*proxy) > 0 || len(*source) > 0 {
config := &speedtest.UserConfig{
Expand Down Expand Up @@ -93,7 +97,11 @@ func main() {
targets = []*speedtest.Server{target}
}

startTest(targets, *savingMode, *jsonOutput)
if *multi {
startMultiTest(targets[0], servers, *savingMode, *jsonOutput)
} else {
startTest(targets, *savingMode, *jsonOutput, *multi)
}

if *jsonOutput {
jsonBytes, err := json.Marshal(
Expand All @@ -109,7 +117,37 @@ func main() {
}
}

func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
func startMultiTest(s *speedtest.Server, servers speedtest.Servers, savingMode bool, jsonOutput bool) {
// Reset DataManager counters, avoid measurement of multiple server result mixing.
s.Context.Reset()
if !jsonOutput {
showServer(s)
}

err := s.PingTest()
checkError(err)

if jsonOutput {
err = s.MultiDownloadTestContext(context.Background(), servers, savingMode)
checkError(err)
s.Context.Wait()
err = s.MultiUploadTestContext(context.Background(), servers, savingMode)
checkError(err)
return
}

showLatencyResult(s)
err = testDownloadM(s, servers, savingMode)
checkError(err)
// It is necessary to wait for the release of the last test resource,
// otherwise the overload will cause excessive data deviation
s.Context.Wait()
err = testUploadM(s, servers, savingMode)
checkError(err)
showServerResult(s)
}

func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool, multi bool) {
for _, s := range servers {
// Reset DataManager counters, avoid measurement of multiple server result mixing.
s.Context.Reset()
Expand All @@ -121,12 +159,11 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
checkError(err)

if jsonOutput {
err := s.DownloadTest(savingMode)
err = s.DownloadTest(savingMode)
checkError(err)
s.Context.Wait()
err = s.UploadTest(savingMode)
checkError(err)

continue
}

Expand All @@ -147,6 +184,34 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
}
}

func testDownloadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Download Test: ")
go dots(quit)
err := server.MultiDownloadTestContext(context.Background(), servers, savingMode)
checkError(err)
quit <- true
if err != nil {
return err
}
fmt.Println()
return err
}

func testUploadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Upload Test: ")
go dots(quit)
err := server.MultiUploadTestContext(context.Background(), servers, savingMode)
checkError(err)
quit <- true
if err != nil {
return err
}
fmt.Println()
return nil
}

func testDownload(server *speedtest.Server, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Download Test: ")
Expand Down
144 changes: 112 additions & 32 deletions speedtest/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ type Manager interface {
CallbackDownloadRate(callback func(downRate float64)) *time.Ticker
CallbackUploadRate(callback func(upRate float64)) *time.Ticker

DownloadRateCaptureHandler(fn func())
UploadRateCaptureHandler(fn func())
RegisterDownloadHandler(fn func()) *FuncGroup
RegisterUploadHandler(fn func()) *FuncGroup

// Wait for the upload or download task to end to avoid errors caused by core occupation
Wait()
Reset()

SetNThread(n int) Manager
}

type Chunk interface {
Expand All @@ -55,6 +57,15 @@ const TypeEmptyChunk = 0
const TypeDownload = 1
const TypeUpload = 2

type FuncGroup struct {
fns []func()
manager *DataManager
}

func (f *FuncGroup) Add(fn func()) {
f.fns = append(f.fns, fn)
}

type DataManager struct {
totalDownload int64
totalUpload int64
Expand All @@ -70,6 +81,11 @@ type DataManager struct {
captureTime time.Duration
rateCaptureFrequency time.Duration
nThread int

running bool

dFn *FuncGroup
uFn *FuncGroup
}

func NewDataManager() *DataManager {
Expand All @@ -78,6 +94,8 @@ func NewDataManager() *DataManager {
captureTime: time.Second * 10,
rateCaptureFrequency: time.Second,
}
ret.dFn = &FuncGroup{manager: ret}
ret.uFn = &FuncGroup{manager: ret}
return ret
}

Expand Down Expand Up @@ -130,56 +148,103 @@ func (dm *DataManager) Wait() {
}
}

func (dm *DataManager) DownloadRateCaptureHandler(fn func()) {
dm.testHandler(dm.downloadRateCapture, fn)
func (dm *DataManager) RegisterUploadHandler(fn func()) *FuncGroup {
if len(dm.uFn.fns) < dm.nThread {
dm.uFn.Add(fn)
}
return dm.uFn
}

func (dm *DataManager) UploadRateCaptureHandler(fn func()) {
dm.testHandler(dm.uploadRateCapture, fn)
func (dm *DataManager) RegisterDownloadHandler(fn func()) *FuncGroup {
if len(dm.dFn.fns) < dm.nThread {
dm.dFn.Add(fn)
}
return dm.dFn
}

func (dm *DataManager) testHandler(captureFunc func() *time.Ticker, fn func()) {
ticker := captureFunc()
running := true
func (f *FuncGroup) Start(mainRequestHandlerIndex int) {
if len(f.fns) == 0 {
panic("empty task stack")
}
if mainRequestHandlerIndex > len(f.fns)-1 {
mainRequestHandlerIndex = 0
}
mainLoadFactor := 0.3
// When the number of processor cores is equivalent to the processing program,
// the processing efficiency reaches the highest level (VT is not considered).
mainN := int(mainLoadFactor * float64(len(f.fns)))
if mainN == 0 {
mainN = 1
}
if len(f.fns) == 1 {
mainN = f.manager.nThread
}
auxN := f.manager.nThread - mainN

wg := sync.WaitGroup{}
time.AfterFunc(dm.captureTime, func() {
f.manager.running = true
ticker := f.manager.rateCapture()
time.AfterFunc(f.manager.captureTime, func() {
ticker.Stop()
running = false
f.manager.running = false
})
// When the number of processor cores is equivalent to the processing program,
// the processing efficiency reaches the highest level (VT is not considered).
for i := 0; i < dm.nThread; i++ {

for i := 0; i < mainN; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if !running {
if !f.manager.running {
return
}
fn()
f.fns[mainRequestHandlerIndex]()
}
}()
}
wg.Wait()
}

func (dm *DataManager) downloadRateCapture() *time.Ticker {
return dm.rateCapture(dm.GetTotalDownload, &dm.DownloadRateSequence)
}
for j := 0; j < auxN; {
for i := range f.fns {
if j == auxN {
break
}
if i == mainRequestHandlerIndex {
continue
}
wg.Add(1)
t := i
go func() {
defer wg.Done()
for {
if !f.manager.running {
return
}
f.fns[t]()
}
}()
j++
}
}

func (dm *DataManager) uploadRateCapture() *time.Ticker {
return dm.rateCapture(dm.GetTotalUpload, &dm.UploadRateSequence)
wg.Wait()
}

func (dm *DataManager) rateCapture(rateFunc func() int64, dst *[]int64) *time.Ticker {
func (dm *DataManager) rateCapture() *time.Ticker {
ticker := time.NewTicker(dm.rateCaptureFrequency)
oldTotal := rateFunc()
oldTotalDownload := dm.totalDownload
oldTotalUpload := dm.totalUpload
go func() {
for range ticker.C {
newTotal := rateFunc()
delta := newTotal - oldTotal
oldTotal = newTotal
*dst = append(*dst, delta)
newTotalDownload := dm.totalDownload
newTotalUpload := dm.totalUpload
deltaDownload := newTotalDownload - oldTotalDownload
deltaUpload := newTotalUpload - oldTotalUpload
oldTotalDownload = newTotalDownload
oldTotalUpload = newTotalUpload
if deltaDownload != 0 {
dm.DownloadRateSequence = append(dm.DownloadRateSequence, deltaDownload)
}
if deltaUpload != 0 {
dm.UploadRateSequence = append(dm.UploadRateSequence, deltaUpload)
}
}
}()
return ticker
Expand Down Expand Up @@ -221,8 +286,12 @@ func (dm *DataManager) SetCaptureTime(duration time.Duration) *DataManager {
return dm
}

func (dm *DataManager) SetNThread(n int) *DataManager {
dm.nThread = n
func (dm *DataManager) SetNThread(n int) Manager {
if n < 1 {
dm.nThread = runtime.NumCPU()
} else {
dm.nThread = n
}
return dm
}

Expand All @@ -232,6 +301,8 @@ func (dm *DataManager) Reset() {
dm.DataGroup = []*DataChunk{}
dm.DownloadRateSequence = []int64{}
dm.UploadRateSequence = []int64{}
dm.dFn.fns = []func(){}
dm.uFn.fns = []func(){}
}

func (dm *DataManager) GetAvgDownloadRate() float64 {
Expand Down Expand Up @@ -350,6 +421,9 @@ func (dc *DataChunk) Read(b []byte) (n int, err error) {

// calcMAFilter Median-Averaging Filter
func calcMAFilter(list []int64) float64 {
if len(list) == 0 {
return 0
}
var sum int64 = 0
n := len(list)
if n == 0 {
Expand All @@ -370,6 +444,9 @@ func calcMAFilter(list []int64) float64 {
}

func pautaFilter(vector []int64) []int64 {
if len(vector) == 0 {
return vector
}
mean, _, std, _, _ := sampleVariance(vector)
var retVec []int64
for _, value := range vector {
Expand All @@ -382,6 +459,9 @@ func pautaFilter(vector []int64) []int64 {

// standardDeviation sample Variance
func sampleVariance(vector []int64) (mean, variance, stdDev, min, max int64) {
if len(vector) == 0 {
return 0, 0, 0, 0, 0
}
var sumNum, accumulate int64
min = math.MaxInt64
max = math.MinInt64
Expand Down
Loading

0 comments on commit 4d46617

Please sign in to comment.