From a26e39aff74e75d78b3f504d3f1518f7cc245a2f Mon Sep 17 00:00:00 2001 From: michaeljguarino Date: Tue, 17 Dec 2024 09:40:13 -0500 Subject: [PATCH] fix cost ingest mutation --- lib/console/cost/ingester.ex | 31 ++++++++++--------- .../deployments/cluster_mutations_test.exs | 17 ++++++++-- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/lib/console/cost/ingester.ex b/lib/console/cost/ingester.ex index efc23a08c..b821a1215 100644 --- a/lib/console/cost/ingester.ex +++ b/lib/console/cost/ingester.ex @@ -1,15 +1,18 @@ defmodule Console.Cost.Ingester do alias Console.Repo import Console.Services.Base - import Console.Cost.Utils, only: [batch_insert: 3] + import Console.Cost.Utils, only: [batch_insert: 2] alias Console.Schema.{Cluster, ClusterUsage, ClusterNamespaceUsage, ClusterScalingRecommendation} def ingest(attrs, %Cluster{id: id}) do start_transaction() - |> add_operation(:wipe_cluster, fn _ -> - ClusterUsage.for_cluster(id) - |> Repo.delete_all() - |> ok() + |> add_operation(:cluster, fn _ -> + case Repo.get_by(ClusterUsage, cluster_id: id) do + %ClusterUsage{} = usage -> usage + nil -> %ClusterUsage{cluster_id: id} + end + |> ClusterUsage.changeset(attrs[:cluster]) + |> Repo.insert_or_update() end) |> add_operation(:wipe_namespace, fn _ -> ClusterNamespaceUsage.for_cluster(id) @@ -21,22 +24,17 @@ defmodule Console.Cost.Ingester do |> Repo.delete_all() |> ok() end) - |> add_operation(:cluster, fn _ -> - %ClusterUsage{id: id} - |> ClusterUsage.changeset(attrs[:cluster]) - |> Repo.insert() - end) |> add_operation(:namespace, fn _ -> Map.get(attrs, :namespaces, []) - |> Stream.map(×tamped/1) + |> Stream.map(&cluster_timestamped(&1, id)) |> Stream.map(&Map.drop(&1, ~w(gpu_util)a)) - |> batch_insert(ClusterNamespaceUsage, repo: Repo) + |> batch_insert(ClusterNamespaceUsage) |> ok() end) |> add_operation(:scaling, fn _ -> Map.get(attrs, :recommendations, []) - |> Stream.map(×tamped/1) - |> batch_insert(ClusterScalingRecommendation, repo: Repo) + |> Stream.map(&cluster_timestamped(&1, id)) + |> batch_insert(ClusterScalingRecommendation) |> ok() end) |> execute() @@ -45,4 +43,9 @@ defmodule Console.Cost.Ingester do err -> err end end + + defp cluster_timestamped(map, cluster_id) do + timestamped(map) + |> Map.put(:cluster_id, cluster_id) + end end diff --git a/test/console/graphql/mutations/deployments/cluster_mutations_test.exs b/test/console/graphql/mutations/deployments/cluster_mutations_test.exs index aedae9445..a32978989 100644 --- a/test/console/graphql/mutations/deployments/cluster_mutations_test.exs +++ b/test/console/graphql/mutations/deployments/cluster_mutations_test.exs @@ -500,9 +500,20 @@ defmodule Console.GraphQl.Deployments.ClusterMutationsTest do } """, %{"costs" => ingest}, %{cluster: cluster}) - assert Console.Repo.aggregate(Console.Schema.ClusterUsage, :count, :id) == 1 - assert Console.Repo.aggregate(Console.Schema.ClusterNamespaceUsage, :count, :id) == 1 - assert Console.Repo.aggregate(Console.Schema.ClusterScalingRecommendation, :count, :id) == 1 + {:ok, %{data: %{"ingestClusterCost" => true}}} = run_query(""" + mutation Ingest($costs: CostIngestAttributes!) { + ingestClusterCost(costs: $costs) + } + """, %{"costs" => ingest}, %{cluster: cluster}) + + [usage] = Console.Repo.all(Console.Schema.ClusterUsage) + assert usage.cluster_id == cluster.id + + [ns] = Console.Repo.all(Console.Schema.ClusterNamespaceUsage) + assert ns.cluster_id == cluster.id + + [sr] = Console.Repo.all(Console.Schema.ClusterScalingRecommendation) + assert sr.cluster_id == cluster.id end end end