# Source code for evalys.metrics
import pandas as pd
from math import sqrt


def cumulative_waiting_time(dataframe):
    '''
    Compute the cumulative waiting time on the given dataframe.

    :param dataframe: a DataFrame that contains a "submission_time" and a
        "waiting_time" column.
    '''
    # Avoid side effects on the caller's DataFrame
    df = dataframe.copy()
    df['starting_time'] = df['submission_time'] + df['waiting_time']
    df_sorted_by_starting_time = df.sort_values(by='starting_time')
    wt_cumsum = df_sorted_by_starting_time.waiting_time.cumsum()
    wt_cumsum.name = "cumulative waiting time"
    # Index the series by starting time
    wt_cumsum.index = df_sorted_by_starting_time['starting_time']
    return wt_cumsum
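

# Illustrative usage sketch (not part of the original module): a toy workload
# with the "submission_time" and "waiting_time" columns that
# cumulative_waiting_time() expects.
def _example_cumulative_waiting_time():
    jobs = pd.DataFrame({
        'submission_time': [0, 1, 2],
        'waiting_time': [1, 0, 3],
    })
    # The jobs start at t=1, 1 and 5, so the series indexed by starting
    # time holds the cumulative waiting times 1, 1 and 4.
    return cumulative_waiting_time(jobs)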


def compute_load(dataframe, col_begin, col_end, col_cumsum,
                 begin_time=0, end_time=None):
    """
    Compute the load of the `col_cumsum` column between events from
    `col_begin` to `col_end`. In practice it is used to compute the queue
    load and the cluster load (utilisation).

    :returns: a load dataframe of all events indexed by time with a `load`
        and an `area` column.
    """
    # Avoid side effects on the caller's DataFrame
    df = dataframe.copy()
    df['starting_time'] = df['submission_time'] + df['waiting_time']
    df['finish_time'] = df['starting_time'] + df['execution_time']
    df = df.sort_values(by=col_begin)

    # Cleaning: push jobs that are still running (execution_time == -1)
    # past the last finish time, and drop jobs with no allocated
    # processors (proc_alloc <= 0)
    max_time = df['finish_time'].max() + 1000
    df.loc[df['execution_time'] == -1, 'finish_time'] = max_time
    df.loc[df['execution_time'] == -1, 'starting_time'] = max_time
    if 'proc_alloc' in df:
        df = df[df['proc_alloc'] > 0]

    # Create a list of start and stop events associated with the number of
    # processor allocation changes: starts add procs, stops remove procs
    event_columns = ['time', col_cumsum, 'jobID']
    start_event_df = pd.concat([df[col_begin],
                                df[col_cumsum],
                                df['jobID']],
                               axis=1)
    start_event_df.columns = event_columns
    # Stop events carry a negative proc_alloc value
    stop_event_df = pd.concat([df[col_end],
                               - df[col_cumsum],
                               df['jobID']],
                              axis=1)
    stop_event_df.columns = event_columns

    # Merge the events and sort them by time
    event_df = pd.concat([start_event_df, stop_event_df],
                         ignore_index=True).sort_values(
                             by='time').reset_index(drop=True)

    # Sum the events that happened at the same time and accumulate them
    load_df = pd.DataFrame(
        event_df.groupby(event_df['time'])[col_cumsum].sum().cumsum(),
        columns=[col_cumsum])
    load_df["time"] = load_df.index

    # Compute the area under the load curve between consecutive events
    load_df["area"] = - load_df["time"].diff(-1) * load_df[col_cumsum]
    del load_df["time"]
    load_df.columns = ["load", "area"]
    return load_df
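

# Illustrative usage sketch (not part of the original module): computing the
# cluster utilisation of a toy workload with compute_load(). The column names
# 'starting_time' and 'finish_time' are the ones derived inside the function.
def _example_compute_load():
    jobs = pd.DataFrame({
        'jobID': [1, 2],
        'submission_time': [0, 0],
        'waiting_time': [0, 2],
        'execution_time': [4, 2],
        'proc_alloc': [2, 1],
    })
    # The load is 2 procs on [0, 2) and 3 procs on [2, 4), so the 'area'
    # column holds 4, 6 and NaN (the last event has no successor).
    return compute_load(jobs, 'starting_time', 'finish_time', 'proc_alloc')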


def _load_insert_element_if_necessary(load_df, at):
    """
    Insert an event at the specified time that conserves data consistency
    for the "area" and "load" values.
    """
    if len(load_df[load_df.time == at]) == 0:
        prev_el = load_df[load_df.time <= at].tail(1)
        next_el = load_df[load_df.time >= at].head(1)
        new_el = prev_el.copy()
        new_el['time'] = at
        # Split the area of the previous event at the insertion point
        new_el['area'] = new_el.load.iloc[0] * (next_el.time.iloc[0] - at)
        load_df.loc[prev_el.index, "area"] = \
            prev_el.load.iloc[0] * (at - prev_el.time.iloc[0])
        load_df.loc[len(load_df)] = [
            new_el.time.iloc[0],
            new_el.load.iloc[0],
            new_el.area.iloc[0]]
        load_df = load_df.sort_values(by=["time"])
    return load_df
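

# Illustrative sketch (not part of the original module): inserting an event at
# t=1 into a two-event load table splits the first area (2 * 2 = 4) into
# 2 * 1 = 2 before the insertion point and 2 * 1 = 2 after it.
def _example_load_insert():
    load_df = pd.DataFrame({'time': [0.0, 2.0],
                            'load': [2.0, 0.0],
                            'area': [4.0, 0.0]})
    return _load_insert_element_if_necessary(load_df, 1.0)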


def load_mean(df, begin=None, end=None):
    """ Compute the mean load area from begin to end. """
    load_df = df.reset_index()
    max_to = max(load_df.time)
    if end is None:
        end = max_to
    elif end > max_to:
        raise ValueError("computing mean load after the "
                         "last event ({}) is NOT IMPLEMENTED".format(max_to))

    min_to = load_df.time.iloc[0]
    if begin is None:
        begin = min_to
    elif begin < min_to:
        raise ValueError("computing mean load before the "
                         "first event ({}) is NOT IMPLEMENTED".format(min_to))

    load_df = _load_insert_element_if_necessary(load_df, begin)
    load_df = _load_insert_element_if_necessary(load_df, end)

    u = load_df[(load_df.time < end) & (begin <= load_df.time)]
    return u.area.sum() / (end - begin)
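

# Illustrative usage sketch (not part of the original module): mean load over
# a sub-interval of a load DataFrame shaped like the output of compute_load().
def _example_load_mean():
    load_df = pd.DataFrame(
        {'load': [2, 3, 0], 'area': [4.0, 6.0, 0.0]},
        index=pd.Index([0, 2, 4], name='time'))
    # Over [1, 3) the load is 2 for one time unit and 3 for the next one,
    # so the mean load is (2 + 3) / 2 = 2.5.
    return load_mean(load_df, begin=1, end=3)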


def fragmentation(free_resources_gaps, p=2):
    """
    Input is a resource-indexed list where each element is a numpy
    array of free slots.

    This metric definition comes from Gehr and Schneider (CCGRID 2009).
    """
    f = free_resources_gaps
    frag = pd.Series(dtype=float)
    for i, fi in enumerate(f):
        if fi.size == 0:
            frag_i = 0
        else:
            frag_i = 1 - (sum(fi**p) / sum(fi)**p)
        # Series.set_value() was removed from pandas: use .at instead
        frag.at[i] = frag_i
    return frag
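

# Illustrative sketch (not part of the original module): with p=2, a resource
# whose free time forms a single gap has fragmentation 0, while two equal
# gaps give 1 - (1**2 + 1**2) / (1 + 1)**2 = 0.5.
def _example_fragmentation():
    import numpy as np
    gaps = [np.array([4.0]), np.array([1.0, 1.0])]
    return fragmentation(gaps)  # -> 0.0 and 0.5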


def fragmentation_reis(free_resources_gaps, time, p=2):
    f = free_resources_gaps
    frag = pd.Series(dtype=float)
    for i, fi in enumerate(f):
        if fi.size == 0:
            frag_i = 0
        else:
            frag_i = 1 - (sqrt(sum(fi**p)) / time * len(f))
        # Series.set_value() was removed from pandas: use .at instead
        frag.at[i] = frag_i
    return frag
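

# Illustrative sketch (not part of the original module): as written above, the
# expression divides the p-norm of the gaps by `time` and multiplies by the
# number of resources. With two resources over 10 time units:
def _example_fragmentation_reis():
    import numpy as np
    gaps = [np.array([3.0, 4.0]), np.array([2.0])]
    # First resource: 1 - sqrt(3**2 + 4**2) / 10 * 2 = 0.0
    # Second resource: 1 - sqrt(2**2) / 10 * 2 = 0.6
    return fragmentation_reis(gaps, time=10)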