From 455b14d5326952a2134928cc928e58020c5dd5ae Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sun, 19 Feb 2023 12:47:17 +0800 Subject: [PATCH 1/4] add: more platforms --- .goreleaser.yml | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index 9509a50..3d3b4ef 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -13,19 +13,33 @@ builds: - linux - windows - darwin + - openbsd goarch: - - 386 - amd64 - arm - arm64 - ignore: - - goos: windows - goarch: arm64 + - 386 + - s390x + - ppc64 + - ppc64le + - riscv64 + - mips + - mips64 + - mipsle + - mips64le + goarm: + - 5 + - 6 + - 7 + gomips: + - hardfloat + - softfloat archives: - replacements: darwin: Darwin linux: Linux windows: Windows + openbsd: OpenBSD 386: i386 amd64: x86_64 checksum: From 02b5355798fe2eebf628922578b27b7ae0be827d Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Thu, 23 Feb 2023 00:52:45 +0800 Subject: [PATCH 2/4] fix: list all servers --- speedtest/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speedtest/server.go b/speedtest/server.go index a68abc2..6e6ad27 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -17,7 +17,7 @@ import ( ) const ( - speedTestServersUrl = "https://www.speedtest.net/api/js/servers?limit=10" + speedTestServersUrl = "https://www.speedtest.net/api/js/servers?" speedTestServersAlternativeUrl = "https://www.speedtest.net/speedtest-servers-static.php" ) From 543a64ca849602e1c45aad8b5da7fd1765d4ff92 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Thu, 23 Feb 2023 00:55:10 +0800 Subject: [PATCH 3/4] change: use http ping --- speedtest/request.go | 55 ++++++++++++++++++++++++++++++++++-------- speedtest/server.go | 12 +++------ speedtest/speedtest.go | 2 +- 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/speedtest/request.go b/speedtest/request.go index c3185af..b89bbe1 100644 --- a/speedtest/request.go +++ b/speedtest/request.go @@ -5,7 +5,6 @@ import ( "errors" "math" "net/http" - "net/url" "strconv" "strings" "time" @@ -33,7 +32,7 @@ func (s *Server) DownloadTestContext(ctx context.Context) error { func (s *Server) downloadTestContext(ctx context.Context, downloadRequest downloadFunc) error { s.Context.DownloadRateCaptureHandler(func() { - _ = downloadRequest(ctx, s, 5) + _ = downloadRequest(ctx, s, 3) }) s.DLSpeed = s.Context.GetAvgDownloadRate() return nil @@ -51,7 +50,7 @@ func (s *Server) UploadTestContext(ctx context.Context, savingMode bool) error { func (s *Server) uploadTestContext(ctx context.Context, uploadRequest uploadFunc) error { s.Context.UploadRateCaptureHandler(func() { - _ = uploadRequest(ctx, s, 5) + _ = uploadRequest(ctx, s, 4) }) s.ULSpeed = s.Context.GetAvgUploadRate() return nil @@ -99,12 +98,10 @@ func (s *Server) PingTest() error { // PingTestContext executes test to measure latency, observing the given context. func (s *Server) PingTestContext(ctx context.Context) error { - URL, err := url.ParseRequestURI(s.URL) - if err != nil { - return err - } - pingURL := strings.Split(URL.Host, ":")[0] - vectorPingResult, err := s.StdPing(ctx, pingURL, 6666, 32, 10, time.Millisecond*200, nil) + + pingURL := strings.Split(s.URL, "/upload.php")[0] + "/latency.txt" + + vectorPingResult, err := s.HTTPPing(ctx, pingURL, 10, time.Millisecond*200, nil) if err != nil || len(vectorPingResult) == 0 { return err } @@ -117,7 +114,45 @@ func (s *Server) PingTestContext(ctx context.Context) error { return nil } -func (s *Server) StdPing( +func (s *Server) HTTPPing( + ctx context.Context, + dst string, + echoTimes int, + echoFreq time.Duration, + callback func(latency time.Duration), +) ([]int64, error) { + + failTimes := 0 + var latencies []int64 + + for i := 0; i < echoTimes; i++ { + sTime := time.Now() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, dst, nil) + if err != nil { + return nil, err + } + + resp, err := s.Context.doer.Do(req) + + endTime := time.Since(sTime) + if err != nil { + failTimes++ + continue + } + resp.Body.Close() + latencies = append(latencies, endTime.Nanoseconds()/2) + if callback != nil { + callback(endTime) + } + time.Sleep(echoFreq) + } + if failTimes == echoTimes { + return nil, errors.New("server connect timeout") + } + return latencies, nil +} + +func (s *Server) ICMPPing( ctx context.Context, dst string, readTimeout int, diff --git a/speedtest/server.go b/speedtest/server.go index 6e6ad27..9aef287 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -196,17 +196,12 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context, user *User) (Ser } var wg sync.WaitGroup + pCtx, fc := context.WithTimeout(context.Background(), time.Second*5) for _, server := range servers { - URL, err1 := url.ParseRequestURI(server.URL) - if err1 != nil { - server.Latency = -1 - continue - } - - pingURL := strings.Split(URL.Host, ":")[0] wg.Add(1) go func(gs *Server) { - if latency, err2 := gs.StdPing(ctx, pingURL, 2000, 32, 1, time.Millisecond*100, nil); err2 != nil || len(latency) != 1 { + pingURL := strings.Split(gs.URL, "/upload.php")[0] + "/latency.txt" + if latency, err2 := gs.HTTPPing(pCtx, pingURL, 1, time.Millisecond, nil); err2 != nil || len(latency) != 1 { gs.Latency = -1 } else { gs.Latency = time.Duration(latency[0]) * time.Nanosecond @@ -216,6 +211,7 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context, user *User) (Ser } wg.Wait() + fc() return servers, nil } diff --git a/speedtest/speedtest.go b/speedtest/speedtest.go index e50b906..7aea34f 100644 --- a/speedtest/speedtest.go +++ b/speedtest/speedtest.go @@ -10,7 +10,7 @@ import ( ) var ( - version = "1.4.1" + version = "1.4.2" DefaultUserAgent = fmt.Sprintf("showwin/speedtest-go %s", version) ) From 281eabac1cbcbbe3809878238607421a61c28f16 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 25 Feb 2023 21:05:57 +0800 Subject: [PATCH 4/4] feat: test using multi server --- README.md | 2 + speedtest.go | 75 ++++++++++++++++++-- speedtest/data_manager.go | 144 +++++++++++++++++++++++++++++--------- speedtest/request.go | 53 ++++++++++++-- speedtest/server.go | 17 +++++ speedtest/speedtest.go | 2 +- 6 files changed, 250 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 51d442f..ce242b4 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ Flags: eg: --proxy=http://10.20.0.101:7890 --source Bind a source interface for the speedtest. eg: --source=10.20.0.101 + -m --multi Enable multi mode. + -t --thread Set the number of speedtest threads. --version Show application version. ``` diff --git a/speedtest.go b/speedtest.go index ff6422c..c812920 100644 --- a/speedtest.go +++ b/speedtest.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "log" @@ -13,7 +14,7 @@ import ( var ( showList = kingpin.Flag("list", "Show available speedtest.net servers.").Short('l').Bool() serverIds = kingpin.Flag("server", "Select server id to run speedtest.").Short('s').Ints() - customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net").String() + customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net.").String() savingMode = kingpin.Flag("saving-mode", "Using less memory (≒10MB), though low accuracy (especially > 30Mbps).").Bool() jsonOutput = kingpin.Flag("json", "Output results in json format").Bool() location = kingpin.Flag("location", "Change the location with a precise coordinate. Format: lat,lon").String() @@ -21,6 +22,8 @@ var ( showCityList = kingpin.Flag("city-list", "List all predefined city labels.").Bool() proxy = kingpin.Flag("proxy", "Set a proxy(http(s) or socks) for the speedtest.").String() source = kingpin.Flag("source", "Bind a source interface for the speedtest.").String() + multi = kingpin.Flag("multi", "Enable multi mode.").Short('m').Bool() + thread = kingpin.Flag("thread", "Set the number of speedtest threads.").Short('t').Int() ) type fullOutput struct { @@ -36,6 +39,7 @@ func main() { kingpin.Parse() var speedtestClient = speedtest.New() + speedtestClient.SetNThread(*thread) if len(*proxy) > 0 || len(*source) > 0 { config := &speedtest.UserConfig{ @@ -93,7 +97,11 @@ func main() { targets = []*speedtest.Server{target} } - startTest(targets, *savingMode, *jsonOutput) + if *multi { + startMultiTest(targets[0], servers, *savingMode, *jsonOutput) + } else { + startTest(targets, *savingMode, *jsonOutput, *multi) + } if *jsonOutput { jsonBytes, err := json.Marshal( @@ -109,7 +117,37 @@ func main() { } } -func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) { +func startMultiTest(s *speedtest.Server, servers speedtest.Servers, savingMode bool, jsonOutput bool) { + // Reset DataManager counters, avoid measurement of multiple server result mixing. + s.Context.Reset() + if !jsonOutput { + showServer(s) + } + + err := s.PingTest() + checkError(err) + + if jsonOutput { + err = s.MultiDownloadTestContext(context.Background(), servers, savingMode) + checkError(err) + s.Context.Wait() + err = s.MultiUploadTestContext(context.Background(), servers, savingMode) + checkError(err) + return + } + + showLatencyResult(s) + err = testDownloadM(s, servers, savingMode) + checkError(err) + // It is necessary to wait for the release of the last test resource, + // otherwise the overload will cause excessive data deviation + s.Context.Wait() + err = testUploadM(s, servers, savingMode) + checkError(err) + showServerResult(s) +} + +func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool, multi bool) { for _, s := range servers { // Reset DataManager counters, avoid measurement of multiple server result mixing. s.Context.Reset() @@ -121,12 +159,11 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) { checkError(err) if jsonOutput { - err := s.DownloadTest(savingMode) + err = s.DownloadTest(savingMode) checkError(err) s.Context.Wait() err = s.UploadTest(savingMode) checkError(err) - continue } @@ -147,6 +184,34 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) { } } +func testDownloadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error { + quit := make(chan bool) + fmt.Printf("Download Test: ") + go dots(quit) + err := server.MultiDownloadTestContext(context.Background(), servers, savingMode) + checkError(err) + quit <- true + if err != nil { + return err + } + fmt.Println() + return err +} + +func testUploadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error { + quit := make(chan bool) + fmt.Printf("Upload Test: ") + go dots(quit) + err := server.MultiUploadTestContext(context.Background(), servers, savingMode) + checkError(err) + quit <- true + if err != nil { + return err + } + fmt.Println() + return nil +} + func testDownload(server *speedtest.Server, savingMode bool) error { quit := make(chan bool) fmt.Printf("Download Test: ") diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 8b91318..11e4fa7 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -28,12 +28,14 @@ type Manager interface { CallbackDownloadRate(callback func(downRate float64)) *time.Ticker CallbackUploadRate(callback func(upRate float64)) *time.Ticker - DownloadRateCaptureHandler(fn func()) - UploadRateCaptureHandler(fn func()) + RegisterDownloadHandler(fn func()) *FuncGroup + RegisterUploadHandler(fn func()) *FuncGroup // Wait for the upload or download task to end to avoid errors caused by core occupation Wait() Reset() + + SetNThread(n int) Manager } type Chunk interface { @@ -55,6 +57,15 @@ const TypeEmptyChunk = 0 const TypeDownload = 1 const TypeUpload = 2 +type FuncGroup struct { + fns []func() + manager *DataManager +} + +func (f *FuncGroup) Add(fn func()) { + f.fns = append(f.fns, fn) +} + type DataManager struct { totalDownload int64 totalUpload int64 @@ -70,6 +81,11 @@ type DataManager struct { captureTime time.Duration rateCaptureFrequency time.Duration nThread int + + running bool + + dFn *FuncGroup + uFn *FuncGroup } func NewDataManager() *DataManager { @@ -78,6 +94,8 @@ func NewDataManager() *DataManager { captureTime: time.Second * 10, rateCaptureFrequency: time.Second, } + ret.dFn = &FuncGroup{manager: ret} + ret.uFn = &FuncGroup{manager: ret} return ret } @@ -130,56 +148,103 @@ func (dm *DataManager) Wait() { } } -func (dm *DataManager) DownloadRateCaptureHandler(fn func()) { - dm.testHandler(dm.downloadRateCapture, fn) +func (dm *DataManager) RegisterUploadHandler(fn func()) *FuncGroup { + if len(dm.uFn.fns) < dm.nThread { + dm.uFn.Add(fn) + } + return dm.uFn } -func (dm *DataManager) UploadRateCaptureHandler(fn func()) { - dm.testHandler(dm.uploadRateCapture, fn) +func (dm *DataManager) RegisterDownloadHandler(fn func()) *FuncGroup { + if len(dm.dFn.fns) < dm.nThread { + dm.dFn.Add(fn) + } + return dm.dFn } -func (dm *DataManager) testHandler(captureFunc func() *time.Ticker, fn func()) { - ticker := captureFunc() - running := true +func (f *FuncGroup) Start(mainRequestHandlerIndex int) { + if len(f.fns) == 0 { + panic("empty task stack") + } + if mainRequestHandlerIndex > len(f.fns)-1 { + mainRequestHandlerIndex = 0 + } + mainLoadFactor := 0.3 + // When the number of processor cores is equivalent to the processing program, + // the processing efficiency reaches the highest level (VT is not considered). + mainN := int(mainLoadFactor * float64(len(f.fns))) + if mainN == 0 { + mainN = 1 + } + if len(f.fns) == 1 { + mainN = f.manager.nThread + } + auxN := f.manager.nThread - mainN + wg := sync.WaitGroup{} - time.AfterFunc(dm.captureTime, func() { + f.manager.running = true + ticker := f.manager.rateCapture() + time.AfterFunc(f.manager.captureTime, func() { ticker.Stop() - running = false + f.manager.running = false }) - // When the number of processor cores is equivalent to the processing program, - // the processing efficiency reaches the highest level (VT is not considered). - for i := 0; i < dm.nThread; i++ { + + for i := 0; i < mainN; i++ { wg.Add(1) go func() { defer wg.Done() for { - if !running { + if !f.manager.running { return } - fn() + f.fns[mainRequestHandlerIndex]() } }() } - wg.Wait() -} - -func (dm *DataManager) downloadRateCapture() *time.Ticker { - return dm.rateCapture(dm.GetTotalDownload, &dm.DownloadRateSequence) -} + for j := 0; j < auxN; { + for i := range f.fns { + if j == auxN { + break + } + if i == mainRequestHandlerIndex { + continue + } + wg.Add(1) + t := i + go func() { + defer wg.Done() + for { + if !f.manager.running { + return + } + f.fns[t]() + } + }() + j++ + } + } -func (dm *DataManager) uploadRateCapture() *time.Ticker { - return dm.rateCapture(dm.GetTotalUpload, &dm.UploadRateSequence) + wg.Wait() } -func (dm *DataManager) rateCapture(rateFunc func() int64, dst *[]int64) *time.Ticker { +func (dm *DataManager) rateCapture() *time.Ticker { ticker := time.NewTicker(dm.rateCaptureFrequency) - oldTotal := rateFunc() + oldTotalDownload := dm.totalDownload + oldTotalUpload := dm.totalUpload go func() { for range ticker.C { - newTotal := rateFunc() - delta := newTotal - oldTotal - oldTotal = newTotal - *dst = append(*dst, delta) + newTotalDownload := dm.totalDownload + newTotalUpload := dm.totalUpload + deltaDownload := newTotalDownload - oldTotalDownload + deltaUpload := newTotalUpload - oldTotalUpload + oldTotalDownload = newTotalDownload + oldTotalUpload = newTotalUpload + if deltaDownload != 0 { + dm.DownloadRateSequence = append(dm.DownloadRateSequence, deltaDownload) + } + if deltaUpload != 0 { + dm.UploadRateSequence = append(dm.UploadRateSequence, deltaUpload) + } } }() return ticker @@ -221,8 +286,12 @@ func (dm *DataManager) SetCaptureTime(duration time.Duration) *DataManager { return dm } -func (dm *DataManager) SetNThread(n int) *DataManager { - dm.nThread = n +func (dm *DataManager) SetNThread(n int) Manager { + if n < 1 { + dm.nThread = runtime.NumCPU() + } else { + dm.nThread = n + } return dm } @@ -232,6 +301,8 @@ func (dm *DataManager) Reset() { dm.DataGroup = []*DataChunk{} dm.DownloadRateSequence = []int64{} dm.UploadRateSequence = []int64{} + dm.dFn.fns = []func(){} + dm.uFn.fns = []func(){} } func (dm *DataManager) GetAvgDownloadRate() float64 { @@ -350,6 +421,9 @@ func (dc *DataChunk) Read(b []byte) (n int, err error) { // calcMAFilter Median-Averaging Filter func calcMAFilter(list []int64) float64 { + if len(list) == 0 { + return 0 + } var sum int64 = 0 n := len(list) if n == 0 { @@ -370,6 +444,9 @@ func calcMAFilter(list []int64) float64 { } func pautaFilter(vector []int64) []int64 { + if len(vector) == 0 { + return vector + } mean, _, std, _, _ := sampleVariance(vector) var retVec []int64 for _, value := range vector { @@ -382,6 +459,9 @@ func pautaFilter(vector []int64) []int64 { // standardDeviation sample Variance func sampleVariance(vector []int64) (mean, variance, stdDev, min, max int64) { + if len(vector) == 0 { + return 0, 0, 0, 0, 0 + } var sumNum, accumulate int64 min = math.MaxInt64 max = math.MinInt64 diff --git a/speedtest/request.go b/speedtest/request.go index b89bbe1..12b0855 100644 --- a/speedtest/request.go +++ b/speedtest/request.go @@ -20,6 +20,50 @@ var ( ulSizes = [...]int{100, 300, 500, 800, 1000, 1500, 2500, 3000, 3500, 4000} // kB ) +func (s *Server) MultiDownloadTestContext(ctx context.Context, servers Servers, savingMode bool) error { + ss := servers.Available() + if ss.Len() == 0 { + return errors.New("not found available servers") + } + mainIDIndex := 0 + var fp *FuncGroup + for i, server := range *ss { + if server.ID == s.ID { + mainIDIndex = i + } + sp := server + fp = server.Context.RegisterDownloadHandler(func() { + _ = downloadRequest(ctx, sp, 3) + }) + } + fp.Start(mainIDIndex) // block here + //var serverPointer *Server = (*ss)[0] + s.DLSpeed = fp.manager.GetAvgDownloadRate() + return nil +} + +func (s *Server) MultiUploadTestContext(ctx context.Context, servers Servers, savingMode bool) error { + ss := servers.Available() + if ss.Len() == 0 { + return errors.New("not found available servers") + } + mainIDIndex := 0 + var fp *FuncGroup + for i, server := range *ss { + if server.ID == s.ID { + mainIDIndex = i + } + sp := server + fp = server.Context.RegisterUploadHandler(func() { + _ = uploadRequest(ctx, sp, 3) + }) + } + fp.Start(mainIDIndex) // block here + //var serverPointer *Server = (*ss)[0] + s.ULSpeed = fp.manager.GetAvgUploadRate() + return nil +} + // DownloadTest executes the test to measure download speed func (s *Server) DownloadTest(savingMode bool) error { return s.downloadTestContext(context.Background(), downloadRequest) @@ -31,9 +75,9 @@ func (s *Server) DownloadTestContext(ctx context.Context) error { } func (s *Server) downloadTestContext(ctx context.Context, downloadRequest downloadFunc) error { - s.Context.DownloadRateCaptureHandler(func() { + s.Context.RegisterDownloadHandler(func() { _ = downloadRequest(ctx, s, 3) - }) + }).Start(0) s.DLSpeed = s.Context.GetAvgDownloadRate() return nil } @@ -49,9 +93,9 @@ func (s *Server) UploadTestContext(ctx context.Context, savingMode bool) error { } func (s *Server) uploadTestContext(ctx context.Context, uploadRequest uploadFunc) error { - s.Context.UploadRateCaptureHandler(func() { + s.Context.RegisterUploadHandler(func() { _ = uploadRequest(ctx, s, 4) - }) + }).Start(0) s.ULSpeed = s.Context.GetAvgUploadRate() return nil } @@ -59,7 +103,6 @@ func (s *Server) uploadTestContext(ctx context.Context, uploadRequest uploadFunc func downloadRequest(ctx context.Context, s *Server, w int) error { size := dlSizes[w] xdlURL := strings.Split(s.URL, "/upload.php")[0] + "/random" + strconv.Itoa(size) + "x" + strconv.Itoa(size) + ".jpg" - req, err := http.NewRequestWithContext(ctx, http.MethodGet, xdlURL, nil) if err != nil { return err diff --git a/speedtest/server.go b/speedtest/server.go index 9aef287..f11ea8f 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -92,6 +92,23 @@ type ByDistance struct { Servers } +func (servers Servers) Available() *Servers { + retServer := Servers{} + for _, server := range servers { + if server.Latency != -1 { + retServer = append(retServer, server) + } + } + for i := 0; i < len(retServer)-1; i++ { + for j := 0; j < len(retServer)-i-1; j++ { + if retServer[j].Latency > retServer[j+1].Latency { + retServer[j], retServer[j+1] = retServer[j+1], retServer[j] + } + } + } + return &retServer +} + // Len finds length of servers. For sorting servers. func (servers Servers) Len() int { return len(servers) diff --git a/speedtest/speedtest.go b/speedtest/speedtest.go index 7aea34f..586cade 100644 --- a/speedtest/speedtest.go +++ b/speedtest/speedtest.go @@ -10,7 +10,7 @@ import ( ) var ( - version = "1.4.2" + version = "1.5.0" DefaultUserAgent = fmt.Sprintf("showwin/speedtest-go %s", version) )