Skip to content

Commit

Permalink
Merge pull request #2 from ariskk/feature/append-entries
Browse files Browse the repository at this point in the history
Feature/append entries
  • Loading branch information
ariskk authored Oct 7, 2020
2 parents 7afa9c1 + 010755c commit 2e1e21f
Show file tree
Hide file tree
Showing 23 changed files with 779 additions and 336 deletions.
374 changes: 269 additions & 105 deletions src/main/scala/com.ariskk.raft/Raft.scala

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions src/main/scala/com.ariskk.raft/Server.scala

This file was deleted.

5 changes: 5 additions & 0 deletions src/main/scala/com.ariskk.raft/model/ComandResponse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.ariskk.raft.model

sealed trait CommandResponse
case object Committed extends CommandResponse
case class Redirect(leaderId: NodeId) extends CommandResponse
7 changes: 7 additions & 0 deletions src/main/scala/com.ariskk.raft/model/Command.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.ariskk.raft.model

case class Key(value: String) extends AnyVal

sealed trait Command[T]
case class ReadCommand[T](key: Key) extends Command[T]
case class WriteCommand[T](key: Key, value: T) extends Command[T]
7 changes: 7 additions & 0 deletions src/main/scala/com.ariskk.raft/model/Index.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.ariskk.raft.model

case class Index(index: Long) extends AnyVal {
def decrement = Index(index - 1L)
def increment = Index(index + 1)
def >(otherIndex: Index) = index > otherIndex.index
}
21 changes: 21 additions & 0 deletions src/main/scala/com.ariskk.raft/model/LogEntry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ariskk.raft.model

import com.ariskk.raft.utils.Utils

case class LogEntry[T](
id: LogEntry.Id,
command: Command[T],
term: Term
)

object LogEntry {
case class Id(value: String) extends AnyVal

def newUniqueId = Id(Utils.newPrefixedId("entry"))

def apply[T](command: Command[T], term: Term): LogEntry[T] = LogEntry[T](
newUniqueId,
command,
term
)
}
54 changes: 33 additions & 21 deletions src/main/scala/com.ariskk.raft/model/Message.scala
Original file line number Diff line number Diff line change
@@ -1,43 +1,55 @@
package com.ariskk.raft.model

// TODO Model fron/to better, those are easy to mess up
import com.ariskk.raft.utils.Utils

sealed trait Message {
def from: RaftNode.Id
def to: RaftNode.Id
def from: NodeId
def to: NodeId
def term: Term
}

