# coding: utf-8
from __future__ import unicode_literals, print_function
import matplotlib
import matplotlib.dates
import matplotlib.patches as mpatch
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import random
from . import core
from .. import metrics
# Series names accepted by plot_series(); note that only 'waiting_time'
# is currently implemented there (other values raise RuntimeError).
available_series = ['bonded_slowdown', 'waiting_time', 'all']
def annotate(ax, rect, annot):
    """Write the text *annot* centered inside rectangle *rect* on axes *ax*."""
    x0, y0 = rect.get_xy()
    center = (x0 + rect.get_width() / 2.0,
              y0 + rect.get_height() / 2.0)
    ax.annotate(annot, center, color='black',
                fontsize='small', ha='center', va='center')
def map_unique_numbers(df):
    """Map the DataFrame of jobs to a set of jobs which should be labeled and a list of unique ids
    for the given DataFrame.

    Jobs which have the same jobID and workload_name will be merged together and the same
    unique_id will be assigned to them. The set of labeled_jobs will only contain the job
    in the middle of each list of jobs sharing the same id.
    """
    unique_numbers = []
    numbers_map = {}   # "workload_name!jobID" -> unique number (starts at 1)
    groups = {}        # "workload_name!jobID" -> [(index, row), ...]

    for index, row in df.iterrows():
        key = str(row["workload_name"]) + "!" + str(row["jobID"])
        if key not in numbers_map:
            # First occurrence of this workload_name!jobID combination:
            # allocate the next unique number and an empty group.
            numbers_map[key] = len(numbers_map) + 1
            groups[key] = []
        unique_numbers.append(numbers_map[key])
        # Only jobs with allocated resources are candidates for labeling.
        if row['allocated_resources']:
            groups[key].append((index, row))

    # Label exactly one job per group: the one in the middle.
    labeled_jobs = set()
    for jobs in groups.values():
        if jobs:
            middle_index, _ = jobs[len(jobs) // 2]
            labeled_jobs.add(middle_index)

    return labeled_jobs, unique_numbers
def plot_gantt(jobset, ax=None, title="Gantt chart",
               labels=True, palette=None, alpha=0.4,
               time_scale=False,
               color_function=None,
               label_function=None):
    """Draw a Gantt chart of the jobset: one rectangle per allocated
    resource interval of each job, spanning its execution time.

    :param jobset: object exposing ``df`` (jobs DataFrame) and
        ``res_bounds`` (min/max resource ids).
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param title: title of the chart.
    :param labels: if True, annotate one rectangle per unique job id.
    :param palette: list of colors; generated with core.generate_palette
        if None.
    :param alpha: transparency of the job rectangles.
    :param time_scale: if True, interpret time columns as seconds since
        the epoch and plot against datetime x values.
    :param color_function: callable ``(job, palette) -> color``; defaults
        to round-robin selection by the job's unique number.
    :param label_function: callable ``(job) -> text``; defaults to the
        job's 'jobID'.
    """
    # Palette generation if needed
    if palette is None:
        palette = core.generate_palette(8)
    assert(len(palette) > 0)

    if color_function is None:
        # Default: cycle through the palette using the job's unique number.
        def color_randrobin_select(job, palette):
            return palette[job.unique_number % len(palette)]
        color_function = color_randrobin_select

    if label_function is None:
        # Default: label a job with its raw 'jobID' value.
        def job_id_label(job):
            return job['jobID']
        label_function = job_id_label

    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    # Work on a copy so the caller's DataFrame is never modified.
    df = jobset.df.copy()

    # Jobs sharing workload_name/jobID get the same unique number;
    # labeled_jobs holds the row indices that should carry a text label.
    labeled_jobs, unique_numbers = map_unique_numbers(df)
    df["unique_number"] = unique_numbers

    if time_scale:
        # Interpret the time columns with datetime/timedelta semantics.
        df['submission_time'] = pd.to_datetime(df['submission_time'], unit='s')
        df['starting_time'] = pd.to_datetime(df['starting_time'], unit='s')
        df['execution_time'] = pd.to_timedelta(df['execution_time'], unit='s')

    def plot_job(job):
        # Draw one rectangle per interval of resources allocated to the job.
        col = color_function(job, palette)
        duration = job['execution_time']
        for itv in job['allocated_resources'].intervals():
            (y0, y1) = itv
            x0 = job['starting_time']
            if time_scale:
                # Convert date to matplotlib float representation
                x0 = matplotlib.dates.date2num(x0.to_pydatetime())
                finish_time = matplotlib.dates.date2num(
                    job['starting_time'] + job['execution_time']
                )
                duration = finish_time - x0
            rect = mpatch.Rectangle((x0, y0), duration,
                                    y1 - y0 + 0.9,
                                    alpha=alpha,
                                    facecolor=col,
                                    edgecolor='black',
                                    linewidth=0.5)
            if labels:
                # Only the designated "middle" job of each unique id
                # gets a label (job.name is the DataFrame row index).
                if job.name in labeled_jobs:
                    annotate(ax, rect, str(label_function(job)))
            ax.add_artist(rect)

    # apply for all jobs
    df.apply(plot_job, axis=1)

    # set graph limits, grid and title
    ax.set_xlim(df['submission_time'].min(), (
        df['starting_time'] + df['execution_time']).max())
    ax.set_ylim(jobset.res_bounds[0]-1, jobset.res_bounds[1]+2)
    ax.grid(True)
    ax.set_title(title)
def plot_pstates(pstates, x_horizon, ax=None, palette=None,
                 off_pstates=None,
                 son_pstates=None,
                 soff_pstates=None):
    """Overlay power-state pseudo jobs (OFF / switch ON / switch OFF) as
    colored rectangles, typically on top of a Gantt chart.

    :param pstates: object exposing ``pseudo_jobs`` (DataFrame with
        'pstate', 'begin', 'end' and 'interval_id' columns) and
        ``intervals`` (mapping interval_id -> list of machine intervals).
    :param x_horizon: right time limit; pseudo jobs are clipped to it.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param palette: at least 3 colors for OFF / switch ON / switch OFF.
    :param off_pstates: set of pstate values rendered as "OFF".
    :param son_pstates: set of pstate values rendered as "switch ON".
    :param soff_pstates: set of pstate values rendered as "switch OFF".
    """
    # palette generation if needed
    if palette is None:
        palette = ["#000000", "#56ae6c", "#ba495b"]
    assert(len(palette) >= 3)

    labels = ["OFF", "switch ON", "switch OFF"]
    alphas = [0.6, 1, 1]

    if off_pstates is None:
        off_pstates = set()
    if son_pstates is None:
        son_pstates = set()
    if soff_pstates is None:
        soff_pstates = set()

    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    # Only pseudo jobs whose pstate belongs to one of the 3 sets are drawn.
    interesting_pstates = off_pstates | son_pstates | soff_pstates

    for _, job in pstates.pseudo_jobs.iterrows():
        if job['pstate'] in interesting_pstates:
            # Pick the color/alpha/label of the first matching category
            # (off takes precedence over switch ON over switch OFF).
            if job['pstate'] in off_pstates:
                col_id = 0
            elif job['pstate'] in son_pstates:
                col_id = 1
            elif job['pstate'] in soff_pstates:
                col_id = 2
            color = palette[col_id]
            alpha = alphas[col_id]
            label = labels[col_id]

            interval_list = pstates.intervals[job['interval_id']]
            for machine_interval in interval_list:
                (y0, y1) = machine_interval
                # Clip the rectangle to the plot horizon on the right.
                (b, e) = (job['begin'], min(job['end'], x_horizon))
                rect = mpatch.Rectangle((b, y0), e - b, y1 - y0 + 0.9,
                                        color=color, alpha=alpha,
                                        label=label)
                ax.add_artist(rect)
def plot_mstates(mstates_df, ax=None, title=None, palette=None, reverse=True):
    """Plot a stacked area chart of machine-state counts over time.

    :param mstates_df: DataFrame with a 'time' column and one column per
        machine state ('nb_sleeping', 'nb_switching_on',
        'nb_switching_off', 'nb_idle', 'nb_computing').
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param title: optional title set on the axes.
    :param palette: one color per state; defaults to a colorblind palette.
    :param reverse: if True, stack the states in reverse order.
    """
    # Parameter handling
    if palette is None:
        # Colorblind palette
        palette = ["#000000", "#56ae6c", "#ba495b", "#000000", "#8960b3"]
    stack_order = ['nb_sleeping', 'nb_switching_on', 'nb_switching_off',
                   'nb_idle', 'nb_computing']
    alphas = [0.6, 1, 1, 0, 0.3]
    assert(len(palette) == len(stack_order)), \
        "Palette should be of size {}".format(len(stack_order))

    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    # Should the display order be reversed?
    if reverse:
        palette = palette[::-1]
        stack_order = stack_order[::-1]
        alphas = alphas[::-1]

    # Cumulative sums give the upper boundary of each stacked area.
    # FIX: np.vstack replaces np.row_stack, a deprecated alias since
    # NumPy 2.0 (behavior is identical).
    y = np.vstack([mstates_df[x] for x in stack_order])
    y = np.cumsum(y, axis=0)

    # Plotting: first area goes from 0 up to the first cumulative row,
    # each following area between consecutive cumulative rows.
    first_i = 0
    ax.fill_between(mstates_df['time'], 0, y[first_i, :],
                    facecolor=palette[first_i], alpha=alphas[first_i],
                    step='post', label=stack_order[first_i])
    for index, _ in enumerate(stack_order[1:]):
        ax.fill_between(mstates_df['time'], y[index, :], y[index+1, :],
                        facecolor=palette[index+1], alpha=alphas[index+1],
                        step='post',
                        label=stack_order[index+1])
    if title is not None:
        ax.set_title(title)
def plot_gantt_pstates(jobset, pstates, ax, title, labels=True,
                       off_pstates=None,
                       son_pstates=None,
                       soff_pstates=None):
    """Draw the jobset Gantt chart overlaid with power-state pseudo jobs."""
    # Default to empty sets rather than mutable default arguments.
    off_pstates = set() if off_pstates is None else off_pstates
    son_pstates = set() if son_pstates is None else son_pstates
    soff_pstates = set() if soff_pstates is None else soff_pstates

    plot_gantt(jobset, ax, title, labels, palette=["#8960b3"], alpha=0.3)

    # Only pseudo jobs with a finite end participate in the x bounds.
    finite_jobs = pstates.pseudo_jobs.loc[
        pstates.pseudo_jobs['end'] < float('inf')]
    x_min = min(jobset.df.submission_time.min(), finite_jobs.begin.min())
    x_max = max(jobset.df.finish_time.max(), finite_jobs.end.max())
    ax.set_xlim(x_min, x_max)
    ax.set_ylim(min(jobset.res_bounds[0], pstates.res_bounds[0]),
                max(jobset.res_bounds[1], pstates.res_bounds[1]))
    ax.grid(True)
    ax.set_title(title)

    plot_pstates(pstates, ax.get_xlim()[1], ax,
                 off_pstates=off_pstates,
                 son_pstates=son_pstates,
                 soff_pstates=soff_pstates)
def plot_processor_load(jobset, ax=None, title="Load", labels=True):
    """
    Display the impact of each job on the load of each processor.

    need: execution_time, jobID, allocated_resources

    Each job stacks its execution time on every processor it uses;
    rectangles that span contiguous processors at the same accumulated
    height are merged into a single rectangle.

    :param jobset: object exposing ``df`` (jobs DataFrame) and
        ``res_bounds`` (min/max processor ids).
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param title: title of the plot.
    :param labels: if True, annotate each rectangle with the job id.
    """
    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    def _draw_rect(ax, base, width, height, color, label):
        # Draw one merged rectangle, annotated with the job id if asked.
        rect = mpatch.Rectangle(base, width, height, alpha=0.2, color=color)
        if label:
            annotate(ax, rect, label)
        ax.add_artist(rect)

    RGB_tuples = core.generate_palette(16)

    # Accumulated load (seconds of execution time) per processor id.
    load = {
        p: 0.0 for p in range(jobset.res_bounds[0], jobset.res_bounds[1] + 1)
    }

    for row in jobset.df.itertuples():
        color = RGB_tuples[row.Index % len(RGB_tuples)]
        duration = row.execution_time
        label = row.jobID if labels else None

        # Start a pending rectangle at the job's first processor, at the
        # height that processor's load has reached so far.
        baseproc = next(iter(row.allocated_resources))
        base = (baseproc, load[baseproc])
        width = 0  # width is incremented in the first loop iteration
        for proc in row.allocated_resources:
            if base[0] + width != proc or load[proc] != base[1]:
                # we cannot merge across processors: draw the current
                # rectangle, and start anew
                _draw_rect(ax, base, width, duration, color, label)
                base = (proc, load[proc])
                width = 1
            else:
                # we can merge across processors: extend width, and continue
                width += 1
            load[proc] += duration

        # draw last pending rectangle if necessary
        if width > 0:
            _draw_rect(ax, base, width, duration, color, label)

    ax.set_xlim(jobset.res_bounds)
    ax.set_ylim(0, 1.02 * max(load.values()))
    ax.grid(True)
    ax.set_title(title)
    ax.set_xlabel('proc. id')
    ax.set_ylabel('load / s')
def plot_series(series_type, jobsets, ax=None, time_scale=False):
    '''
    Plot one or several time series about provided jobsets on the given ax.

    :param series_type: any value present in ``available_series``
        (currently only "waiting_time" is implemented).
    :param jobsets: mapping of name -> jobset; each jobset's ``df`` is used.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param time_scale: if True, index the series with real datetimes.
    :raises AttributeError: if *series_type* is not in ``available_series``.
    :raises RuntimeError: if *series_type* is valid but not implemented.
    '''
    # Validate the requested series before touching any matplotlib state.
    # FIX: error message typos corrected ("gieven", "folowing").
    if series_type not in available_series:
        raise AttributeError(
            "The given attribute should be one of the following: "
            "{}".format(available_series))

    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    if series_type == "waiting_time":
        series = {}
        for jobset_name, jobset in jobsets.items():
            # create a series of cumulative waiting time
            series[jobset_name] = metrics.cumulative_waiting_time(jobset.df)
            if time_scale:
                # Index by the (real) time each job started running.
                series[jobset_name].index = pd.to_datetime(
                    jobset.df['submission_time'] + jobset.df['waiting_time'],
                    unit='s')
        # plot series
        for serie_name, serie in series.items():
            serie.plot(ax=ax, label=serie_name, drawstyle="steps")
    else:
        # FIX: the original message never received the format argument,
        # so the placeholder "{}" was printed literally.
        raise RuntimeError(
            'The serie "{}" is not implemented yet'.format(series_type))

    # Manage legend
    ax.legend()
    ax.set_title(series_type)
    ax.grid(True)
def plot_gantt_general_shape(jobset_list, ax=None, alpha=0.3,
                             title="Gantt general shape"):
    '''
    Draw a general gantt shape of multiple jobsets on one plot for comparison.

    :param jobset_list: mapping of name -> jobset; each jobset's ``df``
        and ``res_bounds`` are used.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param alpha: transparency of the job rectangles.
    :param title: title of the plot.
    '''
    # Get current axe to plot
    if ax is None:
        ax = plt.gca()
    color_index = 0
    RGB_tuples = core.generate_palette(len(jobset_list))
    legend_rect = []
    legend_label = []
    xmin = None
    xmax = None
    for jobset_name, jobset in jobset_list.items():
        # generate color
        color = RGB_tuples[color_index % len(RGB_tuples)]
        color_index += 1
        # generate legend
        legend_rect.append(
            mpatch.Rectangle((0, 1), 12, 10, alpha=alpha, color=color))
        legend_label.append(jobset_name)

        def plot_job(job):
            # One rectangle per allocated resource interval of the job.
            duration = job['execution_time']
            for itv in job['allocated_resources'].intervals():
                (y0, y1) = itv
                rect = mpatch.Rectangle((job['starting_time'], y0), duration,
                                        y1 - y0 + 0.9, alpha=alpha,
                                        color=color)
                ax.add_artist(rect)
        # apply for all jobs
        jobset.df.apply(plot_job, axis=1)

        # compute graphical boundaries ('is None' so a 0 timestamp is kept)
        if xmin is None or jobset.df.submission_time.min() < xmin:
            xmin = jobset.df.submission_time.min()
        # FIX: the original compared with '<', which made xmax track the
        # *smallest* per-jobset finish time instead of the overall latest.
        if xmax is None or jobset.df.finish_time.max() > xmax:
            xmax = jobset.df.finish_time.max()

    # do include legend
    ax.legend(legend_rect, legend_label, loc='center',
              bbox_to_anchor=(0.5, 1.06),
              fancybox=True, shadow=True, ncol=5)
    ax.set_xlim((xmin, xmax))
    # use last jobset of the previous loop to set the resource bounds assuming
    # that all the gantt have the same number of resources
    ax.set_ylim(jobset.res_bounds[0]-1, jobset.res_bounds[1]+2)
    ax.grid(True)
    ax.set_title(title)
def plot_job_details(dataframe, size, ax=None, title="Job details",
                     time_scale=False, time_offset=0):
    """For every job, plot its submission, starting and finish times
    against its processor allocation, linked by waiting (blue) and
    execution (green) segments.

    :param dataframe: jobs DataFrame; needs 'jobID', 'submission_time',
        'waiting_time', 'execution_time' and 'proc_alloc' columns.
    :param size: vertical offset separating the three point series.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param title: title of the plot.
    :param time_scale: if True, interpret times as seconds since the epoch.
    :param time_offset: constant added to every submission time.
    """
    # TODO manage also the Jobset case
    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    # Avoid side effect
    df = pd.DataFrame.copy(dataframe)
    df = df.sort_values(by='jobID')

    # Derive absolute time columns from submission/waiting/execution.
    df['submission_time'] = df['submission_time'] + time_offset
    df['starting_time'] = df['submission_time'] + df['waiting_time']
    df['finish_time'] = df['starting_time'] + df['execution_time']

    # (column, color, marker, vertical offset) for the three point series.
    to_plot = [('starting_time', 'green', '>', size),
               ('submission_time', 'blue', '.', 0),
               ('finish_time', 'red', '|', size * 2)]
    # (begin col, end col, color, begin offset, end offset) for the segments
    # joining submission->start (waiting) and start->finish (execution).
    lines = [['submission_time', 'starting_time', 'blue', 0, size],
             ['starting_time', 'finish_time', 'green', size, size * 2]]

    if time_scale:
        # interpret columns with time aware semantics
        df['submission_time'] = pd.to_datetime(df['submission_time'], unit='s')
        df['starting_time'] = pd.to_datetime(df['starting_time'], unit='s')
        df['finish_time'] = pd.to_datetime(df['finish_time'], unit='s')
        # convert columns to use them with matplotlib
        df['submission_time'] = df['submission_time'].map(matplotlib.dates.date2num)
        df['starting_time'] = df['starting_time'].map(matplotlib.dates.date2num)
        df['finish_time'] = df['finish_time'].map(matplotlib.dates.date2num)

    # select the axe
    plt.sca(ax)

    # plot lines
    # add vertical jitter to limit overplotting; seeded for reproducibility
    jitter = size / 20
    random.seed(a=0)
    new_proc_alloc = df['proc_alloc'].apply(
        lambda x: x + random.uniform(-jitter, jitter))
    for begin, end, color, treshold_begin, treshold_end in lines:
        for i, item in df.iterrows():
            x_begin = item[begin]
            x_end = item[end]
            plt.plot([x_begin, x_end], [new_proc_alloc[i] + treshold_begin,
                                        new_proc_alloc[i] + treshold_end],
                     color=color, linestyle='-', linewidth=1, alpha=0.2)

    # plot one point per serie
    for serie, color, marker, treshold in to_plot:
        x = df[serie]
        y = new_proc_alloc + treshold
        plt.scatter(x, y, c=color, marker=marker,
                    s=60, label=serie, alpha=0.5)

    ax.grid(True)
    ax.legend()
    ax.set_title(title)
    if time_scale:
        ax.xaxis.set_major_formatter(
            matplotlib.dates.DateFormatter('%Y-%m-%d\n%H:%M:%S')
        )
def plot_series_comparison(series, ax=None, title="Series comparison"):
    '''Plot and compare two series in post step.

    The area where the first series exceeds the second is filled in red,
    the opposite area in green.

    :param series: mapping of exactly two name -> pandas Series entries.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param title: title of the plot.
    '''
    assert len(series) == 2

    # Get current axe to plot
    if ax is None:
        ax = plt.gca()

    first_serie_name = list(series.keys())[0]
    first_serie = list(series.values())[0]
    first_serie.plot(drawstyle="steps-post", ax=ax, label=first_serie_name)

    second_serie_name = list(series.keys())[1]
    second_serie = list(series.values())[1]
    second_serie.plot(drawstyle="steps-post", ax=ax, label=second_serie_name)

    # Align both series on the first one's index, forward-filling gaps.
    # FIX: .ffill() replaces fillna(method='ffill'), deprecated since
    # pandas 2.1 (identical behavior).
    df = pd.DataFrame(series, index=first_serie.index).ffill()
    y1 = df[first_serie_name]
    y2 = df[second_serie_name]
    ax.fill_between(df.index, y1, y2, where=y2 < y1, facecolor='red',
                    step='post', alpha=0.5,
                    label=first_serie_name + ">" + second_serie_name)
    ax.fill_between(df.index, y1, y2, where=y2 > y1, facecolor='green',
                    step='post', alpha=0.5,
                    label=first_serie_name + "<" + second_serie_name)
    ax.grid(True)
    ax.set_title(title)
def plot_fragmentation(frag, ax=None, label="Fragmentation"):
    """
    Plot fragmentation raw data, distribution and ecdf in 3 subplots
    given in the ax list.

    :param frag: fragmentation data (can be obtained with the
        fragmentation method).
    :param ax: sequence of exactly 3 matplotlib axes; created if None.
    :param label: label applied to each of the three plots.
    """
    # Get axes to plot on
    if ax is None:
        # FIX: plt.subplots returns a (figure, axes) tuple; the original
        # assigned the whole tuple to ax, so len(ax) was 2 and the
        # assertion below always failed.
        _, ax = plt.subplots(nrows=3)
    assert len(ax) == 3

    # direct plot
    frag.plot(ax=ax[0], label=label)
    ax[0].set_title("Fragmentation over resources")

    # plot distribution
    # NOTE(review): sns.distplot is removed in recent seaborn releases;
    # consider sns.histplot(..., kde=False) + sns.rugplot when upgrading.
    sns.distplot(frag, ax=ax[1], label=label, kde=False, rug=True)
    ax[1].set_title("Fragmentation distribution")

    # plot ecdf
    from statsmodels.distributions.empirical_distribution import ECDF
    ecdf = ECDF(frag)
    ax[2].step(ecdf.x, ecdf.y, label=label)
    ax[2].set_title("Fragmentation ecdf")
def plot_load(load, nb_resources=None, ax=None, normalize=False,
              time_scale=False, load_label="load",
              UnixStartTime=0, TimeZoneString='UTC'):
    '''
    Plots the number of used resources against time.

    :param load: DataFrame with 'time' and 'load' columns.
    :param nb_resources: total number of resources; used to draw the
        maximum-resources line and to normalize.
    :param ax: matplotlib axes to draw on (defaults to the current axes).
    :param normalize: if True normalize by the number of resources
        `nb_resources`
    :param time_scale: if True, index the data with timezone-aware
        datetimes instead of raw seconds.
    :param load_label: label used in the legend and as the plot title.
    :param UnixStartTime: offset (seconds) added to every timestamp.
    :param TimeZoneString: timezone the datetime index is converted to.
    '''
    mean = metrics.load_mean(load)
    u = load.copy()

    if time_scale:
        # make the time index a column
        u = u.reset_index()
        # convert timestamp to datetime
        u.index = pd.to_datetime(u['time'] + UnixStartTime,
                                 unit='s')
        # FIX: tz_localize/tz_convert return a *new* index; the original
        # discarded the result, leaving the index timezone-naive.
        u.index = u.index.tz_localize('UTC').tz_convert(TimeZoneString)

    if normalize and nb_resources is None:
        nb_resources = u.load.max()

    if normalize:
        u.load = u.load / nb_resources
        mean = mean / nb_resources

    # get an axe if not provided
    if ax is None:
        ax = plt.gca()

    # leave room to have better view
    ax.margins(x=0.1, y=0.1)

    # plot load
    u.load.plot(drawstyle="steps-post", ax=ax, label=load_label)

    # plot a line for max available area
    if nb_resources and not normalize:
        ax.plot([u.index[0], u.index[-1]],
                [nb_resources, nb_resources],
                linestyle='-', linewidth=2,
                label="Maximum resources ({})".format(nb_resources))

    # plot a line for mean utilisation
    ax.plot([u.index[0], u.index[-1]],
            [mean, mean],
            linestyle='--', linewidth=1,
            label="Mean {0} ({1:.2f})".format(load_label, mean))

    # mark the times where the load drops to zero (reset events)
    sns.rugplot(u.load[u.load == 0].index, ax=ax, color='r')
    ax.scatter([], [], marker="|", linewidth=1, s=200,
               label="Reset event ({} == 0)".format(load_label), color='r')
    # FIXME: Add legend when this bug is fixed
    # https://github.com/mwaskom/seaborn/issues/1071
    # ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    ax.grid(True)
    ax.legend()
    ax.set_title(load_label)
def plot_free_resources(utilisation, nb_resources, normalize=False,
                        time_scale=False,
                        UnixStartTime=0, TimeZoneString='UTC'):
    '''
    Plots the number of free resources against time.

    :param utilisation: used-resources data; assumed to expose a 'time'
        column when time_scale is requested — TODO confirm against callers.
    :param nb_resources: total number of resources.
    :param normalize: if True normalize by the number of resources `nb_resources`
    :param time_scale: if True, index the data with timezone-aware datetimes.
    :param UnixStartTime: offset (seconds) added to every timestamp.
    :param TimeZoneString: timezone the datetime index is converted to.
    '''
    free = nb_resources - utilisation
    if normalize:
        free = free / nb_resources

    if time_scale:
        free.index = pd.to_datetime(free['time'] + UnixStartTime,
                                    unit='s', utc=True)
        # FIX: the original discarded the converted index.  Since utc=True
        # already yields a UTC-aware index, only tz_convert is needed
        # (tz_localize on an aware index would raise).
        free.index = free.index.tz_convert(TimeZoneString)

    free.plot()

    # plot a line for the number of procs
    plt.plot([free.index[0], free.index[-1]],
             [nb_resources, nb_resources],
             linestyle='-', linewidth=1,
             label="Maximum resources ({})".format(nb_resources))