diff --git a/src/VERSION b/src/VERSION index 7a8b5d13b..be46f5131 100755 --- a/src/VERSION +++ b/src/VERSION @@ -1,44 +1,14 @@ #!/bin/sh # program: dpvs -# Dec 19, 2023 # +# Mar 12, 2024 # ## -# Features -# - New tool: **dpvs-agent**, a management daemon tool for dpvs based on OpenAPI. -# - New tool: **healthcheck**, a service health check daemon tool cooperating with dpvs-agent. -# - Dpvs: Develop **passive health check** methods for tcp and bidirectional udp backends. -# - Dpvs: Add supports for **Proxy Protocol** with both v1 and v2 versions. -# - Dpvs: Add supports for extended statistics of ethernet devices. -# - Dpvs: Add configuration file and dpip supports for allmulticast setting switch. -# - Build: Transfer all build configurations to a top-level file `config.mk`. -# - Containerization: Draft a Dockerfile and a tutorial document to build and run dpvs in container. -# # Bugfixes -# - Dpvs: Protect toa from source address spoofing attack and increase success ratio for source address delievery via toa. -# - Dpvs: Adjust tcp window scale in outbound direction for synproxy to improve throughput in bulk upload cases. -# - Dpvs: Fix timer inaccuracy problem when timing over 524s. -# - Dpvs: Fix the crash problem caused by ether address list buffer overflow. -# - Dpvs: Fix the crash problem caused by dividing by zero when bonding slaves attempt to send packets out. -# - Dpvs: Fix the crash problem caused by inconsistent data structures of `dp_vs_dest_compat` between dpvs and keepalived. -# - Dpvs: Correct ipo option length for judgement of branching to standalone uoa. -# - Dpvs: Inhibit setting multicast ether address from slave lcores. -# - Dpvs: Fix service flag conflicts of synproxy and expire-quiescent. -# - Dpvs: Fix the chaos use of flag, flags and fwdmode in dest and service structures. -# - Dpvs: Fix service flush function not usable problem. -# - Dpvs: Fix invalid port problem when getting verbose information of netif devices. -# - Dpvs: Use atomic operation to generate packet id for ipv4 header. -# - Dpvs: Remove fragile implementations of strategy routing for snat. -# - Dpvs: Remove the stale config item "ipc_msg/unix_domain". -# - Keepalived: Do not delete and re-add vs/rs to eliminate service disturbances at reload. -# - Keepalived: Fix a carsh problem caused by missing definition of allowlist/denylist config items. -# - Ipvsadm: Add `conn-timeout` configuration option for service. -# - Ipvsadm: Fix the ambiguous use of '-Y' configuration option. -# - Ipvsadm: Fix icmpv6 configuration option `-1` lost problem.. -# - Ipvsadm: Update help text, including supported schedulers, laddr and allow/deny ip list. -# - Dpip: Fix line break problem in help message. -# - Uoa: Enable ipv6 with a macro for uoa example server. +# - tools: Fix concurrency problem between dpvs-agent and healthcheck in editing realserver . +# - tools/dpvs-agent: Add the snapshot cache. +# - tools/healthchech: Fix occasionally arising bad icmp checksum problem for udp and udpping checkers. # export VERSION=1.9 -export RELEASE=6 +export RELEASE=7 echo $VERSION-$RELEASE diff --git a/src/ipvs/ip_vs_dest.c b/src/ipvs/ip_vs_dest.c index 8564c6ebe..b5e0c8575 100644 --- a/src/ipvs/ip_vs_dest.c +++ b/src/ipvs/ip_vs_dest.c @@ -74,6 +74,10 @@ static void __dp_vs_dest_update(struct dp_vs_service *svc, int conn_flags; rte_atomic16_set(&dest->weight, udest->weight); + if (udest->flags & DPVS_DEST_F_INHIBITED) + dp_vs_dest_set_inhibited(dest); + else + dp_vs_dest_clear_inhibited(dest); conn_flags = udest->conn_flags | DPVS_CONN_F_INACTIVE; dest->fwdmode = udest->fwdmode; rte_atomic16_set(&dest->conn_flags, conn_flags); diff --git a/tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go b/tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go index f5a8a6968..b8bb050e5 100644 --- a/tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go +++ b/tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go @@ -32,15 +32,16 @@ func (agent *DpvsAgentServer) LocalLoad(cp *pool.ConnPool, parentLogger hclog.Lo logger = parentLogger.Named("LoadConfigFile") } - snapshot := settings.ShareSnapshot() - if err := snapshot.LoadFrom(settings.LocalConfigFile(), logger); err != nil { + nodeSnap := settings.ShareSnapshot() + if err := nodeSnap.LoadFrom(settings.LocalConfigFile(), logger); err != nil { return err } - announcePort := snapshot.NodeSpec.AnnouncePort - laddrs := snapshot.NodeSpec.Laddrs + announcePort := nodeSnap.NodeSpec.AnnouncePort + laddrs := nodeSnap.NodeSpec.Laddrs - for _, service := range snapshot.Services { + for _, snap := range nodeSnap.Snapshot { + service := snap.Service // 1> ipvsadm -A vip:port -s wrr vs := types.NewVirtualServerSpec() vs.SetAddr(service.Addr) diff --git a/tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go b/tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go index 03a0f7a6c..9e0429e08 100644 --- a/tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go +++ b/tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go @@ -45,14 +45,21 @@ func (h *delVsItem) Handle(params apiVs.DeleteVsVipPortParams) middleware.Respon return apiVs.NewDeleteVsVipPortFailure() } + shareSnapshot := settings.ShareSnapshot() + snapshot := shareSnapshot.SnapshotGet(params.VipPort) + if snapshot != nil { + snapshot.Lock() + defer snapshot.Unlock() + } + result := vs.Del(h.connPool, h.logger) switch result { case types.EDPVS_OK: - settings.ShareSnapshot().ServiceDel(params.VipPort) + shareSnapshot.ServiceDel(params.VipPort) h.logger.Info("Del virtual server success.", "VipPort", params.VipPort) return apiVs.NewDeleteVsVipPortOK() case types.EDPVS_NOTEXIST: - settings.ShareSnapshot().ServiceDel(params.VipPort) + shareSnapshot.ServiceDel(params.VipPort) h.logger.Warn("Del a not exist virtual server done.", "VipPort", params.VipPort, "result", result.String()) return apiVs.NewDeleteVsVipPortNotFound() default: diff --git a/tools/dpvs-agent/cmd/ipvs/get_vs.go b/tools/dpvs-agent/cmd/ipvs/get_vs.go index d9cd5b798..e3247cacd 100644 --- a/tools/dpvs-agent/cmd/ipvs/get_vs.go +++ b/tools/dpvs-agent/cmd/ipvs/get_vs.go @@ -40,18 +40,28 @@ func NewGetVs(cp *pool.ConnPool, parentLogger hclog.Logger) *getVs { } func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder { + shareSnapshot := settings.ShareSnapshot() + if params.Healthcheck != nil && !*params.Healthcheck { + return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger)) + } + + // if params.Snapshot != nil && *params.Snapshot { + // shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger) + // } + front := types.NewVirtualServerFront() vss, err := front.Get(h.connPool, h.logger) if err != nil { h.logger.Error("Get virtual server list failed.", "Error", err.Error()) - // FIXME: Invalid - return apiVs.NewGetVsOK() + return apiVs.NewGetVsNoContent() } - shareSnapshot := settings.ShareSnapshot() + vsModels := models.VirtualServerList{ + Items: make([]*models.VirtualServerSpecExpand, len(vss)), + } h.logger.Info("Get all virtual server done.", "vss", vss) - for _, vs := range vss { + for i, vs := range vss { front := types.NewRealServerFront() err := front.ParseVipPortProto(vs.ID()) @@ -69,45 +79,18 @@ func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder { h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss) - vsModel := vs.GetModel() - vsStats := (*types.ServerStats)(vsModel.Stats) - vsModel.RSs = new(models.RealServerExpandList) - vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss)) + vsModels.Items[i] = vs.GetModel() + vsStats := (*types.ServerStats)(vsModels.Items[i].Stats) + vsModels.Items[i].RSs = new(models.RealServerExpandList) + vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss)) for j, rs := range rss { rsModel := rs.GetModel() rsStats := (*types.ServerStats)(rsModel.Stats) - vsModel.RSs.Items[j] = rsModel + vsModels.Items[i].RSs.Items[j] = rsModel vsStats.Increase(rsStats) } - - if shareSnapshot.NodeSpec.Laddrs == nil { - laddr := types.NewLocalAddrFront() - if err := laddr.ParseVipPortProto(vs.ID()); err != nil { - // FIXME: Invalid - return apiVs.NewGetVsOK() - } - - laddrs, err := laddr.Get(h.connPool, h.logger) - if err != nil { - // FIXME: Invalid - return apiVs.NewGetVsOK() - } - - shareSnapshot.NodeSpec.Laddrs = new(models.LocalAddressExpandList) - laddrModels := shareSnapshot.NodeSpec.Laddrs - laddrModels.Items = make([]*models.LocalAddressSpecExpand, len(laddrs)) - for k, lip := range laddrs { - laddrModels.Items[k] = lip.GetModel() - } - } - - shareSnapshot.ServiceUpsert(vsModel) - } - - if params.Snapshot != nil && *params.Snapshot { - shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger) } - return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger)) + return apiVs.NewGetVsOK().WithPayload(&vsModels) } diff --git a/tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go b/tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go index 91c7bc60c..da9373711 100644 --- a/tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go +++ b/tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go @@ -15,6 +15,8 @@ package ipvs import ( + "strings" + "github.com/dpvs-agent/models" "github.com/dpvs-agent/pkg/ipc/pool" "github.com/dpvs-agent/pkg/ipc/types" @@ -40,10 +42,33 @@ func NewGetVsVipPort(cp *pool.ConnPool, parentLogger hclog.Logger) *getVsVipPort } func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Responder { + shareSnapshot := settings.ShareSnapshot() + if params.Healthcheck != nil && !*params.Healthcheck { + vsModel := shareSnapshot.ServiceGet(params.VipPort) + if vsModel != nil { + vsModels := new(models.VirtualServerList) + vsModels.Items = make([]*models.VirtualServerSpecExpand, 1) + vsModels.Items[0] = vsModel + return apiVs.NewGetVsVipPortOK().WithPayload(vsModels) + } + } + + vaild := true var vss []*types.VirtualServerSpec spec := types.NewVirtualServerSpec() err := spec.ParseVipPortProto(params.VipPort) if err != nil { + vaild = false + if params.Healthcheck != nil && !*params.Healthcheck { + // invalid VipPort string + // respond full cache info + vsModels := shareSnapshot.GetModels(h.logger) + if len(vsModels.Items) != 0 { + return apiVs.NewGetVsVipPortOK().WithPayload(vsModels) + } + // read from dpvs memory + } + h.logger.Warn("Convert to virtual server failed. Get All virtual server.", "VipPort", params.VipPort, "Error", err.Error()) front := types.NewVirtualServerFront() vss, err = front.Get(h.connPool, h.logger) @@ -56,10 +81,9 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon return apiVs.NewGetVsVipPortNotFound() } - shareSnapshot := settings.ShareSnapshot() - - vsModels := new(models.VirtualServerList) - vsModels.Items = make([]*models.VirtualServerSpecExpand, len(vss)) + vsModels := &models.VirtualServerList{ + Items: make([]*models.VirtualServerSpecExpand, len(vss)), + } for i, vs := range vss { front := types.NewRealServerFront() @@ -80,20 +104,32 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss) vsModel := vs.GetModel() - shareSnapshot.ServiceUpsert(vsModel) - // vsModel.Version = shareSnapshot.ServiceVersion(vs.ID()) vsModels.Items[i] = vsModel - vsStats := (*types.ServerStats)(vsModels.Items[i].Stats) - vsModels.Items[i].RSs = new(models.RealServerExpandList) - vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss)) + vsStats := (*types.ServerStats)(vsModel.Stats) + vsModel.RSs = new(models.RealServerExpandList) + vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss)) for j, rs := range rss { rsModel := rs.GetModel() rsStats := (*types.ServerStats)(rsModel.Stats) - vsModels.Items[i].RSs.Items[j] = rsModel + vsModel.RSs.Items[j] = rsModel vsStats.Increase(rsStats) } } + if vaild { + targetModels := &models.VirtualServerList{ + Items: make([]*models.VirtualServerSpecExpand, 1), + } + + for _, vsModel := range vsModels.Items { + typesVsModel := (*types.VirtualServerSpecExpandModel)(vsModel) + if strings.EqualFold(spec.ID(), typesVsModel.ID()) { + targetModels.Items[0] = vsModel + return apiVs.NewGetVsVipPortOK().WithPayload(targetModels) + } + } + } + return apiVs.NewGetVsVipPortOK().WithPayload(vsModels) } diff --git a/tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go b/tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go index aa4c98506..80bd06a39 100644 --- a/tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go +++ b/tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go @@ -15,7 +15,9 @@ package ipvs import ( - // "github.com/dpvs-agent/models" + "strings" + + "github.com/dpvs-agent/models" "github.com/dpvs-agent/pkg/ipc/pool" "github.com/dpvs-agent/pkg/ipc/types" "github.com/dpvs-agent/pkg/settings" @@ -46,6 +48,10 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond return apiVs.NewPostVsVipPortRsInvalidFrontend() } + if params.Rss == nil || params.Rss.Items == nil { + return apiVs.NewPostVsVipPortRsInvalidFrontend() + } + rss := make([]*types.RealServerSpec, len(params.Rss.Items)) for i, rs := range params.Rss.Items { var fwdmode types.DpvsFwdMode @@ -56,15 +62,58 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond rss[i].SetWeight(uint32(rs.Weight)) rss[i].SetProto(front.GetProto()) rss[i].SetAddr(rs.IP) - rss[i].SetInhibited(rs.Inhibited) rss[i].SetOverloaded(rs.Overloaded) rss[i].SetFwdMode(fwdmode) + // NOTE: inhibited set by healthcheck module with API /vs/${ID}/rs/health only + // we clear it default + inhibited := false + if rs.Inhibited != nil { + inhibited = *rs.Inhibited + } + rss[i].SetInhibited(&inhibited) + } + + shareSnapshot := settings.ShareSnapshot() + if shareSnapshot.ServiceLock(params.VipPort) { + defer shareSnapshot.ServiceUnlock(params.VipPort) } result := front.Update(rss, h.connPool, h.logger) switch result { case types.EDPVS_EXIST, types.EDPVS_OK: - settings.ShareSnapshot().ServiceVersionUpdate(params.VipPort, h.logger) + // Update Snapshot + vsModel := shareSnapshot.ServiceGet(params.VipPort) + if vsModel == nil { + spec := types.NewVirtualServerSpec() + err := spec.ParseVipPortProto(params.VipPort) + if err != nil { + h.logger.Warn("Convert to virtual server failed.", "VipPort", params.VipPort, "Error", err.Error()) + // FIXME return + } + vss, err := spec.Get(h.connPool, h.logger) + if err != nil { + h.logger.Error("Get virtual server failed.", "svc VipPort", params.VipPort, "Error", err.Error()) + // FIXME return + } + + for _, vs := range vss { + if strings.EqualFold(vs.ID(), spec.ID()) { + shareSnapshot.ServiceAdd(vs) + break + } + } + } else { + vsModel.RSs = &models.RealServerExpandList{ + Items: make([]*models.RealServerSpecExpand, len(rss)), + } + + for i, rs := range rss { + vsModel.RSs.Items[i] = rs.GetModel() + } + } + + shareSnapshot.ServiceVersionUpdate(params.VipPort, h.logger) + h.logger.Info("Set real server to virtual server success.", "VipPort", params.VipPort, "rss", rss, "result", result.String()) return apiVs.NewPostVsVipPortRsOK() default: diff --git a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port.go b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port.go index 0ee30e708..b040e2074 100644 --- a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port.go +++ b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port.go @@ -17,6 +17,7 @@ package ipvs import ( "strings" + "github.com/dpvs-agent/models" "github.com/dpvs-agent/pkg/ipc/pool" "github.com/dpvs-agent/pkg/ipc/types" "github.com/dpvs-agent/pkg/settings" @@ -85,23 +86,59 @@ func (h *putVsItem) Handle(params apiVs.PutVsVipPortParams) middleware.Responder } } + shareSnapshot := settings.ShareSnapshot() result := vs.Add(h.connPool, h.logger) h.logger.Info("Add virtual server done.", "vs", vs, "result", result.String()) switch result { case types.EDPVS_OK: // return 201 - settings.ShareSnapshot().ServiceAdd(vs) + shareSnapshot.ServiceAdd(vs) h.logger.Info("Created new virtual server success.", "VipPort", params.VipPort) return apiVs.NewPutVsVipPortCreated() case types.EDPVS_EXIST: h.logger.Info("The virtual server already exist! Try to update.", "VipPort", params.VipPort) + + if shareSnapshot.ServiceLock(vs.ID()) { + defer shareSnapshot.ServiceUnlock(vs.ID()) + } + reason := vs.Update(h.connPool, h.logger) if reason != types.EDPVS_OK { // return 461 h.logger.Error("Update virtual server failed.", "VipPort", params.VipPort, "reason", reason.String()) return apiVs.NewPutVsVipPortInvalidBackend() } + + newVsModel := vs.GetModel() + vsModel := shareSnapshot.ServiceGet(vs.ID()) + if vsModel == nil { + newVsModel.RSs = &models.RealServerExpandList{ + Items: make([]*models.RealServerSpecExpand, 0), + } + shareSnapshot.ServiceUpsert(newVsModel) + return apiVs.NewPutVsVipPortOK() + } + + vsModel.Bps = newVsModel.Bps + vsModel.ConnTimeout = newVsModel.ConnTimeout + vsModel.LimitProportion = newVsModel.LimitProportion + vsModel.ExpireQuiescent = newVsModel.ExpireQuiescent + vsModel.Fwmark = newVsModel.Fwmark + vsModel.SynProxy = newVsModel.SynProxy + vsModel.Match = newVsModel.Match + vsModel.SchedName = newVsModel.SchedName + vsModel.Timeout = newVsModel.Timeout + vsModel.Flags = newVsModel.Flags + if vsModel.RSs == nil { + vsModel.RSs = &models.RealServerExpandList{} + } + + if vsModel.RSs.Items == nil { + vsModel.RSs.Items = make([]*models.RealServerSpecExpand, 0) + } + h.logger.Info("Update virtual server success.", "VipPort", params.VipPort) + // return 200 return apiVs.NewPutVsVipPortOK() default: diff --git a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs.go b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs.go index 2c596ab27..0699daa5b 100644 --- a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs.go +++ b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs.go @@ -15,8 +15,12 @@ package ipvs import ( + "fmt" + "strings" + "github.com/dpvs-agent/pkg/ipc/pool" "github.com/dpvs-agent/pkg/ipc/types" + "github.com/dpvs-agent/pkg/settings" apiVs "github.com/dpvs-agent/restapi/operations/virtualserver" @@ -45,7 +49,7 @@ func (h *putVsRs) Handle(params apiVs.PutVsVipPortRsParams) middleware.Responder } var rss []*types.RealServerSpec - if params.Rss != nil { + if params.Rss != nil && params.Rss.Items != nil { rss = make([]*types.RealServerSpec, len(params.Rss.Items)) for i, rs := range params.Rss.Items { var fwdmode types.DpvsFwdMode @@ -57,9 +61,21 @@ func (h *putVsRs) Handle(params apiVs.PutVsVipPortRsParams) middleware.Responder rss[i].SetProto(front.GetProto()) rss[i].SetWeight(uint32(rs.Weight)) rss[i].SetFwdMode(fwdmode) - rss[i].SetInhibited(rs.Inhibited) rss[i].SetOverloaded(rs.Overloaded) + // NOTE: inhibited set by healthcheck module with API /vs/${ID}/rs/health only + // we clear it default + inhibited := false + if rs.Inhibited != nil { + inhibited = *rs.Inhibited + } + rss[i].SetInhibited(&inhibited) } + h.logger.Info("Apply real server update.", "VipPort", params.VipPort, "rss", rss) + } + + shareSnapshot := settings.ShareSnapshot() + if shareSnapshot.ServiceLock(params.VipPort) { + defer shareSnapshot.ServiceUnlock(params.VipPort) } existOnly := false @@ -69,6 +85,33 @@ func (h *putVsRs) Handle(params apiVs.PutVsVipPortRsParams) middleware.Responder switch result { case types.EDPVS_EXIST, types.EDPVS_OK: h.logger.Info("Set real server sets success.", "VipPort", params.VipPort, "rss", rss, "result", result.String()) + // Update Snapshot + vsModel := shareSnapshot.ServiceGet(params.VipPort) + newRSs := make([]*types.RealServerSpec, 0) + for _, newRs := range rss { + exist := false + for _, cacheRs := range vsModel.RSs.Items { + rsID := fmt.Sprintf("%s:%d", cacheRs.Spec.IP, cacheRs.Spec.Port) + if strings.EqualFold(newRs.ID(), rsID) { + // update weight only + inhibited := newRs.GetInhibited() + cacheRs.Spec.Weight = uint16(newRs.GetWeight()) + cacheRs.Spec.Inhibited = &inhibited + exist = true + break + } + } + + if !exist { + newRSs = append(newRSs, newRs) + } + } + + for _, rs := range newRSs { + vsModel.RSs.Items = append(vsModel.RSs.Items, rs.GetModel()) + } + + shareSnapshot.ServiceVersionUpdate(params.VipPort, h.logger) return apiVs.NewPutVsVipPortRsOK() case types.EDPVS_NOTEXIST: h.logger.Error("Unreachable branch") diff --git a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs_health.go b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs_health.go index 31d56449f..e86b55930 100644 --- a/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs_health.go +++ b/tools/dpvs-agent/cmd/ipvs/put_vs_vip_port_rs_health.go @@ -15,13 +15,14 @@ package ipvs import ( + "fmt" "strings" + "github.com/dpvs-agent/models" "github.com/dpvs-agent/pkg/ipc/pool" "github.com/dpvs-agent/pkg/ipc/types" "github.com/dpvs-agent/pkg/settings" - "github.com/dpvs-agent/models" apiVs "github.com/dpvs-agent/restapi/operations/virtualserver" "github.com/go-openapi/runtime/middleware" @@ -40,7 +41,6 @@ func NewPutVsRsHealth(cp *pool.ConnPool, parentLogger hclog.Logger) *putVsRsHeal } return &putVsRsHealth{connPool: cp, logger: logger} } - func (h *putVsRsHealth) Handle(params apiVs.PutVsVipPortRsHealthParams) middleware.Responder { front := types.NewRealServerFront() if err := front.ParseVipPortProto(params.VipPort); err != nil { @@ -48,64 +48,74 @@ func (h *putVsRsHealth) Handle(params apiVs.PutVsVipPortRsHealthParams) middlewa return apiVs.NewPutVsVipPortRsHealthInvalidFrontend() } - // get active backends - active, err := front.Get(h.connPool, h.logger) - if err != nil { - return apiVs.NewPutVsVipPortRsHealthInvalidBackend() - } - + activeRSs := make(map[string]*models.RealServerSpecExpand) + var vsModel *models.VirtualServerSpecExpand shareSnapshot := settings.ShareSnapshot() - version := shareSnapshot.ServiceVersion(params.VipPort) - - activeRSs := make(map[string]*types.RealServerSpec) - for _, rs := range active { - activeRSs[rs.ID()] = rs - } + if shareSnapshot.ServiceRLock(params.VipPort) { + defer shareSnapshot.ServiceRUnlock(params.VipPort) - rssModels := new(models.RealServerExpandList) - rssModels.Items = make([]*models.RealServerSpecExpand, len(active)) - validRSs := make([]*types.RealServerSpec, 0) - if params.Rss != nil { - for i, rs := range params.Rss.Items { - var fwdmode types.DpvsFwdMode - fwdmode.FromString(rs.Mode) - newRs := types.NewRealServerSpec() - newRs.SetAf(front.GetAf()) - newRs.SetAddr(rs.IP) - newRs.SetPort(rs.Port) - newRs.SetProto(front.GetProto()) - newRs.SetWeight(uint32(rs.Weight)) - newRs.SetFwdMode(fwdmode) - newRs.SetInhibited(rs.Inhibited) - newRs.SetOverloaded(rs.Overloaded) + vsModel = shareSnapshot.ServiceGet(params.VipPort) + if vsModel != nil { + for _, rs := range vsModel.RSs.Items { + rsModel := (*types.RealServerSpecExpandModel)(rs) + activeRSs[rsModel.ID()] = rs + } - if activeRs, existed := activeRSs[newRs.ID()]; existed { - rssModels.Items[i] = activeRs.GetModel() - validRSs = append(validRSs, newRs) + validRSs := make([]*types.RealServerSpec, 0) + if params.Rss != nil && params.Rss.Items != nil { + for _, rs := range params.Rss.Items { + var fwdmode types.DpvsFwdMode + fwdmode.FromString(rs.Mode) + newRs := types.NewRealServerSpec() + newRs.SetAf(front.GetAf()) + newRs.SetAddr(rs.IP) + newRs.SetPort(rs.Port) + newRs.SetProto(front.GetProto()) + newRs.SetWeight(uint32(rs.Weight)) + newRs.SetFwdMode(fwdmode) + newRs.SetInhibited(rs.Inhibited) + newRs.SetOverloaded(rs.Overloaded) + if _, existed := activeRSs[newRs.ID()]; existed { + validRSs = append(validRSs, newRs) + from := activeRSs[newRs.ID()].Spec + to := newRs + h.logger.Info("real server update.", "ID", newRs.ID(), "client Version", params.Version, "from", from, "to", to) + } + } } - } - } - if !strings.EqualFold(params.Version, version) { - h.logger.Info("The service", "VipPort", params.VipPort, "version expired. The newest version", version) - return apiVs.NewPutVsVipPortRsHealthUnexpected().WithPayload(rssModels) - } + if !strings.EqualFold(vsModel.Version, params.Version) { + h.logger.Info("The service", "VipPort", params.VipPort, "version expired. Latest Version", vsModel.Version, "Client Version", params.Version) + return apiVs.NewPutVsVipPortRsHealthUnexpected().WithPayload(vsModel) + } - existOnly := true - result := front.Edit(existOnly, validRSs, h.connPool, h.logger) - switch result { - case types.EDPVS_EXIST, types.EDPVS_OK: - h.logger.Info("Set real server sets success.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) - return apiVs.NewPutVsVipPortRsHealthOK().WithPayload(rssModels) - case types.EDPVS_NOTEXIST: - if existOnly { - h.logger.Error("Edit not exist real server.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) - return apiVs.NewPutVsVipPortRsHealthInvalidFrontend() + existOnly := true + result := front.Edit(existOnly, validRSs, h.connPool, h.logger) + switch result { + case types.EDPVS_EXIST, types.EDPVS_OK: + for _, newRs := range validRSs { + for _, rs := range vsModel.RSs.Items { + rsID := fmt.Sprintf("%s:%d", rs.Spec.IP, rs.Spec.Port) + if strings.EqualFold(newRs.ID(), rsID) { + inhibited := newRs.GetInhibited() + rs.Spec.Inhibited = &inhibited + break + } + } + } + h.logger.Info("Set real server sets success.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) + return apiVs.NewPutVsVipPortRsHealthOK() + case types.EDPVS_NOTEXIST: + if existOnly { + h.logger.Error("Edit not exist real server.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) + return apiVs.NewPutVsVipPortRsHealthInvalidFrontend() + } + h.logger.Error("Unreachable branch") + default: + h.logger.Error("Set real server sets failed.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) + return apiVs.NewPutVsVipPortRsHealthInvalidBackend() + } } - h.logger.Error("Unreachable branch") - default: - h.logger.Error("Set real server sets failed.", "VipPort", params.VipPort, "validRSs", validRSs, "result", result.String()) - return apiVs.NewPutVsVipPortRsHealthInvalidBackend() } return apiVs.NewPutVsVipPortRsHealthFailure() } diff --git a/tools/dpvs-agent/dpvs-agent-api.yaml b/tools/dpvs-agent/dpvs-agent-api.yaml index 0640a145c..f06d79c92 100644 --- a/tools/dpvs-agent/dpvs-agent-api.yaml +++ b/tools/dpvs-agent/dpvs-agent-api.yaml @@ -504,6 +504,8 @@ definitions: - tcp - udp - ping + - udpping + - http VirtualServerList: type: object properties: @@ -613,8 +615,6 @@ definitions: LimitProportion: type: "integer" format: "uint32" - #Addr: - # type: "string" ProxyProtocol: type: "string" enum: @@ -948,18 +948,25 @@ paths: parameters: - "$ref": "#/parameters/stats" - "$ref": "#/parameters/snapshot" + - "$ref": "#/parameters/healthcheck" summary: "display all vip:port:proto and rsip:port list" responses: '200': description: Success schema: "$ref": "#/definitions/VirtualServerList" + '204': + description: No Content + x-go-name: NoContent + schema: + "$ref": "#/definitions/VirtualServerList" /vs/{VipPort}: get: tags: - "virtualserver" parameters: - "$ref": "#/parameters/snapshot" + - "$ref": "#/parameters/healthcheck" - "$ref": "#/parameters/service-id" - "$ref": "#/parameters/stats" summary: "get a specific virtual server" @@ -1110,13 +1117,14 @@ paths: responses: '200': description: Success - schema: - "$ref": "#/definitions/RealServerExpandList" + #schema: + # "$ref": "#/definitions/RealServerExpandList" '270': description: "the rss-config parameter is outdated, update nothing and return the latest rs info" x-go-name: Unexpected schema: - "$ref": "#/definitions/RealServerExpandList" + # "$ref": "#/definitions/RealServerExpandList" + "$ref": "#/definitions/VirtualServerSpecExpand" '460': description: Invalid frontend in service configuration x-go-name: InvalidFrontend diff --git a/tools/dpvs-agent/models/dest_check_spec.go b/tools/dpvs-agent/models/dest_check_spec.go index 2e2e9cf9a..a2870aa5e 100644 --- a/tools/dpvs-agent/models/dest_check_spec.go +++ b/tools/dpvs-agent/models/dest_check_spec.go @@ -41,6 +41,12 @@ const ( // DestCheckSpecPing captures enum value "ping" DestCheckSpecPing DestCheckSpec = "ping" + + // DestCheckSpecUdpping captures enum value "udpping" + DestCheckSpecUdpping DestCheckSpec = "udpping" + + // DestCheckSpecHTTP captures enum value "http" + DestCheckSpecHTTP DestCheckSpec = "http" ) // for schema @@ -48,7 +54,7 @@ var destCheckSpecEnum []interface{} func init() { var res []DestCheckSpec - if err := json.Unmarshal([]byte(`["passive","tcp","udp","ping"]`), &res); err != nil { + if err := json.Unmarshal([]byte(`["passive","tcp","udp","ping","udpping","http"]`), &res); err != nil { panic(err) } for _, v := range res { diff --git a/tools/dpvs-agent/pkg/ipc/types/snapshot.go b/tools/dpvs-agent/pkg/ipc/types/snapshot.go index 4491d82c9..28b713b08 100644 --- a/tools/dpvs-agent/pkg/ipc/types/snapshot.go +++ b/tools/dpvs-agent/pkg/ipc/types/snapshot.go @@ -3,9 +3,11 @@ package types import ( "encoding/json" "fmt" + "net" "os" "strconv" "strings" + "sync" "time" "golang.org/x/sys/unix" @@ -14,62 +16,186 @@ import ( "github.com/hashicorp/go-hclog" ) +type ServiceSnapshot struct { + Service *models.VirtualServerSpecExpand + lock *sync.RWMutex +} + type NodeSnapshot struct { NodeSpec *models.DpvsNodeSpec - Services map[string]*models.VirtualServerSpecExpand + Snapshot map[string]*ServiceSnapshot +} + +func (snap *ServiceSnapshot) Lock() { + snap.lock.Lock() +} + +func (snap *ServiceSnapshot) Unlock() { + snap.lock.Unlock() +} + +func (snap *ServiceSnapshot) RLock() { + snap.lock.RLock() +} + +func (snap *ServiceSnapshot) RUnlock() { + snap.lock.RUnlock() +} + +func (node *NodeSnapshot) SnapshotID(id string) string { + items := strings.Split(id, "-") + if len(items) != 3 { + return "" + } + + proto := items[2] + svcProto := "tcp" + switch strings.ToLower(proto) { + case "udp", "tcp": + svcProto = strings.ToLower(proto) + default: + return "" + } + + port, err := strconv.Atoi(items[1]) + if err != nil { + return "" + } + vsPort := uint16(port) + + vip := net.ParseIP(items[0]) + if vip == nil { + return "" + } + + return fmt.Sprintf("%s-%d-%s", strings.ToLower(vip.String()), vsPort, svcProto) +} + +func (node *NodeSnapshot) ServiceRLock(id string) bool { + snapID := node.SnapshotID(id) + + snap, exist := node.Snapshot[strings.ToLower(snapID)] + if exist { + snap.RLock() + } + + return exist } -func (snapshot *NodeSnapshot) ServiceVersionUpdate(id string, logger hclog.Logger) { - services := snapshot.Services - logger.Info("Update server version begin.", "id", id, "services", services) - if _, exist := services[strings.ToLower(id)]; exist { - services[strings.ToLower(id)].Version = strconv.FormatInt(time.Now().UnixNano()/1e6, 10) +func (node *NodeSnapshot) ServiceRUnlock(id string) { + snapID := node.SnapshotID(id) + if snap, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + snap.RUnlock() + } +} + +func (node *NodeSnapshot) ServiceLock(id string) bool { + snapID := node.SnapshotID(id) + snap, exist := node.Snapshot[strings.ToLower(snapID)] + if exist { + snap.Lock() + } + + return exist +} + +func (node *NodeSnapshot) ServiceUnlock(id string) { + snapID := node.SnapshotID(id) + if snap, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + snap.Unlock() + } +} + +func (node *NodeSnapshot) ServiceVersionUpdate(id string, logger hclog.Logger) { + snapID := node.SnapshotID(id) + snapshot := node.Snapshot + logger.Info("Update server version begin.", "id", id, "services snapshot", snapshot) + if _, exist := snapshot[strings.ToLower(snapID)]; exist { + expireVersion := snapshot[strings.ToLower(snapID)].Service.Version + snapshot[strings.ToLower(snapID)].Service.Version = strconv.FormatInt(time.Now().UnixNano()/1e6, 10) + latestVersion := snapshot[strings.ToLower(snapID)].Service.Version + + logger.Info("Service version update done.", "expireVersion", expireVersion, "latest Version", latestVersion) return } - logger.Error("Update service version failed.", "id", id) + logger.Error("Update service version failed. Service not Exist.", "id", id) +} + +func (node *NodeSnapshot) SnapshotGet(id string) *ServiceSnapshot { + snapID := node.SnapshotID(id) + if snap, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + return snap + } + return nil } -func (snapshot *NodeSnapshot) ServiceDel(id string) { - if _, exist := snapshot.Services[strings.ToLower(id)]; exist { - delete(snapshot.Services, strings.ToLower(id)) +func (node *NodeSnapshot) ServiceGet(id string) *models.VirtualServerSpecExpand { + snapID := node.SnapshotID(id) + if snap, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + return snap.Service } + return nil } -func (snapshot *NodeSnapshot) ServiceVersion(id string) string { - if _, exist := snapshot.Services[strings.ToLower(id)]; exist { - return snapshot.Services[strings.ToLower(id)].Version +func (node *NodeSnapshot) ServiceDel(id string) { + snapID := node.SnapshotID(id) + if _, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + delete(node.Snapshot, strings.ToLower(snapID)) + } +} + +func (node *NodeSnapshot) ServiceVersion(id string) string { + snapID := node.SnapshotID(id) + if _, exist := node.Snapshot[strings.ToLower(snapID)]; exist { + return node.Snapshot[strings.ToLower(snapID)].Service.Version } return strconv.FormatInt(time.Now().UnixNano()/1e6, 10) } -func (snapshot *NodeSnapshot) ServiceAdd(vs *VirtualServerSpec) { - version := snapshot.ServiceVersion(vs.ID()) +func (node *NodeSnapshot) ServiceAdd(vs *VirtualServerSpec) { + version := node.ServiceVersion(vs.ID()) - snapshot.Services[strings.ToLower(vs.ID())] = vs.GetModel() + svc := vs.GetModel() + svc.Version = version + if svc.RSs == nil { + svc.RSs = &models.RealServerExpandList{Items: make([]*models.RealServerSpecExpand, 0)} + } - snapshot.Services[strings.ToLower(vs.ID())].Version = version + node.Snapshot[strings.ToLower(vs.ID())] = &ServiceSnapshot{Service: svc, lock: new(sync.RWMutex)} } -func (snapshot *NodeSnapshot) ServiceUpsert(spec *models.VirtualServerSpecExpand) { +func (node *NodeSnapshot) ServiceUpsert(spec *models.VirtualServerSpecExpand) { svc := (*VirtualServerSpecExpandModel)(spec) - version := snapshot.ServiceVersion(svc.ID()) + version := node.ServiceVersion(svc.ID()) - snapshot.Services[strings.ToLower(svc.ID())] = spec + if _, exist := node.Snapshot[strings.ToLower(svc.ID())]; !exist { + node.Snapshot[strings.ToLower(svc.ID())] = &ServiceSnapshot{Service: spec, lock: new(sync.RWMutex)} + } else { + node.Snapshot[strings.ToLower(svc.ID())].Service = spec + } - snapshot.Services[strings.ToLower(svc.ID())].Version = version + node.Snapshot[strings.ToLower(svc.ID())].Service.Version = version } -func (snapshot *NodeSnapshot) GetModels(logger hclog.Logger) *models.VirtualServerList { - services := &models.VirtualServerList{Items: make([]*models.VirtualServerSpecExpand, len(snapshot.Services))} +func (node *NodeSnapshot) GetModels(logger hclog.Logger) *models.VirtualServerList { + services := &models.VirtualServerList{Items: make([]*models.VirtualServerSpecExpand, len(node.Snapshot))} i := 0 - for _, svc := range snapshot.Services { - services.Items[i] = svc + for _, snap := range node.Snapshot { + services.Items[i] = snap.Service i++ } + + logger.Info("services", services) return services } +type RealServerSpecExpandModel models.RealServerSpecExpand + +func (rs *RealServerSpecExpandModel) ID() string { + return fmt.Sprintf("%s:%d", net.ParseIP(rs.Spec.IP), rs.Spec.Port) +} + type VirtualServerSpecExpandModel models.VirtualServerSpecExpand func (spec *VirtualServerSpecExpandModel) ID() string { @@ -77,10 +203,11 @@ func (spec *VirtualServerSpecExpandModel) ID() string { if spec.Proto == unix.IPPROTO_UDP { proto = "udp" } - return fmt.Sprintf("%s-%d-%s", spec.Addr, spec.Port, proto) + + return fmt.Sprintf("%s-%d-%s", net.ParseIP(spec.Addr).String(), spec.Port, proto) } -func (snapshot *NodeSnapshot) LoadFrom(cacheFile string, logger hclog.Logger) error { +func (node *NodeSnapshot) LoadFrom(cacheFile string, logger hclog.Logger) error { content, err := os.ReadFile(cacheFile) if err != nil { logger.Error("Read dpvs service cache file failed.", "Error", err.Error()) @@ -92,19 +219,19 @@ func (snapshot *NodeSnapshot) LoadFrom(cacheFile string, logger hclog.Logger) er return err } - snapshot.NodeSpec = nodeSnapshot.NodeSpec + node.NodeSpec = nodeSnapshot.NodeSpec for _, svcModel := range nodeSnapshot.Services.Items { svc := (*VirtualServerSpecExpandModel)(svcModel) - snapshot.Services[strings.ToLower(svc.ID())] = svcModel + node.Snapshot[strings.ToLower(svc.ID())].Service = svcModel } return nil } -func (snapshot *NodeSnapshot) DumpTo(cacheFile string, logger hclog.Logger) error { +func (node *NodeSnapshot) DumpTo(cacheFile string, logger hclog.Logger) error { nodeSnapshot := &models.NodeServiceSnapshot{ - NodeSpec: snapshot.NodeSpec, - Services: snapshot.GetModels(logger), + NodeSpec: node.NodeSpec, + Services: node.GetModels(logger), } content, err := json.Marshal(nodeSnapshot) diff --git a/tools/dpvs-agent/pkg/settings/settings.go b/tools/dpvs-agent/pkg/settings/settings.go index 5027d4344..218e8302f 100644 --- a/tools/dpvs-agent/pkg/settings/settings.go +++ b/tools/dpvs-agent/pkg/settings/settings.go @@ -23,7 +23,7 @@ func setUp() { NodeSpec: &models.DpvsNodeSpec{ AnnouncePort: &models.VsAnnouncePort{}, }, - Services: make(map[string]*models.VirtualServerSpecExpand), + Snapshot: make(map[string]*types.ServiceSnapshot), } } diff --git a/tools/dpvs-agent/restapi/embedded_spec.go b/tools/dpvs-agent/restapi/embedded_spec.go index f55b028fe..aef67a7fe 100644 --- a/tools/dpvs-agent/restapi/embedded_spec.go +++ b/tools/dpvs-agent/restapi/embedded_spec.go @@ -578,6 +578,9 @@ func init() { }, { "$ref": "#/parameters/snapshot" + }, + { + "$ref": "#/parameters/healthcheck" } ], "responses": { @@ -586,6 +589,13 @@ func init() { "schema": { "$ref": "#/definitions/VirtualServerList" } + }, + "204": { + "description": "No Content", + "schema": { + "$ref": "#/definitions/VirtualServerList" + }, + "x-go-name": "NoContent" } } } @@ -600,6 +610,9 @@ func init() { { "$ref": "#/parameters/snapshot" }, + { + "$ref": "#/parameters/healthcheck" + }, { "$ref": "#/parameters/service-id" }, @@ -1370,15 +1383,12 @@ func init() { ], "responses": { "200": { - "description": "Success", - "schema": { - "$ref": "#/definitions/RealServerExpandList" - } + "description": "Success" }, "270": { "description": "the rss-config parameter is outdated, update nothing and return the latest rs info", "schema": { - "$ref": "#/definitions/RealServerExpandList" + "$ref": "#/definitions/VirtualServerSpecExpand" }, "x-go-name": "Unexpected" }, @@ -1443,7 +1453,9 @@ func init() { "passive", "tcp", "udp", - "ping" + "ping", + "udpping", + "http" ] }, "DpvsNodeSpec": { @@ -2963,6 +2975,12 @@ func init() { "default": true, "name": "snapshot", "in": "query" + }, + { + "type": "boolean", + "default": false, + "name": "healthcheck", + "in": "query" } ], "responses": { @@ -2971,6 +2989,13 @@ func init() { "schema": { "$ref": "#/definitions/VirtualServerList" } + }, + "204": { + "description": "No Content", + "schema": { + "$ref": "#/definitions/VirtualServerList" + }, + "x-go-name": "NoContent" } } } @@ -2988,6 +3013,12 @@ func init() { "name": "snapshot", "in": "query" }, + { + "type": "boolean", + "default": false, + "name": "healthcheck", + "in": "query" + }, { "type": "string", "name": "VipPort", @@ -3894,15 +3925,12 @@ func init() { ], "responses": { "200": { - "description": "Success", - "schema": { - "$ref": "#/definitions/RealServerExpandList" - } + "description": "Success" }, "270": { "description": "the rss-config parameter is outdated, update nothing and return the latest rs info", "schema": { - "$ref": "#/definitions/RealServerExpandList" + "$ref": "#/definitions/VirtualServerSpecExpand" }, "x-go-name": "Unexpected" }, @@ -3967,7 +3995,9 @@ func init() { "passive", "tcp", "udp", - "ping" + "ping", + "udpping", + "http" ] }, "DpvsNodeSpec": { diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_parameters.go b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_parameters.go index 4c527457b..baac157f1 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_parameters.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_parameters.go @@ -22,11 +22,14 @@ func NewGetVsParams() GetVsParams { var ( // initialize parameters with default values - snapshotDefault = bool(true) - statsDefault = bool(false) + healthcheckDefault = bool(false) + snapshotDefault = bool(true) + statsDefault = bool(false) ) return GetVsParams{ + Healthcheck: &healthcheckDefault, + Snapshot: &snapshotDefault, Stats: &statsDefault, @@ -42,6 +45,11 @@ type GetVsParams struct { // HTTP Request Object HTTPRequest *http.Request `json:"-"` + /* + In: query + Default: false + */ + Healthcheck *bool /* In: query Default: true @@ -65,6 +73,11 @@ func (o *GetVsParams) BindRequest(r *http.Request, route *middleware.MatchedRout qs := runtime.Values(r.URL.Query()) + qHealthcheck, qhkHealthcheck, _ := qs.GetOK("healthcheck") + if err := o.bindHealthcheck(qHealthcheck, qhkHealthcheck, route.Formats); err != nil { + res = append(res, err) + } + qSnapshot, qhkSnapshot, _ := qs.GetOK("snapshot") if err := o.bindSnapshot(qSnapshot, qhkSnapshot, route.Formats); err != nil { res = append(res, err) @@ -80,6 +93,30 @@ func (o *GetVsParams) BindRequest(r *http.Request, route *middleware.MatchedRout return nil } +// bindHealthcheck binds and validates parameter Healthcheck from query. +func (o *GetVsParams) bindHealthcheck(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: false + // AllowEmptyValue: false + + if raw == "" { // empty values pass all other validations + // Default values have been previously initialized by NewGetVsParams() + return nil + } + + value, err := swag.ConvertBool(raw) + if err != nil { + return errors.InvalidType("healthcheck", "query", "bool", raw) + } + o.Healthcheck = &value + + return nil +} + // bindSnapshot binds and validates parameter Snapshot from query. func (o *GetVsParams) bindSnapshot(rawData []string, hasKey bool, formats strfmt.Registry) error { var raw string diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_responses.go b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_responses.go index c67eec7a7..701d342e1 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_responses.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_responses.go @@ -57,3 +57,48 @@ func (o *GetVsOK) WriteResponse(rw http.ResponseWriter, producer runtime.Produce } } } + +// GetVsNoContentCode is the HTTP code returned for type GetVsNoContent +const GetVsNoContentCode int = 204 + +/* +GetVsNoContent No Content + +swagger:response getVsNoContent +*/ +type GetVsNoContent struct { + + /* + In: Body + */ + Payload *models.VirtualServerList `json:"body,omitempty"` +} + +// NewGetVsNoContent creates GetVsNoContent with default headers values +func NewGetVsNoContent() *GetVsNoContent { + + return &GetVsNoContent{} +} + +// WithPayload adds the payload to the get vs no content response +func (o *GetVsNoContent) WithPayload(payload *models.VirtualServerList) *GetVsNoContent { + o.Payload = payload + return o +} + +// SetPayload sets the payload to the get vs no content response +func (o *GetVsNoContent) SetPayload(payload *models.VirtualServerList) { + o.Payload = payload +} + +// WriteResponse to the client +func (o *GetVsNoContent) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.WriteHeader(204) + if o.Payload != nil { + payload := o.Payload + if err := producer.Produce(rw, payload); err != nil { + panic(err) // let the recovery middleware deal with this + } + } +} diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_urlbuilder.go b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_urlbuilder.go index 35fc9b8f6..3c2fbac4d 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_urlbuilder.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_urlbuilder.go @@ -15,8 +15,9 @@ import ( // GetVsURL generates an URL for the get vs operation type GetVsURL struct { - Snapshot *bool - Stats *bool + Healthcheck *bool + Snapshot *bool + Stats *bool _basePath string // avoid unkeyed usage @@ -52,6 +53,14 @@ func (o *GetVsURL) Build() (*url.URL, error) { qs := make(url.Values) + var healthcheckQ string + if o.Healthcheck != nil { + healthcheckQ = swag.FormatBool(*o.Healthcheck) + } + if healthcheckQ != "" { + qs.Set("healthcheck", healthcheckQ) + } + var snapshotQ string if o.Snapshot != nil { snapshotQ = swag.FormatBool(*o.Snapshot) diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_parameters.go b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_parameters.go index 77ef69f53..e5e06dc91 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_parameters.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_parameters.go @@ -22,11 +22,14 @@ func NewGetVsVipPortParams() GetVsVipPortParams { var ( // initialize parameters with default values - snapshotDefault = bool(true) - statsDefault = bool(false) + healthcheckDefault = bool(false) + snapshotDefault = bool(true) + statsDefault = bool(false) ) return GetVsVipPortParams{ + Healthcheck: &healthcheckDefault, + Snapshot: &snapshotDefault, Stats: &statsDefault, @@ -47,6 +50,11 @@ type GetVsVipPortParams struct { In: path */ VipPort string + /* + In: query + Default: false + */ + Healthcheck *bool /* In: query Default: true @@ -75,6 +83,11 @@ func (o *GetVsVipPortParams) BindRequest(r *http.Request, route *middleware.Matc res = append(res, err) } + qHealthcheck, qhkHealthcheck, _ := qs.GetOK("healthcheck") + if err := o.bindHealthcheck(qHealthcheck, qhkHealthcheck, route.Formats); err != nil { + res = append(res, err) + } + qSnapshot, qhkSnapshot, _ := qs.GetOK("snapshot") if err := o.bindSnapshot(qSnapshot, qhkSnapshot, route.Formats); err != nil { res = append(res, err) @@ -104,6 +117,30 @@ func (o *GetVsVipPortParams) bindVipPort(rawData []string, hasKey bool, formats return nil } +// bindHealthcheck binds and validates parameter Healthcheck from query. +func (o *GetVsVipPortParams) bindHealthcheck(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: false + // AllowEmptyValue: false + + if raw == "" { // empty values pass all other validations + // Default values have been previously initialized by NewGetVsVipPortParams() + return nil + } + + value, err := swag.ConvertBool(raw) + if err != nil { + return errors.InvalidType("healthcheck", "query", "bool", raw) + } + o.Healthcheck = &value + + return nil +} + // bindSnapshot binds and validates parameter Snapshot from query. func (o *GetVsVipPortParams) bindSnapshot(rawData []string, hasKey bool, formats strfmt.Registry) error { var raw string diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_urlbuilder.go b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_urlbuilder.go index 0344bdc38..e09106c05 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_urlbuilder.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/get_vs_vip_port_urlbuilder.go @@ -18,8 +18,9 @@ import ( type GetVsVipPortURL struct { VipPort string - Snapshot *bool - Stats *bool + Healthcheck *bool + Snapshot *bool + Stats *bool _basePath string // avoid unkeyed usage @@ -62,6 +63,14 @@ func (o *GetVsVipPortURL) Build() (*url.URL, error) { qs := make(url.Values) + var healthcheckQ string + if o.Healthcheck != nil { + healthcheckQ = swag.FormatBool(*o.Healthcheck) + } + if healthcheckQ != "" { + qs.Set("healthcheck", healthcheckQ) + } + var snapshotQ string if o.Snapshot != nil { snapshotQ = swag.FormatBool(*o.Snapshot) diff --git a/tools/dpvs-agent/restapi/operations/virtualserver/put_vs_vip_port_rs_health_responses.go b/tools/dpvs-agent/restapi/operations/virtualserver/put_vs_vip_port_rs_health_responses.go index c3904ebe9..b00b1d3fa 100644 --- a/tools/dpvs-agent/restapi/operations/virtualserver/put_vs_vip_port_rs_health_responses.go +++ b/tools/dpvs-agent/restapi/operations/virtualserver/put_vs_vip_port_rs_health_responses.go @@ -22,11 +22,6 @@ PutVsVipPortRsHealthOK Success swagger:response putVsVipPortRsHealthOK */ type PutVsVipPortRsHealthOK struct { - - /* - In: Body - */ - Payload *models.RealServerExpandList `json:"body,omitempty"` } // NewPutVsVipPortRsHealthOK creates PutVsVipPortRsHealthOK with default headers values @@ -35,27 +30,12 @@ func NewPutVsVipPortRsHealthOK() *PutVsVipPortRsHealthOK { return &PutVsVipPortRsHealthOK{} } -// WithPayload adds the payload to the put vs vip port rs health o k response -func (o *PutVsVipPortRsHealthOK) WithPayload(payload *models.RealServerExpandList) *PutVsVipPortRsHealthOK { - o.Payload = payload - return o -} - -// SetPayload sets the payload to the put vs vip port rs health o k response -func (o *PutVsVipPortRsHealthOK) SetPayload(payload *models.RealServerExpandList) { - o.Payload = payload -} - // WriteResponse to the client func (o *PutVsVipPortRsHealthOK) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + rw.Header().Del(runtime.HeaderContentType) //Remove Content-Type on empty responses + rw.WriteHeader(200) - if o.Payload != nil { - payload := o.Payload - if err := producer.Produce(rw, payload); err != nil { - panic(err) // let the recovery middleware deal with this - } - } } // PutVsVipPortRsHealthUnexpectedCode is the HTTP code returned for type PutVsVipPortRsHealthUnexpected @@ -71,7 +51,7 @@ type PutVsVipPortRsHealthUnexpected struct { /* In: Body */ - Payload *models.RealServerExpandList `json:"body,omitempty"` + Payload *models.VirtualServerSpecExpand `json:"body,omitempty"` } // NewPutVsVipPortRsHealthUnexpected creates PutVsVipPortRsHealthUnexpected with default headers values @@ -81,13 +61,13 @@ func NewPutVsVipPortRsHealthUnexpected() *PutVsVipPortRsHealthUnexpected { } // WithPayload adds the payload to the put vs vip port rs health unexpected response -func (o *PutVsVipPortRsHealthUnexpected) WithPayload(payload *models.RealServerExpandList) *PutVsVipPortRsHealthUnexpected { +func (o *PutVsVipPortRsHealthUnexpected) WithPayload(payload *models.VirtualServerSpecExpand) *PutVsVipPortRsHealthUnexpected { o.Payload = payload return o } // SetPayload sets the payload to the put vs vip port rs health unexpected response -func (o *PutVsVipPortRsHealthUnexpected) SetPayload(payload *models.RealServerExpandList) { +func (o *PutVsVipPortRsHealthUnexpected) SetPayload(payload *models.VirtualServerSpecExpand) { o.Payload = payload } diff --git a/tools/healthcheck/pkg/helthcheck/checker.go b/tools/healthcheck/pkg/helthcheck/checker.go index 745924b7b..e756fd7c8 100644 --- a/tools/healthcheck/pkg/helthcheck/checker.go +++ b/tools/healthcheck/pkg/helthcheck/checker.go @@ -24,7 +24,7 @@ import ( log "github.com/golang/glog" ) -const uweightDefault uint16 = 1 +const uweightDefault uint16 = 100 var ( proxyProtoV1LocalCmd = "PROXY UNKNOWN\r\n" @@ -35,7 +35,7 @@ var ( ) // Checks provides a map of healthcheck configurations. -type Checkers struct { +type CheckerConfigs struct { Configs map[Id]*CheckerConfig } @@ -69,7 +69,7 @@ func NewChecker(notify chan<- *Notification, state State, weight uint16) *Checke state: state, uweight: weight, notify: notify, - update: make(chan CheckerConfig, 1), + update: make(chan CheckerConfig, 2), quit: make(chan bool, 1), } } @@ -79,6 +79,7 @@ func (hc *Checker) Status() Status { hc.lock.RLock() defer hc.lock.RUnlock() status := Status{ + Version: hc.Version, LastCheck: hc.start, Failures: hc.failures, Successes: hc.successes, @@ -95,14 +96,55 @@ func (hc *Checker) Status() Status { } func (hc *Checker) updateConfig(conf *CheckerConfig) { - hc.CheckerConfig = *conf - if conf.State != StateUnhealthy { + // Note: + // The conf::Weight must be the original weight not modified by healthcheck program, + // while conf::State reflects the health state derived from the inhibited flag set by + // the healthcheck program. + + //log.Infof("[updateConfig] id(%v) version(%d->%d) target(%v) %v->%v weight(%d->%d)\n", conf.Id, + // hc.Version, conf.Version, conf.Target, hc.State, conf.State, hc.Weight, conf.Weight) + + if len(hc.Id) == 0 { + hc.CheckerConfig = *conf + return + } + + if conf.Version < hc.Version { + return + } + + hc.State = conf.State + hc.lock.Lock() + state := hc.state + hc.state = conf.State + hc.lock.Unlock() + if state != conf.State { + log.Warningf("%v: healthcheck's state changed externally %v -> %v", + hc.Id, state, conf.State) + } + + if conf.State != StateUnhealthy || conf.State == StateUnhealthy && conf.Weight > 0 { + // Update all the checker configs when conf's state is healthy. + hc.Weight = conf.Weight + if conf.Interval > 0 { + hc.Interval = conf.Interval + } + if conf.Timeout > 0 { + hc.Timeout = conf.Timeout + } + if conf.Retry > 0 { + hc.Retry = conf.Retry + } + hc.lock.Lock() weight := hc.uweight hc.uweight = conf.Weight hc.lock.Unlock() + + hc.Version = conf.Version if weight != conf.Weight { - log.Infof("%v: user weight changed %d -> %d", hc.Id, weight, conf.Weight) + log.Warningf("%v: healthcheck's user weight changed %d -> %d", + hc.Id, weight, conf.Weight) } } } @@ -154,8 +196,10 @@ func (hc *Checker) healthcheck() { } status := "SUCCESS" + state := StateHealthy if !result.Success { status = "FAILURE" + state = StateUnhealthy } log.Infof("%v: %s: %v", hc.Id, status, result) @@ -164,15 +208,12 @@ func (hc *Checker) healthcheck() { hc.start = start hc.result = result - var state State if result.Success { - state = StateHealthy hc.failed = 0 hc.successes++ } else { hc.failed++ hc.failures++ - state = StateUnhealthy } if hc.state == StateHealthy && hc.failed > 0 && hc.failed <= uint64(hc.CheckerConfig.Retry) { @@ -222,7 +263,7 @@ func (hc *Checker) Run(start <-chan time.Time) { return case config := <-hc.update: - if hc.Interval != config.Interval { + if config.Interval > 0 && hc.Interval != config.Interval { ticker.Stop() if start != nil { <-start @@ -255,6 +296,6 @@ func (hc *Checker) Update(config *CheckerConfig) { select { case hc.update <- *config: default: - log.Warningf("Unable to update %v, last update still queued", hc.Id) + log.Warningf("Unable to update %v, last two update still queued", hc.Id) } } diff --git a/tools/healthcheck/pkg/helthcheck/configs.go b/tools/healthcheck/pkg/helthcheck/configs.go index 7759928b5..375261cb6 100644 --- a/tools/healthcheck/pkg/helthcheck/configs.go +++ b/tools/healthcheck/pkg/helthcheck/configs.go @@ -63,6 +63,10 @@ func DefaultServerConfig() ServerConfig { type CheckerConfig struct { Id + // Version denotes the virtual service version. It used to protect the vs from + // incorrect weight updates by healthcheck when the vs's weight changed externally. + Version uint64 + Target State Weight uint16 @@ -76,11 +80,12 @@ type CheckerConfig struct { var DefaultCheckConfig CheckerConfig // NewConfig returns an initialised Config. -func NewCheckerConfig(id *Id, checker CheckMethod, - target *Target, state State, weight uint16, - interval, timeout time.Duration, retry uint) *CheckerConfig { +func NewCheckerConfig(id *Id, version uint64, checker CheckMethod, + target *Target, state State, weight uint16, interval, + timeout time.Duration, retry uint) *CheckerConfig { config := CheckerConfig{ Id: *id, + Version: version, Target: *target, State: state, Weight: weight, @@ -89,6 +94,8 @@ func NewCheckerConfig(id *Id, checker CheckMethod, Timeout: timeout, Retry: retry, } - config.BindConfig(&config) + if config.CheckMethod != nil { + config.BindConfig(&config) + } return &config } diff --git a/tools/healthcheck/pkg/helthcheck/http_checker.go b/tools/healthcheck/pkg/helthcheck/http_checker.go index 97b7d5102..099d1ed2c 100644 --- a/tools/healthcheck/pkg/helthcheck/http_checker.go +++ b/tools/healthcheck/pkg/helthcheck/http_checker.go @@ -64,6 +64,7 @@ func NewHttpChecker(method, host, uri string, proxyProto int) *HttpChecker { } return &HttpChecker{ Method: method, + Host: host, Uri: uri, ResponseCodes: []HttpCodeRange{{200, 299}, {300, 399}, {400, 499}}, Response: "", diff --git a/tools/healthcheck/pkg/helthcheck/http_checker_test.go b/tools/healthcheck/pkg/helthcheck/http_checker_test.go index 0fbbff3b4..5d3b3d491 100644 --- a/tools/healthcheck/pkg/helthcheck/http_checker_test.go +++ b/tools/healthcheck/pkg/helthcheck/http_checker_test.go @@ -58,7 +58,7 @@ func TestHttpChecker(t *testing.T) { } */ id := Id(target.String()) - config := NewCheckerConfig(&id, checker, &target, StateUnknown, + config := NewCheckerConfig(&id, 0, checker, &target, StateUnknown, 0, 3*time.Second, 2*time.Second, 3) result := checker.Check(target, config.Timeout) fmt.Printf("[ HTTP ] %s ==> %v\n", target, result) @@ -68,14 +68,14 @@ func TestHttpChecker(t *testing.T) { checker := NewHttpChecker("", "", "", 1) checker.Host = target.Addr() id := Id(target.String()) - config := NewCheckerConfig(&id, checker, &target, StateUnknown, + config := NewCheckerConfig(&id, 0, checker, &target, StateUnknown, 0, 3*time.Second, 2*time.Second, 3) result := checker.Check(target, config.Timeout) fmt.Printf("[ HTTP(PPv1) ] %s ==> %v\n", target, result) checker2 := NewHttpChecker("", "", "", 2) checker2.Host = target.Addr() id2 := Id(target.String()) - config2 := NewCheckerConfig(&id2, checker2, &target, StateUnknown, + config2 := NewCheckerConfig(&id2, 0, checker2, &target, StateUnknown, 0, 3*time.Second, 2*time.Second, 3) result2 := checker2.Check(target, config2.Timeout) fmt.Printf("[ HTTP(PPv2) ] %s ==> %v\n", target, result2) @@ -90,7 +90,7 @@ func TestHttpChecker(t *testing.T) { checker.Secure = true } id := Id(host) - config := NewCheckerConfig(&id, checker, &Target{}, StateUnknown, + config := NewCheckerConfig(&id, 0, checker, &Target{}, StateUnknown, 0, 3*time.Second, 2*time.Second, 3) result := checker.Check(Target{}, config.Timeout) if result.Success == false { diff --git a/tools/healthcheck/pkg/helthcheck/ping_checker.go b/tools/healthcheck/pkg/helthcheck/ping_checker.go index dae8eda92..a1401f66d 100644 --- a/tools/healthcheck/pkg/helthcheck/ping_checker.go +++ b/tools/healthcheck/pkg/helthcheck/ping_checker.go @@ -21,6 +21,7 @@ package hc import ( "bytes" + "encoding/binary" "fmt" "math/rand" "net" @@ -113,8 +114,8 @@ func newICMPv4EchoRequest(id, seqnum, msglen uint16, filler []byte) icmpMsg { cs := icmpChecksum(msg) // place checksum back in header; using ^= avoids the assumption that the // checksum bytes are zero - msg[2] ^= uint8(cs & 0xff) - msg[3] ^= uint8(cs >> 8) + cs ^= binary.BigEndian.Uint16(msg[2:4]) + binary.BigEndian.PutUint16(msg[2:4], cs) return msg } @@ -122,13 +123,13 @@ func icmpChecksum(msg icmpMsg) uint16 { cklen := len(msg) s := uint32(0) for i := 0; i < cklen-1; i += 2 { - s += uint32(msg[i+1])<<8 | uint32(msg[i]) + s += uint32(binary.BigEndian.Uint16(msg[i : i+2])) } if cklen&1 == 1 { - s += uint32(msg[cklen-1]) + s += uint32(msg[cklen-1]) << 8 } s = (s >> 16) + (s & 0xffff) - s = s + (s >> 16) + s += (s >> 16) return uint16(^s) } @@ -175,10 +176,13 @@ func exchangeICMPEcho(network string, ip net.IP, timeout time.Duration, echo icm c.SetDeadline(time.Now().Add(timeout)) reply := make([]byte, 256) for { - _, addr, err := c.ReadFrom(reply) + n, addr, err := c.ReadFrom(reply) if err != nil { return err } + if n < 0 || n > len(reply) { + return fmt.Errorf("Unexpect ICMP reply len %d", n) + } if !ip.Equal(net.ParseIP(addr.String())) { continue } @@ -191,9 +195,9 @@ func exchangeICMPEcho(network string, ip net.IP, timeout time.Duration, echo icm continue } if reply[0] == ICMP4_ECHO_REPLY { - cs := icmpChecksum(reply) + cs := icmpChecksum(reply[:n]) if cs != 0 { - return fmt.Errorf("Bad ICMP checksum: %x", rchksum) + return fmt.Errorf("Bad ICMP checksum: %x, len: %d, data: %v", rchksum, n, reply[:n]) } } // TODO(angusc): Validate checksum for IPv6 diff --git a/tools/healthcheck/pkg/helthcheck/ping_checker_test.go b/tools/healthcheck/pkg/helthcheck/ping_checker_test.go index 7b86e665a..b462c1dd7 100644 --- a/tools/healthcheck/pkg/helthcheck/ping_checker_test.go +++ b/tools/healthcheck/pkg/helthcheck/ping_checker_test.go @@ -37,7 +37,7 @@ func TestPingChecker(t *testing.T) { for _, target := range ping_targets { checker := NewPingChecker() id := Id(target.IP.String()) - config := NewCheckerConfig(&id, checker, + config := NewCheckerConfig(&id, 0, checker, &target, StateUnknown, 0, 3*time.Second, 1*time.Second, 3) result := checker.Check(target, config.Timeout) diff --git a/tools/healthcheck/pkg/helthcheck/server.go b/tools/healthcheck/pkg/helthcheck/server.go index d7af16f08..edcbb6561 100644 --- a/tools/healthcheck/pkg/helthcheck/server.go +++ b/tools/healthcheck/pkg/helthcheck/server.go @@ -35,6 +35,7 @@ type Server struct { healthchecks map[Id]*Checker configs chan map[Id]*CheckerConfig notify chan *Notification + resync chan *CheckerConfig quit chan bool } @@ -63,6 +64,7 @@ func NewServer(cfg *ServerConfig) *Server { healthchecks: make(map[Id]*Checker), notify: make(chan *Notification, cfg.NotifyChannelSize), configs: make(chan map[Id]*CheckerConfig), + resync: make(chan *CheckerConfig, cfg.NotifyChannelSize), quit: make(chan bool, 1), } @@ -78,8 +80,10 @@ func (s *Server) NewChecker(typ lb.Checker, proto utils.IPProto) CheckMethod { checker = NewUDPChecker("", "", 0) case lb.CheckerPING: checker = NewPingChecker() - case lb.CheckerUDPPing: + case lb.CheckerUDPPING: checker = NewUDPPingChecker("", "", 0) + case lb.CheckerHTTP: + checker = NewHttpChecker("", "", "", 0) case lb.CheckerNone: if s.config.LbAutoMethod { switch proto { @@ -94,12 +98,12 @@ func (s *Server) NewChecker(typ lb.Checker, proto utils.IPProto) CheckMethod { } // getHealthchecks attempts to get the current healthcheck configurations from DPVS -func (s *Server) getHealthchecks() (*Checkers, error) { +func (s *Server) getHealthchecks() (*CheckerConfigs, error) { vss, err := s.comm.ListVirtualServices() if err != nil { return nil, err } - results := &Checkers{Configs: make(map[Id]*CheckerConfig)} + results := &CheckerConfigs{Configs: make(map[Id]*CheckerConfig)} for _, vs := range vss { for _, rs := range vs.RSs { target := &Target{rs.IP, rs.Port, vs.Protocol} @@ -111,13 +115,16 @@ func (s *Server) getHealthchecks() (*Checkers, error) { } weight := rs.Weight state := StateUnknown - if weight > 0 && rs.Inhibited == false { - state = StateHealthy - } else if weight == 0 && rs.Inhibited == true { + // Backend can be down adminstratively, so its weight + // should not be considered for health state. + if rs.Inhibited { state = StateUnhealthy + } else { + state = StateHealthy } // TODO: allow users to specify check interval, timeout and retry - config := NewCheckerConfig(id, checker, + config := NewCheckerConfig(id, + vs.Version, checker, target, state, weight, DefaultCheckConfig.Interval, DefaultCheckConfig.Timeout, @@ -151,17 +158,17 @@ func (s *Server) updater() { // notifier batches healthcheck notifications and sends them to DPVS. func (s *Server) notifier() { - // TODO: support more concurrency and rate limit + // TODO: support a lot more concurrences and rate limit for { select { case notification := <-s.notify: log.Infof("Sending notification >>> %v", notification) - //fmt.Println("Sending notification >>>", notification) inhibited := false if notification.Status.State == StateUnhealthy { inhibited = true } vs := &lb.VirtualService{ + Version: notification.Status.Version, Id: notification.Id.Vs(), Protocol: notification.Target.Proto, RSs: []lb.RealServer{{ @@ -172,9 +179,35 @@ func (s *Server) notifier() { }}, } - if err := s.comm.UpdateByChecker([]lb.VirtualService{*vs}); err != nil { + if changed, err := s.comm.UpdateByChecker(vs); err != nil { log.Warningf("Failed to Update %v healthy status to %v(weight: %d): %v", notification.Id, notification.State, notification.Status.Weight, err) + } else if changed != nil { + for _, rs := range changed.RSs { + version := changed.Version + id := notification.Id + target := &Target{rs.IP, rs.Port, vs.Protocol} + if !target.Equal(id.Rs()) { + continue + } + weight := rs.Weight + state := StateUnknown + if rs.Inhibited { + state = StateUnhealthy + } else { + state = StateHealthy + } + log.Warningf("%v::%s has changed, resync config %v ...", + notification.Id, notification.Target, rs) + config := NewCheckerConfig(&id, version, nil, target, state, weight, 0, 0, 0) + s.resync <- config + break + } + } else { + // resync checker config to stop repeated notificaitons + config := NewCheckerConfig(¬ification.Id, notification.Version, nil, + ¬ification.Target, notification.State, notification.Weight, 0, 0, 0) + s.resync <- config } } } @@ -186,10 +219,9 @@ func (s *Server) notifier() { // the current configurations to each of the running healthchecks. func (s *Server) manager() { notifyTicker := time.NewTicker(s.config.NotifyInterval) - var configs map[Id]*CheckerConfig for { select { - case configs = <-s.configs: + case configs := <-s.configs: // Remove healthchecks that have been deleted. for id, hc := range s.healthchecks { @@ -205,8 +237,8 @@ func (s *Server) manager() { hc := NewChecker(s.notify, conf.State, conf.Weight) hc.SetDryrun(s.config.DryRun) s.healthchecks[id] = hc - checkTicker := time.NewTicker(time.Duration((1 + rand.Intn( - int(DefaultCheckConfig.Interval.Milliseconds())))) * time.Millisecond) + checkTicker := time.NewTicker(time.Duration(1+rand.Intn(int( + DefaultCheckConfig.Interval.Milliseconds()))) * time.Millisecond) go hc.Run(checkTicker.C) } } @@ -215,14 +247,13 @@ func (s *Server) manager() { for id, hc := range s.healthchecks { hc.Update(configs[id]) } - case <-notifyTicker.C: log.Infof("Total checkers: %d", len(s.healthchecks)) - // Send notifications when status changed. - for id, hc := range s.healthchecks { + // Send notifications periodically when status in checker doesn't match config. + // It should get here only when the notification had failed. + for _, hc := range s.healthchecks { notification := hc.Notification() - if configs[id].State != notification.State { - // FIXME: Don't resend the notification after a successful one. + if hc.State != notification.State { hc.notify <- notification } } @@ -230,12 +261,25 @@ func (s *Server) manager() { } } +func (s *Server) resyncer() { + for { + select { + case conf := <-s.resync: + hc := s.healthchecks[conf.Id] + if hc != nil { + hc.Update(conf) + } + } + } +} + // Run runs a healthcheck server. func (s *Server) Run() { log.Infof("Starting healthcheck server (%v) ...", s.config) go s.updater() go s.notifier() go s.manager() + go s.resyncer() <-s.quit } diff --git a/tools/healthcheck/pkg/helthcheck/types.go b/tools/healthcheck/pkg/helthcheck/types.go index 560ebc84e..a45578836 100644 --- a/tools/healthcheck/pkg/helthcheck/types.go +++ b/tools/healthcheck/pkg/helthcheck/types.go @@ -56,29 +56,6 @@ func (id Id) Rs() *Target { return NewTargetFromStr(strId[idx+1:]) } -// MethodType is the type of check method supported for now. -type MethodType int - -const ( - MethodTypeNone MethodType = iota - MethodTypeTCP - MethodTypeUDP - MethodTypePING -) - -// String returns the name for the given MethodType. -func (h MethodType) String() string { - switch h { - case MethodTypeTCP: - return "TCP" - case MethodTypeUDP: - return "UDP" - case MethodTypePING: - return "PING" - } - return "(unknown)" -} - // CheckMethod is the interface that must be implemented by a healthcheck. type CheckMethod interface { Check(target Target, timeout time.Duration) *Result @@ -123,11 +100,11 @@ func NewTargetFromStr(str string) *Target { if idx1 < 0 || idx2 < 0 || idx1 >= idx2 { return nil } - port, err := strconv.ParseUint(str[idx2:], 10, 16) + port, err := strconv.ParseUint(str[idx2+1:], 10, 16) if err != nil { return nil } - proto := utils.IPProtoFromStr(str[idx1:idx2]) + proto := utils.IPProtoFromStr(str[idx1+1 : idx2]) if proto == 0 { return nil } @@ -146,6 +123,16 @@ func (t Target) String() string { return fmt.Sprintf("[%v]:%v:%d", t.IP, t.Proto, t.Port) } +func (t *Target) Equal(t2 *Target) bool { + if t2 == nil { + return false + } + if t.Port != t2.Port || t.Proto != t2.Proto { + return false + } + return t.IP.Equal(t2.IP) +} + // Addr returns the IP:Port representation of a healthcheck target func (t Target) Addr() string { if t.IP.To4() != nil { @@ -201,6 +188,7 @@ func NewResult(start time.Time, msg string, success bool, err error) *Result { // Status represents the current status of a healthcheck instance. type Status struct { + Version uint64 // the vs version LastCheck time.Time Duration time.Duration Failures uint64 @@ -219,7 +207,7 @@ type Notification struct { // String returns the string representation for the given notification. func (n *Notification) String() string { - return fmt.Sprintf("ID %v, %v, Weight %d, Fail %v, Success %v, Last check %s in %v", n.Id, - stateNames[n.Status.State], n.Status.Weight, n.Status.Failures, n.Status.Successes, - n.Status.LastCheck.Format("2006-01-02 15:04:05.000"), n.Status.Duration) + return fmt.Sprintf("ID %v, Version %d, %v, Weight %d, Fail %v, Success %v, Last check %s in %v", + n.Id, n.Version, stateNames[n.Status.State], n.Status.Weight, n.Status.Failures, + n.Status.Successes, n.Status.LastCheck.Format("2006-01-02 15:04:05.000"), n.Status.Duration) } diff --git a/tools/healthcheck/pkg/helthcheck/udp_ping_checker_test.go b/tools/healthcheck/pkg/helthcheck/udp_ping_checker_test.go index 321035336..a93bb356f 100644 --- a/tools/healthcheck/pkg/helthcheck/udp_ping_checker_test.go +++ b/tools/healthcheck/pkg/helthcheck/udp_ping_checker_test.go @@ -39,7 +39,7 @@ func TestUDPPingChecker(t *testing.T) { for _, target := range udpping_targets { checker := NewUDPPingChecker("", "", 0) id := Id(target.String()) - config := NewCheckerConfig(&id, checker, + config := NewCheckerConfig(&id, 0, checker, &target, StateUnknown, 0, 3*time.Second, 2*time.Second, 3) result := checker.Check(target, config.Timeout) diff --git a/tools/healthcheck/pkg/lb/dpvs_agent.go b/tools/healthcheck/pkg/lb/dpvs_agent.go index 29d403430..6634b14be 100644 --- a/tools/healthcheck/pkg/lb/dpvs_agent.go +++ b/tools/healthcheck/pkg/lb/dpvs_agent.go @@ -21,6 +21,7 @@ import ( "io" "net" "net/http" + "strconv" "strings" "time" @@ -32,7 +33,7 @@ var _ Comm = (*DpvsAgentComm)(nil) var ( serverDefault = "localhost:53225" listUri = LbApi{"/v2/vs", http.MethodGet} - noticeUri = LbApi{"/v2/vs/%s/rs?healthcheck=true", http.MethodPut} + noticeUri = LbApi{"/v2/vs/%s/rs/health?version=%d", http.MethodPut} client *http.Client = &http.Client{Timeout: httpClientTimeout} ) @@ -53,7 +54,7 @@ type DpvsAgentRs struct { IP string `json:"ip"` Port uint16 `json:"port"` Weight uint16 `json:"weight"` - Inhibited bool `json:"inhibited,omitempty"` + Inhibited *bool `json:"inhibited,omitempty"` } type DpvsAgentRsItem struct { @@ -68,7 +69,9 @@ type DpvsAgentRsListPut struct { Items []DpvsAgentRs } +// refer to `tools/dpvs-agent/models/virtual_server_spec_expand.go: VirtualServerSpecExpand` type DpvsAgentVs struct { + Version string Addr string Port uint16 Proto uint16 @@ -86,6 +89,10 @@ func (avs *DpvsAgentVs) serviceId() string { } func (avs *DpvsAgentVs) toVs() (*VirtualService, error) { + version, err := strconv.ParseUint(avs.Version, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Vs Version %q", avs.Version) + } vip := net.ParseIP(avs.Addr) if vip == nil { return nil, fmt.Errorf("invalid Vs Addr %q", avs.Addr) @@ -105,9 +112,14 @@ func (avs *DpvsAgentVs) toVs() (*VirtualService, error) { checker = CheckerUDP case "ping": checker = CheckerPING + case "udpping": + checker = CheckerUDPPING + case "http": + checker = CheckerHTTP } } vs := &VirtualService{ + Version: version, Checker: checker, IP: vip, Port: vport, @@ -115,19 +127,10 @@ func (avs *DpvsAgentVs) toVs() (*VirtualService, error) { RSs: make([]RealServer, len(avs.Rss.Items)), } vs.Id = avs.serviceId() - - for i, ars := range avs.Rss.Items { - rip := net.ParseIP(ars.Spec.IP) - if rip == nil { - return nil, fmt.Errorf("%s: invalid Rs IP %q", vs.Id, ars.Spec.IP) - } - rs := &RealServer{ - IP: rip, - Port: ars.Spec.Port, - Weight: ars.Spec.Weight, - Inhibited: ars.Spec.Inhibited, - } - vs.RSs[i] = *rs + if rss, err := avs.Rss.toRsList(); err != nil { + return nil, fmt.Errorf("%s: %v", vs.Id, err) + } else { + vs.RSs = rss } return vs, nil } @@ -147,6 +150,26 @@ func (avslist *DpvsAgentVsList) toVsList() ([]VirtualService, error) { return vslist, nil } +func (arsl *DpvsAgentRsList) toRsList() ([]RealServer, error) { + rss := make([]RealServer, len(arsl.Items)) + for i, ars := range arsl.Items { + rip := net.ParseIP(ars.Spec.IP) + if rip == nil { + return nil, fmt.Errorf("invalid RS IP %q", ars.Spec.IP) + } + rs := &RealServer{ + IP: rip, + Port: ars.Spec.Port, + Weight: ars.Spec.Weight, + } + if ars.Spec.Inhibited != nil { + rs.Inhibited = *ars.Spec.Inhibited + } + rss[i] = *rs + } + return rss, nil +} + func NewDpvsAgentComm(server string) *DpvsAgentComm { if len(server) == 0 { server = serverDefault @@ -189,40 +212,49 @@ func (comm *DpvsAgentComm) ListVirtualServices() ([]VirtualService, error) { return vslist, nil } -func (comm *DpvsAgentComm) UpdateByChecker(targets []VirtualService) error { - // TODO: support batch operation - for _, vs := range targets { - for _, rs := range vs.RSs { - ars := &DpvsAgentRsListPut{ - Items: []DpvsAgentRs{ - { - IP: rs.IP.String(), - Port: rs.Port, - Weight: rs.Weight, - Inhibited: rs.Inhibited, - }, +func (comm *DpvsAgentComm) UpdateByChecker(vs *VirtualService) (*VirtualService, error) { + for _, rs := range vs.RSs { + ars := &DpvsAgentRsListPut{ + Items: []DpvsAgentRs{ + { + IP: rs.IP.String(), + Port: rs.Port, + Weight: rs.Weight, + Inhibited: &rs.Inhibited, }, - } - data, err := json.Marshal(ars) + }, + } + data, err := json.Marshal(ars) + if err != nil { + return nil, err + } + for _, notice := range comm.noticeApis { + url := fmt.Sprintf(notice.Url, vs.Id, vs.Version) + req, err := http.NewRequest(notice.HttpMethod, url, bytes.NewBuffer(data)) + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) if err != nil { - return err + return nil, err } - for _, notice := range comm.noticeApis { - url := fmt.Sprintf(notice.Url, vs.Id) - req, err := http.NewRequest(notice.HttpMethod, url, bytes.NewBuffer(data)) - req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - return err + //fmt.Printf("Request: %v, Code: %v\n", req, resp.Status) + if resp.StatusCode != 200 { + if data, err = io.ReadAll(resp.Body); err != nil { + return nil, fmt.Errorf("CODE: %v", resp.StatusCode) + } + var vs DpvsAgentVs + if err = json.Unmarshal(data, &vs); err != nil { + return nil, fmt.Errorf("CODE: %v, ERROR: %s", resp.StatusCode, + strings.TrimSpace(string(data))) } - //fmt.Println("Code:", resp.Status) - if resp.StatusCode != 200 { - data, _ = io.ReadAll(resp.Body) - return fmt.Errorf("CODE: %v, ERROR: %s", resp.StatusCode, strings.TrimSpace(string(data))) + ret, err := vs.toVs() + if err != nil { + //fmt.Println("Data:", data, "len(RSs): ", len(rss.Items), "RSs:", rss) + return nil, fmt.Errorf("CODE: %v, Error: %v", resp.StatusCode, err) } - resp.Body.Close() + return ret, nil } + resp.Body.Close() } } - return nil + return nil, nil } diff --git a/tools/healthcheck/pkg/lb/dpvs_agent_test.go b/tools/healthcheck/pkg/lb/dpvs_agent_test.go index 2d8e00122..1379b9483 100644 --- a/tools/healthcheck/pkg/lb/dpvs_agent_test.go +++ b/tools/healthcheck/pkg/lb/dpvs_agent_test.go @@ -26,24 +26,22 @@ func TestListAndUpdate(t *testing.T) { t.Errorf("list error: %v", err) } t.Logf("list Results: %v", vss) - if len(vss) < 2 { + if len(vss) < 1 { return } - t.Logf("Updating %v", vss[1]) - vss[1].RSs[0].Weight = 0 - vss[1].RSs[0].Inhibited = true - //vss[1].RSs[0].Port = 8081 - //vss[1].RSs[1].Weight = 100 - //vss[1].RSs[1].Inhibited = false - //vss[1].RSs[1].IP = net.ParseIP("1.2.3.4") - if err = comm.UpdateByChecker(vss[1:2]); err != nil { + t.Logf("Updating %v", vss[0]) + vss[0].RSs[0].Weight = 0 + vss[0].RSs[0].Inhibited = true + //vss[0].RSs[0].Port = 8081 + //vss[0].RSs[0].IP = net.ParseIP("1.2.3.4") + if _, err = comm.UpdateByChecker(&vss[0]); err != nil { t.Errorf("inhibit rs error: %v", err) } time.Sleep(3 * time.Second) - t.Logf("Restoring %v", vss[1]) - vss[1].RSs[0].Weight = 100 - vss[1].RSs[0].Inhibited = false - if err = comm.UpdateByChecker(vss[1:2]); err != nil { + t.Logf("Restoring %v", vss[0]) + vss[0].RSs[0].Weight = 100 + vss[0].RSs[0].Inhibited = false + if _, err = comm.UpdateByChecker(&vss[0]); err != nil { t.Errorf("restore rs error: %v", err) } } diff --git a/tools/healthcheck/pkg/lb/types.go b/tools/healthcheck/pkg/lb/types.go index 092f4fb22..1becf5e89 100644 --- a/tools/healthcheck/pkg/lb/types.go +++ b/tools/healthcheck/pkg/lb/types.go @@ -27,7 +27,8 @@ const ( CheckerTCP CheckerUDP CheckerPING - CheckerUDPPing + CheckerUDPPING + CheckerHTTP ) type RealServer struct { @@ -39,6 +40,7 @@ type RealServer struct { type VirtualService struct { Id string + Version uint64 Checker Checker Protocol utils.IPProto Port uint16 @@ -47,8 +49,11 @@ type VirtualService struct { } type Comm interface { + // Get the list of VS/RS prepared for healthcheck. ListVirtualServices() ([]VirtualService, error) - UpdateByChecker(targets []VirtualService) error + // Update RSs health state, return nil error and the lastest info of RSs whose + // weight have been changed administively on success, or error on failure. + UpdateByChecker(targets *VirtualService) (*VirtualService, error) } func (checker Checker) String() string { @@ -61,6 +66,10 @@ func (checker Checker) String() string { return "checker_udp" case CheckerPING: return "checker_ping" + case CheckerUDPPING: + return "checker_udpping" + case CheckerHTTP: + return "checker_http" } return "checker_unknown" } diff --git a/tools/healthcheck/test/stress-test.sh b/tools/healthcheck/test/stress-test.sh index 6ea848f85..e72f59153 100755 --- a/tools/healthcheck/test/stress-test.sh +++ b/tools/healthcheck/test/stress-test.sh @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +#dpvs_agent_server=localhost:53225 ## Step 1. echo -e "Cleaning existing services ..." @@ -23,40 +24,44 @@ now=$(date +%F.%T) echo -e "[$now] Start" ## Step 2. -echo -e "Adding test services ..." +now=$(date +%F.%T) +echo -e "[$now] Adding test services ..." rsid=5000 for i in $(seq 0 32) do for j in $(seq 1 255) do vip="192.168.${i}.${j}" - flag="-t" - #udp=$((j%2)) - #[ "$udp" -eq 1 ] && flag="-u" - #echo $vip $flag - ipvsadm -A $flag $vip:80 + #echo $vip:80 + ipvsadm -At $vip:80 + #curl -sS -X PUT "http://${dpvs_agent_server}/v2/vs/${vip}-80-tcp" -H "Content-type:application/json" -d "{\"SchedName\":\"wrr\"}" >/dev/null + ipvsadm -Pt $vip:80 -z 192.168.88.241 -F dpdk0 >/dev/null 2>&1 + #curl -X PUT "http://${dpvs_agent_server}/v2/vs/${vip}-80-tcp/laddr" -H "Content-type:application/json" -d "{\"device\":\"dpdk0\", \"addr\":\"192.168.88.241\"}" for k in $(seq 5) do seg3=$((rsid/255)) seg4=$((rsid%255)) rsid=$((rsid+1)) rip="192.168.${seg3}.${seg4}" - #echo "-> $rip" - ipvsadm -a $flag $vip:80 -r $rip:8080 -b -w 100 + #echo "-> $rip:8080" + ipvsadm -at $vip:80 -r $rip:8080 -b -w 100 + #curl -sS -X PUT "http://${dpvs_agent_server}/v2/vs/${vip}-80-tcp/rs" -H "Content-type:application/json" -d "{\"Items\":[{\"ip\":\"${rip}\", \"port\":80, \"weight\":100}]}" > /dev/null done #dpip addr add $vip/32 dev dpdk0 done done ## Step 3. +now=$(date +%F.%T) echo "" echo "****************************************" -echo -e "Start healthcheck program on your own." +echo -e "[$now] Start healthcheck program on your own." echo "****************************************" echo "" ## Step 4. -echo -e "Do Checking ..." +now=$(date +%F.%T) +echo -e "[$now] Do Checking ..." while true do now=$(date +%F.%T)