Skip to content

Commit

Permalink
master Fixed finagle to respond to requests
Browse files Browse the repository at this point in the history
  • Loading branch information
msmith-techempower committed Nov 19, 2013
1 parent 43a23b7 commit 058c36f
Showing 1 changed file with 65 additions and 35 deletions.
100 changes: 65 additions & 35 deletions finagle/src/main/scala/com/falmarri/finagle/Finagle.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,40 @@
package com.falmarri.finagle

import scala.util.Random
import scala.collection.immutable.StringOps
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.exp.mysql.{Client, IntValue, MySQL, Row}
import com.twitter.finagle.http.{HttpMuxer, Request, Response}
import com.twitter.finagle.{Http, Service}
import com.twitter.util.{Future, FuturePool}
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import com.twitter.finagle.Service
import com.twitter.finagle.exp.Mysql
import com.twitter.finagle.exp.mysql._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.handler.codec.http.HttpResponseStatus._
import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer
import scala.util.Random
import com.twitter.util.{Future, FuturePool}
import java.net.InetSocketAddress
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.http.{Http,HttpMuxer}

object FinagleBenchmark extends App {
val maxConnections = 256

val mysql = new Client(ClientBuilder()
.codec(new MySQL("benchmarkdbuser", "benchmarkdbpass", Some("hello_world")))
.hosts(new InetSocketAddress(System.getProperty("db.host", "localhost"), 3306))
.hostConnectionLimit(maxConnections)
.buildFactory())
//val mysql = new Client(ClientBuilder()
// .codec(new MySQL("benchmarkdbuser", "benchmarkdbpass", Some("hello_world")))
// .hosts(new InetSocketAddress(System.getProperty("db.host", "localhost"), 3306))
// .hostConnectionLimit(maxConnections)
// .buildFactory())

val username = "benchmarkdbuser"
val password = "benchmarkdbpass"
val db = "hello_world"
val host = System.getProperty("db.host", "localhost")

val mysql = Mysql
.withCredentials(username, password)
.withDatabase(db)
.newRichClient(host + ":3306")

val pool = FuturePool(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2))

Expand All @@ -41,37 +56,52 @@ object FinagleBenchmark extends App {
def serialize(result: Any): Array[Byte] =
mapper.writeValueAsBytes(result)

def createResponse(req: Request, bytes: Array[Byte]) = {
def createResponse(req: HttpRequest, bytes: Array[Byte]) = {
val body = wrappedBuffer(bytes)
val resp = req.response
resp.setContentTypeJson
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
//resp.setContentTypeJson
resp.setContent(body)
resp.contentLength = body.readableBytes
//resp.contentLength = body.readableBytes
resp
}

HttpMuxer.addRichHandler("/json", new Service[Request, Response] {
def apply(req: Request): Future[Response] = pool {
createResponse(req, serialize(Map("message" -> "Hello, World!")))
}
})

HttpMuxer.addRichHandler("/db", new Service[Request, Response] {
val rand = new Random()
val sql = "SELECT * FROM world WHERE id = "
val muxService = new HttpMuxer()
.withHandler("/json", new Service[HttpRequest, HttpResponse] {
def apply(req: HttpRequest): Future[HttpResponse] = pool {
createResponse(req, serialize(Map("message" -> "Hello, World!")))
}
})
.withHandler("/db", new Service[HttpRequest, HttpResponse] {
val rand = new Random()
val sql = "SELECT * FROM world WHERE id = "

def apply(req: Request): Future[Response] = {
val n = req.params.getIntOrElse("queries", 1)
def apply(req: HttpRequest): Future[HttpResponse] = {
//val n = req.params.getIntOrElse("queries", 1)
val decoder = new QueryStringDecoder(req.getUri())
val n = {
val queries = decoder.getParameters().get("queries")
if(queries == null) {
1
}
else {
queries.get(0).toInt
}
}

val qs = (0 until n) map { i =>
mysql.select(sql + rand.nextInt(10000))(rowToMap)
}
val qs = (0 until n) map { i =>
mysql.select(sql + rand.nextInt(10000))(rowToMap)
}

Future.collect(qs) map { results =>
createResponse(req, serialize(results.flatten))
Future.collect(qs) map { results =>
createResponse(req, serialize(results.flatten))
}
}
}
})
})

Http.serve(new InetSocketAddress(8080), HttpMuxer)
//Http.serve(new InetSocketAddress(8080), HttpMuxer)
val server: Server = ServerBuilder()
.codec(Http())
.bindTo(new InetSocketAddress(8080))
.name("HttpServer")
.build(muxService)
}

0 comments on commit 058c36f

Please sign in to comment.