diff --git a/go.mod b/go.mod index 285fd35..c8f8075 100644 --- a/go.mod +++ b/go.mod @@ -6,19 +6,20 @@ require ( github.com/Luzifer/go-dhparam v1.1.0 github.com/evanphx/json-patch v4.11.0+incompatible github.com/evanphx/json-patch/v5 v5.5.0 + github.com/go-logr/logr v0.4.0 github.com/openshift/api v0.0.0-20210625082935-ad54d363d274 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 k8s.io/api v0.21.2 k8s.io/apimachinery v0.21.3 k8s.io/client-go v0.21.2 + k8s.io/klog/v2 v2.8.0 k8s.io/utils v0.0.0-20210527160623-6fdb442a123b sigs.k8s.io/controller-runtime v0.9.2 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/logr v0.4.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.5 // indirect @@ -38,7 +39,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - k8s.io/klog/v2 v2.8.0 // indirect k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect sigs.k8s.io/yaml v1.2.0 // indirect diff --git a/state_transfer/endpoint/service/service.go b/state_transfer/endpoint/service/service.go index 7bc28cc..89dccb5 100644 --- a/state_transfer/endpoint/service/service.go +++ b/state_transfer/endpoint/service/service.go @@ -6,6 +6,7 @@ import ( "github.com/konveyor/crane-lib/state_transfer/endpoint" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -137,7 +138,7 @@ func (s *ServiceEndpoint) createService(c client.Client) error { } err := c.Create(context.TODO(), &service, &client.CreateOptions{}) - if err != nil { + if err != nil && !k8serrors.IsAlreadyExists(err) { return err } diff --git a/state_transfer/example_test.go b/state_transfer/example_test.go index 987616b..f461a5f 100644 --- a/state_transfer/example_test.go +++ b/state_transfer/example_test.go @@ -2,24 +2,31 @@ package state_transfer_test import ( "context" + "fmt" "log" + "testing" "time" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2/klogr" - "github.com/konveyor/crane-lib/state_transfer" "github.com/konveyor/crane-lib/state_transfer/endpoint" "github.com/konveyor/crane-lib/state_transfer/endpoint/route" "github.com/konveyor/crane-lib/state_transfer/meta" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + statetransfermeta "github.com/konveyor/crane-lib/state_transfer/meta" "github.com/konveyor/crane-lib/state_transfer/transfer" "github.com/konveyor/crane-lib/state_transfer/transfer/rclone" "github.com/konveyor/crane-lib/state_transfer/transfer/rsync" "github.com/konveyor/crane-lib/state_transfer/transport" "github.com/konveyor/crane-lib/state_transfer/transport/stunnel" + routev1 "github.com/openshift/api/route/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -33,28 +40,15 @@ var ( // This example shows how to wire up the components of the lib to // transfer data from one PVC to another -func Example_basicTransfer() { - srcClient, err := client.New(srcCfg, client.Options{Scheme: runtime.NewScheme()}) - if err != nil { - log.Fatal(err, "unable to create source client") - } - - destClient, err := client.New(destCfg, client.Options{Scheme: runtime.NewScheme()}) - if err != nil { - log.Fatal(err, "unable to create destination client") - } - - // quiesce the applications if needed on the source side - err = state_transfer.QuiesceApplications(srcCfg, srcNamespace) - if err != nil { - log.Fatal(err, "unable to quiesce application on source cluster") - } +func TestExample_basicTransfer(t *testing.T) { + srcClient := buildTestClient(createPvc(srcPVC, srcNamespace)) + destClient := buildTestClient() // set up the PVC on destination to receive the data pvc := &corev1.PersistentVolumeClaim{} - err = srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc) + err := srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc) if err != nil { - log.Fatal(err, "unable to get source PVC") + t.Fatalf("unable to get source PVC: %v", err) } destPVC := pvc.DeepCopy() @@ -64,35 +58,57 @@ func Example_basicTransfer() { pvc.Annotations = map[string]string{} err = destClient.Create(context.TODO(), destPVC, &client.CreateOptions{}) if err != nil { - log.Fatal(err, "unable to create destination PVC") + t.Fatalf("unable to create destination PVC: %v", err) } - pvcList, err := transfer.NewPVCPairList( + pvcList, err := transfer.NewFilesystemPVCPairList( transfer.NewPVCPair(pvc, destPVC), ) if err != nil { - log.Fatal(err, "invalid pvc list") + t.Fatalf("invalid pvc list: %v", err) } // create a route for data transfer r := route.NewEndpoint( types.NamespacedName{ - Namespace: pvc.Name, - Name: pvc.Namespace, - }, route.EndpointTypePassthrough, statetransfermeta.Labels, "") + Namespace: pvc.Namespace, + Name: pvc.Name, + }, route.EndpointTypePassthrough, statetransfermeta.Labels, "test.domain") e, err := endpoint.Create(r, destClient) if err != nil { - log.Fatal(err, "unable to create route endpoint") + t.Fatalf("unable to create route endpoint: %v", err) } - _ = wait.PollUntil(time.Second*5, func() (done bool, err error) { - ready, err := e.IsHealthy(destClient) - if err != nil { - log.Println(err, "unable to check route health, retrying...") - return false, nil - } - return ready, nil - }, make(<-chan struct{})) + route := &routev1.Route{} + // Mark the route as admitted. + err = destClient.Get(context.TODO(), client.ObjectKey{Namespace: destPVC.Namespace, Name: destPVC.Name}, route) + if err != nil { + t.Fatalf("unable to get route: %v, %s/%s", err, destPVC.Namespace, destPVC.Name) + } + route.Status = routev1.RouteStatus{ + Ingress: []routev1.RouteIngress{ + { + Conditions: []routev1.RouteIngressCondition{ + { + Type: routev1.RouteAdmitted, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + err = destClient.Status().Update(context.TODO(), route) + if err != nil { + t.Fatalf("unable to update route status: %v", err) + } + + ready, err := e.IsHealthy(destClient) + if err != nil { + t.Fatalf("unable to check route health: %v", err) + } + if !ready { + t.Fatalf("route is not ready") + } // create an stunnel transport to carry the data over the route s := stunnel.NewTransport(statetransfermeta.NewNamespacedPair( @@ -101,25 +117,25 @@ func Example_basicTransfer() { types.NamespacedName{ Name: destPVC.Name, Namespace: destPVC.Namespace}, ), &transport.Options{}) - _, err = transport.CreateServer(s, destClient, e) + _, err = transport.CreateServer(s, destClient, "fs", e) if err != nil { - log.Fatal(err, "error creating stunnel server") + t.Fatalf("error creating stunnel server: %v", err) } - _, err = transport.CreateClient(s, srcClient, e) + s, err = transport.CreateClient(s, srcClient, "fs", e) if err != nil { - log.Fatal(err, "error creating stunnel client") + t.Fatalf("error creating stunnel client: %v", err) } // Create Rclone Transfer Pod - t, err := rclone.NewTransfer(s, r, srcCfg, destCfg, pvcList) + tr, err := rclone.NewTransfer(s, r, srcClient, destClient, pvcList) if err != nil { - log.Fatal(err, "errror creating rclone transfer") + t.Fatalf("errror creating rclone transfer: %v", err) } - err = transfer.CreateServer(t) + err = transfer.CreateServer(tr) if err != nil { - log.Fatal(err, "error creating rclone server") + t.Fatalf("error creating rclone server: %v", err) } // Rsync Example @@ -130,7 +146,7 @@ func Example_basicTransfer() { } rsyncTransferOptions = append(rsyncTransferOptions, customTransferOptions...) - rsyncTransfer, err := rsync.NewTransfer(s, r, srcCfg, destCfg, pvcList, rsyncTransferOptions...) + rsyncTransfer, err := rsync.NewTransfer(s, r, srcClient, destClient, pvcList, klogr.New(), rsyncTransferOptions...) if err != nil { log.Fatal(err, "error creating rsync transfer") } else { @@ -138,7 +154,7 @@ func Example_basicTransfer() { } // Create Rclone Client Pod - err = transfer.CreateClient(t) + err = transfer.CreateClient(tr) if err != nil { log.Fatal(err, "error creating rclone client") } @@ -168,7 +184,7 @@ func Example_getFromCreatedObjects() { destPVC := pvc.DeepCopy() - pvcList, err := transfer.NewPVCPairList( + pvcList, err := transfer.NewFilesystemPVCPairList( transfer.NewPVCPair(pvc, destPVC), ) if err != nil { @@ -184,12 +200,12 @@ func Example_getFromCreatedObjects() { types.NamespacedName{Namespace: srcNamespace, Name: srcPVC}, types.NamespacedName{Namespace: srcNamespace, Name: srcPVC}, ) - s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, nnPair, e, &transport.Options{}) + s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, &transport.Options{}) if err != nil { log.Fatal(err, "error getting stunnel transport") } - pvcList, err = transfer.NewPVCPairList( + pvcList, err = transfer.NewFilesystemPVCPairList( transfer.NewPVCPair(pvc, nil), ) if err != nil { @@ -197,7 +213,7 @@ func Example_getFromCreatedObjects() { } // Create Rclone Transfer Pod - t, err := rclone.NewTransfer(s, e, srcCfg, destCfg, pvcList) + t, err := rclone.NewTransfer(s, e, srcClient, destClient, pvcList) if err != nil { log.Fatal(err, "errror creating rclone transfer") } @@ -224,3 +240,27 @@ func Example_getFromCreatedObjects() { // TODO: check if the client is completed } + +func buildTestClient(objects ...runtime.Object) client.Client { + s := scheme.Scheme + schemeInitFuncs := []func(*runtime.Scheme) error{ + corev1.AddToScheme, + routev1.AddToScheme, + } + for _, f := range schemeInitFuncs { + if err := f(s); err != nil { + panic(fmt.Errorf("failed to initiate the scheme %w", err)) + } + } + + return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() +} + +func createPvc(name, namespace string) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } +} diff --git a/state_transfer/transfer/blockrsync/blockrsync.go b/state_transfer/transfer/blockrsync/blockrsync.go new file mode 100644 index 0000000..c93fb5e --- /dev/null +++ b/state_transfer/transfer/blockrsync/blockrsync.go @@ -0,0 +1,74 @@ +package blockrsync + +import ( + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/konveyor/crane-lib/state_transfer/endpoint" + "github.com/konveyor/crane-lib/state_transfer/transfer" + "github.com/konveyor/crane-lib/state_transfer/transport" +) + +const ( + blockrsyncImage = "quay.io/awels/blockrsync:latest" + volumeName = "volume" + BlockRsyncContainer = "blockrsync" + Proxy = "proxy" +) + +type BlockrsyncTransfer struct { + log logr.Logger + username string + password string + source client.Client + destination client.Client + pvcList transfer.PVCPairList + transport transport.Transport + endpoint endpoint.Endpoint + transferOptions *TransferOptions +} + +func NewTransfer(t transport.Transport, e endpoint.Endpoint, src client.Client, + dest client.Client, pvcList transfer.PVCPairList, log logr.Logger, options *TransferOptions) (transfer.Transfer, error) { + err := validatePVCList(pvcList) + if err != nil { + return nil, err + } + return &BlockrsyncTransfer{ + log: log, + transport: t, + endpoint: e, + source: src, + destination: dest, + pvcList: pvcList, + transferOptions: options, + }, nil +} + +func (r *BlockrsyncTransfer) PVCs() transfer.PVCPairList { + return r.pvcList +} + +func (r *BlockrsyncTransfer) Endpoint() endpoint.Endpoint { + return r.endpoint +} + +func (r *BlockrsyncTransfer) Transport() transport.Transport { + return r.transport +} + +func (r *BlockrsyncTransfer) Source() client.Client { + return r.source +} + +func (r *BlockrsyncTransfer) Destination() client.Client { + return r.destination +} + +func (r *BlockrsyncTransfer) Username() string { + return r.username +} + +func (r *BlockrsyncTransfer) Password() string { + return r.password +} diff --git a/state_transfer/transfer/blockrsync/client.go b/state_transfer/transfer/blockrsync/client.go new file mode 100644 index 0000000..b93963e --- /dev/null +++ b/state_transfer/transfer/blockrsync/client.go @@ -0,0 +1,203 @@ +package blockrsync + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/konveyor/crane-lib/state_transfer/transfer" + "github.com/konveyor/crane-lib/state_transfer/transport" + "github.com/konveyor/crane-lib/state_transfer/transport/stunnel" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + stunnelCommunicationVolumeName = "stunnel-communication" + stunnelCommunicationVolumePath = "/usr/share/stunnel-communication" + rsyncDoneFile = "blockrsync-done" + proxyListenPort = "9002" +) + +func (r *BlockrsyncTransfer) CreateClient(c client.Client) error { + pvc := r.pvcList[0] + + _, err := transport.CreateClient(r.Transport(), c, "block", r.Endpoint()) + if err != nil { + return err + } + + err = createBlockrsyncClient(c, r, pvc) + if err != nil { + return err + } + + return nil +} + +func createBlockrsyncClient(c client.Client, r *BlockrsyncTransfer, pvc transfer.PVCPair) error { + podLabels := r.transferOptions.SourcePodMeta.Labels + podLabels["pvc"] = pvc.Source().LabelSafeName() + + containers := []v1.Container{ + { + Name: Proxy, + ImagePullPolicy: v1.PullAlways, + Image: r.transferOptions.GetBlockrsyncClientImage(), + Command: getProxyCommand(r.Transport().Port(), pvc.Source().LabelSafeName()), + VolumeMounts: []v1.VolumeMount{ + { + Name: stunnelCommunicationVolumeName, + MountPath: stunnelCommunicationVolumePath, + }, + }, + }, + { + Name: BlockRsyncContainer, + ImagePullPolicy: v1.PullAlways, + Image: blockrsyncImage, + }, + } + addVolumeToContainer(pvc.Source().Claim(), pvc.Source().LabelSafeName(), pvc.Source().LabelSafeName(), &containers[1]) + containers[1].Command = getBlockrsyncCommand(proxyListenPort, containers[1].Env[0].Value) + + customizeTransportContainers(r.Transport().Type(), r.transport.ClientContainers()) + containers = append(containers, r.Transport().ClientContainers()...) + + volumes := []v1.Volume{ + { + Name: pvc.Source().LabelSafeName(), + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Source().Claim().Name, + }, + }, + }, + { + Name: stunnelCommunicationVolumeName, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{Medium: v1.StorageMediumDefault}, + }, + }, + } + + volumes = append(volumes, r.Transport().ClientVolumes()...) + + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "blockrsync-", + Namespace: pvc.Source().Claim().Namespace, + Labels: podLabels, + }, + Spec: v1.PodSpec{ + Containers: containers, + Volumes: volumes, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + } + + return c.Create(context.TODO(), &pod, &client.CreateOptions{}) +} + +func getProxyCommand(port int32, identifier string) []string { + proxyCommand := []string{"/proxy", + "--source", + "--target-address", + "localhost", + "--identifier", + identifier, + "--listen-port", + proxyListenPort, + "--target-port", + strconv.Itoa(int(port)), + "--control-file", + fmt.Sprintf("%s/%s", stunnelCommunicationVolumePath, rsyncDoneFile), + } + return []string{ + "/bin/bash", + "-c", + strings.Join(proxyCommand, " "), + } +} + +func getBlockrsyncCommand(port, file string) []string { + proxyCommand := []string{"/blockrsync", + file, + "--source", + "--target-address", + "localhost", + "--port", + port, + "--zap-log-level", + "3", + "--block-size", + "131072", + } + return []string{ + "/bin/bash", + "-c", + strings.Join(proxyCommand, " "), + } +} + +func addVolumeToContainer(pvc *v1.PersistentVolumeClaim, header, identifier string, container *v1.Container) { + sourceVolumeMode := v1.PersistentVolumeFilesystem + if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock { + sourceVolumeMode = v1.PersistentVolumeBlock + } + if sourceVolumeMode == v1.PersistentVolumeFilesystem { + container.Env = append(container.Env, v1.EnvVar{ + Name: fmt.Sprintf("id-%s", header), + Value: fmt.Sprintf("/mnt/%s/disk.img", identifier), + }) + container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{ + Name: identifier, + MountPath: fmt.Sprintf("/mnt/%s", identifier), + }) + } else { + container.Env = append(container.Env, v1.EnvVar{ + Name: fmt.Sprintf("id-%s", header), + Value: fmt.Sprintf("/dev/%s", identifier), + }) + container.VolumeDevices = append(container.VolumeDevices, v1.VolumeDevice{ + Name: identifier, + DevicePath: fmt.Sprintf("/dev/%s", identifier), + }) + } +} + +func customizeTransportContainers(t transport.TransportType, containers []v1.Container) { + switch t { + case stunnel.TransportTypeStunnel: + var stunnelContainer *v1.Container + for i := range containers { + c := &containers[i] + if c.Name == stunnel.StunnelContainer { + stunnelContainer = c + } + } + stunnelContainer.Command = []string{ + "/bin/bash", + "-c", + `/bin/stunnel /etc/stunnel/stunnel.conf +while true +do test -f /usr/share/stunnel-communication/blockrsync-done +if [ $? -eq 0 ] +then + break +else + sleep 1 +fi +done +exit 0`, + } + stunnelContainer.VolumeMounts = append( + stunnelContainer.VolumeMounts, + v1.VolumeMount{ + Name: stunnelCommunicationVolumeName, + MountPath: stunnelCommunicationVolumePath, + }) + } +} diff --git a/state_transfer/transfer/blockrsync/client_test.go b/state_transfer/transfer/blockrsync/client_test.go new file mode 100644 index 0000000..fc8424a --- /dev/null +++ b/state_transfer/transfer/blockrsync/client_test.go @@ -0,0 +1,42 @@ +package blockrsync + +import ( + "context" + "testing" + + "github.com/konveyor/crane-lib/state_transfer/transfer" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestCreateClient(t *testing.T) { + transferOptions := &TransferOptions{ + SourcePodMeta: transfer.ResourceMetadata{ + Labels: map[string]string{}, + }, + blockrsyncClientImage: "does.io/clientimage:latest", + } + + tr, srcClient, _ := createTransfer(transferOptions, t) + if err := tr.CreateClient(srcClient); err != nil { + t.Fatalf("unable to create client: %v", err) + } + + clientPodList := &corev1.PodList{} + if err := srcClient.List(context.TODO(), clientPodList, &client.ListOptions{ + Namespace: testNamespace, + LabelSelector: labels.SelectorFromSet(labels.Set{ + "pvc": "test-pvc", + }), + }); err != nil { + t.Fatalf("unable to get server pod: %v", err) + } + if len(clientPodList.Items) != 1 { + t.Fatalf("client pod not found") + } + clientPod := clientPodList.Items[0] + if clientPod.Spec.Containers[0].Image != transferOptions.blockrsyncClientImage { + t.Fatalf("client pod image not set correctly") + } +} diff --git a/state_transfer/transfer/blockrsync/options.go b/state_transfer/transfer/blockrsync/options.go new file mode 100644 index 0000000..b37e9ee --- /dev/null +++ b/state_transfer/transfer/blockrsync/options.go @@ -0,0 +1,26 @@ +package blockrsync + +import "github.com/konveyor/crane-lib/state_transfer/transfer" + +type TransferOptions struct { + SourcePodMeta transfer.ResourceMetadata + DestinationPodMeta transfer.ResourceMetadata + username string + password string + blockrsyncServerImage string + blockrsyncClientImage string +} + +func (t *TransferOptions) GetBlockrsyncServerImage() string { + if t.blockrsyncServerImage == "" { + return blockrsyncImage + } + return t.blockrsyncServerImage +} + +func (t *TransferOptions) GetBlockrsyncClientImage() string { + if t.blockrsyncClientImage == "" { + return blockrsyncImage + } + return t.blockrsyncClientImage +} diff --git a/state_transfer/transfer/blockrsync/server.go b/state_transfer/transfer/blockrsync/server.go new file mode 100644 index 0000000..fa30719 --- /dev/null +++ b/state_transfer/transfer/blockrsync/server.go @@ -0,0 +1,117 @@ +package blockrsync + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/konveyor/crane-lib/state_transfer/endpoint" + "github.com/konveyor/crane-lib/state_transfer/transfer" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + blockrsyncServerPodName = "blockrsync-server" +) + +func (r *BlockrsyncTransfer) CreateServer(c client.Client) error { + err := r.createBlockrysncServer(c) + if err != nil { + return err + } + + _, err = endpoint.Create(r.Endpoint(), c) + return err +} + +func (r *BlockrsyncTransfer) IsServerHealthy(c client.Client) (bool, error) { + deploymentLabels := r.Endpoint().Labels() + deploymentLabels["pvc"] = r.pvcList[0].Destination().LabelSafeName() + return transfer.AreFilteredPodsHealthy(c, r.pvcList.GetDestinationNamespaces()[0], deploymentLabels) +} + +func (r *BlockrsyncTransfer) createBlockrysncServer(c client.Client) error { + pvcs := r.PVCs() + destNs := r.pvcList.GetDestinationNamespaces()[0] + containers := make([]v1.Container, 0) + volumes := make([]v1.Volume, 0) + blockRsyncCommand := []string{"/proxy", + "--target", + "--listen-port", + strconv.Itoa(int(r.Transport().ExposedPort())), + "--blockrsync-path", + "/blockrsync", + "--control-file", + fmt.Sprintf("%s/%s", stunnelCommunicationVolumePath, rsyncDoneFile), + "--block-size", + "131072", + } + container := v1.Container{ + Name: BlockRsyncContainer, + ImagePullPolicy: v1.PullAlways, + Image: r.transferOptions.GetBlockrsyncServerImage(), + Ports: []v1.ContainerPort{ + { + Name: "blockrsync", + Protocol: v1.ProtocolTCP, + ContainerPort: r.Transport().ExposedPort(), + }, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: stunnelCommunicationVolumeName, + MountPath: stunnelCommunicationVolumePath, + }, + }, + } + for _, pvc := range pvcs { + blockRsyncCommand = append(blockRsyncCommand, "--identifier", pvc.Source().LabelSafeName()) + addVolumeToContainer(pvc.Destination().Claim(), pvc.Source().LabelSafeName(), pvc.Destination().LabelSafeName(), &container) + volumes = append(volumes, v1.Volume{ + Name: pvc.Destination().LabelSafeName(), + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Destination().Claim().Name, + }, + }, + }) + } + blockRsyncContainerCommand := []string{ + "/bin/bash", + "-c", + strings.Join(blockRsyncCommand, " "), + } + container.Command = blockRsyncContainerCommand + containers = append(containers, container) + + containers = append(containers, r.Transport().ServerContainers()...) + + volumes = append(volumes, v1.Volume{ + Name: stunnelCommunicationVolumeName, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{Medium: v1.StorageMediumDefault}, + }, + }) + + volumes = append(volumes, r.Transport().ServerVolumes()...) + + server := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: blockrsyncServerPodName, + Namespace: destNs, + Labels: r.transferOptions.SourcePodMeta.Labels, + }, + Spec: v1.PodSpec{ + Containers: containers, + Volumes: volumes, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + } + + return c.Create(context.TODO(), &server, &client.CreateOptions{}) +} diff --git a/state_transfer/transfer/blockrsync/server_test.go b/state_transfer/transfer/blockrsync/server_test.go new file mode 100644 index 0000000..191f8ed --- /dev/null +++ b/state_transfer/transfer/blockrsync/server_test.go @@ -0,0 +1,161 @@ +package blockrsync + +import ( + "context" + "fmt" + "testing" + + routev1 "github.com/openshift/api/route/v1" + "k8s.io/klog/v2/klogr" + + "github.com/konveyor/crane-lib/state_transfer/endpoint" + "github.com/konveyor/crane-lib/state_transfer/endpoint/route" + statetransfermeta "github.com/konveyor/crane-lib/state_transfer/meta" + "github.com/konveyor/crane-lib/state_transfer/transfer" + "github.com/konveyor/crane-lib/state_transfer/transport/null" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + corev1 "k8s.io/api/core/v1" +) + +const ( + testNamespace = "test-namespace" + testRouteName = "test-route" +) + +func TestCreateServer(t *testing.T) { + transferOptions := &TransferOptions{ + blockrsyncServerImage: "does.io/serverimage:latest", + } + + tr, _, destClient := createTransfer(transferOptions, t) + if err := tr.CreateServer(destClient); err != nil { + t.Fatalf("CreateServer should not return an error\n %v", err) + } + // Do it again, should create an error this time due to already existing resource. + if err := tr.CreateServer(destClient); err == nil { + t.Fatalf("CreateServer should return an error") + } + + serverPod := &corev1.Pod{} + if err := destClient.Get(context.TODO(), client.ObjectKey{Namespace: testNamespace, Name: blockrsyncServerPodName}, serverPod); err != nil { + t.Fatalf("unable to get server pod: %v", err) + } + if serverPod.Spec.Containers[0].Image != transferOptions.blockrsyncServerImage { + t.Fatalf("server pod image not set correctly") + } + + // This will return an error since the pod status is not set in a unit test + if _, err := tr.IsServerHealthy(destClient); err == nil { + t.Fatalf("IsServerHealthy should return an error\n") + } +} + +func buildTestClient(objects ...runtime.Object) client.Client { + s := scheme.Scheme + schemeInitFuncs := []func(*runtime.Scheme) error{ + corev1.AddToScheme, + routev1.AddToScheme, + } + for _, f := range schemeInitFuncs { + if err := f(s); err != nil { + panic(fmt.Errorf("failed to initiate the scheme %w", err)) + } + } + + return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() +} + +func createEndpoint(t *testing.T, name, namespace string, c client.Client) endpoint.Endpoint { + // create a route for data transfer + r := route.NewEndpoint( + types.NamespacedName{ + Namespace: namespace, + Name: name, + }, route.EndpointTypePassthrough, statetransfermeta.Labels, "test.domain") + e, err := endpoint.Create(r, c) + if err != nil { + t.Fatalf("unable to create route endpoint: %v", err) + } + + route := &routev1.Route{} + // Mark the route as admitted. + err = c.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: name}, route) + if err != nil { + t.Fatalf("unable to get route: %v, %s/%s", err, namespace, name) + } + route.Status = routev1.RouteStatus{ + Ingress: []routev1.RouteIngress{ + { + Conditions: []routev1.RouteIngressCondition{ + { + Type: routev1.RouteAdmitted, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + err = c.Status().Update(context.TODO(), route) + if err != nil { + t.Fatalf("unable to update route status: %v", err) + } + + ready, err := e.IsHealthy(c) + if err != nil { + t.Fatalf("unable to check route health: %v", err) + } + if !ready { + t.Fatalf("route is not ready") + } + return r +} + +func createTransfer(transferOptions *TransferOptions, t *testing.T) (*BlockrsyncTransfer, client.Client, client.Client) { + srcClient := buildTestClient() + destClient := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, destClient) + if e == nil { + t.Fatalf("unable to create endpoint") + } + transport := null.NewTransport(&testNamespacedNamePair{}) + log := klogr.New() + pvcList := transfer.PVCPairList{ + &testPVCPair{ + source: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + dest: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + }, + } + tr, err := NewTransfer(transport, e, srcClient, destClient, pvcList, log, transferOptions) + if err != nil { + t.Fatalf("NewTransfer should not return an error\n %v", err) + } + if tr == nil { + t.Fatalf("NewTransfer should return a valid transfer") + } + + return tr.(*BlockrsyncTransfer), srcClient, destClient +} + +type testNamespacedNamePair struct { + src types.NamespacedName + dst types.NamespacedName +} + +func (t *testNamespacedNamePair) Source() types.NamespacedName { + return t.src +} + +func (t *testNamespacedNamePair) Destination() types.NamespacedName { + return t.dst +} diff --git a/state_transfer/transfer/blockrsync/validation.go b/state_transfer/transfer/blockrsync/validation.go new file mode 100644 index 0000000..226aaf2 --- /dev/null +++ b/state_transfer/transfer/blockrsync/validation.go @@ -0,0 +1,81 @@ +package blockrsync + +import ( + "fmt" + + "github.com/konveyor/crane-lib/state_transfer/transfer" + corev1 "k8s.io/api/core/v1" + errorsutil "k8s.io/apimachinery/pkg/util/errors" + validation "k8s.io/apimachinery/pkg/util/validation" +) + +const ( + kubeVirtAnnKey = "cdi.kubevirt.io/storage.contentType" + kubevirtContentType = "kubevirt" +) + +// validatePVCList validates list of PVCs provided to blockrsync transfer +// list cannot contain pvcs belonging to two or more source/destination namespaces +// list must contain at exactly one pvc +// labelSafeNames of all pvcs must be valid label values +// labelSafeNames must be unique within the namespace of the pvc +// volume mode must be block or filesystem if the pvc has an annotation that indicates +// it is a kubevirt disk pvc. +func validatePVCList(pvcList transfer.PVCPairList) error { + validationErrors := []error{} + + srcNamespaces := pvcList.GetSourceNamespaces() + destNamespaces := pvcList.GetDestinationNamespaces() + if len(srcNamespaces) > 1 || len(destNamespaces) > 1 { + validationErrors = append(validationErrors, + fmt.Errorf("rsync transfer does not support migrating PVCs belonging to multiple source/destination namespaces")) + } + + if len(pvcList) == 0 { + validationErrors = append(validationErrors, fmt.Errorf("at least one pvc must be provided")) + } else { + if err := validatePVCName(pvcList[0]); err != nil { + validationErrors = append( + validationErrors, + errorsutil.NewAggregate([]error{ + fmt.Errorf("pvc name validation failed for pvc %s with error", pvcList[0].Source().Claim().Name), + err, + })) + } + } + return errorsutil.NewAggregate(validationErrors) +} + +// validatePVCName validates pvc names for blockrsync transfer +func validatePVCName(pvcPair transfer.PVCPair) error { + validationErrors := []error{} + if errs := validation.IsValidLabelValue(pvcPair.Source().LabelSafeName()); len(errs) > 0 { + validationErrors = append(validationErrors, + fmt.Errorf("labelSafeName() for %s must be a valid label value", pvcPair.Source().Claim().Name)) + } + if errs := validation.IsValidLabelValue(pvcPair.Destination().LabelSafeName()); len(errs) > 0 { + validationErrors = append(validationErrors, + fmt.Errorf("labelSafeName() for %s must be a valid label value", pvcPair.Destination().Claim().Name)) + } + if err := isBlockOrKubeVirtDisk(pvcPair.Source().Claim()); err != nil { + validationErrors = append(validationErrors, err) + } + if err := isBlockOrKubeVirtDisk(pvcPair.Destination().Claim()); err != nil { + validationErrors = append(validationErrors, err) + } + pvcPair.Source().Claim() + return errorsutil.NewAggregate(validationErrors) +} + +func isPVCBlock(pvc *corev1.PersistentVolumeClaim) bool { + return pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock +} + +func isBlockOrKubeVirtDisk(pvc *corev1.PersistentVolumeClaim) error { + if !isPVCBlock(pvc) { + if v, ok := pvc.GetAnnotations()[kubeVirtAnnKey]; !ok || v != kubevirtContentType { + return fmt.Errorf("%s is not a block, or VM disk volume", pvc.Name) + } + } + return nil +} diff --git a/state_transfer/transfer/blockrsync/validation_test.go b/state_transfer/transfer/blockrsync/validation_test.go new file mode 100644 index 0000000..0514222 --- /dev/null +++ b/state_transfer/transfer/blockrsync/validation_test.go @@ -0,0 +1,167 @@ +package blockrsync + +import ( + "strings" + "testing" + + "github.com/konveyor/crane-lib/state_transfer/transfer" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + block = corev1.PersistentVolumeBlock + fileSystem = corev1.PersistentVolumeFilesystem +) + +const ( + testPVCName = "test-pvc" +) + +func TestIsBlockOrVM(t *testing.T) { + if err := isBlockOrKubeVirtDisk(createPVC(testPVCName, testNamespace, &block)); err != nil { + t.Errorf("isBlockOrKubeVirtDisk() should return nil, %v", err) + } + fsPvc := createPVC(testPVCName, testNamespace, &fileSystem) + if err := isBlockOrKubeVirtDisk(fsPvc); err == nil { + t.Errorf("isBlockOrKubeVirtDisk() should not return nil") + } + fsPvc.Annotations = map[string]string{ + kubeVirtAnnKey: kubevirtContentType, + } + if err := isBlockOrKubeVirtDisk(fsPvc); err != nil { + t.Errorf("isBlockOrKubeVirtDisk() should return nil, %v", err) + } +} + +func TestValidatePVCName(t *testing.T) { + pvcPair := transfer.NewPVCPair(createPVC(testPVCName, testNamespace, &block), createPVC(testPVCName, testNamespace, &block)) + if err := validatePVCName(pvcPair); err != nil { + t.Errorf("validatePVCName() should return nil, %v", err) + } + pvcPair = transfer.NewPVCPair(createPVC(testPVCName, testNamespace, &fileSystem), createPVC("test-pvc-2", testNamespace, &fileSystem)) + if err := validatePVCName(pvcPair); err == nil { + t.Errorf("validatePVCName() should not return nil") + } + + pvcPair = &testPVCPair{ + source: &testPVC{ + label: strings.Repeat("a", 64), + pvc: createPVC(testPVCName, testNamespace, &block), + }, + dest: &testPVC{ + label: strings.Repeat("a", 64), + pvc: createPVC("test-pvc-2", testNamespace, &block), + }, + } + if err := validatePVCName(pvcPair); err == nil { + t.Errorf("validatePVCName() should not return nil") + } +} + +func TestValidatePVCList(t *testing.T) { + pvcList := transfer.PVCPairList{ + &testPVCPair{ + source: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + dest: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + }, + } + if err := validatePVCList(pvcList); err != nil { + t.Errorf("validatePVCList() should return nil, %v", err) + } + + pvcList = transfer.PVCPairList{ + &testPVCPair{ + source: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + dest: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + }, + &testPVCPair{ + source: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, "test-namespace2", &block), + }, + dest: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, "test-namespace2", &block), + }, + }, + } + if err := validatePVCList(pvcList); err == nil { + t.Errorf("validatePVCList() should not return nil") + } + + pvcList = transfer.PVCPairList{ + &testPVCPair{ + source: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &fileSystem), + }, + dest: &testPVC{ + label: testPVCName, + pvc: createPVC(testPVCName, testNamespace, &block), + }, + }, + } + if err := validatePVCList(pvcList); err == nil { + t.Errorf("validatePVCList() should not return nil") + } + + pvcList = transfer.PVCPairList{} + if err := validatePVCList(pvcList); err == nil { + t.Errorf("validatePVCList() should not return nil") + } + +} + +func createPVC(name, namespace string, volumeMode *corev1.PersistentVolumeMode) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + VolumeMode: volumeMode, + }, + } +} + +type testPVCPair struct { + source *testPVC + dest *testPVC +} + +func (p *testPVCPair) Source() transfer.PVC { + return p.source +} + +func (p *testPVCPair) Destination() transfer.PVC { + return p.dest +} + +type testPVC struct { + label string + pvc *corev1.PersistentVolumeClaim +} + +func (p *testPVC) LabelSafeName() string { + return p.label +} + +func (p *testPVC) Claim() *corev1.PersistentVolumeClaim { + return p.pvc +} diff --git a/state_transfer/transfer/pvc_list.go b/state_transfer/transfer/pvc_list.go index 8295535..23aeedd 100644 --- a/state_transfer/transfer/pvc_list.go +++ b/state_transfer/transfer/pvc_list.go @@ -9,6 +9,11 @@ import ( "k8s.io/apimachinery/pkg/types" ) +const ( + kubeVirtAnnKey = "cdi.kubevirt.io/storage.contentType" + kubevirtContentType = "kubevirt" +) + // PVCPairList defines a managed list of PVCPair type PVCPairList []PVCPair @@ -24,6 +29,9 @@ func (p pvc) Claim() *v1.PersistentVolumeClaim { // LabelSafeName returns a name which is guaranteed to be a safe label value func (p pvc) LabelSafeName() string { + if p.p == nil { + return "" + } return getMD5Hash(p.p.Name) } @@ -58,8 +66,34 @@ func NewPVCPair(src *v1.PersistentVolumeClaim, dest *v1.PersistentVolumeClaim) P return newPvcPair } -// NewPVCPairList when given a list of PVCPair, returns a managed list -func NewPVCPairList(pvcs ...PVCPair) (PVCPairList, error) { +// NewFilesystemPVCPairList when given a list of PVCPair, returns a managed list +func NewBlockOrVMDiskPVCPairList(pvcs ...PVCPair) (PVCPairList, error) { + pvcList := PVCPairList{} + for _, p := range pvcs { + newPvc := pvcPair{} + if p.Source() == nil { + return nil, fmt.Errorf("source pvc definition cannot be nil") + } + newPvc.src = p.Source() + if p.Destination() == nil { + newPvc.dest = p.Source() + } else { + newPvc.dest = p.Destination() + } + + if isBlockOrVMDisk(newPvc.src.Claim()) && isBlockOrVMDisk(newPvc.dest.Claim()) { + pvcList = append(pvcList, &newPvc) + } + if isBlockOrVMDisk(newPvc.src.Claim()) && !isBlockOrVMDisk(newPvc.dest.Claim()) || + !isBlockOrVMDisk(newPvc.src.Claim()) && isBlockOrVMDisk(newPvc.dest.Claim()) { + return nil, fmt.Errorf("source and destination must be the same type of volume") + } + } + return pvcList, nil +} + +// NewFilesystemPVCPairList when given a list of PVCPair, returns a managed list +func NewFilesystemPVCPairList(pvcs ...PVCPair) (PVCPairList, error) { pvcList := PVCPairList{} for _, p := range pvcs { newPvc := pvcPair{} @@ -72,19 +106,41 @@ func NewPVCPairList(pvcs ...PVCPair) (PVCPairList, error) { } else { newPvc.dest = p.Destination() } - pvcList = append(pvcList, &newPvc) + + if !isBlockOrVMDisk(newPvc.src.Claim()) && !isBlockOrVMDisk(newPvc.dest.Claim()) { + pvcList = append(pvcList, &newPvc) + } + if isBlockOrVMDisk(newPvc.src.Claim()) && !isBlockOrVMDisk(newPvc.dest.Claim()) || + !isBlockOrVMDisk(newPvc.src.Claim()) && isBlockOrVMDisk(newPvc.dest.Claim()) { + return nil, fmt.Errorf("source and destination must be the same type of volume") + } } return pvcList, nil } +func isBlockOrVMDisk(pvc *v1.PersistentVolumeClaim) bool { + if pvc == nil { + return false + } + isBlock := pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock + if !isBlock { + if v, ok := pvc.GetAnnotations()[kubeVirtAnnKey]; !ok || v != kubevirtContentType { + return false + } + } + return isBlock +} + // GetSourceNamespaces returns all source namespaces present in the list of pvcs func (p PVCPairList) GetSourceNamespaces() (namespaces []string) { nsSet := map[string]bool{} for i := range p { pvcPair := p[i] - if _, exists := nsSet[pvcPair.Source().Claim().Namespace]; !exists { - nsSet[pvcPair.Source().Claim().Namespace] = true - namespaces = append(namespaces, pvcPair.Source().Claim().Namespace) + if pvcPair != nil && pvcPair.Source() != nil && pvcPair.Source().Claim() != nil { + if _, exists := nsSet[pvcPair.Source().Claim().Namespace]; !exists { + nsSet[pvcPair.Source().Claim().Namespace] = true + namespaces = append(namespaces, pvcPair.Source().Claim().Namespace) + } } } return @@ -95,9 +151,11 @@ func (p PVCPairList) GetDestinationNamespaces() (namespaces []string) { nsSet := map[string]bool{} for i := range p { pvcPair := p[i] - if _, exists := nsSet[pvcPair.Destination().Claim().Namespace]; !exists { - nsSet[pvcPair.Destination().Claim().Namespace] = true - namespaces = append(namespaces, pvcPair.Destination().Claim().Namespace) + if pvcPair != nil && pvcPair.Source() != nil && pvcPair.Source().Claim() != nil { + if _, exists := nsSet[pvcPair.Destination().Claim().Namespace]; !exists { + nsSet[pvcPair.Destination().Claim().Namespace] = true + namespaces = append(namespaces, pvcPair.Destination().Claim().Namespace) + } } } return diff --git a/state_transfer/transfer/rclone/client.go b/state_transfer/transfer/rclone/client.go index 98baa09..f35e3e5 100644 --- a/state_transfer/transfer/rclone/client.go +++ b/state_transfer/transfer/rclone/client.go @@ -29,7 +29,7 @@ func (r *RcloneTransfer) CreateClient(c client.Client) error { return err } - _, err = transport.CreateClient(r.Transport(), c, r.Endpoint()) + _, err = transport.CreateClient(r.Transport(), c, "", r.Endpoint()) if err != nil { return err } diff --git a/state_transfer/transfer/rclone/rclone.go b/state_transfer/transfer/rclone/rclone.go index 8f3dc50..ef5946d 100644 --- a/state_transfer/transfer/rclone/rclone.go +++ b/state_transfer/transfer/rclone/rclone.go @@ -4,7 +4,7 @@ import ( "github.com/konveyor/crane-lib/state_transfer/endpoint" "github.com/konveyor/crane-lib/state_transfer/transfer" "github.com/konveyor/crane-lib/state_transfer/transport" - "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -17,15 +17,15 @@ const ( type RcloneTransfer struct { username string password string - source *rest.Config - destination *rest.Config + source client.Client + destination client.Client pvcList transfer.PVCPairList transport transport.Transport endpoint endpoint.Endpoint port int32 } -func NewTransfer(t transport.Transport, e endpoint.Endpoint, src *rest.Config, dest *rest.Config, pvcList transfer.PVCPairList) (transfer.Transfer, error) { +func NewTransfer(t transport.Transport, e endpoint.Endpoint, src client.Client, dest client.Client, pvcList transfer.PVCPairList) (transfer.Transfer, error) { err := validatePVCList(pvcList) if err != nil { return nil, err @@ -51,11 +51,11 @@ func (r *RcloneTransfer) Transport() transport.Transport { return r.transport } -func (r *RcloneTransfer) Source() *rest.Config { +func (r *RcloneTransfer) Source() client.Client { return r.source } -func (r *RcloneTransfer) Destination() *rest.Config { +func (r *RcloneTransfer) Destination() client.Client { return r.destination } diff --git a/state_transfer/transfer/rsync/client.go b/state_transfer/transfer/rsync/client.go index 49324be..748d861 100644 --- a/state_transfer/transfer/rsync/client.go +++ b/state_transfer/transfer/rsync/client.go @@ -45,10 +45,15 @@ func createRsyncClient(c client.Client, r *RsyncTransfer, ns string) error { } podLabels := transferOptions.SourcePodMeta.Labels for _, pvc := range r.pvcList.InSourceNamespace(ns) { + fileSystemCount := 0 // create Rsync command for PVC rsyncCommand := []string{"/usr/bin/rsync"} rsyncCommand = append(rsyncCommand, rsyncOptions...) - rsyncCommand = append(rsyncCommand, fmt.Sprintf("%s/", getMountPathForPVC(pvc.Source()))) + isFileSystem := pvc.Source().Claim().Spec.VolumeMode == nil || *pvc.Source().Claim().Spec.VolumeMode == v1.PersistentVolumeFilesystem + if isFileSystem { + fileSystemCount++ + rsyncCommand = append(rsyncCommand, fmt.Sprintf("%s/", getMountPathForPVC(pvc.Source()))) + } rsyncCommand = append(rsyncCommand, fmt.Sprintf("rsync://%s@%s/%s --port %d", transferOptions.username, transfer.ConnectionHostname(r), @@ -76,10 +81,6 @@ func createRsyncClient(c client.Client, r *RsyncTransfer, ns string) error { }, VolumeMounts: []v1.VolumeMount{ - { - Name: "mnt", - MountPath: getMountPathForPVC(pvc.Source()), - }, { Name: "rsync-communication", MountPath: "/usr/share/rsync", @@ -87,6 +88,12 @@ func createRsyncClient(c client.Client, r *RsyncTransfer, ns string) error { }, }, } + if isFileSystem { + containers[0].VolumeMounts = append(containers[0].VolumeMounts, v1.VolumeMount{ + Name: "mnt", + MountPath: getMountPathForPVC(pvc.Source()), + }) + } // attach transport containers customizeTransportClientContainers(r.Transport()) containers = append(containers, r.Transport().ClientContainers()...) @@ -98,19 +105,21 @@ func createRsyncClient(c client.Client, r *RsyncTransfer, ns string) error { volumes := []v1.Volume{ { + Name: "rsync-communication", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{Medium: v1.StorageMediumDefault}, + }, + }, + } + if isFileSystem { + volumes = append(volumes, v1.Volume{ Name: "mnt", VolumeSource: v1.VolumeSource{ PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ ClaimName: pvc.Source().Claim().Name, }, }, - }, - { - Name: "rsync-communication", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{Medium: v1.StorageMediumDefault}, - }, - }, + }) } volumes = append(volumes, r.Transport().ClientVolumes()...) podSpec := v1.PodSpec{ @@ -130,8 +139,10 @@ func createRsyncClient(c client.Client, r *RsyncTransfer, ns string) error { Spec: podSpec, } - err := c.Create(context.TODO(), &pod, &client.CreateOptions{}) - errs = append(errs, err) + if fileSystemCount > 0 { + err := c.Create(context.TODO(), &pod, &client.CreateOptions{}) + errs = append(errs, err) + } } return errorsutil.NewAggregate(errs) diff --git a/state_transfer/transfer/rsync/rsync.go b/state_transfer/transfer/rsync/rsync.go index d91a9f3..3d76bf6 100644 --- a/state_transfer/transfer/rsync/rsync.go +++ b/state_transfer/transfer/rsync/rsync.go @@ -3,12 +3,13 @@ package rsync import ( "fmt" + "github.com/go-logr/logr" "github.com/konveyor/crane-lib/state_transfer/endpoint" "github.com/konveyor/crane-lib/state_transfer/meta" "github.com/konveyor/crane-lib/state_transfer/transfer" "github.com/konveyor/crane-lib/state_transfer/transport" v1 "k8s.io/api/core/v1" - "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -25,10 +26,11 @@ const ( ) type RsyncTransfer struct { + Log logr.Logger username string password string - source *rest.Config - destination *rest.Config + source client.Client + destination client.Client pvcList transfer.PVCPairList transport transport.Transport endpoint endpoint.Endpoint @@ -36,8 +38,8 @@ type RsyncTransfer struct { options TransferOptions } -func NewTransfer(t transport.Transport, e endpoint.Endpoint, src *rest.Config, dest *rest.Config, - pvcList transfer.PVCPairList, opts ...TransferOption) (transfer.Transfer, error) { +func NewTransfer(t transport.Transport, e endpoint.Endpoint, src client.Client, dest client.Client, + pvcList transfer.PVCPairList, log logr.Logger, opts ...TransferOption) (transfer.Transfer, error) { err := validatePVCList(pvcList) if err != nil { return nil, err @@ -54,6 +56,7 @@ func NewTransfer(t transport.Transport, e endpoint.Endpoint, src *rest.Config, d destination: dest, pvcList: pvcList, options: options, + Log: log, }, nil } @@ -69,11 +72,11 @@ func (r *RsyncTransfer) Transport() transport.Transport { return r.transport } -func (r *RsyncTransfer) Source() *rest.Config { +func (r *RsyncTransfer) Source() client.Client { return r.source } -func (r *RsyncTransfer) Destination() *rest.Config { +func (r *RsyncTransfer) Destination() client.Client { return r.destination } diff --git a/state_transfer/transfer/rsync/server.go b/state_transfer/transfer/rsync/server.go index 1d08085..aea3f13 100644 --- a/state_transfer/transfer/rsync/server.go +++ b/state_transfer/transfer/rsync/server.go @@ -185,12 +185,14 @@ func createRsyncServer(c client.Client, r *RsyncTransfer, ns string) error { } pvcVolumeMounts := []corev1.VolumeMount{} for _, pvc := range r.pvcList.InDestinationNamespace(ns) { - pvcVolumeMounts = append( - pvcVolumeMounts, - corev1.VolumeMount{ - Name: pvc.Destination().LabelSafeName(), - MountPath: fmt.Sprintf("/mnt/%s/%s", pvc.Destination().Claim().Namespace, pvc.Destination().LabelSafeName()), - }) + if pvc.Source().Claim().Spec.VolumeMode == nil || *pvc.Source().Claim().Spec.VolumeMode == corev1.PersistentVolumeFilesystem { + pvcVolumeMounts = append( + pvcVolumeMounts, + corev1.VolumeMount{ + Name: pvc.Destination().LabelSafeName(), + MountPath: fmt.Sprintf("/mnt/%s/%s", pvc.Destination().Claim().Namespace, pvc.Destination().LabelSafeName()), + }) + } } volumeMounts = append(volumeMounts, configVolumeMounts...) volumeMounts = append(volumeMounts, pvcVolumeMounts...) @@ -253,18 +255,22 @@ func createRsyncServer(c client.Client, r *RsyncTransfer, ns string) error { }, } pvcVolumes := []corev1.Volume{} + filesystemCount := 0 for _, pvc := range r.pvcList.InDestinationNamespace(ns) { - pvcVolumes = append( - pvcVolumes, - corev1.Volume{ - Name: pvc.Destination().LabelSafeName(), - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Destination().Claim().Name, + if pvc.Source().Claim().Spec.VolumeMode == nil || *pvc.Source().Claim().Spec.VolumeMode == corev1.PersistentVolumeFilesystem { + filesystemCount++ + pvcVolumes = append( + pvcVolumes, + corev1.Volume{ + Name: pvc.Destination().LabelSafeName(), + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Destination().Claim().Name, + }, }, }, - }, - ) + ) + } } volumes := append(pvcVolumes, configVolumes...) volumes = append(volumes, r.Transport().ServerVolumes()...) @@ -285,9 +291,12 @@ func createRsyncServer(c client.Client, r *RsyncTransfer, ns string) error { Spec: podSpec, } - err := c.Create(context.TODO(), server, &client.CreateOptions{}) - if err != nil && !k8serrors.IsAlreadyExists(err) { + if filesystemCount > 0 { + err := c.Create(context.TODO(), server, &client.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return err + } return err } - return err + return nil } diff --git a/state_transfer/transfer/transfer.go b/state_transfer/transfer/transfer.go index c49be6d..897047b 100644 --- a/state_transfer/transfer/transfer.go +++ b/state_transfer/transfer/transfer.go @@ -12,16 +12,15 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" errorsutil "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) // Transfer knows how to transfer PV data from a source to a destination type Transfer interface { // Source returns a source client - Source() *rest.Config + Source() client.Client // Destination returns a destination client - Destination() *rest.Config + Destination() client.Client // Endpoint returns the endpoint used by the transfer Endpoint() endpoint.Endpoint // Transport returns the transport used by the transfer @@ -46,12 +45,8 @@ func CreateServer(t Transfer) error { if err := corev1.AddToScheme(scheme); err != nil { return err } - c, err := client.New(t.Source(), client.Options{Scheme: scheme}) - if err != nil { - return err - } - err = t.CreateServer(c) + err := t.CreateServer(t.Destination()) if err != nil { return err } @@ -64,17 +59,7 @@ func DeleteServer(t Transfer) error { } func CreateClient(t Transfer) error { - c, err := client.New(t.Destination(), client.Options{}) - if err != nil { - return err - } - - err = t.CreateClient(c) - if err != nil { - return err - } - - return nil + return t.CreateClient(t.Source()) } func DeleteClient(t Transfer) error { @@ -110,7 +95,7 @@ func IsPodHealthy(c client.Client, pod client.ObjectKey) (bool, error) { func areContainersReady(pod *corev1.Pod) (bool, error) { if len(pod.Status.ContainerStatuses) != 2 { - return false, fmt.Errorf("expected two contaier statuses found %d, for pod %s", len(pod.Status.ContainerStatuses), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}) + return false, fmt.Errorf("expected two container statuses found %d, for pod %s", len(pod.Status.ContainerStatuses), client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}) } for _, containerStatus := range pod.Status.ContainerStatuses { diff --git a/state_transfer/transport/null/client.go b/state_transfer/transport/null/client.go index b4069eb..f4cdb58 100644 --- a/state_transfer/transport/null/client.go +++ b/state_transfer/transport/null/client.go @@ -5,6 +5,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func (s *NullTransport) CreateClient(c client.Client, endpoint endpoint.Endpoint) error { +func (s *NullTransport) CreateClient(c client.Client, prefix string, endpoint endpoint.Endpoint) error { return nil } diff --git a/state_transfer/transport/null/server.go b/state_transfer/transport/null/server.go index 7d19f97..3ad63cb 100644 --- a/state_transfer/transport/null/server.go +++ b/state_transfer/transport/null/server.go @@ -5,7 +5,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func (s *NullTransport) CreateServer(c client.Client, e endpoint.Endpoint) error { +func (s *NullTransport) CreateServer(c client.Client, prefix string, e endpoint.Endpoint) error { s.direct = true s.port = e.Port() return nil diff --git a/state_transfer/transport/stunnel/client.go b/state_transfer/transport/stunnel/client.go index ab4877e..30c0489 100644 --- a/state_transfer/transport/stunnel/client.go +++ b/state_transfer/transport/stunnel/client.go @@ -47,38 +47,38 @@ const ( ` ) -func (s *StunnelTransport) CreateClient(c client.Client, e endpoint.Endpoint) error { - err := createClientResources(c, s, e) +func (s *StunnelTransport) CreateClient(c client.Client, prefix string, e endpoint.Endpoint) error { + err := createClientResources(c, s, prefix, e) return err } -func createClientResources(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createClientResources(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { errs := []error{} // assuming the name of the endpoint is the same as the name of the PVC - err := createClientConfig(c, s, e) + err := createClientConfig(c, s, prefix, e) errs = append(errs, err) - err = createClientSecret(c, s, e) + err = createClientSecret(c, s, prefix, e) errs = append(errs, err) setClientContainers(s, e) - createClientVolumes(s) + createClientVolumes(s, prefix) return errorsutil.NewAggregate(errs) } -func getClientConfig(c client.Client, obj types.NamespacedName) (*corev1.ConfigMap, error) { +func getClientConfig(c client.Client, obj types.NamespacedName, prefix string) (*corev1.ConfigMap, error) { cm := &corev1.ConfigMap{} err := c.Get(context.Background(), types.NamespacedName{ Namespace: obj.Namespace, - Name: defaultStunnelClientConfig, + Name: withPrefix(prefix, defaultStunnelClientConfig), }, cm) return cm, err } -func createClientConfig(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createClientConfig(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { var caVerifyLevel string if s.Options().CAVerifyLevel == "" { @@ -112,7 +112,7 @@ func createClientConfig(c client.Client, s *StunnelTransport, e endpoint.Endpoin stunnelConfigMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: s.nsNamePair.Source().Namespace, - Name: defaultStunnelClientConfig, + Name: withPrefix(prefix, defaultStunnelClientConfig), Labels: e.Labels(), }, Data: map[string]string{ @@ -122,24 +122,29 @@ func createClientConfig(c client.Client, s *StunnelTransport, e endpoint.Endpoin err = c.Create(context.TODO(), stunnelConfigMap, &client.CreateOptions{}) if err != nil && !k8serrors.IsAlreadyExists(err) { return err + } else if k8serrors.IsAlreadyExists(err) { + err = c.Update(context.TODO(), stunnelConfigMap, &client.UpdateOptions{}) + if err != nil { + return err + } } return nil } -func getClientSecret(c client.Client, obj types.NamespacedName) (*corev1.Secret, error) { +func getClientSecret(c client.Client, obj types.NamespacedName, prefix string) (*corev1.Secret, error) { secret := &corev1.Secret{} err := c.Get(context.Background(), types.NamespacedName{ Namespace: obj.Namespace, - Name: defaultStunnelClientSecret, + Name: withPrefix(prefix, defaultStunnelClientSecret), }, secret) return secret, err } -func createClientSecret(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createClientSecret(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { stunnelSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: s.nsNamePair.Source().Namespace, - Name: defaultStunnelClientSecret, + Name: withPrefix(prefix, defaultStunnelClientSecret), Labels: e.Labels(), }, Data: map[string][]byte{ @@ -186,14 +191,14 @@ func setClientContainers(s *StunnelTransport, e endpoint.Endpoint) { } } -func createClientVolumes(s *StunnelTransport) { +func createClientVolumes(s *StunnelTransport, prefix string) { s.clientVolumes = []corev1.Volume{ { Name: defaultStunnelClientConfig, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: defaultStunnelClientConfig, + Name: withPrefix(prefix, defaultStunnelClientConfig), }, }, }, @@ -202,7 +207,7 @@ func createClientVolumes(s *StunnelTransport) { Name: defaultStunnelClientSecret, VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ - SecretName: defaultStunnelClientSecret, + SecretName: withPrefix(prefix, defaultStunnelClientSecret), Items: []corev1.KeyToPath{ { Key: "tls.crt", diff --git a/state_transfer/transport/stunnel/client_test.go b/state_transfer/transport/stunnel/client_test.go new file mode 100644 index 0000000..24691f0 --- /dev/null +++ b/state_transfer/transport/stunnel/client_test.go @@ -0,0 +1,241 @@ +package stunnel + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/konveyor/crane-lib/state_transfer/endpoint" + "github.com/konveyor/crane-lib/state_transfer/endpoint/route" + "github.com/konveyor/crane-lib/state_transfer/transport" + routev1 "github.com/openshift/api/route/v1" + corev1 "k8s.io/api/core/v1" + + statetransfermeta "github.com/konveyor/crane-lib/state_transfer/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +const ( + testNamespace = "test-namespace" + stunnelCMKey = "stunnel.conf" + crtKey = "tls.crt" + keyKey = "tls.key" +) + +func TestCreateClientConfig(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel(testTunnelName, testNamespace, testRouteName, testNamespace) + if err := createClientConfig(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create client config: %v", err) + } + cm, err := getClientConfig(client, types.NamespacedName{ + Namespace: testNamespace, + Name: "test-tunnel", + }, "fs") + if err != nil { + t.Fatalf("unable to get client config: %v", err) + } + if cm == nil { + t.Fatalf("client config not found") + } + if len(e.Labels()) != len(cm.Labels) { + t.Fatalf("client config labels length does not match, on new CM") + } + for k, v := range e.Labels() { + if cm.Labels[k] != v { + t.Fatalf("client config labels do not match, on new CM") + } + } + if !strings.Contains(cm.Data[stunnelCMKey], "test-route-test-namespace.test.domain:443") { + t.Fatalf("client config does not contain the correct route") + } + + t.Run("CreateClientConfigUpdate", func(t *testing.T) { + // Ensure that if the config map already exists, the contents are updated. + cm.Labels = map[string]string{"test": "label"} + cm.Data[stunnelCMKey] = "test" + err = client.Update(context.Background(), cm) + if err != nil { + t.Fatalf("unable to update client config map with old data: %v", err) + } + stunnelTransport.Options().CAVerifyLevel = "5" + stunnelTransport.Options().NoVerifyCA = true + + if err := createClientConfig(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create client config: %v", err) + } + cm, err := getClientConfig(client, types.NamespacedName{ + Namespace: testNamespace, + Name: "test-tunnel", + }, "fs") + if err != nil { + t.Fatalf("unable to get client config: %v", err) + } + if cm == nil { + t.Fatalf("client config not found") + } + if len(e.Labels()) != len(cm.Labels) { + t.Fatalf("client config labels do not match") + } + for k, v := range e.Labels() { + if cm.Labels[k] != v { + t.Fatalf("client config labels do not match") + } + } + if !strings.Contains(cm.Data[stunnelCMKey], "test-route-test-namespace.test.domain:443") { + t.Fatalf("client config does not contain the correct route") + } + if !strings.Contains(cm.Data[stunnelCMKey], "verify = 5") { + t.Fatalf("client config does not contain the correct caVerifyLevel %s", cm.Data[stunnelCMKey]) + } + }) +} + +func TestCreateClientSecret(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel(testTunnelName, testNamespace, testRouteName, testNamespace) + + if err := createClientSecret(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create client secret: %v", err) + } + secret, err := getClientSecret(client, types.NamespacedName{ + Namespace: testNamespace, + }, "fs") + if err != nil { + t.Fatalf("unable to get client secret: %v", err) + } + if secret == nil { + t.Fatalf("client secret not found") + } + if len(e.Labels()) != len(secret.Labels) { + t.Fatalf("client secret labels length does not match, on new secret") + } + for k, v := range e.Labels() { + if secret.Labels[k] != v { + t.Fatalf("client secret labels do not match, on new secret") + } + } + if len(secret.Data) != 2 { + t.Fatalf("client secret does not contain the correct number of keys") + } + if _, ok := secret.Data[crtKey]; !ok { + t.Fatalf("client secret does not contain the correct keys") + } + if _, ok := secret.Data[keyKey]; !ok { + t.Fatalf("client secret does not contain the correct keys") + } + +} + +func TestCreateClient(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel(testTunnelName, testNamespace, testRouteName, testNamespace) + if err := stunnelTransport.CreateClient(client, "", e); err != nil { + t.Fatalf("unable to create client: %v", err) + } + + containers := stunnelTransport.clientContainers + if len(containers) != 1 { + t.Fatalf("Number of client containers is not the expected 1, %d", len(containers)) + } + volumes := stunnelTransport.clientVolumes + if len(volumes) != 2 { + t.Fatalf("Number of client volumes is not the expected 2, %d", len(volumes)) + } +} + +func buildTestClient(objects ...runtime.Object) client.Client { + s := scheme.Scheme + schemeInitFuncs := []func(*runtime.Scheme) error{ + corev1.AddToScheme, + routev1.AddToScheme, + } + for _, f := range schemeInitFuncs { + if err := f(s); err != nil { + panic(fmt.Errorf("failed to initiate the scheme %w", err)) + } + } + + return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() +} + +func createEndpoint(t *testing.T, name, namespace string, c client.Client) endpoint.Endpoint { + // create a route for data transfer + r := route.NewEndpoint( + types.NamespacedName{ + Namespace: namespace, + Name: name, + }, route.EndpointTypePassthrough, statetransfermeta.Labels, "test.domain") + e, err := endpoint.Create(r, c) + if err != nil { + t.Fatalf("unable to create route endpoint: %v", err) + } + + route := &routev1.Route{} + // Mark the route as admitted. + err = c.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: name}, route) + if err != nil { + t.Fatalf("unable to get route: %v, %s/%s", err, namespace, name) + } + route.Status = routev1.RouteStatus{ + Ingress: []routev1.RouteIngress{ + { + Conditions: []routev1.RouteIngressCondition{ + { + Type: routev1.RouteAdmitted, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + err = c.Status().Update(context.TODO(), route) + if err != nil { + t.Fatalf("unable to update route status: %v", err) + } + + ready, err := e.IsHealthy(c) + if err != nil { + t.Fatalf("unable to check route health: %v", err) + } + if !ready { + t.Fatalf("route is not ready") + } + return r +} + +func createStunnel(name, namespace, destName, destNamespace string) *StunnelTransport { + // create an stunnel transport to carry the data over the route + s := NewTransport(statetransfermeta.NewNamespacedPair( + types.NamespacedName{ + Name: name, Namespace: namespace}, + types.NamespacedName{ + Name: destName, Namespace: destNamespace}, + ), &transport.Options{}) + + crt, _, key, err := transport.GenerateSSLCert() + if err != nil { + return nil + } + s.(*StunnelTransport).crt = crt + s.(*StunnelTransport).key = key + + return s.(*StunnelTransport) // Type assertion to convert s to *StunnelTransport +} diff --git a/state_transfer/transport/stunnel/server.go b/state_transfer/transport/stunnel/server.go index 2fde932..e06429b 100644 --- a/state_transfer/transport/stunnel/server.go +++ b/state_transfer/transport/stunnel/server.go @@ -35,32 +35,32 @@ TIMEOUTclose = 0 ` ) -func (s *StunnelTransport) CreateServer(c client.Client, e endpoint.Endpoint) error { - err := createStunnelServerResources(c, s, e) +func (s *StunnelTransport) CreateServer(c client.Client, prefix string, e endpoint.Endpoint) error { + err := createStunnelServerResources(c, s, prefix, e) return err } -func createStunnelServerResources(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createStunnelServerResources(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { errs := []error{} - err := createStunnelServerConfig(c, s, e) + err := createStunnelServerConfig(c, s, prefix, e) errs = append(errs, err) - err = createStunnelServerSecret(c, s, e) + err = createStunnelServerSecret(c, s, prefix, e) errs = append(errs, err) createStunnelServerContainers(s, e) - createStunnelServerVolumes(s) + createStunnelServerVolumes(s, prefix) return errorsutil.NewAggregate(errs) } -func createStunnelServerConfig(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createStunnelServerConfig(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { ports := map[string]string{ // port on which Stunnel service listens on, must connect with endpoint "acceptPort": strconv.Itoa(int(e.Port())), - // port in the container on which Transfer is listening on + // port in the container on which filesystem Transfer is listening "connectPort": strconv.Itoa(int(s.ExposedPort())), } @@ -78,7 +78,7 @@ func createStunnelServerConfig(c client.Client, s *StunnelTransport, e endpoint. stunnelConfigMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: s.nsNamePair.Destination().Namespace, - Name: defaultStunnelServerConfig, + Name: withPrefix(prefix, defaultStunnelServerConfig), Labels: e.Labels(), }, Data: map[string]string{ @@ -89,20 +89,25 @@ func createStunnelServerConfig(c client.Client, s *StunnelTransport, e endpoint. err = c.Create(context.TODO(), stunnelConfigMap, &client.CreateOptions{}) if err != nil && !k8serrors.IsAlreadyExists(err) { return err + } else if k8serrors.IsAlreadyExists(err) { + err = c.Update(context.TODO(), stunnelConfigMap, &client.UpdateOptions{}) + if err != nil { + return err + } } return nil } -func getServerConfig(c client.Client, obj types.NamespacedName) (*corev1.ConfigMap, error) { +func getServerConfig(c client.Client, obj types.NamespacedName, prefix string) (*corev1.ConfigMap, error) { cm := &corev1.ConfigMap{} err := c.Get(context.Background(), types.NamespacedName{ Namespace: obj.Namespace, - Name: defaultStunnelServerConfig, + Name: withPrefix(prefix, defaultStunnelServerConfig), }, cm) return cm, err } -func createStunnelServerSecret(c client.Client, s *StunnelTransport, e endpoint.Endpoint) error { +func createStunnelServerSecret(c client.Client, s *StunnelTransport, prefix string, e endpoint.Endpoint) error { _, crt, key, err := transport.GenerateSSLCert() s.key = key s.crt = crt @@ -113,7 +118,7 @@ func createStunnelServerSecret(c client.Client, s *StunnelTransport, e endpoint. stunnelSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: s.nsNamePair.Destination().Namespace, - Name: defaultStunnelServerSecret, + Name: withPrefix(prefix, defaultStunnelServerSecret), Labels: e.Labels(), }, Data: map[string][]byte{ @@ -129,11 +134,11 @@ func createStunnelServerSecret(c client.Client, s *StunnelTransport, e endpoint. return nil } -func getServerSecret(c client.Client, obj types.NamespacedName) (*corev1.Secret, error) { +func getServerSecret(c client.Client, obj types.NamespacedName, prefix string) (*corev1.Secret, error) { secret := &corev1.Secret{} err := c.Get(context.Background(), types.NamespacedName{ Namespace: obj.Namespace, - Name: defaultStunnelServerSecret, + Name: withPrefix(prefix, defaultStunnelServerSecret), }, secret) return secret, err } @@ -169,14 +174,14 @@ func createStunnelServerContainers(s *StunnelTransport, e endpoint.Endpoint) { } } -func createStunnelServerVolumes(s *StunnelTransport) { +func createStunnelServerVolumes(s *StunnelTransport, prefix string) { s.serverVolumes = []corev1.Volume{ { Name: defaultStunnelServerConfig, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: defaultStunnelServerConfig, + Name: withPrefix(prefix, defaultStunnelServerConfig), }, }, }, @@ -185,7 +190,7 @@ func createStunnelServerVolumes(s *StunnelTransport) { Name: defaultStunnelServerSecret, VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ - SecretName: defaultStunnelServerSecret, + SecretName: withPrefix(prefix, defaultStunnelServerSecret), Items: []corev1.KeyToPath{ { Key: "tls.crt", diff --git a/state_transfer/transport/stunnel/server_test.go b/state_transfer/transport/stunnel/server_test.go new file mode 100644 index 0000000..455e829 --- /dev/null +++ b/state_transfer/transport/stunnel/server_test.go @@ -0,0 +1,149 @@ +package stunnel + +import ( + "context" + "fmt" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/types" +) + +const ( + testTunnelName = "test-tunnel" + testRouteName = "test-route" +) + +func TestCreateServerConfig(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel(testTunnelName, testNamespace, testRouteName, testNamespace) + if err := createStunnelServerConfig(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create server config: %v", err) + } + cm, err := getServerConfig(client, types.NamespacedName{ + Namespace: testNamespace, + Name: testTunnelName, + }, "fs") + if err != nil { + t.Fatalf("unable to get server config: %v", err) + } + if cm == nil { + t.Fatalf("server config not found") + } + if len(e.Labels()) != len(cm.Labels) { + t.Fatalf("server config labels length does not match, on new CM") + } + for k, v := range e.Labels() { + if cm.Labels[k] != v { + t.Fatalf("server config labels do not match, on new CM") + } + } + if !strings.Contains(cm.Data[stunnelCMKey], fmt.Sprintf("connect = %d", stunnelTransport.ExposedPort())) { + t.Fatalf("server config does not contain the correct connect port %s", cm.Data[stunnelCMKey]) + } + if !strings.Contains(cm.Data[stunnelCMKey], fmt.Sprintf("accept = %d", e.Port())) { + t.Fatalf("server config does not contain the correct accept port %s", cm.Data[stunnelCMKey]) + } + t.Run("CreateServerConfigUpdate", func(t *testing.T) { + // Ensure that if the config map already exists, the contents are updated. + cm.Labels = map[string]string{"test": "label"} + cm.Data[stunnelCMKey] = "test" + err = client.Update(context.Background(), cm) + if err != nil { + t.Fatalf("unable to update server config map with old data: %v", err) + } + if err := createStunnelServerConfig(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create server config: %v", err) + } + cm, err := getServerConfig(client, types.NamespacedName{ + Namespace: testNamespace, + Name: testTunnelName, + }, "fs") + if err != nil { + t.Fatalf("unable to get server config: %v", err) + } + if cm == nil { + t.Fatalf("server config not found") + } + if len(e.Labels()) != len(cm.Labels) { + t.Fatalf("server config labels do not match") + } + for k, v := range e.Labels() { + if cm.Labels[k] != v { + t.Fatalf("server config labels do not match") + } + } + if !strings.Contains(cm.Data[stunnelCMKey], fmt.Sprintf("connect = %d", stunnelTransport.ExposedPort())) { + t.Fatalf("server config does not contain the correct connect port %s", cm.Data[stunnelCMKey]) + } + if !strings.Contains(cm.Data[stunnelCMKey], fmt.Sprintf("accept = %d", e.Port())) { + t.Fatalf("server config does not contain the correct accept port %s", cm.Data[stunnelCMKey]) + } + }) + +} + +func TestCreateServerSecret(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel("test-stunnel", testNamespace, testRouteName, testNamespace) + if err := createStunnelServerSecret(client, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create server secret: %v", err) + } + secret, err := getServerSecret(client, types.NamespacedName{ + Namespace: testNamespace, + Name: testTunnelName, + }, "fs") + if err != nil { + t.Fatalf("unable to get server secret: %v", err) + } + if secret == nil { + t.Fatalf("server secret not found") + } + if len(e.Labels()) != len(secret.Labels) { + t.Fatalf("server secret labels length does not match, on new secret") + } + for k, v := range e.Labels() { + if secret.Labels[k] != v { + t.Fatalf("server secret labels do not match, on new secret") + } + } + if len(secret.Data) != 2 { + t.Fatalf("server secret does not contain the correct number of keys") + } + if _, ok := secret.Data[crtKey]; !ok { + t.Fatalf("server secret does not contain the correct keys") + } + if _, ok := secret.Data[keyKey]; !ok { + t.Fatalf("server secret does not contain the correct keys") + } +} + +func TestCreateServer(t *testing.T) { + client := buildTestClient() + e := createEndpoint(t, testRouteName, testNamespace, client) + if e == nil { + t.Fatalf("unable to create endpoint") + } + stunnelTransport := createStunnel(testTunnelName, testNamespace, testRouteName, testNamespace) + + if err := stunnelTransport.CreateServer(client, "", e); err != nil { + t.Fatalf("unable to create server: %v", err) + } + + containers := stunnelTransport.serverContainers + if len(containers) != 1 { + t.Fatalf("Number of server containers is not the expected 1, %d", len(containers)) + } + volumes := stunnelTransport.serverVolumes + if len(volumes) != 2 { + t.Fatalf("Number of server volumes is not the expected 2, %d", len(volumes)) + } +} diff --git a/state_transfer/transport/stunnel/stunnel.go b/state_transfer/transport/stunnel/stunnel.go index c4e39a1..7184b10 100644 --- a/state_transfer/transport/stunnel/stunnel.go +++ b/state_transfer/transport/stunnel/stunnel.go @@ -37,8 +37,6 @@ type StunnelTransport struct { clientVolumes []corev1.Volume direct bool options *transport.Options - noVerifyCA bool - caVerifyLevel string nsNamePair meta.NamespacedNamePair } @@ -113,42 +111,42 @@ func (s *StunnelTransport) getStunnelClientImage() string { } } -// GetTransportFromKubeObjects checks if the required configmaps and secretes are created for the transport -//. It populates the fields for the Transport needed for transfer object. +// GetTransportFromKubeObjects checks if the required configmaps and secrets are created for the transport +// . It populates the fields for the Transport needed for transfer object. // NOTE: this method will be removed in the future interfaces. 'options' are not persisted in the system // therefore, they require to be passed from outside by the consumers every time a transport is fetched -func GetTransportFromKubeObjects(srcClient client.Client, destClient client.Client, nnPair meta.NamespacedNamePair, e endpoint.Endpoint, options *transport.Options) (transport.Transport, error) { - _, err := getClientConfig(srcClient, nnPair.Source()) +func GetTransportFromKubeObjects(srcClient client.Client, destClient client.Client, prefix string, nnPair meta.NamespacedNamePair, e endpoint.Endpoint, options *transport.Options) (transport.Transport, error) { + _, err := getClientConfig(srcClient, nnPair.Source(), prefix) switch { case errors.IsNotFound(err): - fmt.Printf("transport: %s Client Config is not created", nnPair.Source()) + fmt.Printf("transport: %s Client Config is not created, prefix: %s", nnPair.Source(), prefix) return nil, err case err != nil: return nil, err } - _, err = getServerConfig(destClient, nnPair.Destination()) + _, err = getServerConfig(destClient, nnPair.Destination(), prefix) switch { case errors.IsNotFound(err): - fmt.Printf("transport: %s Server Config is not created", nnPair.Destination()) + fmt.Printf("transport: %s Server Config is not created, prefix: %s", nnPair.Destination(), prefix) return nil, err case err != nil: return nil, err } - clientSecretCreated, err := getClientSecret(srcClient, nnPair.Source()) + clientSecretCreated, err := getClientSecret(srcClient, nnPair.Source(), prefix) switch { case errors.IsNotFound(err): - fmt.Printf("transport: %s Client secret is not created", nnPair.Source()) + fmt.Printf("transport: %s Client secret is not created, prefix: %s", nnPair.Source(), prefix) return nil, err case err != nil: return nil, err } - _, err = getServerSecret(destClient, nnPair.Destination()) + _, err = getServerSecret(destClient, nnPair.Destination(), prefix) switch { case errors.IsNotFound(err): - fmt.Printf("transport: %s Server secret is not created", nnPair.Destination()) + fmt.Printf("transport: %s Server secret is not created, prefix: %s", nnPair.Destination(), prefix) return nil, err case err != nil: return nil, err @@ -174,8 +172,8 @@ func GetTransportFromKubeObjects(srcClient client.Client, destClient client.Clie s.key = bytes.NewBuffer(key) s.crt = bytes.NewBuffer(crt) - createStunnelServerVolumes(s) - createClientVolumes(s) + createStunnelServerVolumes(s, prefix) + createClientVolumes(s, prefix) setClientContainers(s, e) createStunnelServerContainers(s, e) s.nsNamePair = nnPair @@ -185,3 +183,10 @@ func GetTransportFromKubeObjects(srcClient client.Client, destClient client.Clie func (s *StunnelTransport) Options() *transport.Options { return s.options } + +func withPrefix(prefix string, name string) string { + if prefix == "" { + prefix = "fs" + } + return fmt.Sprintf("%s-%s", prefix, name) +} diff --git a/state_transfer/transport/stunnel/stunnel_test.go b/state_transfer/transport/stunnel/stunnel_test.go new file mode 100644 index 0000000..3c8e901 --- /dev/null +++ b/state_transfer/transport/stunnel/stunnel_test.go @@ -0,0 +1,168 @@ +package stunnel + +import ( + "testing" + + "github.com/konveyor/crane-lib/state_transfer/transport" + "k8s.io/apimachinery/pkg/types" +) + +const ( + sourceName = "source" + sourceNamespace = "source-namespace" + destName = "dest" + destNamespace = "dest-namespace" + + clientImage = "custom-client-image" + serverImage = "custom-server-image" +) + +func TestGetTransportFromKubeObjects(t *testing.T) { + srcClient := buildTestClient() + destClient := buildTestClient() + + e := createEndpoint(t, testRouteName, testNamespace, destClient) + if e == nil { + t.Fatalf("unable to create endpoint") + } + nnPair := &testNamespacedPair{ + src: types.NamespacedName{Name: sourceName, Namespace: sourceNamespace}, + dest: types.NamespacedName{Name: destName, Namespace: destNamespace}, + } + + stunnelTransport := createStunnel(sourceName, sourceNamespace, destName, destNamespace) + + t.Run("GetTransportFromKubeObjectsNoClientConfig", func(t *testing.T) { + _, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, nil) + if err == nil { + t.Fatalf("No client config set, should get error") + } + }) + // Create client and server config maps + if err := createClientConfig(srcClient, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create client config: %v", err) + } + t.Run("GetTransportFromKubeObjectsNoServerConfig", func(t *testing.T) { + _, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, nil) + if err == nil { + t.Fatalf("No server config set, should get error") + } + }) + if err := createStunnelServerConfig(destClient, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create server config: %v", err) + } + t.Run("GetTransportFromKubeObjectsNoClientSecret", func(t *testing.T) { + _, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, nil) + if err == nil { + t.Fatalf("No client secret set, should get error") + } + }) + // Create client and server secrets + if err := createClientSecret(srcClient, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create client secret: %v", err) + } + t.Run("GetTransportFromKubeObjectsNoServerSecret", func(t *testing.T) { + _, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, nil) + if err == nil { + t.Fatalf("No server secret set, should get error") + } + }) + if err := createStunnelServerSecret(destClient, stunnelTransport, "fs", e); err != nil { + t.Fatalf("unable to create server secret: %v", err) + } + tr, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, nil) + if err != nil { + t.Fatalf("unable to get transport: %v", err) + } + if tr, ok := tr.(*StunnelTransport); ok { + verifyDefaultTransport(tr, defaultStunnelImage, defaultStunnelImage, t) + } else { + t.Fatalf("unable to convert transport to *StunnelTransport") + } + + t.Run("GetTransportFromKubeObjectsWithCustomImages", func(t *testing.T) { + options := &transport.Options{ + StunnelClientImage: clientImage, + StunnelServerImage: serverImage, + } + tr, err := GetTransportFromKubeObjects(srcClient, destClient, "fs", nnPair, e, options) + if err != nil { + t.Fatalf("unable to get transport: %v", err) + } + if tr, ok := tr.(*StunnelTransport); ok { + verifyDefaultTransport(tr, clientImage, serverImage, t) + } else { + t.Fatalf("unable to convert transport to *StunnelTransport") + } + }) +} + +func verifyDefaultTransport(tr *StunnelTransport, clientImage, serverImage string, t *testing.T) { + if tr == nil { + t.Fatalf("transport is nil") + } + if tr.CA() != nil { + t.Fatalf("CA is not nil") + } + if tr.Crt() == nil { + t.Fatalf("Crt is nil") + } + if tr.Key() == nil { + t.Fatalf("Key is nil") + } + if tr.ExposedPort() != int32(2222) { + t.Fatalf("ExposedPort is not 2222") + } + if tr.Port() != int32(6443) { + t.Fatalf("Port is not 6443, %d", tr.Port()) + } + if len(tr.ClientContainers()) != 1 { + t.Fatalf("Number of client containers is not the expected 1, %d", len(tr.ClientContainers())) + } + if len(tr.ServerContainers()) != 1 { + t.Fatalf("Number of server containers is not the expected 1, %d", len(tr.ServerContainers())) + } + if len(tr.ClientVolumes()) != 2 { + t.Fatalf("Number of client volumes is not the expected 2, %d", len(tr.ClientVolumes())) + } + if len(tr.ServerVolumes()) != 2 { + t.Fatalf("Number of server volumes is not the expected 2, %d", len(tr.ServerVolumes())) + } + if tr.Direct() { + t.Fatalf("Direct is true") + } + if tr.Type() != TransportTypeStunnel { + t.Fatalf("Type is not TransportTypeStunnel") + } + if tr.NamespacedNamePair().Source().Name != sourceName { + t.Fatalf("Source name is not %s", sourceName) + } + if tr.NamespacedNamePair().Source().Namespace != sourceNamespace { + t.Fatalf("Source namespace is not %s", sourceNamespace) + } + if tr.NamespacedNamePair().Destination().Name != destName { + t.Fatalf("Destination name is not %s", destName) + } + if tr.NamespacedNamePair().Destination().Namespace != destNamespace { + t.Fatalf("Destination namespace is not %s", destNamespace) + } + if tr.getStunnelServerImage() != serverImage { + t.Fatalf("Server image is not %s", serverImage) + } + if tr.getStunnelClientImage() != clientImage { + t.Fatalf("Client image is not %s", clientImage) + } +} + +type testNamespacedPair struct { + src types.NamespacedName + dest types.NamespacedName +} + +func (t *testNamespacedPair) Source() types.NamespacedName { + return t.src +} + +func (t *testNamespacedPair) Destination() types.NamespacedName { + return t.dest +} diff --git a/state_transfer/transport/transport.go b/state_transfer/transport/transport.go index 0e6ee71..c3e1d74 100644 --- a/state_transfer/transport/transport.go +++ b/state_transfer/transport/transport.go @@ -39,8 +39,8 @@ type Transport interface { // ServerVolumes returns a list of volumes transfers can add to their server Pods ServerVolumes() []v1.Volume Direct() bool - CreateServer(client.Client, endpoint.Endpoint) error - CreateClient(client.Client, endpoint.Endpoint) error + CreateServer(client.Client, string, endpoint.Endpoint) error + CreateClient(client.Client, string, endpoint.Endpoint) error Options() *Options // Type Type() TransportType @@ -58,8 +58,8 @@ type Options struct { type TransportType string -func CreateServer(t Transport, c client.Client, e endpoint.Endpoint) (Transport, error) { - err := t.CreateServer(c, e) +func CreateServer(t Transport, c client.Client, prefix string, e endpoint.Endpoint) (Transport, error) { + err := t.CreateServer(c, prefix, e) if err != nil { return nil, err } @@ -67,8 +67,8 @@ func CreateServer(t Transport, c client.Client, e endpoint.Endpoint) (Transport, return t, nil } -func CreateClient(t Transport, c client.Client, e endpoint.Endpoint) (Transport, error) { - err := t.CreateClient(c, e) +func CreateClient(t Transport, c client.Client, prefix string, e endpoint.Endpoint) (Transport, error) { + err := t.CreateClient(c, prefix, e) if err != nil { return nil, err }