From 0ab82cf103fbb7205b7f3b16c88ba6c2b9c194dc Mon Sep 17 00:00:00 2001 From: Anmol Date: Fri, 23 Dec 2022 18:33:13 +0530 Subject: [PATCH] Add initial code for collector service (#30) * Add initial code for collector service * update makefile to change name * Adding fileserver to read and write from files * Add ability to store snapshots in the file servers * Add snapshot and export capabilities for collector service * Fix collector code * Add collector service and add prestop lifecycle hook to register snapshots * Update gas for hermes * update version of chart --- charts/devnet/Chart.yaml | 2 +- charts/devnet/scripts/register_snapshots.sh | 24 ++ charts/devnet/templates/chain/genesis.yaml | 22 ++ charts/devnet/templates/chain/validator.yaml | 19 ++ charts/devnet/templates/collector.yaml | 77 ++++++ .../templates/relayers/hermes/configmap.yaml | 6 +- charts/devnet/values.yaml | 7 + collector/Dockerfile | 27 ++ collector/Makefile | 16 ++ collector/app.go | 134 ++++++++++ collector/config.go | 115 +++++++++ collector/const.go | 21 ++ collector/error.go | 64 +++++ collector/fileserver.go | 243 ++++++++++++++++++ collector/go.mod | 19 ++ collector/go.sum | 35 +++ collector/handler.go | 112 ++++++++ collector/main.go | 61 +++++ collector/model.go | 62 +++++ exposer/handler.go | 1 - 20 files changed, 1062 insertions(+), 5 deletions(-) create mode 100644 charts/devnet/scripts/register_snapshots.sh create mode 100644 charts/devnet/templates/collector.yaml create mode 100644 collector/Dockerfile create mode 100644 collector/Makefile create mode 100644 collector/app.go create mode 100644 collector/config.go create mode 100644 collector/const.go create mode 100644 collector/error.go create mode 100644 collector/fileserver.go create mode 100644 collector/go.mod create mode 100644 collector/go.sum create mode 100644 collector/handler.go create mode 100644 collector/main.go create mode 100644 collector/model.go diff --git a/charts/devnet/Chart.yaml b/charts/devnet/Chart.yaml index 77fd396c..712947e8 100644 --- a/charts/devnet/Chart.yaml +++ b/charts/devnet/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.10 +version: 0.1.11 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/devnet/scripts/register_snapshots.sh b/charts/devnet/scripts/register_snapshots.sh new file mode 100644 index 00000000..b767e48a --- /dev/null +++ b/charts/devnet/scripts/register_snapshots.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +CHAIN_ID="${CHAIN_ID:=osmosis-1}" +VAL_NAME="${VAL_NAME:=osmosis}" +CHAIN_DIR="${CHAIN_DIR:=$HOME/.osmosisd}" +COLLECTOR_HOST="${COLLECTOR_HOST}" + +set -euxo pipefail + +snapshot_name=data_${VAL_NAME}_$(date "+%F-%H-%M-%S") + +# Create the snapshot that will be uploaded +function create_snapshot { + tar -czvf /opt/${snapshot_name}.tar.gz $CHAIN_DIR/data +} + +# Register the snapshot to the collector service +function register_snapshot { + url=${COLLECTOR_HOST}/chains/${CHAIN_ID}/validators/${VAL_NAME}/snapshots/${snapshot_name}.tar.gz + curl -v -i ${url} -H'Content-Encoding: gzip' -H'Content-TYPE: application/gzip' --data-binary @/opt/${snapshot_name}.tar.gz +} + +create_snapshot +register_snapshot \ No newline at end of file diff --git a/charts/devnet/templates/chain/genesis.yaml b/charts/devnet/templates/chain/genesis.yaml index 87bac2aa..ce3ba2ec 100644 --- a/charts/devnet/templates/chain/genesis.yaml +++ b/charts/devnet/templates/chain/genesis.yaml @@ -109,6 +109,10 @@ spec: env: {{- include "devnet.defaultEvnVars" $defaultChain | indent 12 }} {{- include "devnet.evnVars" $chain | indent 12 }} + - name: COLLECTOR_SERVICE + value: collector + - name: COLLECTOR_PORT + value: "8070" command: - bash - "-c" @@ -124,6 +128,24 @@ spec: name: node - mountPath: /configs name: addresses + - mountPath: /scripts + name: scripts + {{- if $.Values.collector.enabled }} + lifecycle: + preStop: + exec: + command: + - bash + - "-c" + - "-e" + - | + VAL_INDEX=${HOSTNAME##*-} + VAL_NAME=$(jq -r ".genesis[$VAL_INDEX].name" /configs/keys.json) + echo "Validator Index: $VAL_INDEX, Key name: $VAL_NAME" + + COLLECTOR_HOST=http://$COLLECTOR_SERVICE.$NAMESPACE.svc.cluster.local:$COLLECTOR_PORT + VAL_NAME=$VAL_NAME COLLECTOR_HOST=$COLLECTOR_HOST bash -e /scripts/register_snapshots.sh + {{- end }} - name: exposer image: {{ $.Values.exposer.image }} imagePullPolicy: Always diff --git a/charts/devnet/templates/chain/validator.yaml b/charts/devnet/templates/chain/validator.yaml index 57c7ea08..a5c774b0 100644 --- a/charts/devnet/templates/chain/validator.yaml +++ b/charts/devnet/templates/chain/validator.yaml @@ -132,6 +132,10 @@ spec: {{- include "devnet.defaultEvnVars" $defaultChain | indent 12 }} {{- include "devnet.evnVars" $chain | indent 12 }} {{- include "devnet.genesisVars" $dataExposer | indent 12}} + - name: COLLECTOR_SERVICE + value: collector + - name: COLLECTOR_PORT + value: "8070" command: - bash - "-c" @@ -158,6 +162,21 @@ spec: $CHAIN_BIN keys list | jq VAL_NAME=$VAL_NAME bash -e /scripts/create_validator.sh + {{- if $.Values.collector.enabled }} + preStop: + exec: + command: + - bash + - "-c" + - "-e" + - | + VAL_INDEX=${HOSTNAME##*-} + VAL_NAME=$(jq -r ".validators[$VAL_INDEX].name" /configs/keys.json) + echo "Validator Index: $VAL_INDEX, Key name: $VAL_NAME" + + COLLECTOR_HOST=http://$COLLECTOR_SERVICE.$NAMESPACE.svc.cluster.local:$COLLECTOR_PORT + VAL_NAME=$VAL_NAME COLLECTOR_HOST=$COLLECTOR_HOST bash -e /scripts/register_snapshots.sh + {{- end }} resources: {{- include "devnet.validator.resources" $chain | trim | nindent 12 }} volumeMounts: - mountPath: {{ $defaultChain.home }} diff --git a/charts/devnet/templates/collector.yaml b/charts/devnet/templates/collector.yaml new file mode 100644 index 00000000..f059173c --- /dev/null +++ b/charts/devnet/templates/collector.yaml @@ -0,0 +1,77 @@ +{{- if .Values.collector.enabled }} +--- +apiVersion: v1 +kind: Service +metadata: + name: collector + labels: + app.kubernetes.io/name: collector +spec: + clusterIP: None + ports: + - name: collector + port: 8070 + protocol: TCP + targetPort: 8070 + selector: + app.kubernetes.io/name: collector +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: collector +spec: + replicas: 1 + revisionHistoryLimit: 3 + selector: + matchLabels: + app.kubernetes.io/instance: collector + app.kubernetes.io/name: collector + template: + metadata: + annotations: + quality: release + role: api-gateway + sla: high + tier: gateway + labels: + app.kubernetes.io/instance: collector + app.kubernetes.io/type: collector + app.kubernetes.io/name: collector + app.kubernetes.io/version: {{ $.Chart.AppVersion }} + spec: + containers: + - name: collector + image: {{ .Values.collector.image }} + imagePullPolicy: Always + env: + - name: COLLECTOR_ADDR + value: ":8070" + - name: COLLECTOR_DIR_PATH + value: /opt/collector + command: [ "collector" ] + resources: + limits: + cpu: "1" + memory: 4Gi + requests: + cpu: "0.5" + memory: 2Gi + volumeMounts: + - mountPath: /opt/collector + name: collector + readinessProbe: + tcpSocket: + port: 8070 + initialDelaySeconds: 60 + periodSeconds: 30 + livenessProbe: + tcpSocket: + port: 8070 + initialDelaySeconds: 60 + periodSeconds: 30 + volumes: + - name: collector + emptyDir: {} +--- +{{- end }} diff --git a/charts/devnet/templates/relayers/hermes/configmap.yaml b/charts/devnet/templates/relayers/hermes/configmap.yaml index 18b27f48..53c74038 100644 --- a/charts/devnet/templates/relayers/hermes/configmap.yaml +++ b/charts/devnet/templates/relayers/hermes/configmap.yaml @@ -89,10 +89,10 @@ data: websocket_addr = "ws://{{ $chain }}-genesis.{{ $.Release.Namespace }}.svc.cluster.local:26657/websocket" {{- with index $.Values.defaultChains $fullchain.type }} account_prefix = "{{ .prefix }}" - gas_price = { price = 0.025, denom = "{{ .denom }}" } + gas_price = { price = 0.25, denom = "{{ .denom }}" } {{- end }} - default_gas = 50000000 - max_gas = 100000000 + default_gas = 500000000 + max_gas = 1000000000 rpc_timeout = "10s" store_prefix = "ibc" gas_multiplier = 1.5 diff --git a/charts/devnet/values.yaml b/charts/devnet/values.yaml index 4d0b8836..63fad2e7 100644 --- a/charts/devnet/values.yaml +++ b/charts/devnet/values.yaml @@ -187,3 +187,10 @@ chainRegistry: image: anmol1696/chain-registry ports: rest: 8090 + +collector: + enabled: false + image: anmol1696/collector:latest + localhost: true + ports: + rest: 8070 \ No newline at end of file diff --git a/collector/Dockerfile b/collector/Dockerfile new file mode 100644 index 00000000..8a1ec16f --- /dev/null +++ b/collector/Dockerfile @@ -0,0 +1,27 @@ +FROM golang:1.19-alpine AS build-env + +# Set up dependencies +ENV PACKAGES curl make git libc-dev bash gcc linux-headers + +# Set working directory for the build +WORKDIR /usr/local/share/app + +# Add source files +COPY . . + +# Install minimum necessary dependencies, build Cosmos SDK, remove packages +RUN apk add --no-cache $PACKAGES && go build -mod readonly -o collector ./... + +# Final image +FROM alpine:3.16 + +# Install ca-certificates +RUN apk add --update ca-certificates jq bash curl +WORKDIR /usr/local/share/app + +RUN ls /usr/bin + +# Copy over binaries from the build-env +COPY --from=build-env /usr/local/share/app/collector /usr/bin/collector + +EXPOSE 8081 diff --git a/collector/Makefile b/collector/Makefile new file mode 100644 index 00000000..06a3dca3 --- /dev/null +++ b/collector/Makefile @@ -0,0 +1,16 @@ +DOCKER := $(shell which docker) +DOCKER_REPO_NAME := anmol1696 +DOCKER_IMAGE := collector +DOCKER_TAG_NAME := latest + +DOCKER_ARGS += --platform linux/amd64 + +docker-build: + $(DOCKER) buildx build $(DOCKER_ARGS) \ + -t $(DOCKER_REPO_NAME)/$(DOCKER_IMAGE):$(DOCKER_TAG_NAME) . + +docker-build-push: docker-build + $(DOCKER) push $(DOCKER_REPO_NAME)/$(DOCKER_IMAGE):$(DOCKER_TAG_NAME) + +docker-run: + $(DOCKER) run --rm -it --entrypoint /bin/bash $(DOCKER_REPO_NAME)/$(DOCKER_IMAGE):$(DOCKER_TAG_NAME) diff --git a/collector/app.go b/collector/app.go new file mode 100644 index 00000000..ff970008 --- /dev/null +++ b/collector/app.go @@ -0,0 +1,134 @@ +package main + +import ( + "fmt" + "net/http" + "time" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/go-chi/render" + "go.uber.org/zap" +) + +type AppServer struct { + config *Config + db *FileDB + logger *zap.Logger + server *http.Server + router http.Handler +} + +func NewAppServer(config *Config) (*AppServer, error) { + log, err := NewLogger(config) + if err != nil { + return nil, err + } + log.Info( + "Starting the service", + zap.String("prog", Prog), + zap.String("version", Version), + zap.Any("config", config), + ) + + app := &AppServer{ + config: config, + logger: log, + db: NewFileDB(log, config.DirPath), + } + + // Setup routes + router, err := app.Router() + if err != nil { + log.Error("Error setting up routes", zap.Error(err)) + return nil, err + } + app.router = router + + return app, err +} + +func (a *AppServer) Router() (*chi.Mux, error) { + router := chi.NewRouter() + router.MethodNotAllowed(MethodNotAllowed) + router.NotFound(NotFound) + + // Set middleware + router.Use(a.panicRecovery) + router.Use(render.SetContentType(render.ContentTypeJSON)) + + // Setup routes + // handler of export states + router.Get("/chains", a.GetChains) + router.Route("/chains/{chain}/validators/{validator}", func(r chi.Router) { + r.Get("/exports", a.GetChainExports) + r.Get("/exports/{id}", a.GetChainExport) + r.Post("/exports/{id}", a.SetChainExport) + r.Get("/snapshots", a.GetChainSnapshots) + r.Get("/snapshots/{id}", a.GetChainSnapshot) + r.Post("/snapshots/{id}", a.SetChainSnapshot) + }) + + return router, nil +} + +func (a *AppServer) loggingMiddleware(next http.Handler) http.Handler { + fn := func(w http.ResponseWriter, r *http.Request) { + ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) + start := time.Now() + defer func() { + a.logger.Info("client request", + zap.Duration("latency", time.Since(start)), + zap.Int("status", ww.Status()), + zap.Int("bytes", ww.BytesWritten()), + zap.String("client_ip", r.RemoteAddr), + zap.String("method", r.Method), + zap.String("path", r.URL.Path), + zap.String("request-id", middleware.GetReqID(r.Context()))) + }() + + next.ServeHTTP(ww, r) + } + return http.HandlerFunc(fn) +} + +func (a *AppServer) panicRecovery(next http.Handler) http.Handler { + fn := func(w http.ResponseWriter, r *http.Request) { + defer func() { + if rc := recover(); rc != nil { + err, ok := rc.(error) + if !ok { + err = fmt.Errorf("panic: %v", rc) + } + a.logger.Error("panic error", + zap.String("request-id", middleware.GetReqID(r.Context())), + zap.Error(err)) + + render.Render(w, r, ErrInternalServer) + return + } + }() + next.ServeHTTP(w, r) + } + return http.HandlerFunc(fn) +} + +func (a *AppServer) Run() error { + a.logger.Info("App starting", zap.Any("Config", a.config)) + + // Setup server + server := &http.Server{ + Addr: a.config.Addr, + Handler: a.router, + } + a.server = server + + // Start http server as long-running go routine + go func() { + if err := server.ListenAndServe(); err != nil { + a.logger.Error("failed to start the App HTTP server", zap.Error(err)) + } + }() + + return nil +} diff --git a/collector/config.go b/collector/config.go new file mode 100644 index 00000000..a8bdf076 --- /dev/null +++ b/collector/config.go @@ -0,0 +1,115 @@ +package main + +import ( + "fmt" + "reflect" + + "github.com/urfave/cli" + "go.uber.org/zap" +) + +func NewDefaultConfig() *Config { + return &Config{ + Addr: ":8070", + DirPath: "/opt/collector", + SnapshotExt: "tar.gz", + ExportExt: "json", + } +} + +type Config struct { + // Addr is the interface and port to bind the HTTP service on + Addr string `name:"addr" json:"addr" env:"ADDR" usage:"IP address and port to listen on"` + // DirPath is the directory which is used by the fileserver to store files + DirPath string `name:"dir-path" json:"dir_path" env:"DIR_PATH" usage:"directory where the files are stored"` + // SnapshotExt is the extensions of the snapshot + SnapshotExt string `name:"snapshot-ext" json:"snapshot_ext" env:"SNAPSHOT_EXT" usage:"extenstion of the snapshots"` + // ExportExt is the extensions of the exports + ExportExt string `name:"export-ext" json:"export_ext" env:"EXPORT_EXT" usage:"extenstion of the exports"` + // Verbose switches on debug logging + Verbose bool `name:"verbose" json:"verbose" usage:"switch on debug / verbose logging"` + // OnlyFatalLog set log level as fatal to ignore logs + OnlyFatalLog bool `name:"only-fatal-log" json:"only-fatal-log" usage:"used while running test"` +} + +func GetCommandLineOptions() []cli.Flag { + defaults := NewDefaultConfig() + var flags []cli.Flag + count := reflect.TypeOf(Config{}).NumField() + for i := 0; i < count; i++ { + field := reflect.TypeOf(Config{}).Field(i) + usage, found := field.Tag.Lookup("usage") + if !found { + continue + } + envName := field.Tag.Get("env") + if envName != "" { + envName = envPrefix + envName + } + optName := field.Tag.Get("name") + + switch t := field.Type; t.Kind() { + case reflect.Bool: + dv := reflect.ValueOf(defaults).Elem().FieldByName(field.Name).Bool() + msg := fmt.Sprintf("%s (default: %t)", usage, dv) + flags = append(flags, cli.BoolTFlag{ + Name: optName, + Usage: msg, + EnvVar: envName, + }) + case reflect.String: + defaultValue := reflect.ValueOf(defaults).Elem().FieldByName(field.Name).String() + flags = append(flags, cli.StringFlag{ + Name: optName, + Usage: usage, + EnvVar: envName, + Value: defaultValue, + }) + } + } + + return flags +} + +func ParseCLIOptions(cx *cli.Context, config *Config) (err error) { + // iterate the Config and grab command line options via reflection + count := reflect.TypeOf(config).Elem().NumField() + for i := 0; i < count; i++ { + field := reflect.TypeOf(config).Elem().Field(i) + name := field.Tag.Get("name") + + if cx.IsSet(name) { + switch field.Type.Kind() { + case reflect.Bool: + reflect.ValueOf(config).Elem().FieldByName(field.Name).SetBool(cx.Bool(name)) + case reflect.String: + reflect.ValueOf(config).Elem().FieldByName(field.Name).SetString(cx.String(name)) + } + } + } + return nil +} + +func NewLogger(config *Config) (*zap.Logger, error) { + c := zap.NewProductionConfig() + c.DisableCaller = true + // c.Encoding = "console" + + if config.Verbose { + c.DisableCaller = false + c.Development = true + c.DisableStacktrace = true // Disable stack trace for development + c.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + } + + if config.OnlyFatalLog { + c.Level = zap.NewAtomicLevelAt(zap.FatalLevel) + } + + log, err := c.Build() + if err != nil { + return nil, err + } + zap.ReplaceGlobals(log) // Set zap global logger + return log, err +} diff --git a/collector/const.go b/collector/const.go new file mode 100644 index 00000000..6522a6da --- /dev/null +++ b/collector/const.go @@ -0,0 +1,21 @@ +package main + +var ( + Version = "v0" + RequestIdCtxKey = &contextKey{"RequestId"} +) + +const ( + Prog = "collector" + Description = "is a service that aggregates the information of shuttle" + envPrefix = "COLLECTOR_" +) + +// copied and modified from net/http/http.go +// contextKey is a value for use with context.WithValue. It's used as +// a pointer, so it fits in an interface{} without allocation. +type contextKey struct { + name string +} + +func (k *contextKey) String() string { return Prog + " context value " + k.name } diff --git a/collector/error.go b/collector/error.go new file mode 100644 index 00000000..62e86f4c --- /dev/null +++ b/collector/error.go @@ -0,0 +1,64 @@ +package main + +import ( + "net/http" + + "github.com/go-chi/render" + "go.uber.org/zap" +) + +type ErrResponse struct { + Err error `json:"-"` + HTTPStatusCode int `json:"-"` + + MessageText string `json:"message"` +} + +func (e *ErrResponse) Render(w http.ResponseWriter, r *http.Request) error { + render.Status(r, e.HTTPStatusCode) + return nil +} + +// NewErrResponse create http aware errors from custom errors +func NewErrResponse(err error) *ErrResponse { + return &ErrResponse{ + Err: err, + HTTPStatusCode: http.StatusInternalServerError, + MessageText: err.Error(), + } +} + +var ( + ErrValidation = &ErrResponse{HTTPStatusCode: http.StatusBadRequest, MessageText: "Validation error."} + ErrNotFound = &ErrResponse{HTTPStatusCode: http.StatusNotFound, MessageText: "Resource not found."} + ErrNotImplemented = &ErrResponse{HTTPStatusCode: http.StatusNotImplemented, MessageText: "Not Implemented."} + ErrMethodNotAllowed = &ErrResponse{HTTPStatusCode: http.StatusMethodNotAllowed, MessageText: "Method not allowed."} + ErrRequestBind = &ErrResponse{HTTPStatusCode: http.StatusBadRequest, MessageText: "Unable to bind request body."} + ErrInternalServer = &ErrResponse{HTTPStatusCode: http.StatusInternalServerError, MessageText: "Internal server error."} +) + +// NotFound Method to render Json respose, used by middleware +func NotFound(w http.ResponseWriter, r *http.Request) { + _ = render.Render(w, r, ErrNotFound) +} + +// MethodNotAllowed Method to render Json respose, used by middleware +func MethodNotAllowed(w http.ResponseWriter, r *http.Request) { + _ = render.Render(w, r, ErrMethodNotAllowed) +} + +func (a *AppServer) renderError(w http.ResponseWriter, r *http.Request, err error, msg ...string) { + log := a.logger + errResp := NewErrResponse(err) + // Logging error at different levels depending on status + switch code := errResp.HTTPStatusCode; { + case code < http.StatusInternalServerError: + log.Warn(err.Error()) + default: + log.Error( + "Internal server error", + zap.Error(err), + ) + } + _ = render.Render(w, r, errResp) +} diff --git a/collector/fileserver.go b/collector/fileserver.go new file mode 100644 index 00000000..5751c23a --- /dev/null +++ b/collector/fileserver.go @@ -0,0 +1,243 @@ +package main + +import ( + "fmt" + "go.uber.org/zap" + "os" + "path/filepath" +) + +type FileDB struct { + logger *zap.Logger + path string +} + +func NewFileDB(logger *zap.Logger, path string) *FileDB { + dirPath := filepath.Dir(path) + if err := os.MkdirAll(dirPath, os.ModeDir); err != nil { + panic(fmt.Sprintf("Unable to create root directory at: %s", dirPath)) + } + logger.Info("Initialized directory used for storing files", + zap.String("path", dirPath)) + + return &FileDB{ + path: dirPath, + logger: logger, + } +} + +func (f *FileDB) getDirs(paths ...string) ([]string, error) { + path := filepath.Join(f.path, filepath.Join(paths...)) + + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + var dirNames []string + for _, file := range files { + fileInfo, err := file.Info() + if err != nil { + return nil, err + } + if fileInfo.IsDir() { + dirNames = append(dirNames, fileInfo.Name()) + } + } + + return dirNames, nil +} + +func (f *FileDB) getFiles(paths ...string) ([]string, error) { + path := filepath.Join(f.path, filepath.Join(paths...)) + + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + var fileNames []string + for _, file := range files { + fileInfo, err := file.Info() + if err != nil { + return nil, err + } + if !fileInfo.IsDir() { + fileNames = append(fileNames, fileInfo.Name()) + } + } + + return fileNames, nil +} + +// GetChains will return a list of chains reading from files +func (f *FileDB) GetChains() ([]Chain, error) { + chainDirs, err := f.getDirs("/") + if err != nil { + return nil, err + } + + var chains []Chain + for _, chainDir := range chainDirs { + validators, err := f.GetChainValidators(chainDir) + if err != nil { + return nil, err + } + chains = append(chains, Chain{ + Name: chainDir, + Validators: validators, + }) + } + + return chains, nil +} + +func (f *FileDB) IsChain(name string) bool { + chains, err := f.GetChains() + if err != nil { + return false + } + + for _, chain := range chains { + if chain.Name == name { + return true + } + } + + return false +} + +func (f *FileDB) CreateChain(name string) error { + err := os.Mkdir(filepath.Join(f.path, name), os.ModeDir) + if os.IsExist(err) { + return nil + } + + return err +} + +// GetChainValidators will return a list of validators in the chain +func (f *FileDB) GetChainValidators(chain string) ([]Validator, error) { + valDirs, err := f.getDirs(chain) + if err != nil { + return nil, err + } + + var vals []Validator + for _, valDir := range valDirs { + vals = append(vals, Validator{ + Name: valDir, + Moniker: valDir, + }) + } + + return vals, nil +} + +func (f *FileDB) IsValidator(chain string, name string) bool { + vals, err := f.GetChainValidators(chain) + if err != nil { + return false + } + + for _, val := range vals { + if val.Name == name { + return true + } + } + + return false +} + +func (f *FileDB) CreateValidator(chain string, name string) error { + err := os.Mkdir(filepath.Join(f.path, chain, name), os.ModeDir) + if os.IsExist(err) { + return nil + } + + return err +} + +func (f *FileDB) ListSnapshots(chain string, validator string) ([]string, error) { + if !f.IsChain(chain) { + return nil, fmt.Errorf("chain %s does not exists", chain) + } + if !f.IsValidator(chain, validator) { + return nil, fmt.Errorf("validator %s does not exists for %s", chain, validator) + } + + files, err := f.getFiles(chain, validator, "snapshots") + if err != nil { + return nil, err + } + + return files, nil +} + +// GetSnapshot return snapshot stored for the chain and validator +func (f *FileDB) GetSnapshot(chain string, validator string, snapshot string) ([]byte, error) { + filePath := filepath.Join(f.path, chain, validator, "snapshots", snapshot) + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + return data, nil +} + +func (f *FileDB) StoreSnapshot(chain string, validator string, snapshot string, data []byte) error { + err := os.MkdirAll(filepath.Join(f.path, chain, validator, "snapshots"), os.ModeDir) + if err != nil { + return err + } + + filePath := filepath.Join(f.path, chain, validator, "snapshots", snapshot) + err = os.WriteFile(filePath, data, 0644) + if err != nil { + return fmt.Errorf("unable to write snapshot to %s, with err: %s", filePath, err) + } + + return nil +} + +func (f *FileDB) ListExports(chain string, validator string) ([]string, error) { + if !f.IsChain(chain) { + return nil, fmt.Errorf("chain %s does not exists", chain) + } + if !f.IsValidator(chain, validator) { + return nil, fmt.Errorf("validator %s does not exists for %s", chain, validator) + } + + files, err := f.getFiles(chain, validator, "exports") + if err != nil { + return nil, err + } + + return files, nil +} + +// GetExport return snapshot stored for the chain and validator +func (f *FileDB) GetExport(chain string, validator string, export string) ([]byte, error) { + filePath := filepath.Join(f.path, chain, validator, "exports", export) + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + return data, nil +} + +func (f *FileDB) StoreExport(chain string, validator string, export string, data []byte) error { + err := os.MkdirAll(filepath.Join(f.path, chain, validator, "exports"), os.ModeDir) + if err != nil { + return err + } + + filePath := filepath.Join(f.path, chain, validator, "exports", export) + err = os.WriteFile(filePath, data, 0644) + if err != nil { + return fmt.Errorf("unable to write snapshot to %s, with err: %s", filePath, err) + } + + return nil +} diff --git a/collector/go.mod b/collector/go.mod new file mode 100644 index 00000000..b60218b7 --- /dev/null +++ b/collector/go.mod @@ -0,0 +1,19 @@ +module exposer + +go 1.19 + +require ( + github.com/go-chi/chi v1.5.4 + github.com/go-chi/render v1.0.2 + github.com/urfave/cli v1.22.10 + go.uber.org/zap v1.23.0 +) + +require ( + github.com/ajg/form v1.5.1 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect + github.com/russross/blackfriday/v2 v2.0.1 // indirect + github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect +) diff --git a/collector/go.sum b/collector/go.sum new file mode 100644 index 00000000..9f8f92ef --- /dev/null +++ b/collector/go.sum @@ -0,0 +1,35 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= +github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs= +github.com/go-chi/chi v1.5.4/go.mod h1:uaf8YgoFazUOkPBG7fxPftUylNumIev9awIWOENIuEg= +github.com/go-chi/render v1.0.2 h1:4ER/udB0+fMWB2Jlf15RV3F4A2FDuYi/9f+lFttR/Lg= +github.com/go-chi/render v1.0.2/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/urfave/cli v1.22.10 h1:p8Fspmz3iTctJstry1PYS3HVdllxnEzTEsgIgtxTrCk= +github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/collector/handler.go b/collector/handler.go new file mode 100644 index 00000000..f6c55ccb --- /dev/null +++ b/collector/handler.go @@ -0,0 +1,112 @@ +package main + +import ( + "fmt" + "github.com/go-chi/chi" + "github.com/go-chi/render" + "io" + "net/http" + "os" + "strings" + + "go.uber.org/zap" +) + +func (a *AppServer) renderJSONFile(w http.ResponseWriter, r *http.Request, filePath string) { + jsonFile, err := os.Open(filePath) + if err != nil { + a.logger.Error("Error opening file", + zap.String("file", filePath), + zap.Error(err)) + a.renderError(w, r, fmt.Errorf("error opening json file: %s", filePath)) + } + + byteValue, _ := io.ReadAll(jsonFile) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(byteValue) +} + +func (a *AppServer) GetChains(w http.ResponseWriter, r *http.Request) { + chains, err := a.db.GetChains() + if err != nil { + a.renderError(w, r, err, "unable to get chains") + return + } + + render.JSON(w, r, NewItemsResponse(chains)) +} + +func (a *AppServer) GetChainExports(w http.ResponseWriter, r *http.Request) { + render.Render(w, r, ErrNotImplemented) +} + +func (a *AppServer) SetChainExport(w http.ResponseWriter, r *http.Request) { + render.Render(w, r, ErrNotImplemented) +} + +func (a *AppServer) GetChainExport(w http.ResponseWriter, r *http.Request) { + render.Render(w, r, ErrNotImplemented) +} + +func (a *AppServer) GetChainSnapshots(w http.ResponseWriter, r *http.Request) { + chainID := chi.URLParam(r, "chain") + valID := chi.URLParam(r, "validator") + + snapshotsIDs, err := a.db.ListSnapshots(chainID, valID) + if err != nil { + a.renderError(w, r, err) + return + } + + render.JSON(w, r, NewItemsResponse(snapshotsIDs)) +} + +func (a *AppServer) SetChainSnapshot(w http.ResponseWriter, r *http.Request) { + chainID := chi.URLParam(r, "chain") + valID := chi.URLParam(r, "validator") + snapshotID := chi.URLParam(r, "id") + + if !strings.HasSuffix(snapshotID, a.config.SnapshotExt) { + a.renderError(w, r, fmt.Errorf("snapshot id must contain suffix :%s", a.config.SnapshotExt)) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + a.logger.Warn(ErrRequestBind.MessageText, zap.Error(err)) + render.Render(w, r, ErrRequestBind) + return + } + + err = a.db.StoreSnapshot(chainID, valID, snapshotID, data) + if err != nil { + a.logger.Warn("unable to store snapshot", + zap.String("snapshotID", snapshotID), + zap.Error(err)) + a.renderError(w, r, err) + return + } + + render.Status(r, http.StatusCreated) // Set response status to 201 +} + +func (a *AppServer) GetChainSnapshot(w http.ResponseWriter, r *http.Request) { + chainID := chi.URLParam(r, "chain") + valID := chi.URLParam(r, "validator") + snapshotID := chi.URLParam(r, "id") + + data, err := a.db.GetSnapshot(chainID, valID, snapshotID) + if err != nil { + a.logger.Warn("unable to read snapshot", + zap.String("snapshotID", snapshotID), + zap.Error(err)) + a.renderError(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/gzip") + w.WriteHeader(http.StatusOK) + w.Write(data) +} diff --git a/collector/main.go b/collector/main.go new file mode 100644 index 00000000..9eda6426 --- /dev/null +++ b/collector/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "github.com/urfave/cli" +) + +func init() { + time.LoadLocation("UTC") // ensure all time is in UTC + runtime.GOMAXPROCS(runtime.NumCPU()) // set the core +} + +// NewApp creates a cli app that can be setup using args flags +// gracefull shutdown using termination signals +func NewApp() *cli.App { + conf := NewDefaultConfig() + app := cli.NewApp() + app.Name = Prog + app.Usage = Description + app.Version = Version + app.Flags = GetCommandLineOptions() + app.UsageText = "exposer [options]" + + app.Action = func(ctx *cli.Context) error { + if err := ParseCLIOptions(ctx, conf); err != nil { + return cli.NewExitError(err.Error(), 1) + } + + // Alternative Sentry Setup point where we can actually pass along configuration options + // SetupSentry(conf) + + server, err := NewAppServer(conf) + if err != nil { + return cli.NewExitError(err.Error(), 1) + } + + defer server.logger.Sync() + if err := server.Run(); err != nil { + return cli.NewExitError(err.Error(), 1) + } + + // Setup the termination signals + signalChannel := make(chan os.Signal) + signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-signalChannel + + return nil + } + + return app +} + +func main() { + app := NewApp() + app.Run(os.Args) +} diff --git a/collector/model.go b/collector/model.go new file mode 100644 index 00000000..0041c8fc --- /dev/null +++ b/collector/model.go @@ -0,0 +1,62 @@ +package main + +type ItemsResponse struct { + Items interface{} `json:"items"` + TotalItems int `json:"total_items"` +} + +func NewItemsResponse(items interface{}) ItemsResponse { + if items == nil { + return ItemsResponse{ + Items: []string{}, + TotalItems: 0, + } + } + + listItems, ok := items.([]interface{}) + if !ok { + return ItemsResponse{ + Items: []interface{}{items}, + TotalItems: 1, + } + } + + return ItemsResponse{ + Items: listItems, + TotalItems: len(listItems), + } +} + +type Chain struct { + Name string `json:"name,omitempty"` + Type string `json:"type,omitempty"` + Validators []Validator `json:"validators,omitempty"` +} + +type Validator struct { + Name string `json:"name,omitempty"` + Moniker string `json:"moniker,omitempty"` + Address string `json:"address,omitempty"` +} + +type State struct { + ID string `json:"id,omitempty"` + Height string `json:"height,omitempty"` + DataType string `json:"data_type,omitempty"` +} + +func NewExportState(id string, height string) State { + return State{ + ID: id, + Height: height, + DataType: "json", + } +} + +func NewSnapshotState(id string, height string) State { + return State{ + ID: id, + Height: height, + DataType: "tar", + } +} diff --git a/exposer/handler.go b/exposer/handler.go index eea4bc2d..111481ed 100644 --- a/exposer/handler.go +++ b/exposer/handler.go @@ -35,7 +35,6 @@ func (a *AppServer) renderJSONFile(w http.ResponseWriter, r *http.Request, fileP zap.Error(err)) a.renderError(w, r, fmt.Errorf("error opening json file: %s", filePath)) } - defer jsonFile.Close() byteValue, _ := io.ReadAll(jsonFile)