This repository has been archived by the owner on Jul 7, 2021. It is now read-only.

Named steps and other goodies #31

Closed
wants to merge 84 commits
Changes from all commits
84 commits
89a1a1b
Setting version to 1.2.1
rodneykinney May 12, 2015
1de79fa
Setting version to 1.2.2-SNAPSHOT
rodneykinney May 12, 2015
aa744e1
Add tmpOutput option to runOne()
rodneykinney May 13, 2015
57bf299
Merge branch 'master' of https://github.com/allenai/pipeline
rodneykinney May 13, 2015
58fc23f
Change type of output override in runOne()
rodneykinney May 13, 2015
755baad
Change runOne() signature
rodneykinney May 13, 2015
9d76549
Cleanup
rodneykinney May 13, 2015
a72721c
Refactor ArtifactFactory
rodneykinney May 14, 2015
2bad5f5
Cleanup
rodneykinney May 14, 2015
3b7e599
Refactor ArtifactFactory
rodneykinney May 14, 2015
fe708de
Refactor ArtifactFactory
rodneykinney May 15, 2015
bb8ba4b
Add S3Config factory method
rodneykinney May 15, 2015
6f82a27
ExecuteShellCommand
rodneykinney May 15, 2015
1a6d2e2
Refactor ExecuteShellCommand
rodneykinney May 15, 2015
6525dfe
Refactor ExecuteShellCommand
rodneykinney May 15, 2015
d61fdc5
Merge branch 'master' into artifact-factory
rodneykinney May 15, 2015
f6606ca
Add implicit conversions, BasicPipeline example
rodneykinney May 16, 2015
9716ef9
Remove path parameter from persist(). Update README
rodneykinney May 17, 2015
730cf08
Improve HTML
rodneykinney May 17, 2015
98243e0
Reformat
rodneykinney May 17, 2015
aa92617
ExecutionInfo
rodneykinney May 17, 2015
aa228ce
Rename ExecuteShellCommand -> ExternalProcess
rodneykinney May 17, 2015
023c157
TrainModel example pipeline
rodneykinney May 17, 2015
0d16b79
Add python-based model-training example pipeline
rodneykinney May 18, 2015
b72a392
Reorganize ExternalProcess
rodneykinney May 18, 2015
7caa3a0
Rename Serializer/Deserializer. Update README
rodneykinney May 18, 2015
5a985fd
Merge branch 'master' of https://github.com/allenai/pipeline
rodneykinney May 19, 2015
6ade036
Rounded corners
rodneykinney May 19, 2015
1c7f08e
Default AWS credentials
rodneykinney May 21, 2015
518d533
Merge branch 'master' of https://github.com/allenai/pipeline into art…
rodneykinney May 21, 2015
88a6fe9
Merge branch 'master' of https://github.com/rodneykinney/pipeline int…
rodneykinney May 21, 2015
8adfc14
Reformat
rodneykinney May 21, 2015
6b6dbe6
Merge branch 'master' into artifact-factory
rodneykinney May 21, 2015
d8ef83b
Refactor ArtifactFactory
rodneykinney May 22, 2015
3685238
Refactor ArtifactFactory
rodneykinney May 22, 2015
52e2d56
Rename, add comments
rodneykinney May 22, 2015
1b66768
Clean up warnings
rodneykinney May 22, 2015
cfd8422
If ExternalProcess has OutputFileToken, check that it actually writes…
jefeweisen May 22, 2015
ad1abce
Create core/s3 sub-projects
rodneykinney May 22, 2015
71da784
Move S3 classes to s3 project
rodneykinney May 23, 2015
949a309
rename ExternalProcess.apply to ExternalProcess.a
jefeweisen May 23, 2015
031e3a2
comments
jefeweisen May 23, 2015
28ea81e
Move ExternalProcess.a to RunExternalProcess.a
jefeweisen May 23, 2015
dcde424
Moves:
jefeweisen May 23, 2015
8a4043d
Make default constructor of RunExternalProcess private to conceal kno…
jefeweisen May 23, 2015
a69651d
Rename RunExternalProcess.a -> RunExternalProcess.apply
jefeweisen May 23, 2015
27e3fbd
Fix oops: forgot to add these files
jefeweisen May 23, 2015
fd14085
Implement versionHistory on RunExternalProcess
jefeweisen May 23, 2015
24d3f40
Add VersionedResource
rodneykinney May 24, 2015
7443f59
Simplify artifact creation
rodneykinney May 24, 2015
0f4999f
PartitionedRddArtifact and Io
rodneykinney May 24, 2015
2b11617
Rdd Persistence working in unit tests
rodneykinney May 24, 2015
e6acdb3
Reformat
rodneykinney May 24, 2015
915a3a1
Cleanup
rodneykinney May 24, 2015
ca2aa49
Rdd Object initialization and sample pipeline
rodneykinney May 25, 2015
b3d0aea
Clean up style, logging, format
rodneykinney May 26, 2015
36881f8
Add build.sbt
rodneykinney May 26, 2015
4b5ed61
Update .gitignore
rodneykinney May 26, 2015
591e5b6
Mix-in traits to define urlToArtifacte
rodneykinney May 26, 2015
3e489a6
Clean up Pipeline factory methods
rodneykinney May 26, 2015
79d9729
Merge pull request #2 from jefeweisen/check_for_external_process_output
rodneykinney May 27, 2015
a81e174
Merge pull request #3 from jefeweisen/external_process_abstraction
rodneykinney May 27, 2015
6d784f3
Merge pull request #4 from jefeweisen/external_process_versionHistory
rodneykinney May 27, 2015
74af917
Merge branch 'master' of https://github.com/rodneykinney/pipeline int…
rodneykinney May 27, 2015
4fac31a
Fix test
rodneykinney May 27, 2015
50005fd
Make Pipeline a trait after all.
rodneykinney May 28, 2015
557d780
Resolve relative paths within ArtifactFactory
rodneykinney May 28, 2015
412a550
Add Pipeline.createOutputArtifact
rodneykinney May 28, 2015
f93ea64
Name steps in Pipeline.persist()
rodneykinney May 28, 2015
cbd8c74
Bump version
rodneykinney May 28, 2015
23abeb9
autoGeneratedPath => hashId
rodneykinney May 29, 2015
799f09c
Handle empty RDD on read
rodneykinney May 29, 2015
6f47a3c
Fix tests
rodneykinney May 29, 2015
59ab7ff
Remove type param from PartitionedRddArtifact
rodneykinney May 29, 2015
af898e6
Fix CreateRddArtifacts
rodneykinney May 29, 2015
d39cf00
Fix step name resolution
rodneykinney May 29, 2015
bce37c2
Remove race condition
rodneykinney May 29, 2015
9a601b7
LineCollectionIo className
rodneykinney May 29, 2015
c001a2a
Disable caching for persisted RDDs
rodneykinney Jun 1, 2015
b90884d
Deprecate Producer.persist()
rodneykinney Jun 1, 2015
efef9f2
Add runUntil. Custom wrapper for S3 credentials
rodneykinney Jun 2, 2015
ccd4a7d
Pipeline.persistCustom
rodneykinney Jun 2, 2015
e63ea47
Merge branch 'master' of https://github.com/allenai/pipeline
rodneykinney Jun 2, 2015
cc5267d
Fix persistRdd, fix addTarget
rodneykinney Jun 2, 2015
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
.settings
bin/
target/
logs/
pipeline-output/
551 changes: 175 additions & 376 deletions README.md

Large diffs are not rendered by default.

25 changes: 13 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ import Dependencies._

import ReleaseKeys._

val pipeline = Project(
id = "allenai-pipeline",
base = file(".")
val core = Project(
id = "core",
base = file("core")
)

val s3 = Project(
id = "s3",
base = file("s3")
).dependsOn(core)

val spark = Project(
id = "spark",
base = file("spark")
).dependsOn(core, s3)

organization := "org.allenai"
crossScalaVersions := Seq("2.11.5")
scalaVersion <<= crossScalaVersions { (vs: Seq[String]) => vs.head }
Expand All @@ -33,12 +43,3 @@ enablePlugins(LibraryPlugin)
PublishTo.ai2Public

dependencyOverrides += "org.scala-lang" % "scala-reflect" % "2.11.5"

libraryDependencies ++= Seq(
sprayJson,
awsJavaSdk,
commonsIO,
ai2Common,
allenAiTestkit % "test",
scalaReflection
)
15 changes: 15 additions & 0 deletions core/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Dependencies._

name := "pipeline-core"
organization := "org.allenai"

StylePlugin.enableLineLimit := false

dependencyOverrides += "org.scala-lang" % "scala-reflect" % "2.11.5"
libraryDependencies ++= Seq(
sprayJson,
commonsIO,
ai2Common,
allenAiTestkit % "test",
scalaReflection
)
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@
padding-top: 10px;
}

.node rect {
.node .data {
border-top: 1px solid #2a75a1;
padding-top: 10px;
}

.node rect {
stroke: #2a75a1;
stroke-width: 2px;
fill: #e2f0f8;
Expand Down Expand Up @@ -275,7 +280,12 @@
fill: #C38888;
}

#outputContainer {
.executionInfo {
font-size: 20%%;
float: right;
}

#outputContainer {
position: absolute;
top: 41px;
left: 41px;
Expand Down Expand Up @@ -313,7 +323,7 @@ <h2>Outputs</h2>
%s
</div>
<script src="http://d3js.org/d3.v3.min.js"></script>
<script src="http://cpettitt.github.io/project/dagre-d3/v0.4.2/dagre-d3.min.js"></script>
<script src="http://cpettitt.github.io/project/dagre-d3/v0.4.6/dagre-d3.min.js"></script>
<script>
(function() {
/**
Expand All @@ -337,42 +347,19 @@ <h2>Outputs</h2>
*
* @return {string} The HTML for the step contents.
*/
function generateStepContent(label, description, millis, data, links) {
function generateStepContent(label, description, execInfo, data, links) {
var out = "<h2>" + label + "</h2>";
if(millis) {
var seconds = 1000;
var minutes = seconds * 60;
var hours = minutes * 60;
var days = hours * 24;
if (millis / days > 1) {
out += (millis / days).toFixed(2) + " days";
}
else if (millis / hours > 1) {
out += (millis / hours).toFixed(2) + " hours";
}
else if (millis / minutes > 1) {
out += (millis / minutes).toFixed(2) + " minutes";
}
else if (millis / seconds > 1) {
out += (millis / seconds).toFixed(2) + " seconds";
}
else {
out += millis.toFixed(0) + " millis";
}
}
else {
out += "cached";
}
if(description) {
out += "<p>" + description + "</p>";
}
if(data && Array.isArray(data)) {
if(data && Array.isArray(data) && data.length > 0) {
out += '<ul class="data">';
data.forEach(function(d) {
out += '<li>' + d + '</li>';
});
out += '</ul>';
if(links) {
}
if(links && links.length > 0) {
out += '<ul class="links">';
links.forEach(function(l) {
if(l instanceof Link) {
Expand All @@ -381,7 +368,7 @@ <h2>Outputs</h2>
});
out += '</ul>';
}
}
out += '<span class="executionInfo">' + execInfo + '</span>';
return out;
};

Expand All @@ -395,6 +382,12 @@ <h2>Outputs</h2>
// our nodes.
%s

// Round the corners of the nodes
g.nodes().forEach(function(v) {
var node = g.node(v);
node.rx = node.ry = 10;
});

// Add edges to the graph. The first argument is the edge id. Here we use null
// to indicate that an arbitrary edge id can be assigned automatically. The
// second argument is the source of the edge. The third argument is the target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ trait StructuredArtifact extends Artifact {
*/
class ArtifactStreamWriter(out: OutputStream) {
def write(data: Array[Byte]): Unit = {
out.write(data, 0, data.size)
out.write(data, 0, data.length)
}

def write(data: Array[Byte], offset: Int, size: Int): Unit = {
Expand Down
126 changes: 126 additions & 0 deletions core/src/main/scala/org/allenai/pipeline/ArtifactFactory.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.allenai.pipeline

import scala.reflect.ClassTag

import java.io.File
import java.net.URI

/** Creates an Artifact from a URL
*/
trait ArtifactFactory {
/** @param url The location of the Artifact. The scheme (protocol) is used to determine the
* specific implementation.
* @tparam A The type of the Artifact to create. May be an abstract or concrete type
* @return The artifact
*/
def createArtifact[A <: Artifact: ClassTag](url: URI): A
rodneykinney (Member, Author) commented:
This is an extensible API for creating Artifacts from URLs. Previously, the Artifact class had a url method, but there was no way to go in the other direction.
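The "other direction" mentioned above — URL to Artifact — is built from chained `PartialFunction`s (see `UrlToArtifact.chain` in this diff). The idea can be sketched with plain standard-library code; the `String` results below stand in for real `Artifact` instances, and `fileHandler`/`s3Handler` are hypothetical names, not part of the library:

```scala
import java.net.URI

// Each handler recognizes one URL scheme; isDefinedAt drives dispatch.
val fileHandler: PartialFunction[URI, String] = {
  case url if url.getScheme == "file" => s"FileArtifact(${url.getPath})"
}
val s3Handler: PartialFunction[URI, String] = {
  case url if url.getScheme == "s3" => s"S3Artifact(${url.getHost}${url.getPath})"
}

// Chaining: the first handler defined at the URL wins, as in UrlToArtifact.chain.
val chain: PartialFunction[URI, String] = fileHandler orElse s3Handler

println(chain(new URI("file:///tmp/data.txt"))) // FileArtifact(/tmp/data.txt)
println(chain(new URI("s3://bucket/key")))
```

The real `ArtifactFactory.createArtifact` adds a `require(fn.isDefinedAt(url), ...)` check so an unrecognized scheme fails with a clear message rather than a `MatchError`.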


/** If path is an absolute URL, create an Artifact at that location.
* If it is a relative path, create it relative to the given root URL
*/
def createArtifact[A <: Artifact: ClassTag](rootUrl: URI, path: String): A = {
val parsed = new URI(path)
val url = parsed.getScheme match {
case null =>
val fullPath = s"${rootUrl.getPath.reverse.dropWhile(_ == '/').reverse}/${parsed.getPath.dropWhile(_ == '/')}"
new URI(
rootUrl.getScheme,
rootUrl.getHost,
fullPath,
rootUrl.getFragment
)
case _ => parsed
}
createArtifact[A](url)
}
}

object ArtifactFactory {
def apply(urlHandler: UrlToArtifact, fallbackUrlHandlers: UrlToArtifact*): ArtifactFactory =
new ArtifactFactory {
val urlHandlerChain =
if (fallbackUrlHandlers.isEmpty) {
urlHandler
} else {
UrlToArtifact.chain(urlHandler, fallbackUrlHandlers.head, fallbackUrlHandlers.tail: _*)
}

def createArtifact[A <: Artifact: ClassTag](url: URI): A = {
val fn = urlHandlerChain.urlToArtifact[A]
val clazz = implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
require(fn.isDefinedAt(url), s"Cannot create $clazz from $url")
fn(url)
}
}

}

/** Supports creation of a particular type of Artifact from a URL.
* Allows chaining together of different implementations that recognize different input URLs
* and support creation of different Artifact types
*/
trait UrlToArtifact {
/** Return a PartialFunction indicating whether the given Artifact type can be created from an input URL
* @tparam A The Artifact type to be created
* @return A PartialFunction where isDefined will return true if an Artifact of type A can
* be created from the given URL
*/
def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A]
}

object UrlToArtifact {
// Chain together a series of UrlToArtifact instances
// The result will be a UrlToArtifact that supports creation of the union of Artifact types and input URLs
// that are supported by the individual inputs
def chain(first: UrlToArtifact, second: UrlToArtifact, others: UrlToArtifact*) =
new UrlToArtifact {
override def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] = {
var fn = first.urlToArtifact[A] orElse second.urlToArtifact[A]
for (o <- others) {
fn = fn orElse o.urlToArtifact[A]
}
fn
}
}

object Empty extends UrlToArtifact {
def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] =
PartialFunction.empty[URI, A]
}

}

object CreateCoreArtifacts {
// Create a FlatArtifact or StructuredArtifact from an absolute file:// URL
val fromFileUrls: UrlToArtifact = new UrlToArtifact {
def urlToArtifact[A <: Artifact: ClassTag]: PartialFunction[URI, A] = {
val c = implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
val fn: PartialFunction[URI, A] = {
case url if c.isAssignableFrom(classOf[FileArtifact])
&& "file" == url.getScheme =>
new FileArtifact(new File(url)).asInstanceOf[A]
case url if c.isAssignableFrom(classOf[FileArtifact])
&& null == url.getScheme =>
new FileArtifact(new File(url.getPath)).asInstanceOf[A]
case url if c.isAssignableFrom(classOf[DirectoryArtifact])
&& "file" == url.getScheme
&& new File(url).exists
&& new File(url).isDirectory =>
new DirectoryArtifact(new File(url)).asInstanceOf[A]
case url if c.isAssignableFrom(classOf[DirectoryArtifact])
&& null == url.getScheme
&& new File(url.getPath).exists
&& new File(url.getPath).isDirectory =>
new DirectoryArtifact(new File(url.getPath)).asInstanceOf[A]
case url if c.isAssignableFrom(classOf[ZipFileArtifact])
&& "file" == url.getScheme =>
new ZipFileArtifact(new File(url)).asInstanceOf[A]
case url if c.isAssignableFrom(classOf[ZipFileArtifact])
&& null == url.getScheme =>
new ZipFileArtifact(new File(url.getPath)).asInstanceOf[A]
}
fn
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import scala.io.{ Codec, Source }
import scala.reflect.ClassTag

trait ArtifactIo[T, -A <: Artifact]
extends SerializeToArtifact[T, A] with DeserializeFromArtifact[T, A]
extends Serializer[T, A] with Deserializer[T, A]

/** Interface for defining how to persist a data type.
*
* @tparam T the type of the data being serialized
* @tparam A the type of the artifact being written (i.e. FileArtifact)
* @tparam A the type of the artifact being written (e.g. FileArtifact)
*/
trait SerializeToArtifact[-T, -A <: Artifact] extends PipelineStep {
trait Serializer[-T, -A <: Artifact] extends PipelineStep {
def write(data: T, artifact: A): Unit
}

/** Interface for defining how to persist a data type.
*
* @tparam T the type of the data being serialized
* @tparam A the type of the artifact being read (i.e. FileArtifact)
* @tparam A the type of the artifact being read (e.g. FileArtifact)
*/
trait DeserializeFromArtifact[+T, -A <: Artifact] extends PipelineStep {
trait Deserializer[+T, -A <: Artifact] extends PipelineStep {
def read(artifact: A): T
}

Expand Down Expand Up @@ -54,11 +54,14 @@ class SingletonIo[T: StringSerializable: ClassTag](implicit codec: Codec)
_.write(implicitly[StringSerializable[T]].toString(data))
}

override def stepInfo: PipelineStepInfo =
override def stepInfo: PipelineStepInfo = {
val className = scala.reflect.classTag[T].runtimeClass.getSimpleName
super.stepInfo.copy(
className = s"SingletonIo[${scala.reflect.classTag[T].runtimeClass.getSimpleName}]",
parameters = Map("charSet" -> codec.charSet.toString)
className = s"ReadObject[$className]",
parameters = Map("charSet" -> codec.charSet.toString),
description = Some(s"Read [$className] into memory")
)
}
}

object SingletonIo {
Expand All @@ -82,11 +85,14 @@ class LineCollectionIo[T: StringSerializable: ClassTag](implicit codec: Codec)
override def write(data: Iterable[T], artifact: FlatArtifact): Unit =
delegate.write(data.iterator, artifact)

override def stepInfo: PipelineStepInfo =
override def stepInfo: PipelineStepInfo = {
val className = scala.reflect.classTag[T].runtimeClass.getSimpleName
super.stepInfo.copy(
className = s"LineCollectionIo[${scala.reflect.classTag[T].runtimeClass.getSimpleName}]",
parameters = Map("charSet" -> codec.charSet.toString)
className = s"ReadCollection[$className]",
rodneykinney (Member, Author) commented:
Friendlier names for source-data boxes on pipeline diagrams
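The friendlier name is derived from the step's type parameter via a `ClassTag`, as in the `stepInfo` override above. A minimal sketch of that derivation (`displayName` is a hypothetical helper, not part of the library):

```scala
import scala.reflect.ClassTag

// Build the diagram label from the runtime class of the type parameter,
// mirroring the s"ReadCollection[$className]" pattern in LineCollectionIo.
def displayName[T: ClassTag]: String =
  s"ReadCollection[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}]"

println(displayName[String]) // ReadCollection[String]
```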

parameters = Map("charSet" -> codec.charSet.toString),
description = Some(s"Read collection of [$className] into memory")
)
}

}

Expand Down Expand Up @@ -124,12 +130,15 @@ class LineIteratorIo[T: StringSerializable: ClassTag](implicit codec: Codec)
}
}

override def stepInfo: PipelineStepInfo =
override def stepInfo: PipelineStepInfo = {
val className = scala.reflect.classTag[T].runtimeClass.getSimpleName
super.stepInfo.copy(
className =
s"LineIteratorIo[${scala.reflect.classTag[T].runtimeClass.getSimpleName}]",
parameters = Map("charSet" -> codec.charSet.toString)
s"ReadIterator[$className]",
parameters = Map("charSet" -> codec.charSet.toString),
description = Some(s"Stream iterator of [$className]")
)
}
}

object LineIteratorIo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Ai2CodeInfo {
// We have to guess which remote will have the commit in it
val useRemote = remotes.size match {
// If there is only one remote, use it
case 1 => remotes(0)
case 1 => remotes.head
// People shouldn't push directly to the upstream allenai repo. Instead the upstream
// repo gets updated via a pull request, which will have a different commit sha
// Use the first non-allenai repo found in the list, which will typically be
Expand Down