Skip to content

Commit

Permalink
[PWX-27076, PWX-26970]: Fix confluent kafka async dr migration issues (
Browse files Browse the repository at this point in the history
…#1269)

* PWX-27076: Migrate CRD objects from same group even when there are no CRs

Signed-off-by: Priyanshu Pandey <[email protected]>

* PWX-27076: Do not migrate service object if owner reference is set.

Signed-off-by: Priyanshu Pandey <[email protected]>

* PWX-26970: Update pvc owner references on destination

Signed-off-by: Priyanshu Pandey <[email protected]>

* PWX-27076: Updating loop break condition and appending kind+group name to appreg

Signed-off-by: Priyanshu Pandey <[email protected]>

* PWX-27076: Client param inside a function and updating conditional statements as per review

Signed-off-by: Priyanshu Pandey <[email protected]>

---------

Signed-off-by: Priyanshu Pandey <[email protected]>
  • Loading branch information
pp511 authored Feb 9, 2023
1 parent 08ed4c0 commit a452432
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 115 deletions.
7 changes: 3 additions & 4 deletions pkg/applicationmanager/controllers/applicationbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"strings"
"time"

"github.com/libopenstorage/stork/pkg/utils"

"github.com/go-openapi/inflect"
"github.com/libopenstorage/stork/drivers"
"github.com/libopenstorage/stork/drivers/volume"
Expand All @@ -27,6 +25,7 @@ import (
"github.com/libopenstorage/stork/pkg/objectstore"
"github.com/libopenstorage/stork/pkg/resourcecollector"
"github.com/libopenstorage/stork/pkg/rule"
"github.com/libopenstorage/stork/pkg/utils"
"github.com/libopenstorage/stork/pkg/version"
"github.com/portworx/sched-ops/k8s/apiextensions"
"github.com/portworx/sched-ops/k8s/core"
Expand Down Expand Up @@ -1217,7 +1216,7 @@ func (a *ApplicationBackupController) backupResources(
}
}
if len(incResNsBatch) != 0 {
objects, err := a.resourceCollector.GetResources(
objects, _, err := a.resourceCollector.GetResources(
incResNsBatch,
backup.Spec.Selectors,
objectMap,
Expand All @@ -1237,7 +1236,7 @@ func (a *ApplicationBackupController) backupResources(
for _, resource := range resourceTypes {
if resource.Kind == backupResourceType || (backupResourceType == "PersistentVolumeClaim" && resource.Kind == "PersistentVolume") {
log.ApplicationBackupLog(backup).Tracef("GetResourcesType for : %v", resource.Kind)
objects, err := a.resourceCollector.GetResourcesForType(resource, nil, resourceTypeNsBatch, backup.Spec.Selectors, nil, true, resourceCollectorOpts)
objects, _, err := a.resourceCollector.GetResourcesForType(resource, nil, resourceTypeNsBatch, backup.Spec.Selectors, nil, true, resourceCollectorOpts)
if err != nil {
log.ApplicationBackupLog(backup).Errorf("Error getting resources: %v", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/applicationmanager/controllers/applicationclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (a *ApplicationCloneController) applyResources(
func (a *ApplicationCloneController) cloneResources(
clone *stork_api.ApplicationClone,
) error {
allObjects, err := a.resourceCollector.GetResources(
allObjects, _, err := a.resourceCollector.GetResources(
[]string{clone.Spec.SourceNamespace},
clone.Spec.Selectors,
nil,
Expand Down
34 changes: 34 additions & 0 deletions pkg/applicationmanager/controllers/applicationregistration.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,40 @@ func createAppReg(regCRD map[string][]stork_api.ApplicationResource) error {
if _, err := stork.Instance().CreateApplicationRegistration(appReg); err != nil && !errors.IsAlreadyExists(err) {
logrus.Errorf("unable to register app %v, err: %v", appReg, err)
return err
} else if errors.IsAlreadyExists(err) {
// Get the existing app reg
existingAppReg, err := stork.Instance().GetApplicationRegistration(appReg.Name)
if err != nil {
logrus.Errorf("unable to get an existing application registration")
} else {
needsRegistration := false
appRegName := appReg.Name
for _, newResource := range appReg.Resources {
found := false
for _, existingResource := range existingAppReg.Resources {
if newResource.Group == existingResource.Group &&
newResource.Version == existingResource.Version &&
newResource.Kind == existingResource.Kind {
found = true
break
}
}
if !found {
// Found a resource whose Group+Kind does not match with an existing app reg
// For ex. strimzi.kafka.io.Kafka vs platform.confluent.io.Kafka
needsRegistration = true
appRegName = appReg.Name + "-" + strings.ToLower(newResource.Kind) + "-" + newResource.Version + "-" + newResource.Group
break
}
}
if needsRegistration {
appReg.Name = appRegName
if _, err := stork.Instance().CreateApplicationRegistration(appReg); err != nil && !errors.IsAlreadyExists(err) {
logrus.Errorf("unable to register app %v, err: %v", appReg, err)
return err
}
}
}
}
}
return nil
Expand Down
Loading

0 comments on commit a452432

Please sign in to comment.