Skip to content

Commit

Permalink
GEOMESA-3376 Accumulo - add CLI command to retrieve audit logs (#3141)
Browse files Browse the repository at this point in the history
* Command is `query-audit-logs`
* Refactor/simplify internal audit API
* Make docs build fail on warnings
* Add docs github action
  • Loading branch information
elahrvivaz authored Jul 31, 2024
1 parent 4d6f647 commit 8e91933
Show file tree
Hide file tree
Showing 39 changed files with 939 additions and 705 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/build-docs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: build-docs

on:
push:
pull_request:

permissions: # added using https://github.com/step-security/secure-repo
contents: read

env:
MAVEN_CLI_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dlicense.skip=true --batch-mode

jobs:
build-docs:
runs-on: ubuntu-latest
# avoid duplicate jobs on PRs from the main repo
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.event.pull_request.base.repo.full_name

steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-java@99b8673ff64fbf99d8d325f52d9a5bdedb8483e9 # v4.2.1
with:
distribution: 'temurin'
java-version: '11'
cache: 'maven'
- uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1
with:
python-version: '3.10'
cache: 'pip'
cache-dependency-path: docs/requirements.txt
- name: Install python dependencies
run: pip install -r docs/requirements.txt
- name: Build with Maven
run: mvn clean install $MAVEN_CLI_OPTS -Pdocs -pl docs
- name: Remove geomesa artifacts
if: success() || failure()
run: rm -rf ~/.m2/repository/org/locationtech/geomesa
2 changes: 1 addition & 1 deletion docs/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

<target name="sphinx" if="sphinx.available">
<exec executable="${sphinx.binary}" failonerror="true" dir="${basedir}/${id}">
<arg line="-b ${build} -d &quot;${build.directory}/${id}/doctrees&quot; . &quot;${build.directory}/${id}/${build}&quot;" />
<arg line="-W -b ${build} -d &quot;${build.directory}/${id}/doctrees&quot; . &quot;${build.directory}/${id}/${build}&quot;" />
</exec>
</target>

Expand Down
26 changes: 25 additions & 1 deletion docs/user/accumulo/commandline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ The ``--add`` argument will add the stats iterator.
``configure-table``
^^^^^^^^^^^^^^^^^^^

The command will list and update properties on the Accumulo tables used by GeoMesa. It has two
This command will list and update properties on the Accumulo tables used by GeoMesa. It has two
sub-commands:

* ``list`` List the configuration options for a table
Expand Down Expand Up @@ -292,6 +292,30 @@ only show the one requested. For update, it is required as the property to updat

The ``--value`` argument is only used during update.

``query-audit-logs``
^^^^^^^^^^^^^^^^^^^^

This command will query the audit logs produced by GeoMesa.

======================== =============================================================
Argument Description
======================== =============================================================
``-c, --catalog *`` The catalog table containing schema metadata
``-f, --feature-name *`` The name of the schema
``-b, --begin`` Lower bound (inclusive) on the date of log entries to return, in ISO 8601 format
``-e, --end`` Upper bound (exclusive) on the date of log entries to return, in ISO 8601 format
``-q, --cql`` CQL predicate used to filter log entries
``--output-format`` Output format for result, one of either ``csv`` (default) or ``json``
======================== =============================================================

The ``--begin`` and ``--end`` arguments can be used to filter logs by date. For more advanced filtering,
the ``--cql`` argument accepts GeoTools `filter expressions <https://docs.geotools.org/stable/userguide/library/cql/ecql.html>`_.
The schema to use for filtering is::

date:Date,user:String,filter:String,hints:String,planTimeMillis:Long,scanTimeMillis:Long,hits:Long

The ``--output-format`` argument can be used to return logs as CSV or as JSON (JSON lines).

.. _accumulo_tools_stats_analyze:

``stats-analyze``
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.accumulo.audit

import org.apache.accumulo.core.client.AccumuloClient
import org.apache.accumulo.core.security.Authorizations
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
import org.locationtech.geomesa.index.audit.AuditReader
import org.locationtech.geomesa.index.audit.AuditedEvent.QueryEvent
import org.locationtech.geomesa.security.AuthorizationsProvider
import org.locationtech.geomesa.utils.collection.{CloseableIterator, IsSynchronized, MaybeSynchronized, NotSynchronized}

import java.time.ZonedDateTime

/**
* An audit reader
*
* @param client accumulo client - note: assumed to be shared and not cleaned up on closed
* @param table table containing audit records
* @param authProvider auth provider
*/
class AccumuloAuditReader(client: AccumuloClient, table: String, authProvider: AuthorizationsProvider) extends AuditReader {

import scala.collection.JavaConverters._

def this(ds: AccumuloDataStore) = this(ds.connector, ds.config.auditWriter.table, ds.config.authProvider)

private val tableExists: MaybeSynchronized[Boolean] =
if (client.tableOperations().exists(table)) { new NotSynchronized(true) } else { new IsSynchronized(false) }

override def getQueryEvents(typeName: String, dates: (ZonedDateTime, ZonedDateTime)): CloseableIterator[QueryEvent] = {
if (!checkTable) { CloseableIterator.empty } else {
val scanner = client.createScanner(table, new Authorizations(authProvider.getAuthorizations.asScala.toSeq: _*))
AccumuloQueryEventTransform.iterator(scanner, typeName, dates)
}
}

override def close(): Unit = {}

private def checkTable: Boolean = {
if (tableExists.get) {
true
} else if (client.tableOperations().exists(table)) {
tableExists.set(true, false)
true
} else {
false
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.accumulo.audit

import org.apache.accumulo.core.client.{AccumuloClient, BatchWriter}
import org.locationtech.geomesa.accumulo.audit.AccumuloAuditWriter.ShutdownTimeout
import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager}
import org.locationtech.geomesa.index.audit.AuditWriter.AuditLogger
import org.locationtech.geomesa.index.audit.AuditedEvent
import org.locationtech.geomesa.index.audit.AuditedEvent.QueryEvent
import org.locationtech.geomesa.utils.audit.AuditProvider
import org.locationtech.geomesa.utils.concurrent.ExitingExecutor
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty

import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal

/**
* Audit writer that persists log entries to a table in Accumulo
*
* @param client accumulo client
* @param table table to write to
* @param auditProvider audit provider
* @param enabled enable table writes (entries will always be written to logs)
*/
class AccumuloAuditWriter(
client: AccumuloClient,
val table: String,
auditProvider: AuditProvider,
enabled: Boolean
) extends AuditLogger(StoreType, auditProvider) with Runnable with Closeable {

private var writer: BatchWriter = _

private val queue = new java.util.concurrent.ConcurrentLinkedQueue[(Promise[Unit], QueryEvent)]

private val running = new AtomicBoolean(enabled)
private val writeLock = new ReentrantLock()

private val timeout = ShutdownTimeout.toDuration.get.toMillis

private val scheduledRun =
AccumuloAuditWriter.WriteInterval.toDuration.collect {
case d if enabled && d.isFinite =>
val millis = d.toMillis
logger.debug(s"Scheduling audit writer for ${millis}ms")
AccumuloAuditWriter.executor.scheduleWithFixedDelay(this, millis, millis, TimeUnit.MILLISECONDS)
}

override protected def write(event: AuditedEvent.QueryEvent): Future[Unit] = {
val log = super.write(event)
if (running.get()) {
val promise = Promise[Unit]()
queue.offer(promise -> event) // unbounded queue so will never fail
promise.future
} else {
log
}
}

override def run(): Unit = {
if (running.get() && writeLock.tryLock()) {
try { writeQueuedEvents() } finally {
writeLock.unlock()
}
}
}

private def writeQueuedEvents(): Unit = {
try {
var promiseAndEvent = queue.poll()
if (promiseAndEvent != null) {
val stopTime = System.currentTimeMillis() + timeout
if (writer == null) {
new TableManager(client).ensureTableExists(table)
val writeConfig = GeoMesaBatchWriterConfig().setMaxMemory(10000L).setMaxWriteThreads(5)
writer = client.createBatchWriter(table, writeConfig)
}
while (promiseAndEvent != null) {
val (promise, event) = promiseAndEvent
promise.complete(Try(writer.addMutation(AccumuloQueryEventTransform.toMutation(event))))
promiseAndEvent = if (running.get() || System.currentTimeMillis() < stopTime) { queue.poll() } else { null }
}
writer.flush()
}
} catch {
case NonFatal(e) => logger.error("Error writing audit logs:", e)
}
}

override def close(): Unit = {
if (running.compareAndSet(true, false)) {
scheduledRun.foreach(_.cancel(false))
try {
if (writeLock.tryLock()) {
// not currently running, so clean up any remaining events
writeQueuedEvents()
} else {
// currently running, wait for run to end
writeLock.lock()
}
if (writer != null) {
writer.close()
}
} finally {
writeLock.unlock()
}
}
}
}

object AccumuloAuditWriter {

val WriteInterval: SystemProperty = SystemProperty("geomesa.accumulo.audit.interval", "5 seconds")
val ShutdownTimeout: SystemProperty = SystemProperty("geomesa.accumulo.audit.shutdown.timeout", "5 seconds")

private val executor = ExitingExecutor(new ScheduledThreadPoolExecutor(5), force = true)
}

This file was deleted.

Loading

0 comments on commit 8e91933

Please sign in to comment.