# coding: utf-8
from __future__ import unicode_literals, print_function
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from evalys import visu
import evalys.visu.legacy as vleg
from procset import ProcInt, ProcSet
from evalys.metrics import compute_load, load_mean, fragmentation_reis, fragmentation
class JobSet(object):
    '''
    A JobSet is a set of jobs with its state, its time properties and
    the resources it is associated with.

    It takes as input a dataframe that is intended to have the columns
    defined in :py:attr:`JobSet.columns`.

    The `allocated_resources` column should contain the string
    representation of an interval set of the allocated resources for the
    given job, i.e. for this interval::

        # interval_set representation
        [(1, 2), (5, 5), (10, 50)]
        # string representation
        1-2 5 10-50

    .. warning:: Floating point precision is set to
        :py:attr:`self.float_precision` so all floating point values are
        rounded with this number of digits. Default set to 6.

    For example:

    >>> from evalys.jobset import JobSet
    >>> js = JobSet.from_csv("./examples/jobs.csv")
    >>> js.plot(with_details=True)
    >>> # to show the graph
    >>> # import matplotlib.pyplot as plt
    >>> # plt.show()

    You can also specify the resource_bounds like this:

    >>> js = JobSet.from_csv("./examples/jobs.csv",
    ...                      resource_bounds=(0, 63))
    '''
def __init__(self, df, resource_bounds=None, float_precision=6):
    """
    Build a JobSet from a jobs dataframe.

    :param df: dataframe holding (some of) the columns listed in
        :py:attr:`JobSet.columns`; the `allocated_resources` column must
        already contain `ProcSet` objects (see :py:meth:`from_csv`).
    :param resource_bounds: optional (min, max) pair of resource ids;
        when omitted, the bounds are derived from the allocations
        present in `df`.
    :param float_precision: number of digits kept when rounding the
        floating point values of `df`.
    """
    # reset the index of the dataframe
    df = df.reset_index(drop=True)
    # set float round precision
    self.float_precision = float_precision
    # np.round delegates to DataFrame.round, which rounds the numeric
    # columns and leaves the others untouched
    self.df = np.round(df, float_precision)

    if resource_bounds:
        self.res_bounds = ProcInt(*resource_bounds)
    else:
        # Derive the bounds from the allocations. ProcSet.min/.max
        # raise ValueError on an empty set; those jobs are skipped.
        def alloc_apply(f, alloc):
            for pset in alloc:
                try:
                    yield f(pset)
                except ValueError:
                    pass
        self.res_bounds = ProcInt(
            min(alloc_apply(lambda pset: pset.min, self.df.allocated_resources)),
            max(alloc_apply(lambda pset: pset.max, self.df.allocated_resources))
        )

    # total number of processors in the cluster
    self.MaxProcs = len(self.res_bounds)
    # number of processors allocated to each job
    self.df['proc_alloc'] = self.df.allocated_resources.apply(len)

    # Add missing columns if possible: times are linked by
    # starting = submission + waiting and finish = starting + execution,
    # so either representation can be completed from the other.
    fillable_relative = all(
        col in self.df.columns
        for col in ['submission_time', 'waiting_time', 'execution_time']
    )
    fillable_absolute = all(
        col in self.df.columns
        for col in ['submission_time', 'starting_time', 'finish_time']
    )
    if fillable_relative:
        if 'starting_time' not in self.df.columns:
            self.df['starting_time'] = \
                self.df['submission_time'] + self.df['waiting_time']
        if 'finish_time' not in self.df.columns:
            self.df['finish_time'] = \
                self.df['starting_time'] + self.df['execution_time']
    elif fillable_absolute:
        if 'waiting_time' not in self.df.columns:
            self.df['waiting_time'] = \
                self.df['starting_time'] - self.df['submission_time']
        if 'execution_time' not in self.df.columns:
            self.df['execution_time'] = \
                self.df['finish_time'] - self.df['starting_time']

    # normalize the id column name to 'jobID'
    if 'job_id' in self.df.columns:
        self.df.rename(columns={'job_id': 'jobID'}, inplace=True)

    # TODO check consistency on calculated columns...

    # init cache (lazily filled by the `utilisation` and `queue`
    # properties)
    self._utilisation = None
    self._queue = None
