diff --git a/README.md b/README.md index 43b53e0b8..771efb147 100644 --- a/README.md +++ b/README.md @@ -368,6 +368,7 @@ It has following flags: --min-native float Minimum amount of chain native coins (xDAI) nodes should have. --min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. --namespace string Kubernetes namespace. Overrides cluster name if set. +--label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected. --timeout duration Timeout. (default 5m0s) --wallet-key string Hex-encoded private key for the Bee node wallet. Required. ``` @@ -385,13 +386,14 @@ Command **node-operator** uses tool It has following flags: ```console ---geth-url string Endpoint to chain node. Required. ---help help for node-operator ---min-native float Minimum amount of chain native coins (xDAI) nodes should have. ---min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. ---namespace string Kubernetes namespace to scan for scheduled pods. ---timeout duration Timeout. Default is infinite. ---wallet-key string Hex-encoded private key for the Bee node wallet. Required. +--geth-url string Endpoint to chain node. Required. +--help help for node-operator +--min-native float Minimum amount of chain native coins (xDAI) nodes should have. +--min-swarm float Minimum amount of swarm tokens (xBZZ) nodes should have. +--namespace string Kubernetes namespace to scan for scheduled pods. +--label-selector string Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected. +--timeout duration Timeout. Default is infinite. +--wallet-key string Hex-encoded private key for the Bee node wallet. Required. ``` example: @@ -408,7 +410,7 @@ example: ```console --config string config file (default is $HOME/.beekeeper.yaml) ---config-dir string config directory (default is $HOME/.beekeeper/) (default "C:\\Users\\ljubi\\.beekeeper") +--config-dir string config directory (default is $HOME/.beekeeper/) --config-git-branch string Git branch (default "main") --config-git-password string Git password or personal access tokens (needed for private repos) --config-git-repo string Git repository with configurations (uses config directory when Git repo is not specified) (default "") diff --git a/cmd/beekeeper/cmd/node_funder.go b/cmd/beekeeper/cmd/node_funder.go index ebdf83ab6..0d3345d4c 100644 --- a/cmd/beekeeper/cmd/node_funder.go +++ b/cmd/beekeeper/cmd/node_funder.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/ethersphere/beekeeper/pkg/config" @@ -13,6 +14,8 @@ import ( "github.com/spf13/cobra" ) +const nodeFunderLabelSelector string = "beekeeper.ethswarm.org/node-funder=true" + func (c *command) initNodeFunderCmd() (err error) { const ( optionNameAddresses = "addresses" @@ -23,6 +26,7 @@ func (c *command) initNodeFunderCmd() (err error) { optionNameMinNative = "min-native" optionNameMinSwarm = "min-swarm" optionNameTimeout = "timeout" + optionNameLabelSelector = "label-selector" ) cmd := &cobra.Command{ @@ -81,7 +85,8 @@ func (c *command) initNodeFunderCmd() (err error) { var nodeLister funder.NodeLister // if addresses are provided, use them, not k8s client to list nodes if cfg.Namespace != "" { - nodeLister = newNodeLister(c.k8sClient, c.log) + label := c.globalConfig.GetString(optionNameLabelSelector) + nodeLister = newNodeLister(c.k8sClient, label, c.log) } return funder.Fund(ctx, funder.Config{ @@ -105,6 +110,7 @@ func (c *command) initNodeFunderCmd() (err error) { cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.") cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.") cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.") + cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.") cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Timeout.") c.root.AddCommand(cmd) @@ -114,12 +120,14 @@ func (c *command) initNodeFunderCmd() (err error) { type nodeLister struct { k8sClient *k8s.Client + label string log logging.Logger } -func newNodeLister(k8sClient *k8s.Client, l logging.Logger) *nodeLister { +func newNodeLister(k8sClient *k8s.Client, label string, l logging.Logger) *nodeLister { return &nodeLister{ k8sClient: k8sClient, + label: label, log: l, } } @@ -133,12 +141,12 @@ func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funde return nil, fmt.Errorf("namespace not provided") } - ingressHosts, err := nf.k8sClient.Ingress.ListAPINodesHosts(ctx, namespace) + ingressHosts, err := nf.k8sClient.Ingress.GetIngressHosts(ctx, namespace, nf.label) if err != nil { return nil, fmt.Errorf("list ingress api nodes hosts: %s", err.Error()) } - ingressRouteHosts, err := nf.k8sClient.IngressRoute.ListAPINodesHosts(ctx, namespace) + ingressRouteHosts, err := nf.k8sClient.IngressRoute.GetIngressHosts(ctx, namespace, nf.label) if err != nil { return nil, fmt.Errorf("list ingress route api nodes hosts: %s", err.Error()) } @@ -147,7 +155,7 @@ func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funde for _, node := range ingressHosts { nodes = append(nodes, funder.NodeInfo{ - Name: node.Name, + Name: strings.TrimSuffix(node.Name, "-api"), Address: fmt.Sprintf("http://%s", node.Host), }) } diff --git a/cmd/beekeeper/cmd/operator.go b/cmd/beekeeper/cmd/operator.go index 48759f6ee..b6ddf40af 100644 --- a/cmd/beekeeper/cmd/operator.go +++ b/cmd/beekeeper/cmd/operator.go @@ -18,6 +18,7 @@ func (c *command) initOperatorCmd() (err error) { optionNameMinNative = "min-native" optionNameMinSwarm = "min-swarm" optionNameTimeout = "timeout" + optionNameLabelSelector = "label-selector" ) cmd := &cobra.Command{ @@ -26,7 +27,11 @@ func (c *command) initOperatorCmd() (err error) { Long: `Node operator scans for scheduled pods and funds them using node-funder. beekeeper node-operator`, RunE: func(cmd *cobra.Command, args []string) (err error) { cfg := config.NodeFunder{} - namespace := c.globalConfig.GetString(optionNameNamespace) + + var namespace string + if namespace = c.globalConfig.GetString(optionNameNamespace); namespace == "" { + return errors.New("namespace not provided") + } // chain node endpoint check if cfg.ChainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); cfg.ChainNodeEndpoint == "" { @@ -62,6 +67,7 @@ func (c *command) initOperatorCmd() (err error) { ChainNodeEndpoint: cfg.ChainNodeEndpoint, MinAmounts: cfg.MinAmounts, K8sClient: c.k8sClient, + LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), }).Run(ctxNew) }, PreRunE: c.preRunE, @@ -72,6 +78,7 @@ func (c *command) initOperatorCmd() (err error) { cmd.Flags().String(optionNameWalletKey, "", "Hex-encoded private key for the Bee node wallet. Required.") cmd.Flags().Float64(optionNameMinNative, 0, "Minimum amount of chain native coins (xDAI) nodes should have.") cmd.Flags().Float64(optionNameMinSwarm, 0, "Minimum amount of swarm tokens (xBZZ) nodes should have.") + cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. An empty string disables filtering, allowing all resources to be selected.") cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Timeout. Default is infinite.") c.root.AddCommand(cmd) diff --git a/config/config.yaml b/config/config.yaml index 88d5f6622..b73637a5a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -105,6 +105,7 @@ node-groups: app.kubernetes.io/name: "bee" app.kubernetes.io/part-of: "bee" app.kubernetes.io/version: "latest" + beekeeper.ethswarm.org/node-funder: "true" node-selector: node-group: "private" persistence-enabled: false diff --git a/config/local.yaml b/config/local.yaml index d2a1d924e..57e9fbbc0 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -117,6 +117,7 @@ node-groups: app.kubernetes.io/name: "bee" app.kubernetes.io/part-of: "bee" app.kubernetes.io/version: "latest" + beekeeper.ethswarm.org/node-funder: "true" node-selector: node-group: "local" persistence-enabled: false diff --git a/config/testnet-giant.yaml b/config/testnet-giant.yaml index 8f21f8c43..ca8f21261 100644 --- a/config/testnet-giant.yaml +++ b/config/testnet-giant.yaml @@ -53,6 +53,7 @@ node-groups: app.kubernetes.io/name: "bee" app.kubernetes.io/part-of: "bee" app.kubernetes.io/version: "latest" + beekeeper.ethswarm.org/node-funder: "true" node-selector: node-group: "testnet-giant" persistence-enabled: false diff --git a/config/testnet.yaml b/config/testnet.yaml index b76bef31f..d7f7a2f93 100644 --- a/config/testnet.yaml +++ b/config/testnet.yaml @@ -50,6 +50,7 @@ node-groups: app.kubernetes.io/name: "bee" app.kubernetes.io/part-of: "bee" app.kubernetes.io/version: "latest" + beekeeper.ethswarm.org/node-funder: "true" node-selector: node-group: "private" persistence-enabled: true diff --git a/pkg/k8s/customresource/ingressroute/client.go b/pkg/k8s/customresource/ingressroute/client.go index d18fec747..87e22d513 100644 --- a/pkg/k8s/customresource/ingressroute/client.go +++ b/pkg/k8s/customresource/ingressroute/client.go @@ -3,7 +3,6 @@ package ingressroute import ( "context" "fmt" - "strings" "github.com/ethersphere/beekeeper/pkg/k8s/ingress" "k8s.io/apimachinery/pkg/api/errors" @@ -82,10 +81,10 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } -// ListAPINodesHosts list Ingresses that are nodes -func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes []ingress.NodeInfo, err error) { +// GetIngressHosts list Ingress Routes hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. +func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []ingress.NodeInfo, err error) { ingressRoutes, err := c.clientset.IngressRoutes(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/name=bee", + LabelSelector: label, }) if err != nil { if errors.IsNotFound(err) { @@ -96,15 +95,13 @@ func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes if ingressRoutes != nil { for _, ingressRoute := range ingressRoutes.Items { - if strings.HasSuffix(ingressRoute.Name, "-api") { - for _, route := range ingressRoute.Spec.Routes { - host := route.GetHost() - if host != "" { - nodes = append(nodes, ingress.NodeInfo{ - Name: strings.TrimSuffix(ingressRoute.Name, "-api"), - Host: host, - }) - } + for _, route := range ingressRoute.Spec.Routes { + host := route.GetHost() + if host != "" { + nodes = append(nodes, ingress.NodeInfo{ + Name: ingressRoute.Name, + Host: host, + }) } } } diff --git a/pkg/k8s/ingress/client.go b/pkg/k8s/ingress/client.go index 57a0d7f54..34836c67f 100644 --- a/pkg/k8s/ingress/client.go +++ b/pkg/k8s/ingress/client.go @@ -3,7 +3,6 @@ package ingress import ( "context" "fmt" - "strings" v1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -76,10 +75,10 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } -// ListAPINodesHosts list Ingresses that are nodes -func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes []NodeInfo, err error) { +// GetIngressHosts list Ingresses hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. +func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []NodeInfo, err error) { ingreses, err := c.clientset.NetworkingV1().Ingresses(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/name=bee", + LabelSelector: label, }) if err != nil { if errors.IsNotFound(err) { @@ -89,14 +88,12 @@ func (c *Client) ListAPINodesHosts(ctx context.Context, namespace string) (nodes } for _, ingress := range ingreses.Items { - if strings.HasSuffix(ingress.Name, "-api") { - for _, rule := range ingress.Spec.Rules { - if rule.Host != "" { - nodes = append(nodes, NodeInfo{ - Name: strings.TrimSuffix(ingress.Name, "-api"), - Host: rule.Host, - }) - } + for _, rule := range ingress.Spec.Rules { + if rule.Host != "" { + nodes = append(nodes, NodeInfo{ + Name: ingress.Name, + Host: rule.Host, + }) } } } diff --git a/pkg/k8s/pod/client.go b/pkg/k8s/pod/client.go index 9a08e0586..f8a5b28ef 100644 --- a/pkg/k8s/pod/client.go +++ b/pkg/k8s/pod/client.go @@ -73,22 +73,20 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } -// EventsWatch watches for events. -func (c *Client) EventsWatch(ctx context.Context, namespace string, operatorChan chan string) (err error) { - c.log.Infof("starting events watch") +// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel. +func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) { + c.log.Infof("starting events watch in namespace %s, label selector %s", namespace, labelSelector) defer c.log.Infof("events watch done") - defer close(operatorChan) + defer close(newPodIps) watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ - // TODO: add this label to beekeeper and filter on it => app.kubernetes.io/name=bee - LabelSelector: "app.kubernetes.io/name=bee", + LabelSelector: labelSelector, }) if err != nil { return fmt.Errorf("getting pod events in namespace %s: %w", namespace, err) } defer watcher.Stop() - // listen for either events from the watcher or a context cancellation for { select { case <-ctx.Done(): @@ -98,13 +96,12 @@ func (c *Client) EventsWatch(ctx context.Context, namespace string, operatorChan return fmt.Errorf("watch channel closed") } switch event.Type { - // case watch.Added: // if we want to do something with already running pods + // case watch.Added: // already running pods case watch.Modified: pod, ok := event.Object.(*v1.Pod) if ok { - if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil { - c.log.Tracef("new pod event: {%s}, {%s}, {%s}, {%s}, {%v}", event.Type, pod.Name, pod.Status.Phase, pod.Status.PodIP, pod.ObjectMeta.DeletionTimestamp) - operatorChan <- pod.Status.PodIP + if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning { + newPodIps <- pod.Status.PodIP } } } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 50512b552..eb0485a2d 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -23,6 +23,7 @@ type ClientConfig struct { MinAmounts config.MinAmounts K8sClient *k8s.Client HTTPClient *http.Client // injected HTTP client + LabelSelector string } type Client struct { @@ -55,21 +56,21 @@ func (c *Client) Run(ctx context.Context) error { c.Log.Infof("operator started") defer c.Log.Infof("operator done") - operatorChan := make(chan string) + newPodIps := make(chan string) go func() { for { select { case <-ctx.Done(): c.Log.Error("operator context canceled") return - case podIp, ok := <-operatorChan: + case podIp, ok := <-newPodIps: if !ok { c.Log.Error("operator channel closed") return } c.Log.Debugf("operator received pod ip: %s", podIp) - addresses, err := c.processPodIP(ctx, podIp) + addresses, err := c.getAddresses(ctx, podIp) if err != nil { c.Log.Errorf("process pod ip: %v", err) continue @@ -88,22 +89,21 @@ func (c *Client) Run(ctx context.Context) error { }, nil, nil, funder.WithLoggerOption(c.Log)) if err != nil { c.Log.Errorf("funder: %v", err) - continue } } } }() - err := c.K8sClient.Pods.EventsWatch(ctx, c.Namespace, operatorChan) - if err != nil { + if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPodIps); err != nil { return fmt.Errorf("events watch: %v", err) } + return nil } -func (c *Client) processPodIP(ctx context.Context, podIp string) (bee.Addresses, error) { - // http://10.3.247.202:1635/addresses - // bee.Addresses is struct that represents response with field Ethereum string +// getAddresses sends a request to the pod IP and retrieves the Addresses struct, +// which includes overlay, underlay addresses, Ethereum address, and public keys. +func (c *Client) getAddresses(ctx context.Context, podIp string) (bee.Addresses, error) { url := &url.URL{ Scheme: "http", Host: podIp + ":1633", // it is possible to extract port from service @@ -121,15 +121,10 @@ func (c *Client) processPodIP(ctx context.Context, podIp string) (bee.Addresses, } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return bee.Addresses{}, fmt.Errorf("read body: %s", err.Error()) - } - var addresses bee.Addresses - err = json.Unmarshal(body, &addresses) - if err != nil { - return bee.Addresses{}, fmt.Errorf("unmarshal body: %s", err.Error()) + + if err = json.NewDecoder(resp.Body).Decode(&addresses); err != nil { + return bee.Addresses{}, fmt.Errorf("decode body: %s", err.Error()) } return addresses, nil