Skip to content

Commit

Permalink
Support system functions (#193)
Browse files Browse the repository at this point in the history
* support system function

* support system function during upgrade
  • Loading branch information
bjeevan-ib authored Oct 25, 2023
1 parent c2bc2b8 commit 686fcaa
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 4 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ push-chart-crd:
clean:
rm -f *build.properties || true
rm -f *.tgz || true
rm .push* .image*
rm .push* .image* || true

deploy-crds: .id build-chart-crd
${HELM} upgrade --install --namespace $(cat .id) ${CRDS_CHART} --version $(CHART_VERSION)
Expand All @@ -354,8 +354,7 @@ deploy: .id docker-push build-chart
--create-namespace \
-f helm/db-controller/minikube.yaml \
--set dbController.class=`cat .id` \
--set image.tag="${TAG}" \
--set db.identifier.prefix=`cat .id` ${HELM_SETFLAGS}
--set image.tag="${TAG}" ${HELM_SETFLAGS}

undeploy: .id
helm delete --namespace `cat .id` `cat .id`-db-ctrl
14 changes: 14 additions & 0 deletions controllers/databaseclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (r *DatabaseClaimReconciler) updateStatus(ctx context.Context, dbClaim *per
if err != nil {
return r.manageError(ctx, dbClaim, err)
}
dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy()
dbClaim.Status.NewDB = persistancev1.Status{ConnectionInfo: &persistancev1.DatabaseClaimConnectionInfo{}}
}

return r.reconcileMigrateToNewDB(ctx, dbClaim)
Expand Down Expand Up @@ -540,6 +542,10 @@ func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, db
return err
}
}
err = dbClient.ManageSystemFunctions(dbName, r.getSystemFunctions())
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -603,6 +609,10 @@ func (r *DatabaseClaimReconciler) reconcileNewDB(ctx context.Context,
if err != nil {
return ctrl.Result{}, err
}
err = dbClient.ManageSystemFunctions(GetDBName(dbClaim), r.getSystemFunctions())
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -1072,6 +1082,10 @@ func (r *DatabaseClaimReconciler) getDbSubnetGroupNameRef() string {
return r.Config.GetString("dbSubnetGroupNameRef")
}

func (r *DatabaseClaimReconciler) getSystemFunctions() map[string]string {
return r.Config.GetStringMapString("systemFunctions")
}

func (r *DatabaseClaimReconciler) getDynamicHostWaitTime() time.Duration {
t := r.Config.GetInt("dynamicHostWaitTimeMin")
if t > maxWaitTime {
Expand Down
1 change: 1 addition & 0 deletions helm/db-controller/minikube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ zapLogger:
level: debug
dbproxy:
enabled: false
env: box-3
2 changes: 1 addition & 1 deletion helm/db-controller/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ metadata:
data:
{{- with .Values.controllerConfig }}
"config.yaml": |-
{{- toYaml . | trim | nindent 4 }}
{{- tpl (toYaml .) $ | trim | nindent 4 }}
{{ end }}
10 changes: 10 additions & 0 deletions helm/db-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ secrets:
enabled: false

env: local
ib:
realm: "us"
lifecycle: "dev"
db:
identifier:
prefix: "{{ .Values.env }}"
Expand Down Expand Up @@ -158,3 +161,10 @@ controllerConfig:
enablePerfInsight: true
# Possible values for enableCloudwatchLogsExport are all, none, postgresql and upgrade.
enableCloudwatchLogsExport: "none"
# system funtions are created as functions in the database. The prefix is used as the schema name.
# only "ib_" prefixed functions are supported at this time.
systemFunctions:
ib_realm: "{{ .Values.ib.realm }}"
ib_env: "{{ .Values.env }}"
ib_lifecycle: "{{ .Values.lifecycle }}"

77 changes: 77 additions & 0 deletions pkg/dbclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,83 @@ func (pc *client) CreateDefaultExtentions(dbName string) error {
return err
}

func (pc *client) ManageSystemFunctions(dbName string, functions map[string]string) error {
db, err := pc.getDB(dbName)
if err != nil {
pc.log.Error(err, "could not connect to db", "database", dbName)
return err
}
pc.log.Info("ManageSystemFunctions - connected to " + dbName)
defer db.Close()
//check if schema ib exists
var exists bool
err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'ib')").Scan(&exists)
if err != nil {
pc.log.Error(err, "could not query for schema ib")
return err
}
if !exists {
createSchema := `
CREATE SCHEMA IF NOT EXISTS ib;
REVOKE ALL ON SCHEMA ib FROM PUBLIC;
GRANT USAGE ON SCHEMA ib TO PUBLIC;
REVOKE ALL ON ALL TABLES IN SCHEMA ib FROM PUBLIC ;
GRANT SELECT ON ALL TABLES IN SCHEMA ib TO PUBLIC;
`
//create schema ib
_, err = db.Exec(createSchema)
if err != nil {
pc.log.Error(err, "could not create schema ib")
return err
}
}
var currVal string
var create bool
var schema string
sep := "_" // separator between schema and function name
for f, v := range functions {
create = false
// split ib_lifecycle into schema and function name. eg. ib_life_cycle -> ib.life_cycle
f_parts := strings.Split(f, sep)
if len(f_parts) == 1 {
schema = "public"
} else {
schema = f_parts[0]
f = pq.QuoteIdentifier(strings.Join(f_parts[1:], sep))
}

err := db.QueryRow(fmt.Sprintf("SELECT %s.%s()", schema, f)).Scan(&currVal)
//check if error contains "does not exist"
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
pc.log.Info("function does not exist - create it", "function", f)
create = true
} else {
return err
}
} else {
if currVal != v {
pc.log.Info("function value is not correct - update it", "function", f)
create = true
}
}
if create {
createFunction := `
CREATE OR REPLACE FUNCTION %s.%s()
RETURNS text AS $$SELECT text '%s'$$
LANGUAGE sql IMMUTABLE PARALLEL SAFE;
`
_, err = db.Exec(fmt.Sprintf(createFunction, schema, f, v))
if err != nil {
pc.log.Error(err, "could not create function", "database_name",
dbName, "function", f, "value", v)
return err
}
}
}
return nil
}

