Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support system functions #193

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ push-chart-crd:
clean:
rm -f *build.properties || true
rm -f *.tgz || true
rm .push* .image*
rm .push* .image* || true

deploy-crds: .id build-chart-crd
${HELM} upgrade --install --namespace $(cat .id) ${CRDS_CHART} --version $(CHART_VERSION)
Expand All @@ -354,8 +354,7 @@ deploy: .id docker-push build-chart
--create-namespace \
-f helm/db-controller/minikube.yaml \
--set dbController.class=`cat .id` \
--set image.tag="${TAG}" \
--set db.identifier.prefix=`cat .id` ${HELM_SETFLAGS}
--set image.tag="${TAG}" ${HELM_SETFLAGS}

undeploy: .id
helm delete --namespace `cat .id` `cat .id`-db-ctrl
14 changes: 14 additions & 0 deletions controllers/databaseclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (r *DatabaseClaimReconciler) updateStatus(ctx context.Context, dbClaim *per
if err != nil {
return r.manageError(ctx, dbClaim, err)
}
dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy()
dbClaim.Status.NewDB = persistancev1.Status{ConnectionInfo: &persistancev1.DatabaseClaimConnectionInfo{}}
}

return r.reconcileMigrateToNewDB(ctx, dbClaim)
Expand Down Expand Up @@ -540,6 +542,10 @@ func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, db
return err
}
}
err = dbClient.ManageSystemFunctions(dbName, r.getSystemFunctions())
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -603,6 +609,10 @@ func (r *DatabaseClaimReconciler) reconcileNewDB(ctx context.Context,
if err != nil {
return ctrl.Result{}, err
}
err = dbClient.ManageSystemFunctions(GetDBName(dbClaim), r.getSystemFunctions())
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -1072,6 +1082,10 @@ func (r *DatabaseClaimReconciler) getDbSubnetGroupNameRef() string {
return r.Config.GetString("dbSubnetGroupNameRef")
}

func (r *DatabaseClaimReconciler) getSystemFunctions() map[string]string {
return r.Config.GetStringMapString("systemFunctions")
}

func (r *DatabaseClaimReconciler) getDynamicHostWaitTime() time.Duration {
t := r.Config.GetInt("dynamicHostWaitTimeMin")
if t > maxWaitTime {
Expand Down
1 change: 1 addition & 0 deletions helm/db-controller/minikube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ zapLogger:
level: debug
dbproxy:
enabled: false
env: box-3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we defaulting this to box-3 if this values file is only for minikube?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. just for testing purpose.

2 changes: 1 addition & 1 deletion helm/db-controller/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ metadata:
data:
{{- with .Values.controllerConfig }}
"config.yaml": |-
{{- toYaml . | trim | nindent 4 }}
{{- tpl (toYaml .) $ | trim | nindent 4 }}
{{ end }}
10 changes: 10 additions & 0 deletions helm/db-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ secrets:
enabled: false

env: local
ib:
realm: "us"
lifecycle: "dev"
db:
identifier:
prefix: "{{ .Values.env }}"
Expand Down Expand Up @@ -158,3 +161,10 @@ controllerConfig:
enablePerfInsight: true
# Possible values for enableCloudwatchLogsExport are all, none, postgresql and upgrade.
enableCloudwatchLogsExport: "none"
# system funtions are created as functions in the database. The prefix is used as the schema name.
# only "ib_" prefixed functions are supported at this time.
systemFunctions:
ib_realm: "{{ .Values.ib.realm }}"
ib_env: "{{ .Values.env }}"
ib_lifecycle: "{{ .Values.lifecycle }}"

77 changes: 77 additions & 0 deletions pkg/dbclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,83 @@ func (pc *client) CreateDefaultExtentions(dbName string) error {
return err
}

func (pc *client) ManageSystemFunctions(dbName string, functions map[string]string) error {
db, err := pc.getDB(dbName)
if err != nil {
pc.log.Error(err, "could not connect to db", "database", dbName)
return err
}
pc.log.Info("ManageSystemFunctions - connected to " + dbName)
defer db.Close()
//check if schema ib exists
var exists bool
err = db.QueryRow("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'ib')").Scan(&exists)
if err != nil {
pc.log.Error(err, "could not query for schema ib")
return err
}
if !exists {
createSchema := `
CREATE SCHEMA IF NOT EXISTS ib;
REVOKE ALL ON SCHEMA ib FROM PUBLIC;
GRANT USAGE ON SCHEMA ib TO PUBLIC;
REVOKE ALL ON ALL TABLES IN SCHEMA ib FROM PUBLIC ;
GRANT SELECT ON ALL TABLES IN SCHEMA ib TO PUBLIC;
`
//create schema ib
_, err = db.Exec(createSchema)
if err != nil {
pc.log.Error(err, "could not create schema ib")
return err
}
}
var currVal string
var create bool
var schema string
sep := "_" // separator between schema and function name
for f, v := range functions {
create = false
// split ib_lifecycle into schema and function name. eg. ib_life_cycle -> ib.life_cycle
f_parts := strings.Split(f, sep)
if len(f_parts) == 1 {
schema = "public"
} else {
schema = f_parts[0]
f = pq.QuoteIdentifier(strings.Join(f_parts[1:], sep))
}

err := db.QueryRow(fmt.Sprintf("SELECT %s.%s()", schema, f)).Scan(&currVal)
//check if error contains "does not exist"
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
pc.log.Info("function does not exist - create it", "function", f)
create = true
} else {
return err
}
} else {
if currVal != v {
pc.log.Info("function value is not correct - update it", "function", f)
create = true
}
}
if create {
createFunction := `
CREATE OR REPLACE FUNCTION %s.%s()
RETURNS text AS $$SELECT text '%s'$$
LANGUAGE sql IMMUTABLE PARALLEL SAFE;
`
_, err = db.Exec(fmt.Sprintf(createFunction, schema, f, v))
if err != nil {
pc.log.Error(err, "could not create function", "database_name",
dbName, "function", f, "value", v)
return err
}
}
}
return nil
}

func (pc *client) CreateGroup(dbName, rolename string) (bool, error) {
start := time.Now()
var exists bool
Expand Down
32 changes: 32 additions & 0 deletions pkg/dbclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,38 @@ func TestPostgresClientOperations(t *testing.T) {
if _, err := db.Exec("CREATE EXTENSION IF NOT EXISTS citext"); err != nil {
t.Errorf("citext is not created: %s", err)
}
t.Logf("create system functions")
functions := map[string]string{
"ib_realm": "eu",
"ib_env": "box-3",
"ib_lifecycle": "dev",
}
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create system functions passed", succeed)
}
t.Logf("re-create same system functions")
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s re-create ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create same system functions passed", succeed)
}
t.Logf("re-create different system functions")
functions = map[string]string{
"ib_realm": "us",
"ib_env": "box-4",
"ib_lifecycle": "stage",
}
err = pc.ManageSystemFunctions(tt.args.dbName, functions)
if (err != nil) != tt.wantErr {
t.Errorf("\t%s re-create ManageSystemFunctions() error = %v, wantErr %v", failed, err, tt.wantErr)
} else {
t.Logf("\t%s create different system functions passed", succeed)
}

})
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/dbclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Client interface {
ManageReplicationRole(username string, enableReplicationRole bool) error
ManageSuperUserRole(username string, enableSuperUser bool) error
ManageCreateRole(username string, enableCreateRole bool) error
ManageSystemFunctions(dbName string, functions map[string]string) error

DBCloser
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pgctl/pgctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func (s *copy_schema_state) Execute() (State, error) {
"--no-subscriptions",
"--no-privileges",
"--no-owner",
"--exclude-schema=ib",
})

dumpExec := dump.Exec(ExecOptions{StreamPrint: true})
Expand Down
Loading