diff --git a/.gitignore b/.gitignore index c58d83b..85d6d64 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ project/plugins/project/ # Scala-IDE specific .scala_dependencies .worksheet + + +.idea/ \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..ad0aca1 --- /dev/null +++ b/build.sbt @@ -0,0 +1,21 @@ +name := """ingestion-util""" + +organization := "com.harrys" + +version := "0.0.1" + +scalaVersion := "2.11.7" + +exportJars := true + +libraryDependencies ++= Seq( + "ch.qos.logback" % "logback-classic" % "1.1.3", + "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0", + "commons-io" % "commons-io" % "2.4", + "ma.glasnost.orika" % "orika-core" % "1.4.6", + "org.apache.httpcomponents" % "httpclient" % "4.4.1", + "org.json4s" %% "json4s-core" % "3.2.11", + "org.json4s" %% "json4s-jackson" % "3.2.11", + "org.json4s" %% "json4s-ext" % "3.2.11", + "org.postgresql" % "postgresql" % "9.4-1201-jdbc41" +) \ No newline at end of file diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..748703f --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.7 diff --git a/src/main/scala/com/harrys/file/TransientFileFactory.scala b/src/main/scala/com/harrys/file/TransientFileFactory.scala new file mode 100644 index 0000000..fe8796b --- /dev/null +++ b/src/main/scala/com/harrys/file/TransientFileFactory.scala @@ -0,0 +1,103 @@ +package com.harrys.file + +import java.io.File +import java.nio.file.attribute.FileAttribute +import java.nio.file.{Files, Path} +import java.util.concurrent._ + +import com.typesafe.scalalogging.Logger +import org.apache.commons.io.FileUtils +import org.slf4j.LoggerFactory + +import scala.collection.mutable +import scala.ref._ + +/** + * Created by chris on 10/14/15. + */ +class TransientFileFactory(directory: Path) { + private val log = Logger(LoggerFactory.getLogger(this.getClass)) + + private val pendingRefFiles = new mutable.HashSet[Reference[File]]() + private val phantomRefQueue = new ReferenceQueue[File]() + private val cleanupExecutor = new ThreadPoolExecutor(0, 1, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r) + thread.setDaemon(true) + thread.setName("TransientFileCleanup") + thread + } + }) + + final def create(prefix: String, suffix: String, attributes: FileAttribute[_]*) : File = { + val tempFile = Files.createTempFile(directory, prefix, suffix, attributes:_*).toFile + registerNewTempFile(tempFile) + } + + final def shutdown() : Unit = { + if (this == TransientFileFactory.default){ + log.warn("Rejecting attempt to stop the default instance") + } else { + this.forceShutdown() + } + } + + private final def forceShutdown() : Unit = { + cleanupExecutor.shutdown() + pendingRefFiles.synchronized { + pendingRefFiles.flatMap(_.get).foreach(_.deleteOnExit) + pendingRefFiles.clear() + } + if (!cleanupExecutor.awaitTermination(50, TimeUnit.MILLISECONDS)){ + log.warn("Forcing Executor shutdown") + cleanupExecutor.shutdownNow() + } + } + + private final def registerNewTempFile(file: File) : File = { + val phantomRef = new PhantomReference[File](file, phantomRefQueue) + log.debug(s"Registered new Transient File: ${file.getAbsolutePath}") + pendingRefFiles.synchronized { + if (pendingRefFiles.isEmpty){ + cleanupExecutor.execute(new CleanupPollingTask()) + } + pendingRefFiles.add(phantomRef) + } + return file + } + + private final def cleanupRegisteredRef(ref: Reference[File]) : Unit = { + pendingRefFiles.synchronized { + pendingRefFiles.remove(ref) + } + ref.get.collect { + case file if file.exists() => + log.debug(s"Deleting Transient File: ${file.getAbsolutePath}") + file.delete() + } + ref.clear() + } + + private final class CleanupPollingTask extends Runnable { + override final def run() : Unit = { + while (!cleanupExecutor.isTerminating && pendingRefFiles.synchronized { pendingRefFiles.nonEmpty }){ + phantomRefQueue.remove.foreach(cleanupRegisteredRef) + } + } + } +} + +object TransientFileFactory { + + lazy val default = { + val factory = new TransientFileFactory(FileUtils.getTempDirectory.toPath) + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable(){ + def run() : Unit = factory.forceShutdown() + })) + factory + } + + final def create(prefix: String, suffix: String, attributes: FileAttribute[_]*) : File = { + default.create(prefix, suffix, attributes:_*) + } +} diff --git a/src/main/scala/com/harrys/http/.DS_Store b/src/main/scala/com/harrys/http/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/src/main/scala/com/harrys/http/.DS_Store differ diff --git a/src/main/scala/com/harrys/http/AbstractHttpClient.scala b/src/main/scala/com/harrys/http/AbstractHttpClient.scala new file mode 100644 index 0000000..fbf7e71 --- /dev/null +++ b/src/main/scala/com/harrys/http/AbstractHttpClient.scala @@ -0,0 +1,81 @@ +package com.harrys.http + +import java.io._ + +import com.harrys.file.TransientFileFactory +import com.harrys.util.Timer +import com.typesafe.scalalogging.Logger +import org.apache.commons.io.IOUtils +import org.apache.http.client.methods.{CloseableHttpResponse, HttpUriRequest} +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.util.EntityUtils +import org.json4s.jackson.JsonMethods +import org.json4s.{Formats, JValue} +import org.slf4j.LoggerFactory + +/** + * Created by chris on 10/14/15. + */ +abstract class AbstractHttpClient { + private val log = Logger(LoggerFactory.getLogger(this.getClass)) + + protected def createHttpClient() : CloseableHttpClient + + final def performRequestForJson(request: HttpUriRequest)(implicit formats: Formats) : (HttpResponseSummary, JValue) = { + this.performRequest(request) { response => + val summary = new HttpResponseSummary(response) + (summary, JsonMethods.parse(response.getEntity.getContent)) + } + } + + final def performRequestToBuffer(request: HttpUriRequest) : (HttpResponseSummary, Array[Byte]) = { + this.performRequest(request) { response => + response.getEntity.getContentEncoding + val summary = new HttpResponseSummary(response) + (summary, EntityUtils.toByteArray(response.getEntity)) + } + } + + final def performRequestToLocalFile(request: HttpUriRequest) : (HttpResponseSummary, File) = { + this.performRequest(request) { response => + response.getEntity.getContentType + val summary = new HttpResponseSummary(response) + (summary, copyResponseToFile(response)) + } + } + + private final def copyResponseToFile(response: CloseableHttpResponse) : File = { + val tempFile = TransientFileFactory.create("httpraw", "response") + val fileStream = new FileOutputStream(tempFile) + try { + response.getEntity.writeTo(fileStream) + return tempFile + } finally { + IOUtils.closeQuietly(fileStream) + } + } + + + final def performRequest[T](request: HttpUriRequest)(handler: (CloseableHttpResponse) => T) : T = { + val client = this.createHttpClient() + + try { + val (duration, httpResult) = Timer.timeWithAttempt { client.execute(request) } + + httpResult match { + case util.Failure(cause) => + log.error(s"[${ request.getMethod }] ${ request.getURI.getPath }(${ duration.toMillis.toString })", cause) + throw cause + case util.Success(response) => + log.debug(s"${} [${ request.getMethod }] ${ response.getStatusLine.getStatusCode } ${ request.getURI.getPath } (${ duration.toMillis.toString })") + try { + handler(response) + } finally { + IOUtils.closeQuietly(response) + } + } + } finally { + IOUtils.closeQuietly(client) + } + } +} diff --git a/src/main/scala/com/harrys/http/HttpClientDefaults.scala b/src/main/scala/com/harrys/http/HttpClientDefaults.scala new file mode 100644 index 0000000..4ce19a7 --- /dev/null +++ b/src/main/scala/com/harrys/http/HttpClientDefaults.scala @@ -0,0 +1,22 @@ +package com.harrys.http + +import org.apache.http.client.config.RequestConfig +import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} + +import scala.concurrent.duration._ + +/** + * Created by chris on 10/14/15. + */ +object HttpClientDefaults { + val ConnectTimeout = FiniteDuration(10, SECONDS) + val SocketTimeout = FiniteDuration(30, SECONDS) + + def defaultRequestConfig : RequestConfig.Builder = RequestConfig.copy(RequestConfig.DEFAULT) + .setConnectTimeout(ConnectTimeout.toMillis.toInt) + .setSocketTimeout(SocketTimeout.toMillis.toInt) + + + def defaultHttpClientBuilder : HttpClientBuilder = HttpClients.custom() + .setDefaultRequestConfig(defaultRequestConfig.build()) +} diff --git a/src/main/scala/com/harrys/http/HttpResponseSummary.scala b/src/main/scala/com/harrys/http/HttpResponseSummary.scala new file mode 100644 index 0000000..6b73516 --- /dev/null +++ b/src/main/scala/com/harrys/http/HttpResponseSummary.scala @@ -0,0 +1,23 @@ +package com.harrys.http + +import org.apache.http.{Header, HttpResponse, StatusLine} + +/** + * Created by chris on 10/14/15. + */ +class HttpResponseSummary(response: HttpResponse) { + + def statusLine: StatusLine = response.getStatusLine + + def statusCode: Int = statusLine.getStatusCode + + def reasonPhrase: String = statusLine.getReasonPhrase + + def contentType: Header = response.getEntity.getContentType + + def contentLength: Long = response.getEntity.getContentLength + + def contentEncoding: Header = response.getEntity.getContentEncoding + + def allHeaders: Seq[Header] = response.getAllHeaders +} diff --git a/src/main/scala/com/harrys/orika/DateTimeConverters.scala b/src/main/scala/com/harrys/orika/DateTimeConverters.scala new file mode 100644 index 0000000..83790c4 --- /dev/null +++ b/src/main/scala/com/harrys/orika/DateTimeConverters.scala @@ -0,0 +1,121 @@ +package com.harrys.orika + +import java.sql +import java.sql.Timestamp +import java.util.Date + +import ma.glasnost.orika.converter.builtin.PassThroughConverter +import ma.glasnost.orika.metadata.Type +import ma.glasnost.orika.{Converter, CustomConverter} +import org.joda.time.{DateTime, DateTimeZone, LocalDate} + +/** + * Created by chris on 10/14/15. + */ +object DateTimeConverters { + def dateTimeConverters: Seq[Converter[_, _]] = Seq[Converter[_, _]]( + // JodaTime literal definitions + new PassThroughConverter(classOf[DateTime], classOf[LocalDate]), + // DateTime <-> Unix Epoch + new DateTimeToLong, new LongToDateTime, + // DateTime <-> java.util.Date + new DateTimeToDate, new DateToDateTime, + // DateTime <-> java.sql.Timestamp + new DateTimeToTimestamp, new TimestampToDateTime, + // LocalDate <-> java.sql.Date + new LocalDateToSqlDate, new SqlDateToLocalDate, + // DateTime -> LocalDate (unidirectional) + new DateTimeToLocalDate + ) + + final class SqlDateToLocalDate extends CustomConverter[sql.Date, LocalDate]{ + override def convert(source: sql.Date, destinationType: Type[_ <: LocalDate]): LocalDate = { + if (source == null){ + null + } else { + new LocalDate(source.getTime, DateTimeZone.UTC) + } + } + } + + + final class LocalDateToSqlDate extends CustomConverter[LocalDate, sql.Date]{ + override def convert(source: LocalDate, destinationType: Type[_ <: sql.Date]): sql.Date = { + if (source == null){ + null + } else { + new sql.Date(source.toDate.getTime) + } + } + } + + final class DateTimeToTimestamp extends CustomConverter[DateTime, Timestamp]{ + override def convert(source: DateTime, destinationType: Type[_ <: Timestamp]): Timestamp = { + if (source == null){ + null + } else { + new Timestamp(source.toDateTime(DateTimeZone.UTC).getMillis) + } + } + } + + final class TimestampToDateTime extends CustomConverter[Timestamp, DateTime]{ + override def convert(source: Timestamp, destinationType: Type[_ <: DateTime]): DateTime = { + if (source == null){ + null + } else { + new DateTime(source.getTime, DateTimeZone.UTC) + } + } + } + + final class DateTimeToDate extends CustomConverter[DateTime, Date]{ + override def convert(source: DateTime, destinationType: Type[_ <: Date]) : Date = { + if (source == null){ + null + } else { + source.toDateTime(DateTimeZone.UTC).toDate + } + } + } + + final class DateToDateTime extends CustomConverter[Date, DateTime]{ + override def convert(source: Date, destinationType: Type[_ <: DateTime]): DateTime = { + if (source == null){ + null + } else { + new DateTime(source.getTime, DateTimeZone.UTC) + } + } + } + + final class DateTimeToLocalDate extends CustomConverter[DateTime, LocalDate]{ + override def convert(source: DateTime, destinationType: Type[_ <: LocalDate]) : LocalDate = { + if (source == null){ + null + } else { + source.toDateTime(DateTimeZone.UTC).toLocalDate + } + } + } + + final class LongToDateTime extends CustomConverter[java.lang.Long, DateTime] { + override def convert(source: java.lang.Long, destinationType: Type[_ <: DateTime]): DateTime = { + if (source == null){ + null + } else { + new DateTime(source, DateTimeZone.UTC) + } + } + } + + final class DateTimeToLong extends CustomConverter[DateTime, java.lang.Long] { + override def convert(source: DateTime, destinationType: Type[_ <: java.lang.Long]): java.lang.Long = { + if (source == null){ + null.asInstanceOf[java.lang.Long] + } else { + source.toDateTime(DateTimeZone.UTC).getMillis.asInstanceOf[java.lang.Long] + } + } + } +} diff --git a/src/main/scala/com/harrys/orika/OrikaUtils.scala b/src/main/scala/com/harrys/orika/OrikaUtils.scala new file mode 100644 index 0000000..d254a89 --- /dev/null +++ b/src/main/scala/com/harrys/orika/OrikaUtils.scala @@ -0,0 +1,23 @@ +package com.harrys.orika + + +import ma.glasnost.orika.impl.DefaultMapperFactory +import ma.glasnost.orika.{Converter, MapperFactory} + +/** + * Created by chris on 10/14/15. + */ +object OrikaUtils { + + def defaultConverters : Seq[Converter[_, _]] = ScalaConverters.scalaConverters ++ DateTimeConverters.dateTimeConverters + + def createDefaultMapperFactory() : MapperFactory = { + val factory = new DefaultMapperFactory.Builder().mapNulls(true).build() + registerDefaultConverters(factory) + } + + def registerDefaultConverters(factory: MapperFactory) : MapperFactory = { + defaultConverters.foreach(c => factory.getConverterFactory.registerConverter(c)) + factory + } +} diff --git a/src/main/scala/com/harrys/orika/ScalaConverters.scala b/src/main/scala/com/harrys/orika/ScalaConverters.scala new file mode 100644 index 0000000..97f6771 --- /dev/null +++ b/src/main/scala/com/harrys/orika/ScalaConverters.scala @@ -0,0 +1,96 @@ +package com.harrys.orika + +import java.util + +import ma.glasnost.orika.metadata.{Type, TypeBuilder} +import ma.glasnost.orika.{Converter, MapperFacade} + +import scala.collection.JavaConversions + +/** + * Created by chris on 10/14/15. + */ +object ScalaConverters { + def scalaConverters: Seq[Converter[_, _]] = Seq[Converter[_, _]]( + // Scala Seq[T] <-> java.util.List[T] + new ListToSeq, new SeqToList, + // Option[T] <-> T + new OptionToNullable, new NullableToOption + ) + + final class ListToSeq extends Converter[util.List[_], Seq[_]] { + val getAType: Type[util.List[_]] = new TypeBuilder[util.List[_]](){}.build() + val getBType: Type[Seq[_]] = new TypeBuilder[Seq[_]]() {}.build() + + private var mapperFacade: MapperFacade = null + override def setMapperFacade(mapper: MapperFacade): Unit = { + this.mapperFacade = mapper + } + + override def convert(source: util.List[_], destinationType: Type[_ <: Seq[_]]) : Seq[_] = { + val converted = mapperFacade.mapAsList(source, destinationType.getRawType) + if (converted == null) { null } else { JavaConversions.asScalaBuffer(converted).toSeq } + } + + override def canConvert(sourceType: Type[_], destinationType: Type[_]): Boolean = { + getAType.isAssignableFrom(sourceType) && getBType.isAssignableFrom(destinationType) + } + } + + final class SeqToList extends Converter[Seq[_], util.List[_]]{ + val getAType: Type[Seq[_]] = new TypeBuilder[Seq[_]](){}.build() + val getBType: Type[util.List[_]] = new TypeBuilder[util.List[_]](){}.build() + + private var mapperFacade: MapperFacade = null + override def setMapperFacade(mapper: MapperFacade): Unit = { + this.mapperFacade = mapper + } + + override def canConvert(sourceType: Type[_], destinationType: Type[_]): Boolean = { + getAType.isAssignableFrom(sourceType) && getBType.isAssignableFrom(destinationType) + } + + override def convert(source: Seq[_], destinationType: Type[_ <: util.List[_]]): util.List[_] = { + val iterable: java.lang.Iterable[_] = + if (source == null){ null } else { JavaConversions.asJavaIterable(source) } + mapperFacade.mapAsList(iterable, destinationType.getComponentType.getRawType) + } + } + + final class OptionToNullable extends Converter[Option[_], Any] { + override val getAType: Type[Option[_]] = new TypeBuilder[Option[_]](){}.build() + override val getBType: Type[Any] = new TypeBuilder[Any](){}.build() + + private var mapperFacade: MapperFacade = null + override def setMapperFacade(mapper: MapperFacade): Unit = { + this.mapperFacade = mapper + } + + override def canConvert(sourceType: Type[_], destinationType: Type[_]): Boolean = { + getAType.isAssignableFrom(sourceType) && destinationType.getActualTypeArguments.isEmpty + } + + override def convert(source: Option[_], destinationType: Type[_ <: Any]): Any = { + val value = source.getOrElse(null.asInstanceOf[Any]) + mapperFacade.map(value, destinationType.getRawType) + } + } + + final class NullableToOption extends Converter[Any, Option[_]] { + override val getAType: Type[Any] = new TypeBuilder[Any](){}.build() + override val getBType: Type[Option[_]] = new TypeBuilder[Option[_]](){}.build() + + private var mapperFacade: MapperFacade = null + override def setMapperFacade(mapper: MapperFacade): Unit = { + this.mapperFacade = mapper + } + + override def canConvert(sourceType: Type[_], destinationType: Type[_]): Boolean = { + getBType.isAssignableFrom(destinationType) && sourceType.getActualTypeArguments.isEmpty + } + + override def convert(source: Any, destinationType: Type[_ <: Option[_]]) : Option[_] = { + Option(mapperFacade.map(source, destinationType.getRawType)) + } + } +} diff --git a/src/main/scala/com/harrys/sql/PostgresJdbcParser.scala b/src/main/scala/com/harrys/sql/PostgresJdbcParser.scala new file mode 100644 index 0000000..1285d13 --- /dev/null +++ b/src/main/scala/com/harrys/sql/PostgresJdbcParser.scala @@ -0,0 +1,71 @@ +package com.harrys.sql + +import java.net.URI + +import org.postgresql.ds.PGSimpleDataSource + +/** + * Created by chris on 10/14/15. + */ +object PostgresJdbcParser { + final def parseJdbcUrlForDataSource(jdbcUrl: String) : PGSimpleDataSource = { + val output = new PGSimpleDataSource() + output.setUrl(removeJdbcCredentials(jdbcUrl)) + + extractJdbcCredentials(jdbcUrl).foreach({ + case (username, None) => + output.setUser(username) + case (username, Some(pass)) => + output.setUser(username) + output.setPassword(pass) + }) + + output + } + + + private final def extractJdbcCredentials(jdbcUrl: String) : Option[(String, Option[String])] = { + val parsed = URI.create(removeJdbcPrefix(jdbcUrl)) + if (parsed.getUserInfo == null){ + None + } else { + val pieces = parsed.getUserInfo.split(":", 2) + if (pieces.length == 1){ + Some((pieces(0), None)) + } else { + Some((pieces(0), Some(pieces(1)))) + } + } + } + + /** + * Strips the userinfo section of the URI leaving the rest of the connection info + * @param jdbcUrl The (potentially) sensitive JDBC URL to sanitize + * @return The sanitized URI without those credentials + */ + final def removeJdbcCredentials(jdbcUrl: String) : String = { + val uri = URI.create(removeJdbcPrefix(jdbcUrl)) + + val scheme = + if (uri.getScheme.equals("postgres")){ + "postgresql" + } else { + uri.getScheme + } + val user = null + val host = uri.getHost() + val port = uri.getPort() + val path = uri.getPath() + val query = uri.getQuery() + val frag = uri.getFragment() + + return "jdbc:" + new URI(scheme, user, host, port, path, query, frag).toString() + } + + /** + * Removes a leading jdbc: prefix to the URI, making it managable for the URI parser + * @param jdbcUrl The URL to remove the prefix from + * @return The rest of the JDBC URL string with the prefix removed + */ + private final def removeJdbcPrefix(jdbcUrl: String) : String = jdbcUrl.replaceFirst("^jdbc:", "") +} diff --git a/src/main/scala/com/harrys/util/Timer.scala b/src/main/scala/com/harrys/util/Timer.scala new file mode 100644 index 0000000..d4a70a4 --- /dev/null +++ b/src/main/scala/com/harrys/util/Timer.scala @@ -0,0 +1,28 @@ +package com.harrys.util + +import scala.concurrent.duration._ + +/** + * Created by chris on 10/14/15. + */ +object Timer { + def timeExecution(block: => Unit) : FiniteDuration = { + val start = System.nanoTime() + block + FiniteDuration(System.nanoTime() - start, NANOSECONDS) + } + + def timeWithResult[A](block: => A) : (FiniteDuration, A) = { + val start = System.nanoTime() + val result = block + val duration = FiniteDuration(System.nanoTime() - start, NANOSECONDS) + return (duration, result) + } + + def timeWithAttempt[A](block: => A) : (FiniteDuration, util.Try[A]) = { + val start = System.nanoTime() + val attempt = util.Try(block) + val duration = FiniteDuration(System.nanoTime() - start, NANOSECONDS) + return (duration, attempt) + } +}