object Message {

final case class VoteRequest(
from: RaftNode.Id,
to: RaftNode.Id,
term: Term
from: NodeId,
to: NodeId,
term: Term,
lastLogIndex: Index,
lastLogTerm: Term
) extends Message

final case class VoteResponse(
from: RaftNode.Id,
to: RaftNode.Id,
from: NodeId,
to: NodeId,
term: Term,
granted: Boolean
) extends Message

final case class Heartbeat(
from: RaftNode.Id,
to: RaftNode.Id,
term: Term
) extends Message
object AppendEntries {
case class Id(value: String) extends AnyVal

final case class HeartbeatAck(
from: RaftNode.Id,
to: RaftNode.Id,
term: Term
def newUniqueId = Id(Utils.newPrefixedId("append"))
}

final case class AppendEntries[T](
appendId: AppendEntries.Id,
from: NodeId,
to: NodeId,
term: Term,
prevLogIndex: Index,
prevLogTerm: Term,
leaderCommitIndex: Index,
entries: Seq[LogEntry[T]]
) extends Message

final case class AppendEntries(
from: RaftNode.Id,
to: RaftNode.Id,
term: Term
final case class AppendEntriesResponse(
from: NodeId,
to: NodeId,
appendId: AppendEntries.Id,
term: Term,
prevLogIndex: Index,
lastInsertedIndex: Index,
success: Boolean
) extends Message

}
9 changes: 9 additions & 0 deletions src/main/scala/com.ariskk.raft/model/NodeId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ariskk.raft.model

import com.ariskk.raft.utils.Utils

final case class NodeId(value: String) extends AnyVal

object NodeId {
def newUniqueId: NodeId = NodeId(Utils.newPrefixedId("node"))
}
9 changes: 6 additions & 3 deletions src/main/scala/com.ariskk.raft/model/RaftException.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.ariskk.raft.model

sealed trait RaftException extends Throwable
case object QueueFullError extends RaftException
case class InvalidStateException(message: String) extends RaftException
sealed trait RaftException extends Throwable
case class InvalidStateException(message: String) extends RaftException
case class InvalidCommandException(message: String) extends RaftException
case class StorageException(message: String) extends RaftException
case object LeaderNotFoundException extends RaftException
case class StateMachineException(message: String) extends RaftException
107 changes: 0 additions & 107 deletions src/main/scala/com.ariskk.raft/model/RaftNode.scala

This file was deleted.

3 changes: 2 additions & 1 deletion src/main/scala/com.ariskk.raft/model/Term.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ final case class Term(term: Long) extends AnyVal {
}

object Term {
lazy val Zero = Term(0)
lazy val Zero = Term(0)
lazy val Invalid = Term(Long.MinValue)
}
2 changes: 1 addition & 1 deletion src/main/scala/com.ariskk.raft/model/Vote.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.ariskk.raft.model

case class Vote(peerId: RaftNode.Id, term: Term)
case class Vote(peerId: NodeId, term: Term)
127 changes: 127 additions & 0 deletions src/main/scala/com.ariskk.raft/state/VolatileState.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.ariskk.raft.state

import zio.stm._
import zio.UIO

import com.ariskk.raft.model._

final class VolatileState(
val nodeId: NodeId,
peers: TSet[NodeId],
currentLeader: TRef[Option[NodeId]],
state: TRef[NodeState],
votesReceived: TSet[Vote],
votesRejected: TSet[Vote],
commitIndex: TRef[Index],
lastApplied: TRef[Index],
nextIndex: TMap[NodeId, Index],
matchIndex: TMap[NodeId, Index]
) {
def stand(newTerm: Term) = for {
_ <- votesReceived.removeIf(_.term != newTerm)
_ <- votesRejected.removeIf(_.term != newTerm)
_ <- state.set(NodeState.Candidate)
_ <- addVote(Vote(nodeId, newTerm))
} yield ()

def peerList = peers.toList

def nextIndexForPeer(peerId: NodeId) = nextIndex.get(peerId)

def matchIndexForPeer(peerId: NodeId) = matchIndex.get(peerId)

def updateMatchIndex(peerId: NodeId, index: Index) =
matchIndex.put(peerId, index)

def updateCommitIndex(index: Index) = commitIndex.set(index)

def updateNextIndex(peerId: NodeId, index: Index) =
nextIndex.put(peerId, index)

def decrementNextIndex(peerId: NodeId) = for {
next <- nextIndexForPeer(peerId)
nextIndex = next.map(x => if (x == Index(0)) x else x.decrement).getOrElse(Index(0))
_ <- updateNextIndex(peerId, nextIndex)
} yield ()

def matchIndexEntries = matchIndex.toList

def initPeerIndices(lastIndex: Index) = for {
peers <- peerList
_ <- ZSTM.collectAll(peers.map(nextIndex.put(_, lastIndex.increment)))
_ <- ZSTM.collectAll(peers.map(matchIndex.put(_, Index(0))))
} yield ()

def nodeState = state.get

def addPeer(id: NodeId) = peers.put(id)

def removePeer(id: NodeId) = peers.delete(id)

def becomeFollower = state.set(NodeState.Follower)

def becomeLeader = for {
_ <- state.set(NodeState.Leader)
_ <- setLeader(nodeId)
} yield ()

def setLeader(leaderId: NodeId) = currentLeader.set(Option(leaderId))

def leader = currentLeader.get

def addVote(vote: Vote) = for {
_ <- votesReceived.retainIf(_.term == vote.term)
_ <- votesReceived.put(vote)
set <- votesReceived.toList
peers <- peers.toList
hasMajority = 2 * set.size > peers.size + 1
_ <- if (hasMajority) becomeLeader else ZSTM.unit
} yield hasMajority

def addVoteRejection(vote: Vote) = for {
_ <- votesRejected.retainIf(_.term == vote.term)
_ <- votesRejected.put(vote)
set <- votesRejected.toList
peers <- peers.toList
hasLost = 2 * set.size > peers.size + 1
_ <- if (hasLost) becomeFollower else ZSTM.unit
} yield hasLost

def hasLost(term: Term) = for {
vr <- votesRejected.toList
peers <- peerList
rejections = vr.filter(_.term == term)
} yield 2 * rejections.size > peers.size + 1

def isLeader = state.map(_ == NodeState.Leader).get
def isFollower = state.map(_ == NodeState.Follower).get
def isCandidate = state.map(_ == NodeState.Candidate).get

def lastCommitIndex = commitIndex.get

}

object VolatileState {
def apply[T](nodeId: NodeId, peers: Set[NodeId]): UIO[VolatileState] = for {
peerRef <- TSet.make[NodeId](peers.toSeq: _*).commit
leaderRef <- TRef.makeCommit[Option[NodeId]](None)
stateRef <- TRef.makeCommit[NodeState](NodeState.Follower)
votesReceivedRef <- TSet.empty[Vote].commit
votesRejectedRef <- TSet.empty[Vote].commit
commitIndex <- TRef.makeCommit(Index(-1L))
lastApplied <- TRef.makeCommit(Index(-1L))
nextIndex <- TMap.fromIterable(peers.map(p => (p, Index(0)))).commit
matchIndex <- TMap.fromIterable(peers.map(p => (p, Index(-1)))).commit
} yield new VolatileState(
nodeId,
peerRef,
leaderRef,
stateRef,
votesReceivedRef,
votesRejectedRef,
commitIndex,
lastApplied,
nextIndex,
matchIndex
)
}
Loading

0 comments on commit 2e1e21f

Please sign in to comment.