-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmaster_script.py
More file actions
226 lines (202 loc) · 10.1 KB
/
master_script.py
File metadata and controls
226 lines (202 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
import csv
import sys
import random
# -------------------------------
# Data Structures
# -------------------------------
class Ballot:
    """A single ranked ballot carrying a (possibly fractional) weight."""

    def __init__(self, preferences, weight=1.0):
        """Store the voter's ranking and the ballot's starting weight.

        Args:
            preferences: Iterable of candidate-name strings, most preferred
                first. Each name is stripped of surrounding whitespace.
                Entries that are empty after stripping (blank cell, trailing
                comma) are dropped so they never show up as a candidate
                named "".
            weight: Voting weight of the ballot; stored rounded to 5 decimals.
        """
        stripped = (pref.strip() for pref in preferences)
        # Ordered list of candidate names, most preferred first.
        self.preferences = [name for name in stripped if name]
        self.weight = round(weight, 5)
        # Pointer to the preference this ballot currently counts for.
        self.current_index = 0

    def get_current_candidate(self, candidates_status):
        """Return the first remaining preference whose status is 'active'.

        Preferences whose status is anything other than 'active' are skipped
        by advancing ``current_index``, so the skip is permanent. A name that
        is missing from ``candidates_status`` is treated as 'active'.

        Args:
            candidates_status: Mapping of candidate name to status string.

        Returns:
            The candidate name, or None when the ballot is exhausted.
        """
        while self.current_index < len(self.preferences):
            candidate = self.preferences[self.current_index]
            if candidates_status.get(candidate, 'active') == 'active':
                return candidate
            self.current_index += 1
        return None  # ballot exhausted

    def __repr__(self):
        return (
            f"{type(self).__name__}(preferences={self.preferences!r}, "
            f"weight={self.weight!r})"
        )
# -------------------------------
# STV Election Function (Revised)
# -------------------------------
def stv_election(ballots, seats):
    """Run a single-transferable-vote count and return the winners.

    Each round tallies every active ballot for its current active preference.
    Candidates whose total reaches the threshold are elected (highest total
    first) and their ballots leave the pool; only the surplus share is passed
    on, at a reduced weight. If nobody reaches the threshold, the candidate
    with the fewest votes is eliminated and their ballots move to the next
    preference at unchanged weight. When the active candidates no longer
    outnumber the unfilled seats, they are all elected without a further count.

    Progress and the final result are printed to standard output.

    Args:
        ballots: List of ballot objects exposing ``preferences``, ``weight``,
            ``current_index`` and ``get_current_candidate(status_map)``.
            The ballots' ``weight`` and ``current_index`` are mutated.
        seats: Number of seats to fill.

    Returns:
        List of elected candidate names, in the order they were elected.
    """
    # Collect names in first-appearance order (not via a set) so that display
    # order and the ordering of equal totals are reproducible between runs.
    candidate_names = list(dict.fromkeys(
        pref for ballot in ballots for pref in ballot.preferences
    ))
    candidates = {
        cand: {
            'status': 'active',  # 'active', 'elected', or 'eliminated'
            'votes': 0.0,        # current round vote total
            'ballots': [],       # ballots allocated to this candidate
        }
        for cand in candidate_names
    }
    # Per-round vote totals, used to break elimination ties.
    candidate_history = {cand: [] for cand in candidate_names}

    total_ballots = len(ballots)
    # Threshold (quota) is ballots / (seats + 1); reaching it exactly elects.
    threshold = round(total_ballots / (seats + 1), 5)
    print("Total ballots:", total_ballots)
    print("Seats:", seats)
    print("Threshold:", threshold)

    elected_candidates = []
    eliminated_candidates = []
    round_num = 0
    active_ballots = ballots[:]

    while len(elected_candidates) < seats and any(
        info['status'] == 'active' for info in candidates.values()
    ):
        round_num += 1
        print("\n----- Round", round_num, "-----")

        _tally_round(candidates, active_ballots)

        # Record this round's totals; non-active candidates are logged as 0.
        for cand, info in candidates.items():
            is_active = info['status'] == 'active'
            candidate_history[cand].append(info['votes'] if is_active else 0.0)

        # Show every candidate that has not been eliminated.
        for cand, info in candidates.items():
            if info['status'] != 'eliminated':
                print(f"{cand}: {info['votes']} ({info['status']})")

        # Elect everyone at or above the threshold, highest total first.
        to_elect = [
            cand for cand, info in candidates.items()
            if info['status'] == 'active' and info['votes'] >= threshold
        ]
        to_elect.sort(key=lambda c: candidates[c]['votes'], reverse=True)
        for cand in to_elect:
            info = candidates[cand]
            info['status'] = 'elected'
            elected_candidates.append(cand)
            print(f"\nCandidate '{cand}' elected with {info['votes']} votes.")

            # The winner's ballots always leave the pool. Without this, a
            # candidate elected on exactly the threshold would keep full-weight
            # ballots in play and those votes would be counted a second time.
            held = info['ballots'][:]
            active_ballots = _without_ballots(active_ballots, held)

            if info['votes'] > threshold:
                surplus = round(info['votes'] - threshold, 5)
                transfer_value = round(surplus / info['votes'], 5)
                print(f"Transferring surplus of {surplus} votes from '{cand}' with transfer value {transfer_value}.")
                for ballot in held:
                    new_weight = round(ballot.weight * transfer_value, 5)
                    if new_weight > 0:
                        ballot.weight = new_weight
                        ballot.current_index += 1  # move to next preference
                        active_ballots.append(ballot)

            if len(elected_candidates) == seats:
                break  # all seats filled
        if len(elected_candidates) == seats:
            break

        # Once the active candidates no longer outnumber the open seats there
        # is nothing left to decide: seat them all. Using <= also covers an
        # election that has fewer candidates than seats.
        remaining_seats = seats - len(elected_candidates)
        active_cands = [
            cand for cand, info in candidates.items()
            if info['status'] == 'active'
        ]
        if len(active_cands) <= remaining_seats:
            for cand in active_cands:
                candidates[cand]['status'] = 'elected'
                elected_candidates.append(cand)
                print(f"\nCandidate '{cand}' elected by default (only remaining candidate).")
            break

        # Nobody reached the threshold: eliminate the weakest candidate.
        # active_cands cannot be empty here because it outnumbers at least
        # one open seat.
        if not to_elect:
            eliminated = _choose_elimination(
                active_cands, candidates, candidate_history, round_num
            )
            info = candidates[eliminated]
            info['status'] = 'eliminated'
            eliminated_candidates.append(eliminated)
            print(f"\nCandidate '{eliminated}' eliminated with {info['votes']} votes.")

            # Their ballots move on to the next preference at full weight.
            released = info['ballots'][:]
            for ballot in released:
                ballot.current_index += 1
            active_ballots = _without_ballots(active_ballots, released) + released

        # Keep only ballots that still point at some active candidate.
        status = {cand: info['status'] for cand, info in candidates.items()}
        active_ballots = [
            ballot for ballot in active_ballots
            if ballot.get_current_candidate(status) is not None
        ]

    print("\n===== Final Results =====")
    print("Elected candidates (in order):")
    for cand in elected_candidates:
        print(" -", cand)
    if eliminated_candidates:
        print("\nEliminated candidates:")
        for cand in eliminated_candidates:
            print(" -", cand)
    return elected_candidates


def _tally_round(candidates, active_ballots):
    """Reset active candidates' totals and allocate each ballot to its current choice."""
    for info in candidates.values():
        if info['status'] == 'active':
            info['votes'] = 0.0
            info['ballots'] = []
    # Statuses do not change while counting, so build the lookup once.
    status = {cand: info['status'] for cand, info in candidates.items()}
    for ballot in active_ballots:
        current = ballot.get_current_candidate(status)
        if current:
            info = candidates[current]
            info['votes'] = round(info['votes'] + ballot.weight, 5)
            info['ballots'].append(ballot)


def _without_ballots(ballot_pool, removed):
    """Return ballot_pool minus the ballots in removed, compared by object identity."""
    removed_ids = {id(ballot) for ballot in removed}
    return [ballot for ballot in ballot_pool if id(ballot) not in removed_ids]


def _choose_elimination(active_cands, candidates, candidate_history, round_num):
    """Pick the lowest-polling active candidate, breaking ties by earlier rounds, then randomly."""
    min_votes = min(candidates[cand]['votes'] for cand in active_cands)
    tied = [cand for cand in active_cands if candidates[cand]['votes'] == min_votes]
    # Walk back from the previous round to the first, narrowing the tie to
    # whoever polled lowest in each earlier round.
    for prev in range(round_num - 2, -1, -1):
        if len(tied) == 1:
            break
        lowest = min(candidate_history[cand][prev] for cand in tied)
        tied = [cand for cand in tied if candidate_history[cand][prev] == lowest]
    if len(tied) > 1:
        eliminated = random.choice(tied)
        print(f"\nTie in elimination among {tied}; randomly eliminating '{eliminated}'.")
        return eliminated
    return tied[0]
# -------------------------------
# CSV Ballot Reader
# -------------------------------
def read_ballots_from_csv(file_path):
    """Read ranked ballots from a CSV file that has a "Candidates" column.

    Every data row becomes one ballot. The "Candidates" cell is expected to
    hold a comma-separated list of names, most preferred first. Names are
    stripped and blank entries (empty cell, trailing comma, short row) are
    dropped so they cannot become a candidate named "". A row with no names
    at all still yields a ballot, with an empty preference list.

    A warning is printed for any ballot with fewer than five preferences.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        List of Ballot objects in file order; empty if the file has no rows.

    Raises:
        KeyError: If the file has data rows but no "Candidates" column.
    """
    ballots = []
    with open(file_path, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if "Candidates" not in row:
                # Same exception type as a plain row["Candidates"] lookup,
                # but the message names what was actually found.
                raise KeyError(
                    f"CSV file {file_path!r} has no 'Candidates' column "
                    f"(found: {reader.fieldnames})"
                )
            # DictReader fills cells missing from a short row with None.
            raw = row["Candidates"] or ""
            prefs = [name.strip() for name in raw.split(",") if name.strip()]
            if len(prefs) < 5:
                print("Warning: ballot with fewer than five preferences:", prefs)
            ballots.append(Ballot(preferences=prefs))
    return ballots
# -------------------------------
# Main Entry Point
# -------------------------------
def main():
    """Command-line entry point: read ballots from a CSV file and run the count.

    Expects two command-line arguments: the ballots CSV path and the number
    of seats. Prints a message and exits with status 1 when arguments are
    missing, the seat count is not an integer, or the seat count is below 1.
    """
    if len(sys.argv) < 3:
        print("Usage: python stv.py <ballots_csv_file> <number_of_seats>")
        sys.exit(1)
    file_path = sys.argv[1]
    try:
        seats = int(sys.argv[2])
    except ValueError:
        print("Number of seats must be an integer.")
        sys.exit(1)
    # The threshold is ballots / (seats + 1): -1 would divide by zero, and
    # zero or any other negative count leaves no seat to fill.
    if seats < 1:
        print("Number of seats must be at least 1.")
        sys.exit(1)
    ballots = read_ballots_from_csv(file_path)
    stv_election(ballots, seats)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()