Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add traffic limiting capabilities #118

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,14 @@ keys:
port: 9001
cipher: chacha20-ietf-poly1305
secret: Secret2
traffic_limits:
large_scale_limit: 100000000000
large_scale_period: "30d"
small_scale_limit: 12800000000
small_scale_period: "1s"

default_traffic_limits:
large_scale_limit: 2000000000
large_scale_period: "30d"
small_scale_limit: 128000000
small_scale_period: "1s"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/prometheus/procfs v0.1.3 // indirect
github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/protobuf v1.23.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 h1:AvbQYmiaaaza3cW3QXRyPo5kYgpFIzOAfeAAN7m3qQ4=
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
226 changes: 219 additions & 7 deletions integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func startUDPEchoServer(t testing.TB) (*net.UDPConn, *sync.WaitGroup) {
return conn, &running
}

func makeLimiter(cipherList service.CipherList) service.TrafficLimiter {
c := service.MakeTestTrafficLimiterConfig(cipherList)
return service.NewTrafficLimiter(&c)
}

func TestTCPEcho(t *testing.T) {
echoListener, echoRunning := startTCPEchoServer(t)

Expand All @@ -111,7 +116,7 @@ func TestTCPEcho(t *testing.T) {
}
replayCache := service.NewReplayCache(5)
const testTimeout = 200 * time.Millisecond
proxy := service.NewTCPService(cipherList, &replayCache, &metrics.NoOpMetrics{}, testTimeout)
proxy := service.NewTCPService(cipherList, &replayCache, &metrics.NoOpMetrics{}, testTimeout, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyListener)

Expand Down Expand Up @@ -164,6 +169,213 @@ func TestTCPEcho(t *testing.T) {
echoRunning.Wait()
}

func TestTrafficLimiterTCP(t *testing.T) {
echoListener, echoRunning := startTCPEchoServer(t)

proxyListener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
if err != nil {
t.Fatalf("ListenTCP failed: %v", err)
}
secrets := ss.MakeTestSecrets(1)
cipherList, err := service.MakeTestCiphers(secrets)
if err != nil {
t.Fatal(err)
}
replayCache := service.NewReplayCache(5)
const testTimeout = 5 * time.Second

key := cipherList.SnapshotForClientIP(net.IP{})[0].Value.(*service.CipherEntry).ID
const tok = 1024
trafficLimiter := service.NewTrafficLimiter(&service.TrafficLimiterConfig{
KeyToLimits: map[string]*service.TrafficLimits{
key: &service.KeyLimits{
LargeScaleLimit: 80 * tok,
LargeScalePeriod: 60 * time.Second,
SmallScaleLimit: 10 * tok,
SmallScalePeriod: 100 * time.Millisecond,
},
},
})

proxy := service.NewTCPService(cipherList, &replayCache, &metrics.NoOpMetrics{}, testTimeout, trafficLimiter)
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyListener)

proxyHost, proxyPort, err := net.SplitHostPort(proxyListener.Addr().String())
if err != nil {
t.Fatal(err)
}
portNum, err := strconv.Atoi(proxyPort)
if err != nil {
t.Fatal(err)
}
client, err := client.NewClient(proxyHost, portNum, secrets[0], ss.TestCipher)
if err != nil {
t.Fatalf("Failed to create ShadowsocksClient: %v", err)
}

{
const N = 20 * tok
up := ss.MakeTestPayload(N)
conn, err := client.DialTCP(nil, echoListener.Addr().String())
if err != nil {
t.Fatalf("ShadowsocksClient.DialTCP failed: %v", err)
}
start := time.Now()
n, err := conn.Write(up)
if err != nil {
t.Fatal(err)
}
if n != N {
t.Fatalf("Tried to upload %d bytes, but only sent %d", N, n)
}

down := make([]byte, N)
n, err = io.ReadFull(conn, down)
if err != nil && err != io.EOF {
t.Fatal(err)
}
if n != N {
t.Fatalf("Expected to download %d bytes, but only received %d", N, n)
}
if time.Now().Sub(start) < 100*time.Millisecond {
t.Fatalf("Download too fast")
}

if !bytes.Equal(up, down) {
t.Fatal("Echo mismatch")
}

conn.Close()
}

{
const N = 50 * tok
up := ss.MakeTestPayload(N)
conn, err := client.DialTCP(nil, echoListener.Addr().String())
if err != nil {
t.Fatalf("ShadowsocksClient.DialTCP failed: %v", err)
}

_, err = conn.Write(up)
if err != nil {
// No write error is expected
// as proxy just discards all the input
t.Fatalf("Unexpected error: %v", err)
}

down := make([]byte, N)
_, err = io.ReadFull(conn, down)
if err == nil {
t.Fatalf("Expected read error")
}
conn.Close()
}

proxy.Stop()
echoListener.Close()
echoRunning.Wait()
}

