Skip to content
Merged
33 changes: 33 additions & 0 deletions accounting/filters/BaseFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@
import importlib


# Elasticsearch runtime-mapping support for "floored" resource requests
# (INF-3590): when a job's provisioned amount of a resource is smaller than
# what it requested, downstream filters use the provisioned value instead of
# the raw request.

# Resource names substituted into the field/script templates below.
RESOURCE_TYPES = ["Cpus", "Memory", "Disk", "Gpus"]
# Name template for the computed runtime field, e.g. "FlooredRequestCpus".
FLOORED_RESOURCE_FIELD = "FlooredRequest{resource}"
# Painless script template for the runtime field.  Emits the smaller of
# <resource>Provisioned and Request<resource> when both exist; falls back to
# Request<resource> alone; emits nothing when neither field is present.
# Doubled braces ({{ }}) escape the literal braces from str.format().
FLOORED_RESOURCE_SCRIPT = """
if (
doc.containsKey("{resource}Provisioned") &&
doc["{resource}Provisioned"].size() > 0 &&
doc.containsKey("Request{resource}") &&
doc["Request{resource}"].size() > 0 &&
doc["{resource}Provisioned"].value < doc["Request{resource}"].value
) {{
emit(doc["{resource}Provisioned"].value);
}} else if (
doc.containsKey("Request{resource}") &&
doc["Request{resource}"].size() > 0) {{
emit(doc["Request{resource}"].value);
}}
"""


class BaseFilter:
name = "job history"

Expand Down Expand Up @@ -96,6 +115,20 @@ def get_query(self, index, start_ts, end_ts, scroll=None, size=500):
}
}
}

# Add floored resource requests (INF-3590)
fields = []
runtime_mappings = {}
for resource in RESOURCE_TYPES:
fields.append(FLOORED_RESOURCE_FIELD.format(resource=resource))
runtime_mappings[FLOORED_RESOURCE_FIELD.format(resource=resource)] = {
"type": "long",
"script": {
"source": FLOORED_RESOURCE_SCRIPT.format(resource=resource)
}
}
query["fields"] = fields
query["runtime_mappings"] = runtime_mappings
return query

def user_filter(self, data, doc):
Expand Down
33 changes: 24 additions & 9 deletions accounting/filters/ChtcScheddCpuFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def schedd_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this schedd
schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN"
o = data["Schedds"][schedd]
Expand Down Expand Up @@ -188,9 +191,9 @@ def schedd_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -202,6 +205,8 @@ def schedd_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand All @@ -210,6 +215,9 @@ def user_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this user
user = i.get("User", "UNKNOWN") or "UNKNOWN"
o = data["Users"][user]
Expand Down Expand Up @@ -254,9 +262,9 @@ def user_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -271,6 +279,8 @@ def user_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand All @@ -280,6 +290,9 @@ def project_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this project
project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN"
o = data["Projects"][project]
Expand Down Expand Up @@ -324,9 +337,9 @@ def project_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -338,6 +351,8 @@ def project_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand Down
77 changes: 41 additions & 36 deletions accounting/filters/ChtcScheddCpuMonthlyFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,40 +77,36 @@ def get_query(self, index, start_ts, end_ts, **kwargs):
# (Dict has same structure as the REST API query language)
query = super().get_query(index, start_ts, end_ts, **kwargs)

query.update({
"body": {
"query": {
"bool": {
"filter": [
{"range": {
"RecordTime": {
"gte": start_ts,
"lt": end_ts,
}
}},
{"regexp": {
"ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu"
}}
],
"must_not": [
{"terms": {
"JobUniverse": [7, 12]
}},
],
}
}
query["body"]["query"].update({
"bool": {
"filter": [
{"range": {
"RecordTime": {
"gte": start_ts,
"lt": end_ts,
}
}},
{"regexp": {
"ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu"
}}
],
"must_not": [
{"terms": {
"JobUniverse": [7, 12]
}},
],
}
})
return query

def reduce_data(self, i, o, t, is_site=False):
def reduce_data(self, i, f, o, t, is_site=False):

is_removed = i.get("JobStatus") == 3
is_dagnode = i.get("DAGNodeName") is not None
is_exec = i.get("NumJobStarts", 0) >= 1
is_multiexec = i.get("NumJobStarts", 0) > 1
has_holds = i.get("NumHolds", 0) > 0
is_over_rqst_disk = i.get("DiskUsage", 0) > i.get("RequestDisk", 1000)
is_over_rqst_disk = i.get("DiskUsage", 0) > f.get("FlooredRequestDisk", 1000)
is_singularity_job = i.get("SingularityImage") is not None
has_activation_duration = i.get("activationduration") is not None
if has_activation_duration:
Expand Down Expand Up @@ -143,9 +139,9 @@ def reduce_data(self, i, o, t, is_site=False):
elif not is_removed:
goodput_time = int(float(i.get("lastremotewallclocktime", i.get("CommittedTime", 0))))
job_units = get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
)
input_files = 0
input_bytes = 0
Expand Down Expand Up @@ -221,9 +217,9 @@ def reduce_data(self, i, o, t, is_site=False):
sum_cols["TotalActivationSetupDuration"] = int(activation_setup_duration)

sum_cols["TotalLongJobWallClockTime"] = long_job_wallclock_time
sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("RequestCpus", 1), 1))
sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("RequestCpus", 1), 1))
sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(i.get("RequestCpus", 1), 1))
sum_cols["GoodCpuTime"] = (goodput_time * max(f.get("FlooredRequestCpus", 1), 1))
sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(f.get("FlooredRequestCpus", 1), 1))
sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(f.get("FlooredRequestCpus", 1), 1))
sum_cols["JobUnitTime"] = job_units * i.get("RemoteWallClockTime", 0)
sum_cols["NumShadowStarts"] = i.get("NumShadowStarts", 0)
sum_cols["NumJobStarts"] = i.get("NumJobStarts", 0)
Expand All @@ -245,11 +241,11 @@ def reduce_data(self, i, o, t, is_site=False):