func (pc *client) CreateGroup(dbName, rolename string) (bool, error) {
start := time.Now()
var exists bool
Expand Down
32 changes: 32 additions & 0 deletions pkg/dbclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,38 @@ func TestPostgresClientOperations(t *testing.T) {
if _, err := db.Exec("CREATE EXTENSION IF NOT EXISTS citext"); err != nil {
t.Errorf("citext is not created: %s", err)
}
t.Logf("create system functions")
functions := map[string]string{
"ib_realm": "eu",
"ib_env": "box-3",
"ib_lifecycle": "dev",
}
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create system functions passed", succeed)
}
t.Logf("re-create same system functions")
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s re-create ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create same system functions passed", succeed)
}
t.Logf("re-create different system functions")
functions = map[string]string{
"ib_realm": "us",
"ib_env": "box-4",
"ib_lifecycle": "stage",
}
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s re-create ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create different system functions passed", succeed)
}

})
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/dbclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Client interface {
ManageReplicationRole(username string, enableReplicationRole bool) error
ManageSuperUserRole(username string, enableSuperUser bool) error
ManageCreateRole(username string, enableCreateRole bool) error
ManageSystemFunctions(dbName string, functions map[string]string) error

DBCloser
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pgctl/pgctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func (s *copy_schema_state) Execute() (State, error) {
"--no-subscriptions",
"--no-privileges",
"--no-owner",
"--exclude-schema=ib",
})

dumpExec := dump.Exec(ExecOptions{StreamPrint: true})
Expand Down

0 comments on commit 686fcaa

Please sign in to comment.