func TestTrafficLimiterUDP(t *testing.T) {
echoConn, echoRunning := startUDPEchoServer(t)

proxyConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
if err != nil {
t.Fatalf("ListenTCP failed: %v", err)
}
secrets := ss.MakeTestSecrets(1)
cipherList, err := service.MakeTestCiphers(secrets)
if err != nil {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{fakeLocation: "QQ"}

const tok = 1024
key := cipherList.SnapshotForClientIP(net.IP{})[0].Value.(*service.CipherEntry).ID
smallScalePeriod := 100 * time.Millisecond
trafficLimiter := service.NewTrafficLimiter(&service.TrafficLimiterConfig{
KeyToLimits: map[string]*service.TrafficLimits{
key: &service.KeyLimits{
LargeScaleLimit: 100 * tok,
LargeScalePeriod: 60 * time.Second,
SmallScaleLimit: 10 * tok,
SmallScalePeriod: smallScalePeriod,
},
},
})

proxy := service.NewUDPService(time.Hour, cipherList, testMetrics, trafficLimiter)
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyConn)

proxyHost, proxyPort, err := net.SplitHostPort(proxyConn.LocalAddr().String())
if err != nil {
t.Fatal(err)
}
portNum, err := strconv.Atoi(proxyPort)
if err != nil {
t.Fatal(err)
}
client, err := client.NewClient(proxyHost, portNum, secrets[0], ss.TestCipher)
if err != nil {
t.Fatalf("Failed to create ShadowsocksClient: %v", err)
}
conn, err := client.ListenUDP(nil)
if err != nil {
t.Fatalf("ShadowsocksClient.ListenUDP failed: %v", err)
}

run := func(N int, expectReadError bool) {
up := ss.MakeTestPayload(N)
n, err := conn.WriteTo(up, echoConn.LocalAddr())
if err != nil {
t.Fatal(err)
}
if n != N {
t.Fatalf("Tried to upload %d bytes, but only sent %d", N, n)
}

conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))

down := make([]byte, N)
n, addr, err := conn.ReadFrom(down)
if err != nil {
if !expectReadError {
t.Fatalf("Unexpected read error: %v", err)
}
return
} else {
if expectReadError {
t.Fatalf("Expected read error")
}
}
if n != N {
t.Fatalf("Tried to download %d bytes, but only sent %d", N, n)
}
if addr.String() != echoConn.LocalAddr().String() {
t.Errorf("Reported address mismatch: %s != %s", addr.String(), echoConn.LocalAddr().String())
}

if !bytes.Equal(up, down) {
t.Fatal("Echo mismatch")
}
}

for i := 0; i < 7; i++ {
run(5*tok, false)
run(5*tok, true)
time.Sleep(smallScalePeriod)
}

run(10*tok, true)

conn.Close()
echoConn.Close()
echoRunning.Wait()
proxy.GracefulStop()
}

type statusMetrics struct {
metrics.NoOpMetrics
sync.Mutex
Expand All @@ -184,7 +396,7 @@ func TestRestrictedAddresses(t *testing.T) {
require.NoError(t, err)
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
proxy := service.NewTCPService(cipherList, nil, testMetrics, testTimeout)
proxy := service.NewTCPService(cipherList, nil, testMetrics, testTimeout, makeLimiter(cipherList))
go proxy.Serve(proxyListener)

proxyHost, proxyPort, err := net.SplitHostPort(proxyListener.Addr().String())
Expand Down Expand Up @@ -266,7 +478,7 @@ func TestUDPEcho(t *testing.T) {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{fakeLocation: "QQ"}
proxy := service.NewUDPService(time.Hour, cipherList, testMetrics)
proxy := service.NewUDPService(time.Hour, cipherList, testMetrics, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyConn)

Expand Down Expand Up @@ -363,7 +575,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
b.Fatal(err)
}
const testTimeout = 200 * time.Millisecond
proxy := service.NewTCPService(cipherList, nil, &metrics.NoOpMetrics{}, testTimeout)
proxy := service.NewTCPService(cipherList, nil, &metrics.NoOpMetrics{}, testTimeout, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyListener)

Expand Down Expand Up @@ -430,7 +642,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
}
replayCache := service.NewReplayCache(service.MaxCapacity)
const testTimeout = 200 * time.Millisecond
proxy := service.NewTCPService(cipherList, &replayCache, &metrics.NoOpMetrics{}, testTimeout)
proxy := service.NewTCPService(cipherList, &replayCache, &metrics.NoOpMetrics{}, testTimeout, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyListener)

Expand Down Expand Up @@ -505,7 +717,7 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewUDPService(time.Hour, cipherList, &metrics.NoOpMetrics{})
proxy := service.NewUDPService(time.Hour, cipherList, &metrics.NoOpMetrics{}, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyConn)

Expand Down Expand Up @@ -554,7 +766,7 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewUDPService(time.Hour, cipherList, &metrics.NoOpMetrics{})
proxy := service.NewUDPService(time.Hour, cipherList, &metrics.NoOpMetrics{}, makeLimiter(cipherList))
proxy.SetTargetIPValidator(allowAll)
go proxy.Serve(proxyConn)

Expand Down
Loading