# Converters applied by pandas when reading a CSV (see `from_csv`):
# identifiers are kept as strings, and `allocated_resources` is parsed
# from its string form (e.g. "1-2 5 10-50") into a ProcSet.
__converters = {
    'jobID': str,
    'job_id': str,
    'workload': str,
    'profile': str,
    'allocated_resources': ProcSet.from_str,
}

# Columns a JobSet dataframe is expected to provide.
columns = ['jobID',
           'submission_time',
           'requested_number_of_resources',
           'requested_time',
           'success',
           'starting_time',
           'execution_time',
           'finish_time',
           'waiting_time',
           'turnaround_time',
           'stretch',
           'allocated_resources']
@classmethod
def from_csv(cls, filename, resource_bounds=None):
    """
    Build a JobSet from the CSV file at `filename`.

    :param filename: path of the CSV file to load.
    :param resource_bounds: forwarded to the constructor.
    :returns: a new JobSet instance.
    """
    frame = pd.read_csv(filename, converters=cls.__converters)
    return cls(frame, resource_bounds=resource_bounds)
def to_csv(self, filename):
    """
    Export this jobset to a csv file with ',' as separator.

    :param filename: destination path.

    Example:
        >>> from evalys.jobset import JobSet
        >>> js = JobSet.from_csv("./examples/jobs.csv")
        >>> js.to_csv("/tmp/jobs.csv")
    """
    df = self.df.copy()
    # serialize the ProcSet allocations back to their string form
    df.allocated_resources = df.allocated_resources.apply(str)
    # Hand the path to pandas instead of a text-mode handle: pandas
    # opens it with newline='' itself, which avoids '\r\n' translation
    # corrupting the CSV on some platforms.
    df.to_csv(filename, index=False, sep=",",
              float_format='%.{}f'.format(self.float_precision))
def gantt(self, time_scale=False, **kwargs):
    """
    Plot this jobset as a gantt chart (delegates to `visu.plot_gantt`).

    :param time_scale: when True, use a time-based x axis.
    :param kwargs: extra options forwarded to `visu.plot_gantt`.
    """
    options = dict(kwargs)
    if time_scale:
        options['xscale'] = 'time'
    visu.plot_gantt(self, **options)
@property
def utilisation(self):
    """
    Number of used processors over time, computed from the jobs'
    starting/finish times (cached after the first access).
    """
    if self._utilisation is None:
        self._utilisation = compute_load(
            self.df,
            col_begin='starting_time',
            col_end='finish_time',
            col_cumsum='proc_alloc')
    return self._utilisation
@property
def queue(self):
    '''
    Calculate cluster queue size over time in number of procs.

    :returns: a time indexed series with the number of processors
        requested by jobs waiting between submission and start
    '''
    # cache the result: avoid recomputing on every access
    if self._queue is None:
        self._queue = compute_load(
            self.df, 'submission_time', 'starting_time',
            'requested_number_of_resources')
    return self._queue
def reset_time(self, to=0):
    '''
    Shift the time columns of the jobset.

    :param to: new time origin. With the default (or any falsy value)
        the columns are shifted so the first submission happens at
        time 1; otherwise `to` itself is subtracted from every time
        column.

    Also invalidates the cached `utilisation` and `queue` series,
    since they depend on the absolute times.
    '''
    df = self.df
    if not to:
        # make the earliest submission_time equal to 1
        reset_value = df['submission_time'].min() - 1
    else:
        reset_value = to
    for col in ['starting_time', 'submission_time', 'finish_time']:
        df[col] = df[col] - reset_value
    # drop cached series derived from the (now shifted) time columns
    self._queue = None
    self._utilisation = None
