diff --git a/cabin/config-public.go b/cabin/config-public.go index c343642..467017e 100644 --- a/cabin/config-public.go +++ b/cabin/config-public.go @@ -96,12 +96,13 @@ var ( func prepPublicHubConfig() error { err := config.Register(&config.Option{ - Name: "Name", - Key: publicCfgOptionNameKey, - Description: "Human readable name of the Hub.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionNameDefault, + Name: "Name", + Key: publicCfgOptionNameKey, + Description: "Human readable name of the Hub.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionNameDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionNameOrder, }, @@ -112,12 +113,13 @@ func prepPublicHubConfig() error { publicCfgOptionName = config.GetAsString(publicCfgOptionNameKey, publicCfgOptionNameDefault) err = config.Register(&config.Option{ - Name: "Group", - Key: publicCfgOptionGroupKey, - Description: "Name of the hub group this Hub belongs to.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionGroupDefault, + Name: "Group", + Key: publicCfgOptionGroupKey, + Description: "Name of the hub group this Hub belongs to.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionGroupDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionGroupOrder, }, @@ -128,12 +130,13 @@ func prepPublicHubConfig() error { publicCfgOptionGroup = config.GetAsString(publicCfgOptionGroupKey, publicCfgOptionGroupDefault) err = config.Register(&config.Option{ - Name: "Contact Address", - Key: publicCfgOptionContactAddressKey, - Description: "Contact address where the Hub operator can be reached.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - 
DefaultValue: publicCfgOptionContactAddressDefault, + Name: "Contact Address", + Key: publicCfgOptionContactAddressKey, + Description: "Contact address where the Hub operator can be reached.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionContactAddressDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionContactAddressOrder, }, @@ -144,12 +147,13 @@ func prepPublicHubConfig() error { publicCfgOptionContactAddress = config.GetAsString(publicCfgOptionContactAddressKey, publicCfgOptionContactAddressDefault) err = config.Register(&config.Option{ - Name: "Contact Service", - Key: publicCfgOptionContactServiceKey, - Description: "Name of the service the contact address corresponds to, if not email.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionContactServiceDefault, + Name: "Contact Service", + Key: publicCfgOptionContactServiceKey, + Description: "Name of the service the contact address corresponds to, if not email.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionContactServiceDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionContactServiceOrder, }, @@ -160,12 +164,13 @@ func prepPublicHubConfig() error { publicCfgOptionContactService = config.GetAsString(publicCfgOptionContactServiceKey, publicCfgOptionContactServiceDefault) err = config.Register(&config.Option{ - Name: "Hosters", - Key: publicCfgOptionHostersKey, - Description: "List of all involved entities and organisations that are involved in hosting this Hub.", - OptType: config.OptTypeStringArray, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionHostersDefault, + Name: "Hosters", + Key: publicCfgOptionHostersKey, + Description: "List of all involved entities and 
organisations that are involved in hosting this Hub.", + OptType: config.OptTypeStringArray, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionHostersDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionHostersOrder, }, @@ -176,12 +181,13 @@ func prepPublicHubConfig() error { publicCfgOptionHosters = config.GetAsStringArray(publicCfgOptionHostersKey, publicCfgOptionHostersDefault) err = config.Register(&config.Option{ - Name: "Datacenter", - Key: publicCfgOptionDatacenterKey, - Description: "Identifier of the datacenter this Hub is hosted in.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionDatacenterDefault, + Name: "Datacenter", + Key: publicCfgOptionDatacenterKey, + Description: "Identifier of the datacenter this Hub is hosted in.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionDatacenterDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionDatacenterOrder, }, @@ -192,12 +198,13 @@ func prepPublicHubConfig() error { publicCfgOptionDatacenter = config.GetAsString(publicCfgOptionDatacenterKey, publicCfgOptionDatacenterDefault) err = config.Register(&config.Option{ - Name: "IPv4", - Key: publicCfgOptionIPv4Key, - Description: "IPv4 address of this Hub. Must be globally reachable.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionIPv4Default, + Name: "IPv4", + Key: publicCfgOptionIPv4Key, + Description: "IPv4 address of this Hub. 
Must be globally reachable.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionIPv4Default, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionIPv4Order, }, @@ -208,12 +215,13 @@ func prepPublicHubConfig() error { publicCfgOptionIPv4 = config.GetAsString(publicCfgOptionIPv4Key, publicCfgOptionIPv4Default) err = config.Register(&config.Option{ - Name: "IPv6", - Key: publicCfgOptionIPv6Key, - Description: "IPv6 address of this Hub. Must be globally reachable.", - OptType: config.OptTypeString, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionIPv6Default, + Name: "IPv6", + Key: publicCfgOptionIPv6Key, + Description: "IPv6 address of this Hub. Must be globally reachable.", + OptType: config.OptTypeString, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionIPv6Default, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionIPv6Order, }, @@ -253,12 +261,13 @@ func prepPublicHubConfig() error { publicCfgOptionTransports = config.GetAsStringArray(publicCfgOptionTransportsKey, publicCfgOptionTransportsDefault) err = config.Register(&config.Option{ - Name: "Entry", - Key: publicCfgOptionEntryKey, - Description: "Define an entry policy. The format is the same for the endpoint lists. Default is permit.", - OptType: config.OptTypeStringArray, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionEntryDefault, + Name: "Entry", + Key: publicCfgOptionEntryKey, + Description: "Define an entry policy. The format is the same for the endpoint lists. 
Default is permit.", + OptType: config.OptTypeStringArray, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionEntryDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionEntryOrder, config.DisplayHintAnnotation: endpoints.DisplayHintEndpointList, @@ -270,12 +279,13 @@ func prepPublicHubConfig() error { publicCfgOptionEntry = config.GetAsStringArray(publicCfgOptionEntryKey, publicCfgOptionEntryDefault) err = config.Register(&config.Option{ - Name: "Exit", - Key: publicCfgOptionExitKey, - Description: "Define an exit policy. The format is the same for the endpoint lists. Default is permit.", - OptType: config.OptTypeStringArray, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionExitDefault, + Name: "Exit", + Key: publicCfgOptionExitKey, + Description: "Define an exit policy. The format is the same for the endpoint lists. Default is permit.", + OptType: config.OptTypeStringArray, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionExitDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionExitOrder, config.DisplayHintAnnotation: endpoints.DisplayHintEndpointList, @@ -287,12 +297,13 @@ func prepPublicHubConfig() error { publicCfgOptionExit = config.GetAsStringArray(publicCfgOptionExitKey, publicCfgOptionExitDefault) err = config.Register(&config.Option{ - Name: "Allow Unencrypted Connections", - Key: publicCfgOptionAllowUnencryptedKey, - Description: "Advertise that this Hub is available for handling unencrypted connections, as detected by clients.", - OptType: config.OptTypeBool, - ExpertiseLevel: config.ExpertiseLevelExpert, - DefaultValue: publicCfgOptionAllowUnencryptedDefault, + Name: "Allow Unencrypted Connections", + Key: publicCfgOptionAllowUnencryptedKey, + Description: "Advertise that this Hub is available for handling unencrypted connections, as detected by 
clients.", + OptType: config.OptTypeBool, + ExpertiseLevel: config.ExpertiseLevelExpert, + RequiresRestart: true, + DefaultValue: publicCfgOptionAllowUnencryptedDefault, Annotations: config.Annotations{ config.DisplayOrderAnnotation: publicCfgOptionAllowUnencryptedOrder, }, diff --git a/captain/config.go b/captain/config.go index d72a8fc..68b3c5d 100644 --- a/captain/config.go +++ b/captain/config.go @@ -6,6 +6,7 @@ import ( "github.com/safing/portbase/config" "github.com/safing/portmaster/profile" "github.com/safing/portmaster/profile/endpoints" + "github.com/safing/spn/conf" "github.com/safing/spn/navigator" ) @@ -44,6 +45,12 @@ var ( cfgOptionSpecialAccessCode config.StringOption //nolint:unused // Linter, you drunk? cfgOptionSpecialAccessCodeOrder = 160 + // Only connect from (bind to) the advertised IP addresses. + cfgOptionBindToAdvertisedKey = "spn/publicHub/bindToAdvertised" + cfgOptionBindToAdvertised config.BoolOption + cfgOptionBindToAdvertisedDefault = false + cfgOptionBindToAdvertisedOrder = 161 + // Config options for use. cfgOptionRoutingAlgorithm config.StringOption ) @@ -163,6 +170,25 @@ This setting mainly exists for when you need to simulate your presence in anothe } cfgOptionSpecialAccessCode = config.Concurrent.GetAsString(cfgOptionSpecialAccessCodeKey, "") + if conf.PublicHub() { + err = config.Register(&config.Option{ + Name: "Connect From Advertised IPs Only", + Key: cfgOptionBindToAdvertisedKey, + Description: "Only connect from (bind to) the advertised IP addresses.", + OptType: config.OptTypeBool, + ExpertiseLevel: config.ExpertiseLevelExpert, + DefaultValue: cfgOptionBindToAdvertisedDefault, + RequiresRestart: true, + Annotations: config.Annotations{ + config.DisplayOrderAnnotation: cfgOptionBindToAdvertisedOrder, + }, + }) + if err != nil { + return err + } + cfgOptionBindToAdvertised = config.GetAsBool(cfgOptionBindToAdvertisedKey, cfgOptionBindToAdvertisedDefault) + } + // Config options for use. 
cfgOptionRoutingAlgorithm = config.Concurrent.GetAsString(profile.CfgOptionRoutingAlgorithmKey, navigator.DefaultRoutingProfileID) diff --git a/captain/public.go b/captain/public.go index 41220f5..137abec 100644 --- a/captain/public.go +++ b/captain/public.go @@ -68,6 +68,9 @@ func loadPublicIdentity() (err error) { publicIdentity.Hub.Info.IPv4 != nil, publicIdentity.Hub.Info.IPv6 != nil, ) + if cfgOptionBindToAdvertised() { + conf.SetConnectAddr(publicIdentity.Hub.Info.IPv4, publicIdentity.Hub.Info.IPv6) + } // Set Home Hub before updating the hub on the map, as this would trigger a // recalculation without a Home Hub. diff --git a/conf/networks.go b/conf/networks.go index a46da4a..b08551f 100644 --- a/conf/networks.go +++ b/conf/networks.go @@ -1,6 +1,11 @@ package conf -import "github.com/tevino/abool" +import ( + "net" + "sync" + + "github.com/tevino/abool" +) var ( hubHasV4 = abool.New() @@ -22,3 +27,55 @@ func HubHasIPv4() bool { func HubHasIPv6() bool { return hubHasV6.IsSet() } + +var ( + connectIPv4 net.IP + connectIPv6 net.IP + connectIPLock sync.Mutex +) + +// SetConnectAddr sets the preferred connect (bind) addresses. +func SetConnectAddr(ip4, ip6 net.IP) { + connectIPLock.Lock() + defer connectIPLock.Unlock() + + connectIPv4 = ip4 + connectIPv6 = ip6 +} + +// GetConnectAddr returns an address with the preferred connect (bind) +// addresses for the given dial network. +// The dial network must have a suffix that specifies the IP version. 
+func GetConnectAddr(dialNetwork string) net.Addr { + connectIPLock.Lock() + defer connectIPLock.Unlock() + + switch dialNetwork { + case "ip4": + if connectIPv4 != nil { + return &net.IPAddr{IP: connectIPv4} + } + case "ip6": + if connectIPv6 != nil { + return &net.IPAddr{IP: connectIPv6} + } + case "tcp4": + if connectIPv4 != nil { + return &net.TCPAddr{IP: connectIPv4} + } + case "tcp6": + if connectIPv6 != nil { + return &net.TCPAddr{IP: connectIPv6} + } + case "udp4": + if connectIPv4 != nil { + return &net.UDPAddr{IP: connectIPv4} + } + case "udp6": + if connectIPv6 != nil { + return &net.UDPAddr{IP: connectIPv6} + } + } + + return nil +} diff --git a/crew/op_connect.go b/crew/op_connect.go index 4330d27..438d5bf 100644 --- a/crew/op_connect.go +++ b/crew/op_connect.go @@ -40,8 +40,8 @@ type ConnectOp struct { doneWriting chan struct{} // Metrics - incomingTraffic *uint64 - outgoingTraffic *uint64 + incomingTraffic atomic.Uint64 + outgoingTraffic atomic.Uint64 started time.Time // Connection @@ -72,6 +72,27 @@ type ConnectRequest struct { QueueSize uint32 `json:"qs,omitempty"` } +// DialNetwork returns the dial network of the connect request. +func (r *ConnectRequest) DialNetwork() string { + if ip4 := r.IP.To4(); ip4 != nil { + switch r.Protocol { //nolint:exhaustive // Only looking for supported protocols. + case packet.TCP: + return "tcp4" + case packet.UDP: + return "udp4" + } + } else { + switch r.Protocol { //nolint:exhaustive // Only looking for supported protocols. + case packet.TCP: + return "tcp6" + case packet.UDP: + return "udp6" + } + } + + return "" +} + // Address returns the address of the connext request. func (r *ConnectRequest) Address() string { return net.JoinHostPort(r.IP.String(), strconv.Itoa(int(r.Port))) @@ -136,8 +157,6 @@ func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) { } // Setup metrics. 
- op.incomingTraffic = new(uint64) - op.outgoingTraffic = new(uint64) op.started = time.Now() module.StartWorker("connect op conn reader", op.connReader) @@ -167,56 +186,122 @@ func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container) return nil, terminal.ErrInvalidOptions.With("invalid queue size of %d", request.QueueSize) } - // Check if connection target is in global scope. - ipScope := netutils.GetIPScope(request.IP) - if ipScope != netutils.Global { - return nil, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", request.IP) - } - - // Get protocol net for connecting. - var dialNet string - switch request.Protocol { //nolint:exhaustive // Only looking at specific values. - case packet.TCP: - dialNet = "tcp" - case packet.UDP: - dialNet = "udp" - default: - return nil, terminal.ErrIncorrectUsage.With("protocol %s is not supported", request.Protocol) - } - - // Check exit policy. - if tErr := checkExitPolicy(request); tErr != nil { - return nil, tErr - } - - // Connect to destination. - conn, err := net.DialTimeout(dialNet, request.Address(), 3*time.Second) - if err != nil { - return nil, terminal.ErrConnectionError.With("failed to connect to %s: %w", request, err) + // Check if IP seems valid. + if len(request.IP) != net.IPv4len && len(request.IP) != net.IPv6len { + return nil, terminal.ErrInvalidOptions.With("ip address is not valid") } // Create and initialize operation. op := &ConnectOp{ doneWriting: make(chan struct{}), t: t, - conn: conn, request: request, } op.InitOperationBase(t, opID) op.ctx, op.cancelCtx = context.WithCancel(t.Ctx()) op.dfq = terminal.NewDuplexFlowQueue(op.Ctx(), request.QueueSize, op.submitUpstream) - // Setup metrics. - op.incomingTraffic = new(uint64) - op.outgoingTraffic = new(uint64) + // Start worker to complete setting up the connection. 
+ module.StartWorker("connect op setup", op.handleSetup) + + return op, nil +} + +func (op *ConnectOp) handleSetup(_ context.Context) error { + // Get terminal session for rate limiting. + var session *terminal.Session + if sessionTerm, ok := op.t.(terminal.SessionTerminal); ok { + session = sessionTerm.GetSession() + } else { + log.Errorf("spn/crew: %T is not a session terminal, aborting op %s#%d", op.t, op.t.FmtID(), op.ID()) + op.Stop(op, terminal.ErrInternalError.With("no session available")) + return nil + } + + // Limit concurrency of connecting. + cancelErr := session.LimitConcurrency(op.Ctx(), func() { + op.setup(session) + }) + + // If context was canceled, stop operation. + if cancelErr != nil { + op.Stop(op, terminal.ErrCanceled.With(cancelErr.Error())) + } + + // Do not return a worker error. + return nil +} + +func (op *ConnectOp) setup(session *terminal.Session) { + // Rate limit before connecting. + if tErr := session.RateLimit(); tErr != nil { + // Fake connection error when rate limited. + if tErr.Is(terminal.ErrRateLimited) { + op.Stop(op, tErr.With(session.RateLimitInfo())) + return + } + op.Stop(op, tErr) + return + } + + // Check if connection target is in global scope. + ipScope := netutils.GetIPScope(op.request.IP) + if ipScope != netutils.Global { + session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual) + op.Stop(op, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", op.request.IP)) + return + } + + // Check exit policy. + if tErr := checkExitPolicy(op.request); tErr != nil { + session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual) + op.Stop(op, tErr) + return + } + + // Check one last time before connecting if operation was not canceled. + if op.Ctx().Err() != nil { + op.Stop(op, terminal.ErrCanceled.With(op.Ctx().Err().Error())) + return + } + + // Connect to destination. 
+ dialNet := op.request.DialNetwork() + if dialNet == "" { + session.ReportSuspiciousActivity(terminal.SusFactorCommon) + op.Stop(op, terminal.ErrIncorrectUsage.With("protocol %s is not supported", op.request.Protocol)) + return + } + dialer := &net.Dialer{ + Timeout: 10 * time.Second, + LocalAddr: conf.GetConnectAddr(dialNet), + FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. + KeepAlive: -1, // Disable keep-alive. + } + conn, err := dialer.DialContext(op.Ctx(), dialNet, op.request.Address()) + if err != nil { + // Connection errors are common, but still a bit suspicious. + var netError net.Error + switch { + case errors.As(err, &netError) && netError.Timeout(): + session.ReportSuspiciousActivity(terminal.SusFactorCommon) + case errors.Is(err, context.Canceled): + session.ReportSuspiciousActivity(terminal.SusFactorCommon) + default: + session.ReportSuspiciousActivity(terminal.SusFactorWeirdButOK) + } + + op.Stop(op, terminal.ErrConnectionError.With("failed to connect to %s: %w", op.request, err)) + return + } + op.conn = conn // Start worker. 
module.StartWorker("connect op conn reader", op.connReader) module.StartWorker("connect op conn writer", op.connWriter) module.StartWorker("connect op flow handler", op.dfq.FlowHandler) - log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), request) - return op, nil + log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), op.request) } func (op *ConnectOp) submitUpstream(msg *terminal.Msg, timeout time.Duration) { @@ -245,7 +330,7 @@ func (op *ConnectOp) connReader(_ context.Context) error { defer func() { atomic.AddInt64(activeConnectOps, -1) connectOpDurationHistogram.UpdateDuration(op.started) - connectOpIncomingDataHistogram.Update(float64(atomic.LoadUint64(op.incomingTraffic))) + connectOpIncomingDataHistogram.Update(float64(op.incomingTraffic.Load())) }() rateLimiter := terminal.NewRateLimiter(rateLimitMaxMbit) @@ -269,7 +354,7 @@ func (op *ConnectOp) connReader(_ context.Context) error { // Submit metrics. connectOpIncomingBytes.Add(n) - inBytes := atomic.AddUint64(op.incomingTraffic, uint64(n)) + inBytes := op.incomingTraffic.Add(uint64(n)) // Rate limit if over threshold. if inBytes > rateLimitThreshold { @@ -308,7 +393,7 @@ func (op *ConnectOp) Deliver(msg *terminal.Msg) *terminal.Error { func (op *ConnectOp) connWriter(_ context.Context) error { // Metrics submitting. defer func() { - connectOpOutgoingDataHistogram.Update(float64(atomic.LoadUint64(op.outgoingTraffic))) + connectOpOutgoingDataHistogram.Update(float64(op.outgoingTraffic.Load())) }() defer func() { @@ -352,7 +437,7 @@ writing: // Submit metrics. connectOpOutgoingBytes.Add(len(data)) - out := atomic.AddUint64(op.outgoingTraffic, uint64(len(data))) + out := op.outgoingTraffic.Add(uint64(len(data))) // Rate limit if over threshold. if out > rateLimitThreshold { @@ -411,17 +496,21 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro reportConnectError(err) } - // If the op was ended locally, send all data before closing. 
- // If the op was ended remotely, don't bother sending remaining data. - if !err.IsExternal() { - // Flushing could mean sending a full buffer of 50000 packets. - op.dfq.Flush(5 * time.Minute) - } + // If the connection has sent or received any data so far, finish the data + // flows as it makes sense. + if op.incomingTraffic.Load() > 0 || op.outgoingTraffic.Load() > 0 { + // If the op was ended locally, send all data before closing. + // If the op was ended remotely, don't bother sending remaining data. + if !err.IsExternal() { + // Flushing could mean sending a full buffer of 50000 packets. + op.dfq.Flush(5 * time.Minute) + } - // If the op was ended remotely, write all remaining received data. - // If the op was ended locally, don't bother writing remaining data. - if err.IsExternal() { - <-op.doneWriting + // If the op was ended remotely, write all remaining received data. + // If the op was ended locally, don't bother writing remaining data. + if err.IsExternal() { + <-op.doneWriting + } } // Cancel workers. @@ -431,7 +520,7 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro // error and no data was received. if op.entry && // On clients only. err.Is(terminal.ErrConnectionError) && - atomic.LoadUint64(op.outgoingTraffic) == 0 { + op.outgoingTraffic.Load() == 0 { // Only if no data was received (ie. sent to local application). op.tunnel.avoidDestinationHub() } diff --git a/crew/op_ping.go b/crew/op_ping.go index 3b928b6..06d1f9b 100644 --- a/crew/op_ping.go +++ b/crew/op_ping.go @@ -59,7 +59,7 @@ func NewPingOp(t terminal.Terminal) (*PingOp, *terminal.Error) { // Create operation and init. op := &PingOp{ - started: time.Now(), + started: time.Now().UTC(), nonce: nonce, } op.OneOffOperationBase.Init() @@ -111,7 +111,7 @@ func startPingOp(t terminal.Terminal, opID uint32, data *container.Container) (t // Create response. 
response, err := dsd.Dump(&PingOpResponse{ Nonce: request.Nonce, - Time: time.Now(), + Time: time.Now().UTC(), }, dsd.CBOR) if err != nil { return nil, terminal.ErrInternalError.With("failed to create ping response: %w", err) diff --git a/docks/crane_terminal.go b/docks/crane_terminal.go index ec0fec7..3787a56 100644 --- a/docks/crane_terminal.go +++ b/docks/crane_terminal.go @@ -12,6 +12,9 @@ import ( type CraneTerminal struct { *terminal.TerminalBase + // Add-Ons + terminal.SessionAddOn + crane *Crane } diff --git a/docks/op_whoami.go b/docks/op_whoami.go new file mode 100644 index 0000000..0dc2492 --- /dev/null +++ b/docks/op_whoami.go @@ -0,0 +1,135 @@ +package docks + +import ( + "time" + + "github.com/safing/portbase/container" + "github.com/safing/portbase/formats/dsd" + "github.com/safing/spn/terminal" +) + +const ( + // WhoAmIType is the type ID of the whoami operation. + WhoAmIType = "whoami" + + whoAmITimeout = 3 * time.Second +) + +// WhoAmIOp is used to request some metadata about the other side. +type WhoAmIOp struct { + terminal.OneOffOperationBase + + response *WhoAmIResponse +} + +// WhoAmIResponse is a whoami response. +type WhoAmIResponse struct { + // Timestamp in nanoseconds + Timestamp int64 `cbor:"t,omitempty" json:"t,omitempty"` + + // Addr is the remote address as reported by the crane terminal (IP and port). + Addr string `cbor:"a,omitempty" json:"a,omitempty"` +} + +// Type returns the type ID. +func (op *WhoAmIOp) Type() string { + return WhoAmIType +} + +func init() { + terminal.RegisterOpType(terminal.OperationFactory{ + Type: WhoAmIType, + Start: startWhoAmI, + }) +} + +// WhoAmI executes a whoami operation and returns the response. +func WhoAmI(t terminal.Terminal) (*WhoAmIResponse, *terminal.Error) { + whoami, err := NewWhoAmIOp(t) + if err.IsError() { + return nil, err + } + + // Wait for response. 
+ select { + case tErr := <-whoami.Result: + if tErr.IsError() { + return nil, tErr + } + return whoami.response, nil + case <-time.After(whoAmITimeout * 2): + return nil, terminal.ErrTimeout + } +} + +// NewWhoAmIOp starts a new whoami operation. +func NewWhoAmIOp(t terminal.Terminal) (*WhoAmIOp, *terminal.Error) { + // Create operation and init. + op := &WhoAmIOp{} + op.OneOffOperationBase.Init() + + // Send ping. + tErr := t.StartOperation(op, nil, whoAmITimeout) + if tErr != nil { + return nil, tErr + } + + return op, nil +} + +// Deliver delivers a message to the operation. +func (op *WhoAmIOp) Deliver(msg *terminal.Msg) *terminal.Error { + defer msg.Finish() + + // Parse response. + response := &WhoAmIResponse{} + _, err := dsd.Load(msg.Data.CompileData(), response) + if err != nil { + return terminal.ErrMalformedData.With("failed to parse ping response: %w", err) + } + + op.response = response + return terminal.ErrExplicitAck +} + +func startWhoAmI(t terminal.Terminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) { + // Get crane terminal, if available. + ct, _ := t.(*CraneTerminal) + + // Create response. + r := &WhoAmIResponse{ + Timestamp: time.Now().UnixNano(), + } + if ct != nil { + r.Addr = ct.RemoteAddr().String() + } + response, err := dsd.Dump(r, dsd.CBOR) + if err != nil { + return nil, terminal.ErrInternalError.With("failed to create whoami response: %w", err) + } + + // Send response. + msg := terminal.NewMsg(response) + msg.FlowID = opID + msg.Unit.MakeHighPriority() + if terminal.UsePriorityDataMsgs { + msg.Type = terminal.MsgTypePriorityData + } + tErr := t.Send(msg, whoAmITimeout) + if tErr != nil { + // Finish message unit on failure. + msg.Finish() + return nil, tErr.With("failed to send ping response") + } + + // Operation is just one response and finished successfully. + return nil, nil +} + +// HandleStop gives the operation the ability to cleanly shut down. 
+// The returned error is the error to send to the other side. +// Should never be called directly. Call Stop() instead. +func (op *WhoAmIOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Error) { + // Continue with usual handling of inherited base. + return op.OneOffOperationBase.HandleStop(err) +} diff --git a/docks/op_whoami_test.go b/docks/op_whoami_test.go new file mode 100644 index 0000000..01650ec --- /dev/null +++ b/docks/op_whoami_test.go @@ -0,0 +1,24 @@ +package docks + +import ( + "testing" + + "github.com/safing/spn/terminal" +) + +func TestWhoAmIOp(t *testing.T) { + t.Parallel() + + // Create test terminal pair. + a, _, err := terminal.NewSimpleTestTerminalPair(0, 0, nil) + if err != nil { + t.Fatalf("failed to create test terminal pair: %s", err) + } + + // Run op. + resp, tErr := WhoAmI(a) + if tErr.IsError() { + t.Fatal(tErr) + } + t.Logf("whoami: %+v", resp) +} diff --git a/patrol/domains.go b/patrol/domains.go index 5f125f9..43fff82 100644 --- a/patrol/domains.go +++ b/patrol/domains.go @@ -8,8 +8,8 @@ import ( // getRandomTestDomain returns a random test domain from the test domain list. // Not cryptographically secure random, though. func getRandomTestDomain() string { - rand.Seed(time.Now().UnixNano()) - return testDomains[rand.Intn(len(testDomains)-1)] //nolint:gosec // Weak randomness is not an issue here. + rng := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec + return testDomains[rng.Intn(len(testDomains)-1)] //nolint:gosec // Weak randomness is not an issue here. } // testDomains is a list of domains to check if they respond successfully to a HTTP GET request. 
diff --git a/patrol/http.go b/patrol/http.go index c08974b..d361c60 100644 --- a/patrol/http.go +++ b/patrol/http.go @@ -148,6 +148,7 @@ func CheckHTTPSConnection(ctx context.Context, network, domain string) (statusCo } dialer := &net.Dialer{ Timeout: 15 * time.Second, + LocalAddr: conf.GetConnectAddr(network), FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. KeepAlive: -1, // Disable keep-alive. } diff --git a/ships/http.go b/ships/http.go index 1412025..ced7c65 100644 --- a/ships/http.go +++ b/ships/http.go @@ -7,8 +7,10 @@ import ( "io" "net" "net/http" + "time" "github.com/safing/portbase/log" + "github.com/safing/spn/conf" "github.com/safing/spn/hub" ) @@ -67,10 +69,19 @@ func launchHTTPShip(ctx context.Context, transport *hub.Transport, ip net.IP) (S request.Header.Set("Upgrade", "SPN") // Create connection. - conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ - IP: ip, - Port: int(transport.Port), - }) + var dialNet string + if ip4 := ip.To4(); ip4 != nil { + dialNet = "tcp4" + } else { + dialNet = "tcp6" + } + dialer := &net.Dialer{ + Timeout: 30 * time.Second, + LocalAddr: conf.GetConnectAddr(dialNet), + FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. + KeepAlive: -1, // Disable keep-alive. 
+ } + conn, err := dialer.DialContext(ctx, dialNet, net.JoinHostPort(ip.String(), portToA(transport.Port))) if err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } diff --git a/ships/tcp.go b/ships/tcp.go index 0f9df71..546802d 100644 --- a/ships/tcp.go +++ b/ships/tcp.go @@ -2,9 +2,11 @@ package ships import ( "context" + "fmt" "net" "time" + "github.com/safing/spn/conf" "github.com/safing/spn/hub" ) @@ -26,12 +28,21 @@ func init() { } func launchTCPShip(ctx context.Context, transport *hub.Transport, ip net.IP) (Ship, error) { + var dialNet string + if ip4 := ip.To4(); ip4 != nil { + dialNet = "tcp4" + } else { + dialNet = "tcp6" + } dialer := &net.Dialer{ - Timeout: 30 * time.Second, + Timeout: 30 * time.Second, + LocalAddr: conf.GetConnectAddr(dialNet), + FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. + KeepAlive: -1, // Disable keep-alive. } - conn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(ip.String(), portToA(transport.Port))) + conn, err := dialer.DialContext(ctx, dialNet, net.JoinHostPort(ip.String(), portToA(transport.Port))) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to connect: %w", err) } ship := &TCPShip{ diff --git a/terminal/errors.go b/terminal/errors.go index cbddaa8..619bf18 100644 --- a/terminal/errors.go +++ b/terminal/errors.go @@ -206,6 +206,7 @@ var ( ErrIntegrity = registerError(14, errors.New("integrity violated")) ErrInvalidOptions = registerError(15, errors.New("invalid options")) ErrHubNotReady = registerError(16, errors.New("hub not ready")) + ErrRateLimited = registerError(24, errors.New("rate limited")) ErrIncorrectUsage = registerError(22, errors.New("incorrect usage")) ErrTimeout = registerError(62, errors.New("timed out")) ErrUnsupportedVersion = registerError(93, errors.New("unsupported version")) diff --git a/terminal/operation.go b/terminal/operation.go index 01946a4..100936e 100644 --- a/terminal/operation.go +++ b/terminal/operation.go @@ -239,7 +239,7 @@ 
func (t *TerminalBase) StopOperation(op Operation, err *Error) { switch { case err == nil: log.Debugf("spn/terminal: operation %s %s stopped", op.Type(), fmtOperationID(t.parentID, t.id, op.ID())) - case err.IsOK() || err.Is(ErrTryAgainLater): + case err.IsOK(), err.Is(ErrTryAgainLater), err.Is(ErrRateLimited): log.Debugf("spn/terminal: operation %s %s stopped: %s", op.Type(), fmtOperationID(t.parentID, t.id, op.ID()), err) default: log.Warningf("spn/terminal: operation %s %s failed: %s", op.Type(), fmtOperationID(t.parentID, t.id, op.ID()), err) diff --git a/terminal/session.go b/terminal/session.go new file mode 100644 index 0000000..61dee28 --- /dev/null +++ b/terminal/session.go @@ -0,0 +1,158 @@ +package terminal + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/safing/portbase/log" +) + +const ( + rateLimitMinOps = 100 + rateLimitMaxOpsPerSecond = 20 // TODO: Reduce to 10 after test phase. + + rateLimitMinSuspicion = 25 + rateLimitMaxSuspicionPerSecond = 2 // TODO: Reduce to 1 after test phase. + + // Make this big enough to trigger suspicion limit in first blast. + concurrencyPoolSize = 30 +) + +// Session holds terminal metadata for operations. +type Session struct { + sync.RWMutex + + // Rate Limiting. + + // started holds the unix timestamp in seconds when the session was started. + // It is set when the Session is created and may be treated as a constant. + started int64 + + // opCount is the amount of operations started. + opCount atomic.Int64 + + // suspicionScore holds a score of suspicious activity. + // Every suspicious operation is counted as at least 1. + // Rate limited operations because of suspicion are also counted as 1. + suspicionScore atomic.Int64 + + concurrencyPool chan struct{} +} + +// SessionTerminal is an interface for terminals that support sessions. +type SessionTerminal interface { + GetSession() *Session +} + +// SessionAddOn can be inherited by terminals to add support for sessions. 
+type SessionAddOn struct { + lock sync.Mutex + + // session holds the terminal session. + session *Session +} + +// GetSession returns the terminal's session. +func (t *SessionAddOn) GetSession() *Session { + t.lock.Lock() + defer t.lock.Unlock() + + // Create session if it does not exist. + if t.session == nil { + t.session = NewSession() + } + + return t.session +} + +// NewSession returns a new session. +func NewSession() *Session { + return &Session{ + started: time.Now().Unix() - 1, // Ensure a 1 second difference to current time. + concurrencyPool: make(chan struct{}, concurrencyPoolSize), + } +} + +// RateLimitInfo returns some basic information about the status of the rate limiter. +func (s *Session) RateLimitInfo() string { + secondsActive := time.Now().Unix() - s.started + + return fmt.Sprintf( + "%do/s %ds/s %ds", + s.opCount.Load()/secondsActive, + s.suspicionScore.Load()/secondsActive, + secondsActive, + ) +} + +// RateLimit enforces a rate and suspicion limit. +func (s *Session) RateLimit() *Error { + secondsActive := time.Now().Unix() - s.started + + // Check the suspicion limit. + score := s.suspicionScore.Load() + if score > rateLimitMinSuspicion { + scorePerSecond := score / secondsActive + if scorePerSecond >= rateLimitMaxSuspicionPerSecond { + // Add current try to suspicion score. + s.suspicionScore.Add(1) + + return ErrRateLimited + } + } + + // Check the rate limit. + count := s.opCount.Add(1) + if count > rateLimitMinOps { + opsPerSecond := count / secondsActive + if opsPerSecond >= rateLimitMaxOpsPerSecond { + return ErrRateLimited + } + } + + return nil +} + +// Suspicion Factors. +const ( + SusFactorCommon = 1 + SusFactorWeirdButOK = 5 + SusFactorQuiteUnusual = 10 + SusFactorMustBeMalicious = 100 +) + +// ReportSuspiciousActivity reports suspicious activity of the terminal. +func (s *Session) ReportSuspiciousActivity(factor int64) { + s.suspicionScore.Add(factor) +} + +// LimitConcurrency limits concurrent executions. 
+// If over the limit, waiting goroutines are selected randomly. +// It returns the context error if it was canceled. +func (s *Session) LimitConcurrency(ctx context.Context, f func()) error { + // Wait for place in pool. + select { + case <-ctx.Done(): + return ctx.Err() + case s.concurrencyPool <- struct{}{}: + // We added our entry to the pool, continue with execution. + } + + // Drain own spot in pool after execution. + defer func() { + select { + case <-s.concurrencyPool: + // Own entry drained. + default: + // This should never happen, but let's play safe and not deadlock when pool is empty. + log.Warningf("spn/session: failed to drain own entry from concurrency pool") + } + }() + + // Execute and return. + f() + return nil +} diff --git a/terminal/session_test.go b/terminal/session_test.go new file mode 100644 index 0000000..269aa52 --- /dev/null +++ b/terminal/session_test.go @@ -0,0 +1,92 @@ +package terminal + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRateLimit(t *testing.T) { + t.Parallel() + + var tErr *Error + s := NewSession() + + // Everything should be okay within the min limit. + for i := 0; i < rateLimitMinOps; i++ { + tErr = s.RateLimit() + if tErr != nil { + t.Error("should not rate limit within min limit") + } + } + + // Somewhere here we should start rate limiting. + for i := 0; i < rateLimitMaxOpsPerSecond; i++ { + tErr = s.RateLimit() + } + assert.ErrorIs(t, tErr, ErrRateLimited, "should rate limit") +} + +func TestSuspicionLimit(t *testing.T) { + t.Parallel() + + var tErr *Error + s := NewSession() + + // Everything should be okay within the min limit. + for i := 0; i < rateLimitMinSuspicion; i++ { + tErr = s.RateLimit() + if tErr != nil { + t.Error("should not rate limit within min limit") + } + s.ReportSuspiciousActivity(SusFactorCommon) + } + + // Somewhere here we should start rate limiting. 
+ for i := 0; i < rateLimitMaxSuspicionPerSecond; i++ { + tErr = s.RateLimit() + s.ReportSuspiciousActivity(SusFactorCommon) + } + assert.ErrorIs(t, tErr, ErrRateLimited, "should rate limit") +} + +func TestConcurrencyLimit(t *testing.T) { + t.Parallel() + + s := NewSession() + started := time.Now() + wg := sync.WaitGroup{} + workTime := 1 * time.Millisecond + workers := concurrencyPoolSize * 10 + + // Start many workers to test concurrency. + wg.Add(workers) + for i := 0; i < workers; i++ { + workerNum := i + go func() { + defer func() { + _ = recover() + }() + _ = s.LimitConcurrency(context.Background(), func() { + time.Sleep(workTime) + wg.Done() + + // Panic sometimes. + if workerNum%concurrencyPoolSize == 0 { + panic("test") + } + }) + }() + } + + // Wait and check time needed. + wg.Wait() + if time.Since(started) < (time.Duration(workers) * workTime / concurrencyPoolSize) { + t.Errorf("workers were too quick - only took %s", time.Since(started)) + } else { + t.Logf("workers were correctly limited - took %s", time.Since(started)) + } +} diff --git a/testing/simple/config-template.json b/testing/simple/config-template.json index 0bdfd2a..c9baca7 100644 --- a/testing/simple/config-template.json +++ b/testing/simple/config-template.json @@ -11,8 +11,9 @@ "spn": { "publicHub": { "name": "test-$HUBNAME", - "transports": ["http:80", "tcp:17"], - "allowUnencrypted": true + "transports": ["http:80", "http:8080", "tcp:17"], + "allowUnencrypted": true, + "bindToAdvertised": true } } } diff --git a/testing/simple/inject-intel.sh b/testing/simple/inject-intel.sh index 02dc115..a57cd72 100755 --- a/testing/simple/inject-intel.sh +++ b/testing/simple/inject-intel.sh @@ -2,7 +2,7 @@ cd "$( dirname "${BASH_SOURCE[0]}" )" -MAIN_INTEL_FILE="main-intel.json" +MAIN_INTEL_FILE="intel-testnet.json" if [[ ! 
-f $MAIN_INTEL_FILE ]]; then echo "missing $MAIN_INTEL_FILE" diff --git a/testing/simple/intel-client.yaml b/testing/simple/intel-client.yaml new file mode 100644 index 0000000..28f0a68 --- /dev/null +++ b/testing/simple/intel-client.yaml @@ -0,0 +1,25 @@ +# Get current list of IDs from test net: +# curl http://127.0.0.1:817/api/v1/spn/map/test/pins | jq ".[] | .ID" +# Then import into test client with: +# curl -X POST --upload-file intel-client.yaml http://127.0.0.1:817/api/v1/spn/map/test/intel/update +Hubs: + Zwm48YWWFGdwXjhE1MyEkWfqxPr9DiUBoXpusTZ1FMQnuK: + Trusted: true + Zwu5LkkbfCbAcYxWG3vtWF1VvWjgWpc1GJfkwRdLFNtytV: + Trusted: true + ZwuQpz5CqYmYoLnt9KXQ8oxnmosBzfrCYwCGhxT4fsG1Dz: + Trusted: true + ZwwmC3dHzr7J6XW9mc2KD6FDNuXwPVJUFi9dLnDSNMyjLk: + Trusted: true + ZwxSBdvqtJyz8zRWKZe6QyK51KH9av6VFay2GQvpFrWKHq: + Trusted: true + ZwxnuL6zMLj4AxJX8Bj369w2tNrVtYxzffVcXZuMxdxbGj: + Trusted: true + ZwyXdnC8JkC7m796skGD7QWGoYycByR3KVntkXMY8CxRZQ: + Trusted: true + Zwz7AHiH1EevD9eYFqvQQPbVWyBBcksTRxxafbRx5Cvc4F: + Trusted: true + ZwzMtc65t9XBMwmLm2xNSL69FvqHGPLiqeNBZ3eEN5a9sS: + Trusted: true + ZwzjnCUNGsuWnkYmN3QEj8JPLxU6V1QQFk9b47AigmPKiH: + Trusted: true diff --git a/testing/simple/main-intel.json b/testing/simple/intel-testnet.json similarity index 100% rename from testing/simple/main-intel.json rename to testing/simple/intel-testnet.json diff --git a/testing/simple/stop.sh b/testing/simple/stop.sh index 81fd8a4..f5af89a 100755 --- a/testing/simple/stop.sh +++ b/testing/simple/stop.sh @@ -2,8 +2,8 @@ cd "$( dirname "${BASH_SOURCE[0]}" )" -docker-compose -p spn-test-simple stop -docker-compose -p spn-test-simple rm +docker compose -p spn-test-simple stop +docker compose -p spn-test-simple rm oldnet=$(docker network ls | grep spn-test-simple | cut -d" " -f1) if [[ $oldnet != "" ]]; then diff --git a/unit/unit_test.go b/unit/unit_test.go index 06af6d6..8f5a5ac 100644 --- a/unit/unit_test.go +++ b/unit/unit_test.go @@ -13,7 +13,9 @@ import ( ) func TestUnit(t 
*testing.T) { //nolint:paralleltest - rand.Seed(time.Now().UnixNano()) + // Ignore deprecation, as the given alternative is not safe for concurrent use. + // The global rand methods use a locked seed, which is not available from outside. + rand.Seed(time.Now().UnixNano()) //nolint size := 1000000 workers := 100