Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix voting re-flowing #14

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
run: |
pipenv run black --check .
pipenv run isort --check-only --profile black .
- name: Run STV tests
run: pipenv run pytest ./votes/tests

build-and-push-image:
name: Build and Push Docker Image
Expand Down Expand Up @@ -64,4 +66,4 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
cache-to: type=gha,mode=max
212 changes: 129 additions & 83 deletions votes/stv.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import secrets
from decimal import ROUND_DOWN, ROUND_UP, Context, Decimal, localcontext
from enum import Enum
from fractions import Fraction
from operator import attrgetter, itemgetter
from typing import Dict, List, Set, Tuple

"""
STV calculator

Based on procedure as defined in https://prfound.org/resources/reference/reference-meek-rule/
Uses exact ratio arithmetic to prevent need to use epsilon float comparisons.
Uses float arithmetic as exact arithmetic became too expensive.
Uses a secure random generator to split ties randomly.
Unfortunately this is more likely to trigger than I'd prefer due to the small populations and single seats.
"""
Expand All @@ -35,7 +35,7 @@ class Candidate:
def __init__(self, id_: int):
self.id = id_
self.status = States.HOPEFUL
self.keep_factor: Fraction = Fraction(1)
self.keep_factor: float = 1.0

def __str__(self):
return f"{self.id}: {self.status} ({str(self.keep_factor)})"
Expand All @@ -45,10 +45,10 @@ def __repr__(self):


class Vote:
def __init__(self, candidates: Dict[int, Candidate], prefs: Tuple[int]):
def __init__(self, candidates: Dict[int, Candidate], prefs: Tuple[int, ...]):
self.prefs = tuple(map(candidates.get, prefs))

def check(self, candidates: Set[int]):
def check(self, candidates: Set[Candidate]):
if len(self.prefs) != len(set(self.prefs)):
raise ElectionError(f"Double Vote [{self.prefs}]")
for i in self.prefs:
Expand All @@ -63,7 +63,7 @@ def __repr__(self):


class Election:
def __init__(self, candidates: Set[int], votes: List[Tuple[int]], seats: int):
def __init__(self, candidates: Set[int], votes: List[Tuple[int, ...]], seats: int):
self.candidatedict = {i: Candidate(i) for i in candidates}
self.candidates = set(self.candidatedict.values())
self.votes = [Vote(self.candidatedict, i) for i in votes]
Expand All @@ -72,95 +72,114 @@ def __init__(self, candidates: Set[int], votes: List[Tuple[int]], seats: int):
self.fulllog = []
self.actlog = []
print(candidates, votes, seats)
# Huge initial value
# (surplus should never be this high in our situation (its more votes than there are people in the world))
# If this code is still used when population is this high,
# why the fuck haven't you moved this to a faster language??????
self.previous_surplus = Fraction(10000000000000000000000000, 1)
self.omega = 0.000001
for i in self.votes:
i.check(self.candidates)

def withdraw(self, candidates: Set[int]):
candidates = [self.candidatedict[cand] for cand in candidates]
for i in candidates:
i.status = States.WITHDRAWN
i.keep_factor = Fraction(0)
i.keep_factor = 0.0

def round(self):
self.rounds += 1
# B1
shortcircuit = False
electable = []
elected = []
for candidate in self.candidates:
if candidate.status == States.ELECTED or candidate.status == States.HOPEFUL:
electable.append(candidate)
if candidate.status == States.ELECTED:
elected.append(candidate)
if len(electable) <= self.seats:
for i in electable:
i.status = States.ELECTED
shortcircuit = True
elif len(elected) == self.seats:
for candidate in self.candidates:
if candidate.status == States.HOPEFUL:
candidate.status = States.DEFEATED
shortcircuit = True

# B2a
wastage = Fraction(0)
scores = {k: Fraction(0) for k in self.candidates}
for vote in self.votes:
weight: Fraction = Fraction(1)
for candidate in vote.prefs:
delta: Fraction = weight * candidate.keep_factor
scores[candidate] += delta
weight -= delta
wastage += weight

# Check all votes accounted for
assert wastage + sum(scores.values()) == len(self.votes)

# B2b
quota = Fraction(sum(scores.values()), self.seats + 1)

if shortcircuit:
# Defer shortcircuit until after scores calculated to log one extra line
self._log(scores, wastage, quota)
self._report()
raise StopIteration("Election Finished")

# B2c
elected = False
for candidate in self.candidates:
if candidate.status == States.HOPEFUL and scores[candidate] > quota:
candidate.status = States.ELECTED
elected = True

# B2d
surplus = Fraction(0)
for candidate in self.candidates:
if candidate.status == States.ELECTED:
surplus += scores[candidate] - quota

# B2e
if elected:
self.previous_surplus = surplus
self._log(scores, wastage, quota)
return

