diff --git a/build.sbt b/build.sbt
index 896f6db..e146326 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,6 +10,6 @@ parallelExecution in Test := false
 
 libraryDependencies ++= Seq(
   "io.prediction" %% "core" % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
-  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
+  "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
+  "org.apache.spark" %% "spark-mllib" % "1.5.1" % "provided",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test")
diff --git a/engine.json b/engine.json
index 0ea26a0..56ae780 100644
--- a/engine.json
+++ b/engine.json
@@ -12,7 +12,7 @@
       "name": "ecomm",
       "params": {
         "appName": "INVALID_APP_NAME",
-        "unseenOnly": true,
+        "unseenOnly": false,
         "seenEvents": ["buy", "view"],
         "similarEvents": ["view"],
         "rank": 10,
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index bb83a3d..639e53e 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -13,11 +13,15 @@ import org.apache.spark.rdd.RDD
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appName: String) extends Params
+case class DataSourceEvalParams(kFold: Int, queryNum: Int, buyTestScore: Double, viewTestScore: Double)
+
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
+      EmptyEvaluationInfo, Query, ActualResult] {
 
   @transient lazy val logger = Logger[this.type]
 
@@ -108,6 +112,154 @@ class DataSource(val dsp: DataSourceParams)
       buyEvents = buyEventsRDD
     )
   }
+
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+
+    require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    // create an RDD of (entityID, User)
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "user"
+    )(sc).map { case (entityId, properties) =>
+      val user = try {
+        User()
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" user ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, user)
+    }.cache()
+
+    // create an RDD of (entityID, Item)
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "item"
+    )(sc).map { case (entityId, properties) =>
+      val item = try {
+        // Assume categories is an optional property of the item.
+        Item(categories = properties.getOpt[List[String]]("categories"))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" item ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, item)
+    }.cache()
+
+    val eventsRDD: RDD[(Event, Long)] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("view", "buy")),
+      // targetEntityType is an optional field of an event.
+      targetEntityType = Some(Some("item")))(sc).zipWithUniqueId.cache()
+
+    val kFold = evalParams.kFold
+    (0 until kFold).map { idx => {
+      logger.info(s"kFold: ${idx}.")
+
+      val trainingEventsRDD: RDD[Event] = eventsRDD.filter(_._2 % kFold != idx).map(_._1)
+      logger.info(s"trainingEventsRDD count: ${trainingEventsRDD.count()}.")
+
+      val testEventsRDD: RDD[Event] = eventsRDD.filter(_._2 % kFold == idx).map(_._1)
+      logger.info(s"testEventsRDD count: ${testEventsRDD.count()}.")
+
+      val trainingViewEventsRDD: RDD[ViewEvent] = trainingEventsRDD
+        .filter { event => event.event == "view" }
+        .map { event =>
+          try {
+            ViewEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis
+            )
+          } catch {
+            case e: Exception =>
+              logger.error(s"Cannot convert ${event} to ViewEvent." +
+                s" Exception: ${e}.")
+              throw e
+          }
+        }
+      logger.info(s"trainingViewEventsRDD count: ${trainingViewEventsRDD.count()}.")
+
+      val trainingBuyEventsRDD: RDD[BuyEvent] = trainingEventsRDD
+        .filter { event => event.event == "buy" }
+        .map { event =>
+          try {
+            BuyEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis
+            )
+          } catch {
+            case e: Exception =>
+              logger.error(s"Cannot convert ${event} to BuyEvent." +
+                s" Exception: ${e}.")
+              throw e
+          }
+        }
+      logger.info(s"trainingBuyEventsRDD count: ${trainingBuyEventsRDD.count()}.")
+
+      val testViewEventsRDD: RDD[ViewEvent] = testEventsRDD
+        .filter { event => event.event == "view" }
+        .map { event =>
+          try {
+            ViewEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis
+            )
+          } catch {
+            case e: Exception =>
+              logger.error(s"Cannot convert ${event} to ViewEvent." +
+                s" Exception: ${e}.")
+              throw e
+          }
+        }
+      logger.info(s"testViewEventsRDD count: ${testViewEventsRDD.count()}.")
+
+      val testBuyEventsRDD: RDD[BuyEvent] = testEventsRDD
+        .filter { event => event.event == "buy" }
+        .map { event =>
+          try {
+            BuyEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis
+            )
+          } catch {
+            case e: Exception =>
+              logger.error(s"Cannot convert ${event} to BuyEvent." +
+ + s" Exception: ${e}.") + throw e + } + } + logger.info(s"testBuyEventsRDD count: ${testBuyEventsRDD.count()}.") + + + val viewbuy = Set("view","buy") + val testingUsers = testEventsRDD.filter{ event => viewbuy.contains(event.event) }.map(event=>(event.entityId)).distinct + logger.info(s"testingUsers count: ${testingUsers.count()}.") + + val is1 = testBuyEventsRDD.map(ev=>((ev.user,ev.item),evalParams.buyTestScore)) + val is2 = testViewEventsRDD.map(ev=>((ev.user,ev.item),evalParams.viewTestScore)) + val is = is1.union(is2).reduceByKey(_+_) + + //val trainingItemScores = trainingBuyEventsRDD.map(ev=>(ev.user, new ItemScore(ev.item, 1.0))).groupByKey().map(x=>(x._1, x._2.toArray)).collect + val testItemScores = is.map(ev=>(ev._1._1, new ItemScore(ev._1._2, ev._2))).groupByKey().map(x=>(x._1, x._2.toArray)).collect + logger.info(s"testItemScores size: ${testItemScores.size}.") + + (new TrainingData(users = usersRDD,items = itemsRDD,viewEvents = trainingViewEventsRDD,buyEvents = trainingBuyEventsRDD),new EmptyEvaluationInfo(),testingUsers.map{user => (Query(user=user, num=evalParams.queryNum,categories=None,whiteList=None,blackList=None), ActualResult(testItemScores.filter(_._1==user).flatMap(_._2) ))}) + }} + } } case class User() diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index 47d9896..525e571 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -15,6 +15,10 @@ case class PredictedResult( itemScores: Array[ItemScore] ) extends Serializable +case class ActualResult( + itemScores: Array[ItemScore] +) extends Serializable + case class ItemScore( item: String, score: Double diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..c991c76 --- /dev/null +++ b/src/main/scala/Evaluation.scala @@ -0,0 +1,159 @@ +package org.template.ecommercerecommendation + +/* + * Copyright KOLIBERO under one or more contributor license agreements. + * KOLIBERO licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import io.prediction.controller.Evaluation
+import io.prediction.controller.OptionAverageMetric
+import io.prediction.controller.AverageMetric
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EngineParamsGenerator
+import io.prediction.controller.EngineParams
+import io.prediction.controller.MetricEvaluator
+
+import grizzled.slf4j.Logger
+
+// Usage:
+// $ pio eval org.template.ecommercerecommendation.RecommendationEvaluation \
+//   org.template.ecommercerecommendation.EngineParamsList
+
+case class PrecisionAtK(k: Int, scoreThreshold: Double = 1.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$scoreThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.itemScores.filter(_.score >= scoreThreshold).map(_.item).toSet
+
+    // If there are no positive results, Precision is undefined. We don't consider this case in the
+    // metrics, hence we return None.
+    if (positives.size == 0) {
+      return None
+    }
+
+    val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size
+
+    Some(tpCount.toDouble / math.min(k, positives.size))
+  }
+}
+
+case class Jaccard(scoreThreshold: Double = 1.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  //require(k > 0, "k must be greater than 0")
+
+  @transient lazy val logger = Logger[this.type]
+
+  override def header = s"Jaccard (scoreThreshold=$scoreThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.itemScores.toList.filter(_.score >= scoreThreshold).map(_.item).toSet
+
+    // If there are no positive results, the metric is undefined. We don't consider this case in the
+    // metrics, hence we return None.
+    if (positives.size == 0) {
+      return None
+    }
+
+    val aItems = a.itemScores.toList.map(_.item).toSet
+    val pItems = p.itemScores.toList.map(_.item).toSet
+
+    // If there are no actual items, we don't consider the case in the metrics.
+    if (aItems.size == 0) {
+      return None
+    }
+
+    //logger.info(s"Query.user: ${q.user}")
+    //logger.info(s"ActualResult size: ${a.itemScores.size}")
+    //logger.info(s"PredictedResult size: ${p.itemScores.size}")
+
+    val jVal = jaccardValue(aItems, pItems)
+
+    logger.info(s"user: ${q.user}, jc: ${jVal}, ars: ${a.itemScores.size}, prs: ${p.itemScores.size}")
+    val aa = a.itemScores.toList.map(x => (x.item + ":" + x.score)).mkString(",")
+    val bb = p.itemScores.toList.map(x => (x.item + ":" + x.score)).mkString(",")
+    //logger.info(s"user: ${q.user}, a: ${aa}, p: ${bb}")
+
+    Some(jVal)
+  }
+
+  def jaccardValue(A: Set[String], B: Set[String]): Double = {
+    A.intersect(B).size.toDouble / A.union(B).size.toDouble
+  }
+}
+
+case class PositiveCount(scoreThreshold: Double = 1.0)
+  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$scoreThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
+    a.itemScores.toList.filter(_.score >= scoreThreshold).size
+  }
+}
+
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    ECommerceRecommendationEngine(),
+    MetricEvaluator(
+      metric = Jaccard(scoreThreshold = 1.0),
+      otherMetrics = Seq(
+        PositiveCount(scoreThreshold = 1.0),
+        PrecisionAtK(k = 10, scoreThreshold = 1.0)
+      )
+    )
+  )
+}
+
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val scoreThresholds = Seq(1.0, 10.0, 11.0)
+
+  engineEvaluator = (
+    ECommerceRecommendationEngine(),
+    MetricEvaluator(
+      metric = Jaccard(scoreThreshold = 1.0),
+      otherMetrics = (
+        (for (r <- scoreThresholds) yield PositiveCount(scoreThreshold = r)) ++
+        (for (r <- scoreThresholds) yield Jaccard(scoreThreshold = r))
+      )))
+}
+
+object RecommendationEvaluation2 extends Evaluation {
+  engineEvaluator = (
+    ECommerceRecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, scoreThreshold = 1.0),
+      otherMetrics = Seq(PositiveCount(scoreThreshold = 1.0))
+    )
+  )
+}
+
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "INVALID_APP_NAME",
+      evalParams = Some(DataSourceEvalParams(kFold = 2, queryNum = 5, buyTestScore = 10.0, viewTestScore = 1.0))))
+}
+
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = for(
+    rank <- Seq(10);
+    numIterations <- Seq(20);
+    lambda <- Seq(0.01))
+    yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("ecomm", ECommAlgorithmParams("INVALID_APP_NAME", false, List("buy", "view"), List("view"), rank, numIterations, lambda, Option(3)))))
+}
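
Note (not part of the patch): the evaluation above splits events into k folds with zipWithUniqueId and a modulo test (id % kFold == idx), scores each test (user, item) pair by summing a buy score and a view score (10.0 and 1.0 in BaseEngineParamsList, so an item that is both viewed and bought scores 11.0, which is why ComprehensiveRecommendationEvaluation probes the thresholds 1.0, 10.0 and 11.0), and then compares predicted and actual item sets with a Jaccard value. The following is a minimal, self-contained Scala sketch of those three steps on plain collections, without Spark or PredictionIO; all names and data in it are illustrative only.

object EvalSketch extends App {
  // Stand-in for the (Event, uniqueId) pairs produced by zipWithUniqueId:
  // ((user, item, eventName), id)
  val events = Seq(
    (("u1", "i1", "view"), 0L), (("u1", "i2", "buy"), 1L),
    (("u2", "i1", "buy"), 2L), (("u2", "i3", "view"), 3L))

  // Fold assignment, same rule as readEval: id % kFold selects the test fold.
  val kFold = 2
  val idx = 0
  val (test, training) = events.partition { case (_, id) => id % kFold == idx }
  println(s"test events: ${test.size}, training events: ${training.size}")

  // Scoring, same rule as readEval: fixed buy/view scores summed per (user, item).
  val buyTestScore = 10.0
  val viewTestScore = 1.0
  val actualScores: Map[(String, String), Double] = test
    .map { case ((user, item, ev), _) =>
      ((user, item), if (ev == "buy") buyTestScore else viewTestScore) }
    .groupBy(_._1)
    .map { case (key, pairs) => (key, pairs.map(_._2).sum) }

  // Set comparison, same formula as the Jaccard metric above.
  def jaccard(a: Set[String], b: Set[String]): Double =
    a.intersect(b).size.toDouble / a.union(b).size.toDouble

  val actualForU2 = actualScores.collect { case ((user, item), _) if user == "u2" => item }.toSet
  val predictedForU2 = Set("i1", "i2") // pretend engine output
  println(jaccard(actualForU2, predictedForU2)) // 1 shared item of 2 distinct => 0.5
}

Under this scoring scheme, PrecisionAtK with scoreThreshold = 1.0 treats every test item as a positive, while a threshold of 10.0 keeps only items that were bought.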