From eae859de20969deffa0b8898d3b11a1b9dda96db Mon Sep 17 00:00:00 2001 From: chenquanzhao Date: Tue, 23 Apr 2019 13:44:11 +0800 Subject: [PATCH] Check valid kafka topics when disable auto-create topic --- Makefile | 1 + .../filebeat-pilot-kafka-kubernetes.yml | 96 +++++++++++++++++++ pilot/filebeat_piloter.go | 5 + pilot/fluentd_piloter.go | 5 + pilot/pilot.go | 31 ++++++ pilot/piloter.go | 1 + pilot/util.go | 16 ++++ quickstart/es.yml | 4 +- quickstart/filebeat/es.yml | 4 +- 9 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 examples/with-configmap/filebeat-pilot-kafka-kubernetes.yml create mode 100644 pilot/util.go diff --git a/Makefile b/Makefile index a45075c1..ddd8a2c3 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,7 @@ container: clean-container .container-$(ARCH) $(DOCKER) build --no-cache --pull -t $(MULTI_ARCH_IMG)-filebeat:$(TAG) -f Dockerfile.filebeat $(TEMP_DIR)/ ifeq ($(ARCH), amd64) $(DOCKER) tag $(MULTI_ARCH_IMG)-filebeat:$(TAG) $(IMAGE)-filebeat:$(TAG) + $(DOCKER) tag $(MULTI_ARCH_IMG)-filebeat:$(TAG) $(IMGNAME):$(TAG) endif @echo "+ Building container image $(MULTI_ARCH_IMG)-fluentd:$(TAG)" diff --git a/examples/with-configmap/filebeat-pilot-kafka-kubernetes.yml b/examples/with-configmap/filebeat-pilot-kafka-kubernetes.yml new file mode 100644 index 00000000..b34ae04b --- /dev/null +++ b/examples/with-configmap/filebeat-pilot-kafka-kubernetes.yml @@ -0,0 +1,96 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: log-pilot-configuration +data: + logging_output: "kafka" + kafka_brokers: "kafka1:9092,kafka2:9092" + # configure all valid topics in kafka + # when disable auto-create topic + kafka_topics: "topic1,topic2,topic3" +--- +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: log-pilot + labels: + k8s-app: log-pilot +spec: + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + k8s-app: log-pilot + spec: + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + containers: + - name: log-pilot + image: registry.cn-hangzhou.aliyuncs.com/acs/log-pilot:0.9.6-filebeat + env: + - name: "LOGGING_OUTPUT" + valueFrom: + configMapKeyRef: + name: log-pilot-configuration + key: logging_output + - name: "KAFKA_BROKERS" + valueFrom: + configMapKeyRef: + name: log-pilot-configuration + key: kafka_brokers + - name: "NODE_NAME" + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: sock + mountPath: /var/run/docker.sock + - name: logs + mountPath: /var/log/filebeat + - name: state + mountPath: /var/lib/filebeat + - name: root + mountPath: /host + readOnly: true + - name: localtime + mountPath: /etc/localtime + # configure all valid topics in kafka + # when disable auto-create topic + - name: config-volume + mountPath: /etc/filebeat/config + securityContext: + capabilities: + add: + - SYS_ADMIN + terminationGracePeriodSeconds: 30 + volumes: + - name: sock + hostPath: + path: /var/run/docker.sock + type: Socket + - name: logs + hostPath: + path: /var/log/filebeat + type: DirectoryOrCreate + - name: state + hostPath: + path: /var/lib/filebeat + type: DirectoryOrCreate + - name: root + hostPath: + path: / + type: Directory + - name: localtime + hostPath: + path: /etc/localtime + type: File + # kubelet sync period + - name: config-volume + configMap: + name: log-pilot-configuration + items: + - key: kafka_topics + path: kafka_topics \ No newline at end of file diff --git a/pilot/filebeat_piloter.go b/pilot/filebeat_piloter.go index a13aacd4..45442000 100644 --- a/pilot/filebeat_piloter.go +++ b/pilot/filebeat_piloter.go @@ -303,3 +303,8 @@ func (p *FilebeatPiloter) Name() string { func (p *FilebeatPiloter) OnDestroyEvent(container string) error { return p.feed(container) } + +// GetBaseConf returns plugin root directory +func (p *FilebeatPiloter) GetBaseConf() string { + return FILEBEAT_BASE_CONF +} diff --git a/pilot/fluentd_piloter.go b/pilot/fluentd_piloter.go index 19142d22..ac289998 100644 --- a/pilot/fluentd_piloter.go +++ b/pilot/fluentd_piloter.go @@ -142,3 +142,8 @@ func (p *FluentdPiloter) OnDestroyEvent(container string) error { log.Info("refactor in the future!!!") return nil } + +// GetBaseConf returns plugin root directory +func (p *FluentdPiloter) GetBaseConf() string { + return FLUENTD_BASE_CONF +} diff --git a/pilot/pilot.go b/pilot/pilot.go index 589861dc..1b9ae125 100644 --- a/pilot/pilot.go +++ b/pilot/pilot.go @@ -505,6 +505,32 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) { return tagMap, nil } +func (p *Pilot) tryCheckKafkaTopic(topic string) error { + output := os.Getenv(ENV_LOGGING_OUTPUT) + if output != "kafka" { + return nil + } + + topicPath := filepath.Join(p.piloter.GetBaseConf(), "config", "kafka_topics") + if _, err := os.Stat(topicPath); os.IsNotExist(err) { + log.Info("ignore checking the validity of kafka topic") + return nil + } + + topics, err := ReadFile(topicPath, ",") + if err != nil { + return err + } + + for _, t := range topics { + if t == topic { + return nil + } + } + + return fmt.Errorf("invalid topic: %s, supported topics: %v", topic, topics) +} + func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) { path := strings.TrimSpace(info.value) if path == "" { @@ -535,6 +561,11 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin } } + // try to check the validity of the target topic for kafka + if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil { + return nil, err + } + format := info.children["format"] if format == nil || format.value == "none" { format = newLogInfoNode("nonex") diff --git a/pilot/piloter.go b/pilot/piloter.go index ecdcaf98..4d0c5ba4 100644 --- a/pilot/piloter.go +++ b/pilot/piloter.go @@ -21,6 +21,7 @@ type Piloter interface { Reload() error Stop() error + GetBaseConf() string GetConfHome() string GetConfPath(container string) string diff --git a/pilot/util.go b/pilot/util.go new file mode 100644 index 00000000..f4db2502 --- /dev/null +++ b/pilot/util.go @@ -0,0 +1,16 @@ +package pilot + +import ( + "io/ioutil" + "strings" +) + +// ReadFile return string list separated by separator +func ReadFile(path string, separator string) ([]string, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + return strings.Split(string(data), separator), nil +} diff --git a/quickstart/es.yml b/quickstart/es.yml index bf5701ed..d13d39b2 100644 --- a/quickstart/es.yml +++ b/quickstart/es.yml @@ -3,10 +3,10 @@ services: elasticsearch: ports: - 9200:9200 - image: elasticsearch + image: elasticsearch:5.5.1 kibana: - image: kibana + image: kibana:5.5.1 ports: - 5601:5601 environment: diff --git a/quickstart/filebeat/es.yml b/quickstart/filebeat/es.yml index 0038ed44..ec2d5ae0 100644 --- a/quickstart/filebeat/es.yml +++ b/quickstart/filebeat/es.yml @@ -3,10 +3,10 @@ services: elasticsearch: ports: - 9200:9200 - image: elasticsearch + image: elasticsearch:5.5.1 kibana: - image: kibana + image: kibana:5.5.1 ports: - 5601:5601 environment: