Skip to content
This repository has been archived by the owner on Jul 7, 2021. It is now read-only.

Commit

Permalink
Merge pull request #32 from rodneykinney/step-names
Browse files Browse the repository at this point in the history
Step names
  • Loading branch information
rodneykinney committed Jun 11, 2015
2 parents 53bc10a + 09a4dad commit ddd5f4c
Show file tree
Hide file tree
Showing 39 changed files with 1,849 additions and 1,102 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
.settings
bin/
target/
logs/
pipeline-output/
551 changes: 175 additions & 376 deletions README.md

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ val pipeline = Project(
base = file(".")
)

StylePlugin.enableLineLimit := false

organization := "org.allenai"
crossScalaVersions := Seq("2.11.5")
scalaVersion <<= crossScalaVersions { (vs: Seq[String]) => vs.head }
Expand All @@ -33,12 +35,11 @@ enablePlugins(LibraryPlugin)
PublishTo.ai2Public

dependencyOverrides += "org.scala-lang" % "scala-reflect" % "2.11.5"

libraryDependencies ++= Seq(
sprayJson,
awsJavaSdk,
commonsIO,
ai2Common,
allenAiTestkit % "test",
scalaReflection
scalaReflection,
awsJavaSdk
)
5 changes: 2 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import org.allenai.plugins.CoreDependencies

/** Object holding the dependencies Common has, plus resolvers and overrides. */
object Dependencies extends CoreDependencies {
val scalaReflection = "org.scala-lang" % "scala-reflect" % "2.11.5"
val awsJavaSdk = "com.amazonaws" % "aws-java-sdk" % "1.8.9.1"
val scalaReflection = "org.scala-lang" % "scala-reflect" % "2.11.5"
val commonsIO = "commons-io" % "commons-io" % "2.4"

val ai2Common = allenAiCommon exclude ("org.allenai", "pipeline")
}
}
22 changes: 22 additions & 0 deletions spark/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
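<!--
Logback configuration for unit tests.
Logs only to the file logs/unit-tests.log; nothing is written to stdout.
The root level defaults to ERROR (override with the logback_rootLevel property).
Loggers under org.allenai default to DEBUG (override with the logback_s2Level property).
-->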
<configuration>
<!-- Appender that writes to the fixed file logs/unit-tests.log. -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/unit-tests.log</file>
<encoder>
<pattern>%-5level %logger{36} [%d{HH:mm:ss.SSS}][%thread]: %msg%n</pattern>
</encoder>
</appender>

<root level="${logback_rootLevel:-ERROR}">
<appender-ref ref="FILE" />
</root>

<logger name="org.allenai" level="${logback_s2Level:-DEBUG}" />
</configuration>
55 changes: 24 additions & 31 deletions src/main/resources/org/allenai/pipeline/pipelineSummary.html
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@
padding-top: 10px;
}

.node rect {
.node .data {
border-top: 1px solid #2a75a1;
padding-top: 10px;
}

.node rect {
stroke: #2a75a1;
stroke-width: 2px;
fill: #e2f0f8;
Expand Down Expand Up @@ -275,7 +280,12 @@
fill: #C38888;
}

#outputContainer {
.executionInfo {
font-size: 20%%;
float: right;
}

#outputContainer {
position: absolute;
top: 41px;
left: 41px;
Expand Down Expand Up @@ -313,7 +323,7 @@ <h2>Outputs</h2>
%s
</div>
<script src="http://d3js.org/d3.v3.min.js"></script>
<script src="http://cpettitt.github.io/project/dagre-d3/v0.4.2/dagre-d3.min.js"></script>
<script src="http://cpettitt.github.io/project/dagre-d3/v0.4.6/dagre-d3.min.js"></script>
<script>
(function() {
/**
Expand All @@ -337,42 +347,19 @@ <h2>Outputs</h2>
*
* @return {string} The HTML for the step contents.
*/
function generateStepContent(label, description, millis, data, links) {
function generateStepContent(label, description, execInfo, data, links) {
var out = "<h2>" + label + "</h2>";
if(millis) {
var seconds = 1000;
var minutes = seconds * 60;
var hours = minutes * 60;
var days = hours * 24;
if (millis / days > 1) {
out += (millis / days).toFixed(2) + " days";
}
else if (millis / hours > 1) {
out += (millis / hours).toFixed(2) + " hours";
}
else if (millis / minutes > 1) {
out += (millis / minutes).toFixed(2) + " minutes";
}
else if (millis / seconds > 1) {
out += (millis / seconds).toFixed(2) + " seconds";
}
else {
out += millis.toFixed(0) + " millis";
}
}
else {
out += "cached";
}
if(description) {
out += "<p>" + description + "</p>";
}
if(data && Array.isArray(data)) {
if(data && Array.isArray(data) && data.length > 0) {
out += '<ul class="data">';
data.forEach(function(d) {
out += '<li>' + d + '</li>';
});
out += '</ul>';
if(links) {
}
if(links && links.length > 0) {
out += '<ul class="links">';
links.forEach(function(l) {
if(l instanceof Link) {
Expand All @@ -381,7 +368,7 @@ <h2>Outputs</h2>
});
out += '</ul>';
}
}
out += '<span class="executionInfo">' + execInfo + '</span>';
return out;
};

Expand All @@ -395,6 +382,12 @@ <h2>Outputs</h2>
// our nodes.
%s

// Give every node in the graph rounded corners by setting both corner radii to 10.
g.nodes().forEach(function(nodeId) {
  var nodeAttrs = g.node(nodeId);
  nodeAttrs.rx = 10;
  nodeAttrs.ry = 10;
});

// Add edges to the graph. The first argument is the edge id. Here we use null
// to indicate that an arbitrary edge id can be assigned automatically. The
// second argument is the source of the edge. The third argument is the target
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/org/allenai/pipeline/Artifact.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ trait StructuredArtifact extends Artifact {
*/
class ArtifactStreamWriter(out: OutputStream) {
def write(data: Array[Byte]): Unit = {
out.write(data, 0, data.size)
out.write(data, 0, data.length)
}

def write(data: Array[Byte], offset: Int, size: Int): Unit = {
Expand Down
184 changes: 104 additions & 80 deletions src/main/scala/org/allenai/pipeline/ArtifactFactory.scala
Original file line number Diff line number Diff line change
@@ -1,102 +1,126 @@
package org.allenai.pipeline

import scala.reflect.ClassTag

import java.io.File
import java.net.URI

/** Factory interface for creating flat Artifact instances. */
trait FlatArtifactFactory[T] {
def flatArtifact(input: T): FlatArtifact
}

/** Factory interface for creating structured Artifact instances. */
trait StructuredArtifactFactory[T] {
def structuredArtifact(input: T): StructuredArtifact
}

trait ArtifactFactory[T] extends FlatArtifactFactory[T] with StructuredArtifactFactory[T]
/** Creates an Artifact from a URL
*/
trait ArtifactFactory {
/** @param url The location of the Artifact. The scheme (protocol) is used to determine the
* specific implementation.
* @tparam A The type of the Artifact to create. May be an abstract or concrete type
* @return The artifact
*/
def createArtifact[A <: Artifact: ClassTag](url: URI): A

object ArtifactFactory {
def fromUrl(outputUrl: URI): ArtifactFactory[String] = {
outputUrl match {
case url if url.getScheme == "s3" || url.getScheme == "s3n" =>
new S3(S3Config(url.getHost), Some(url.getPath))
case url if url.getScheme == "file" || url.getScheme == null =>
new RelativeFileSystem(new File(url.getPath))
case _ => sys.error(s"Illegal dir: $outputUrl")
}
}
def flatArtifactFromAbsoluteUrl(s: String): Option[FlatArtifact] = {
val url = new URI(s)
url.getScheme() match {
case "file" =>
Some(new FileArtifact(new File(url.getPath)))
case "s3" | "s3n" =>
Some(new S3FlatArtifact(url.getPath.dropWhile(_ == '/'), S3Config(url.getHost)))
case _ => None
}
}
def structuredArtifactFromAbsoluteUrl(s: String): Option[StructuredArtifact] = {
val url = new URI(s)
url.getScheme() match {
case "file" if s.endsWith(".zip") =>
Some(new ZipFileArtifact(new File(url.getPath)))
case "file" =>
Some(new DirectoryArtifact(new File(url.getPath)))
case "s3" | "s3n" =>
Some(new S3ZipArtifact(url.getPath.dropWhile(_ == '/'), S3Config(url.getHost)))
case _ => None
/** Creates an Artifact from a path that is either an absolute URL or relative to a root URL.
  *
  * If `path` has a scheme it is used as-is. Otherwise it is appended to the path of `rootUrl`,
  * with exactly one '/' between the two, and the scheme, authority and fragment of the result
  * are taken from `rootUrl`.
  *
  * @param rootUrl the URL against which a relative `path` is resolved
  * @param path an absolute URL, or a path relative to `rootUrl`
  * @tparam A the type of the Artifact to create
  * @return the Artifact created by `createArtifact[A]` for the resolved URL
  */
def createArtifact[A <: Artifact: ClassTag](rootUrl: URI, path: String): A = {
  val parsed = new URI(path)
  val url =
    if (parsed.getScheme != null) {
      parsed
    } else {
      // Normalize the join so there is a single '/' between the root and the relative part
      val rootPath = rootUrl.getPath.reverse.dropWhile(_ == '/').reverse
      val relativePath = parsed.getPath.dropWhile(_ == '/')
      // Carry over the whole authority, not just the host: getHost omits any port and
      // user-info, and is null when the authority cannot be parsed as a host
      // (e.g. a name containing '_'), which would silently drop it from the result.
      new URI(
        rootUrl.getScheme,
        rootUrl.getAuthority,
        s"$rootPath/$relativePath",
        null,
        rootUrl.getFragment
      )
    }
  createArtifact[A](url)
}
}

class RelativeFileSystem(rootDir: File)
extends ArtifactFactory[String] {
private def toFile(path: String): File = new File(rootDir, path)

override def flatArtifact(name: String): FlatArtifact = new FileArtifact(toFile(name))
object ArtifactFactory {
def apply(urlHandler: UrlToArtifact, fallbackUrlHandlers: UrlToArtifact*): ArtifactFactory =
new ArtifactFactory {
val urlHandlerChain =
if (fallbackUrlHandlers.isEmpty) {
urlHandler
} else {
UrlToArtifact.chain(urlHandler, fallbackUrlHandlers.head, fallbackUrlHandlers.tail: _*)
}

override def structuredArtifact(name: String): StructuredArtifact = {
val file = toFile(name)
if (file.exists && file.isDirectory) {
new DirectoryArtifact(file)
} else {
new ZipFileArtifact(file)
/** Creates an Artifact of type A from the given URL using the handler chain.
  *
  * @param url the location of the Artifact
  * @tparam A the type of the Artifact to create
  * @return the Artifact produced by the handler chain for `url`
  * @throws IllegalArgumentException if the handler chain is not defined for `url`
  */
def createArtifact[A <: Artifact: ClassTag](url: URI): A = {
  val fn = urlHandlerChain.urlToArtifact[A]
  // Same exception type and message that a failed `require` would produce
  val reject: URI => A = { unhandled =>
    val clazz = implicitly[ClassTag[A]].runtimeClass
    throw new IllegalArgumentException(
      s"requirement failed: Cannot create $clazz from $unhandled"
    )
  }
  // applyOrElse decides and applies in one pass. Calling isDefinedAt and then apply would
  // evaluate the handlers' guards twice; guards that consult external state (such as the
  // file system) could then disagree and surface as a MatchError instead of this error.
  fn.applyOrElse(url, reject)
}
}
}

}

object AbsoluteFileSystem extends ArtifactFactory[File] {
override def flatArtifact(file: File): FlatArtifact = new FileArtifact(file)
/** Supports creation of a particular type of Artifact from a URL.
* Allows chaining together of different implementations that recognize different input URLs
* and support creation of different Artifact types.
*/
trait UrlToArtifact {
/** Returns a PartialFunction that both indicates whether the given Artifact type can be
* created from an input URL and creates it when it can.
* @tparam A The Artifact type to be created
* @return A PartialFunction for which isDefinedAt returns true if an Artifact of type A can
* be created from the given URL; applying it to such a URL yields the Artifact
*/
def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A]
}

override def structuredArtifact(file: File): StructuredArtifact = {
if (file.exists && file.isDirectory) {
new DirectoryArtifact(file)
} else {
new ZipFileArtifact(file)
object UrlToArtifact {
/** Chains together a series of UrlToArtifact instances.
  *
  * The result supports creation of the union of the Artifact types and input URLs that are
  * supported by the individual inputs. Handlers are tried in argument order (`first`, then
  * `second`, then each of `others`), so an earlier handler wins when several are defined
  * for the same URL.
  *
  * @param first the handler tried first
  * @param second the handler tried when `first` is not defined for a URL
  * @param others further fallback handlers, tried in order
  * @return a UrlToArtifact combining all of the given handlers
  */
def chain(first: UrlToArtifact, second: UrlToArtifact, others: UrlToArtifact*): UrlToArtifact =
  new UrlToArtifact {
    override def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] = {
      val firstTwo: PartialFunction[URI, A] =
        first.urlToArtifact[A] orElse second.urlToArtifact[A]
      // Fold the remaining handlers onto the end of the chain, preserving their order
      others.foldLeft(firstTwo) { (handlers, next) =>
        handlers orElse next.urlToArtifact[A]
      }
    }
  }
}

def usingPaths: ArtifactFactory[String] =
new ArtifactFactory[String] {
override def flatArtifact(path: String): FlatArtifact =
AbsoluteFileSystem.flatArtifact(new File(path))
/** A UrlToArtifact that is not defined for any URL, whatever Artifact type is requested. */
object Empty extends UrlToArtifact {
  def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] = PartialFunction.empty
}

override def structuredArtifact(path: String): StructuredArtifact =
AbsoluteFileSystem.structuredArtifact(new File(path))
}
}

class S3(config: S3Config, rootPath: Option[String] = None)
extends ArtifactFactory[String] {
// Drop leading and trailing slashes
private def toPath(path: String): String = rootPath match {
case None => path
case Some(dir) =>
val base = dir.dropWhile(_ == '/').reverse.dropWhile(_ == '/').reverse
s"$base/$path"
object CreateCoreArtifacts {
// Creates file-system Artifacts from URLs whose scheme is "file" or that have no scheme.
// Candidates are considered in this order, and the first whose class satisfies the
// requested type A is used:
//   1. FileArtifact
//   2. DirectoryArtifact, only when the location exists and is a directory
//   3. ZipFileArtifact
val fromFileUrls: UrlToArtifact = new UrlToArtifact {
  def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] = {
    val requested = implicitly[ClassTag[A]].runtimeClass

    // Whether an instance of `impl` may be returned where an A is requested
    def canProduce(impl: Class[_]): Boolean = requested.isAssignableFrom(impl)

    val wantsFile = canProduce(classOf[FileArtifact])
    val wantsDirectory = canProduce(classOf[DirectoryArtifact])
    val wantsZip = canProduce(classOf[ZipFileArtifact])

    // Only "file" URLs and scheme-less URLs are handled here
    def isLocal(url: URI): Boolean = {
      val scheme = url.getScheme
      scheme == null || scheme == "file"
    }

    // Must only be called for URLs accepted by isLocal
    def localFile(url: URI): File =
      if (url.getScheme == null) new File(url.getPath) else new File(url)

    def isExistingDirectory(url: URI): Boolean = {
      val location = localFile(url)
      location.exists && location.isDirectory
    }

    val handler: PartialFunction[URI, A] = {
      case url if wantsFile && isLocal(url) =>
        new FileArtifact(localFile(url)).asInstanceOf[A]
      case url if wantsDirectory && isLocal(url) && isExistingDirectory(url) =>
        new DirectoryArtifact(localFile(url)).asInstanceOf[A]
      case url if wantsZip && isLocal(url) =>
        new ZipFileArtifact(localFile(url)).asInstanceOf[A]
    }
    handler
  }
}

override def flatArtifact(path: String): FlatArtifact = new S3FlatArtifact(toPath(path), config)

override def structuredArtifact(path: String): StructuredArtifact = new S3ZipArtifact(toPath(path), config)
}

Loading

0 comments on commit ddd5f4c

Please sign in to comment.