Skip to content

Commit

Permalink
Refactor excludePodIP function for improved readability and simplicit…
Browse files Browse the repository at this point in the history
…y and add tests for ESClient.excludePodIP function

Signed-off-by: Abouzar Kamaee <[email protected]>
  • Loading branch information
Abouzar Kamaee committed Jun 12, 2024
1 parent 81b3c41 commit b1eb694
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 25 deletions.
50 changes: 25 additions & 25 deletions operator/es_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
return err
}
c.logger().Infof("Excluding pod %s/%s from shard allocation", pod.Namespace, pod.Name)
err = c.excludePodIP(pod)
err = c.excludePodIP(pod.Status.PodIP)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,41 +272,41 @@ func (c *ESClient) getClusterSettings() (*ESSettings, error) {
return &esSettings, nil
}

// adds the podIP to Elasticsearch exclude._ip list
func (c *ESClient) excludePodIP(pod *v1.Pod) error {

// excludePodIP adds the podIP to Elasticsearch exclude._ip list
func (c *ESClient) excludePodIP(podIP string) error {
c.mux.Lock()
defer c.mux.Unlock()

podIP := pod.Status.PodIP

// Fetch current cluster settings
esSettings, err := c.getClusterSettings()
if err != nil {
c.mux.Unlock()
return err
}

excludeString := esSettings.GetPersistentExcludeIPs().ValueOrZero()

// add pod IP to exclude list
ips := []string{}
if excludeString != "" {
ips = strings.Split(excludeString, ",")
// Get the current exclude IPs
esExcludedIPsString := esSettings.GetPersistentExcludeIPs().ValueOrZero()
var esExcludedIPs []string
if esExcludedIPsString != "" {
esExcludedIPs = strings.Split(esExcludedIPsString, ",")
}
var foundPodIP bool
for _, ip := range ips {

// Check if pod IP is already in the list
for _, ip := range esExcludedIPs {
if ip == podIP {
foundPodIP = true
break
// Pod IP is already in the exclude list, no need to add
return nil
}
}
if !foundPodIP {
ips = append(ips, podIP)
sort.Strings(ips)
err = c.setExcludeIPs(strings.Join(ips, ","), esSettings)
}

c.mux.Unlock()
return err
// Add pod IP to the list
esExcludedIPs = append(esExcludedIPs, podIP)
sort.Strings(esExcludedIPs)
newESExcludedPodIPsString := strings.Join(esExcludedIPs, ",")

c.logger().Infof("Updating exclude._ip list to '%s' after adding IP '%s'", newESExcludedPodIPsString, podIP)

// Update exclude._ip setting
return c.setExcludeIPs(newESExcludedPodIPsString, esSettings)
}

// undoExcludePodIP Removes the pod's IP from Elasticsearch exclude._ip list
Expand Down Expand Up @@ -422,7 +422,7 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {

// make sure the IP is still excluded, this could have been updated in the meantime.
if remainingShards > 0 {
err = c.excludePodIP(pod)
err = c.excludePodIP(pod.Status.PodIP)
if err != nil {
return true, err
}
Expand Down
84 changes: 84 additions & 0 deletions operator/es_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operator
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -572,3 +573,86 @@ func TestESSettingsMergeNonEmtpyTransientSettings(t *testing.T) {
})
}
}

func TestExcludePodIP(t *testing.T) {
tests := []struct {
name string
podIP string
getSettingsResponse *http.Response
putSettingsResponse *http.Response
expectedExcludedIPs string
expectedErr error
}{
{
name: "Add ip to empty exclude list",
podIP: "10.0.0.1",
getSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{}`),
putSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{}`),
expectedExcludedIPs: "10.0.0.1",
},
{
name: "Add ip to non-empty exclude list",
podIP: "10.0.0.1",
getSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{"persistent":{"cluster":{"routing":{"allocation":{"exclude":{"_ip":"10.0.0.2,10.0.0.3"}}}}}}`),
putSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{}`),
expectedExcludedIPs: "10.0.0.1,10.0.0.2,10.0.0.3",
},
{
name: "Pod IP already in exclude list",
podIP: "10.0.0.1",
getSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{"persistent":{"cluster":{"routing":{"allocation":{"exclude":{"_ip":"10.0.0.1,10.0.0.2,10.0.0.3"}}}}}}`),
putSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{}`),
expectedExcludedIPs: "10.0.0.1,10.0.0.2,10.0.0.3",
},
{
name: "Fetch(GET) cluster settings error",
getSettingsResponse: httpmock.NewStringResponse(http.StatusInternalServerError, `{}`),
expectedErr: fmt.Errorf("code status 500 - {}"),
},
{
name: "Update(PUT) cluster settings error",
getSettingsResponse: httpmock.NewStringResponse(http.StatusOK, `{}`),
putSettingsResponse: httpmock.NewStringResponse(http.StatusBadGateway, `{}`),
expectedErr: fmt.Errorf("code status 502 - {}"),
},
}

httpmock.Activate()
defer httpmock.Deactivate()
for _, tt := range tests {
httpmock.Reset()
httpmock.RegisterResponder("GET", "http://elasticsearch:9200/_cluster/settings",
httpmock.ResponderFromResponse(tt.getSettingsResponse))
httpmock.RegisterResponder("PUT", "http://elasticsearch:9200/_cluster/settings",
func(request *http.Request) (*http.Response, error) {
var esSettings ESSettings
body, err := io.ReadAll(request.Body)
if err != nil {
return nil, err
}
err = json.Unmarshal(body, &esSettings)
if err != nil {
return nil, err
}
assert.Equal(t, esSettings.GetPersistentExcludeIPs().ValueOrZero(), tt.expectedExcludedIPs)
return tt.putSettingsResponse, nil
})
esUrl, _ := url.Parse("http://elasticsearch:9200")
config := &DrainingConfig{
MaxRetries: 999,
MinimumWaitTime: 10 * time.Second,
MaximumWaitTime: 30 * time.Second,
}
client := &ESClient{
Endpoint: esUrl,
DrainingConfig: config,
}
err := client.excludePodIP(tt.podIP)
if tt.expectedErr != nil {
assert.Error(t, err)
assert.EqualError(t, err, tt.expectedErr.Error())
} else {
assert.NoError(t, err)
}
}
}

0 comments on commit b1eb694

Please sign in to comment.