From 510f46ab48d08a1ac715c05b3b3b53adeb4d9e4a Mon Sep 17 00:00:00 2001
From: michaeljguarino
Date: Mon, 16 Dec 2024 22:13:17 -0500
Subject: [PATCH 1/2] Implement pluggable log aggregation drivers

Starting w/ victoria metrics; elastic will follow. Supports authz
against the main Plural objects (services, clusters, projects) and
query constructs that can back a fully fledged log aggregation UI.
---
 assets/src/generated/graphql.ts               | 47 +++++++++++++
 go/client/models_gen.go                       | 70 +++++++++++++++++++
 lib/console/deployments/pipelines.ex          | 10 ++-
 lib/console/graphql/deployments/settings.ex   | 18 ++++-
 lib/console/graphql/observability.ex          | 29 ++++++++
 .../graphql/resolvers/observability.ex        |  8 +++
 lib/console/logs/line.ex                      |  9 +++
 lib/console/logs/provider.ex                  | 27 +++++++
 lib/console/logs/provider/utils.ex            |  6 ++
 lib/console/logs/provider/victoria.ex         | 66 +++++++++++++++++
 lib/console/logs/query.ex                     | 47 +++++++++++++
 lib/console/logs/stream/exec.ex               | 57 +++++++++++++++
 lib/console/logs/stream/json_line.ex          | 10 +++
 lib/console/logs/time.ex                      |  7 ++
 lib/console/schema/deployment_settings.ex     | 15 ++++
 .../20241216181834_add_log_driver_support.exs |  9 +++
 schema/schema.graphql                         | 39 +++++++++++
 .../queries/observability_queries_test.exs    | 27 +++++++
 test/support/test_helpers.ex                  |  2 +
 test/test_helper.exs                          |  1 +
 20 files changed, 500 insertions(+), 4 deletions(-)
 create mode 100644 lib/console/logs/line.ex
 create mode 100644 lib/console/logs/provider.ex
 create mode 100644 lib/console/logs/provider/utils.ex
 create mode 100644 lib/console/logs/provider/victoria.ex
 create mode 100644 lib/console/logs/query.ex
 create mode 100644 lib/console/logs/stream/exec.ex
 create mode 100644 lib/console/logs/stream/json_line.ex
 create mode 100644 lib/console/logs/time.ex
 create mode 100644 priv/repo/migrations/20241216181834_add_log_driver_support.exs

diff --git a/assets/src/generated/graphql.ts b/assets/src/generated/graphql.ts
index 42260aaf0e..e26306b397 100644
--- a/assets/src/generated/graphql.ts
+++ b/assets/src/generated/graphql.ts
@@ -2351,6 +2351,8 @@ export type DeploymentSettings = {
   insertedAt?: Maybe<Scalars['DateTime']['output']>;
   /** the latest known k8s version */
   latestK8sVsn: Scalars['String']['output'];
+  /** settings for connections to log aggregation datastores */
+  logging?: Maybe<LoggingSettings>;
   /** the way we can connect to your loki instance */
   lokiConnection?: Maybe<HttpConnection>;
   name: Scalars['String']['output'];
@@ -3305,6 +3307,17 @@ export type LoadBalancerStatus = {
   ingress?: Maybe<Array<Maybe<LoadBalancerIngressStatus>>>;
 };
 
+export enum LogDriver {
+  Elastic = 'ELASTIC',
+  Victoria = 'VICTORIA'
+}
+
+export type LogFacet = {
+  __typename?: 'LogFacet';
+  key: Scalars['String']['output'];
+  value: Scalars['String']['output'];
+};
+
 export type LogFilter = {
   __typename?: 'LogFilter';
   metadata: Metadata;
@@ -3325,12 +3338,36 @@ export type LogLabel = {
   value?: Maybe<Scalars['String']['output']>;
 };
 
+export type LogLine = {
+  __typename?: 'LogLine';
+  facets?: Maybe<Array<Maybe<LogFacet>>>;
+  log: Scalars['String']['output'];
+  timestamp?: Maybe<Scalars['DateTime']['output']>;
+};
+
 export type LogStream = {
   __typename?: 'LogStream';
   stream?: Maybe<Scalars['Map']['output']>;
   values?: Maybe<Array<Maybe<MetricResult>>>;
 };
 
+export type LogTimeRange = {
+  after?: InputMaybe<Scalars['String']['input']>;
+  before?: InputMaybe<Scalars['String']['input']>;
+  duration?: InputMaybe<Scalars['String']['input']>;
+  reverse?: InputMaybe<Scalars['Boolean']['input']>;
+};
+
+/** Settings for configuring log aggregation throughout Plural */
+export type LoggingSettings = {
+  __typename?: 'LoggingSettings';
+  /** the type of log aggregation solution you wish to use */
+  driver?: Maybe<LogDriver>;
+  enabled?: Maybe<Scalars['Boolean']['output']>;
+  /** configures a connection to victoria metrics */
+  victoria?: Maybe<HttpConnection>;
+};
+
 export type LoginInfo = {
   __typename?: 'LoginInfo';
  external?: Maybe<Scalars['Boolean']['output']>;
@@ -6802,6 +6839,7 @@ export type RootQueryType = {
  installations?: Maybe<InstallationConnection>;
  invite?: Maybe<Invite>;
  job?: Maybe<Job>;
+  logAggregation?: Maybe<Array<Maybe<LogLine>>>;
  logFilters?: Maybe<Array<Maybe<LogFilter>>>;
  loginInfo?: Maybe<LoginInfo>;
  logs?: Maybe<Array<Maybe<LogStream>>>;
@@ -7358,6 +7396,15 @@ export type RootQueryTypeJobArgs = {
 };
 
 
+export type RootQueryTypeLogAggregationArgs = {
+  clusterId?: InputMaybe<Scalars['ID']['input']>;
+  limit?: InputMaybe<Scalars['Int']['input']>;
+  query?: InputMaybe<Scalars['String']['input']>;
+  serviceId?: InputMaybe<Scalars['ID']['input']>;
+  time?: InputMaybe<LogTimeRange>;
+};
+
+
 export type RootQueryTypeLogFiltersArgs = {
   namespace: Scalars['String']['input'];
 };
diff --git a/go/client/models_gen.go b/go/client/models_gen.go
index 246a37ada2..551bb80cbd 100644
--- a/go/client/models_gen.go
+++ b/go/client/models_gen.go
@@ -1915,6 +1915,8 @@ type DeploymentSettings struct {
 	Ai *AiSettings `json:"ai,omitempty"`
 	// settings for cost management
 	Cost *CostSettings `json:"cost,omitempty"`
+	// settings for connections to log aggregation datastores
+	Logging *LoggingSettings `json:"logging,omitempty"`
 	// The console's expected agent version
 	AgentVsn string `json:"agentVsn"`
 	// the latest known k8s version
@@ -2724,6 +2726,11 @@ type LoadBalancerStatus struct {
 	Ingress []*LoadBalancerIngressStatus `json:"ingress,omitempty"`
 }
 
+type LogFacet struct {
+	Key   string `json:"key"`
+	Value string `json:"value"`
+}
+
 type LogFilter struct {
 	Metadata Metadata      `json:"metadata"`
 	Spec     LogFilterSpec `json:"spec"`
@@ -2741,11 +2748,33 @@ type LogLabel struct {
 	Value *string `json:"value,omitempty"`
 }
 
+type LogLine struct {
+	Timestamp *string     `json:"timestamp,omitempty"`
+	Log       string      `json:"log"`
+	Facets    []*LogFacet `json:"facets,omitempty"`
+}
+
 type LogStream struct {
 	Stream map[string]interface{} `json:"stream,omitempty"`
 	Values []*MetricResult        `json:"values,omitempty"`
 }
 
+type LogTimeRange struct {
+	Before   *string `json:"before,omitempty"`
+	After    *string `json:"after,omitempty"`
+	Duration *string `json:"duration,omitempty"`
+	Reverse  *bool   `json:"reverse,omitempty"`
+}
+
+// Settings for configuring log aggregation throughout Plural
+type LoggingSettings struct {
+	Enabled *bool `json:"enabled,omitempty"`
+	// the type of log aggregation solution you wish to use
+	Driver *LogDriver `json:"driver,omitempty"`
+	// configures a connection to victoria metrics
+	Victoria *HTTPConnection `json:"victoria,omitempty"`
+}
+
 type LoginInfo struct {
 	OidcURI  *string `json:"oidcUri,omitempty"`
 	External *bool   `json:"external,omitempty"`
@@ -7253,6 +7282,47 @@ func (e ListMerge) MarshalGQL(w io.Writer) {
 	fmt.Fprint(w, strconv.Quote(e.String()))
 }
 
+type LogDriver string
+
+const (
+	LogDriverVictoria LogDriver = "VICTORIA"
+	LogDriverElastic  LogDriver = "ELASTIC"
+)
+
+var AllLogDriver = []LogDriver{
+	LogDriverVictoria,
+	LogDriverElastic,
+}
+
+func (e LogDriver) IsValid() bool {
+	switch e {
+	case LogDriverVictoria, LogDriverElastic:
+		return true
+	}
+	return false
+}
+
+func (e LogDriver) String() string {
+	return string(e)
+}
+
+func (e *LogDriver) UnmarshalGQL(v interface{}) error {
+	str, ok := v.(string)
+	if !ok {
+		return fmt.Errorf("enums must be strings")
+	}
+
+	*e = LogDriver(str)
+	if !e.IsValid() {
+		return fmt.Errorf("%s is not a valid LogDriver", str)
+	}
+	return nil
+}
+
+func (e LogDriver) MarshalGQL(w io.Writer) {
+	fmt.Fprint(w, strconv.Quote(e.String()))
+}
+
 type MatchStrategy string
 
 const (
diff --git a/lib/console/deployments/pipelines.ex b/lib/console/deployments/pipelines.ex
index 912f279540..c285df7b0a 100644
--- a/lib/console/deployments/pipelines.ex
+++ b/lib/console/deployments/pipelines.ex
@@ -221,15 +221,19 @@ defmodule Console.Deployments.Pipelines do
   Whether all promotion gates for this edge are currently open
   """
   @spec open?(PipelineEdge.t, PipelinePromotion.t) :: boolean
-  def open?(%PipelineEdge{gates: [_ | _] = gates}, %PipelinePromotion{revised_at: r}) when not is_nil(r) do
+  def open?(%PipelineEdge{gates: [_ | _] = gates}, %PipelinePromotion{revised_at: r} = promo) when not is_nil(r) do
     Enum.all?(gates, fn
-      %PipelineGate{state: :open} = g -> Timex.after?(coalesce(g.updated_at, g.inserted_at), r)
+      %PipelineGate{state: :open} = g ->
+        ts = coalesce(g.updated_at, g.inserted_at)
+        Timex.after?(ts, r) && after_context?(ts, promo)
       _ -> false
     end)
   end
   def open?(_, _), do: true
 
+  defp after_context?(ts, %PipelinePromotion{context: %PipelineContext{inserted_at: at}}), do: Timex.after?(ts, at)
+  defp after_context?(_, _), do: true
+
   @doc """
   Whether an edge was promoted after the given dt
   """
@@ -396,7 +400,7 @@ defmodule Console.Deployments.Pipelines do
   @spec apply_promotion(PipelinePromotion.t) :: promotion_resp
   def apply_promotion(%PipelinePromotion{} = promo) do
     start_transaction()
-    |> add_operation(:promo, fn _ -> {:ok, Repo.preload(promo, [:stage, services: [:service, :revision]])} end)
+    |> add_operation(:promo, fn _ -> {:ok, Repo.preload(promo, [:stage, :context, services: [:service, :revision]])} end)
     |> add_operation(:edges, fn %{promo: %{stage: stage}} -> {:ok, edges(stage)} end)
     |> add_operation(:resolve, fn %{promo: promotion, edges: edges} ->
      Enum.filter(edges, &open?(&1, promotion))
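Note for reviewers: the net effect of `after_context?/2` is that a gate approval only opens an edge if it postdates both the promotion's revision and the pipeline context being promoted. A minimal sketch of the semantics, with illustrative timestamps that are not part of the patch:

    promo = %PipelinePromotion{
      revised_at: ~U[2024-12-16 10:00:00Z],
      context: %PipelineContext{inserted_at: ~U[2024-12-16 12:00:00Z]}
    }
    gate = %PipelineGate{state: :open, updated_at: ~U[2024-12-16 11:00:00Z]}

    open?(%PipelineEdge{gates: [gate]}, promo)
    #=> false; the approval postdates revised_at but predates the context
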
diff --git a/lib/console/graphql/deployments/settings.ex b/lib/console/graphql/deployments/settings.ex
index b57c79df77..2482ec6181 100644
--- a/lib/console/graphql/deployments/settings.ex
+++ b/lib/console/graphql/deployments/settings.ex
@@ -2,8 +2,10 @@ defmodule Console.GraphQl.Deployments.Settings do
   use Console.GraphQl.Schema.Base
   alias Console.Deployments.Settings
   alias Console.GraphQl.Resolvers.{Deployments}
+  alias Console.Schema.DeploymentSettings
 
-  ecto_enum :ai_provider, Console.Schema.DeploymentSettings.AIProvider
+  ecto_enum :ai_provider, DeploymentSettings.AIProvider
+  ecto_enum :log_driver, DeploymentSettings.LogDriver
 
   input_object :project_attributes do
     field :name, non_null(:string)
@@ -49,6 +51,12 @@ defmodule Console.GraphQl.Deployments.Settings do
     field :recommendation_cushion, :integer, description: "the percentage change needed to generate a recommendation, default 20%"
   end
 
+  input_object :logging_settings_attributes do
+    field :enabled, :boolean
+    field :driver, :log_driver
+    field :victoria, :http_connection_attributes
+  end
+
   input_object :ai_settings_attributes do
     field :enabled, :boolean
     field :tools, :tool_config_attributes
@@ -154,6 +162,7 @@ defmodule Console.GraphQl.Deployments.Settings do
     field :smtp, :smtp_settings, description: "smtp server configuration for email notifications"
     field :ai, :ai_settings, description: "settings for LLM provider clients"
     field :cost, :cost_settings, description: "settings for cost management"
+    field :logging, :logging_settings, description: "settings for connections to log aggregation datastores"
 
     field :agent_vsn, non_null(:string), description: "The console's expected agent version",
       resolve: fn _, _, _ -> {:ok, Settings.agent_vsn()} end
@@ -259,6 +268,13 @@ defmodule Console.GraphQl.Deployments.Settings do
     field :location, non_null(:string), description: "the gcp region the model"
   end
 
+  @desc "Settings for configuring log aggregation throughout Plural"
+  object :logging_settings do
+    field :enabled, :boolean
+    field :driver, :log_driver, description: "the type of log aggregation solution you wish to use"
+    field :victoria, :http_connection, description: "configures a connection to victoria metrics"
+  end
+
   connection node_type: :project
 
   object :settings_queries do
diff --git a/lib/console/graphql/observability.ex b/lib/console/graphql/observability.ex
index 28d3f16ca1..cddf7869d6 100644
--- a/lib/console/graphql/observability.ex
+++ b/lib/console/graphql/observability.ex
@@ -13,6 +13,13 @@ defmodule Console.GraphQl.Observability do
     field :value, :string
   end
 
+  input_object :log_time_range do
+    field :before, :string
+    field :after, :string
+    field :duration, :string
+    field :reverse, :boolean
+  end
+
   object :dashboard do
     field :id, non_null(:string), resolve: fn %{metadata: %{name: n}}, _, _ -> {:ok, n} end
     field :spec, non_null(:dashboard_spec)
@@ -50,6 +57,17 @@ defmodule Console.GraphQl.Observability do
     end
   end
 
+  object :log_line do
+    field :timestamp, :datetime
+    field :log, non_null(:string)
+    field :facets, list_of(:log_facet)
+  end
+
+  object :log_facet do
+    field :key, non_null(:string)
+    field :value, non_null(:string)
+  end
+
   object :observability_queries do
     field :dashboards, list_of(:dashboard) do
       middleware Authenticated
@@ -83,6 +101,17 @@ defmodule Console.GraphQl.Observability do
       safe_resolve &Observability.resolve_metric/2
     end
 
+    field :log_aggregation, list_of(:log_line) do
+      middleware Authenticated
+      arg :service_id, :id
+      arg :cluster_id, :id
+      arg :query, :string
+      arg :time, :log_time_range
+      arg :limit, :integer
+
+      resolve &Observability.list_logs/2
+    end
+
     field :logs, list_of(:log_stream) do
       middleware Authenticated
diff --git a/lib/console/graphql/resolvers/observability.ex b/lib/console/graphql/resolvers/observability.ex
index a1b08b1cad..e1ae6b2aa6 100644
--- a/lib/console/graphql/resolvers/observability.ex
+++ b/lib/console/graphql/resolvers/observability.ex
@@ -1,5 +1,7 @@
 defmodule Console.GraphQl.Resolvers.Observability do
   alias Console.Services.Observability
+  alias Console.Logs.{Provider, Query}
+
   @default_offset 30 * 60
   @nano 1_000_000_000
 
@@ -16,6 +18,12 @@ defmodule Console.GraphQl.Resolvers.Observability do
     end
   end
 
+  def list_logs(args, %{context: %{current_user: user}}) do
+    query = Query.new(args)
+    with {:ok, query} <- Provider.accessible(query, user),
+      do: Provider.query(query)
+  end
+
   def resolve_logs(%{query: query, limit: limit} = args, _) do
     now   = Timex.now()
     start = (args[:start] || ts(now)) / @nano
diff --git a/lib/console/logs/line.ex b/lib/console/logs/line.ex
new file mode 100644
index 0000000000..6cb6ba6517
--- /dev/null
+++ b/lib/console/logs/line.ex
@@ -0,0 +1,9 @@
+defmodule Console.Logs.Line do
+  @type t :: %__MODULE__{facets: [%{key: binary, value: binary}]}
+
+  defstruct [:timestamp, :log, :facets]
+
+  def new(map) do
+    %__MODULE__{log: map[:log], timestamp: map[:timestamp], facets: map[:facets]}
+  end
+end
diff --git a/lib/console/logs/provider.ex b/lib/console/logs/provider.ex
new file mode 100644
index 0000000000..6acfa0a64f
--- /dev/null
+++ b/lib/console/logs/provider.ex
@@ -0,0 +1,27 @@
+defmodule Console.Logs.Provider do
+  alias Console.Logs.{Query, Line}
+  alias Console.Logs.Provider.{Victoria}
+  alias Console.Schema.{User, DeploymentSettings}
+
+  @type error :: Console.error
+
+  @callback query(struct, Query.t) :: {:ok, [Line.t]} | error
+
+  @spec query(Query.t) :: {:ok, [Line.t]} | error
+  def query(%Query{} = q) do
+    with {:ok, %{__struct__: provider} = prov} <- client(),
+      do: provider.query(prov, q)
+  end
+
+  @spec accessible(Query.t, User.t) :: {:ok, Query.t} | error
+  def accessible(%Query{} = query, %User{} = user), do: Query.accessible(query, user)
+
+  defp client() do
+    Console.Deployments.Settings.cached()
+    |> client()
+  end
+
+  def client(%DeploymentSettings{logging: %{enabled: true, driver: :victoria, victoria: %{} = victoria}}),
+    do: {:ok, Victoria.new(victoria)}
+  def client(_), do: {:error, "Plural logging integration not yet configured"}
+end
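The elastic driver called out in the commit message is not part of this patch; purely as a hypothetical sketch, a second driver would slot in by implementing the behaviour and adding a `client/1` clause (module and settings field names below are assumptions):

    defmodule Console.Logs.Provider.Elastic do
      @behaviour Console.Logs.Provider
      alias Console.Logs.{Query, Line}

      defstruct [:connection]

      def new(conn), do: %__MODULE__{connection: conn}

      @impl true
      def query(%__MODULE__{connection: _conn}, %Query{} = _q) do
        # would translate the Query struct into an elasticsearch search request
        {:ok, []}
      end
    end

plus a matching `client/1` clause in `Console.Logs.Provider` keyed on `driver: :elastic`.
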
diff --git a/lib/console/logs/provider/utils.ex b/lib/console/logs/provider/utils.ex
new file mode 100644
index 0000000000..e0a1f7243d
--- /dev/null
+++ b/lib/console/logs/provider/utils.ex
@@ -0,0 +1,6 @@
+defmodule Console.Logs.Provider.Utils do
+  def facets(%{} = map) do
+    Enum.map(map, fn {k, v} -> %{key: k, value: v} end)
+  end
+  def facets(_), do: nil
+end
diff --git a/lib/console/logs/provider/victoria.ex b/lib/console/logs/provider/victoria.ex
new file mode 100644
index 0000000000..9f1c867dfb
--- /dev/null
+++ b/lib/console/logs/provider/victoria.ex
@@ -0,0 +1,67 @@
+defmodule Console.Logs.Provider.Victoria do
+  @moduledoc """
+  Log driver implementation for victoria metrics
+  """
+  @behaviour Console.Logs.Provider
+  import Console.Logs.Provider.Utils
+  alias Console.Logs.{Query, Time, Line, Stream.Exec}
+  alias Console.Schema.{Cluster, Service, DeploymentSettings.Connection}
+
+  @headers [{"content-type", "application/x-www-form-urlencoded"}]
+
+  defstruct [:connection]
+
+  def new(conn), do: %__MODULE__{connection: conn}
+
+  def query(%__MODULE__{connection: %Connection{host: host} = conn}, %Query{} = query) when is_binary(host) do
+    Path.join(host, "/select/logsql/query")
+    |> HTTPoison.post({:form, [
+      {"query", build_query(query)},
+      {"limit", "#{Query.limit(query)}"}
+    ]}, headers(conn), [stream_to: self(), async: :once])
+    |> Exec.exec(mapper: &line/1)
+  end
+  def query(_, _), do: {:error, "no victoria metrics host specified"}
+
+  defp build_query(%Query{query: query} = q) do
+    add_resource([query], q)
+    |> add_time(q)
+    |> maybe_reverse(q)
+    |> Enum.join(" ")
+  end
+
+  defp line(%{"_msg" => log, "_time" => time} = line) do
+    facets =
+      Map.drop(line, ~w(_msg _time _stream))
+      |> Map.merge(line["_stream"] || %{})
+      |> facets()
+
+    %Line{
+      log: log,
+      timestamp: Timex.parse!(time, "{ISO:Extended}"),
+      facets: facets
+    }
+  end
+
+  defp headers(%Connection{user: u, password: p}) when is_binary(u) and is_binary(p),
+    do: [{"Authorization", Plug.BasicAuth.encode_basic_auth(u, p)} | @headers]
+  defp headers(_), do: @headers
+
+  defp add_resource(io, %Query{resource: %Cluster{} = cluster}),
+    do: [cluster_label(cluster) | io]
+  defp add_resource(io, %Query{resource: %Service{namespace: ns} = svc}) do
+    %{cluster: %Cluster{} = cluster} = Console.Repo.preload(svc, [:cluster])
+    [cluster_label(cluster), ~s("namespace":#{ns}) | io]
+  end
+
+  defp cluster_label(%Cluster{handle: h}), do: ~s("cluster":"#{h}")
+
+  defp add_time(io, %Query{time: %Time{after: a, before: b}}) when is_binary(a) and is_binary(b),
+    do: [~s(_time:[#{a}, #{b}]) | io]
+  defp add_time(io, %Query{time: %Time{duration: d}}) when is_binary(d),
+    do: [~s(_time:#{d}) | io]
+  defp add_time(io, _), do: io
+
+  defp maybe_reverse(io, %Query{time: %Time{reverse: true}}), do: io ++ ["| sort by (_time desc)"]
+  defp maybe_reverse(io, _), do: io
+end
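To make the query construction concrete: assuming a service whose cluster handle is `prod`, namespace `app`, a `15m` duration and reverse ordering (all illustrative values), `build_query/1` joins the parts into a single LogsQL string:

    build_query(%Query{
      query: "error",
      resource: svc,
      time: %Time{duration: "15m", reverse: true}
    })
    #=> ~s(_time:15m "cluster":"prod" "namespace":app error | sort by (_time desc))
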
diff --git a/lib/console/logs/query.ex b/lib/console/logs/query.ex
new file mode 100644
index 0000000000..93c708ef8a
--- /dev/null
+++ b/lib/console/logs/query.ex
@@ -0,0 +1,47 @@
+defmodule Console.Logs.Query do
+  alias Console.Logs.Time
+  alias Console.Schema.{User, Project, Cluster, Service}
+  alias Console.Deployments.Policies
+
+  @default_limit 200
+
+  @type t :: %__MODULE__{time: Time.t}
+
+  defstruct [:project_id, :cluster_id, :service_id, :query, :limit, :resource, :time]
+
+  def new(args) do
+    %__MODULE__{
+      project_id: args[:project_id],
+      cluster_id: args[:cluster_id],
+      service_id: args[:service_id],
+      query: args[:query],
+      limit: args[:limit],
+      time: Time.new(args[:time])
+    }
+  end
+
+  def limit(%__MODULE__{limit: l}) when is_integer(l), do: l
+  def limit(_), do: @default_limit
+
+  def accessible(q, %User{roles: %{admin: true}}), do: {:ok, q}
+
+  def accessible(%__MODULE__{project_id: project_id} = q, %User{} = user) when is_binary(project_id),
+    do: check_access(Project, project_id, user, q)
+
+  def accessible(%__MODULE__{cluster_id: id} = q, %User{} = user) when is_binary(id),
+    do: check_access(Cluster, id, user, q)
+
+  def accessible(%__MODULE__{service_id: id} = q, %User{} = user) when is_binary(id),
+    do: check_access(Service, id, user, q)
+
+  def accessible(_, _), do: {:error, "forbidden"}
+
+  defp check_access(model, id, user, query) do
+    Console.Repo.get!(model, id)
+    |> Policies.allow(user, :read)
+    |> case do
+      {:ok, resource} -> {:ok, %{query | resource: resource}}
+      err -> err
+    end
+  end
+end
diff --git a/lib/console/logs/stream/exec.ex b/lib/console/logs/stream/exec.ex
new file mode 100644
index 0000000000..95ec3a7fcd
--- /dev/null
+++ b/lib/console/logs/stream/exec.ex
@@ -0,0 +1,57 @@
+defmodule Console.Logs.Stream.Exec do
+  alias Console.Logs.Line
+
+  def exec(start, opts \\ []) do
+    parser = Keyword.get(opts, :parser, Console.Logs.Stream.JsonLine)
+    mapper = Keyword.get(opts, :mapper, &Line.new/1)
+
+    build_stream(start, parser)
+    |> Enum.reduce_while([], fn
+      {:error, error}, _ -> {:halt, {:error, "service error: #{inspect(error)}"}}
+      {:ok, res}, acc -> {:cont, [mapper.(res) | acc]}
+    end)
+    |> case do
+      l when is_list(l) -> {:ok, Enum.reverse(l)}
+      {:error, _} = err -> err
+    end
+  end
+
+  defp build_stream(start, parser) do
+    Stream.resource(
+      fn -> start end,
+      fn
+        {:error, %HTTPoison.Error{} = error} -> {[{:error, error}], :error}
+        {{:error, err}, _} -> {[{:error, err}], :error}
+
+        {:ok, %HTTPoison.AsyncResponse{}} = resp -> {[], {resp, ""}}
+
+        {{:ok, %HTTPoison.AsyncResponse{id: id} = res}, acc} ->
+          receive do
+            %HTTPoison.AsyncStatus{id: ^id, code: code} when code >= 200 and code < 400 ->
+              {[], stream_next(res, acc)}
+
+            %HTTPoison.AsyncStatus{id: ^id, code: code} ->
+              {[{:error, "error code: #{code}"}], :error}
+
+            %HTTPoison.AsyncHeaders{id: ^id, headers: _headers} ->
+              {[], stream_next(res, acc)}
+
+            %HTTPoison.AsyncChunk{chunk: chunk} ->
+              {items, remaining} = parser.parse(acc <> chunk)
+              {items, stream_next(res, remaining)}
+
+            %HTTPoison.AsyncEnd{id: ^id} ->
+              {:halt, res}
+          end
+        {:error, _} -> {:halt, :error}
+        :error -> {:halt, :error}
+      end,
+      fn
+        %{id: id} -> :hackney.stop_async(id)
+        :error -> :ok
+      end
+    )
+  end
+
+  defp stream_next(resp, acc), do: {HTTPoison.stream_next(resp), acc}
+end
diff --git a/lib/console/logs/stream/json_line.ex b/lib/console/logs/stream/json_line.ex
new file mode 100644
index 0000000000..1aa3a1861d
--- /dev/null
+++ b/lib/console/logs/stream/json_line.ex
@@ -0,0 +1,10 @@
+defmodule Console.Logs.Stream.JsonLine do
+  @eol ~r/(\r?\n|\r)/
+
+  @spec parse(binary) :: {[{:ok, map} | {:error, term}], binary}
+  def parse(chunk) do
+    chunks = String.split(chunk, @eol)
+    {straggle, full} = List.pop_at(chunks, -1)
+    {Enum.map(full, &Jason.decode/1), straggle}
+  end
+end
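`JsonLine.parse/1` splits each chunk on line endings, decodes the complete lines, and hands back the trailing partial line so `Exec` can prepend it to the next chunk; a quick illustration:

    iex> Console.Logs.Stream.JsonLine.parse(~s|{"_msg":"a"}\n{"_ms|)
    {[ok: %{"_msg" => "a"}], "{\"_ms"}
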
diff --git a/lib/console/logs/time.ex b/lib/console/logs/time.ex
new file mode 100644
index 0000000000..4feca7f47f
--- /dev/null
+++ b/lib/console/logs/time.ex
@@ -0,0 +1,7 @@
+defmodule Console.Logs.Time do
+  @type t :: %__MODULE__{}
+  defstruct [:after, :before, :duration, :reverse]
+
+  def new(%{} = args), do: struct(__MODULE__, args)
+  def new(_), do: nil
+end
diff --git a/lib/console/schema/deployment_settings.ex b/lib/console/schema/deployment_settings.ex
index 73f13131b9..5b5cb6c1b3 100644
--- a/lib/console/schema/deployment_settings.ex
+++ b/lib/console/schema/deployment_settings.ex
@@ -4,6 +4,7 @@ defmodule Console.Schema.DeploymentSettings do
   alias Piazza.Ecto.EncryptedString
 
   defenum AIProvider, openai: 0, anthropic: 1, ollama: 2, azure: 3, bedrock: 4, vertex: 5
+  defenum LogDriver, victoria: 0, elastic: 1
 
   defmodule Connection do
     use Piazza.Ecto.Schema
@@ -54,6 +55,13 @@ defmodule Console.Schema.DeploymentSettings do
       field :ssl, :boolean
     end
 
+    embeds_one :logging, Logging, on_replace: :update do
+      field :enabled, :boolean
+      field :driver, LogDriver
+
+      embeds_one :victoria, Connection, on_replace: :update
+    end
+
     embeds_one :cost, Cost, on_replace: :update do
       field :enabled, :boolean
       field :recommendation_threshold, :integer
@@ -159,6 +167,7 @@ defmodule Console.Schema.DeploymentSettings do
     |> cast_embed(:smtp, with: &smtp_changeset/2)
     |> cast_embed(:stacks, with: &stacks_changeset/2)
     |> cast_embed(:cost, with: &cost_changeset/2)
+    |> cast_embed(:logging, with: &logging_changeset/2)
     |> change_markers(agent_helm_values: :helm_changed, agent_version: :version_changed)
     |> put_new_change(:write_policy_id, &Ecto.UUID.generate/0)
     |> put_new_change(:read_policy_id, &Ecto.UUID.generate/0)
@@ -241,4 +250,10 @@ defmodule Console.Schema.DeploymentSettings do
     model
     |> cast(attrs, ~w(enabled recommendation_threshold recommendation_cushion)a)
   end
+
+  defp logging_changeset(model, attrs) do
+    model
+    |> cast(attrs, [:enabled, :driver])
+    |> cast_embed(:victoria)
+  end
 end
diff --git a/priv/repo/migrations/20241216181834_add_log_driver_support.exs b/priv/repo/migrations/20241216181834_add_log_driver_support.exs
new file mode 100644
index 0000000000..1c96526767
--- /dev/null
+++ b/priv/repo/migrations/20241216181834_add_log_driver_support.exs
@@ -0,0 +1,9 @@
+defmodule Console.Repo.Migrations.AddLogDriverSupport do
+  use Ecto.Migration
+
+  def change do
+    alter table(:deployment_settings) do
+      add :logging, :map
+    end
+  end
+end
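With the embed, changeset, and migration in place, turning the feature on is just a settings update; an attrs map shaped like the following (host and credentials are placeholders) flows through `logging_changeset/2`:

    %{
      logging: %{
        enabled: true,
        driver: :victoria,
        victoria: %{host: "https://vmlogs.example.com", user: "plural", password: "hunter2"}
      }
    }

which `Console.Logs.Provider.client/1` then matches on to construct the victoria driver.
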
diff --git a/schema/schema.graphql b/schema/schema.graphql
index 8be4b25f97..16bf473b48 100644
--- a/schema/schema.graphql
+++ b/schema/schema.graphql
@@ -59,6 +59,8 @@ type RootQueryType {
   metric(query: String!, offset: Int, step: String, clusterId: ID): [MetricResponse]
 
+  logAggregation(serviceId: ID, clusterId: ID, query: String, time: LogTimeRange, limit: Int): [LogLine]
+
   logs(query: String!, start: Long, end: Long, limit: Int!, clusterId: ID): [LogStream]
 
   scalingRecommendation(kind: AutoscalingTarget!, namespace: String!, name: String!): VerticalPodAutoscaler
@@ -1072,6 +1074,11 @@ enum AiProvider {
   VERTEX
 }
 
+enum LogDriver {
+  VICTORIA
+  ELASTIC
+}
+
 input ProjectAttributes {
   name: String!
   description: String
@@ -1321,6 +1328,9 @@ type DeploymentSettings {
   "settings for cost management"
   cost: CostSettings
 
+  "settings for connections to log aggregation datastores"
+  logging: LoggingSettings
+
   "The console's expected agent version"
   agentVsn: String!
 
@@ -1483,6 +1493,17 @@ type VertexAiSettings {
   location: String!
 }
 
+"Settings for configuring log aggregation throughout Plural"
+type LoggingSettings {
+  enabled: Boolean
+
+  "the type of log aggregation solution you wish to use"
+  driver: LogDriver
+
+  "configures a connection to victoria metrics"
+  victoria: HttpConnection
+}
+
 type ProjectConnection {
   pageInfo: PageInfo!
   edges: [ProjectEdge]
@@ -7009,6 +7030,13 @@ input LabelInput {
   value: String
 }
 
+input LogTimeRange {
+  before: String
+  after: String
+  duration: String
+  reverse: Boolean
+}
+
 type Dashboard {
   id: String!
   spec: DashboardSpec!
@@ -7044,6 +7072,17 @@ type MetricResponse {
   values: [MetricResult]
 }
 
+type LogLine {
+  timestamp: DateTime
+  log: String!
+  facets: [LogFacet]
+}
+
+type LogFacet {
+  key: String!
+  value: String!
+}
+
 type Metadata {
   labels: [LabelPair]
   annotations: [LabelPair]
diff --git a/test/console/graphql/queries/observability_queries_test.exs b/test/console/graphql/queries/observability_queries_test.exs
index e0ede1dc91..32e7b1a415 100644
--- a/test/console/graphql/queries/observability_queries_test.exs
+++ b/test/console/graphql/queries/observability_queries_test.exs
@@ -177,6 +177,33 @@ defmodule Console.GraphQl.ObservabilityQueriesTest do
     end
   end
 
+  describe "logAggregation" do
+    test "it can fetch from a logs db" do
+      user = insert(:user)
+      svc  = insert(:service, read_bindings: [%{user_id: user.id}])
+      expect(Console.Logs.Provider, :query, fn _ -> {:ok, [log_line("a log")]} end)
+
+      {:ok, %{data: %{"logAggregation" => [line]}}} = run_query("""
+        query Logs($serviceId: ID!) {
+          logAggregation(serviceId: $serviceId) { timestamp log }
+        }
+      """, %{"serviceId" => svc.id}, %{current_user: user})
+
+      assert line["log"] == "a log"
+    end
+
+    test "it will authz" do
+      user = insert(:user)
+      svc  = insert(:service)
+
+      {:ok, %{errors: [_ | _]}} = run_query("""
+        query Logs($serviceId: ID!) {
+          logAggregation(serviceId: $serviceId) { timestamp log }
+        }
+      """, %{"serviceId" => svc.id}, %{current_user: user})
+    end
+  end
+
   describe "scalingRecommendation" do
     test "it can fetch a vpa to provide recommendations" do
       user = insert(:user)
diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex
index 2ede28058e..e73007a749 100644
--- a/test/support/test_helpers.ex
+++ b/test/support/test_helpers.ex
@@ -54,5 +54,7 @@ defmodule Console.TestHelpers do
   def run_query(query, variables, context \\ %{}),
     do: Absinthe.run(query, Console.GraphQl, variables: variables, context: context)
 
+  def log_line(log), do: %Console.Logs.Line{timestamp: Timex.now(), log: log}
+
   def from_connection(%{"edges" => edges}), do: Enum.map(edges, & &1["node"])
 end
diff --git a/test/test_helper.exs b/test/test_helper.exs
index b52d327a38..1c9a7017a3 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -12,6 +12,7 @@ Mimic.copy(Console.Buffers.TokenAudit)
 Mimic.copy(Console.Deployments.Git.Discovery)
 Mimic.copy(Console.Deployments.Pipelines.Discovery)
 Mimic.copy(Console.Deployments.Pipelines)
+Mimic.copy(Console.Logs.Provider)
 Mimic.copy(Console)
 Mimic.copy(Kazan)
 Mimic.copy(HTTPoison)

From 15281df812ea9e4b992d8665b8bbbf9dce191dd4 Mon Sep 17 00:00:00 2001
From: michaeljguarino
Date: Tue, 17 Dec 2024 09:40:13 -0500
Subject: [PATCH 2/2] fix cost ingest mutation

---
 assets/src/generated/graphql.ts               |  1 +
 go/client/models_gen.go                       |  1 +
 .../servicedeployment_controller.go           |  1 +
 lib/console/cost/ingester.ex                  | 31 ++++++++++---------
 lib/console/deployments/pr/validation.ex      | 18 +++++------
 lib/console/deployments/services.ex           |  2 +-
 lib/console/graphql/deployments/service.ex    |  1 +
 schema/schema.graphql                         |  2 ++
 .../deployments/cluster_mutations_test.exs    | 17 ++++++++--
 .../deployments/cluster_queries_test.exs      |  8 ++---
 10 files changed, 49 insertions(+), 33 deletions(-)

diff --git a/assets/src/generated/graphql.ts b/assets/src/generated/graphql.ts
index e26306b397..1a77bf26c7 100644
--- a/assets/src/generated/graphql.ts
+++ b/assets/src/generated/graphql.ts
@@ -8804,6 +8804,7 @@ export type ServiceUpdateAttributes = {
   dryRun?: InputMaybe<Scalars['Boolean']['input']>;
   git?: InputMaybe<GitRefAttributes>;
   helm?: InputMaybe<HelmConfigAttributes>;
+  imports?: InputMaybe<Array<InputMaybe<ServiceImportAttributes>>>;
   interval?: InputMaybe<Scalars['String']['input']>;
   kustomize?: InputMaybe<KustomizeAttributes>;
   parentId?: InputMaybe<Scalars['ID']['input']>;
diff --git a/go/client/models_gen.go b/go/client/models_gen.go
index 551bb80cbd..3a380d996c 100644
--- a/go/client/models_gen.go
+++ b/go/client/models_gen.go
@@ -5261,6 +5261,7 @@ type ServiceUpdateAttributes struct {
 	ReadBindings    []*PolicyBindingAttributes  `json:"readBindings,omitempty"`
 	WriteBindings   []*PolicyBindingAttributes  `json:"writeBindings,omitempty"`
 	ContextBindings []*ContextBindingAttributes `json:"contextBindings,omitempty"`
+	Imports         []*ServiceImportAttributes  `json:"imports,omitempty"`
 }
 
 type ServiceVuln struct {
diff --git a/go/controller/internal/controller/servicedeployment_controller.go b/go/controller/internal/controller/servicedeployment_controller.go
index 4da364d95f..79a737af3a 100644
--- a/go/controller/internal/controller/servicedeployment_controller.go
+++ b/go/controller/internal/controller/servicedeployment_controller.go
@@ -182,6 +182,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
 		SyncConfig:   attr.SyncConfig,
 		Dependencies: attr.Dependencies,
 		ParentID:     attr.ParentID,
+		Imports:      attr.Imports,
 	}
 
 	sha, err := utils.HashObject(updater)
diff --git a/lib/console/cost/ingester.ex b/lib/console/cost/ingester.ex
index efc23a08c2..b821a12159 100644
--- a/lib/console/cost/ingester.ex
+++ b/lib/console/cost/ingester.ex
@@ -1,15 +1,18 @@
 defmodule Console.Cost.Ingester do
   alias Console.Repo
   import Console.Services.Base
-  import Console.Cost.Utils, only: [batch_insert: 3]
+  import Console.Cost.Utils, only: [batch_insert: 2]
   alias Console.Schema.{Cluster, ClusterUsage, ClusterNamespaceUsage, ClusterScalingRecommendation}
 
   def ingest(attrs, %Cluster{id: id}) do
     start_transaction()
-    |> add_operation(:wipe_cluster, fn _ ->
-      ClusterUsage.for_cluster(id)
-      |> Repo.delete_all()
-      |> ok()
+    |> add_operation(:cluster, fn _ ->
+      case Repo.get_by(ClusterUsage, cluster_id: id) do
+        %ClusterUsage{} = usage -> usage
+        nil -> %ClusterUsage{cluster_id: id}
+      end
+      |> ClusterUsage.changeset(attrs[:cluster])
+      |> Repo.insert_or_update()
     end)
     |> add_operation(:wipe_namespace, fn _ ->
       ClusterNamespaceUsage.for_cluster(id)
@@ -21,22 +24,17 @@ defmodule Console.Cost.Ingester do
       |> Repo.delete_all()
       |> ok()
     end)
-    |> add_operation(:cluster, fn _ ->
-      %ClusterUsage{id: id}
-      |> ClusterUsage.changeset(attrs[:cluster])
-      |> Repo.insert()
-    end)
     |> add_operation(:namespace, fn _ ->
       Map.get(attrs, :namespaces, [])
-      |> Stream.map(&timestamped/1)
+      |> Stream.map(&cluster_timestamped(&1, id))
       |> Stream.map(&Map.drop(&1, ~w(gpu_util)a))
-      |> batch_insert(ClusterNamespaceUsage, repo: Repo)
+      |> batch_insert(ClusterNamespaceUsage)
       |> ok()
     end)
     |> add_operation(:scaling, fn _ ->
       Map.get(attrs, :recommendations, [])
-      |> Stream.map(&timestamped/1)
+      |> Stream.map(&cluster_timestamped(&1, id))
-      |> batch_insert(ClusterScalingRecommendation, repo: Repo)
+      |> batch_insert(ClusterScalingRecommendation)
       |> ok()
     end)
     |> execute()
@@ -45,4 +43,9 @@ defmodule Console.Cost.Ingester do
       err -> err
     end
   end
+
+  defp cluster_timestamped(map, cluster_id) do
+    timestamped(map)
+    |> Map.put(:cluster_id, cluster_id)
+  end
 end
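The shape of the fix: the old code inserted `%ClusterUsage{id: id}`, reusing the cluster's id as the usage row's primary key without ever setting `cluster_id`, so the wipe queries matched nothing and a repeat ingest appears to have collided on the primary key. The upsert keyed on `cluster_id`, plus `cluster_timestamped/2` stamping namespace and scaling rows, makes ingestion idempotent. Illustratively (the `cpu` field is assumed from the usage changeset):

    {:ok, _} = Console.Cost.Ingester.ingest(%{cluster: %{cpu: 4.0}}, cluster)
    {:ok, _} = Console.Cost.Ingester.ingest(%{cluster: %{cpu: 5.0}}, cluster)
    # still exactly one ClusterUsage row for this cluster, updated in place
    # on the second call via Repo.insert_or_update/1
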
diff --git a/lib/console/deployments/pr/validation.ex b/lib/console/deployments/pr/validation.ex
index 6036e38657..00f624c571 100644
--- a/lib/console/deployments/pr/validation.ex
+++ b/lib/console/deployments/pr/validation.ex
@@ -16,10 +16,10 @@ defmodule Console.Deployments.Pr.Validation do
   defp do_validate(%Configuration{type: :int}, val) when is_integer(val), do: :ok
   defp do_validate(%Configuration{type: :bool}, val) when is_boolean(val), do: :ok
 
-  defp do_validate(%Configuration{type: :enum, values: vals}, val) do
+  defp do_validate(%Configuration{type: :enum, name: n, values: vals}, val) do
     case val in vals do
       true -> :ok
-      false -> {:error, "#{inspect(val)} is not a member of {#{Enum.join(vals, ",")}}"}
+      false -> {:error, ~s(field "#{n}" with value "#{inspect(val)}" is not a member of {#{Enum.join(vals, ",")}})}
     end
   end
 
@@ -32,30 +32,30 @@ defmodule Console.Deployments.Pr.Validation do
   ) when is_binary(val) do
     query = scope_query(scope)
     case Repo.get_by(query, name: val) do
-      %^query{} -> {:error, "there is already a #{scope} with name #{val}"}
+      %^query{} -> {:error, ~s(there is already a #{scope} with name #{val})}
       _ -> do_validate(put_in(conf.validation.uniq_by, nil), val)
     end
   end
 
-  defp do_validate(%Configuration{type: :string, validation: %Validation{json: true}}, val) when is_binary(val) do
+  defp do_validate(%Configuration{type: :string, name: n, validation: %Validation{json: true}}, val) when is_binary(val) do
     case Jason.decode(val) do
       {:ok, _} -> :ok
-      _ -> {:error, "value #{val} is not a json-encoded string"}
+      _ -> {:error, ~s(field "#{n}" with value "#{val}" is not a json-encoded string)}
     end
   end
 
-  defp do_validate(%Configuration{type: :string, validation: %Validation{regex: r}}, val)
+  defp do_validate(%Configuration{type: :string, name: n, validation: %Validation{regex: r}}, val)
       when is_binary(r) and is_binary(val) do
     case String.match?(val, ~r/#{r}/) do
       true -> :ok
-      false -> {:error, "value #{val} does not match regex #{r}"}
+      false -> {:error, ~s(field "#{n}" with value "#{val}" does not match regex #{r})}
     end
   end
 
   defp do_validate(%Configuration{type: :string}, val) when is_binary(val) and byte_size(val) > 0, do: :ok
-  defp do_validate(%Configuration{type: t}, val),
-    do: {:error, "value #{inspect(val)} does not match type #{String.upcase(to_string(t))}"}
+  defp do_validate(%Configuration{type: t, name: n}, val),
+    do: {:error, ~s(field "#{n}" with value "#{inspect(val)}" does not match type #{String.upcase(to_string(t))})}
 
   defp scope_query(:project), do: Project
   defp scope_query(:cluster), do: Cluster
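The reworked messages now name the offending field; for instance, an enum mismatch on an illustrative configuration surfaces as:

    do_validate(%Configuration{type: :enum, name: "tier", values: ~w(dev prod)}, "staging")
    #=> {:error, ~s(field "tier" with value "\"staging\"" is not a member of {dev,prod})}
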
diff --git a/lib/console/deployments/services.ex b/lib/console/deployments/services.ex
index 0ee5ae4bb3..b0d046e6b8 100644
--- a/lib/console/deployments/services.ex
+++ b/lib/console/deployments/services.ex
@@ -475,7 +475,7 @@ defmodule Console.Deployments.Services do
   def update_service(attrs, %Service{} = svc) do
     start_transaction()
     |> add_operation(:base, fn _ ->
-      svc = Repo.preload(svc, [:context_bindings, :dependencies, :read_bindings, :write_bindings])
+      svc = Repo.preload(svc, [:context_bindings, :dependencies, :read_bindings, :write_bindings, :imports])
       attrs = Map.put(attrs, :status, :stale)
       svc
       |> Service.changeset(stabilize_deps(attrs, svc))
diff --git a/lib/console/graphql/deployments/service.ex b/lib/console/graphql/deployments/service.ex
index 91bb243cbb..24f4131ead 100644
--- a/lib/console/graphql/deployments/service.ex
+++ b/lib/console/graphql/deployments/service.ex
@@ -80,6 +80,7 @@ defmodule Console.GraphQl.Deployments.Service do
     field :read_bindings, list_of(:policy_binding_attributes)
     field :write_bindings, list_of(:policy_binding_attributes)
     field :context_bindings, list_of(:context_binding_attributes)
+    field :imports, list_of(:service_import_attributes)
   end
 
   input_object :service_clone_attributes do
diff --git a/schema/schema.graphql b/schema/schema.graphql
index 16bf473b48..10e07c0e60 100644
--- a/schema/schema.graphql
+++ b/schema/schema.graphql
@@ -3818,6 +3818,8 @@ input ServiceUpdateAttributes {
   writeBindings: [PolicyBindingAttributes]
 
   contextBindings: [ContextBindingAttributes]
+
+  imports: [ServiceImportAttributes]
 }
 
 input ServiceCloneAttributes {
diff --git a/test/console/graphql/mutations/deployments/cluster_mutations_test.exs b/test/console/graphql/mutations/deployments/cluster_mutations_test.exs
index aedae9445a..a329789894 100644
--- a/test/console/graphql/mutations/deployments/cluster_mutations_test.exs
+++ b/test/console/graphql/mutations/deployments/cluster_mutations_test.exs
@@ -500,9 +500,20 @@ defmodule Console.GraphQl.Deployments.ClusterMutationsTest do
       }
     """, %{"costs" => ingest}, %{cluster: cluster})
 
-    assert Console.Repo.aggregate(Console.Schema.ClusterUsage, :count, :id) == 1
-    assert Console.Repo.aggregate(Console.Schema.ClusterNamespaceUsage, :count, :id) == 1
-    assert Console.Repo.aggregate(Console.Schema.ClusterScalingRecommendation, :count, :id) == 1
+    {:ok, %{data: %{"ingestClusterCost" => true}}} = run_query("""
+      mutation Ingest($costs: CostIngestAttributes!) {
+        ingestClusterCost(costs: $costs)
+      }
+    """, %{"costs" => ingest}, %{cluster: cluster})
+
+    [usage] = Console.Repo.all(Console.Schema.ClusterUsage)
+    assert usage.cluster_id == cluster.id
+
+    [ns] = Console.Repo.all(Console.Schema.ClusterNamespaceUsage)
+    assert ns.cluster_id == cluster.id
+
+    [sr] = Console.Repo.all(Console.Schema.ClusterScalingRecommendation)
+    assert sr.cluster_id == cluster.id
     end
   end
 end
diff --git a/test/console/graphql/queries/deployments/cluster_queries_test.exs b/test/console/graphql/queries/deployments/cluster_queries_test.exs
index 80fe5b5f13..daec6435aa 100644
--- a/test/console/graphql/queries/deployments/cluster_queries_test.exs
+++ b/test/console/graphql/queries/deployments/cluster_queries_test.exs
@@ -405,16 +405,13 @@ defmodule Console.GraphQl.Deployments.ClusterQueriesTest do
         query Runtime($id: ID!) {
           runtimeService(id: $id) {
             id
-            addon {
-              versions { version kube }
-              readme
-            }
+            addon { versions { version kube } }
           }
         }
       """, %{"id" => runtime.id}, %{current_user: user})
 
       assert rs["id"] == runtime.id
-      assert rs["addon"]["readme"]
+      # assert rs["addon"]["readme"]
     end
 
     test "users w/o cluster read cannot fetch a runtime service by id" do
@@ -428,7 +425,6 @@ defmodule Console.GraphQl.Deployments.ClusterQueriesTest do
             id
             addon {
               versions { version kube }
-              readme
             }
           }
         }