max_cols = {}
max_cols["MaxLongJobWallClockTime"] = long_job_wallclock_time
max_cols["MaxRequestMemory"] = i.get("RequestMemory", 0)
max_cols["MaxRequestMemory"] = f.get("FlooredRequestMemory", 0)
max_cols["MaxMemoryUsage"] = i.get("MemoryUsage", 0)
max_cols["MaxRequestDisk"] = i.get("RequestDisk", 0)
max_cols["MaxRequestDisk"] = f.get("FlooredRequestDisk", 0)
max_cols["MaxDiskUsage"] = i.get("DiskUsage", 0)
max_cols["MaxRequestCpus"] = i.get("RequestCpus", 1)
max_cols["MaxRequestCpus"] = f.get("FlooredRequestCpus", 1)
max_cols["MaxJobUnits"] = job_units

min_cols = {}
Expand All @@ -270,24 +266,30 @@ def schedd_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this schedd
schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN"
output = data["Schedds"][schedd]
total = data["Schedds"]["TOTAL"]

self.reduce_data(i, output, total)
self.reduce_data(i, f, output, total)

def user_filter(self, data, doc):

# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this user
user = i.get("User", "UNKNOWN") or "UNKNOWN"
output = data["Users"][user]
total = data["Users"]["TOTAL"]

self.reduce_data(i, output, total)
self.reduce_data(i, f, output, total)

counter_cols = {}
counter_cols["ScheddNames"] = i.get("ScheddName", "UNKNOWN") or "UNKNOWN"
Expand All @@ -307,12 +309,15 @@ def project_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this project
project = i.get("ProjectName", i.get("projectname", i.get("projectname", "UNKNOWN"))) or "UNKNOWN"
output = data["Projects"][project]
total = data["Projects"]["TOTAL"]

self.reduce_data(i, output, total)
self.reduce_data(i, f, output, total)

dict_cols = {}
dict_cols["Users"] = i.get("User", "UNKNOWN") or "UNKNOWN"
Expand Down
33 changes: 24 additions & 9 deletions accounting/filters/ChtcScheddCpuOspoolFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ def schedd_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this schedd
schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN"
o = data["Schedds"][schedd]
Expand Down Expand Up @@ -288,9 +291,9 @@ def schedd_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -302,6 +305,8 @@ def schedd_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand All @@ -310,6 +315,9 @@ def user_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this user
user = i.get("User", "UNKNOWN") or "UNKNOWN"
o = data["Users"][user]
Expand Down Expand Up @@ -359,9 +367,9 @@ def user_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -376,6 +384,8 @@ def user_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand All @@ -385,6 +395,9 @@ def project_filter(self, data, doc):
# Get input dict
i = doc["_source"]

# Get computed fields (as single values instead of arrays)
f = {k: v[0] for k, v in doc.get("fields", {}).items()}

# Get output dict for this project
project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN"
o = data["Projects"][project]
Expand Down Expand Up @@ -433,9 +446,9 @@ def project_filter(self, data, doc):
# Compute job units
if i.get("RemoteWallClockTime", 0) > 0:
o["NumJobUnits"].append(get_job_units(
cpus=i.get("RequestCpus", 1),
memory_gb=i.get("RequestMemory", 1024)/1024,
disk_gb=i.get("RequestDisk", 1024**2)/1024**2,
cpus=f.get("FlooredRequestCpus", 1),
memory_gb=f.get("FlooredRequestMemory", 1024)/1024,
disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2,
))
else:
o["NumJobUnits"].append(None)
Expand All @@ -447,6 +460,8 @@ def project_filter(self, data, doc):
o[attr].append(int(float(i.get(attr))))
except TypeError:
o[attr].append(None)
elif attr.startswith("Request"):
o[attr].append(f.get(f"Floored{attr}", i.get(attr, None)))
else:
o[attr].append(i.get(attr, None))

Expand Down
Loading