Skip to content

Commit

Permalink
C* Spark RDD Provider work clean up.
Browse files Browse the repository at this point in the history
Includes contributions from Lucas Chen and Sotirios Roussos.
This is a WIP and needs some issues sorted out.

Signed-off-by: Jim Hughes <[email protected]>

commit 4dcac37
Author: Lucas Chen <[email protected]>
Date:   Wed Dec 11 00:27:21 2019 -0500

    Initial implementation of CassandraSpatialRDDProvider

    Signed-off-by: Lucas Chen <[email protected]>

commit 517bd96
Author: Sotirios Roussos <[email protected]>
Date:   Tue Dec 17 17:15:38 2019 +0200

    Cassandra Spark fixes

    Signed-off-by: Sotirios Roussos <[email protected]>

commit 7ad15ed
Author: Sotirios Roussos <[email protected]>
Date:   Mon Dec 16 23:44:44 2019 +0200

    Cassandra Spark

    Signed-off-by: Sotirios Roussos <[email protected]>
  • Loading branch information
jnh5y committed Jun 24, 2020
1 parent 2f8f2ca commit 23b1009
Show file tree
Hide file tree
Showing 15 changed files with 1,720 additions and 0 deletions.
83 changes: 83 additions & 0 deletions geomesa-cassandra/geomesa-cassandra-jobs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>geomesa-cassandra_2.11</artifactId>
<groupId>org.locationtech.geomesa</groupId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>geomesa-cassandra-jobs_2.11</artifactId>
<name>GeoMesa Cassandra Jobs</name>

<dependencies>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-z3_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-feature-all_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-filter_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-process-vector_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-cassandra-datastore_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>compile</scope>
</dependency>

<!-- test dependencies -->
<!-- <dependency>-->
<!-- <groupId>org.locationtech.geomesa</groupId>-->
<!-- <artifactId>geomesa-accumulo-datastore_${scala.binary.version}</artifactId>-->
<!-- <classifier>tests</classifier>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.binary.version}</artifactId>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.cassandra.jobs

import org.geotools.data.Query
import org.locationtech.geomesa.cassandra.data.{CassandraDataStore, EmptyPlan, StatementPlan}

object CassandraJobUtils {
/**
* Gets (potentially) multiple scan plans. Each plan will only scan a single table
*
* @param ds data store
* @param query query
* @throws java.lang.IllegalArgumentException if query can't be answered with scan plans
* @return
*/
@throws(classOf[IllegalArgumentException])
def getMultiStatementPlans(ds: CassandraDataStore, query: Query): Seq[StatementPlan] = {
ds.getQueryPlan(query).flatMap {
case p: StatementPlan if p.tables.lengthCompare(1) == 0 => Seq(p)
case p: StatementPlan => p.tables.map(table => p.copy(tables = Seq(table)))
case _: EmptyPlan => Seq.empty
case p =>
throw new IllegalArgumentException("Query requires a scan which is not supported through " +
s"the input format: ${p.getClass.getName}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.cassandra.jobs

import org.geotools.data.Query
import org.locationtech.geomesa.cassandra.data.{CassandraDataStore, StatementPlan}

object CassandraJobUtils1 {

/**
* Gets a single scan plan, against a single table
*
* @param ds data store
* @param query query
* @throws java.lang.IllegalArgumentException if query can't be answered with a single scan plan
* @return
*/
@throws(classOf[IllegalArgumentException])
def getSingleScanPlan(ds: CassandraDataStore, query: Query): StatementPlan = {
// get the query plan to set up the iterators, ranges, etc
val plans = ds.getQueryPlan(query)
if (plans.lengthCompare(1) != 0) {
throw new IllegalArgumentException(s"Query requires multiple query plans: ${plans.mkString(", ")}")
}

plans.head match {
case p: StatementPlan if p.tables.lengthCompare(1) == 0 => p

case p: StatementPlan =>
throw new IllegalArgumentException("Query requires multiple tables, which is not supported through " +
s"the input format: ${p.tables.mkString(", ")}")

case p =>
throw new IllegalArgumentException("Query requires a scan which is not supported through " +
s"the input format: ${p.getClass.getName}")
}
}

/**
* Gets (potentially) multiple scan plans. Each plan will only scan a single table
*
* @param ds data store
* @param query query
* @throws java.lang.IllegalArgumentException if query can't be answered with scan plans
* @return
*/
@throws(classOf[IllegalArgumentException])
def getMultiScanPlans(ds: CassandraDataStore, query: Query): Seq[StatementPlan] = {
ds.getQueryPlan(query).flatMap {
case p: StatementPlan if p.tables.lengthCompare(1) == 0 => Seq(p)
case p: StatementPlan => p.tables.map(table => p.copy(tables = Seq(table)))
case _: StatementPlan => Seq.empty
case p =>
throw new IllegalArgumentException("Query requires a scan which is not supported through " +
s"the input format: ${p.getClass.getName}")
}
}
}
Loading

0 comments on commit 23b1009

Please sign in to comment.