-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Tearing out common functionality from Aftership integration and putti…
…ng into a reusable library
- Loading branch information
1 parent
3e428b8
commit cdca305
Showing
13 changed files
with
593 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,3 +15,6 @@ project/plugins/project/ | |
# Scala-IDE specific | ||
.scala_dependencies | ||
.worksheet | ||
|
||
|
||
.idea/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
name := """ingestion-util""" | ||
|
||
organization := "com.harrys" | ||
|
||
version := "0.0.1" | ||
|
||
scalaVersion := "2.11.7" | ||
|
||
exportJars := true | ||
|
||
libraryDependencies ++= Seq( | ||
"ch.qos.logback" % "logback-classic" % "1.1.3", | ||
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0", | ||
"commons-io" % "commons-io" % "2.4", | ||
"ma.glasnost.orika" % "orika-core" % "1.4.6", | ||
"org.apache.httpcomponents" % "httpclient" % "4.4.1", | ||
"org.json4s" %% "json4s-core" % "3.2.11", | ||
"org.json4s" %% "json4s-jackson" % "3.2.11", | ||
"org.json4s" %% "json4s-ext" % "3.2.11", | ||
"org.postgresql" % "postgresql" % "9.4-1201-jdbc41" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
sbt.version=0.13.7 |
103 changes: 103 additions & 0 deletions
103
src/main/scala/com/harrys/file/TransientFileFactory.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package com.harrys.file | ||
|
||
import java.io.File | ||
import java.nio.file.attribute.FileAttribute | ||
import java.nio.file.{Files, Path} | ||
import java.util.concurrent._ | ||
|
||
import com.typesafe.scalalogging.Logger | ||
import org.apache.commons.io.FileUtils | ||
import org.slf4j.LoggerFactory | ||
|
||
import scala.collection.mutable | ||
import scala.ref._ | ||
|
||
/** | ||
* Created by chris on 10/14/15. | ||
*/ | ||
class TransientFileFactory(directory: Path) { | ||
private val log = Logger(LoggerFactory.getLogger(this.getClass)) | ||
|
||
private val pendingRefFiles = new mutable.HashSet[Reference[File]]() | ||
private val phantomRefQueue = new ReferenceQueue[File]() | ||
private val cleanupExecutor = new ThreadPoolExecutor(0, 1, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory { | ||
override def newThread(r: Runnable): Thread = { | ||
val thread = new Thread(r) | ||
thread.setDaemon(true) | ||
thread.setName("TransientFileCleanup") | ||
thread | ||
} | ||
}) | ||
|
||
final def create(prefix: String, suffix: String, attributes: FileAttribute[_]*) : File = { | ||
val tempFile = Files.createTempFile(directory, prefix, suffix, attributes:_*).toFile | ||
registerNewTempFile(tempFile) | ||
} | ||
|
||
final def shutdown() : Unit = { | ||
if (this == TransientFileFactory.default){ | ||
log.warn("Rejecting attempt to stop the default instance") | ||
} else { | ||
this.forceShutdown() | ||
} | ||
} | ||
|
||
private final def forceShutdown() : Unit = { | ||
cleanupExecutor.shutdown() | ||
pendingRefFiles.synchronized { | ||
pendingRefFiles.flatMap(_.get).foreach(_.deleteOnExit) | ||
pendingRefFiles.clear() | ||
} | ||
if (!cleanupExecutor.awaitTermination(50, TimeUnit.MILLISECONDS)){ | ||
log.warn("Forcing Executor shutdown") | ||
cleanupExecutor.shutdownNow() | ||
} | ||
} | ||
|
||
private final def registerNewTempFile(file: File) : File = { | ||
val phantomRef = new PhantomReference[File](file, phantomRefQueue) | ||
log.debug(s"Registered new Transient File: ${file.getAbsolutePath}") | ||
pendingRefFiles.synchronized { | ||
if (pendingRefFiles.isEmpty){ | ||
cleanupExecutor.execute(new CleanupPollingTask()) | ||
} | ||
pendingRefFiles.add(phantomRef) | ||
} | ||
return file | ||
} | ||
|
||
private final def cleanupRegisteredRef(ref: Reference[File]) : Unit = { | ||
pendingRefFiles.synchronized { | ||
pendingRefFiles.remove(ref) | ||
} | ||
ref.get.collect { | ||
case file if file.exists() => | ||
log.debug(s"Deleting Transient File: ${file.getAbsolutePath}") | ||
file.delete() | ||
} | ||
ref.clear() | ||
} | ||
|
||
private final class CleanupPollingTask extends Runnable { | ||
override final def run() : Unit = { | ||
while (!cleanupExecutor.isTerminating && pendingRefFiles.synchronized { pendingRefFiles.nonEmpty }){ | ||
phantomRefQueue.remove.foreach(cleanupRegisteredRef) | ||
} | ||
} | ||
} | ||
} | ||
|
||
object TransientFileFactory { | ||
|
||
lazy val default = { | ||
val factory = new TransientFileFactory(FileUtils.getTempDirectory.toPath) | ||
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable(){ | ||
def run() : Unit = factory.forceShutdown() | ||
})) | ||
factory | ||
} | ||
|
||
final def create(prefix: String, suffix: String, attributes: FileAttribute[_]*) : File = { | ||
default.create(prefix, suffix, attributes:_*) | ||
} | ||
} |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package com.harrys.http | ||
|
||
import java.io._ | ||
|
||
import com.harrys.file.TransientFileFactory | ||
import com.harrys.util.Timer | ||
import com.typesafe.scalalogging.Logger | ||
import org.apache.commons.io.IOUtils | ||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpUriRequest} | ||
import org.apache.http.impl.client.CloseableHttpClient | ||
import org.apache.http.util.EntityUtils | ||
import org.json4s.jackson.JsonMethods | ||
import org.json4s.{Formats, JValue} | ||
import org.slf4j.LoggerFactory | ||
|
||
/** | ||
* Created by chris on 10/14/15. | ||
*/ | ||
abstract class AbstractHttpClient { | ||
private val log = Logger(LoggerFactory.getLogger(this.getClass)) | ||
|
||
protected def createHttpClient() : CloseableHttpClient | ||
|
||
final def performRequestForJson(request: HttpUriRequest)(implicit formats: Formats) : (HttpResponseSummary, JValue) = { | ||
this.performRequest(request) { response => | ||
val summary = new HttpResponseSummary(response) | ||
(summary, JsonMethods.parse(response.getEntity.getContent)) | ||
} | ||
} | ||
|
||
final def performRequestToBuffer(request: HttpUriRequest) : (HttpResponseSummary, Array[Byte]) = { | ||
this.performRequest(request) { response => | ||
response.getEntity.getContentEncoding | ||
val summary = new HttpResponseSummary(response) | ||
(summary, EntityUtils.toByteArray(response.getEntity)) | ||
} | ||
} | ||
|
||
final def performRequestToLocalFile(request: HttpUriRequest) : (HttpResponseSummary, File) = { | ||
this.performRequest(request) { response => | ||
response.getEntity.getContentType | ||
val summary = new HttpResponseSummary(response) | ||
(summary, copyResponseToFile(response)) | ||
} | ||
} | ||
|
||
private final def copyResponseToFile(response: CloseableHttpResponse) : File = { | ||
val tempFile = TransientFileFactory.create("httpraw", "response") | ||
val fileStream = new FileOutputStream(tempFile) | ||
try { | ||
response.getEntity.writeTo(fileStream) | ||
return tempFile | ||
} finally { | ||
IOUtils.closeQuietly(fileStream) | ||
} | ||
} | ||
|
||
|
||
final def performRequest[T](request: HttpUriRequest)(handler: (CloseableHttpResponse) => T) : T = { | ||
val client = this.createHttpClient() | ||
|
||
try { | ||
val (duration, httpResult) = Timer.timeWithAttempt { client.execute(request) } | ||
|
||
httpResult match { | ||
case util.Failure(cause) => | ||
log.error(s"[${ request.getMethod }] ${ request.getURI.getPath }(${ duration.toMillis.toString })", cause) | ||
throw cause | ||
case util.Success(response) => | ||
log.debug(s"${} [${ request.getMethod }] ${ response.getStatusLine.getStatusCode } ${ request.getURI.getPath } (${ duration.toMillis.toString })") | ||
try { | ||
handler(response) | ||
} finally { | ||
IOUtils.closeQuietly(response) | ||
} | ||
} | ||
} finally { | ||
IOUtils.closeQuietly(client) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package com.harrys.http | ||
|
||
import org.apache.http.client.config.RequestConfig | ||
import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} | ||
|
||
import scala.concurrent.duration._ | ||
|
||
/** | ||
* Created by chris on 10/14/15. | ||
*/ | ||
object HttpClientDefaults { | ||
val ConnectTimeout = FiniteDuration(10, SECONDS) | ||
val SocketTimeout = FiniteDuration(30, SECONDS) | ||
|
||
def defaultRequestConfig : RequestConfig.Builder = RequestConfig.copy(RequestConfig.DEFAULT) | ||
.setConnectTimeout(ConnectTimeout.toMillis.toInt) | ||
.setSocketTimeout(SocketTimeout.toMillis.toInt) | ||
|
||
|
||
def defaultHttpClientBuilder : HttpClientBuilder = HttpClients.custom() | ||
.setDefaultRequestConfig(defaultRequestConfig.build()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package com.harrys.http | ||
|
||
import org.apache.http.{Header, HttpResponse, StatusLine} | ||
|
||
/** | ||
* Created by chris on 10/14/15. | ||
*/ | ||
class HttpResponseSummary(response: HttpResponse) { | ||
|
||
def statusLine: StatusLine = response.getStatusLine | ||
|
||
def statusCode: Int = statusLine.getStatusCode | ||
|
||
def reasonPhrase: String = statusLine.getReasonPhrase | ||
|
||
def contentType: Header = response.getEntity.getContentType | ||
|
||
def contentLength: Long = response.getEntity.getContentLength | ||
|
||
def contentEncoding: Header = response.getEntity.getContentEncoding | ||
|
||
def allHeaders: Seq[Header] = response.getAllHeaders | ||
} |
121 changes: 121 additions & 0 deletions
121
src/main/scala/com/harrys/orika/DateTimeConverters.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package com.harrys.orika | ||
|
||
import java.sql | ||
import java.sql.Timestamp | ||
import java.util.Date | ||
|
||
import ma.glasnost.orika.converter.builtin.PassThroughConverter | ||
import ma.glasnost.orika.metadata.Type | ||
import ma.glasnost.orika.{Converter, CustomConverter} | ||
import org.joda.time.{DateTime, DateTimeZone, LocalDate} | ||
|
||
/** | ||
* Created by chris on 10/14/15. | ||
*/ | ||
object DateTimeConverters { | ||
def dateTimeConverters: Seq[Converter[_, _]] = Seq[Converter[_, _]]( | ||
// JodaTime literal definitions | ||
new PassThroughConverter(classOf[DateTime], classOf[LocalDate]), | ||
// DateTime <-> Unix Epoch | ||
new DateTimeToLong, new LongToDateTime, | ||
// DateTime <-> java.util.Date | ||
new DateTimeToDate, new DateToDateTime, | ||
// DateTime <-> java.sql.Timestamp | ||
new DateTimeToTimestamp, new TimestampToDateTime, | ||
// LocalDate <-> java.sql.Date | ||
new LocalDateToSqlDate, new SqlDateToLocalDate, | ||
// DateTime -> LocalDate (unidirectional) | ||
new DateTimeToLocalDate | ||
) | ||
|
||
final class SqlDateToLocalDate extends CustomConverter[sql.Date, LocalDate]{ | ||
override def convert(source: sql.Date, destinationType: Type[_ <: LocalDate]): LocalDate = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new LocalDate(source.getTime, DateTimeZone.UTC) | ||
} | ||
} | ||
} | ||
|
||
|
||
final class LocalDateToSqlDate extends CustomConverter[LocalDate, sql.Date]{ | ||
override def convert(source: LocalDate, destinationType: Type[_ <: sql.Date]): sql.Date = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new sql.Date(source.toDate.getTime) | ||
} | ||
} | ||
} | ||
|
||
final class DateTimeToTimestamp extends CustomConverter[DateTime, Timestamp]{ | ||
override def convert(source: DateTime, destinationType: Type[_ <: Timestamp]): Timestamp = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new Timestamp(source.toDateTime(DateTimeZone.UTC).getMillis) | ||
} | ||
} | ||
} | ||
|
||
final class TimestampToDateTime extends CustomConverter[Timestamp, DateTime]{ | ||
override def convert(source: Timestamp, destinationType: Type[_ <: DateTime]): DateTime = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new DateTime(source.getTime, DateTimeZone.UTC) | ||
} | ||
} | ||
} | ||
|
||
final class DateTimeToDate extends CustomConverter[DateTime, Date]{ | ||
override def convert(source: DateTime, destinationType: Type[_ <: Date]) : Date = { | ||
if (source == null){ | ||
null | ||
} else { | ||
source.toDateTime(DateTimeZone.UTC).toDate | ||
} | ||
} | ||
} | ||
|
||
final class DateToDateTime extends CustomConverter[Date, DateTime]{ | ||
override def convert(source: Date, destinationType: Type[_ <: DateTime]): DateTime = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new DateTime(source.getTime, DateTimeZone.UTC) | ||
} | ||
} | ||
} | ||
|
||
final class DateTimeToLocalDate extends CustomConverter[DateTime, LocalDate]{ | ||
override def convert(source: DateTime, destinationType: Type[_ <: LocalDate]) : LocalDate = { | ||
if (source == null){ | ||
null | ||
} else { | ||
source.toDateTime(DateTimeZone.UTC).toLocalDate | ||
} | ||
} | ||
} | ||
|
||
final class LongToDateTime extends CustomConverter[java.lang.Long, DateTime] { | ||
override def convert(source: java.lang.Long, destinationType: Type[_ <: DateTime]): DateTime = { | ||
if (source == null){ | ||
null | ||
} else { | ||
new DateTime(source, DateTimeZone.UTC) | ||
} | ||
} | ||
} | ||
|
||
final class DateTimeToLong extends CustomConverter[DateTime, java.lang.Long] { | ||
override def convert(source: DateTime, destinationType: Type[_ <: java.lang.Long]): java.lang.Long = { | ||
if (source == null){ | ||
null.asInstanceOf[java.lang.Long] | ||
} else { | ||
source.toDateTime(DateTimeZone.UTC).getMillis.asInstanceOf[java.lang.Long] | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.