diff --git a/cni/plugins/ipam/multi-nic-ipam/ipam.go b/cni/plugins/ipam/multi-nic-ipam/ipam.go index 7a923ecf..029716a1 100644 --- a/cni/plugins/ipam/multi-nic-ipam/ipam.go +++ b/cni/plugins/ipam/multi-nic-ipam/ipam.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "bytes" "errors" @@ -58,23 +59,27 @@ func RequestIP(daemonIP string, daemonPort int, podName string, podNamespace str jsonReq, err := json.Marshal(request) if err != nil { - return response, errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return response, fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return response, errors.New(fmt.Sprintf("Post fail: %v", err)) + return response, fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return response, errors.New(res.Status) } - body, err := ioutil.ReadAll(res.Body) if err != nil { - return response, errors.New(fmt.Sprintf("Read body: %v", err)) + return response, fmt.Errorf("read body: %v", err) } err = json.Unmarshal(body, &response) if err == nil && len(response) == 0 { - return response, fmt.Errorf("Response nothing") + return response, fmt.Errorf("response nothing") } return response, err } @@ -97,23 +102,28 @@ func Deallocate(daemonPort int, podName string, podNamespace string, hostName st jsonReq, err := json.Marshal(request) if err != nil { - return response, errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return response, fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return response, errors.New(fmt.Sprintf("Post fail: %v", err)) + return response, fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return response, errors.New(res.Status) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return response, errors.New(fmt.Sprintf("Read body: %v", err)) + return response, fmt.Errorf("read body: %v", err) } err = json.Unmarshal(body, &response) if err == nil && len(response) == 0 { - return response, fmt.Errorf("Response nothing") + return response, fmt.Errorf("response nothing") } return response, err } diff --git a/cni/plugins/main/multi-nic/selector.go b/cni/plugins/main/multi-nic/selector.go index fa4db265..f4679700 100644 --- a/cni/plugins/main/multi-nic/selector.go +++ b/cni/plugins/main/multi-nic/selector.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "bytes" "errors" @@ -56,23 +57,28 @@ func selectNICs(daemonIP string, daemonPort int, podName string, podNamespace st jsonReq, err := json.Marshal(request) if err != nil { - return response, errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return response, fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 5 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return response, errors.New(fmt.Sprintf("Post fail: %v", err)) + return response, fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return response, errors.New(res.Status) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return response, errors.New(fmt.Sprintf("Read body: %v", err)) + return response, fmt.Errorf("read body: %v", err) } err = json.Unmarshal(body, &response) if err == nil && len(response.Masters) == 0 { - return response, fmt.Errorf("Response nothing") + return response, fmt.Errorf("response nothing") } return response, err } diff --git a/controllers/cidr_handler.go b/controllers/cidr_handler.go index bfd728ec..eaa227bd 100644 --- a/controllers/cidr_handler.go +++ b/controllers/cidr_handler.go @@ -201,7 +201,9 @@ func (h *CIDRHandler) DeleteCIDR(cidr multinicv1.CIDR) error { } instance, err := h.GetCIDR(name) if err == nil { - err = h.Client.Delete(context.Background(), instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Delete(ctx, instance) } if err != nil { errorMsg = errorMsg + fmt.Sprintf("%v,", err) @@ -525,14 +527,18 @@ func (h *CIDRHandler) updateCIDR(cidrSpec multinicv1.CIDRSpec, new bool) (bool, if err == nil { updatedCIDR := existCIDR.DeepCopy() updatedCIDR.Spec = spec - err = h.Client.Update(context.TODO(), updatedCIDR) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Update(ctx, updatedCIDR) if err == nil { h.SafeCache.SetCache(def.Name, updatedCIDR.Spec) } h.CleanPendingIPPools(ippoolSnapshot, def.Name, updatedCIDR.Spec) } else { if new { - err = h.Client.Create(context.TODO(), mapObj) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Create(ctx, mapObj) if err == nil { h.SafeCache.SetCache(def.Name, mapObj.Spec) } diff --git a/controllers/daemon_connector.go b/controllers/daemon_connector.go index 6ac77fd3..2f67f536 100644 --- a/controllers/daemon_connector.go +++ b/controllers/daemon_connector.go @@ -10,7 +10,6 @@ import ( "fmt" "io/ioutil" "net/http" - "time" "bytes" "errors" @@ -73,11 +72,13 @@ func (dc DaemonConnector) GetInterfaces(podAddress string) ([]multinicv1.Interfa var interfaces []multinicv1.InterfaceInfoType address := podAddress + INTERFACE_PATH // try connect and get interface from daemon pod - res, err := http.Get(address) + client := http.Client{} + defer client.CloseIdleConnections() + res, err := client.Get(address) if err != nil { return []multinicv1.InterfaceInfoType{}, err } - + defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { return []multinicv1.InterfaceInfoType{}, err @@ -100,12 +101,13 @@ func (dc DaemonConnector) Join(podAddress string, hifs []multinicv1.InterfaceInf return err } else { client := http.Client{ - Timeout: 5 * time.Second, + Timeout: vars.ContextTimeout, } res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { return err } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return errors.New(res.Status) } @@ -145,7 +147,15 @@ func (dc DaemonConnector) putRouteRequest(podAddress string, path string, cidrNa if err != nil { return response, err } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: vars.ContextTimeout, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + if err != nil { + return response, fmt.Errorf("post fail: %v", err) + } + defer res.Body.Close() if err != nil { response.Message = vars.ConnectionRefusedError return response, err diff --git a/controllers/hostinterface_handler.go b/controllers/hostinterface_handler.go index 5b692110..616acc84 100644 --- a/controllers/hostinterface_handler.go +++ b/controllers/hostinterface_handler.go @@ -61,7 +61,9 @@ func (h *HostInterfaceHandler) initHostInterface(hostName string, interfaces []m // CreateHostInterface creates new HostInterface from an interface list get from daemon pods func (h *HostInterfaceHandler) CreateHostInterface(hostName string, interfaces []multinicv1.InterfaceInfoType) error { newHif := h.initHostInterface(hostName, interfaces) - return h.Client.Create(context.TODO(), newHif) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + return h.Client.Create(ctx, newHif) } // UpdateHostInterface updates HostInterface @@ -73,7 +75,9 @@ func (h *HostInterfaceHandler) UpdateHostInterface(oldObj multinicv1.HostInterfa Interfaces: interfaces, }, } - return updateHif, h.Client.Update(context.TODO(), updateHif) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + return updateHif, h.Client.Update(ctx, updateHif) } // GetHostInterface gets HostInterface from hostname @@ -83,7 +87,9 @@ func (h *HostInterfaceHandler) GetHostInterface(name string) (*multinicv1.HostIn Name: name, Namespace: metav1.NamespaceAll, } - err := h.Client.Get(context.TODO(), namespacedName, instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err := h.Client.Get(ctx, namespacedName, instance) return instance, err } @@ -107,7 +113,9 @@ func (h *HostInterfaceHandler) ListHostInterface() (map[string]multinicv1.HostIn func (h *HostInterfaceHandler) DeleteHostInterface(name string) error { instance, err := h.GetHostInterface(name) if err == nil { - err = h.Client.Delete(context.TODO(), instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Delete(ctx, instance) } return err } diff --git a/controllers/ippool_handler.go b/controllers/ippool_handler.go index a0937115..ffcc0229 100644 --- a/controllers/ippool_handler.go +++ b/controllers/ippool_handler.go @@ -69,7 +69,9 @@ func (h *IPPoolHandler) DeleteIPPool(netAttachDef string, podCIDR string) error name := h.GetIPPoolName(netAttachDef, podCIDR) instance, err := h.GetIPPool(name) if err == nil { - err = h.Client.Delete(context.TODO(), instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Delete(ctx, instance) } return err } @@ -132,7 +134,9 @@ func (h *IPPoolHandler) UpdateIPPool(netAttachDef string, podCIDR string, vlanCI ippool.Spec = spec ippool.Spec.Allocations = prevSpec.Allocations ippool.ObjectMeta.Labels = labels - err = h.Client.Update(context.TODO(), ippool) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Update(ctx, ippool) if !reflect.DeepEqual(prevSpec.Excludes, excludesInterface) { // report if allocated ip addresses have conflicts with the new IPPool (for example, in exclude list) invalidAllocations := h.checkPoolValidity(excludesInterface, prevSpec.Allocations) @@ -154,7 +158,9 @@ func (h *IPPoolHandler) UpdateIPPool(netAttachDef string, podCIDR string, vlanCI }, Spec: spec, } - err = h.Client.Create(context.Background(), newIPPool) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err = h.Client.Create(ctx, newIPPool) vars.IPPoolLog.V(5).Info(fmt.Sprintf("New IPPool %s: %v, %v", ippoolName, newIPPool, err)) } return err @@ -192,7 +198,9 @@ func (h *IPPoolHandler) PatchIPPoolAllocations(ippoolName string, newAllocations } patch := client.MergeFrom(ippool.DeepCopy()) ippool.Spec.Allocations = newAllocations - return h.Client.Patch(context.Background(), ippool, patch) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + return h.Client.Patch(ctx, ippool, patch) } func (h *IPPoolHandler) UpdateIPPools(defName string, entries []multinicv1.CIDREntry, excludes []compute.IPValue) { @@ -234,6 +242,8 @@ func (h *IPPoolHandler) AddLabel(ippool *multinicv1.IPPool) error { labels := map[string]string{vars.HostNameLabel: hostName, vars.DefNameLabel: netAttachDef} patch := client.MergeFrom(ippool.DeepCopy()) ippool.ObjectMeta.Labels = labels - err := h.Client.Patch(context.Background(), ippool, patch) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err := h.Client.Patch(ctx, ippool, patch) return err } diff --git a/controllers/multinicnetwork_controller.go b/controllers/multinicnetwork_controller.go index c671c6d2..d96402c4 100644 --- a/controllers/multinicnetwork_controller.go +++ b/controllers/multinicnetwork_controller.go @@ -184,14 +184,18 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ vars.NetworkLog.V(3).Info(fmt.Sprintf("CIDR %s successfully applied", multinicnetworkName)) } } - } else if !instance.Spec.IsMultiNICIPAM && routeStatus == multinicv1.RouteNoApplied { + } else if !instance.Spec.IsMultiNICIPAM && (routeStatus == "" || routeStatus == multinicv1.RouteNoApplied) { // not related to L3 + instance.Status.Message = "" + instance.Status.RouteStatus = multinicv1.RouteNoApplied + vars.NetworkLog.V(3).Info(fmt.Sprintf("Update %s status (Non-MultiNICIPAM, RouteNoApplied)", multinicnetworkName)) err = r.CIDRHandler.MultiNicNetworkHandler.UpdateNetConfigStatus(instance, multinicv1.ConfigComplete, "") if err != nil { vars.NetworkLog.V(3).Info(fmt.Sprintf("Failed to UpdateNetConfigStatus %s for non-L3: %v", instance.Name, err)) } } else if routeStatus != multinicv1.AllRouteApplied { // some route still fails + vars.NetworkLog.V(3).Info(fmt.Sprintf("Update %s status (waiting for route configuration)", multinicnetworkName)) err = r.CIDRHandler.MultiNicNetworkHandler.UpdateNetConfigStatus(instance, multinicv1.WaitForConfig, "") if err != nil { vars.NetworkLog.V(3).Info(fmt.Sprintf("Failed to UpdateNetConfigStatus %s at route failure: %v", instance.Name, err)) @@ -234,12 +238,16 @@ func (r *MultiNicNetworkReconciler) GetIPAMConfig(instance *multinicv1.MultiNicN ipamConfig.MasterNetAddrs = instance.Spec.MasterNetAddrs return ipamConfig, nil } - return nil, fmt.Errorf("non-MultiNicIPAM") + vars.NetworkLog.V(3).Info("non-MultiNicIPAM") + return nil, nil } // HandleMultiNicIPAM handles ipam if target type func (r *MultiNicNetworkReconciler) HandleMultiNicIPAM(instance *multinicv1.MultiNicNetwork) error { ipamConfig, err := r.GetIPAMConfig(instance) + if ipamConfig == nil { + return err + } if err == nil { cidrName := instance.GetName() _, err := r.CIDRHandler.GetCIDR(cidrName) diff --git a/controllers/multinicnetwork_handler.go b/controllers/multinicnetwork_handler.go index ab08a680..1e17445b 100644 --- a/controllers/multinicnetwork_handler.go +++ b/controllers/multinicnetwork_handler.go @@ -42,7 +42,9 @@ func (h *MultiNicNetworkHandler) GetNetwork(name string) (*multinicv1.MultiNicNe Name: name, Namespace: metav1.NamespaceAll, } - err := h.Client.Get(context.TODO(), namespacedName, instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err := h.Client.Get(ctx, namespacedName, instance) return instance, err } @@ -111,8 +113,16 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo RouteStatus: status, } + if !NetStatusUpdated(instance, netStatus) { + vars.NetworkLog.V(2).Info(fmt.Sprintf("No status update %s", instance.Name)) + return netStatus, nil + } + + vars.NetworkLog.V(2).Info(fmt.Sprintf("Update %s status", instance.Name)) instance.Status = netStatus - err := h.Client.Status().Update(context.Background(), instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err := h.Client.Status().Update(ctx, instance) if err != nil { vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err)) } else { @@ -121,6 +131,28 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo return netStatus, err } +func NetStatusUpdated(instance *multinicv1.MultiNicNetwork, newStatus multinicv1.MultiNicNetworkStatus) bool { + prevStatus := instance.Status + if prevStatus.Message != newStatus.Message || prevStatus.RouteStatus != newStatus.RouteStatus || prevStatus.NetConfigStatus != newStatus.NetConfigStatus || prevStatus.DiscoverStatus != newStatus.DiscoverStatus { + return true + } + if len(prevStatus.ComputeResults) != len(newStatus.ComputeResults) { + return true + } + prevComputeMap := make(map[string]int) + for _, status := range prevStatus.ComputeResults { + prevComputeMap[status.NetAddress] = status.NumOfHost + } + for _, status := range newStatus.ComputeResults { + if numOfHost, found := prevComputeMap[status.NetAddress]; !found { + return true + } else if numOfHost != status.NumOfHost { + return true + } + } + return false +} + func (h *MultiNicNetworkHandler) UpdateNetConfigStatus(instance *multinicv1.MultiNicNetwork, netConfigStatus multinicv1.NetConfigStatus, message string) error { if message != "" { instance.Status.Message = message @@ -128,9 +160,10 @@ func (h *MultiNicNetworkHandler) UpdateNetConfigStatus(instance *multinicv1.Mult if instance.Status.ComputeResults == nil { instance.Status.ComputeResults = []multinicv1.NicNetworkResult{} } - instance.Status.LastSyncTime = metav1.Now() instance.Status.NetConfigStatus = netConfigStatus - err := h.Client.Status().Update(context.Background(), instance) + ctx, cancel := context.WithTimeout(context.Background(), vars.ContextTimeout) + defer cancel() + err := h.Client.Status().Update(ctx, instance) if err != nil { vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err)) } else { diff --git a/daemon/src/backend/handler.go b/daemon/src/backend/handler.go index 7bd126cd..0d884203 100644 --- a/daemon/src/backend/handler.go +++ b/daemon/src/backend/handler.go @@ -23,7 +23,8 @@ import ( ) const ( - API_VERSION = "multinic.fms.io/v1" + API_VERSION = "multinic.fms.io/v1" + APISERVER_TIMEOUT = 2 * time.Minute ) type DynamicHandler struct { @@ -71,7 +72,9 @@ func (h *DynamicHandler) Create(mapObj map[string]interface{}, namespace string, gvr, _ := schema.ParseResourceArg(h.ResourceName) log.Println(fmt.Sprintf("Create %s/%s", h.ResourceName, mapObj["metadata"].(map[string]interface{})["name"])) start := time.Now() - res, err := h.DYN.Resource(*gvr).Namespace(namespace).Create(context.TODO(), obj, options) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) + defer cancel() + res, err := h.DYN.Resource(*gvr).Namespace(namespace).Create(ctx, obj, options) elapsed := time.Since(start) log.Println(fmt.Sprintf("Create%s elapsed: %d us", h.Kind, int64(elapsed/time.Microsecond))) return res, err @@ -82,7 +85,9 @@ func (h *DynamicHandler) Update(mapObj map[string]interface{}, namespace string, gvr, _ := schema.ParseResourceArg(h.ResourceName) log.Println(fmt.Sprintf("Update %s/%s", h.ResourceName, mapObj["metadata"].(map[string]interface{})["name"])) start := time.Now() - res, err := h.DYN.Resource(*gvr).Namespace(namespace).Update(context.TODO(), obj, options) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) + defer cancel() + res, err := h.DYN.Resource(*gvr).Namespace(namespace).Update(ctx, obj, options) elapsed := time.Since(start) log.Println(fmt.Sprintf("Update%s elapsed: %d us", h.Kind, int64(elapsed/time.Microsecond))) return res, err @@ -91,7 +96,7 @@ func (h *DynamicHandler) Update(mapObj map[string]interface{}, namespace string, func (h *DynamicHandler) List(namespace string, options metav1.ListOptions) (*unstructured.UnstructuredList, error) { gvr, _ := schema.ParseResourceArg(h.ResourceName) start := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) defer cancel() res, err := h.DYN.Resource(*gvr).Namespace(namespace).List(ctx, options) elapsed := time.Since(start) @@ -102,7 +107,9 @@ func (h *DynamicHandler) List(namespace string, options metav1.ListOptions) (*un func (h *DynamicHandler) Get(name string, namespace string, options metav1.GetOptions) (*unstructured.Unstructured, error) { gvr, _ := schema.ParseResourceArg(h.ResourceName) start := time.Now() - res, err := h.DYN.Resource(*gvr).Namespace(namespace).Get(context.TODO(), name, options) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) + defer cancel() + res, err := h.DYN.Resource(*gvr).Namespace(namespace).Get(ctx, name, options) elapsed := time.Since(start) log.Println(fmt.Sprintf("Get%s elapsed: %d us", h.Kind, int64(elapsed/time.Microsecond))) return res, err @@ -112,7 +119,9 @@ func (h *DynamicHandler) Delete(name string, namespace string, options metav1.De gvr, _ := schema.ParseResourceArg(h.ResourceName) log.Println(fmt.Sprintf("Delete %s/%s", h.ResourceName, name)) start := time.Now() - err := h.DYN.Resource(*gvr).Namespace(namespace).Delete(context.TODO(), name, options) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) + defer cancel() + err := h.DYN.Resource(*gvr).Namespace(namespace).Delete(ctx, name, options) elapsed := time.Since(start) log.Println(fmt.Sprintf("Delete%s elapsed: %d us", h.Kind, int64(elapsed/time.Microsecond))) return err @@ -122,7 +131,9 @@ func (h *DynamicHandler) Patch(name string, namespace string, pt types.PatchType gvr, _ := schema.ParseResourceArg(h.ResourceName) log.Println(fmt.Sprintf("Patch %s/%s - %s", h.ResourceName, name, string(data))) start := time.Now() - res, err := h.DYN.Resource(*gvr).Namespace(namespace).Patch(context.TODO(), name, pt, data, options) + ctx, cancel := context.WithTimeout(context.Background(), APISERVER_TIMEOUT) + defer cancel() + res, err := h.DYN.Resource(*gvr).Namespace(namespace).Patch(ctx, name, pt, data, options) elapsed := time.Since(start) log.Println(fmt.Sprintf("Patch%s elapsed: %d us", h.Kind, int64(elapsed/time.Microsecond))) return res, err diff --git a/daemon/src/main.go b/daemon/src/main.go index f86ddb1d..c0886668 100644 --- a/daemon/src/main.go +++ b/daemon/src/main.go @@ -109,11 +109,16 @@ func Greet(targetHost string, myIP string) { log.Printf("Fail to marshal: %v", err) return } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { log.Printf("Fail to post: %v", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Printf("Status: %v", res.StatusCode) return @@ -282,5 +287,11 @@ func main() { router := handleRequests() daemonAddress := fmt.Sprintf("0.0.0.0:%d", DAEMON_PORT) log.Printf("Listening @%s", daemonAddress) - log.Fatal(http.ListenAndServe(daemonAddress, router)) + srv := &http.Server{ + Addr: daemonAddress, + Handler: router, + ReadTimeout: 10 * time.Minute, + WriteTimeout: 10 * time.Minute, + } + log.Fatal(srv.ListenAndServe()) } diff --git a/e2e-test/cni-stub/exec/ipam.go b/e2e-test/cni-stub/exec/ipam.go index 71968bb7..3387b4e9 100644 --- a/e2e-test/cni-stub/exec/ipam.go +++ b/e2e-test/cni-stub/exec/ipam.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "bytes" "errors" @@ -48,23 +49,28 @@ func RequestIP(daemonIP string, daemonPort int, podName string, podNamespace str jsonReq, err := json.Marshal(request) if err != nil { - return response, errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return response, fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return response, errors.New(fmt.Sprintf("Post fail: %v", err)) + return response, fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return response, errors.New(res.Status) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return response, errors.New(fmt.Sprintf("Read body: %v", err)) + return response, fmt.Errorf("read body: %v", err) } err = json.Unmarshal(body, &response) if err == nil && len(response) == 0 { - return response, fmt.Errorf("Response nothing") + return response, fmt.Errorf("response nothing") } return response, err } @@ -82,12 +88,17 @@ func Deallocate(daemonIP string, daemonPort int, podName string, podNamespace st jsonReq, err := json.Marshal(request) if err != nil { - return errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return errors.New(fmt.Sprintf("Post fail: %v", err)) + return fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return errors.New(res.Status) } diff --git a/e2e-test/cni-stub/exec/selector.go b/e2e-test/cni-stub/exec/selector.go index 439be1d4..b2bd2eb5 100644 --- a/e2e-test/cni-stub/exec/selector.go +++ b/e2e-test/cni-stub/exec/selector.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "net/http" + "time" "bytes" "errors" @@ -64,23 +65,28 @@ func SelectNICs(daemonIP string, daemonPort int, podName string, podNamespace st jsonReq, err := json.Marshal(request) if err != nil { - return response, errors.New(fmt.Sprintf("Marshal fail: %v", err)) + return response, fmt.Errorf("marshal fail: %v", err) } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { - return response, errors.New(fmt.Sprintf("Post fail: %v", err)) + return response, fmt.Errorf("post fail: %v", err) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return response, errors.New(res.Status) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return response, errors.New(fmt.Sprintf("Read body: %v", err)) + return response, fmt.Errorf("read body: %v", err) } err = json.Unmarshal(body, &response) if err == nil && len(response.Masters) == 0 { - return response, fmt.Errorf("Response nothing") + return response, fmt.Errorf("response nothing") } return response, err } diff --git a/e2e-test/daemon-stub/daemon.go b/e2e-test/daemon-stub/daemon.go index 66062162..ead595d8 100644 --- a/e2e-test/daemon-stub/daemon.go +++ b/e2e-test/daemon-stub/daemon.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "strings" + "time" "strconv" @@ -157,11 +158,16 @@ func Greet(targetHost string, myIP string) { log.Printf("Fail to marshal: %v", err) return } else { - res, err := http.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) + client := http.Client{ + Timeout: 2 * time.Minute, + } + defer client.CloseIdleConnections() + res, err := client.Post(address, "application/json; charset=utf-8", bytes.NewBuffer(jsonReq)) if err != nil { log.Printf("Fail to post: %v", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Printf("Status: %v", res.StatusCode) return diff --git a/plugin/net_attach_def.go b/plugin/net_attach_def.go index 5c78ca4b..7b79a6d1 100644 --- a/plugin/net_attach_def.go +++ b/plugin/net_attach_def.go @@ -108,6 +108,18 @@ func GetNetAttachDefHandler(config *rest.Config) (*NetAttachDefHandler, error) { }, nil } +func CheckDefChanged(def, existingDef *NetworkAttachmentDefinition) bool { + if def.Spec.Config != existingDef.Spec.Config || len(def.Annotations) != len(existingDef.Annotations) { + return true + } + for k, v := range existingDef.Annotations { + if def.Annotations[k] != v { + return true + } + } + return false +} + // CreateOrUpdate creates new NetworkAttachmentDefinition resource if not exists, otherwise update func (h *NetAttachDefHandler) CreateOrUpdate(net *multinicv1.MultiNicNetwork, pluginStr string, annotations map[string]string) error { defs, err := h.generate(net, pluginStr, annotations) @@ -122,9 +134,14 @@ func (h *NetAttachDefHandler) CreateOrUpdate(net *multinicv1.MultiNicNetwork, pl if h.IsExist(name, namespace) { existingDef, _ := h.Get(name, namespace) def.ObjectMeta = existingDef.ObjectMeta - err := h.DynamicHandler.Update(namespace, def, result) - if err != nil { - errMsg = fmt.Sprintf("%s\n%s: %v", errMsg, namespace, err) + if CheckDefChanged(def, existingDef) { + if namespace == "default" { + vars.NetworkLog.V(2).Info(fmt.Sprintf("Update net-attach-def %s", def.Name)) + } + err := h.DynamicHandler.Update(namespace, def, result) + if err != nil { + errMsg = fmt.Sprintf("%s\n%s: %v", errMsg, namespace, err) + } } } else { err := h.DynamicHandler.Create(namespace, def, result) @@ -134,7 +151,7 @@ func (h *NetAttachDefHandler) CreateOrUpdate(net *multinicv1.MultiNicNetwork, pl } } if errMsg != "" { - return fmt.Errorf(errMsg) + vars.NetworkLog.V(2).Info(errMsg) } return nil } @@ -159,6 +176,40 @@ func (h *NetAttachDefHandler) getNamespace(net *multinicv1.MultiNicNetwork) ([]s return namespaces, nil } +// NetToDef generates net-attach-def from multinicnetwork on specific namespace called by generate function +func NetToDef(namespace string, net *multinicv1.MultiNicNetwork, pluginStr string, annotations map[string]string) (*NetworkAttachmentDefinition, error) { + name := net.GetName() + config := &NetConf{ + NetConf: types.NetConf{ + CNIVersion: CNI_VERSION, + Name: name, + Type: vars.TargetCNI, + }, + Subnet: net.Spec.Subnet, + MasterNetAddrs: net.Spec.MasterNetAddrs, + IsMultiNICIPAM: net.Spec.IsMultiNICIPAM, + DaemonPort: vars.DaemonPort, + } + var ipamObj map[string]interface{} + configBytes, _ := json.Marshal(config) + configStr := string(configBytes) + err := json.Unmarshal([]byte(net.Spec.IPAM), &ipamObj) + if err != nil { + return nil, err + } + ipamBytes, _ := json.Marshal(ipamObj) + pluginValue := fmt.Sprintf("\"plugin\":%s", pluginStr) + ipamValue := fmt.Sprintf("\"ipam\":%s", string(ipamBytes)) + configStr = strings.ReplaceAll(configStr, "\"ipam\":{}", ipamValue) + configStr = strings.ReplaceAll(configStr, "\"plugin\":null", pluginValue) + metaObj := GetMetaObject(name, namespace, annotations) + spec := NetworkAttachmentDefinitionSpec{ + Config: configStr, + } + netattachdef := NewNetworkAttachmentDefinition(metaObj, spec) + return &netattachdef, nil +} + // generate initializes NetworkAttachmentDefinition objects from MultiNicNetwork and unmarshal plugin func (h *NetAttachDefHandler) generate(net *multinicv1.MultiNicNetwork, pluginStr string, annotations map[string]string) ([]*NetworkAttachmentDefinition, error) { defs := []*NetworkAttachmentDefinition{} @@ -168,37 +219,11 @@ func (h *NetAttachDefHandler) generate(net *multinicv1.MultiNicNetwork, pluginSt } vars.NetworkLog.V(2).Info(fmt.Sprintf("generate net-attach-def config on %d namespaces", len(namespaces))) for _, ns := range namespaces { - name := net.GetName() - namespace := ns - config := &NetConf{ - NetConf: types.NetConf{ - CNIVersion: CNI_VERSION, - Name: name, - Type: vars.TargetCNI, - }, - Subnet: net.Spec.Subnet, - MasterNetAddrs: net.Spec.MasterNetAddrs, - IsMultiNICIPAM: net.Spec.IsMultiNICIPAM, - DaemonPort: vars.DaemonPort, - } - var ipamObj map[string]interface{} - configBytes, _ := json.Marshal(config) - configStr := string(configBytes) - err := json.Unmarshal([]byte(net.Spec.IPAM), &ipamObj) + def, err := NetToDef(ns, net, pluginStr, annotations) if err != nil { return defs, err } - ipamBytes, _ := json.Marshal(ipamObj) - pluginValue := fmt.Sprintf("\"plugin\":%s", pluginStr) - ipamValue := fmt.Sprintf("\"ipam\":%s", string(ipamBytes)) - configStr = strings.ReplaceAll(configStr, "\"ipam\":{}", ipamValue) - configStr = strings.ReplaceAll(configStr, "\"plugin\":null", pluginValue) - metaObj := GetMetaObject(name, namespace, annotations) - spec := NetworkAttachmentDefinitionSpec{ - Config: configStr, - } - netattachdef := NewNetworkAttachmentDefinition(metaObj, spec) - defs = append(defs, &netattachdef) + defs = append(defs, def) } return defs, nil } diff --git a/unit-test/multinicnetwork_test.go b/unit-test/multinicnetwork_test.go index 90ecde9d..b91e3017 100644 --- a/unit-test/multinicnetwork_test.go +++ b/unit-test/multinicnetwork_test.go @@ -8,8 +8,12 @@ package controllers import ( "fmt" + multinicv1 "github.com/foundation-model-stack/multi-nic-cni/api/v1" + "github.com/foundation-model-stack/multi-nic-cni/controllers" + "github.com/foundation-model-stack/multi-nic-cni/plugin" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" //+kubebuilder:scaffold:imports ) @@ -33,3 +37,164 @@ var _ = Describe("Test deploying MultiNicNetwork", func() { Expect(err).NotTo(HaveOccurred()) }) }) + +var _ = Describe("Test definition changes check", func() { + cniVersion := "0.3.0" + cniType := "ipvlan" + mode := "l2" + mtu := 1500 + cniArgs := make(map[string]string) + cniArgs["mode"] = mode + cniArgs["mtu"] = fmt.Sprintf("%d", mtu) + + multinicnetwork := getMultiNicCNINetwork("test-mn", cniVersion, cniType, cniArgs) + It("detect no change", func() { + mainPlugin, annotations, err := multinicnetworkReconciler.GetMainPluginConf(multinicnetwork) + Expect(err).NotTo(HaveOccurred()) + def, err := plugin.NetToDef("", multinicnetwork, mainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + defCopy, err := plugin.NetToDef("", multinicnetwork, mainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + changed := plugin.CheckDefChanged(defCopy, def) + Expect(changed).To(BeFalse()) + }) + + It("detect annotation change", func() { + mainPlugin, annotations, err := multinicnetworkReconciler.GetMainPluginConf(multinicnetwork) + Expect(err).NotTo(HaveOccurred()) + def, err := plugin.NetToDef("", multinicnetwork, mainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + + newAnnotations := map[string]string{"resource": "new"} + defWithNewAnnotation, err := plugin.NetToDef("", multinicnetwork, mainPlugin, newAnnotations) + Expect(err).NotTo(HaveOccurred()) + changed := plugin.CheckDefChanged(defWithNewAnnotation, def) + Expect(changed).To(BeTrue()) + }) + + It("detect config change", func() { + mainPlugin, annotations, err := multinicnetworkReconciler.GetMainPluginConf(multinicnetwork) + Expect(err).NotTo(HaveOccurred()) + def, err := plugin.NetToDef("", multinicnetwork, mainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + + newCniArgs := make(map[string]string) + newCniArgs["mode"] = "l3" + newCniArgs["mtu"] = fmt.Sprintf("%d", mtu) + changedArgsNetwork := getMultiNicCNINetwork("test-mn", cniVersion, cniType, newCniArgs) + newMainPlugin, annotations, err := multinicnetworkReconciler.GetMainPluginConf(changedArgsNetwork) + Expect(err).NotTo(HaveOccurred()) + defWithNewArgs, err := plugin.NetToDef("", changedArgsNetwork, newMainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + changed := plugin.CheckDefChanged(defWithNewArgs, def) + Expect(changed).To(BeTrue()) + }) +}) + +func getNetStatus(computeResults []multinicv1.NicNetworkResult, discoverStatus multinicv1.DiscoverStatus, netConfigStatus multinicv1.NetConfigStatus, routeStatus multinicv1.RouteStatus) multinicv1.MultiNicNetworkStatus { + return multinicv1.MultiNicNetworkStatus{ + ComputeResults: computeResults, + DiscoverStatus: discoverStatus, + NetConfigStatus: netConfigStatus, + RouteStatus: routeStatus, + Message: "", + LastSyncTime: metav1.Now(), + } +} + +func testNewNetStatus(multinicnetwork *multinicv1.MultiNicNetwork, newStatus multinicv1.MultiNicNetworkStatus, expectedChange bool) *multinicv1.MultiNicNetwork { + if expectedChange { + updated := controllers.NetStatusUpdated(multinicnetwork, newStatus) + // check new update + Expect(updated).To(Equal(expectedChange)) + // update status + multinicnetwork.Status = newStatus + } + updated := controllers.NetStatusUpdated(multinicnetwork, newStatus) + // expect no update + Expect(updated).To(BeFalse()) + return multinicnetwork +} + +var _ = Describe("Test multinicnetwork status change check", func() { + cniVersion := "0.3.0" + cniType := "ipvlan" + mode := "l3" + mtu := 1500 + cniArgs := make(map[string]string) + cniArgs["mode"] = mode + cniArgs["mtu"] = fmt.Sprintf("%d", mtu) + + It("detect change from no status", func() { + multinicnetwork := getMultiNicCNINetwork("test-mn", cniVersion, cniType, cniArgs) + initStatus := getNetStatus([]multinicv1.NicNetworkResult{}, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + updated := controllers.NetStatusUpdated(multinicnetwork, initStatus) + Expect(updated).To(BeTrue()) + }) + + It("detect change on compute results", func() { + multinicnetwork := getMultiNicCNINetwork("test-mn", cniVersion, cniType, cniArgs) + multinicnetwork.Status = getNetStatus([]multinicv1.NicNetworkResult{}, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + + net1 := multinicv1.NicNetworkResult{ + NetAddress: "192.168.0.0/24", + NumOfHost: 1, + } + net2 := multinicv1.NicNetworkResult{ + NetAddress: "192.168.1.0/24", + NumOfHost: 2, + } + + computeResults := []multinicv1.NicNetworkResult{net1} + newStatus := getNetStatus(computeResults, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange := true + multinicnetwork = testNewNetStatus(multinicnetwork, newStatus, expectedChange) + + // add new compute result + computeResults = []multinicv1.NicNetworkResult{net1, net2} + newStatus = getNetStatus(computeResults, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange = true + multinicnetwork = testNewNetStatus(multinicnetwork, newStatus, expectedChange) + + // change order + computeResults = []multinicv1.NicNetworkResult{net2, net1} + newStatus = getNetStatus(computeResults, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange = false + multinicnetwork = testNewNetStatus(multinicnetwork, newStatus, expectedChange) + + // change values + net1.NetAddress = "192.168.0.2/24" + computeResults = []multinicv1.NicNetworkResult{net1, net2} + newStatus = getNetStatus(computeResults, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange = true + testNewNetStatus(multinicnetwork, newStatus, expectedChange) + net1.NumOfHost = 3 + computeResults = []multinicv1.NicNetworkResult{net1, net2} + newStatus = getNetStatus(computeResults, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange = true + testNewNetStatus(multinicnetwork, newStatus, expectedChange) + }) + + It("detect change on simple status", func() { + multinicnetwork := getMultiNicCNINetwork("test-mn", cniVersion, cniType, cniArgs) + multinicnetwork.Status = getNetStatus([]multinicv1.NicNetworkResult{}, multinicv1.DiscoverStatus{}, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + + // change discover status + discoverStatus := multinicv1.DiscoverStatus{ + ExistDaemon: 10, + } + newStatus := getNetStatus([]multinicv1.NicNetworkResult{}, discoverStatus, multinicv1.WaitForConfig, multinicv1.ApplyingRoute) + expectedChange := true + testNewNetStatus(multinicnetwork, newStatus, expectedChange) + + // change config status + newStatus = getNetStatus([]multinicv1.NicNetworkResult{}, discoverStatus, multinicv1.ConfigComplete, multinicv1.ApplyingRoute) + expectedChange = true + testNewNetStatus(multinicnetwork, newStatus, expectedChange) + + // change route status + newStatus = getNetStatus([]multinicv1.NicNetworkResult{}, discoverStatus, multinicv1.ConfigComplete, multinicv1.AllRouteApplied) + expectedChange = true + testNewNetStatus(multinicnetwork, newStatus, expectedChange) + }) +})