Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
[WIP - tests fail] Add support for making Remote calls with mutliple …
Browse files Browse the repository at this point in the history
…streams
  • Loading branch information
jedesah committed Jun 18, 2015
1 parent 76457c5 commit b263562
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 40 deletions.
65 changes: 39 additions & 26 deletions core/src/main/scala/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scalaz.Monad
import scalaz.stream.Process
import scalaz.concurrent.Task
import scodec.bits.BitVector
import scodec.Err
import scodec.{Codec, Err}
import scodec.Attempt.{Failure, Successful}
import utils._

Expand Down Expand Up @@ -68,35 +68,48 @@ object Server {

/** Evaluate a remote expression, using the given (untyped) environment. */
def eval(env: Environment[_])(r: Remote[Any])(userStream: Process[Task, BitVector]): (Response[Any], Boolean) = {
def evalSub(r: Remote[Any]) = eval(env)(r)(userStream)._1
val streams = Streaming.demultiplexBits(userStream)
val values = env.values.values
import Remote._
r match {
case Local(a,_,_) => (Response.now(a), false)
case LocalStream(_, _,tag) =>
env.codecs.get(tag).map { decoder =>
val decodedStream: Process[Task, Any] = userStream.map(bits => decoder.complete.decode(bits).map(_.value)).flatMap(_.toProcess)
(Response.now(decodedStream), true)
}.getOrElse((Response.fail(new Error(s"[decoding] server does not have deserializers for:\n$tag")), false))
case Ref(name) => values.lift(name) match {
case None => (Response.delay { sys.error("Unknown name on server: " + name) }, false)
case Some(a) => (a(), a.isStream)
var currentIndex = 0
def evalImpl(r: Remote[Any]): (Response[Any], Boolean) = {
def evalSub(r: Remote[Any]) = evalImpl(r)._1
r match {
case Local(a, _, _) => (Response.now(a), false)
case LocalStream(_, _, tag) =>
val ourStream = Process.await(streams.apply(currentIndex))(identity)
env.codecs.get(tag).map { decoder =>
val decodedStream: Process[Task, Any] = ourStream.map(bits => decoder.complete.decode(bits).map(_.value)).flatten
currentIndex += 1
(Response.now(decodedStream), true)
}.getOrElse((Response.fail(new Error(s"[decoding] server does not have deserializers for:\n$tag")), false))
case Ref(name) => values.lift(name) match {
case None => (Response.delay {
sys.error("Unknown name on server: " + name)
}, false)
case Some(a) => (a(), a.isStream)
}
// on the server, only concern ourselves w/ tree of fully saturated calls
case Ap1(Ref(f), a) =>
val value = values(f)
(eval(env)(a)(userStream)._1.flatMap {
value(_)
}, value.isStream)
case Ap2(Ref(f), a, b) =>
val value = values(f)
(Monad[Response].tuple2(evalSub(a), evalSub(b)).flatMap { case (a, b) => value(a, b) }, value.isStream)
case Ap3(Ref(f), a, b, c) =>
val value = values(f)
(Monad[Response].tuple3(evalSub(a), evalSub(b), evalSub(c)).flatMap { case (a, b, c) => value(a, b, c) }, value.isStream)
case Ap4(Ref(f), a, b, c, d) =>
val value = values(f)
(Monad[Response].tuple4(evalSub(a), evalSub(b), evalSub(c), evalSub(d)).flatMap { case (a, b, c, d) => value(a, b, c, d) }, value.isStream)
case _ => (Response.delay {
sys.error("unable to interpret remote expression of form: " + r)
}, false)
}
// on the server, only concern ourselves w/ tree of fully saturated calls
case Ap1(Ref(f),a) =>
val value = values(f)
(eval(env)(a)(userStream)._1.flatMap{value(_)}, value.isStream)
case Ap2(Ref(f),a,b) =>
val value = values(f)
(Monad[Response].tuple2(evalSub(a), evalSub(b)).flatMap{case (a,b) => value(a,b)}, value.isStream)
case Ap3(Ref(f),a,b,c) =>
val value = values(f)
(Monad[Response].tuple3(evalSub(a), evalSub(b), evalSub(c)).flatMap{case (a,b,c) => value(a,b,c)}, value.isStream)
case Ap4(Ref(f),a,b,c,d) =>
val value = values(f)
(Monad[Response].tuple4(evalSub(a), evalSub(b), evalSub(c), evalSub(d)).flatMap{case (a,b,c,d) => value(a,b,c,d)}, value.isStream)
case _ => (Response.delay { sys.error("unable to interpret remote expression of form: " + r) }, false)
}
evalImpl(r)
}

def fail(msg: String): Process[Task, Nothing] = Process.fail(new Error(msg))
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
//: ----------------------------------------------------------------------------
package remotely

import java.util.NoSuchElementException

import remotely.codecs.DecodingFailure
import scodec.Attempt.{Successful, Failure}
import scodec.{Attempt, Err}
Expand All @@ -36,6 +38,10 @@ package object utils {
case -\/(e) => Process.fail(conv(e))
case \/-(a) => Process.emit(a)
}
def getLeft = a match {
case -\/(e) => e
case _ => throw new NoSuchElementException
}
}

implicit class AugmentedTask[A](t: Task[A]) {
Expand Down Expand Up @@ -63,12 +69,19 @@ package object utils {
case _ : EarlyCause => Task.fail(cause.asThrowable)
}
}
def apply(i: Int): Task[A] = p.drop(i).uncons.map(_._1)
def observeAll(sink: Sink[Task, Throwable \/ A]): Process[Task, A] = {
p.attempt().observe(sink).flatten
}
}
implicit class AugmentedSink[A](sink: Sink[Task, A]) {
def adapt[B](f: B => A): Sink[Task,B] = {
sink.map(ff => ff.compose(f))
}
}
implicit def errToE(err: Err) = new DecodingFailure(err)
implicit def eitherToProcess[A](either: Throwable \/ A): Process[Task, A] = either.toProcess
implicit def attemptToProcess[A](attempt: Attempt[A]): Process[Task, A] = attempt.toProcess
implicit class AugmentedAttempt[A](a: Attempt[A]) {
def toTask(implicit conv: Err => Throwable): Task[A] = a match {
case Failure(err) => Task.fail(conv(err))
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/codecs/codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ package object codecs extends lowerprioritycodecs {
* This function actually creates a stream of bits from a LocalStream to send to the
* server once the initial request has been sent in it's entirety.
*/
def encodeLocalStream[A](l: Option[LocalStream[A]]): Process[Task,BitVector] =
l.map(l => l.format.map(encoder => l.stream.map(encoder.encode(_).toProcess))
.getOrElse(Process.fail(Err("cannot encode Local value with undefined encoder"))).flatten).getOrElse(Process.empty)
def encodeLocalStream[A](l: LocalStream[A]): Process[Task,BitVector] =
l.format.map(encoder => l.stream.map(encoder.encode(_).toProcess))
.getOrElse(Process.fail(Err("cannot encode Local value with undefined encoder"))).flatten

/**
* A `Remote[Any]` decoder. If a `Local` value refers
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import remotely.Remote.LocalStream

import scalaz.{\/-, -\/, \/}

//: ----------------------------------------------------------------------------
//: Copyright (C) 2014 Verizon. All Rights Reserved.
//:
Expand All @@ -28,8 +24,11 @@ package object remotely {
import scalaz.\/.{left,right}
import scalaz.Monoid
import scodec.bits.{BitVector,ByteVector}
// import scodec.Decoder
import utils._
import remotely.Remote.LocalStream
import scodec.Codec

import scalaz.{\/-, -\/, \/}

/**
* Represents the logic of a connection handler, a function
Expand Down Expand Up @@ -75,8 +74,8 @@ package object remotely {
Response.scope { Response { ctx =>
val refs = Remote.refs(r)

val stream = (r collect { case l: LocalStream[Any]@unchecked => l }).headOption
val userBits = codecs.encodeLocalStream(stream)
val streams: List[LocalStream[Any]] = r collect { case l: LocalStream[Any]@unchecked => l }
val userBitStreams = streams.map(stream => codecs.encodeLocalStream(stream))

def reportErrors[R](startNanos: Long)(t: Process[Task, R]): Process[Task, R] =
t.onFailure { e =>
Expand Down Expand Up @@ -105,7 +104,7 @@ package object remotely {
val resultStream = Process.await(timeAndConnection) { case (start, conn) =>
val reqBits = codecs.encodeRequest(r, ctx, remoteTag).toProcess
val respBits = reportErrors(start) {
val allBits = reqBits ++ userBits
val allBits = reqBits ++ Streaming.multiplexBits(userBitStreams)
conn(allBits)
}
reportErrors(start) {
Expand All @@ -121,5 +120,4 @@ package object remotely {

implicit val BitVectorMonoid = Monoid.instance[BitVector]((a,b) => a ++ b, BitVector.empty)
implicit val ByteVectorMonoid = Monoid.instance[ByteVector]((a,b) => a ++ b, ByteVector.empty)

}
51 changes: 49 additions & 2 deletions core/src/test/scala/StreamingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package remotely
import java.net.InetSocketAddress
import java.util.NoSuchElementException

import org.scalacheck.Arbitrary
import org.scalacheck.Arbitrary._
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
import remotely.Remote.implicits._
import remotely.transport.netty.NettyTransport
Expand All @@ -31,19 +34,32 @@ import scalaz.stream._

import scalaz.stream.async

class StreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
import scala.concurrent.duration._

import utils._

class StreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll with PropertyChecks {
behavior of "Streaming"

implicit def processArbitrary[A: Arbitrary]: Arbitrary[Process[Task,A]] =
Arbitrary(arbitrary[List[A]] map ((x: List[A]) => Process.emitAll(x)))

def twoStreamFunction(p1: Process[Task, Int], p2: Process[Task, Boolean]): Task[Int] =
p1.observe(io.stdOutLines.adapt(_.toString)).zipWith(p2.observe(io.stdOutLines.adapt(_.toString)))((i, b) => if(b) i else 0).sum.runLast.map(_.get)

// on server, populate environment with codecs and values
val env = Environment.empty
.codec[Byte]
.codec[Int]
.codec[Boolean]
.populate { _
// It would be nice if this could fail to compile...
.declareStream("download", (n: Int) => Response.now { Process[Byte](1,2,3,4) } )
.declareStream("continuous", (p: Process[Task, Int]) => Response.now { p.map(_ + 1)} )
.declare("upload", (p: Process[Task, Int]) => Response.async[Int](p.runLog.map(_.sum)))
.declareStream("failDownload", (n: Int) => Response.now { Process[Byte](1,2) ++ Process.fail(new NoSuchElementException)})
.declare("uploadWithNormalValue", (p: Process[Task, Int], a: Int) => Response.now(a))
.declare("twoUpload", (p1: Process[Task, Int], p2: Process[Task, Boolean]) => Response.async{twoStreamFunction(p1,p2)})
}

val addr = new InetSocketAddress("localhost", 8091)
Expand All @@ -58,7 +74,9 @@ class StreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

val uploadWithNormalValue = Remote.ref[(Process[Task, Int], Int) => Int]("uploadWithNormalValue")

val serverShutdown = env.serve(addr, monitoring = Monitoring.consoleLogger("[server]")).run
val twoUpload = Remote.ref[(Process[Task, Int], Process[Task, Boolean]) => Int]("twoUpload")

val serverShutdown = env.serve(addr).run

val transport = NettyTransport.single(addr).run
val loc: Endpoint = Endpoint.single(transport)
Expand Down Expand Up @@ -96,6 +114,13 @@ class StreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
val result: Process[Task, Byte] = downloadFail(10).run(loc).run
result.run.attempt.run.getLeft shouldBe a [NoSuchElementException]
}
it should "support a function that takes two streams" in { forAll { (intStream: Process[Task, Int], boolStream: Process[Task, Boolean]) =>
println("begin property")
val result = twoUpload(intStream, boolStream).runWithoutContext(loc)

result.run shouldEqual twoStreamFunction(intStream, boolStream).run
println("end property")
}}
ignore should "work (mutable)" in {
val q = async.unboundedQueue[Int]
val byteStream = q.dequeue
Expand All @@ -116,6 +141,28 @@ class StreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
//continuousResult(2).timed(1.second).run shouldEqual(30)
}

it should "multiplex and demultiplex streams" in { forAll { (streamOne: Process[Task, Int], streamTwo: Process[Task, Boolean]) =>
val stream = Streaming.multiplex(Process(streamOne, streamTwo))
val demuxed = Streaming.demultiplex(stream)
streamOne.runLog.run should be (demuxed.apply(0).run.runLog.run)
streamTwo.runLog.run should be (demuxed.apply(1).run.runLog.run)
}}

it should "multiplex and demultiplex mutable streams" in {
val listOne = List(1,2,3)
val listTwo = List(true, false, true)
val queueOne = async.unboundedQueue[Int]
val streamOne = queueOne.dequeue
val queueTwo = async.unboundedQueue[Boolean]
val streamTwo = queueTwo.dequeue
queueOne.enqueueAll(listOne).timed(1.second).run
queueTwo.enqueueAll(listTwo).timed(1.second).run
val stream = Streaming.multiplex(Process(streamOne, streamTwo))
val demuxed = Streaming.demultiplex(stream)
streamOne.runLog.timed(1.second).run should be (demuxed.apply(0).timed(1.second).run.runLog.timed(1.second).run)
streamTwo.runLog.timed(1.second).run should be (demuxed.apply(1).timed(1.second).run.runLog.timed(1.second).run)
}

override def afterAll() = {
transport.shutdown.run
serverShutdown.run
Expand Down

0 comments on commit b263562

Please sign in to comment.