Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,12 @@ private Map<String, Set<WorkerSlot>> hostToUsedSlots(Cluster cluster) {
return hostUsedSlots;
}

// returns list of list of slots, reverse sorted by number of slots
// Returns list of hosts with their assignable slots, sorted by:
// 1. (primary) total assignable slots, descending — same as before
// 2. (secondary) currently free slots, descending — prefer hosts that
// need fewer evictions
// 3. (tertiary) host name, ascending — deterministic order
// for testability
private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
List<WorkerSlot> assignableSlots = cluster.getAssignableSlots();
Map<String, List<WorkerSlot>> hostAssignableSlots = new HashMap<String, List<WorkerSlot>>();
Expand All @@ -316,14 +321,36 @@ private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
}
slots.add(slot);
}

final Map<String, Integer> hostFreeSlotCount = new HashMap<String, Integer>();
for (WorkerSlot slot : cluster.getAvailableSlots()) {
String host = cluster.getHost(slot.getNodeId());
if (hostAssignableSlots.containsKey(host)) {
Integer count = hostFreeSlotCount.get(host);
hostFreeSlotCount.put(host, (count == null ? 0 : count) + 1);
}
}

List<HostAssignableSlots> sortHostAssignSlots = new ArrayList<HostAssignableSlots>();
for (Map.Entry<String, List<WorkerSlot>> entry : hostAssignableSlots.entrySet()) {
sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue()));
}
Collections.sort(sortHostAssignSlots, new Comparator<HostAssignableSlots>() {
@Override
public int compare(HostAssignableSlots o1, HostAssignableSlots o2) {
return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
int bySlots = o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
if (bySlots != 0) {
return bySlots;
}
int free1 = hostFreeSlotCount.containsKey(o1.getHostName())
? hostFreeSlotCount.get(o1.getHostName()) : 0;
int free2 = hostFreeSlotCount.containsKey(o2.getHostName())
? hostFreeSlotCount.get(o2.getHostName()) : 0;
int byFree = free2 - free1;
if (byFree != 0) {
return byFree;
}
return o1.getHostName().compareTo(o2.getHostName());
}
});

Expand Down
Loading