def plot(self, normalize=False, with_details=False, time_scale=False,
         title=None):
    """
    Plot the utilisation and queue series of this jobset on stacked
    axes; with `with_details` also plot the job details and the gantt
    chart.

    :param normalize: forwarded to `vleg.plot_load`.
    :param with_details: add two extra axes (details + gantt).
    :param time_scale: use a time-based x axis.
    :param title: optional figure title.
    """
    nrows = 4 if with_details else 2
    fig, axes = plt.subplots(nrows=nrows, sharex=True, figsize=(12, 8))
    if title:
        fig.suptitle(title, fontsize=16)
    vleg.plot_load(self.utilisation, self.MaxProcs,
                   load_label="utilisation", ax=axes[0],
                   normalize=normalize, time_scale=time_scale)
    vleg.plot_load(self.queue, self.MaxProcs,
                   load_label="queue", ax=axes[1], normalize=normalize,
                   time_scale=time_scale)
    if with_details:
        vleg.plot_job_details(self.df, self.MaxProcs, ax=axes[2],
                              time_scale=time_scale)
        vleg.plot_gantt(self, ax=axes[3], time_scale=time_scale)
def detailed_utilisation(self):
    """
    :returns: a time-indexed dataframe of the free interval sets with a
        'total' column holding the number of used processors.
    """
    frame = self.free_intervals()
    frame['total'] = len(self.res_bounds) - frame.free_itvs.apply(len)
    frame.set_index("time", drop=True, inplace=True)
    return frame
def mean_utilisation(self, begin_time=None, end_time=None):
    """
    :returns: the mean of the utilisation series, optionally restricted
        to the [begin_time, end_time] window.
    """
    usage = self.utilisation
    return load_mean(usage, begin=begin_time, end=end_time)
def free_intervals(self, begin_time=0, end_time=None):
    '''
    :returns: a dataframe with the free resources over time. Each line
        corresponds to an event in the jobset.
    '''
    df = self.df

    # Create a list of start and stop events associated with the proc
    # allocation:
    # Free -> Used : grab = True
    # Used -> Free : grab = False
    event_columns = ['time', 'free_itvs', 'grab']
    start_event_df = pd.concat([df['starting_time'],
                                df['allocated_resources'],
                                pd.Series(np.ones(len(df), dtype=bool))],
                               axis=1)
    start_event_df.columns = event_columns
    # Stop events have False in grab
    stop_event_df = pd.concat([df['finish_time'],
                               df['allocated_resources'],
                               pd.Series(np.zeros(len(df), dtype=bool))],
                              axis=1)
    stop_event_df.columns = event_columns

    # merge events and sort them; DataFrame.append was removed in
    # pandas 2.0, pd.concat is the supported equivalent
    event_df = pd.concat(
        [start_event_df, stop_event_df],
        ignore_index=True).sort_values(
            by=['time', 'grab']).reset_index(drop=True)

    # cut events if necessary: reindex event_df on time
    event_df = event_df.sort_values(by='time').set_index(['time'],
                                                         drop=False)
    # find the positional index closest to each window bound
    begin = event_df.index.searchsorted(begin_time)
    if end_time is not None:
        end = event_df.index.searchsorted(end_time)
    else:
        end = len(event_df.index) - 1
    event_df = event_df.iloc[begin:end].reset_index(drop=True)

    # All resources are free at the beginning
    event_columns = ['time', 'free_itvs']
    first_row = [begin_time, ProcSet(self.res_bounds)]
    free_interval_serie = pd.DataFrame(columns=event_columns)
    free_interval_serie.loc[0] = first_row
    # replay the events, maintaining the current set of free resources
    for index, row in event_df.iterrows():
        # DataFrame.ix was removed from pandas; .loc performs the same
        # label-based lookup on the integer labels used here
        current_itv = free_interval_serie.loc[index, 'free_itvs']
        if row.grab:
            new_itv = current_itv - row.free_itvs
        else:
            new_itv = current_itv | row.free_itvs
        free_interval_serie.loc[index + 1] = [row.time, new_itv]
    if end_time is not None:
        # close the window: no resource is free past end_time
        free_interval_serie.loc[len(free_interval_serie)] = \
            [end_time, ProcSet()]
    return free_interval_serie
