From 99ede8fda3353c6b83a87d9bed1273809a4bdc14 Mon Sep 17 00:00:00 2001 From: michaeljguarino Date: Mon, 16 Dec 2024 22:13:17 -0500 Subject: [PATCH] Implement pluggable log aggregation drivers Starting w/ victoria metrics, and will also implement elastic. Support authz against main plural objects, eg services, clusters, projects, and query constructs that would work with a fully fledged log agg ui. --- assets/src/generated/graphql.ts | 47 +++++++++++++ go/client/models_gen.go | 70 +++++++++++++++++++ lib/console/graphql/deployments/settings.ex | 18 ++++- lib/console/graphql/observability.ex | 29 ++++++++ .../graphql/resolvers/observability.ex | 8 +++ lib/console/logs/line.ex | 9 +++ lib/console/logs/provider.ex | 27 +++++++ lib/console/logs/provider/utils.ex | 6 ++ lib/console/logs/provider/victoria.ex | 61 ++++++++++++++++ lib/console/logs/query.ex | 47 +++++++++++++ lib/console/logs/stream/exec.ex | 57 +++++++++++++++ lib/console/logs/stream/json_line.ex | 10 +++ lib/console/logs/time.ex | 7 ++ lib/console/schema/deployment_settings.ex | 15 ++++ .../20241216181834_add_log_driver_support.exs | 9 +++ schema/schema.graphql | 39 +++++++++++ .../queries/observability_queries_test.exs | 27 +++++++ test/support/test_helpers.ex | 2 + test/test_helper.exs | 1 + 19 files changed, 488 insertions(+), 1 deletion(-) create mode 100644 lib/console/logs/line.ex create mode 100644 lib/console/logs/provider.ex create mode 100644 lib/console/logs/provider/utils.ex create mode 100644 lib/console/logs/provider/victoria.ex create mode 100644 lib/console/logs/query.ex create mode 100644 lib/console/logs/stream/exec.ex create mode 100644 lib/console/logs/stream/json_line.ex create mode 100644 lib/console/logs/time.ex create mode 100644 priv/repo/migrations/20241216181834_add_log_driver_support.exs diff --git a/assets/src/generated/graphql.ts b/assets/src/generated/graphql.ts index 42260aaf0e..e26306b397 100644 --- a/assets/src/generated/graphql.ts +++ b/assets/src/generated/graphql.ts @@ -2351,6 +2351,8 @@ export type DeploymentSettings = { insertedAt?: Maybe; /** the latest known k8s version */ latestK8sVsn: Scalars['String']['output']; + /** settings for connections to log aggregation datastores */ + logging?: Maybe; /** the way we can connect to your loki instance */ lokiConnection?: Maybe; name: Scalars['String']['output']; @@ -3305,6 +3307,17 @@ export type LoadBalancerStatus = { ingress?: Maybe>>; }; +export enum LogDriver { + Elastic = 'ELASTIC', + Victoria = 'VICTORIA' +} + +export type LogFacet = { + __typename?: 'LogFacet'; + key: Scalars['String']['output']; + value: Scalars['String']['output']; +}; + export type LogFilter = { __typename?: 'LogFilter'; metadata: Metadata; @@ -3325,12 +3338,36 @@ export type LogLabel = { value?: Maybe; }; +export type LogLine = { + __typename?: 'LogLine'; + facets?: Maybe>>; + log: Scalars['String']['output']; + timestamp?: Maybe; +}; + export type LogStream = { __typename?: 'LogStream'; stream?: Maybe; values?: Maybe>>; }; +export type LogTimeRange = { + after?: InputMaybe; + before?: InputMaybe; + duration?: InputMaybe; + reverse?: InputMaybe; +}; + +/** Settings for configuring log aggregation throughout Plural */ +export type LoggingSettings = { + __typename?: 'LoggingSettings'; + /** the type of log aggregation solution you wish to use */ + driver?: Maybe; + enabled?: Maybe; + /** configures a connection to victoria metrics */ + victoria?: Maybe; +}; + export type LoginInfo = { __typename?: 'LoginInfo'; external?: Maybe; @@ -6802,6 +6839,7 @@ export type RootQueryType = { installations?: Maybe; invite?: Maybe; job?: Maybe; + logAggregation?: Maybe>>; logFilters?: Maybe>>; loginInfo?: Maybe; logs?: Maybe>>; @@ -7358,6 +7396,15 @@ export type RootQueryTypeJobArgs = { }; +export type RootQueryTypeLogAggregationArgs = { + clusterId?: InputMaybe; + limit?: InputMaybe; + query?: InputMaybe; + serviceId?: InputMaybe; + time?: InputMaybe; +}; + + export type RootQueryTypeLogFiltersArgs = { namespace: Scalars['String']['input']; }; diff --git a/go/client/models_gen.go b/go/client/models_gen.go index 246a37ada2..551bb80cbd 100644 --- a/go/client/models_gen.go +++ b/go/client/models_gen.go @@ -1915,6 +1915,8 @@ type DeploymentSettings struct { Ai *AiSettings `json:"ai,omitempty"` // settings for cost management Cost *CostSettings `json:"cost,omitempty"` + // settings for connections to log aggregation datastores + Logging *LoggingSettings `json:"logging,omitempty"` // The console's expected agent version AgentVsn string `json:"agentVsn"` // the latest known k8s version @@ -2724,6 +2726,11 @@ type LoadBalancerStatus struct { Ingress []*LoadBalancerIngressStatus `json:"ingress,omitempty"` } +type LogFacet struct { + Key string `json:"key"` + Value string `json:"value"` +} + type LogFilter struct { Metadata Metadata `json:"metadata"` Spec LogFilterSpec `json:"spec"` @@ -2741,11 +2748,33 @@ type LogLabel struct { Value *string `json:"value,omitempty"` } +type LogLine struct { + Timestamp *string `json:"timestamp,omitempty"` + Log string `json:"log"` + Facets []*LogFacet `json:"facets,omitempty"` +} + type LogStream struct { Stream map[string]interface{} `json:"stream,omitempty"` Values []*MetricResult `json:"values,omitempty"` } +type LogTimeRange struct { + Before *string `json:"before,omitempty"` + After *string `json:"after,omitempty"` + Duration *string `json:"duration,omitempty"` + Reverse *bool `json:"reverse,omitempty"` +} + +// Settings for configuring log aggregation throughout Plural +type LoggingSettings struct { + Enabled *bool `json:"enabled,omitempty"` + // the type of log aggregation solution you wish to use + Driver *LogDriver `json:"driver,omitempty"` + // configures a connection to victoria metrics + Victoria *HTTPConnection `json:"victoria,omitempty"` +} + type LoginInfo struct { OidcURI *string `json:"oidcUri,omitempty"` External *bool `json:"external,omitempty"` @@ -7253,6 +7282,47 @@ func (e ListMerge) MarshalGQL(w io.Writer) { fmt.Fprint(w, strconv.Quote(e.String())) } +type LogDriver string + +const ( + LogDriverVictoria LogDriver = "VICTORIA" + LogDriverElastic LogDriver = "ELASTIC" +) + +var AllLogDriver = []LogDriver{ + LogDriverVictoria, + LogDriverElastic, +} + +func (e LogDriver) IsValid() bool { + switch e { + case LogDriverVictoria, LogDriverElastic: + return true + } + return false +} + +func (e LogDriver) String() string { + return string(e) +} + +func (e *LogDriver) UnmarshalGQL(v interface{}) error { + str, ok := v.(string) + if !ok { + return fmt.Errorf("enums must be strings") + } + + *e = LogDriver(str) + if !e.IsValid() { + return fmt.Errorf("%s is not a valid LogDriver", str) + } + return nil +} + +func (e LogDriver) MarshalGQL(w io.Writer) { + fmt.Fprint(w, strconv.Quote(e.String())) +} + type MatchStrategy string const ( diff --git a/lib/console/graphql/deployments/settings.ex b/lib/console/graphql/deployments/settings.ex index b57c79df77..2482ec6181 100644 --- a/lib/console/graphql/deployments/settings.ex +++ b/lib/console/graphql/deployments/settings.ex @@ -2,8 +2,10 @@ defmodule Console.GraphQl.Deployments.Settings do use Console.GraphQl.Schema.Base alias Console.Deployments.Settings alias Console.GraphQl.Resolvers.{Deployments} + alias Console.Schema.DeploymentSettings - ecto_enum :ai_provider, Console.Schema.DeploymentSettings.AIProvider + ecto_enum :ai_provider, DeploymentSettings.AIProvider + ecto_enum :log_driver, DeploymentSettings.LogDriver input_object :project_attributes do field :name, non_null(:string) @@ -49,6 +51,12 @@ defmodule Console.GraphQl.Deployments.Settings do field :recommendation_cushion, :integer, description: "the percentage change needed to generate a recommendation, default 20%" end + input_object :logging_settings_attributes do + field :enabled, :boolean + field :driver, :log_driver + field :victoria, :http_connection_attributes + end + input_object :ai_settings_attributes do field :enabled, :boolean field :tools, :tool_config_attributes @@ -154,6 +162,7 @@ defmodule Console.GraphQl.Deployments.Settings do field :smtp, :smtp_settings, description: "smtp server configuration for email notifications" field :ai, :ai_settings, description: "settings for LLM provider clients" field :cost, :cost_settings, description: "settings for cost management" + field :logging, :logging_settings, description: "settings for connections to log aggregation datastores" field :agent_vsn, non_null(:string), description: "The console's expected agent version", resolve: fn _, _, _ -> {:ok, Settings.agent_vsn()} end @@ -259,6 +268,13 @@ defmodule Console.GraphQl.Deployments.Settings do field :location, non_null(:string), description: "the gcp region the model" end + @desc "Settings for configuring log aggregation throughout Plural" + object :logging_settings do + field :enabled, :boolean + field :driver, :log_driver, description: "the type of log aggregation solution you wish to use" + field :victoria, :http_connection, description: "configures a connection to victoria metrics" + end + connection node_type: :project object :settings_queries do diff --git a/lib/console/graphql/observability.ex b/lib/console/graphql/observability.ex index 28d3f16ca1..cddf7869d6 100644 --- a/lib/console/graphql/observability.ex +++ b/lib/console/graphql/observability.ex @@ -13,6 +13,13 @@ defmodule Console.GraphQl.Observability do field :value, :string end + input_object :log_time_range do + field :before, :string + field :after, :string + field :duration, :string + field :reverse, :boolean + end + object :dashboard do field :id, non_null(:string), resolve: fn %{metadata: %{name: n}}, _, _ -> {:ok, n} end field :spec, non_null(:dashboard_spec) @@ -50,6 +57,17 @@ defmodule Console.GraphQl.Observability do end end + object :log_line do + field :timestamp, :datetime + field :log, non_null(:string) + field :facets, list_of(:log_facet) + end + + object :log_facet do + field :key, non_null(:string) + field :value, non_null(:string) + end + object :observability_queries do field :dashboards, list_of(:dashboard) do middleware Authenticated @@ -83,6 +101,17 @@ defmodule Console.GraphQl.Observability do safe_resolve &Observability.resolve_metric/2 end + field :log_aggregation, list_of(:log_line) do + middleware Authenticated + arg :service_id, :id + arg :cluster_id, :id + arg :query, :string + arg :time, :log_time_range + arg :limit, :integer + + resolve &Observability.list_logs/2 + end + field :logs, list_of(:log_stream) do middleware Authenticated diff --git a/lib/console/graphql/resolvers/observability.ex b/lib/console/graphql/resolvers/observability.ex index a1b08b1cad..e1ae6b2aa6 100644 --- a/lib/console/graphql/resolvers/observability.ex +++ b/lib/console/graphql/resolvers/observability.ex @@ -1,5 +1,7 @@ defmodule Console.GraphQl.Resolvers.Observability do alias Console.Services.Observability + alias Console.Logs.{Provider, Query} + @default_offset 30 * 60 @nano 1_000_000_000 @@ -16,6 +18,12 @@ defmodule Console.GraphQl.Resolvers.Observability do end end + def list_logs(args, %{context: %{current_user: user}}) do + query = Query.new(args) + with {:ok, query} <- Provider.accessible(query, user), + do: Provider.query(query) + end + def resolve_logs(%{query: query, limit: limit} = args, _) do now = Timex.now() start = (args[:start] || ts(now)) / @nano diff --git a/lib/console/logs/line.ex b/lib/console/logs/line.ex new file mode 100644 index 0000000000..6cb6ba6517 --- /dev/null +++ b/lib/console/logs/line.ex @@ -0,0 +1,9 @@ +defmodule Console.Logs.Line do + @type t :: %__MODULE__{facets: [%{key: binary, value: binary}]} + + defstruct [:timestamp, :log, :facets] + + def new(map) do + %__MODULE__{log: map[:log], timestamp: map[:timestamp], facets: map[:facets]} + end +end diff --git a/lib/console/logs/provider.ex b/lib/console/logs/provider.ex new file mode 100644 index 0000000000..6acfa0a64f --- /dev/null +++ b/lib/console/logs/provider.ex @@ -0,0 +1,27 @@ +defmodule Console.Logs.Provider do + alias Console.Logs.{Query, Line} + alias Console.Logs.Provider.{Victoria} + alias Console.Schema.{User, DeploymentSettings} + + @type error :: Console.error + + @callback query(struct, Query.t) :: {:ok, [Line.t]} | error + + @spec query(Query.t) :: {:ok, [Line.t]} | error + def query(%Query{} = q) do + with {:ok, %{__struct__: provider} = prov} <- client(), + do: provider.query(prov, q) + end + + @spec accessible(Query.t, User.t) :: {:ok, Query.t} | error + def accessible(%Query{} = query, %User{} = user), do: Query.accessible(query, user) + + defp client() do + Console.Deployments.Settings.cached() + |> client() + end + + def client(%DeploymentSettings{logging: %{enabled: true, driver: :victoria, victoria: %{} = victoria}}), + do: {:ok, Victoria.new(victoria)} + def client(_), do: {:error, "Plural logging integration not yet configured"} +end diff --git a/lib/console/logs/provider/utils.ex b/lib/console/logs/provider/utils.ex new file mode 100644 index 0000000000..e0a1f7243d --- /dev/null +++ b/lib/console/logs/provider/utils.ex @@ -0,0 +1,6 @@ +defmodule Console.Logs.Provider.Utils do + def facets(%{} = map) do + Enum.map(map, fn {k, v} -> %{key: k, value: v} end) + end + def facets(_), do: nil +end diff --git a/lib/console/logs/provider/victoria.ex b/lib/console/logs/provider/victoria.ex new file mode 100644 index 0000000000..54a0fce219 --- /dev/null +++ b/lib/console/logs/provider/victoria.ex @@ -0,0 +1,61 @@ +defmodule Console.Logs.Provider.Victoria do + @moduledoc """ + Log driver implementation for victoria metrics + """ + @behaviour Console.Logs.Provider + import Console.Logs.Provider.Utils + alias Console.Logs.{Query, Time, Line, Stream.Exec} + alias Console.Schema.{Cluster, Service, DeploymentSettings.Connection} + + @headers [{"content-type", "application/x-www-form-urlencoded"}] + + defstruct [:connection] + + def new(conn), do: %__MODULE__{connection: conn} + + def query(%__MODULE__{connection: %Connection{host: host} = conn}, %Query{} = query) when is_binary(host) do + Path.join(host, "/select/logsql/query") + |> HTTPoison.post({:form, [ + {"query", build_query(query)}, + {"limit", "#{Query.limit(query)}"} + ]}, headers(conn), [stream_to: self(), async: :once]) + |> Exec.exec(mapper: &line/1) + end + def query(_, _), do: {:error, "no victoria metrics host specified"} + + defp build_query(%Query{query: query} = q) do + add_resource([query], q) + |> add_time(q) + |> maybe_reverse(q) + |> Enum.join(" ") + end + + defp line(%{"_msg" => log, "_time" => time} = line) do + %Line{ + log: log, + timestamp: Timex.parse!(time, "{ISO:Extended}"), + facets: Map.drop(line, ~w(_msg _time)) |> facets() + } + end + + defp headers(%Connection{user: u, password: p}) when is_binary(u) and is_binary(p), + do: [{"Authorization", Plug.BasicAuth.encode_basic_auth(u, p)} | @headers] + defp headers(_), do: @headers + + defp add_resource(io, %Query{resource: %Cluster{} = cluster}), + do: [cluster_label(cluster) | io] + defp add_resource(io, %Query{resource: %Service{namespace: ns} = svc}) do + %{cluster: %Cluster{} = cluster} = Console.Repo.preload(svc, [:cluster]) + [cluster_label(cluster), ~s("namespace":#{ns}) | io] + end + + defp cluster_label(%Cluster{handle: h}), do: ~s("cluster":"#{h}") + + defp add_time(io, %Query{time: %Time{after: a, before: b}}) when is_binary(a) and is_binary(b), + do: [~s(_time:[#{a}, #{b}]) | io] + defp add_time(io, %Query{time: %Time{duration: d}}) when is_binary(d), + do: [~s(_time:#{d}) | io] + + defp maybe_reverse(io, %Query{time: %Time{reverse: true}}), do: io ++ ["| sort by (_time desc)"] + defp maybe_reverse(io, _), do: io +end diff --git a/lib/console/logs/query.ex b/lib/console/logs/query.ex new file mode 100644 index 0000000000..93c708ef8a --- /dev/null +++ b/lib/console/logs/query.ex @@ -0,0 +1,47 @@ +defmodule Console.Logs.Query do + alias Console.Logs.Time + alias Console.Schema.{User, Project, Cluster, Service} + alias Console.Deployments.Policies + + @default_limit 200 + + @type t :: %__MODULE__{time: Time.t} + + defstruct [:project_id, :cluster_id, :service_id, :query, :limit, :resource, :time] + + def new(args) do + %__MODULE__{ + project_id: args[:project_id], + cluster_id: args[:cluster_id], + service_id: args[:service_id], + query: args[:query], + limit: args[:limit], + time: Time.new(args) + } + end + + def limit(%__MODULE__{limit: l}) when is_integer(l), do: l + def limit(_), do: @default_limit + + def accessible(q, %User{roles: %{admin: true}}), do: {:ok, q} + + def accessible(%__MODULE__{project_id: project_id} = q, %User{} = user) when is_binary(project_id), + do: check_access(Project, project_id, user, q) + + def accessible(%__MODULE__{cluster_id: id} = q, %User{} = user) when is_binary(id), + do: check_access(Cluster, id, user, q) + + def accessible(%__MODULE__{service_id: id} = q, %User{} = user) when is_binary(id), + do: check_access(Service, id, user, q) + + def accessible(_, _), do: {:error, "forbidden"} + + defp check_access(model, id, user, query) do + Console.Repo.get!(model, id) + |> Policies.allow(user, :read) + |> case do + {:ok, resource} -> {:ok, %{query | resource: resource}} + err -> err + end + end +end diff --git a/lib/console/logs/stream/exec.ex b/lib/console/logs/stream/exec.ex new file mode 100644 index 0000000000..95ec3a7fcd --- /dev/null +++ b/lib/console/logs/stream/exec.ex @@ -0,0 +1,57 @@ +defmodule Console.Logs.Stream.Exec do + alias Console.Logs.Line + + def exec(start, opts \\ []) do + parser = Keyword.get(opts, :parser, Console.Logs.Stream.JsonLine) + mapper = Keyword.get(opts, :mapper, &Line.new/1) + + build_stream(start, parser) + |> Enum.reduce_while([], fn + {:error, error}, _ -> {:halt, {:error, "service error: #{inspect(error)}"}} + {:ok, res}, acc -> {:cont, [mapper.(res) | acc]} + end) + |> case do + l when is_list(l) -> {:ok, Enum.reverse(l)} + {:error, _} = err -> err + end + end + + defp build_stream(start, parser) do + Stream.resource( + start, + fn + {:error, %HTTPoison.Error{} = error} -> {[{:error, error}], :error} + {{:error, err}, _} -> {[{:error, err}], :error} + + {:ok, %HTTPoison.AsyncResponse{}} = resp -> {[], {resp, ""}} + + {{:ok, %HTTPoison.AsyncResponse{id: id} = res}, acc} -> + receive do + %HTTPoison.AsyncStatus{id: ^id, code: code} when code >= 200 and code < 400 -> + {[], stream_next(res, acc)} + + %HTTPoison.AsyncStatus{id: ^id, code: code} -> + {[{:error, "error code: #{code}"}], :error} + + %HTTPoison.AsyncHeaders{id: ^id, headers: _headers} -> + {[], stream_next(res, acc)} + + %HTTPoison.AsyncChunk{chunk: chunk} -> + {items, remaining} = parser.parse(acc <> chunk) + {items, stream_next(res, remaining)} + + %HTTPoison.AsyncEnd{id: ^id} -> + {:halt, res} + end + {:error, _} -> {:halt, :error} + :error -> {:halt, :error} + end, + fn + %{id: id} -> :hackney.stop_async(id) + :error -> :ok + end + ) + end + + defp stream_next(resp, acc), do: {HTTPoison.stream_next(resp), acc} +end diff --git a/lib/console/logs/stream/json_line.ex b/lib/console/logs/stream/json_line.ex new file mode 100644 index 0000000000..1aa3a1861d --- /dev/null +++ b/lib/console/logs/stream/json_line.ex @@ -0,0 +1,10 @@ +defmodule Console.Logs.Stream.JsonLine do + @eol ~r/(\r?\n|\r)/ + + @spec parse(binary) :: {[{:ok, map} | {:error, term}], binary} + def parse(chunk) do + chunks = String.split(chunk, @eol) + {straggle, full} = List.pop_at(chunks, -1) + {Enum.map(full, &Jason.decode/1), straggle} + end +end diff --git a/lib/console/logs/time.ex b/lib/console/logs/time.ex new file mode 100644 index 0000000000..4feca7f47f --- /dev/null +++ b/lib/console/logs/time.ex @@ -0,0 +1,7 @@ +defmodule Console.Logs.Time do + @type t :: %__MODULE__{} + defstruct [:after, :before, :duration, :reverse] + + def new(%{} = args), do: struct(__MODULE__, args) + def new(_), do: nil +end diff --git a/lib/console/schema/deployment_settings.ex b/lib/console/schema/deployment_settings.ex index 73f13131b9..5b5cb6c1b3 100644 --- a/lib/console/schema/deployment_settings.ex +++ b/lib/console/schema/deployment_settings.ex @@ -4,6 +4,7 @@ defmodule Console.Schema.DeploymentSettings do alias Piazza.Ecto.EncryptedString defenum AIProvider, openai: 0, anthropic: 1, ollama: 2, azure: 3, bedrock: 4, vertex: 5 + defenum LogDriver, victoria: 0, elastic: 1 defmodule Connection do use Piazza.Ecto.Schema @@ -54,6 +55,13 @@ defmodule Console.Schema.DeploymentSettings do field :ssl, :boolean end + embeds_one :logging, Logging, on_replace: :update do + field :enabled, :boolean + field :driver, LogDriver + + embeds_one :victoria, Connection, on_replace: :update + end + embeds_one :cost, Cost, on_replace: :update do field :enabled, :boolean field :recommendation_threshold, :integer @@ -159,6 +167,7 @@ defmodule Console.Schema.DeploymentSettings do |> cast_embed(:smtp, with: &smtp_changeset/2) |> cast_embed(:stacks, with: &stacks_changeset/2) |> cast_embed(:cost, with: &cost_changeset/2) + |> cast_embed(:logging, with: &logging_changeset/2) |> change_markers(agent_helm_values: :helm_changed, agent_version: :version_changed) |> put_new_change(:write_policy_id, &Ecto.UUID.generate/0) |> put_new_change(:read_policy_id, &Ecto.UUID.generate/0) @@ -241,4 +250,10 @@ defmodule Console.Schema.DeploymentSettings do model |> cast(attrs, ~w(enabled recommendation_threshold recommendation_cushion)a) end + + defp logging_changeset(model, attrs) do + model + |> cast(attrs, [:enabled]) + |> cast_embed(:victoria) + end end diff --git a/priv/repo/migrations/20241216181834_add_log_driver_support.exs b/priv/repo/migrations/20241216181834_add_log_driver_support.exs new file mode 100644 index 0000000000..1c96526767 --- /dev/null +++ b/priv/repo/migrations/20241216181834_add_log_driver_support.exs @@ -0,0 +1,9 @@ +defmodule Console.Repo.Migrations.AddLogDriverSupport do + use Ecto.Migration + + def change do + alter table(:deployment_settings) do + add :logging, :map + end + end +end diff --git a/schema/schema.graphql b/schema/schema.graphql index 8be4b25f97..16bf473b48 100644 --- a/schema/schema.graphql +++ b/schema/schema.graphql @@ -59,6 +59,8 @@ type RootQueryType { metric(query: String!, offset: Int, step: String, clusterId: ID): [MetricResponse] + logAggregation(serviceId: ID, clusterId: ID, query: String, time: LogTimeRange, limit: Int): [LogLine] + logs(query: String!, start: Long, end: Long, limit: Int!, clusterId: ID): [LogStream] scalingRecommendation(kind: AutoscalingTarget!, namespace: String!, name: String!): VerticalPodAutoscaler @@ -1072,6 +1074,11 @@ enum AiProvider { VERTEX } +enum LogDriver { + VICTORIA + ELASTIC +} + input ProjectAttributes { name: String! description: String @@ -1321,6 +1328,9 @@ type DeploymentSettings { "settings for cost management" cost: CostSettings + "settings for connections to log aggregation datastores" + logging: LoggingSettings + "The console's expected agent version" agentVsn: String! @@ -1483,6 +1493,17 @@ type VertexAiSettings { location: String! } +"Settings for configuring log aggregation throughout Plural" +type LoggingSettings { + enabled: Boolean + + "the type of log aggregation solution you wish to use" + driver: LogDriver + + "configures a connection to victoria metrics" + victoria: HttpConnection +} + type ProjectConnection { pageInfo: PageInfo! edges: [ProjectEdge] @@ -7009,6 +7030,13 @@ input LabelInput { value: String } +input LogTimeRange { + before: String + after: String + duration: String + reverse: Boolean +} + type Dashboard { id: String! spec: DashboardSpec! @@ -7044,6 +7072,17 @@ type MetricResponse { values: [MetricResult] } +type LogLine { + timestamp: DateTime + log: String! + facets: [LogFacet] +} + +type LogFacet { + key: String! + value: String! +} + type Metadata { labels: [LabelPair] annotations: [LabelPair] diff --git a/test/console/graphql/queries/observability_queries_test.exs b/test/console/graphql/queries/observability_queries_test.exs index e0ede1dc91..32e7b1a415 100644 --- a/test/console/graphql/queries/observability_queries_test.exs +++ b/test/console/graphql/queries/observability_queries_test.exs @@ -177,6 +177,33 @@ defmodule Console.GraphQl.ObservabilityQueriesTest do end end + describe "logAggregation" do + test "it can fetch from a logs db" do + user = insert(:user) + svc = insert(:service, read_bindings: [%{user_id: user.id}]) + expect(Console.Logs.Provider, :query, fn _ -> {:ok, [log_line("a log")]} end) + + {:ok, %{data: %{"logAggregation" => [line]}}} = run_query(""" + query Logs($serviceId: ID!) { + logAggregation(serviceId: $serviceId) { timestamp log } + } + """, %{"serviceId" => svc.id}, %{current_user: user}) + + assert line["log"] == "a log" + end + + test "it will authz" do + user = insert(:user) + svc = insert(:service) + + {:ok, %{errors: [_ | _]}} = run_query(""" + query Logs($serviceId: ID!) { + logAggregation(serviceId: $serviceId) { timestamp log } + } + """, %{"serviceId" => svc.id}, %{current_user: user}) + end + end + describe "scalingRecommendation" do test "it can fetch a vpa to provide recommendations" do user = insert(:user) diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index 2ede28058e..e73007a749 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -54,5 +54,7 @@ defmodule Console.TestHelpers do def run_query(query, variables, context \\ %{}), do: Absinthe.run(query, Console.GraphQl, variables: variables, context: context) + def log_line(log), do: %Console.Logs.Line{timestamp: Timex.now(), log: log} + def from_connection(%{"edges" => edges}), do: Enum.map(edges, & &1["node"]) end diff --git a/test/test_helper.exs b/test/test_helper.exs index b52d327a38..1c9a7017a3 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -12,6 +12,7 @@ Mimic.copy(Console.Buffers.TokenAudit) Mimic.copy(Console.Deployments.Git.Discovery) Mimic.copy(Console.Deployments.Pipelines.Discovery) Mimic.copy(Console.Deployments.Pipelines) +Mimic.copy(Console.Logs.Provider) Mimic.copy(Console) Mimic.copy(Kazan) Mimic.copy(HTTPoison)