Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pluggable log aggregation drivers #1690

Merged
merged 2 commits
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions assets/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2351,6 +2351,8 @@ export type DeploymentSettings = {
insertedAt?: Maybe<Scalars['DateTime']['output']>;
/** the latest known k8s version */
latestK8sVsn: Scalars['String']['output'];
/** settings for connections to log aggregation datastores */
logging?: Maybe<LoggingSettings>;
/** the way we can connect to your loki instance */
lokiConnection?: Maybe<HttpConnection>;
name: Scalars['String']['output'];
Expand Down Expand Up @@ -3305,6 +3307,17 @@ export type LoadBalancerStatus = {
ingress?: Maybe<Array<Maybe<LoadBalancerIngressStatus>>>;
};

/**
 * Log aggregation backends Plural can query; selected via
 * `LoggingSettings.driver`. Generated from the GraphQL schema — do not edit by hand.
 */
export enum LogDriver {
  Elastic = 'ELASTIC',
  Victoria = 'VICTORIA'
}

/** A single key/value pair of structured metadata attached to a log line. */
export type LogFacet = {
  __typename?: 'LogFacet';
  key: Scalars['String']['output'];
  value: Scalars['String']['output'];
};

export type LogFilter = {
__typename?: 'LogFilter';
metadata: Metadata;
Expand All @@ -3325,12 +3338,36 @@ export type LogLabel = {
value?: Maybe<Scalars['String']['output']>;
};

/**
 * One entry returned by the logAggregation query: the raw log text,
 * optional structured facets, and an optional timestamp.
 */
export type LogLine = {
  __typename?: 'LogLine';
  facets?: Maybe<Array<Maybe<LogFacet>>>;
  log: Scalars['String']['output'];
  timestamp?: Maybe<Scalars['DateTime']['output']>;
};

/**
 * A labeled stream of log values returned by the legacy `logs` query.
 * NOTE(review): `values` reuses MetricResult — presumably timestamp/value
 * pairs in the Loki style; confirm against the server-side schema.
 */
export type LogStream = {
  __typename?: 'LogStream';
  stream?: Maybe<Scalars['Map']['output']>;
  values?: Maybe<Array<Maybe<MetricResult>>>;
};

/**
 * Input time window for log queries: absolute `before`/`after` bounds or a
 * relative `duration`; `reverse` presumably flips result ordering — verify
 * against the resolver.
 */
export type LogTimeRange = {
  after?: InputMaybe<Scalars['String']['input']>;
  before?: InputMaybe<Scalars['String']['input']>;
  duration?: InputMaybe<Scalars['String']['input']>;
  reverse?: InputMaybe<Scalars['Boolean']['input']>;
};

/**
 * Settings for configuring log aggregation throughout Plural.
 * NOTE(review): only a `victoria` connection field exists here even though
 * LogDriver also declares ELASTIC — confirm whether the elastic connection
 * is configured elsewhere in the schema.
 */
export type LoggingSettings = {
  __typename?: 'LoggingSettings';
  /** the type of log aggregation solution you wish to use */
  driver?: Maybe<LogDriver>;
  /** whether log aggregation is turned on at all */
  enabled?: Maybe<Scalars['Boolean']['output']>;
  /** configures a connection to victoria metrics */
  victoria?: Maybe<HttpConnection>;
};

export type LoginInfo = {
__typename?: 'LoginInfo';
external?: Maybe<Scalars['Boolean']['output']>;
Expand Down Expand Up @@ -6802,6 +6839,7 @@ export type RootQueryType = {
installations?: Maybe<InstallationConnection>;
invite?: Maybe<Invite>;
job?: Maybe<Job>;
logAggregation?: Maybe<Array<Maybe<LogLine>>>;
logFilters?: Maybe<Array<Maybe<LogFilter>>>;
loginInfo?: Maybe<LoginInfo>;
logs?: Maybe<Array<Maybe<LogStream>>>;
Expand Down Expand Up @@ -7358,6 +7396,15 @@ export type RootQueryTypeJobArgs = {
};


/**
 * Arguments for the `logAggregation` query: scope results by cluster or
 * service, pass an optional backend query string, cap the result count,
 * and bound the search with a time window.
 */
export type RootQueryTypeLogAggregationArgs = {
  clusterId?: InputMaybe<Scalars['ID']['input']>;
  limit?: InputMaybe<Scalars['Int']['input']>;
  query?: InputMaybe<Scalars['String']['input']>;
  serviceId?: InputMaybe<Scalars['ID']['input']>;
  time?: InputMaybe<LogTimeRange>;
};


/** Arguments for the `logFilters` query: the namespace whose filters to list. */
export type RootQueryTypeLogFiltersArgs = {
  namespace: Scalars['String']['input'];
};
Expand Down Expand Up @@ -8757,6 +8804,7 @@ export type ServiceUpdateAttributes = {
dryRun?: InputMaybe<Scalars['Boolean']['input']>;
git?: InputMaybe<GitRefAttributes>;
helm?: InputMaybe<HelmConfigAttributes>;
imports?: InputMaybe<Array<InputMaybe<ServiceImportAttributes>>>;
interval?: InputMaybe<Scalars['String']['input']>;
kustomize?: InputMaybe<KustomizeAttributes>;
parentId?: InputMaybe<Scalars['ID']['input']>;
Expand Down
71 changes: 71 additions & 0 deletions go/client/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
SyncConfig: attr.SyncConfig,
Dependencies: attr.Dependencies,
ParentID: attr.ParentID,
Imports: attr.Imports,
}

sha, err := utils.HashObject(updater)
Expand Down
31 changes: 17 additions & 14 deletions lib/console/cost/ingester.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
defmodule Console.Cost.Ingester do
alias Console.Repo
import Console.Services.Base
import Console.Cost.Utils, only: [batch_insert: 3]
import Console.Cost.Utils, only: [batch_insert: 2]
alias Console.Schema.{Cluster, ClusterUsage, ClusterNamespaceUsage, ClusterScalingRecommendation}

def ingest(attrs, %Cluster{id: id}) do
start_transaction()
|> add_operation(:wipe_cluster, fn _ ->
ClusterUsage.for_cluster(id)
|> Repo.delete_all()
|> ok()
|> add_operation(:cluster, fn _ ->
case Repo.get_by(ClusterUsage, cluster_id: id) do
%ClusterUsage{} = usage -> usage
nil -> %ClusterUsage{cluster_id: id}
end
|> ClusterUsage.changeset(attrs[:cluster])
|> Repo.insert_or_update()
end)
|> add_operation(:wipe_namespace, fn _ ->
ClusterNamespaceUsage.for_cluster(id)
Expand All @@ -21,22 +24,17 @@ defmodule Console.Cost.Ingester do
|> Repo.delete_all()
|> ok()
end)
|> add_operation(:cluster, fn _ ->
%ClusterUsage{id: id}
|> ClusterUsage.changeset(attrs[:cluster])
|> Repo.insert()
end)
|> add_operation(:namespace, fn _ ->
Map.get(attrs, :namespaces, [])
|> Stream.map(&timestamped/1)
|> Stream.map(&cluster_timestamped(&1, id))
|> Stream.map(&Map.drop(&1, ~w(gpu_util)a))
|> batch_insert(ClusterNamespaceUsage, repo: Repo)
|> batch_insert(ClusterNamespaceUsage)
|> ok()
end)
|> add_operation(:scaling, fn _ ->
Map.get(attrs, :recommendations, [])
|> Stream.map(&timestamped/1)
|> batch_insert(ClusterScalingRecommendation, repo: Repo)
|> Stream.map(&cluster_timestamped(&1, id))
|> batch_insert(ClusterScalingRecommendation)
|> ok()
end)
|> execute()
Expand All @@ -45,4 +43,9 @@ defmodule Console.Cost.Ingester do
err -> err
end
end

defp cluster_timestamped(map, cluster_id) do
timestamped(map)
|> Map.put(:cluster_id, cluster_id)
end
end
10 changes: 7 additions & 3 deletions lib/console/deployments/pipelines.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,19 @@ defmodule Console.Deployments.Pipelines do
Whether all promotion gates for this edge are currently open
"""
@spec open?(PipelineEdge.t, PipelinePromotion.t) :: boolean
def open?(%PipelineEdge{gates: [_ | _] = gates}, %PipelinePromotion{revised_at: r}) when not is_nil(r) do
def open?(%PipelineEdge{gates: [_ | _] = gates}, %PipelinePromotion{revised_at: r} = promo) when not is_nil(r) do
Enum.all?(gates, fn
%PipelineGate{state: :open} = g ->
Timex.after?(coalesce(g.updated_at, g.inserted_at), r)
ts = coalesce(g.updated_at, g.inserted_at)
Timex.after?(ts, r) && after_context?(ts, promo)
_ -> false
end)
end
def open?(_, _), do: true

  # A gate's open timestamp must also postdate the creation of the promotion's
  # current pipeline context; promotions without a loaded context always pass.
  defp after_context?(ts, %PipelinePromotion{context: %PipelineContext{inserted_at: at}}), do: Timex.after?(ts, at)
  defp after_context?(_, _), do: true

@doc """
Whether an edge was promoted after the given dt
"""
Expand Down Expand Up @@ -396,7 +400,7 @@ defmodule Console.Deployments.Pipelines do
@spec apply_promotion(PipelinePromotion.t) :: promotion_resp
def apply_promotion(%PipelinePromotion{} = promo) do
start_transaction()
|> add_operation(:promo, fn _ -> {:ok, Repo.preload(promo, [:stage, services: [:service, :revision]])} end)
|> add_operation(:promo, fn _ -> {:ok, Repo.preload(promo, [:stage, :context, services: [:service, :revision]])} end)
|> add_operation(:edges, fn %{promo: %{stage: stage}} -> {:ok, edges(stage)} end)
|> add_operation(:resolve, fn %{promo: promotion, edges: edges} ->
Enum.filter(edges, &open?(&1, promotion))
Expand Down
18 changes: 9 additions & 9 deletions lib/console/deployments/pr/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ defmodule Console.Deployments.Pr.Validation do
defp do_validate(%Configuration{type: :int}, val) when is_integer(val), do: :ok
defp do_validate(%Configuration{type: :bool}, val) when is_boolean(val), do: :ok

defp do_validate(%Configuration{type: :enum, values: vals}, val) do
defp do_validate(%Configuration{type: :enum, name: n, values: vals}, val) do
case val in vals do
true -> :ok
false -> {:error, "#{inspect(val)} is not a member of {#{Enum.join(vals, ",")}}"}
false -> {:error, ~s(field "#{n}" with value "#{inspect(val)}" is not a member of {#{Enum.join(vals, ",")}})}
end
end

Expand All @@ -32,30 +32,30 @@ defmodule Console.Deployments.Pr.Validation do
) when is_binary(val) do
query = scope_query(scope)
case Repo.get_by(query, name: val) do
%^query{} -> {:error, "there is already a #{scope} with name #{val}"}
%^query{} -> {:error, ~s(there is already a #{scope} with name #{val})}
_ -> do_validate(put_in(conf.validation.uniq_by, nil), val)
end
end

defp do_validate(%Configuration{type: :string, validation: %Validation{json: true}}, val) when is_binary(val) do
defp do_validate(%Configuration{type: :string, name: n, validation: %Validation{json: true}}, val) when is_binary(val) do
case Jason.decode(val) do
{:ok, _} -> :ok
_ -> {:error, "value #{val} is not a json-encoded string"}
_ -> {:error, ~s(field "#{n}" with value "#{val}" is not a json-encoded string)}
end
end

defp do_validate(%Configuration{type: :string, validation: %Validation{regex: r}}, val)
defp do_validate(%Configuration{type: :string, name: n, validation: %Validation{regex: r}}, val)
when is_binary(r) and is_binary(val) do
case String.match?(val, ~r/#{r}/) do
true -> :ok
false -> {:error, "value #{val} does not match regex #{r}"}
false -> {:error, ~s(field "#{n}" with value "#{val}" does not match regex #{r})}
end
end

defp do_validate(%Configuration{type: :string}, val)
when is_binary(val) and byte_size(val) > 0, do: :ok
defp do_validate(%Configuration{type: t}, val),
do: {:error, "value #{inspect(val)} does not match type #{String.upcase(to_string(t))}"}
defp do_validate(%Configuration{type: t, name: n}, val),
do: {:error, ~s(field "#{n}" with value "#{inspect(val)}" does not match type #{String.upcase(to_string(t))})}

defp scope_query(:project), do: Project
defp scope_query(:cluster), do: Cluster
Expand Down
2 changes: 1 addition & 1 deletion lib/console/deployments/services.ex
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ defmodule Console.Deployments.Services do
def update_service(attrs, %Service{} = svc) do
start_transaction()
|> add_operation(:base, fn _ ->
svc = Repo.preload(svc, [:context_bindings, :dependencies, :read_bindings, :write_bindings])
svc = Repo.preload(svc, [:context_bindings, :dependencies, :read_bindings, :write_bindings, :imports])
attrs = Map.put(attrs, :status, :stale)
svc
|> Service.changeset(stabilize_deps(attrs, svc))
Expand Down
1 change: 1 addition & 0 deletions lib/console/graphql/deployments/service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ defmodule Console.GraphQl.Deployments.Service do
field :read_bindings, list_of(:policy_binding_attributes)
field :write_bindings, list_of(:policy_binding_attributes)
field :context_bindings, list_of(:context_binding_attributes)
field :imports, list_of(:service_import_attributes)
end

input_object :service_clone_attributes do
Expand Down
Loading
Loading