Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Develop: Implementation of evaluation algorithm #4

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.prediction" %% "core" % pioVersion.value % "provided",
"org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
"org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
"org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
"org.apache.spark" %% "spark-mllib" % "1.5.1" % "provided",
"org.scalatest" %% "scalatest" % "2.2.1" % "test")
2 changes: 1 addition & 1 deletion engine.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "ecomm",
"params": {
"appName": "INVALID_APP_NAME",
"unseenOnly": true,
"unseenOnly": false,
"seenEvents": ["buy", "view"],
"similarEvents": ["view"],
"rank": 10,
Expand Down
156 changes: 154 additions & 2 deletions src/main/scala/DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import org.apache.spark.rdd.RDD

import grizzled.slf4j.Logger

case class DataSourceParams(appName: String) extends Params
// Parameters for offline evaluation (`pio eval`):
//   kFold         - number of cross-validation folds events are split into
//   queryNum      - number of recommendations requested per evaluation Query
//   buyTestScore  - score assigned to a "buy" event when building ActualResult
//   viewTestScore - score assigned to a "view" event when building ActualResult
// A (user, item) pair seen via both event types gets the scores summed.
case class DataSourceEvalParams(kFold: Int, queryNum: Int, buyTestScore: Double, viewTestScore: Double)

// evalParams is only needed when running `pio eval` (readEval requires it);
// it stays None for normal train/deploy.
case class DataSourceParams(
appName: String,
evalParams: Option[DataSourceEvalParams]) extends Params

class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
EmptyEvaluationInfo, Query, EmptyActualResult] {
EmptyEvaluationInfo, Query, ActualResult] {

@transient lazy val logger = Logger[this.type]

Expand Down Expand Up @@ -108,6 +112,154 @@ class DataSource(val dsp: DataSourceParams)
buyEvents = buyEventsRDD
)
}

override
def readEval(sc: SparkContext)
: Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {

  require(dsp.evalParams.nonEmpty, "Must specify evalParams")
  val evalParams = dsp.evalParams.get

  // Create a RDD of (entityID, User). User currently carries no properties;
  // the try/catch is kept so any future property-parsing failure is logged.
  val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
    appName = dsp.appName,
    entityType = "user"
  )(sc).map { case (entityId, properties) =>
    val user = try {
      User()
    } catch {
      case e: Exception =>
        logger.error(s"Failed to get properties ${properties} of" +
          s" user ${entityId}. Exception: ${e}.")
        throw e
    }
    (entityId, user)
  }.cache()

  // Create a RDD of (entityID, Item).
  val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
    appName = dsp.appName,
    entityType = "item"
  )(sc).map { case (entityId, properties) =>
    val item = try {
      // Assume categories is optional property of item.
      Item(categories = properties.getOpt[List[String]]("categories"))
    } catch {
      case e: Exception =>
        logger.error(s"Failed to get properties ${properties} of" +
          s" item ${entityId}. Exception: ${e}.")
        throw e
    }
    (entityId, item)
  }.cache()

  // Pair every view/buy event with a unique id so it can be deterministically
  // assigned to one of the kFold folds (id % kFold).
  val eventsRDD: RDD[(Event, Long)] = PEventStore.find(
    appName = dsp.appName,
    entityType = Some("user"),
    eventNames = Some(List("view", "buy")),
    // targetEntityType is optional field of an event.
    targetEntityType = Some(Some("item")))(sc).zipWithUniqueId.cache()

  // Convert a raw "view" event; logs and rethrows on malformed events
  // (e.g. missing targetEntityId).
  def toViewEvent(event: Event): ViewEvent = try {
    ViewEvent(
      user = event.entityId,
      item = event.targetEntityId.get,
      t = event.eventTime.getMillis
    )
  } catch {
    case e: Exception =>
      logger.error(s"Cannot convert ${event} to ViewEvent." +
        s" Exception: ${e}.")
      throw e
  }

  // Convert a raw "buy" event; logs and rethrows on malformed events.
  def toBuyEvent(event: Event): BuyEvent = try {
    BuyEvent(
      user = event.entityId,
      item = event.targetEntityId.get,
      t = event.eventTime.getMillis
    )
  } catch {
    case e: Exception =>
      logger.error(s"Cannot convert ${event} to BuyEvent." +
        s" Exception: ${e}.")
      throw e
  }

  val kFold = evalParams.kFold
  (0 until kFold).map { idx =>
    logger.info(s"kFold: ${idx}.")

    // Events whose unique id is congruent to idx modulo kFold form the test
    // set of this fold; everything else is training data.
    // NOTE: the per-RDD `count()` debug logging of the original was removed —
    // each count forced a full extra Spark job per fold.
    val trainingEventsRDD: RDD[Event] = eventsRDD.filter(_._2 % kFold != idx).map(_._1)
    val testEventsRDD: RDD[Event] = eventsRDD.filter(_._2 % kFold == idx).map(_._1)

    val trainingViewEventsRDD: RDD[ViewEvent] =
      trainingEventsRDD.filter(_.event == "view").map(toViewEvent)
    val trainingBuyEventsRDD: RDD[BuyEvent] =
      trainingEventsRDD.filter(_.event == "buy").map(toBuyEvent)
    val testViewEventsRDD: RDD[ViewEvent] =
      testEventsRDD.filter(_.event == "view").map(toViewEvent)
    val testBuyEventsRDD: RDD[BuyEvent] =
      testEventsRDD.filter(_.event == "buy").map(toBuyEvent)

    // Every distinct user appearing in the test fold gets one evaluation
    // Query. (eventsRDD is already restricted to view/buy via eventNames,
    // so no extra event-name filter is needed here.)
    val testingUsers: RDD[String] = testEventsRDD.map(_.entityId).distinct

    // Score each (user, item) pair in the test fold; a pair seen via both
    // event types has the two scores summed.
    val testScores: RDD[((String, String), Double)] =
      testBuyEventsRDD.map(ev => ((ev.user, ev.item), evalParams.buyTestScore))
        .union(testViewEventsRDD.map(ev => ((ev.user, ev.item), evalParams.viewTestScore)))
        .reduceByKey(_ + _)

    // Collected to the driver and indexed by user for O(1) lookup when
    // building each user's ActualResult (the original re-scanned the whole
    // collected array once per user).
    val testItemScoresByUser: Map[String, Array[ItemScore]] =
      testScores
        .map { case ((user, item), score) => (user, ItemScore(item, score)) }
        .groupByKey()
        .mapValues(_.toArray)
        .collect()
        .toMap

    (
      new TrainingData(
        users = usersRDD,
        items = itemsRDD,
        viewEvents = trainingViewEventsRDD,
        buyEvents = trainingBuyEventsRDD),
      new EmptyEvaluationInfo(),
      testingUsers.map { user =>
        (
          Query(user = user, num = evalParams.queryNum,
            categories = None, whiteList = None, blackList = None),
          ActualResult(testItemScoresByUser.getOrElse(user, Array.empty[ItemScore]))
        )
      }
    )
  }
}
}

