Scalafmt + Scalafix
ariskk committed Oct 7, 2020
1 parent 982f445 commit 010755c
Showing 10 changed files with 19 additions and 27 deletions.
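The changes below are formatting-only: Scalafmt re-wraps and re-aligns lines and Scalafix applies its rewrite rules, leaving behaviour untouched. The build wiring itself is not shown in this commit; as a hedged sketch, these tools are typically enabled via something like the following in project/plugins.sbt (the plugin versions here are assumptions, not values taken from this repository):

// project/plugins.sbt — hypothetical sketch, not part of this commit
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.21")

With the plugins installed, `sbt scalafmtAll` and `sbt scalafixAll` rewrite the sources in place. The line-joining visible below (for example in MemoryStorage.scala and ClusterSpec.scala) suggests a .scalafmt.conf allowing lines of roughly 120 columns, though that file is not part of this diff.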
2 changes: 1 addition & 1 deletion src/main/scala/com.ariskk.raft/Raft.scala
@@ -320,7 +320,7 @@ final class Raft[T](

/**
* Blocks until it gets committed.
- */
+ */
def submitCommand(command: Command[T]): ZIO[Clock, RaftException, CommandResponse] =
state.leader.commit.flatMap { leader =>
leader match {
2 changes: 1 addition & 1 deletion src/main/scala/com.ariskk.raft/model/Command.scala
@@ -3,5 +3,5 @@ package com.ariskk.raft.model
case class Key(value: String) extends AnyVal

sealed trait Command[T]
- case class ReadCommand[T](key: Key) extends Command[T]
+ case class ReadCommand[T](key: Key) extends Command[T]
case class WriteCommand[T](key: Key, value: T) extends Command[T]
2 changes: 1 addition & 1 deletion src/main/scala/com.ariskk.raft/model/RaftException.scala
@@ -5,4 +5,4 @@ case class InvalidStateException(message: String) extends RaftException
case class InvalidCommandException(message: String) extends RaftException
case class StorageException(message: String) extends RaftException
case object LeaderNotFoundException extends RaftException
- case class StateMachineException(message: String) extends RaftException
+ case class StateMachineException(message: String) extends RaftException
4 changes: 2 additions & 2 deletions src/main/scala/com.ariskk.raft/state/VolatileState.scala
@@ -48,8 +48,8 @@ final class VolatileState(

def initPeerIndices(lastIndex: Index) = for {
peers <- peerList
- _ <- ZSTM.collectAll(peers.map(nextIndex.put(_, lastIndex.increment)))
- _ <- ZSTM.collectAll(peers.map(matchIndex.put(_, Index(0))))
+ _ <- ZSTM.collectAll(peers.map(nextIndex.put(_, lastIndex.increment)))
+ _ <- ZSTM.collectAll(peers.map(matchIndex.put(_, Index(0))))
} yield ()

def nodeState = state.get
@@ -6,9 +6,9 @@ import com.ariskk.raft.model._

final class KeyValueStore[T](map: TMap[Key, T]) extends StateMachine[T] {
override def write(command: WriteCommand[T]) = map.put(command.key, command.value)
- override def read(command: ReadCommand[T]) = map.get(command.key)
+ override def read(command: ReadCommand[T]) = map.get(command.key)
}

object KeyValueStore {
def apply[R] = TMap.empty[Key, R].commit.map(new KeyValueStore(_))
- }
+ }
@@ -6,7 +6,7 @@ import com.ariskk.raft.model._

/**
* Very simplistic modeling
- */
+ */
trait StateMachine[T] {
def write(command: WriteCommand[T]): STM[StateMachineException, Unit]
def read(command: ReadCommand[T]): STM[StateMachineException, Option[T]]
4 changes: 1 addition & 3 deletions src/main/scala/com.ariskk.raft/storage/MemoryStorage.scala
@@ -33,9 +33,7 @@ final class MemoryLog[T](log: TRef[List[LogEntry[T]]]) extends Log[T] {
log.get.map(_.lift(index.index.toInt))
def getEntries(fromIndex: Index): STM[StorageException, List[LogEntry[T]]] =
log.get.map(_.drop(fromIndex.index.toInt))
- def purgeFrom(index: Index): STM[StorageException, Unit] = log.get.map(l =>
-   l.dropRight(l.size - index.index.toInt)
- )
+ def purgeFrom(index: Index): STM[StorageException, Unit] = log.get.map(l => l.dropRight(l.size - index.index.toInt))
}

object MemoryStorage {
16 changes: 6 additions & 10 deletions src/test/scala/com.ariskk.raft/ClusterSpec.scala
@@ -29,24 +29,20 @@ object ClusterSpec extends DefaultRunnableSpec {
}
} yield (cluster, fiber)

- private val unitCommand = WriteCommand[Unit](Key("key"), ())
+ private val unitCommand = WriteCommand[Unit](Key("key"), ())
private def intCommand(i: Int) = WriteCommand[Int](Key(s"key$i"), i)

def spec = suite("ClusterSpec")(
testM("A three node cluster should be able to elect a single leader") {

- lazy val program = liveCluster[Unit](3, chaos = false)
-   .flatMap { case (_, fiber) => fiber.interrupt }
-   .unit
+ lazy val program = liveCluster[Unit](3, chaos = false).flatMap { case (_, fiber) => fiber.interrupt }.unit

assertM(program)(equalTo(()))

},
testM("Even on adverse network conditions") {

- lazy val program = liveCluster[Unit](3, chaos = true)
-   .flatMap { case(_, fiber) => fiber.interrupt }
-   .unit
+ lazy val program = liveCluster[Unit](3, chaos = true).flatMap { case (_, fiber) => fiber.interrupt }.unit

assertM(program)(equalTo(()))

@@ -55,7 +51,7 @@ object ClusterSpec extends DefaultRunnableSpec {

lazy val program = for {
(cluster, fiber) <- liveCluster[Unit](3, chaos = false)
- _ <- cluster.submitCommand(unitCommand)
+ _ <- cluster.submitCommand(unitCommand)
_ <- cluster.getAllLogEntries.repeatUntil { case (_, entries) =>
entries.map(_.map(_.command)) == Seq(Seq(unitCommand), Seq(unitCommand), Seq(unitCommand))
}
@@ -68,7 +64,7 @@ object ClusterSpec extends DefaultRunnableSpec {

lazy val program = for {
(cluster, fiber) <- liveCluster[Int](3, chaos = false)
- _ <- ZIO.collectAll((1 to 5).map(i => cluster.submitCommand(intCommand(i))))
+ _ <- ZIO.collectAll((1 to 5).map(i => cluster.submitCommand(intCommand(i))))
correctLog = (1 to 5).toSeq.map(intCommand)
_ <- cluster.getAllLogEntries.repeatUntil { case (_, entries) =>
entries.map(_.map(_.command)) == Seq(correctLog, correctLog, correctLog)
@@ -83,7 +79,7 @@ object ClusterSpec extends DefaultRunnableSpec {

/**
* Two duplicate - out of order entries in 1 node
- */
+ */

lazy val program = for {
(cluster, fiber) <- liveCluster[Int](3, chaos = true)
6 changes: 2 additions & 4 deletions src/test/scala/com.ariskk.raft/TestCluster.scala
@@ -13,7 +13,7 @@ import Message._
* Relays messages between Raft consensus modules to allow for quick in-memory leader election
* and command submission testing.
* By passing `chaos = true`, one can emulate a faulty network where
- * messages are reordered, dropped and delayed arbitrarily. Practically, it
+ * messages are reordered, dropped and delayed arbitrarily. Practically, it
* tries to test safety under non-Byzantine conditions.
* The implementation is non-deterministic on purpose as the algorithm must
* converge at all times.
@@ -79,9 +79,7 @@ final class TestCluster[T](nodeRef: TRef[Seq[Raft[T]]], chaos: Boolean) {
ids = nodes.map(_.nodeId)
states <- ZIO.collectAll(nodes.map(_.nodeState))
leaderId = ids.zip(states).collect { case (id, state) if state == NodeState.Leader => id }.headOption
- _ <- ZIO.fromOption(leaderId).flatMap(id =>
-   getNode(id).flatMap(_.submitCommand(command)).unit
- )
+ _ <- ZIO.fromOption(leaderId).flatMap(id => getNode(id).flatMap(_.submitCommand(command)).unit)
} yield ()

def getAllLogEntries = for {
4 changes: 2 additions & 2 deletions src/test/scala/com.ariskk.raft/TestRaft.scala
@@ -15,14 +15,14 @@ object TestRaft {
val id = NodeId.newUniqueId
for {
s <- MemoryStorage.default[T]
- sm <- KeyValueStore.apply[T]
+ sm <- KeyValueStore.apply[T]
raft <- Raft.default[T](s, sm)
} yield raft
}

def apply[T](nodeId: NodeId, peers: Set[NodeId]): UIO[Raft[T]] = for {
s <- MemoryStorage.default[T]
- sm <- KeyValueStore.apply[T]
+ sm <- KeyValueStore.apply[T]
raft <- Raft[T](nodeId, peers, s, sm)
} yield raft

