Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Make the minify method return a more useful error message #462

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import scala.util.{ Success, Failure }
import scala.util.control.Exception.allCatch

import org.slf4j.LoggerFactory
import com.twitter.util.{Try => TTry, Throw, Return}

object Scalding {
@transient private val logger = LoggerFactory.getLogger(classOf[Scalding])
Expand Down Expand Up @@ -94,8 +95,11 @@ object Scalding {
commutativity
}

def intersect(dr1: DateRange, dr2: DateRange): Option[DateRange] =
(dr1.as[Interval[Timestamp]] && (dr2.as[Interval[Timestamp]])).as[Option[DateRange]]
def intersect(dr1: DateRange, dr2: DateRange): Either[List[FailureReason], DateRange] =
(dr1.as[Interval[Timestamp]] && (dr2.as[Interval[Timestamp]])).as[Option[DateRange]] match {
case Some(dr) => Right(dr)
case None => Left(List("No intersection between date ranges: " + dr1 + " and " + dr2))
}

/** Given a constructor function, computes the maximum available range
* of time or gives an error.
Expand All @@ -109,19 +113,25 @@ object Scalding {
val available = (mode, factory(desired)) match {
case (hdfs: Hdfs, ts: STPS) =>
TimePathedSource.satisfiableHdfs(hdfs, desired, factory.asInstanceOf[DateRange => STPS])
.map(Right(_))
.getOrElse(Left(List("No satisfiable HDFS date range")))
case _ => bisectingMinify(mode, desired)(factory)
}
available.flatMap { intersect(desired, _) }
.map(Right(_))
.getOrElse(Left(List("available: " + available + ", desired: " + desired)))
available.right.flatMap {
intersect(desired, _)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like we'd end up with a pretty similar error message, available = x, desired = y. vs no intersection between x and y?

}
}
catch { case t: Throwable => toTry(t) }
}

private def bisectingMinify(mode: Mode, desired: DateRange)(factory: (DateRange) => SSource): Option[DateRange] = {
def isGood(end: Long): Boolean = allCatch.opt(factory(DateRange(desired.start, RichDate(end))).validateTaps(mode)).isDefined
private def bisectingMinify(mode: Mode, desired: DateRange)
(factory: (DateRange) => SSource)
: Either[List[FailureReason], DateRange] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put this back on the line before

def tryRange(end: Long): TTry[Unit] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like tryRange might fit all on one line?

TTry(factory(DateRange(desired.start, RichDate(end))).validateTaps(mode))
val DateRange(start, end) = desired
if(isGood(start.timestamp)) {
tryRange(start.timestamp) match{
case Return(_) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indenting looks off here?

// The invariant is that low isGood, low < upper, and upper isGood == false
@annotation.tailrec
def findEnd(low: Long, upper: Long): Long =
Expand All @@ -130,20 +140,21 @@ object Scalding {
else {
// mid must be > low because upper >= low + 2
val mid = low + (upper - low)/2
if(isGood(mid))
if(tryRange(mid).isReturn)
findEnd(mid, upper)
else
findEnd(low, mid)
}

if(isGood(end.timestamp)) Some(desired)
else Some(DateRange(desired.start, RichDate(findEnd(start.timestamp, end.timestamp))))
if(tryRange(end.timestamp).isReturn) Right(desired)
else Right(DateRange(desired.start, RichDate(findEnd(start.timestamp, end.timestamp))))
}
else {
case Throw(e) =>{
// No good data
None
toTry(e)
}
}
}

/** This uses minify to find the smallest subset we can run.
* If you don't want this behavior, then use pipeFactoryExact which
Expand Down