Skip to content

Commit

Permalink
Merge pull request #4326 from guardian/an/cloudwatch-metrics-lifecycle
Browse files Browse the repository at this point in the history
make an effort to send all metrics before play app shuts down
  • Loading branch information
andrew-nowak authored Sep 5, 2024
2 parents db79d54 + fd22000 commit c4a4a14
Show file tree
Hide file tree
Showing 16 changed files with 52 additions and 27 deletions.
2 changes: 1 addition & 1 deletion collections/app/CollectionsComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class CollectionsComponents(context: Context) extends GridComponents(context, ne
final override val buildInfo = utils.buildinfo.BuildInfo

val store = new CollectionsStore(config)
val metrics = new CollectionsMetrics(config, actorSystem)
val metrics = new CollectionsMetrics(config, actorSystem, applicationLifecycle)
val notifications = new Notifications(config)

val collections = new CollectionsController(auth, config, store, controllerComponents)
Expand Down
6 changes: 4 additions & 2 deletions collections/app/lib/CollectionsMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package lib

import akka.actor.ActorSystem
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

import scala.concurrent.ExecutionContext

class CollectionsMetrics(
config: CollectionsConfig,
actorSystem: ActorSystem
actorSystem: ActorSystem,
applicationLifecycle: ApplicationLifecycle
)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/Collections", config, actorSystem) {
extends CloudWatchMetrics(s"${config.stage}/Collections", config, actorSystem, applicationLifecycle) {

val processingLatency = new TimeMetric("ProcessingLatency")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.gu.mediaservice.lib.metrics

import akka.actor.{Actor, ActorSystem, Props, Timers}
import akka.pattern.ask
import akka.util.Timeout
import com.amazonaws.services.cloudwatch.{AmazonCloudWatch, AmazonCloudWatchClientBuilder}
import com.amazonaws.services.cloudwatch.model.{Dimension, MetricDatum, PutMetricDataRequest, StandardUnit, StatisticSet}
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.logging.GridLogging
import play.api.inject.ApplicationLifecycle

import java.util.Date
import scala.collection.JavaConverters._
Expand All @@ -20,7 +23,8 @@ trait Metric[A] {
abstract class CloudWatchMetrics(
namespace: String,
config: CommonConfig,
actorSystem: ActorSystem
actorSystem: ActorSystem,
applicationLifecycle: ApplicationLifecycle
) extends GridLogging {

class CountMetric(name: String) extends CloudWatchMetric[Long](name) {
Expand All @@ -43,6 +47,8 @@ abstract class CloudWatchMetrics(

private[CloudWatchMetrics] val metricsActor = actorSystem.actorOf(MetricsActor.props(namespace, client), "metricsactor")

applicationLifecycle.addStopHook(() => (metricsActor ? MetricsActor.Shutdown)(Timeout(5.seconds)))

abstract class CloudWatchMetric[A](val name: String) extends Metric[A] {
final def recordOne(value: A, dimensions: List[Dimension] = Nil): Unit =
metricsActor ! MetricsActor.AddMetrics(List(toDatum(value, dimensions)))
Expand Down Expand Up @@ -74,6 +80,7 @@ object MetricsActor {
Props(new MetricsActor(namespace, client))

private final case object Tick
final case object Shutdown
final case class AddMetrics(values: Seq[MetricDatum])
}
private class MetricsActor(namespace: String, client: AmazonCloudWatch) extends Actor with Timers with GridLogging {
Expand Down Expand Up @@ -124,11 +131,18 @@ private class MetricsActor(namespace: String, client: AmazonCloudWatch) extends

def receive: Receive = {
case AddMetrics(metrics) => become(queued(metrics))
case Shutdown => become(shutdown)
}
private def shutdown: Receive = {
case message => logger.error(s"metrics actor has shut down, cannot respond to message $message")
}
private def queued(queuedMetrics: Seq[MetricDatum]): Receive = {
case Tick =>
putData(queuedMetrics) // send metrics off
become(receive)
case Shutdown =>
putData(queuedMetrics)
become(shutdown)
case AddMetrics(metrics) =>
become(queued(queuedMetrics ++ metrics))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle
import play.api.mvc.{Filter, RequestHeader, Result}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class RequestMetricFilter(val config: CommonConfig, override val mat: Materializer, actorSystem: ActorSystem)(implicit ec: ExecutionContext) extends Filter {
class RequestMetricFilter(val config: CommonConfig, override val mat: Materializer, actorSystem: ActorSystem, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext) extends Filter {
val namespace: String = s"${config.stage}/${config.appName.split('-').map(_.toLowerCase.capitalize).mkString("")}"
val enabled: Boolean = config.requestMetricsEnabled

object RequestMetrics extends CloudWatchMetrics(namespace, config, actorSystem) {
object RequestMetrics extends CloudWatchMetrics(namespace, config, actorSystem, applicationLifecycle) {
val totalRequests = new CountMetric("TotalRequests")
val successfulRequests = new CountMetric("SuccessfulRequests")
val failedRequests = new CountMetric("FailedRequests")
Expand Down
2 changes: 1 addition & 1 deletion image-loader/app/ImageLoaderComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ImageLoaderComponents(context: Context) extends GridComponents(context, ne
val services = new Services(config.domainRoot, config.serviceHosts, Set.empty)
private val gridClient = GridClient(services)(wsClient)

val metrics = new ImageLoaderMetrics(config, actorSystem)
val metrics = new ImageLoaderMetrics(config, actorSystem, applicationLifecycle)

val controller = new ImageLoaderController(
auth, downloader, store, maybeIngestQueue, uploadStatusTable, notifications, config, uploader, quarantineUploader, projector, controllerComponents, gridClient, authorisation, metrics)
Expand Down
5 changes: 3 additions & 2 deletions image-loader/app/lib/ImageLoaderMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package lib

import akka.actor.ActorSystem
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

class ImageLoaderMetrics(config: ImageLoaderConfig, actorSystem: ActorSystem)
extends CloudWatchMetrics (namespace = s"${config.stage}/ImageLoader", config, actorSystem){
class ImageLoaderMetrics(config: ImageLoaderConfig, actorSystem: ActorSystem, applicationLifecycle: ApplicationLifecycle)
extends CloudWatchMetrics (namespace = s"${config.stage}/ImageLoader", config, actorSystem, applicationLifecycle){

val successfulIngestsFromQueue = new CountMetric("SuccessfulIngestsFromQueue")

Expand Down
2 changes: 1 addition & 1 deletion media-api/app/MediaApiComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MediaApiComponents(context: Context) extends GridComponents(context, new M
val imageOperations = new ImageOperations(context.environment.rootPath.getAbsolutePath)

val messageSender = new ThrallMessageSender(config.thrallKinesisStreamConfig)
val mediaApiMetrics = new MediaApiMetrics(config, actorSystem)
val mediaApiMetrics = new MediaApiMetrics(config, actorSystem, applicationLifecycle)

val s3Client = new S3Client(config)

Expand Down
5 changes: 3 additions & 2 deletions media-api/app/lib/MediaApiMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import akka.actor.ActorSystem
import com.amazonaws.services.cloudwatch.model.Dimension
import com.gu.mediaservice.lib.auth.{ApiAccessor, Syndication}
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

import scala.concurrent.ExecutionContext

class MediaApiMetrics(config: MediaApiConfig, actorSystem: ActorSystem)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/MediaApi", config, actorSystem) {
class MediaApiMetrics(config: MediaApiConfig, actorSystem: ActorSystem, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/MediaApi", config, actorSystem, applicationLifecycle) {

val searchQueries = new TimeMetric("ElasticSearch")

Expand Down
12 changes: 7 additions & 5 deletions media-api/test/lib/elasticsearch/ElasticSearchTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ class ElasticSearchTest extends ElasticSearchTestBase with Eventually with Elast

private val index = "images"

private val applicationLifecycle = new ApplicationLifecycle {
override def addStopHook(hook: () => Future[_]): Unit = {}
override def stop(): Future[_] = Future.successful(())
}

private val mediaApiConfig = new MediaApiConfig(GridConfigResources(
Configuration.from(USED_CONFIGS_IN_TEST ++ MOCK_CONFIG_KEYS.map(_ -> NOT_USED_IN_TEST).toMap),
null,
new ApplicationLifecycle {
override def addStopHook(hook: () => Future[_]): Unit = {}
override def stop(): Future[_] = Future.successful(())
}
applicationLifecycle
))
private val actorSystem: ActorSystem = ActorSystem()
private val mediaApiMetrics = new MediaApiMetrics(mediaApiConfig, actorSystem)
private val mediaApiMetrics = new MediaApiMetrics(mediaApiConfig, actorSystem, applicationLifecycle)
val elasticConfig = ElasticSearchConfig(
aliases = ElasticSearchAliases(
current = "Images_Current",
Expand Down
2 changes: 1 addition & 1 deletion metadata-editor/app/MetadataEditorComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MetadataEditorComponents(context: Context) extends GridComponents(context,
val notifications = new Notifications(config)
val imageOperations = new ImageOperations(context.environment.rootPath.getAbsolutePath)

val metrics = new MetadataEditorMetrics(config, actorSystem)
val metrics = new MetadataEditorMetrics(config, actorSystem, applicationLifecycle)
val messageConsumer = new MetadataSqsMessageConsumer(config, metrics, editsStore)

messageConsumer.startSchedule()
Expand Down
6 changes: 4 additions & 2 deletions metadata-editor/app/lib/MetadataEditorMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package lib

import akka.actor.ActorSystem
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

import scala.concurrent.ExecutionContext

class MetadataEditorMetrics(
config: EditsConfig,
actorSystem: ActorSystem
actorSystem: ActorSystem,
applicationLifecycle: ApplicationLifecycle
)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/MetadataEditor", config, actorSystem) {
extends CloudWatchMetrics(s"${config.stage}/MetadataEditor", config, actorSystem, applicationLifecycle) {

val snsMessage = new CountMetric("SNSMessage")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class GridComponents[Config <: CommonConfig](context: Context, val load
gzipFilter,
new RequestLoggingFilter(materializer),
new ConnectionBrokenFilter(materializer),
new RequestMetricFilter(config, materializer, actorSystem)
new RequestMetricFilter(config, materializer, actorSystem, applicationLifecycle)
)

final override lazy val corsConfig: CORSConfig = CORSConfig.fromConfiguration(context.initialConfiguration).copy(
Expand Down
2 changes: 1 addition & 1 deletion thrall/app/ThrallComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr

val store = new ThrallStore(config)
val metadataEditorNotifications = new MetadataEditorNotifications(config)
val thrallMetrics = new ThrallMetrics(config, actorSystem)
val thrallMetrics = new ThrallMetrics(config, actorSystem, applicationLifecycle)

val es = new ElasticSearch(config.esConfig, Some(thrallMetrics), actorSystem.scheduler)
es.ensureIndexExistsAndAliasAssigned()
Expand Down
5 changes: 3 additions & 2 deletions thrall/app/lib/ThrallMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package lib

import akka.actor.ActorSystem
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

import scala.concurrent.ExecutionContext

class ThrallMetrics(config: ThrallConfig, actorSystem: ActorSystem)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/Thrall", config, actorSystem) {
class ThrallMetrics(config: ThrallConfig, actorSystem: ActorSystem, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/Thrall", config, actorSystem, applicationLifecycle) {

val indexedImages = new CountMetric("IndexedImages")

Expand Down
2 changes: 1 addition & 1 deletion usage/app/UsageComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class UsageComponents(context: Context) extends GridComponents(context, new Usag
val liveContentApi = new LiveContentApi(config)(ScheduledExecutor())
val usageGroupOps = new UsageGroupOps(config, mediaWrapper)
val usageTable = new UsageTable(config)
val usageMetrics = new UsageMetrics(config, actorSystem)
val usageMetrics = new UsageMetrics(config, actorSystem, applicationLifecycle)
val usageNotifier = new UsageNotifier(config, usageTable)
val usageRecorder = new UsageRecorder(usageMetrics, usageTable, usageNotifier, usageNotifier)
val notifications = new Notifications(config)
Expand Down
5 changes: 3 additions & 2 deletions usage/app/lib/UsageMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package lib

import akka.actor.ActorSystem
import com.gu.mediaservice.lib.metrics.CloudWatchMetrics
import play.api.inject.ApplicationLifecycle

import scala.concurrent.ExecutionContext

class UsageMetrics(config: UsageConfig, actorSystem: ActorSystem)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/Usage", config, actorSystem) {
class UsageMetrics(config: UsageConfig, actorSystem: ActorSystem, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext)
extends CloudWatchMetrics(s"${config.stage}/Usage", config, actorSystem, applicationLifecycle) {

def incrementUpdated = updates.increment()
def incrementErrors = errors.increment()
Expand Down

0 comments on commit c4a4a14

Please sign in to comment.