# $Id$
# $URL$

"""Whole core scheduling
"""

import logger
import os

glo_coresched_simulate = False

class CoreSched:
    """ Whole-core scheduler

        The main entrypoint is adjustCores(self, slivers), which takes a
        dictionary of sliver records. The cpu_cores field is pulled from the
        effective rspec (rec["_rspec"]) for each sliver.

        If cpu_cores > 0 for a sliver, then that sliver will reserve one or
        more of the cpu_cores on the machine.

        One core is always left unreserved for system slices.
    """

    def __init__(self, cgroup_var_name="cpuset.cpus", slice_attr_name="cpu_cores"):
        self.cpus = []
        self.cgroup_var_name = cgroup_var_name
        self.slice_attr_name = slice_attr_name
        self.cgroup_mem_name = "cpuset.mems"
        self.mems = []
        self.mems_map = {}
        self.cpu_siblings = {}

    def get_cgroup_var(self, name=None, filename=None):
        """ decode cpuset.cpus or cpuset.mems into a list of units that can
            be reserved.
        """
        assert(filename != None or name != None)

        if filename == None:
            filename = "/dev/cgroup/" + name

        data = open(filename).readline().strip()

        if not data:
            return []

        units = []

        # cpuset.cpus could be something as arbitrary as:
        #    0,1,2-3,4,5-6
        # deal with commas and ranges
        for part in data.split(","):
            unitRange = part.split("-")
            if len(unitRange) == 1:
                unitRange = (unitRange[0], unitRange[0])
            for i in range(int(unitRange[0]), int(unitRange[1]) + 1):
                if not i in units:
                    units.append(i)

        return units
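
    # A worked example with hypothetical file contents: if cpuset.cpus holds
    # "0,2-4,7", get_cgroup_var returns [0, 2, 3, 4, 7] -- each "a-b" range
    # is expanded inclusively and duplicate units are dropped.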

    def get_cpus(self):
        """ return a list of available cpu identifiers: [0,1,2,3...]
        """
        # the cpus never change, so if it's already been computed then don't
        # worry about it.
        if self.cpus != []:
            return self.cpus

        self.cpus = self.get_cgroup_var(self.cgroup_var_name)

        self.cpu_siblings = {}
        for item in self.cpus:
            self.cpu_siblings[item] = self.get_core_siblings(item)

        return self.cpus

    def find_cpu_mostsiblings(self, cpus):
        """ return the cpu from 'cpus' that has the most siblings also in
            'cpus', so that reservations pack onto as few packages as
            possible.
        """
        bestCount = -1
        bestCpu = -1
        for cpu in cpus:
            count = 0
            for candidate in self.cpu_siblings[cpu]:
                if candidate in cpus:
                    count = count + 1
            if (count > bestCount):
                bestCount = count
                bestCpu = cpu

        assert(bestCpu >= 0)
        return bestCpu

    def find_compatible_cpu(self, cpus, compatCpu):
        """ prefer a cpu that is a sibling of compatCpu; otherwise fall back
            to the cpu with the most available siblings.
        """
        if compatCpu == None:
            return self.find_cpu_mostsiblings(cpus)

        # find a sibling if we can
        for cpu in cpus:
            if compatCpu in self.cpu_siblings[cpu]:
                return cpu

        return self.find_cpu_mostsiblings(cpus)

    def get_cgroups(self):
        """ return a list of cgroups
            this might change as vservers are instantiated, so always compute
            it dynamically.
        """
        cgroups = []
        filenames = os.listdir("/dev/cgroup")
        for filename in filenames:
            if os.path.isdir(os.path.join("/dev/cgroup", filename)):
                cgroups.append(filename)
        return cgroups

    def decodeCoreSpec(self, cores):
        """ Decode the value of the core attribute. It's a number, followed
            by an optional letter "b" to indicate that besteffort cores
            should also be supplied.
        """
        bestEffort = False

        if cores.endswith("b"):
            cores = cores[:-1]
            bestEffort = True

        try:
            cores = int(cores)
        except ValueError:
            cores = 0

        return (cores, bestEffort)
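
    # Examples of the core spec encoding:
    #    decodeCoreSpec("3")  -> (3, False)
    #    decodeCoreSpec("3b") -> (3, True)    # also wants besteffort cores
    #    decodeCoreSpec("xx") -> (0, False)   # malformed specs reserve nothing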

    def adjustCores(self, slivers):
        """ slivers is a dict of {sliver_name: rec}
            rec is a dict of attributes
            rec['_rspec'] is the effective rspec
        """
        cpus = self.get_cpus()[:]
        mems = self.get_mems()[:]

        memSchedule = True
        if (len(mems) != len(cpus)):
            logger.log("CoreSched fewer mems than " + self.cgroup_var_name + "; mem scheduling disabled")
            memSchedule = False

        logger.log("CoreSched (" + self.cgroup_var_name + "): available units: " + str(cpus))

        reservations = {}
        mem_reservations = {}

        # allocate the cores to the slivers that have them reserved
        # TODO: Need to sort this from biggest cpu_cores to smallest
        for name, rec in slivers.iteritems():
            rspec = rec["_rspec"]
            cores = rspec.get(self.slice_attr_name, "0")
            (cores, bestEffort) = self.decodeCoreSpec(cores)

            lastCpu = None

            while (cores > 0):
                # one cpu core reserved for best effort and system slices
                if len(cpus) <= 1:
                    logger.log("CoreSched: ran out of units while scheduling sliver " + name)
                else:
                    cpu = self.find_compatible_cpu(cpus, lastCpu)
                    cpus.remove(cpu)
                    lastCpu = cpu

                    logger.log("CoreSched: allocating unit " + str(cpu) + " to slice " + name)
                    reservations[name] = reservations.get(name, []) + [cpu]

                    # now find a memory node to go with the cpu
                    if memSchedule:
                        mem = self.find_associated_memnode(mems, cpu)
                        if mem != None:
                            mems.remove(mem)
                            logger.log("CoreSched: allocating memory node " + str(mem) + " to slice " + name)
                            mem_reservations[name] = mem_reservations.get(name, []) + [mem]
                        else:
                            logger.log("CoreSched: failed to find memory node for cpu " + str(cpu))

                cores = cores - 1

        # the leftovers go to everyone else
        logger.log("CoreSched: allocating units " + str(cpus) + " to _default")
        reservations["_default"] = cpus[:]
        mem_reservations["_default"] = mems[:]

        # now check and see if any of our slices had the besteffort flag set
        for name, rec in slivers.iteritems():
            rspec = rec["_rspec"]
            cores = rspec.get(self.slice_attr_name, "0")
            (cores, bestEffort) = self.decodeCoreSpec(cores)

            # if the bestEffort flag isn't set then we have nothing to do
            if not bestEffort:
                continue

            # note that if a reservation is [], then we don't need to add
            # bestEffort cores to it, since it is bestEffort by default.
            if reservations.get(name, []) != []:
                reservations[name] = reservations[name] + reservations["_default"]
                mem_reservations[name] = mem_reservations.get(name, []) + mem_reservations["_default"]
                logger.log("CoreSched: adding besteffort units to " + name + ". new units = " + str(reservations[name]))

        self.reserveUnits(self.cgroup_var_name, reservations)
        self.reserveUnits(self.cgroup_mem_name, mem_reservations)
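
    # A sketch of the outcome on a hypothetical 4-cpu node: a sliver with
    # cpu_cores="2" ends up with two dedicated cpus, a sliver with
    # cpu_cores="1b" ends up with one dedicated cpu plus everything left in
    # "_default", and the remaining cpu stays in "_default" for besteffort
    # and system slices.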

    def reserveUnits(self, var_name, reservations):
        """ given a set of reservations (dictionary of slicename:cpuid_list),
            write those reservations to the appropriate cgroup files.

            reservations["_default"] is assumed to be the default reservation
            for slices that do not reserve cores. It's essentially the
            leftover cpu cores.
        """
        default = reservations["_default"]

        # set the default vserver cpuset. this will deal with any vservers
        # that might be created before the nodemanager has had a chance to
        # update the cpusets.
        self.reserveDefault(var_name, default)

        for cgroup in self.get_cgroups():
            if cgroup in reservations:
                cpus = reservations[cgroup]
                logger.log("CoreSched: reserving " + var_name + " on " + cgroup + ": " + str(cpus))
            else:
                # no log message for default; too much verbosity in the common case
                cpus = default

            if glo_coresched_simulate:
                print "R", "/dev/cgroup/" + cgroup + "/" + var_name, self.listToRange(cpus)
            else:
                file("/dev/cgroup/" + cgroup + "/" + var_name, "w").write(self.listToRange(cpus) + "\n")

    def reserveDefault(self, var_name, cpus):
        if not os.path.exists("/etc/vservers/.defaults/cgroup"):
            os.makedirs("/etc/vservers/.defaults/cgroup")

        if glo_coresched_simulate:
            print "RDEF", "/etc/vservers/.defaults/cgroup/" + var_name, self.listToRange(cpus)
        else:
            file("/etc/vservers/.defaults/cgroup/" + var_name, "w").write(self.listToRange(cpus) + "\n")

    def listToRange(self, units):
        """ take a list of items [1,2,3,5,...] and return it as a range: "1-3,5"
            for now, just comma-separate
        """
        return ",".join([str(i) for i in units])

    def get_mems(self):
        """ return a list of available memory node identifiers: [0,1,2,3...]
        """
        # the mems never change, so if it's already been computed then don't
        # worry about it.
        if self.mems != []:
            return self.mems

        self.mems = self.get_cgroup_var(self.cgroup_mem_name)

        # build a mapping from memory nodes to the cpus they can be used with
        mems_map = {}
        for item in self.mems:
            mems_map[item] = self.get_memnode_cpus(item)

        if (len(mems_map) > 0):
            # when NUMA_EMU is enabled, only the last memory node will contain
            # the cpu_map. For example, if there were originally 2 nodes and
            # we used NUMA_EMU to raise it to 12, then
            #    mems_map[0] = []
            #    ...
            #    mems_map[4] = []
            #    mems_map[5] = [1,3,5,7,9,11]
            #    mems_map[6] = []
            #    ...
            #    mems_map[10] = []
            #    mems_map[11] = [0,2,4,6,8,10]
            # so, we go from back to front, copying the entries as necessary.
            if mems_map[self.mems[0]] == []:
                work = []
                for item in reversed(self.mems):
                    if mems_map[item] != []:
                        work = mems_map[item]
                    else:  # mems_map[item] == []
                        mems_map[item] = work

        self.mems_map = mems_map

        return self.mems

    def find_associated_memnode(self, mems, cpu):
        """ Given a list of memory nodes and a cpu, see if one of the nodes
            in the list can be used with that cpu.
        """
        for item in mems:
            if cpu in self.mems_map[item]:
                return item
        return None

    def get_memnode_cpus(self, index):
        """ for a given memory node, return the CPUs that it is associated
            with.
        """
        fn = "/sys/devices/system/node/node" + str(index) + "/cpulist"
        if not os.path.exists(fn):
            logger.log("CoreSched: failed to locate memory node " + fn)
            return []

        return self.get_cgroup_var(filename=fn)

    def get_core_siblings(self, index):
        """ return the list of cpus that share a physical package with the
            given cpu, decoded from the kernel's hex sibling bitmask.
        """
        # use core_siblings rather than core_siblings_list, as it's compatible
        # with older kernels
        fn = "/sys/devices/system/cpu/cpu" + str(index) + "/topology/core_siblings"
        if not os.path.exists(fn):
            return []

        siblings = []
        x = int(open(fn, "rt").readline().strip(), 16)
        cpuid = 0
        while (x > 0):
            if (x & 1) != 0:
                siblings.append(cpuid)
            x = x >> 1
            cpuid += 1

        return siblings
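
    # For example, a core_siblings mask of "0f" decodes to [0, 1, 2, 3]:
    # bit i being set in the mask means cpu i shares a package with this cpu.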

# a little self-test
if __name__ == "__main__":
    glo_coresched_simulate = True

    x = CoreSched()

    print "cgroups:", ",".join(x.get_cgroups())

    print "cpus:", x.listToRange(x.get_cpus())
    print "sibling map:"
    for item in x.get_cpus():
        print " ", item, ",".join([str(y) for y in x.cpu_siblings.get(item, [])])

    print "mems:", x.listToRange(x.get_mems())
    print "cpu to memory map:"
    for item in x.get_mems():
        print " ", item, ",".join([str(y) for y in x.mems_map.get(item, [])])

    rspec_sl_test1 = {"cpu_cores": "1"}
    rec_sl_test1 = {"_rspec": rspec_sl_test1}

    rspec_sl_test2 = {"cpu_cores": "5"}
    rec_sl_test2 = {"_rspec": rspec_sl_test2}

    rspec_sl_test3 = {"cpu_cores": "3b"}
    rec_sl_test3 = {"_rspec": rspec_sl_test3}

    #slivers = {"sl_test1": rec_sl_test1, "sl_test2": rec_sl_test2}
    slivers = {"arizona_beta": rec_sl_test1, "arizona_test101": rec_sl_test2, "pl_sirius": rec_sl_test3}
    #slivers = {"arizona_beta": rec_sl_test1, "arizona_logmon": rec_sl_test2, "arizona_owl": rec_sl_test3}

    x.adjustCores(slivers)