From df761e662bfcac0ef88665319a11463be18062f8 Mon Sep 17 00:00:00 2001 From: Liu Chao Date: Fri, 27 Sep 2024 11:17:52 +0800 Subject: [PATCH] feat: querier add version check --- server/controller/common/const.go | 2 + .../tagrecorder/check/dictionary.go | 379 ------------------ server/controller/tagrecorder/dictionary.go | 10 +- .../engine/clickhouse/clickhouse_test.go | 2 +- .../engine/clickhouse/client/client.go | 61 ++- .../engine/clickhouse/tag/translation.go | 2 +- .../engine/clickhouse/view/function.go | 2 + 7 files changed, 75 insertions(+), 383 deletions(-) delete mode 100644 server/controller/tagrecorder/check/dictionary.go diff --git a/server/controller/common/const.go b/server/controller/common/const.go index bf78571f5ca..464fe930ed4 100644 --- a/server/controller/common/const.go +++ b/server/controller/common/const.go @@ -703,3 +703,5 @@ const ( ) const TRISOLARIS_NODE_TYPE_MASTER = "master" + +const CLICK_HOUSE_VERSION = "24" diff --git a/server/controller/tagrecorder/check/dictionary.go b/server/controller/tagrecorder/check/dictionary.go deleted file mode 100644 index 2416fcd75ef..00000000000 --- a/server/controller/tagrecorder/check/dictionary.go +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Copyright (c) 2023 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package tagrecorder - -import ( - "context" - "fmt" - "os" - "sort" - "strconv" - "strings" - - "golang.org/x/exp/slices" - - mapset "github.com/deckarep/golang-set" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - - "github.com/deepflowio/deepflow/server/controller/common" - "github.com/deepflowio/deepflow/server/controller/db/clickhouse" -) - -func (c *TagRecorder) UpdateChDictionary() { - log.Info("tagrecorder update ch dictionary") - kubeconfig := c.cfg.Kubeconfig - var config *rest.Config - var err error - if kubeconfig != "" { - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - log.Error(err) - return - } - } else { - config, err = rest.InClusterConfig() - if err != nil { - log.Error(err) - return - } - } - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Error(err) - return - } - ctx := context.Background() - namespace := os.Getenv(common.NAME_SPACE_KEY) - endpoints, err := clientset.CoreV1().Endpoints(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - log.Error(err) - return - } - if len(endpoints.Items) == 0 { - log.Warningf("no endpoints in %s", namespace) - } - endpointName := c.cfg.ClickHouseCfg.Host - findEndpoint := false - for _, endpoint := range endpoints.Items { - if endpoint.Name != endpointName { - continue - } - findEndpoint = true - subsets := endpoint.Subsets - for _, subset := range subsets { - for _, address := range subset.Addresses { - clickHouseCfg := c.cfg.ClickHouseCfg - if strings.Contains(address.IP, ":") { - clickHouseCfg.Host = fmt.Sprintf("[%s]", address.IP) - } else { - clickHouseCfg.Host = address.IP - } - for _, port := range subset.Ports { - if port.Name == c.cfg.ClickHouseCfg.EndpointTcpPortName { - clickHouseCfg.Port = uint32(port.Port) - // 在本区域所有数据节点更新字典 - // Update the dictionary at all data nodes in the region - replicaSQL := fmt.Sprintf("REPLICA (HOST 
'%s' PRIORITY %s)", c.cfg.MySqlCfg.Host, "1") - connect, err := clickhouse.Connect(clickHouseCfg) - if err != nil { - continue - } - log.Infof("refresh clickhouse dictionary in (%s: %d)", address.IP, clickHouseCfg.Port) - var databases []string - - // 检查并创建数据库 - // Check and create the database - if err := connect.Select(&databases, "SHOW DATABASES"); err != nil { - log.Error(err) - connect.Close() - continue - } - // 删除deepflow数据库 - // Drop database deepflow - if slices.Contains(databases, "deepflow") { - dropSql := "DROP DATABASE IF EXISTS deepflow" - _, err = connect.Exec(dropSql) - if err != nil { - log.Error(err) - connect.Close() - continue - } - } - - sort.Strings(databases) - databaseIndex := sort.SearchStrings(databases, c.cfg.ClickHouseCfg.Database) - if len(databases) == 0 || databaseIndex == len(databases) || databases[databaseIndex] != c.cfg.ClickHouseCfg.Database { - log.Infof("create database %s", c.cfg.ClickHouseCfg.Database) - sql := fmt.Sprintf("CREATE DATABASE %s", c.cfg.ClickHouseCfg.Database) - _, err = connect.Exec(sql) - if err != nil { - log.Error(err) - connect.Close() - continue - } - } - - // 获取数据库中当前的字典 - // Get the current dictionary in the database - dictionaries := []string{} - if err := connect.Select(&dictionaries, fmt.Sprintf("SHOW DICTIONARIES IN %s", c.cfg.ClickHouseCfg.Database)); err != nil { - log.Error(err) - connect.Close() - continue - } - wantedDicts := mapset.NewSet( - CH_DICTIONARY_IP_RESOURCE, - CH_DICTIONARY_IP_RELATION, - CH_DICTIONARY_POD_K8S_LABEL, - CH_DICTIONARY_POD_K8S_LABELS, - CH_DICTIONARY_REGION, - CH_DICTIONARY_AZ, - CH_DICTIONARY_VPC, - CH_DICTIONARY_VL2, - CH_DICTIONARY_POD_CLUSTER, - CH_DICTIONARY_POD_NAMESPACE, - CH_DICTIONARY_POD_NODE, - CH_DICTIONARY_POD_GROUP, - CH_DICTIONARY_POD, - CH_DICTIONARY_DEVICE, - CH_DICTIONARY_VTAP_PORT, - CH_DICTIONARY_TAP_TYPE, - CH_DICTIONARY_VTAP, - CH_DICTIONARY_VTAP_PORT, - CH_DICTIONARY_POD_NODE_PORT, - CH_DICTIONARY_POD_GROUP_PORT, - CH_DICTIONARY_POD_PORT, - 
CH_DICTIONARY_DEVICE_PORT, - CH_DICTIONARY_IP_PORT, - CH_DICTIONARY_SERVER_PORT, - CH_DICTIONARY_LB_LISTENER, - CH_DICTIONARY_POD_INGRESS, - CH_DICTIONARY_NODE_TYPE, - CH_STRING_DICTIONARY_ENUM, - CH_INT_DICTIONARY_ENUM, - CH_DICTIONARY_CHOST_CLOUD_TAG, - CH_DICTIONARY_POD_NS_CLOUD_TAG, - CH_DICTIONARY_CHOST_CLOUD_TAGS, - CH_DICTIONARY_POD_NS_CLOUD_TAGS, - CH_DICTIONARY_OS_APP_TAG, - CH_DICTIONARY_OS_APP_TAGS, - CH_DICTIONARY_GPROCESS, - CH_DICTIONARY_POD_SERVICE_K8S_LABEL, - CH_DICTIONARY_POD_SERVICE_K8S_LABELS, - - CH_DICTIONARY_POD_K8S_ANNOTATION, - CH_DICTIONARY_POD_K8S_ANNOTATIONS, - CH_DICTIONARY_POD_SERVICE_K8S_ANNOTATION, - CH_DICTIONARY_POD_SERVICE_K8S_ANNOTATIONS, - CH_DICTIONARY_POD_K8S_ENV, - CH_DICTIONARY_POD_K8S_ENVS, - CH_DICTIONARY_POD_SERVICE, - CH_DICTIONARY_CHOST, - CH_TARGET_LABEL, - CH_APP_LABEL, - CH_PROMETHEUS_LABEL_NAME, - CH_PROMETHEUS_METRIC_NAME, - CH_PROMETHEUS_METRIC_APP_LABEL_LAYOUT, - CH_PROMETHEUS_TARGET_LABEL_LAYOUT, - - CH_DICTIONARY_POLICY, - CH_DICTIONARY_NPB_TUNNEL, - ) - chDicts := mapset.NewSet() - for _, dictionary := range dictionaries { - chDicts.Add(dictionary) - } - - // 删除不存在的字典 - // Delete a dictionary that does not exist - delDicts := chDicts.Difference(wantedDicts) - var delDictError error - for _, dict := range delDicts.ToSlice() { - dropSQL := fmt.Sprintf("DROP DICTIONARY %s.%s", c.cfg.ClickHouseCfg.Database, dict) - _, err = connect.Exec(dropSQL) - if err != nil { - delDictError = err - log.Error(err) - break - } - } - if delDictError != nil { - connect.Close() - continue - } - - // 创建期望的字典 - // Creating the desired dictionary - addDicts := wantedDicts.Difference(chDicts) - var addDictError error - for _, dict := range addDicts.ToSlice() { - dictName := dict.(string) - chTable := "ch_" + strings.TrimSuffix(dictName, "_map") - createSQL := CREATE_SQL_MAP[dictName] - mysqlPortStr := strconv.Itoa(int(c.cfg.MySqlCfg.Port)) - createSQL = fmt.Sprintf(createSQL, c.cfg.ClickHouseCfg.Database, dictName, mysqlPortStr, 
c.cfg.MySqlCfg.UserName, c.cfg.MySqlCfg.UserPassword, replicaSQL, c.cfg.MySqlCfg.Database, chTable, chTable, c.cfg.TagRecorderCfg.DictionaryRefreshInterval) - log.Infof("create dictionary %s", dictName) - log.Info(createSQL) - _, err = connect.Exec(createSQL) - if err != nil { - addDictError = err - log.Error(err) - break - } - } - if addDictError != nil { - connect.Close() - continue - } - - // 检查并更新已存在字典 - // Check and update existing dictionaries - checkDicts := chDicts.Intersect(wantedDicts) - var updateDictError error - for _, dict := range checkDicts.ToSlice() { - dictName := dict.(string) - chTable := "ch_" + strings.TrimSuffix(dictName, "_map") - showSQL := fmt.Sprintf("SHOW CREATE DICTIONARY %s.%s", c.cfg.ClickHouseCfg.Database, dictName) - dictSQL := make([]string, 0) - if err := connect.Select(&dictSQL, showSQL); err != nil { - updateDictError = err - log.Error(err) - break - } - if len(dictSQL) <= 0 { - break - } - createSQL := CREATE_SQL_MAP[dictName] - mysqlPortStr := strconv.Itoa(int(c.cfg.MySqlCfg.Port)) - createSQL = fmt.Sprintf(createSQL, c.cfg.ClickHouseCfg.Database, dictName, mysqlPortStr, c.cfg.MySqlCfg.UserName, c.cfg.MySqlCfg.UserPassword, replicaSQL, c.cfg.MySqlCfg.Database, chTable, chTable, c.cfg.TagRecorderCfg.DictionaryRefreshInterval) - // In the new version of CK (version after 23.8), when ‘SHOW CREATE DICTIONARY’ does not display plain text password information, the password is fixedly displayed as ‘[HIDDEN]’, and password comparison needs to be repair. 
- checkDictSQL := strings.Replace(dictSQL[0], "[HIDDEN]", c.cfg.MySqlCfg.UserPassword, 1) - if createSQL == checkDictSQL { - continue - } - log.Infof("update dictionary %s", dictName) - log.Infof("exist dictionary %s", checkDictSQL) - log.Infof("wanted dictionary %s", createSQL) - dropSQL := fmt.Sprintf("DROP DICTIONARY %s.%s", c.cfg.ClickHouseCfg.Database, dictName) - _, err = connect.Exec(dropSQL) - if err != nil { - updateDictError = err - log.Error(err) - break - } - _, err = connect.Exec(createSQL) - if err != nil { - updateDictError = err - log.Error(err) - break - } - } - if updateDictError != nil { - connect.Close() - continue - } - - // Get the current view in the database - views := []string{} - log.Infof(fmt.Sprintf("SHOW TABLES FROM %s LIKE '%%view'", c.cfg.ClickHouseCfg.Database)) - if err := connect.Select(&views, fmt.Sprintf("SHOW TABLES FROM %s LIKE '%%view'", c.cfg.ClickHouseCfg.Database)); err != nil { - log.Error(err) - connect.Close() - continue - } - - // Create the desired view - wantedViews := mapset.NewSet(CH_APP_LABEL_LIVE_VIEW, CH_TARGET_LABEL_LIVE_VIEW) - chViews := mapset.NewSet() - for _, view := range views { - chViews.Add(view) - } - addViews := wantedViews.Difference(chViews) - var addViewError error - for _, view := range addViews.ToSlice() { - viewName := view.(string) - createSQL := CREATE_SQL_MAP[viewName] - createSQL = fmt.Sprintf(createSQL, c.cfg.TagRecorderCfg.LiveViewRefreshSecond) - _, err = connect.Exec(createSQL) - if err != nil { - addViewError = err - log.Error(err) - break - } - } - if addViewError != nil { - connect.Close() - continue - } - - // Check and update existing views - checkViews := chViews.Intersect(wantedViews) - var updateViewError error - for _, view := range checkViews.ToSlice() { - viewName := view.(string) - log.Info(viewName) - showSQL := fmt.Sprintf("SHOW CREATE TABLE %s.%s", c.cfg.ClickHouseCfg.Database, viewName) - viewSQL := make([]string, 0) - if err := connect.Select(&viewSQL, showSQL); err != 
nil { - updateViewError = err - log.Error(err) - break - } - if len(viewSQL) <= 0 { - break - } - createSQL := CREATE_SQL_MAP[viewName] - createSQL = fmt.Sprintf(createSQL, c.cfg.TagRecorderCfg.LiveViewRefreshSecond) - if createSQL == viewSQL[0] { - continue - } - log.Infof("update view %s", viewName) - log.Infof("exist view %s", viewSQL[0]) - log.Infof("wanted view %s", createSQL) - dropSQL := fmt.Sprintf("DROP TABLE %s.%s", c.cfg.ClickHouseCfg.Database, viewName) - _, err = connect.Exec(dropSQL) - if err != nil { - updateViewError = err - log.Error(err) - break - } - _, err = connect.Exec(createSQL) - if err != nil { - updateViewError = err - log.Error(err) - break - } - } - if updateViewError != nil { - connect.Close() - continue - } - - connect.Close() - } - } - } - } - } - if !findEndpoint { - log.Warningf("%s endpoint not found!", endpointName) - } - return -} diff --git a/server/controller/tagrecorder/dictionary.go b/server/controller/tagrecorder/dictionary.go index b554996cebf..4d4b84f3007 100644 --- a/server/controller/tagrecorder/dictionary.go +++ b/server/controller/tagrecorder/dictionary.go @@ -370,7 +370,15 @@ func (c *Dictionary) update(clickHouseCfg *clickhouse.ClickHouseConfig) { if updateDictError != nil { return } - + // Get version + versions := []string{} + if err := ckDb.Select(&versions, "SELECT version()"); err != nil { + log.Error(err, logger.NewORGPrefix(orgID)) + return + } + if versions[0] > common.CLICK_HOUSE_VERSION { + continue + } // Get the current view in the database views := []string{} if err := ckDb.Select(&views, fmt.Sprintf("SHOW TABLES FROM %s LIKE '%%view'", ckDatabaseName)); err != nil { diff --git a/server/querier/engine/clickhouse/clickhouse_test.go b/server/querier/engine/clickhouse/clickhouse_test.go index 71355d72521..7763cef0073 100644 --- a/server/querier/engine/clickhouse/clickhouse_test.go +++ b/server/querier/engine/clickhouse/clickhouse_test.go @@ -234,7 +234,7 @@ var ( output: []string{"SELECT if(type IN [0, 
2],1,0) AS `request` FROM flow_log.`l7_flow_log` PREWHERE (observation_point GLOBAL IN (SELECT value FROM flow_tag.string_enum_map WHERE name ilike 'xxx' and tag_name='observation_point')) LIMIT 0, 50"}, }, { input: "select Histogram(Sum(byte),10) AS histo from l4_flow_log", - output: []string{"SELECT histogramIf(10)(`_sum_byte_tx+byte_rx`,`_sum_byte_tx+byte_rx`>0) AS `histo` FROM (SELECT SUM(byte_tx+byte_rx) AS `_sum_byte_tx+byte_rx` FROM flow_log.`l4_flow_log` LIMIT 10000)"}, + output: []string{"SELECT histogramIf(10)(assumeNotNull(`_sum_byte_tx+byte_rx`),`_sum_byte_tx+byte_rx`>0) AS `histo` FROM (SELECT SUM(byte_tx+byte_rx) AS `_sum_byte_tx+byte_rx` FROM flow_log.`l4_flow_log` LIMIT 10000)"}, }, { input: "select Sum(log_count) from event", output: []string{"SELECT SUM(1) AS `Sum(log_count)` FROM event.`event` LIMIT 10000"}, diff --git a/server/querier/engine/clickhouse/client/client.go b/server/querier/engine/clickhouse/client/client.go index 51517a5c09c..632569d934a 100644 --- a/server/querier/engine/clickhouse/client/client.go +++ b/server/querier/engine/clickhouse/client/client.go @@ -28,6 +28,7 @@ import ( clickhouse "github.com/ClickHouse/clickhouse-go/v2" //"github.com/k0kubun/pp" + ctrCommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/querier/common" "github.com/deepflowio/deepflow/server/querier/config" "github.com/deepflowio/deepflow/server/querier/statsd" @@ -50,6 +51,7 @@ type QueryParams struct { // All ClickHouse Client share one connection var connection clickhouse.Conn +var version string type Client struct { Host string @@ -99,6 +101,9 @@ func (c *Client) init(query_uuid string) error { connection = conn } c.connection = connection + if version == "" { + version, _ = c.GetVersion() + } return nil } @@ -110,7 +115,12 @@ func (c *Client) DoQuery(params *QueryParams) (result *common.Result, err error) sqlstr, callbacks, query_uuid, columnSchemaMap, simpleSql := params.Sql, params.Callbacks, 
params.QueryUUID, params.ColumnSchemaMap, params.SimpleSql queryCacheStr := "" if params.UseQueryCache { - queryCacheStr = " SETTINGS use_query_cache = true, query_cache_store_results_of_queries_with_nondeterministic_functions = 1" + queryCacheStr = " SETTINGS use_query_cache = true" + if version > ctrCommon.CLICK_HOUSE_VERSION { + queryCacheStr += ", query_cache_nondeterministic_function_handling = 'save'" + } else { + queryCacheStr += ", query_cache_store_results_of_queries_with_nondeterministic_functions = 1" + } if params.QueryCacheTTL != "" { queryCacheStr += fmt.Sprintf(", query_cache_ttl = %s", params.QueryCacheTTL) } @@ -124,6 +134,12 @@ func (c *Client) DoQuery(params *QueryParams) (result *common.Result, err error) } sqlstr = strings.ReplaceAll(sqlstr, "flow_tag", fmt.Sprintf("%04d_flow_tag", orgIDInt)) } + // live view + if version > ctrCommon.CLICK_HOUSE_VERSION { + sqlstr = strings.ReplaceAll(sqlstr, "app_label_live_view", "app_label_map") + sqlstr = strings.ReplaceAll(sqlstr, "target_label_live_view", "target_label_map") + } + err = c.init(query_uuid) if err != nil { return nil, err @@ -209,3 +225,46 @@ func (c *Client) DoQuery(params *QueryParams) (result *common.Result, err error) log.Infof("query_uuid: %s. 
query api statistics: %d rows, %d columns, %d bytes, cost %f ms", c.Debug.QueryUUID, resRows, resColumns, resSize, float64(queryTime.Milliseconds())) return result, nil }
+
+// GetVersion returns the version string reported by the ClickHouse server.
+//
+// It runs "SELECT version()" on c.connection, using c.Context when it is set
+// and context.Background() otherwise. c.Close is deferred, so it runs on every
+// return path.
+//
+// The single String column is scanned straight into the result string; no
+// reflection-based scan buffers are needed for this fixed one-column query.
+// Every failure, including an empty result set, is logged or described and
+// returned as a non-nil error together with an empty version.
+func (c *Client) GetVersion() (version string, err error) {
+	defer c.Close()
+
+	ctx := c.Context
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	sqlstr := "SELECT version()"
+	rows, err := c.connection.Query(ctx, sqlstr)
+	if err != nil {
+		log.Errorf("query clickhouse Error: %s, sql: %s", err, sqlstr)
+		return "", err
+	}
+	defer rows.Close()
+
+	if !rows.Next() {
+		// Next can stop because of a failure that Query did not report, so
+		// rows.Err must be checked before treating the result as empty.
+		if err = rows.Err(); err != nil {
+			log.Errorf("query clickhouse Error: %s, sql: %s", err, sqlstr)
+			return "", err
+		}
+		return "", fmt.Errorf("query clickhouse returned no rows, sql: %s", sqlstr)
+	}
+	if err = rows.Scan(&version); err != nil {
+		log.Errorf("query clickhouse Error: %s, sql: %s", err, sqlstr)
+		return "", err
+	}
+	log.Infof("database version is %s", version)
+	return version, nil
+}
diff --git a/server/querier/engine/clickhouse/tag/translation.go b/server/querier/engine/clickhouse/tag/translation.go index 70b6d8e5785..e35160b7a35 100644 --- a/server/querier/engine/clickhouse/tag/translation.go +++ b/server/querier/engine/clickhouse/tag/translation.go @@ -1020,7 +1020,7 @@ func GenerateTagResoureMap() map[string]map[string]*Tag { // nullable int_enum tag do not return default value tagResourceMap["span_kind"] = map[string]*Tag{ "enum": NewTag( - "if(isNull(span_kind), '', dictGetOrDefault('flow_tag.int_enum_map', 'name', ('%s',toUInt64(span_kind)), span_kind))", + "if(isNull(span_kind), '', dictGetOrDefault('flow_tag.int_enum_map', 'name', ('%s',toUInt64(assumeNotNull(span_kind))), span_kind))", "", "toUInt64(span_kind) GLOBAL IN (SELECT value FROM flow_tag.int_enum_map WHERE name %s %s and tag_name='%s')", "toUInt64(span_kind) GLOBAL IN (SELECT value FROM flow_tag.int_enum_map WHERE %s(name,%s) and tag_name='%s')", diff --git a/server/querier/engine/clickhouse/view/function.go b/server/querier/engine/clickhouse/view/function.go index 65cc1524b8d..3aab3f056a8 100644 --- a/server/querier/engine/clickhouse/view/function.go +++ b/server/querier/engine/clickhouse/view/function.go @@ -570,7 +570,9 @@ func (f *HistogramFunction) WriteTo(buf *bytes.Buffer) { buf.WriteString("histogramIf(") buf.WriteString(FormatField(f.Fields[1].ToString())) buf.WriteString(")(") + buf.WriteString("assumeNotNull(") buf.WriteString(f.Fields[0].ToString()) + buf.WriteString(")") buf.WriteString(fmt.Sprintf(",%s>0)", f.Fields[0].ToString())) if f.Alias != "" { buf.WriteString(" AS ")