case class User()
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ case class PredictedResult(
itemScores: Array[ItemScore]
) extends Serializable

// Ground-truth counterpart of PredictedResult for evaluation: the items a
// user actually interacted with in the test fold, with their test scores.
case class ActualResult(
itemScores: Array[ItemScore]
) extends Serializable

case class ItemScore(
item: String,
score: Double
Expand Down
159 changes: 159 additions & 0 deletions src/main/scala/Evaluation.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package org.template.ecommercerecommendation

/*
* Copyright KOLIBERO under one or more contributor license agreements.
* KOLIBERO licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.prediction.controller.Evaluation
import io.prediction.controller.OptionAverageMetric
import io.prediction.controller.AverageMetric
import io.prediction.controller.EmptyEvaluationInfo
import io.prediction.controller.EngineParamsGenerator
import io.prediction.controller.EngineParams
import io.prediction.controller.MetricEvaluator

import grizzled.slf4j.Logger

// Usage:
// $ pio eval org.template.RecommendationEvaluation \
// org.template.ParamsList

// Precision@K: fraction of the top-k predicted items that are "positive"
// (actual items whose test score reaches scoreThreshold). Returns None when
// the user has no positives, so such queries are excluded from the average.
case class PrecisionAtK(k: Int, scoreThreshold: Double = 1.0)
  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  require(k > 0, "k must be greater than 0")

  override def header = s"Precision@K (k=$k, threshold=$scoreThreshold)"

  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
    val positives: Set[String] =
      a.itemScores.filter(_.score >= scoreThreshold).map(_.item).toSet

    // With no positives, precision is undefined; excluded from the metric.
    if (positives.isEmpty) {
      None
    } else {
      val tpCount = p.itemScores.take(k).count(is => positives(is.item))
      // Denominator is capped by the number of positives so a user with
      // fewer than k positives can still reach a perfect score.
      Some(tpCount.toDouble / math.min(k, positives.size))
    }
  }
}

// Jaccard similarity between the set of actual items and the set of
// predicted items for a user. Returns None when the user has no item with a
// test score >= scoreThreshold, excluding the query from the average.
case class Jaccard(scoreThreshold: Double = 1.0)
  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {

  @transient lazy val logger = Logger[this.type]

  override def header = s"Jaccard (scoreThreshold=$scoreThreshold)"

  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
    val positives: Set[String] =
      a.itemScores.filter(_.score >= scoreThreshold).map(_.item).toSet

    // Without positives the metric is undefined for this query.
    // (A separate "no actual items" check is unnecessary: positives is a
    // subset of the actual items, so actual items are non-empty here.)
    if (positives.isEmpty) {
      None
    } else {
      val aItems = a.itemScores.map(_.item).toSet
      val pItems = p.itemScores.map(_.item).toSet

      val jVal = jaccardValue(aItems, pItems)

      logger.info(s"user: ${q.user}, jc: ${jVal}, ars: ${a.itemScores.size}, prs: ${p.itemScores.size}")

      Some(jVal)
    }
  }

  // |A ∩ B| / |A ∪ B|; callers guarantee at least one of A, B is non-empty.
  def jaccardValue(A: Set[String], B: Set[String]): Double =
    A.intersect(B).size.toDouble / A.union(B).size.toDouble
}

// Diagnostic metric: average number of actual items per query whose test
// score reaches scoreThreshold (i.e. how many "positives" each user has).
case class PositiveCount(scoreThreshold: Double = 1.0)
  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  override def header = s"PositiveCount (threshold=$scoreThreshold)"

  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
    // count avoids the intermediate List the original built via toList/filter.
    a.itemScores.count(_.score >= scoreThreshold)
}

// Default evaluation: optimizes Jaccard similarity at threshold 1.0, while
// also reporting PositiveCount and Precision@10 for context.
// Usage: pio eval org.template.ecommercerecommendation.RecommendationEvaluation \
//          org.template.ecommercerecommendation.EngineParamsList
object RecommendationEvaluation extends Evaluation {
engineEvaluator = (
ECommerceRecommendationEngine(),
MetricEvaluator(
metric = Jaccard(scoreThreshold = 1.0),
otherMetrics = Seq(
PositiveCount(scoreThreshold = 1.0),
PrecisionAtK(k =10, scoreThreshold = 1.0)
)
)
)
}

// Evaluation sweeping several score thresholds: the primary metric stays
// Jaccard at 1.0, and PositiveCount/Jaccard are reported at each threshold.
// (10.0 / 11.0 correspond to buy-only and buy-plus-view test scores under
// the default DataSourceEvalParams.)
object ComprehensiveRecommendationEvaluation extends Evaluation {
  val scoreThresholds = Seq(1.0, 10.0, 11.0)

  engineEvaluator = (
    ECommerceRecommendationEngine(),
    MetricEvaluator(
      metric = Jaccard(scoreThreshold = 1.0),
      otherMetrics =
        scoreThresholds.map(t => PositiveCount(scoreThreshold = t)) ++
          scoreThresholds.map(t => Jaccard(scoreThreshold = t))
    ))
}

// Alternative evaluation that optimizes Precision@10 instead of Jaccard,
// reporting PositiveCount alongside it.
object RecommendationEvaluation2 extends Evaluation {
engineEvaluator = (
ECommerceRecommendationEngine(),
MetricEvaluator(
metric = PrecisionAtK(k =10, scoreThreshold = 1.0),
otherMetrics = Seq( PositiveCount(scoreThreshold = 1.0) )
)
)
}

// Shared base engine params for evaluation runs: wires in the evaluation
// data-source settings (2 folds, 5 recommendations per query, buy=10.0 and
// view=1.0 test scores). appName must be replaced with a real app name.
trait BaseEngineParamsList extends EngineParamsGenerator {
protected val baseEP = EngineParams(
dataSourceParams = DataSourceParams(
appName = "INVALID_APP_NAME",
evalParams = Some(DataSourceEvalParams(kFold = 2, queryNum = 5, buyTestScore = 10.0, viewTestScore = 1.0))))
}

// Grid of engine params to evaluate: one entry per (rank, numIterations,
// lambda) combination. Each sequence currently holds a single value, so a
// single parameter set is produced; extend the Seqs to widen the search.
object EngineParamsList extends BaseEngineParamsList {
  engineParamsList =
    for {
      rank <- Seq(10)
      numIterations <- Seq(20)
      lambda <- Seq(0.01)
    } yield baseEP.copy(
      algorithmParamsList = Seq(
        ("ecomm", ECommAlgorithmParams(
          "INVALID_APP_NAME",
          false,
          List("buy", "view"),
          List("view"),
          rank,
          numIterations,
          lambda,
          Option(3)))))
}