From 0dbf61f92e977545b547b3192a6eca534851e1dc Mon Sep 17 00:00:00 2001 From: Max Hansmire Date: Thu, 27 Feb 2014 09:16:33 -0500 Subject: [PATCH] Make the error message for a failed minify more informative --- .../scalding/ScaldingPlatform.scala | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala index 1a104fabb..c193ea305 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala @@ -41,6 +41,7 @@ import scala.util.{ Success, Failure } import scala.util.control.Exception.allCatch import org.slf4j.LoggerFactory +import com.twitter.util.{Try => TTry, Throw, Return} object Scalding { @transient private val logger = LoggerFactory.getLogger(classOf[Scalding]) @@ -93,8 +94,11 @@ object Scalding { commutativity } - def intersect(dr1: DateRange, dr2: DateRange): Option[DateRange] = - (dr1.as[Interval[Time]] && (dr2.as[Interval[Time]])).as[Option[DateRange]] + def intersect(dr1: DateRange, dr2: DateRange): Either[List[FailureReason], DateRange] = + (dr1.as[Interval[Time]] && (dr2.as[Interval[Time]])).as[Option[DateRange]] match { + case Some(dr) => Right(dr) + case None => Left(List("No intersection between date ranges: " + dr1 + " and " + dr2)) + } /** Given a constructor function, computes the maximum available range * of time or gives an error. @@ -108,19 +112,25 @@ object Scalding { val available = (mode, factory(desired)) match { case (hdfs: Hdfs, ts: STPS) => TimePathedSource.satisfiableHdfs(hdfs, desired, factory.asInstanceOf[DateRange => STPS]) + .map(Right(_)) + .getOrElse(Left(List("No satisfiable HDFS date range"))) case _ => bisectingMinify(mode, desired)(factory) } - available.flatMap { intersect(desired, _) } - .map(Right(_)) - .getOrElse(Left(List("available: " + available + ", desired: " + desired))) + available.right.flatMap { + intersect(desired, _) + } } catch { case t: Throwable => toTry(t) } } - private def bisectingMinify(mode: Mode, desired: DateRange)(factory: (DateRange) => SSource): Option[DateRange] = { - def isGood(end: Long): Boolean = allCatch.opt(factory(DateRange(desired.start, RichDate(end))).validateTaps(mode)).isDefined + private def bisectingMinify(mode: Mode, desired: DateRange) + (factory: (DateRange) => SSource) + : Either[List[FailureReason], DateRange] = { + def tryRange(end: Long): TTry[Unit] = + TTry(factory(DateRange(desired.start, RichDate(end))).validateTaps(mode)) val DateRange(start, end) = desired - if(isGood(start.timestamp)) { + tryRange(start.timestamp) match{ + case Return(_) => { // The invariant is that low isGood, low < upper, and upper isGood == false @annotation.tailrec def findEnd(low: Long, upper: Long): Long = @@ -129,20 +139,21 @@ object Scalding { else { // mid must be > low because upper >= low + 2 val mid = low + (upper - low)/2 - if(isGood(mid)) + if(tryRange(mid).isReturn) findEnd(mid, upper) else findEnd(low, mid) } - if(isGood(end.timestamp)) Some(desired) - else Some(DateRange(desired.start, RichDate(findEnd(start.timestamp, end.timestamp)))) + if(tryRange(end.timestamp).isReturn) Right(desired) + else Right(DateRange(desired.start, RichDate(findEnd(start.timestamp, end.timestamp)))) } - else { + case Throw(e) =>{ // No good data - None + toTry(e) } } + } /** This uses minify to find the smallest subset we can run. * If you don't want this behavior, then use pipeFactoryExact which