def free_slots(self, begin_time=0, end_time=None):
    '''
    :returns: a DataFrame (compatible with a JobSet) that contains all
        the non-overlapping rectangular free slots of this JobSet,
        maximizing the time dimension. It can be transformed into a
        JobSet to be plotted as a gantt chart.
    '''
    # slots_time contains tuples of
    # (slot_begin_time, free_resources_intervals)
    free_interval_serie = self.free_intervals(begin_time, end_time)
    slots_time = [(free_interval_serie.time[0], ProcSet(self.res_bounds))]
    new_slots_time = slots_time
    columns = ['jobID', 'allocated_resources',
               'starting_time', 'finish_time', 'execution_time',
               'submission_time']
    free_slots_df = pd.DataFrame(columns=columns)
    prev_free_itvs = ProcSet(self.res_bounds)
    slots = 0
    for i, curr_row in free_interval_serie.iterrows():
        if i == 0:
            # the first row only seeds the initial free slot
            continue
        new_slots_time = []
        curr_time = curr_row.time
        taken_resources = prev_free_itvs - curr_row.free_itvs
        freed_resources = curr_row.free_itvs - prev_free_itvs
        if i == len(free_interval_serie) - 1:
            # last event: force-close every slot that is still open
            taken_resources = ProcSet(self.res_bounds)
        if taken_resources:
            # some slots end here: store them and update the open slots
            for slot_begin, itvs in slots_time:
                to_update = itvs & taken_resources
                if to_update:
                    # store the finished slot as a pseudo-job row
                    slots = slots + 1
                    new_slot = [str(slots),
                                to_update,
                                slot_begin,
                                curr_time,
                                curr_time - slot_begin,
                                slot_begin]
                    free_slots_df.loc[slots] = new_slot
                    # keep only the part of the slot still free
                    free_res = itvs - to_update
                    if free_res:
                        new_slots_time.append((slot_begin, free_res))
                else:
                    new_slots_time.append((slot_begin, itvs))
        if freed_resources:
            # some slots begin here: update the open slots
            if not new_slots_time:
                new_slots_time = slots_time
            new_slots_time.append((curr_time, freed_resources))
        # remember the free set for the next event
        prev_free_itvs = curr_row.free_itvs
        # commit the updated list of open slots
        slots_time = new_slots_time
    return free_slots_df
def fragmentation(self,
                  p=2,
                  resource_intervals=None,
                  begin_time=None,
                  end_time=None):
    """
    Compute the fragmentation of the free resources gaps of this
    jobset (delegates to :py:func:`evalys.metrics.fragmentation`).

    :param p: exponent forwarded to the fragmentation metric.
    :param resource_intervals: resource interval to consider,
        forwarded to :py:meth:`free_resources_gaps`.
    :param begin_time: start of the window; defaults to the minimum
        submission time.
    :param end_time: end of the window; defaults to the maximum
        finish time.

    NOTE: an alternative metric, `fragmentation_reis`, is available
    in :py:mod:`evalys.metrics`.
    """
    if end_time is None:
        end_time = self.df.finish_time.max()
    if begin_time is None:
        begin_time = self.df.submission_time.min()
    # `fragmentation` here resolves to the module-level metric
    # function, not to this method
    gaps = self.free_resources_gaps(resource_intervals,
                                    begin_time, end_time)
    return fragmentation(gaps, p=p)
def free_resources_gaps(self, resource_intervals=None,
                        begin_time=0, end_time=None):
    """
    :param resource_intervals: An interval set on which to compute the
        free resources gaps. Default: self.res_bounds
    :returns: a resource indexed list where each element is a numpy
        array of free slot durations for that resource.
    """
    fs = self.free_slots(begin_time, end_time)
    if resource_intervals is None:
        resource_intervals = self.res_bounds
    first_res = resource_intervals[0]
    last_res = resource_intervals[1]
    # one gap list per resource of the requested interval
    free_resources_gaps = [[]
                           for _ in range(first_res, last_res + 1)]
    # each free slot contributes its duration to every resource it
    # spans; an explicit loop (instead of DataFrame.apply used purely
    # for side effects) also behaves correctly on an empty frame
    for _, slot in fs.iterrows():
        for res in range(first_res, last_res + 1):
            if res in slot.allocated_resources:
                free_resources_gaps[res - first_res].append(
                    slot.execution_time)
    # format each gap list as a numpy array
    return [np.asarray(gaps) for gaps in free_resources_gaps]