if surplus == 0 or surplus >= self.previous_surplus:
# B3
sorted_results = sorted(
filter(lambda x: x[0].status == States.HOPEFUL, scores.items()),
key=itemgetter(1),
)
min_score = sorted_results[0][1]
eliminated_candidate: Candidate = self._choose(
list(filter(lambda x: x[1] == min_score, sorted_results))
converged = False
wastage = 0.0
scores = {k: 0.0 for k in self.candidates}
quota = None

previous_surplus = float("+Infinity")
while not converged:
# B2a
wastage = 0.0
scores = {k: 0.0 for k in self.candidates}
for vote in self.votes:
weight: float = 1.0
for candidate in vote.prefs:
delta: float = weight * candidate.keep_factor
scores[candidate] += delta
weight -= delta
if weight == 0:
continue
wastage += weight

# Check all votes accounted for
assert (
len(self.votes) - self.omega
<= wastage + sum(scores.values())
<= len(self.votes) + self.omega
)
eliminated_candidate.status = States.DEFEATED
eliminated_candidate.keep_factor = Fraction(0)
else:
# B2f

# B2b
quota = sum(scores.values()) / (self.seats + 1) + 0.000000001

if shortcircuit:
# Defer shortcircuit until after scores calculated to log one extra line
self._log(scores, wastage, quota)
self._report()
raise StopIteration("Election Finished")

# B2c
elected = False
for candidate in self.candidates:
if candidate.status == States.HOPEFUL and scores[candidate] > quota:
candidate.status = States.ELECTED
elected = True

# B2d
surplus = 0.0
for candidate in self.candidates:
if candidate.status == States.ELECTED:
candidate.keep_factor = Fraction(
candidate.keep_factor * quota, scores[candidate]
)
self.previous_surplus = surplus
surplus += scores[candidate] - quota

# B2e
if elected:
self._log(scores, wastage, quota)
return

if surplus < self.omega or surplus >= previous_surplus:
converged = True
else:
# B2f
for candidate in self.candidates:
if candidate.status == States.ELECTED:
candidate.keep_factor = (
candidate.keep_factor * quota
) / scores[candidate]
previous_surplus = surplus

# B3
sorted_results = sorted(
filter(lambda x: x[0].status == States.HOPEFUL, scores.items()),
key=itemgetter(1),
)
min_score = sorted_results[0][1]
eliminated_candidate: Candidate = self._choose(
list(filter(lambda x: x[1] <= min_score + self.omega, sorted_results))
)
eliminated_candidate.status = States.DEFEATED
eliminated_candidate.keep_factor = 0.0

self._log(scores, wastage, quota)

def _choose(self, candidates):
Expand Down Expand Up @@ -193,28 +212,28 @@ def _log(self, scores, wastage, quota):
self._addlog(self.rounds)
self._addlog("======")
candstates = {}
for i in self.candidates:
for i in sorted(self.candidates, key=attrgetter("id")):
assert isinstance(i, Candidate)
self._addlog("Candidate:", i.id, i.keep_factor.limit_denominator(1000))
self._addlog("Candidate:", i.id, i.keep_factor)
self._addlog("Status:", str(i.status))
self._addlog("Votes:", str(scores[i].limit_denominator(1000)))
self._addlog("Votes:", str(scores[i]))
self._addlog()
candstates[str(i.id)] = {
"keep_factor": float(i.keep_factor.limit_denominator(1000)),
"keep_factor": float(i.keep_factor),
"status": str(i.status),
"votes": float(scores[i].limit_denominator(1000)),
"votes": float(scores[i]),
}
self._addlog("Wastage:", str(wastage.limit_denominator(1000)))
self._addlog("Threshold:", str(quota.limit_denominator(1000)))
self._addlog("Wastage:", str(wastage))
self._addlog("Threshold:", str(quota))
self._addlog()

self._addaction(
"round",
{
"round": self.rounds,
"candidates": candstates,
"wastage": float(wastage.limit_denominator(1000)),
"threshold": float(quota.limit_denominator(1000)),
"wastage": float(wastage),
"threshold": float(quota),
},
)

Expand All @@ -239,14 +258,14 @@ def _report(self):

def full_election(self):
# Log initial state
scores = {k: Fraction(0) for k in self.candidates}
wastage = Fraction(0)
scores = {k: Decimal(0) for k in self.candidates}
wastage = Decimal(0)
for vote in self.votes:
if len(vote.prefs) > 0:
scores[vote.prefs[0]] += 1
else:
wastage += 1
quota = Fraction(sum(scores.values()), self.seats + 1)
quota = Decimal(sum(scores.values())) / Decimal(self.seats + 1)
self._log(scores, wastage, quota)

try:
Expand All @@ -262,3 +281,30 @@ def winners(self):
filter(lambda x: x.status == States.ELECTED, self.candidates),
)
)


class DeterministicElection(Election):
def __init__(self, *args, random_picks=None, **kwargs):
super().__init__(*args, **kwargs)
self.random_picks = random_picks

def _choose(self, candidates):
if len(candidates) > 1:
# Fix random choice to known behaviour. Only useful for tests
i = self.random_picks[self.rounds]
a = [candidate[0] for candidate in candidates if candidate[0].id == i][0]
self._addlog("-Tiebreak-")
self._addlog("! DETERMINISTIC PICK ! DEBUG ONLY ! DETERMINISTIC PICK !")
self._addlog(a)
self._addlog()
self._addaction(
"tiebreak",
{
"round": self.rounds,
"candidates": [str(candidate[0].id) for candidate in candidates],
"choice": str(a.id),
},
)
else:
a = candidates[0][0]
return a
10 changes: 10 additions & 0 deletions votes/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pathlib import Path

import pytest


@pytest.fixture
def data():
test_dir = Path(__file__).absolute().parent

return test_dir / "data"
8 changes: 8 additions & 0 deletions votes/tests/data/42.blt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
3 2
4 1 2 0
2 3 0
0
"Castor"
"Pollux"
"Helen"
"Pollux and Helen should tie"
9 changes: 9 additions & 0 deletions votes/tests/data/513.blt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
3 2
5 1 2 0
1 2 0
3 3 0
0
"Castor"
"Pollux"
"Helen"
"Pollux and Helen should tie"
Loading
Loading