#
# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering.
#
# This file is part of MLonMCU.
# See https://github.com/tum-ei-eda/mlonmcu.git for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collection of (example) postprocesses integrated in MLonMCU."""
import re
import ast
import tempfile
from pathlib import Path
from io import StringIO
from collections import defaultdict
import numpy as np
import pandas as pd
from mlonmcu.artifact import Artifact, ArtifactFormat, lookup_artifacts
from mlonmcu.config import str2dict, str2bool, str2list
from mlonmcu.logging import get_logger
from .postprocess import SessionPostprocess, RunPostprocess
from .validate_metrics import parse_validate_metrics, parse_classify_metrics
from .calc_lib_mem_footprints import (
parse_elf,
analyze_linker_map_helper,
generate_pie_data,
agg_library_footprint,
unmangle_helper,
)
from .dwarf import analyze_dwarf
logger = get_logger()
[docs]
def match_rows(df, cols):
    """Helper function to group similar rows in a dataframe.

    Every value is converted to its string representation before grouping,
    so rows are considered similar when the textual values of ``cols`` agree.

    Arguments
    ---------
    df : pd.DataFrame
        The dataframe whose rows should be grouped.
    cols : str or list
        Column name(s) used as grouping key.

    Returns
    -------
    list
        One tuple of row index labels per group.
    """
    as_text = df.astype(str)
    index_per_group = as_text.groupby(cols).apply(lambda group: tuple(group.index))
    return index_per_group.tolist()
def _check_cfg(value):
res = re.compile(r"^((?:[a-zA-Z\d\-_ \.\[\]\(\)]+)(?:,[a-zA-Z\d\-_ \.\[\]\(\)]+)*)$").match(value)
if res is None:
return False
return True
def _parse_cfg(value):
    """Turn a config string into a Python object.

    Plain comma-separated strings (as accepted by ``_check_cfg``) are split
    at the commas; everything else is evaluated as a Python literal via
    ``ast.literal_eval``.
    """
    if not _check_cfg(value):
        # Not a simple list, e.g. "['a', 'b']": treat it as a Python literal.
        return ast.literal_eval(value)
    return value.split(",")
[docs]
class FilterColumnsPostprocess(SessionPostprocess):
    """Postprocess which can be used to drop unwanted columns from a report.

    Config options
    --------------
    keep : list or str, optional
        Names of the columns to keep (all others are dropped).
    drop : list or str, optional
        Names of the columns to drop. Mutually exclusive with ``keep``.
    drop_nan : bool
        Drop columns which only contain NaN values.
    drop_empty : bool
        Not implemented yet.
    drop_const : bool
        Drop columns whose values are identical in every row.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "keep": None,
        "drop": None,
        "drop_nan": False,
        "drop_empty": False,
        "drop_const": False,
    }

    def __init__(self, features=None, config=None):
        super().__init__("filter_cols", features=features, config=config)

    @property
    def keep(self):
        """Get keep property."""
        cfg = self.config["keep"]
        if isinstance(cfg, str):
            return _parse_cfg(cfg)
        return cfg

    @property
    def drop(self):
        """Get drop property."""
        cfg = self.config["drop"]
        if isinstance(cfg, str):
            return _parse_cfg(cfg)
        return cfg

    @property
    def drop_nan(self):
        """Get drop_nan property."""
        value = self.config["drop_nan"]
        return str2bool(value)

    @property
    def drop_empty(self):
        """Get drop_empty property."""
        value = self.config["drop_empty"]
        return str2bool(value)

    @property
    def drop_const(self):
        """Get drop_const property."""
        value = self.config["drop_const"]
        return str2bool(value)

    def post_session(self, report):
        """Called at the end of a session.

        Applies the configured column filters to the pre, main and post
        dataframes of the report.

        Raises
        ------
        NotImplementedError
            If ``drop_empty`` is enabled.
        RuntimeError
            If both ``keep`` and ``drop`` are configured.
        """

        def _filter_df(df, keep, drop, drop_nan=False, drop_empty=False, drop_const=False):
            if drop_empty:
                raise NotImplementedError
            # Validate the configuration before touching the data.
            if keep is not None and drop is not None:
                raise RuntimeError("'drop' and 'keep' can not be defined at the same time")
            if drop_nan:
                # Work on a new frame instead of mutating the report's frame in place.
                df = df.dropna(axis=1, how="all")
            if drop_const and len(df) > 0:
                # A frame without rows has no first row to compare against.
                # NaN never compares equal, so columns containing NaN in the
                # first row are not treated as constant.
                df = df.loc[:, (df != df.iloc[0]).any()]
            if keep is not None:
                drop_cols = [name for name in df.columns if name not in keep]
            elif drop is not None:
                drop_cols = [name for name in df.columns if name in drop]
            else:
                drop_cols = []
            return df.drop(columns=drop_cols)

        # Resolve the configuration once and use it for all three frames.
        options = {
            "drop_nan": self.drop_nan,
            "drop_empty": self.drop_empty,
            "drop_const": self.drop_const,
        }
        keep = self.keep
        drop = self.drop
        report.pre_df = _filter_df(report.pre_df, keep, drop, **options)
        report.main_df = _filter_df(report.main_df, keep, drop, **options)
        report.post_df = _filter_df(report.post_df, keep, drop, **options)
[docs]
class RenameColumnsPostprocess(SessionPostprocess):
    """Postprocess which can rename columns based on a provided mapping.

    Config options
    --------------
    mapping : dict or str
        Maps old column names to new column names.
    merge : bool
        If several columns end up with the same name, combine them into a
        single column instead of keeping duplicates.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "mapping": {},
        "merge": True,
    }

    def __init__(self, features=None, config=None):
        super().__init__("rename_cols", features=features, config=config)

    @property
    def mapping(self):
        """Get mapping property."""
        value = self.config["mapping"]
        if isinstance(value, dict):
            return value
        return str2dict(value)

    @property
    def merge(self):
        """Get merge property."""
        return str2bool(self.config["merge"])

    def post_session(self, report):
        """Called at the end of a session."""
        new_names = self.mapping.values()
        if len(new_names) != len(set(new_names)) and not self.merge:
            logger.warning("rename_cols: non unique mapping found. use merge=True to avoid overwriting values.")

        def _coalesce(frame):
            # Peel off the first occurrence of every column name until no
            # duplicates are left, then fold the layers back together so that
            # earlier occurrences take precedence over later ones.
            layers = []
            while len(set(frame.columns)) != len(frame.columns):
                repeated = frame.columns.duplicated(keep="first")
                layers.append(frame.loc[:, ~repeated])
                frame = frame.loc[:, repeated]
            combined = frame
            for layer in reversed(layers):
                combined = layer.combine_first(combined)
            return combined

        report.pre_df = report.pre_df.rename(columns=self.mapping)
        report.main_df = report.main_df.rename(columns=self.mapping)
        report.post_df = report.post_df.rename(columns=self.mapping)
        if not self.merge:
            return
        report.pre_df = _coalesce(report.pre_df)
        report.main_df = _coalesce(report.main_df)
        report.post_df = _coalesce(report.post_df)
[docs]
class Features2ColumnsPostprocess(SessionPostprocess):  # RunPostprocess?
    """Postprocess which can be used to transform (explode) the 'Features' Column
    in a dataframe for easier filtering.

    Config options
    --------------
    limit : list or str
        Only create columns for these features (empty: all features).
    drop : bool
        Remove the original 'Features' column afterwards.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "limit": [],
        "drop": True,
    }

    def __init__(self, features=None, config=None):
        super().__init__("features2cols", features=features, config=config)

    @property
    def limit(self):
        """Get limit property."""
        value = self.config["limit"]
        if not isinstance(value, list):
            return str2list(value)
        return value

    @property
    def drop(self):
        """Get drop property."""
        value = self.config["drop"]
        return str2bool(value)

    def post_session(self, report):
        """Called at the end of a session.

        Adds one boolean ``feature_<name>`` column per feature to the post
        dataframe, telling whether the feature is listed in the row's
        'Features' entry. Does nothing if there is no 'Features' column or no
        feature passes the ``limit`` filter.
        """
        df = report.post_df
        if "Features" not in df.columns:
            return
        limit = self.limit
        # Collect the names row by row; this also copes with a frame without rows.
        seen = set()
        for row_features in df["Features"]:
            seen.update(row_features)
        # Sort to get a reproducible column order (set iteration order is arbitrary).
        selected = sorted(name for name in seen if len(limit) == 0 or name in limit)
        if len(selected) == 0:
            return
        feature_df = pd.concat(
            [
                # Bind the name as default argument to avoid late binding in the lambda.
                df["Features"].apply(lambda x, name=name: name in x).rename("feature_" + name)
                for name in selected
            ],
            axis=1,
        )
        if self.drop:
            tmp_df = df.drop(columns=["Features"])
        else:
            tmp_df = df
        new_df = pd.concat([tmp_df, feature_df], axis=1)
        report.post_df = new_df
[docs]
class Config2ColumnsPostprocess(SessionPostprocess):  # RunPostprocess?
    """Postprocess which can be used to transform (explode) the 'Config' Column in a dataframe for easier filtering.

    Config options
    --------------
    limit : list or str
        Only create columns for these config keys (empty: all keys).
    drop : bool
        Remove the original 'Config' column afterwards.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "limit": [],
        "drop": True,
    }

    def __init__(self, features=None, config=None):
        super().__init__("config2cols", features=features, config=config)

    @property
    def limit(self):
        """Get limit property."""
        value = self.config["limit"]
        if not isinstance(value, list):
            return str2list(value)
        return value

    @property
    def drop(self):
        """Get drop property."""
        value = self.config["drop"]
        return str2bool(value)

    def post_session(self, report):
        """Called at the end of a session.

        Adds one ``config_<key>`` column per (selected) config key to the post
        dataframe. Does nothing if there is no 'Config' column.
        """
        df = report.post_df
        if "Config" not in df.columns:
            return
        # Resolve the property once instead of for every key of every row.
        limit = self.limit
        keep_all = len(limit) == 0

        def _select(cfg):
            return {key: value for key, value in cfg.items() if keep_all or key in limit}

        config_df = df["Config"].apply(_select).apply(pd.Series).add_prefix("config_")
        if self.drop:
            tmp_df = df.drop(columns=["Config"])
        else:
            tmp_df = df
        new_df = pd.concat([tmp_df, config_df], axis=1)
        report.post_df = new_df
[docs]
class MyPostprocess(SessionPostprocess):
    """Combination of several postprocesses with a hard-coded configuration.

    Runs config2cols, features2cols, rename_cols and filter_cols (in this
    order) on the report.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
    }

    def __init__(self, features=None, config=None):
        super().__init__("mypost", features=features, config=config)
        # Config keys which get their own column.
        config_keys = [
            "tvmllvm.desired_layout",
            "tvmaot.desired_layout",
            "tvmaotplus.desired_layout",
            "tvmrt.desired_layout",
            "xcorev.mem",
            "xcorev.mac",
            "xcorev.bi",
            "xcorev.alu",
            "xcorev.bitmanip",
            "xcorev.simd",
            "xcorev.hwlp",
            "cv32e40p.fpu",
            "etiss.fpu",
            "corev_ovpsim.fpu",
            "tvmaot.disabled_passes",
            "tvmaotplus.disabled_passes",
            "tvmrt.disabled_passes",
            "tvmllvm.disabled_passes",
            "auto_vectorize.loop",
            "auto_vectorize.slp",
            "auto_vectorize.force_vector_width",
            "auto_vectorize.force_vector_interleave",
            "auto_vectorize.custom_unroll",
            "tvmllvm.target_keys",
            "tvmrt.target_keys",
            "tvmaot.target_keys",
            "tvmaotplus.target_keys",
            "autotuned.mode",
        ]
        # Short names for the generated columns.
        column_names = {
            "config_tvmllvm.desired_layout": "Layout",
            "config_tvmaot.desired_layout": "Layout",
            "config_tvmaotplus.desired_layout": "Layout",
            "config_tvmrt.desired_layout": "Layout",
            "config_xcorev.mem": "XCVMem",
            "config_xcorev.mac": "XCVMac",
            "config_xcorev.bi": "XCVBi",
            "config_xcorev.alu": "XCVAlu",
            "config_xcorev.bitmanip": "XCVBitmanip",
            "config_xcorev.simd": "XCVSimd",
            "config_xcorev.hwlp": "XCVHwlp",
            "feature_autotuned": "Autotuned",
            "feature_debug": "Debug",
            "config_cv32e40p.fpu": "FPU",
            "config_etiss.fpu": "FPU",
            "config_corev_ovpsim.fpu": "FPU",
            "config_tvmaot.disabled_passes": "Disabled",
            "config_tvmaotplus.disabled_passes": "Disabled",
            "config_tvmrt.disabled_passes": "Disabled",
            "config_tvmllvm.disabled_passes": "Disabled",
            "config_auto_vectorize.loop": "Loop",
            "config_auto_vectorize.slp": "Slp",
            "config_auto_vectorize.force_vector_width": "FVW",
            "config_auto_vectorize.force_vector_interleave": "FVI",
            "config_auto_vectorize.custom_unroll": "Unroll",
            "config_tvmllvm.target_keys": "Keys",
            "config_tvmrt.target_keys": "Keys",
            "config_tvmaot.target_keys": "Keys",
            "config_tvmaotplus.target_keys": "Keys",
            "config_autotuned.mode": "Tuner",
        }
        # Features which get their own column.
        feature_names = ["autotuned", "debug", "auto_vectorize", "target_optimized"]
        # Columns removed from the final report.
        unwanted_columns = [
            "Postprocesses",
            "Framework",
            "Platform",
            "Session",
            "ROM read-only",
            "ROM code",
            "ROM misc",
            "RAM data",
            "RAM zero-init data",
            "Run Stage Time [s]",
            "Compile Stage Time [s]",
            "Workspace Size [B]",
            "Build Stage Time [s]",
            "Load Stage Time [s]",
            "feature_auto_vectorize",
            "feature_target_optimized",
            "Setup Cycles",
            "Setup Instructions",
            "Setup CPI",
        ]
        self.config2cols = Config2ColumnsPostprocess(
            config={"config2cols.limit": config_keys, "config2cols.drop": True}
        )
        self.rename_cols = RenameColumnsPostprocess(config={"rename_cols.mapping": column_names})
        self.features2cols = Features2ColumnsPostprocess(
            config={"features2cols.limit": feature_names, "features2cols.drop": True}
        )
        self.filter_cols = FilterColumnsPostprocess(config={"filter_cols.drop": unwanted_columns})

    def post_session(self, report):
        """Called at the end of a session. Runs the wrapped postprocesses one after another."""
        stages = (self.config2cols, self.features2cols, self.rename_cols, self.filter_cols)
        for stage in stages:
            stage.post_session(report)
[docs]
class PassConfig2ColumnsPostprocess(SessionPostprocess):
    """Postprocess which can be used to transform (explode) the TVM pass_config into separate columns.

    requires prior Config2Columns pass."""

    def __init__(self, features=None, config=None):
        super().__init__("passcfg2cols", features=features, config=config)

    def post_session(self, report):
        """Called at the end of a session.

        Replaces the 'config_tvmaot.extra_pass_config' column of the post
        dataframe by one ``passcfg_<key>`` column per entry. Does nothing if
        that column does not exist.
        """
        source_col = "config_tvmaot.extra_pass_config"
        post_df = report.post_df
        if source_col in post_df.columns:
            exploded = post_df[source_col].apply(pd.Series).add_prefix("passcfg_")
            remaining = post_df.drop(columns=[source_col])
            report.post_df = pd.concat([remaining, exploded], axis=1)
[docs]
class Bytes2kBPostprocess(SessionPostprocess):  # RunPostprocess?
    """Postprocess which can be used to scale the memory related columns from Bytes to KiloBytes."""

    def __init__(self, features=None, config=None):
        super().__init__("bytes2kb", features=features, config=config)

    def post_session(self, report):
        """Called at the end of a session.

        Every column of the main dataframe whose name contains "ROM" or "RAM"
        (and not yet "kB") is divided by 1000 and gets the suffix " [kB]".
        """
        main_df = report.main_df
        memory_cols = [
            name
            for name in main_df.columns
            # Only memory related columns which are not already given in kB
            if ("ROM" in name or "RAM" in name) and "kB" not in name
        ]
        for name in memory_cols:
            main_df[name] = main_df[name] / 1000.0
            main_df.rename(columns={name: name + " [kB]"}, inplace=True)
        report.main_df = main_df
[docs]
class VisualizePostprocess(SessionPostprocess):
    """A very simple example on how to generate a plot of the results using a postprocess.

    Config options
    --------------
    format : str
        Output format of the plot (only "png" is supported).
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "format": "png",
    }

    def __init__(self, features=None, config=None):
        super().__init__("visualize", features=features, config=config)

    @property
    def format(self):
        """Get format property."""
        return self.config["format"]

    def post_session(self, report):
        """Called at the end of a session.

        Creates one bar chart for each of the columns "Cycles", "Total ROM"
        and "Total RAM" and returns the rendered figure as ``plot.png``
        artifact. Returns an empty list if one of the columns is missing.

        Raises
        ------
        NotImplementedError
            If a format other than "png" is configured.
        """
        df = pd.concat([report.pre_df, report.main_df], axis=1)

        if self.format != "png":
            raise NotImplementedError("Currently only supports PNG")

        COLS = ["Cycles", "Total ROM", "Total RAM"]
        if any(col not in report.main_df.columns for col in COLS):
            return []

        # Local import to deal with optional dependencies
        import matplotlib.pyplot as plt

        # The size has to be passed when the figure is created; changing
        # rcParams afterwards has no effect on an existing figure.
        fig, axes = plt.subplots(ncols=len(COLS), figsize=(15, 3))  # (w, h)
        try:
            # ideally we would use model/backend/target names here...
            bar_names = df["Session"].astype(str) + "_" + df["Run"].astype(str)
            for ax, col in zip(axes, COLS):
                new_df = df[[col]].astype(float)
                new_df.index = bar_names
                new_df.plot(kind="bar", ax=ax)
            with tempfile.TemporaryDirectory() as tmpdirname:
                fig_path = Path(tmpdirname) / "plot.png"
                fig.savefig(fig_path)
                with open(fig_path, "rb") as handle:
                    data = handle.read()
        finally:
            # Release the figure, pyplot keeps it alive otherwise.
            plt.close(fig)

        artifacts = [Artifact("plot.png", raw=data, fmt=ArtifactFormat.RAW)]
        return artifacts
[docs]
class Artifact2ColumnPostprocess(RunPostprocess):
    """Postprocess for converting artifacts to columns in the report.

    Config options
    --------------
    file2colname : dict or str
        Maps artifact names to report column names. For CSV artifacts the
        name may be followed by ``:<column>`` (or ``:*`` for all columns) to
        select parts of the file.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "file2colname": {},
    }

    def __init__(self, features=None, config=None):
        super().__init__("artifacts2cols", features=features, config=config)

    @property
    def file2colname(self):
        """Get file2colname property."""
        value = self.config["file2colname"]
        if isinstance(value, dict):
            return value
        return str2dict(value)

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Writes the content of each configured artifact into the given column
        of the main dataframe (an empty string if the artifact is missing).

        Raises
        ------
        RuntimeError
            If a matching artifact is not a text artifact.
        """
        for spec, colname in self.file2colname.items():
            filename = Path(spec)
            filecol = None
            if ":" in filename.name:
                # "<file>:<column>" selects a column of a CSV artifact
                stem, filecol = filename.name.rsplit(":", 1)
                filename = filename.parent / stem
            matches = lookup_artifacts(artifacts, name=filename, first_only=True)
            if not matches:
                report.main_df[colname] = ""
                continue
            artifact = matches[0]
            if artifact.fmt != ArtifactFormat.TEXT:
                raise RuntimeError("Can only put text into report columns")
            content = artifact.content
            if filecol:
                assert filename.suffix == ".csv"
                filedf = pd.read_csv(StringIO(content))
                if filecol != "*":
                    assert filecol in filedf.columns
                    cols = [filecol]
                else:
                    cols = list(filedf.columns)
                content = filedf[cols].to_dict(orient="list")
                # Unwrap single column / single value results
                if len(content) == 1:
                    content = next(iter(content.values()))
                    if len(content) == 1:
                        content = content[0]
                content = str(content)
            report.main_df[colname] = content
        return []
[docs]
class AnalyseInstructionsPostprocess(RunPostprocess):
    """Counting specific types of instructions.

    Config options
    --------------
    groups : bool
        Count the executed instructions per RISC-V major opcode.
    sequences : bool
        Count the most frequent instruction sequences.
    seq_depth : int
        Maximum length of the analysed sequences.
    top : int
        Number of entries kept per statistic.
    to_df : bool
        Write the results to the post dataframe of the report.
    to_file : bool
        Export the results as CSV artifacts.
    corev : bool
        Count the executed instructions per Core-V extension.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "groups": True,
        "sequences": True,
        "seq_depth": 3,
        "top": 10,
        "to_df": False,
        "to_file": True,
        "corev": False,
    }

    def __init__(self, features=None, config=None):
        super().__init__("analyse_instructions", features=features, config=config)

    @property
    def groups(self):
        """Get groups property."""
        value = self.config["groups"]
        return str2bool(value)

    @property
    def sequences(self):
        """Get sequences property."""
        value = self.config["sequences"]
        return str2bool(value)

    @property
    def seq_depth(self):
        """Get seq_depth property."""
        return int(self.config["seq_depth"])

    @property
    def top(self):
        """Get top property."""
        return int(self.config["top"])

    @property
    def to_df(self):
        """Get to_df property."""
        value = self.config["to_df"]
        return str2bool(value)

    @property
    def to_file(self):
        """Get to_file property."""
        value = self.config["to_file"]
        return str2bool(value)

    @property
    def corev(self):
        """Get corev property."""
        value = self.config["corev"]
        return str2bool(value)

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Parses the instruction log of the run (artifact flagged with
        ``log_instrs``; spike, etiss and ovpsim logs are supported) and
        generates the enabled statistics.

        Returns
        -------
        list
            The generated CSV artifacts (empty unless ``to_file`` is set).

        Raises
        ------
        RuntimeError
            If the simulator which produced the log can not be determined.
        """
        ret_artifacts = []
        log_artifact = lookup_artifacts(artifacts, flags=("log_instrs",), fmt=ArtifactFormat.TEXT, first_only=True)
        assert len(log_artifact) == 1, "To use analyse_instructions process, please enable feature log_instrs."
        log_artifact = log_artifact[0]
        is_spike = "spike" in log_artifact.flags
        is_etiss = "etiss_pulpino" in log_artifact.flags or "etiss" in log_artifact.flags
        is_ovpsim = "ovpsim" in log_artifact.flags or "corev_ovpsim" in log_artifact.flags
        is_riscv = is_spike or is_etiss or is_ovpsim

        groups = self.groups
        sequences = self.sequences
        corev = self.corev
        to_file = self.to_file
        to_df = self.to_df
        top = self.top
        need_encodings = groups
        # The Core-V statistic is based on the instruction names as well.
        need_names = sequences or corev

        encodings = None
        names = None
        if is_spike:
            content = log_artifact.content
            if need_encodings:
                encodings = re.compile(r"\((0x[0-9abcdef]+)\)").findall(content)
            if need_names:
                names = re.compile(r"core\s+\d+:\s0x[0-9abcdef]+\s\(0x[0-9abcdef]+\)\s([\w.]+).*").findall(content)
        elif is_etiss:
            # TODO: generalize
            def transform_df(df):
                def detect_size(bytecode):
                    # Size of the instruction in bytes
                    if bytecode[:2] == "0x":
                        return len(bytecode[2:]) / 2
                    elif bytecode[:2] == "0b":
                        return len(bytecode[2:]) / 8
                    else:
                        # Binary digits without prefix
                        assert len(set(bytecode)) == 2
                        return len(bytecode) / 8

                df["pc"] = df["pc"].apply(lambda x: int(x, 0))
                df["pc"] = pd.to_numeric(df["pc"])
                # TODO: normalize instr names
                df[["instr", "rest"]] = df["rest"].str.split(" # ", n=1, expand=True)
                df["instr"] = df["instr"].apply(lambda x: x.strip())
                df["instr"] = df["instr"].astype("category")
                df[["bytecode", "operands"]] = df["rest"].str.split(" ", n=1, expand=True)
                df["size"] = df["bytecode"].apply(detect_size)
                df["bytecode"] = df["bytecode"].apply(
                    lambda x: int(x, 16) if "0x" in x else (int(x, 2) if "0b" in x else int(x, 2))
                )
                df["bytecode"] = pd.to_numeric(df["bytecode"])
                df.drop(columns=["rest"], inplace=True)
                return df

            log_artifact.uncache()
            if need_encodings:
                encodings = []
            if need_names:
                names = []
            # The log is processed chunk by chunk
            with pd.read_csv(
                log_artifact.path, sep=":", names=["pc", "rest"], chunksize=2**22
            ) as reader:  # TODO: expose chunksize
                for chunk in reader:
                    df = transform_df(chunk)
                    # Accumulate the results of all chunks (not only the last one)
                    if need_encodings:
                        encodings.extend(bin(enc) for enc in df["bytecode"].values)
                    if need_names:
                        names.extend(df["instr"].values)
        elif is_ovpsim:
            content = log_artifact.content
            if need_encodings:
                encodings = re.compile(r"riscvOVPsim\/cpu',\s0x[0-9abcdef]+\(.*\):\s([0-9abcdef]+)\s+\w+\s+.*").findall(
                    content
                )
                encodings = [f"0x{enc}" for enc in encodings]
            if need_names:
                names = re.compile(r"riscvOVPsim\/cpu',\s0x[0-9abcdef]+\(.*\):\s[0-9abcdef]+\s+(\w+)\s+.*").findall(
                    content
                )
        else:
            raise RuntimeError("Uable to determine the used target.")

        def _helper(x, top=100):
            # Absolute and relative frequency of the most common items
            counts = pd.Series(x).value_counts()
            probs = counts / len(x)
            return dict(counts.head(top)), dict(probs.head(top))

        def _gen_csv(label, counts, probs):
            lines = [f"{label},Count,Probability"]
            for x in counts:
                line = f"{x},{counts[x]},{probs[x]:.3f}"
                lines.append(line)
            return "\n".join(lines)

        if groups:
            assert is_riscv, "Currently only riscv instrcutions can be analysed by groups"
            # The lookup tables are created once instead of once per instruction.
            mapping = {
                0b0010011: "OP-IMM",
                0b0110111: "LUI",
                0b0010111: "AUIPC",
                0b0110011: "OP",
                0b1101111: "JAL",
                0b1100111: "JALR",
                0b1100011: "BRANCH",
                0b0000011: "LOAD",
                0b0100011: "STORE",
                0b0001111: "MISC-MEM",
                0b1110011: "SYSTEM",
                0b1000011: "MADD",
                0b1000111: "MSUB",
                0b1001011: "MNSUB",
                0b1001111: "MNADD",
                0b0000111: "LOAD-FP",
                0b0100111: "STORE-FP",
                0b0001011: "custom-0",
                0b0101011: "custom-1",
                0b1011011: "custom-2/rv128",
                0b1111011: "custom-3/rv128",
                0b1101011: "reserved",
                0b0101111: "AMO",
                0b1010011: "OP-FP",
                0b1010111: "OP-V",
                0b1110111: "OP-P",
                0b0011011: "OP-IMM-32",
                0b0111011: "OP-32",
            }
            # Key: bits [15:13] followed by bits [1:0] of a 16-bit instruction
            rvc_mapping = {
                0b00000: "OP-IMM",
                0b00001: "OP-IMM",
                0b00010: "OP-IMM",
                0b00100: "LOAD",
                0b00101: "JAL",
                0b00110: "LOAD-FP",
                0b01000: "LOAD",
                0b01001: "OP-IMM",
                0b01010: "LOAD",
                0b01100: "LOAD-FP",
                0b01101: "OP-IMM",
                0b01110: "LOAD-FP",
                0b10000: "reserved",
                0b10001: "MISC-ALU",
                0b10010: "JALR",
                0b10100: "STORE-FP",
                0b10101: "JAL",
                0b10110: "STORE-FP",
                0b11000: "STORE",
                0b11001: "BRANCH",
                0b11010: "STORE",
                0b11100: "STORE-FP",
                0b11101: "BRANCH",
                0b11110: "STORE-FP",
            }

            def _extract_major_opcode(enc):
                enc = int(enc, 0)  # Base is taken from the prefix (0x.../0b...)
                opcode = enc & 0b1111111
                lsbs = opcode & 0b11
                if lsbs == 0b11:
                    return mapping.get(opcode, "UNKNOWN")
                # 16-bit instruction
                msbs = (enc & 0b1110000000000000) >> 13
                combined = msbs << 2 | lsbs
                assert combined in rvc_mapping.keys()
                return f"{rvc_mapping[combined]} (Compressed)"

            majors = list(map(_extract_major_opcode, encodings))
            major_counts, major_probs = _helper(majors, top=top)
            majors_csv = _gen_csv("Major", major_counts, major_probs)
            artifact = Artifact("analyse_instructions_majors.csv", content=majors_csv, fmt=ArtifactFormat.TEXT)
            if to_file:
                ret_artifacts.append(artifact)
            if to_df:
                post_df = report.post_df.copy()
                post_df["AnalyseInstructionsMajorsCounts"] = str(major_counts)
                post_df["AnalyseInstructionsMajorsProbs"] = str(major_probs)
                report.post_df = post_df

        if sequences:
            max_len = self.seq_depth

            def _get_sublists(lst, length):
                # All windows of the given length, joined by ";"
                return [";".join(lst[i : i + length]) for i in range(len(lst) - length + 1)]

            for length in range(1, max_len + 1):
                names_ = _get_sublists(names, length)
                counts, probs = _helper(names_, top=top)
                sequence_csv = _gen_csv("Sequence", counts, probs)
                artifact = Artifact(
                    f"analyse_instructions_seq{length}.csv", content=sequence_csv, fmt=ArtifactFormat.TEXT
                )
                if to_file:
                    ret_artifacts.append(artifact)
                if to_df:
                    post_df = report.post_df.copy()
                    post_df[f"AnalyseInstructionsSeq{length}Counts"] = str(counts)
                    post_df[f"AnalyseInstructionsSeq{length}Probs"] = str(probs)
                    report.post_df = post_df

        if corev:
            XCVMAC_INSNS = {
                "cv.mac",
                "cv.msu",
                "cv.mulun",
                "cv.mulhhun",
                "cv.mulsn",
                "cv.mulhhsn",
                "cv.mulurn",
                "cv.mulhhurn",
                "cv.mulsrn",
                "cv.mulhhsrn",
                "cv.macun",
                "cv.machhun",
                "cv.macsn",
                "cv.machhsn",
                "cv.macurn",
                "cv.machhurn",
                "cv.macsrn",
                "cv.machhsrn",
            }
            XCVMEM_INSNS = {
                "cv.lb_ri_inc",
                "cv.lbu_ri_inc",
                "cv.lh_ri_inc",
                "cv.lhu_ri_inc",
                "cv.lw_ri_inc",
                "cv.lb_rr_inc",
                "cv.lbu_rr_inc",
                "cv.lh_rr_inc",
                "cv.lhu_rr_inc",
                "cv.lw_rr_inc",
                "cv.lb_rr",
                "cv.lbu_rr",
                "cv.lh_rr",
                "cv.lhu_rr",
                "cv.lw_rr",
                "cv.sb_ri_inc",
                "cv.sh_ri_inc",
                "cv.sw_ri_inc",
                "cv.sb_rr_inc",
                "cv.sh_rr_inc",
                "cv.sw_rr_inc",
                "cv.sb_rr",
                "cv.sh_rr",
                "cv.sw_rr",
            }
            XCVBI_INSNS = {
                "cv.bneimm",
                "cv.beqimm",
            }
            XCVALU_INSNS = {
                "cv.slet",
                "cv.min",
                "cv.addnr",
                "cv.addunr",
                "cv.addn",
                "cv.maxu",
                "cv.subun",
                "cv.extbz",
                "cv.addun",
                "cv.clip",
                "cv.clipu",
                "cv.subn",
                "cv.max",
                "cv.extbs",
                "cv.abs",
                "cv.addurn",
                "cv.exths",
                "cv.exthz",
                "cv.minu",
                "cv.sletu",
                "cv.suburn",
                "cv.addrn",
                "cv.clipur",
                "cv.subrn",
            }
            XCVBITMANIP_INSNS = {
                "cv.ror",
                "cv.clb",
            }
            XCVSIMD_INSNS = {
                "cv.add.h",
                "cv.add.sc.b",
                "cv.add.sc.h",
                "cv.add.sci.h",
                "cv.and.b",
                "cv.and.h",
                "cv.and.sc.h",
                "cv.and.sci.h",
                "cv.cmpeq.sc.h",
                "cv.cmpge.sci.h",
                "cv.cmpgtu.h",
                "cv.cmplt.sci.h",
                "cv.cmpltu.sci.b",
                "cv.cmpne.sc.h",
                "cv.cmpne.sci.b",
                "cv.extract.b",
                "cv.extract.h",
                "cv.extractu.b",
                "cv.extractu.h",
                "cv.insert.h",
                "cv.max.h",
                "cv.max.sci.h",
                "cv.maxu.h",
                "cv.or.b",
                "cv.or.h",
                "cv.pack",
                "cv.packhi.b",
                "cv.packlo.b",
                "cv.shuffle2.b",
                "cv.shuffle2.h",
                "cv.shufflei0.sci.b",
                "cv.sll.sci.h",
                "cv.sra.h",
                "cv.sra.sci.h",
                "cv.srl.h",
                "cv.srl.sci.h",
                "cv.sub.b",
                "cv.sub.h",
                "cv.xor.b",
                "cv.xor.sci.b",
                "cv.add.sci.b",
                "cv.cmpeq.b",
                "cv.cmpgtu.sc.h",
                "cv.cmpleu.sc.h",
                "cv.sdotup.h",
                "cv.sdotup.b",
                "cv.shuffle.sci.h",
                "cv.xor.sc.b",
                "cv.xor.sc.h",
                "cv.sdotsp.h",
                "cv.cmpeq.sci.b",
                "cv.and.sci.b",
                "cv.dotsp.h",
                "cv.dotsp.b",
                "cv.sdotsp.b",
                "cv.add.b",
                "cv.dotup.sci.b",
            }
            XCVHWLP_INSNS = {
                "cv.count",
                "cv.counti",
                "cv.start",
                "cv.starti",
                "cv.end",
                "cv.endi",
                "cv.setup",
                "cv.setupi",
            }
            # Checked in this order
            extension_sets = [
                (XCVMAC_INSNS, "XCVMac"),
                (XCVMEM_INSNS, "XCVMem"),
                (XCVALU_INSNS, "XCVAlu"),
                (XCVBITMANIP_INSNS, "XCVBitmanip"),
                (XCVBI_INSNS, "XCVBi"),
                (XCVSIMD_INSNS, "XCVSimd"),
                (XCVHWLP_INSNS, "XCVHwlp"),
            ]

            def apply_mapping(x):
                # Convert underscore-separated names to the dotted spelling used above
                x = x.replace("cv_", "cv.")
                x = x.replace("_sc", ".sc")
                x = x.replace("_b", ".b")
                x = x.replace("_h", ".h")
                for insns, label in extension_sets:
                    if x in insns:
                        return label
                if "cv." in x:
                    return "XCV?"
                return "Other"

            names_ = list(map(apply_mapping, names))
            cv_ext_counts, cv_ext_probs = _helper(names_, top=top)
            corev_csv = _gen_csv("Set", cv_ext_counts, cv_ext_probs)
            artifact = Artifact("analyse_instructions_corev.csv", content=corev_csv, fmt=ArtifactFormat.TEXT)
            if to_file:
                ret_artifacts.append(artifact)
            if to_df:
                post_df = report.post_df.copy()
                post_df["CoreVSetCounts"] = str(cv_ext_counts)
                post_df["CoreVSetProbs"] = str(cv_ext_probs)
                report.post_df = post_df
        assert to_file or to_df, "Either to_file or to_df have to be true"
        return ret_artifacts
[docs]
class CompareRowsPostprocess(SessionPostprocess):
    """Postprocess which adds metrics relative to a baseline row.

    The rows of the report are grouped and each selected metric is divided by
    the value of the baseline row of its group.

    Config options
    --------------
    to_compare : list or str, optional
        Metric columns to process (default: all columns of the main dataframe).
    group_by : list or str, optional
        Columns used for grouping the rows (default: all columns of the pre
        dataframe except "Run" and "Sub").
    baseline : int
        Position of the baseline row inside each group.
    percent : bool
        Multiply the result by 100.
    invert : bool
        Use the reciprocal of the result.
    substract : bool
        Subtract 1 from the ratio.
    """

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
        "to_compare": None,
        "group_by": None,
        "baseline": 0,
        "percent": False,
        "invert": False,
        "substract": False,
    }

    def __init__(self, features=None, config=None):
        super().__init__("compare_rows", features=features, config=config)

    @property
    def to_compare(self):
        """Get to_compare property."""
        value = self.config["to_compare"]
        return str2list(value, allow_none=True)

    @property
    def group_by(self):
        """Get group_by property."""
        value = self.config["group_by"]
        return str2list(value, allow_none=True)

    @property
    def baseline(self):
        """Get baseline property."""
        value = self.config["baseline"]
        return int(value)

    @property
    def percent(self):
        """Get percent property."""
        value = self.config["percent"]
        return str2bool(value)

    @property
    def invert(self):
        """Get invert property."""
        value = self.config["invert"]
        return str2bool(value)

    @property
    def substract(self):
        """Get substract property."""
        value = self.config["substract"]
        return str2bool(value)

    def post_session(self, report):
        """Called at the end of a session.

        Appends a "<col> (rel.)" column to the main dataframe for every
        compared column.
        """
        pre_df = report.pre_df
        main_df = report.main_df  # metrics
        post_df = report.post_df
        group_by = self.group_by
        if group_by is None:
            group_by = [x for x in pre_df.columns if x not in ["Run", "Sub"]]
        assert isinstance(group_by, list)
        assert all(col in list(pre_df.columns) + list(post_df.columns) for col in group_by), "Cols mssing in df"
        to_compare = self.to_compare
        if to_compare is None:
            to_compare = list(main_df.columns)
        assert isinstance(to_compare, list)
        assert all(
            col in main_df.columns for col in to_compare
        ), f"Missing cols? ({to_compare} vs {list(main_df.columns)})"

        # Resolve the options once instead of for every group.
        baseline = self.baseline
        substract = self.substract
        invert = self.invert
        percent = self.percent

        def _relative(values):
            assert baseline < len(values), "Index of group baseline out of bounds"
            ret = values / values.iloc[baseline]
            if substract:
                ret = ret - 1
            if invert:
                ret = 1 / ret
            if percent:
                ret = ret * 100.0
            return ret

        full_df = pd.concat([pre_df, main_df, post_df], axis=1)
        # Rows are grouped by default; the explicit (deprecated) axis argument is not required.
        grouped = full_df.groupby(group_by, group_keys=False, dropna=False)
        new_df = pd.DataFrame()
        for col in to_compare:
            relative = grouped[col].apply(_relative).reset_index()
            new_df[f"{col} (rel.)"] = relative[col]
        main_df = pd.concat([main_df, new_df], axis=1)
        report.main_df = main_df
[docs]
class AnalyseDumpPostprocess(RunPostprocess):
    """Counting static instructions.

    Config options
    --------------
    to_df : bool
        Write the counts to the post dataframe of the report.
    to_file : bool
        Export the counts as CSV artifact (``dump_counts.csv``).
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "to_df": False,
        "to_file": True,
    }

    def __init__(self, features=None, config=None):
        super().__init__("analyse_dump", features=features, config=config)

    @property
    def to_df(self):
        """Get to_df property."""
        value = self.config["to_df"]
        return str2bool(value)

    @property
    def to_file(self):
        """Get to_file property."""
        value = self.config["to_file"]
        return str2bool(value)

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Counts the instructions found in the ``generic_mlonmcu.dump`` artifact.
        Only lines consisting of exactly three tab-separated fields are
        considered. The names of ``cv.`` instructions with a memory operand
        get a suffix for the addressing mode (``_ri``/``_rr``, optionally
        followed by ``_inc``).

        Returns
        -------
        list
            The generated artifacts; empty if the platform is not mlif or
            ``to_file`` is disabled.
        """
        platform = report.pre_df["Platform"]
        if (platform != "mlif").any():
            return []
        ret_artifacts = []
        dump_artifact = lookup_artifacts(
            artifacts, name="generic_mlonmcu.dump", fmt=ArtifactFormat.TEXT, first_only=True
        )
        assert len(dump_artifact) == 1, "Dump artifact not found!"
        dump_artifact = dump_artifact[0]
        is_llvm = "llvm" in dump_artifact.flags
        assert is_llvm, "Non-llvm objdump currently unsupported"
        content = dump_artifact.content

        # Compiled once instead of once per line
        post_inc_re = re.compile(r"(.*)\((.*)\),\s*(.*)")
        mem_re = re.compile(r"(.*)\((.*)\)")

        def _operand_fmt(operand):
            # "ri" for immediates, "rr" otherwise. Base 0 accepts prefixed
            # values like 0x2; base 10 additionally accepts leading zeros.
            for base in (0, 10):
                try:
                    int(operand, base)
                except ValueError:
                    continue
                return "ri"
            return "rr"

        counts = {}
        total = 0
        for line in content.split("\n"):
            splitted = line.split("\t")
            if len(splitted) != 3:
                continue
            _, insn, args = splitted
            insn = insn.replace("seal5.", "")
            if "cv." in insn and "(" in args and ")" in args:
                post_inc = post_inc_re.match(args)
                if post_inc:
                    # e.g. "t2, (a0), 0x2"
                    insn += f"_{_operand_fmt(post_inc.group(3))}_inc"
                else:
                    plain = mem_re.match(args)
                    if plain:
                        offset, base = plain.groups()
                        # NOTE(review): offset is everything in front of the
                        # parenthesis, it probably still contains the other
                        # operands - confirm against a real dump
                        insn += f"_{_operand_fmt(offset)}"
                        if "!" in base or ")," in base:
                            insn += "_inc"
            counts[insn] = counts.get(insn, 0) + 1
            total += 1
        rows = ["Instruction,Count,Probability\n"]
        for insn, count in sorted(counts.items(), key=lambda item: item[1]):
            rows.append(f"{insn},{count},{count/total:.4f}\n")
        counts_csv = "".join(rows)
        artifact = Artifact("dump_counts.csv", content=counts_csv, fmt=ArtifactFormat.TEXT)
        if self.to_file:
            ret_artifacts.append(artifact)
        if self.to_df:
            post_df = report.post_df.copy()
            post_df["DumpCounts"] = str(counts)
            report.post_df = post_df
        assert self.to_file or self.to_df, "Either to_file or to_df have to be true"
        return ret_artifacts
[docs]
class AnalyseCoreVCountsPostprocess(RunPostprocess):
    """Group the static ``cv.*`` instruction counts by CORE-V extension.

    Consumes the ``dump_counts.csv`` artifact and produces per-instruction,
    per-extension and per-extension-utilization summaries, either as CSV
    artifacts (``to_file``) or as report columns (``to_df``).
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "to_df": False,
        "to_file": True,
    }

    # Known instructions per extension. The insertion order is significant: it is
    # the classification priority and the tie-break order of the sorted summaries.
    _EXTENSION_INSNS = {
        "XCVMac": frozenset(
            {
                "cv.mac",
                "cv.msu",
                "cv.mulun",
                "cv.mulhhun",
                "cv.mulsn",
                "cv.mulhhsn",
                "cv.mulurn",
                "cv.mulhhurn",
                "cv.mulsrn",
                "cv.mulhhsrn",
                "cv.macun",
                "cv.machhun",
                "cv.macsn",
                "cv.machhsn",
                "cv.macurn",
                "cv.machhurn",
                "cv.macsrn",
                "cv.machhsrn",
            }
        ),
        "XCVMem": frozenset(
            {
                "cv.lb_ri_inc",
                "cv.lbu_ri_inc",
                "cv.lh_ri_inc",
                "cv.lhu_ri_inc",
                "cv.lw_ri_inc",
                "cv.lb_rr_inc",
                "cv.lbu_rr_inc",
                "cv.lh_rr_inc",
                "cv.lhu_rr_inc",
                "cv.lw_rr_inc",
                "cv.lb_rr",
                "cv.lbu_rr",
                "cv.lh_rr",
                "cv.lhu_rr",
                "cv.lw_rr",
                "cv.sb_ri_inc",
                "cv.sh_ri_inc",
                "cv.sw_ri_inc",
                "cv.sb_rr_inc",
                "cv.sh_rr_inc",
                "cv.sw_rr_inc",
                "cv.sb_rr",
                "cv.sh_rr",
                "cv.sw_rr",
            }
        ),
        "XCVBi": frozenset(
            {
                "cv.bneimm",
                "cv.beqimm",
            }
        ),
        "XCVAlu": frozenset(
            {
                "cv.slet",
                "cv.min",
                "cv.addnr",
                "cv.addunr",
                "cv.addn",
                "cv.maxu",
                "cv.subun",
                "cv.extbz",
                "cv.addun",
                "cv.clip",
                "cv.clipu",
                "cv.subn",
                "cv.max",
                "cv.extbs",
                "cv.abs",
                "cv.addurn",
                "cv.exths",
                "cv.exthz",
                "cv.minu",
                "cv.sletu",
                "cv.suburn",
                "cv.addrn",
                "cv.clipur",
                "cv.subrn",
            }
        ),
        "XCVBitmanip": frozenset(
            {
                "cv.ror",
                "cv.clb",
            }
        ),
        "XCVSimd": frozenset(
            {
                "cv.add.h",
                "cv.add.sc.b",
                "cv.add.sc.h",
                "cv.add.sci.h",
                "cv.and.b",
                "cv.and.h",
                "cv.and.sc.h",
                "cv.and.sci.h",
                "cv.cmpeq.sc.h",
                "cv.cmpge.sci.h",
                "cv.cmpgtu.h",
                "cv.cmplt.sci.h",
                "cv.cmpltu.sci.b",
                "cv.cmpne.sc.h",
                "cv.cmpne.sci.b",
                "cv.extract.b",
                "cv.extract.h",
                "cv.extractu.b",
                "cv.extractu.h",
                "cv.insert.h",
                "cv.max.h",
                "cv.max.sci.h",
                "cv.maxu.h",
                "cv.or.b",
                "cv.or.h",
                "cv.pack",
                "cv.packhi.b",
                "cv.packlo.b",
                "cv.shuffle2.b",
                "cv.shuffle2.h",
                "cv.shufflei0.sci.b",
                "cv.sll.sci.h",
                "cv.sra.h",
                "cv.sra.sci.h",
                "cv.srl.h",
                "cv.srl.sci.h",
                "cv.sub.b",
                "cv.sub.h",
                "cv.xor.b",
                "cv.xor.sci.b",
                "cv.add.sci.b",
                "cv.cmpeq.b",
                "cv.cmpgtu.sc.h",
                "cv.cmpleu.sc.h",
                "cv.sdotup.h",
                "cv.sdotup.b",
                "cv.shuffle.sci.h",
                "cv.xor.sc.b",
                "cv.xor.sc.h",
                "cv.sdotsp.h",
                "cv.cmpeq.sci.b",
                "cv.and.sci.b",
                "cv.dotsp.h",
                "cv.dotsp.b",
                "cv.sdotsp.b",
                "cv.add.b",
                "cv.dotup.sci.b",
            }
        ),
        "XCVHwlp": frozenset(
            {
                "cv.count",
                "cv.counti",
                "cv.start",
                "cv.starti",
                "cv.end",
                "cv.endi",
                "cv.setup",
                "cv.setupi",
            }
        ),
    }

    def __init__(self, features=None, config=None):
        super().__init__("analyse_corev_counts", features=features, config=config)

    @property
    def to_df(self):
        """Get to_df property."""
        return str2bool(self.config["to_df"])

    @property
    def to_file(self):
        """Get to_file property."""
        return str2bool(self.config["to_file"])

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Parses the ``Instruction,Count,Probability`` rows of ``dump_counts.csv``,
        keeps the rows containing ``cv.`` and aggregates them per extension.
        Instructions not present in any extension table are collected as
        ``Unknown`` and reported via a warning and an extra artifact.

        Returns the list of generated artifacts (empty unless ``to_file`` is set).
        """
        found = lookup_artifacts(artifacts, name="dump_counts.csv", fmt=ArtifactFormat.TEXT, first_only=True)
        assert len(found) == 1, "To use analyse_corev_counts postprocess, analyse_dump needs to run first."
        rows = found[0].content.split("\n")

        table = self._EXTENSION_INSNS
        # Number of known instructions per extension; "Unknown" is filled in below.
        cv_ext_totals = {ext: len(insns) for ext, insns in table.items()}
        cv_ext_totals["Unknown"] = 0
        cv_ext_counts = dict.fromkeys(cv_ext_totals, 0)
        cv_ext_unique_counts = dict.fromkeys(cv_ext_totals, 0)

        unknowns = []
        cv_counts = {}
        total_counts = 0
        cv_rows = ["Instruction,Count,Probability\n"]
        for row in rows[1:]:  # rows[0] is the CSV header
            if "cv." not in row:
                continue
            cv_rows.append(f"{row}\n")
            fields = row.split(",")
            assert len(fields) == 3
            insn = fields[0]
            count = int(fields[1])
            cv_counts[insn] = count
            total_counts += count
            # First extension (in table order) that lists the instruction wins.
            ext = next((name for name, insns in table.items() if insn in insns), "Unknown")
            cv_ext_counts[ext] += count
            cv_ext_unique_counts[ext] += 1
            if ext == "Unknown" and insn not in unknowns:
                unknowns.append(insn)
        cv_ext_totals["Unknown"] = len(unknowns)
        cv_counts_csv = "".join(cv_rows)

        # Dynamic share of each extension, ascending by count; unused ones are omitted.
        ext_rows = ["Set,Count,Probability\n"]
        for ext, count in sorted(cv_ext_counts.items(), key=lambda item: item[1]):
            if count:
                ext_rows.append(f"{ext},{count},{count / total_counts}\n")
        cv_ext_counts_csv = "".join(ext_rows)

        # Fraction of each extension's instructions that is actually used.
        unique_rows = ["Set,Used,Utilization\n"]
        for ext, used in sorted(cv_ext_unique_counts.items(), key=lambda item: item[1]):
            if used:
                unique_rows.append(f"{ext},{used},{used / cv_ext_totals[ext]:.4f}\n")
        used_overall = sum(cv_ext_unique_counts.values())
        known_overall = sum(cv_ext_totals.values())
        unique_rows.append(f"XCVTotal,{used_overall},{used_overall / known_overall:.4f}\n")
        cv_ext_unique_counts_csv = "".join(unique_rows)

        cv_counts_artifact = Artifact("cv_counts.csv", content=cv_counts_csv, fmt=ArtifactFormat.TEXT)
        cv_ext_counts_artifact = Artifact("cv_ext_counts.csv", content=cv_ext_counts_csv, fmt=ArtifactFormat.TEXT)
        cv_ext_unique_counts_artifact = Artifact(
            "cv_ext_unique_counts.csv", content=cv_ext_unique_counts_csv, fmt=ArtifactFormat.TEXT
        )

        ret_artifacts = []
        if len(unknowns) > 0:
            logger.warning("Unknown instructions found: %s", unknowns)
            cv_ext_unknowns_artifact = Artifact(
                "cv_ext_unknowns.csv", content="\n".join(unknowns), fmt=ArtifactFormat.TEXT
            )
            if self.to_file:
                ret_artifacts.append(cv_ext_unknowns_artifact)
        if self.to_file:
            ret_artifacts.extend([cv_counts_artifact, cv_ext_counts_artifact, cv_ext_unique_counts_artifact])
        if self.to_df:
            post_df = report.post_df.copy()
            post_df["XCVCounts"] = str(cv_counts)
            post_df["XCVExtCounts"] = str(cv_ext_counts)
            post_df["XCVExtUniqueCounts"] = str(cv_ext_unique_counts)
            report.post_df = post_df
        assert self.to_file or self.to_df, "Either to_file or to_df have to be true"
        return ret_artifacts
[docs]
class ValidateOutputsPostprocess(RunPostprocess):
    """Postprocess for comparing model outputs with golden reference.

    Loads ``model_info.yml``, ``outputs_ref.npy`` and ``outputs.npy`` from the
    run artifacts, feeds every output sample to the metrics configured via
    ``validate_metrics`` and writes one summary column per metric to the report.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "report": False,
        "validate_metrics": "topk(n=1);topk(n=2)",
        "validate_range": True,
    }

    def __init__(self, features=None, config=None):
        super().__init__("validate_outputs", features=features, config=config)

    @property
    def validate_metrics(self):
        """Get validate_metrics property."""
        value = self.config["validate_metrics"]
        return value

    @property
    def report(self):
        """Get report property."""
        value = self.config["report"]
        return str2bool(value)

    @property
    def validate_range(self):
        """Get validate_range property."""
        value = self.config["validate_range"]
        return str2bool(value)

    def _assert_in_range(self, data, bounds):
        """Assert that all values of data lie within bounds = (lower, upper).

        Does nothing if bounds is empty/None or range validation is disabled.
        """
        if not bounds or not self.validate_range:
            return
        assert len(bounds) == 2, "Range should be a tuple (lower, upper)"
        lower, upper = bounds
        assert lower <= upper
        assert np.min(data) >= lower and np.max(data) <= upper, "Range missmatch"

    def _quantize_reference(self, quant, data):
        """Quantize float32 reference data to int8 using the given quant details."""
        if quant is None:
            return data
        quant_scale, quant_zero_point, quant_dtype, quant_range = quant
        if quant_dtype is None or data.dtype.name == quant_dtype:
            return data
        assert data.dtype.name in ["float32"], "Quantization only supported for float32 input"
        assert quant_dtype in ["int8"], "Quantization only supported for int8 output"
        self._assert_in_range(data, quant_range)
        return np.around((data / quant_scale) + quant_zero_point).astype("int8")

    def _dequantize(self, quant, data):
        """Dequantize int8 data to float32 using the given quant details."""
        if quant is None:
            return data
        quant_scale, quant_zero_point, quant_dtype, quant_range = quant
        if quant_dtype is None or data.dtype.name == quant_dtype:
            return data
        assert data.dtype.name in ["int8"], "Dequantization only supported for int8 input"
        assert quant_dtype in ["float32"], "Dequantization only supported for float32 output"
        ret = (data.astype("float32") - quant_zero_point) * quant_scale
        self._assert_in_range(ret, quant_range)
        return ret

    @staticmethod
    def _lookup_output(output, out_name, idx):
        """Return the data of one output, looked up by name or (as fallback) by position.

        Raises RuntimeError if the sample contains neither the name nor the index.
        """
        if isinstance(output, dict):
            if out_name in output:
                return output[out_name]
            if idx < len(output):
                # fallback for custom name-based npy dict
                return list(output.values())[idx]
        else:
            # fallback for index-based npy array
            # (np.ndarray is the array type; np.array is only a factory function)
            assert isinstance(output, (list, np.ndarray)), "expected dict, list or np.array type"
            if idx < len(output):
                return output[idx]
        raise RuntimeError(f"Output not found: {out_name}")

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Raises:
            NotImplementedError: for multi-output models or if ``report`` is enabled.
            RuntimeError: if an expected output is missing in a sample.
            AssertionError: on missing artifacts or dtype/shape/range mismatches.
        """
        import yaml

        model_info_artifact = lookup_artifacts(artifacts, name="model_info.yml", first_only=True)
        assert len(model_info_artifact) == 1, "Could not find artifact: model_info.yml"
        model_info_data = yaml.safe_load(model_info_artifact[0].content)
        if len(model_info_data["output_names"]) > 1:
            raise NotImplementedError("Multi-outputs not yet supported.")

        # NOTE: allow_pickle=True executes pickled code; only load trusted artifacts.
        outputs_ref_artifact = lookup_artifacts(artifacts, name="outputs_ref.npy", first_only=True)
        assert len(outputs_ref_artifact) == 1, "Could not find artifact: outputs_ref.npy"
        outputs_ref = np.load(outputs_ref_artifact[0].path, allow_pickle=True)
        outputs_artifact = lookup_artifacts(artifacts, name="outputs.npy", first_only=True)
        assert len(outputs_artifact) == 1, "Could not find artifact: outputs.npy"
        outputs = np.load(outputs_artifact[0].path, allow_pickle=True)

        in_data = None
        validate_metrics = parse_validate_metrics(self.validate_metrics)
        quant = model_info_data.get("output_quant_details", None)
        rng = model_info_data.get("output_ranges", None)

        for i, output_ref in enumerate(outputs_ref):
            if i >= len(outputs):
                logger.warning("Missing output sample")
                break
            output = outputs[i]
            for ii, (out_name, out_ref_data) in enumerate(output_ref.items()):
                out_data = self._lookup_output(output, out_name, ii)
                if quant:
                    # Range of the raw (not yet dequantized) output; skipped when
                    # the model info provides no output ranges.
                    if rng is not None:
                        assert ii < len(rng)
                        self._assert_in_range(out_data, rng[ii])
                    assert ii < len(quant)
                    quant_ = quant[ii]
                    if quant_ is not None:
                        # Compare in the quantized domain first, then dequantize
                        # for the comparison against the unquantized reference.
                        out_ref_data_quant = self._quantize_reference(quant_, out_ref_data)
                        for vm in validate_metrics:
                            vm.process(out_data, out_ref_data_quant, in_data=in_data, quant=True)
                        out_data = self._dequantize(quant_, out_data)
                assert out_data.dtype == out_ref_data.dtype, "dtype missmatch"
                assert out_data.shape == out_ref_data.shape, "shape missmatch"
                for vm in validate_metrics:
                    vm.process(out_data, out_ref_data, in_data=in_data, quant=False)

        if self.report:
            raise NotImplementedError
        for vm in validate_metrics:
            res = vm.get_summary()
            report.post_df[f"{vm.name}"] = res
        return []
[docs]
class ValidateLabelsPostprocess(RunPostprocess):
    """Postprocess for comparing model outputs with reference labels.

    Loads ``labels_ref.npy`` and ``outputs.npy`` from the run artifacts, feeds
    every output sample together with its reference label to the metrics
    configured via ``classify_metrics`` and writes one summary column per
    metric to the report.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "report": False,
        "classify_metrics": "topk_label(n=1);topk_label(n=2)",
    }

    def __init__(self, features=None, config=None):
        super().__init__("validate_labels", features=features, config=config)

    @property
    def classify_metrics(self):
        """Get classify_metrics property."""
        value = self.config["classify_metrics"]
        return value

    @property
    def report(self):
        """Get report property."""
        value = self.config["report"]
        return str2bool(value)

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Raises:
            NotImplementedError: for multi-output models or if ``report`` is enabled.
            AssertionError: on missing artifacts, missing reference labels or
                unexpected sample types.
        """
        import yaml

        model_info_artifact = lookup_artifacts(artifacts, name="model_info.yml", first_only=True)
        assert len(model_info_artifact) == 1, "Could not find artifact: model_info.yml"
        model_info_data = yaml.safe_load(model_info_artifact[0].content)
        output_names = model_info_data["output_names"]
        if len(output_names) > 1:
            raise NotImplementedError("Multi-outputs not yet supported.")

        labels_ref_artifact = lookup_artifacts(artifacts, name="labels_ref.npy", first_only=True)
        assert (
            len(labels_ref_artifact) == 1
        ), "Could not find artifact: labels_ref.npy (Run classify_labels postprocess first!)"
        # NOTE: allow_pickle=True executes pickled code; only load trusted artifacts.
        labels_ref = np.load(labels_ref_artifact[0].path, allow_pickle=True)
        outputs_artifact = lookup_artifacts(artifacts, name="outputs.npy", first_only=True)
        assert len(outputs_artifact) == 1, "Could not find artifact: outputs.npy"
        outputs = np.load(outputs_artifact[0].path, allow_pickle=True)

        classify_metrics = parse_classify_metrics(self.classify_metrics)
        for i, output in enumerate(outputs):
            if not isinstance(output, dict):  # index based lookup
                # np.ndarray is the array type; np.array is only a factory function
                # and would make isinstance() raise TypeError.
                assert isinstance(output, (list, np.ndarray)), "expected dict, list or np.array"
                assert len(output) == len(output_names)
                output = {output_names[idx]: out for idx, out in enumerate(output)}
            assert len(output) == 1, "Only supporting single-output models"
            out_data = output[list(output.keys())[0]]
            assert i < len(labels_ref), "Missing reference labels"
            label_ref = labels_ref[i]
            for cm in classify_metrics:
                cm.process(out_data, label_ref, quant=False)

        if self.report:
            raise NotImplementedError
        for cm in classify_metrics:
            res = cm.get_summary()
            report.post_df[f"{cm.name}"] = res
        return []
[docs]
class ExportOutputsPostprocess(RunPostprocess):
    """Postprocess for writing model outputs to a directory.

    Every output sample is written to ``<index>.bin``. The files go to ``dest``
    if configured; otherwise they are written to a temporary directory and
    returned packed into an archive artifact.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        "dest": None,  # if none: export as artifact
        "use_ref": False,
        "skip_dequant": False,
        "fmt": "bin",
        "archive_fmt": None,
    }

    def __init__(self, features=None, config=None):
        super().__init__("export_outputs", features=features, config=config)

    @property
    def dest(self):
        """Get dest property."""
        value = self.config["dest"]
        if value is not None:
            if not isinstance(value, Path):
                assert isinstance(value, str)
                value = Path(value)
        return value

    @property
    def use_ref(self):
        """Get use_ref property."""
        value = self.config["use_ref"]
        return str2bool(value)

    @property
    def skip_dequant(self):
        """Get skip_dequant property."""
        value = self.config["skip_dequant"]
        return str2bool(value)

    @property
    def fmt(self):
        """Get fmt property."""
        return self.config["fmt"]

    @property
    def archive_fmt(self):
        """Get archive_fmt property."""
        return self.config["archive_fmt"]

    @staticmethod
    def _dequantize(quant, data):
        """Dequantize int8 data to float32 using the given quant details."""
        if quant is None:
            return data
        quant_scale, quant_zero_point, quant_dtype, quant_range = quant
        if quant_dtype is None or data.dtype.name == quant_dtype:
            return data
        assert data.dtype.name in ["int8"], "Dequantization only supported for int8 input"
        assert quant_dtype in ["float32"], "Dequantization only supported for float32 output"
        return (data.astype("float32") - quant_zero_point) * quant_scale

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Returns:
            A list with one archive artifact if no ``dest`` is configured or an
            ``archive_fmt`` is given, otherwise an empty list.

        Raises:
            NotImplementedError: for multi-output models, ``fmt="npy"`` and
                archive formats other than ``tar.gz``.
            AssertionError: on missing artifacts or invalid configuration.
        """
        import yaml

        model_info_artifact = lookup_artifacts(artifacts, name="model_info.yml", first_only=True)
        assert len(model_info_artifact) == 1, "Could not find artifact: model_info.yml"
        model_info_data = yaml.safe_load(model_info_artifact[0].content)
        output_names = model_info_data["output_names"]
        if len(output_names) > 1:
            raise NotImplementedError("Multi-outputs not yet supported.")

        # NOTE: allow_pickle=True executes pickled code; only load trusted artifacts.
        source_name = "outputs_ref.npy" if self.use_ref else "outputs.npy"
        source_artifact = lookup_artifacts(artifacts, name=source_name, first_only=True)
        assert len(source_artifact) == 1, f"Could not find artifact: {source_name}"
        outputs = np.load(source_artifact[0].path, allow_pickle=True)

        dest = self.dest
        fmt = self.fmt
        archive_fmt = self.archive_fmt
        assert fmt in ["bin", "npy"], f"Invalid format: {fmt}"
        quant = model_info_data.get("output_quant_details", None)
        dequantize = bool(quant) and not self.skip_dequant

        ret_artifacts = []
        temp_dir = tempfile.TemporaryDirectory() if dest is None else None
        try:  # guarantees removal of the temporary directory on every exit path
            if temp_dir is None:
                assert dest.is_dir(), f"Not a directory: {dest}"
                dest_ = dest
            else:
                dest_ = Path(temp_dir.name)

            filenames = []
            for i, output in enumerate(outputs):
                if not isinstance(output, dict):  # index based lookup
                    # np.ndarray is the array type; np.array is only a factory
                    # function and would make isinstance() raise TypeError.
                    assert isinstance(output, (list, np.ndarray)), "expected dict, list or np.array"
                    assert len(output) == len(output_names)
                    output = {output_names[idx]: out for idx, out in enumerate(output)}
                if dequantize:
                    output = {
                        out_name: self._dequantize(quant[j], out_data)
                        for j, (out_name, out_data) in enumerate(output.items())
                    }
                if fmt == "npy":
                    raise NotImplementedError("npy export")
                assert len(output.keys()) == 1, "Multi-outputs not supported"
                output_data = list(output.values())[0]
                file_dest = dest_ / f"{i}.bin"
                filenames.append(file_dest)
                with open(file_dest, "wb") as f:
                    f.write(output_data.tobytes(order="C"))

            if dest is None or archive_fmt is not None:
                if archive_fmt is None:
                    archive_fmt = "tar.gz"  # Default fallback
                assert archive_fmt in ["tar.xz", "tar.gz", "zip"]
                archive_name = f"output_data.{archive_fmt}"
                if temp_dir is None:
                    archive_path = f"{dest_}.{archive_fmt}"
                else:
                    # Keep the archive inside the temporary directory so that it
                    # is removed together with it instead of being left behind.
                    archive_path = dest_ / archive_name
                if archive_fmt == "tar.gz":
                    import tarfile

                    with tarfile.open(archive_path, "w:gz") as tar:
                        for filename in filenames:
                            tar.add(filename, arcname=filename.name)
                else:
                    raise NotImplementedError(f"archive_fmt={archive_fmt}")
                with open(archive_path, "rb") as f:
                    raw = f.read()
                ret_artifacts.append(Artifact(archive_name, raw=raw, fmt=ArtifactFormat.BIN))
        finally:
            if temp_dir is not None:
                temp_dir.cleanup()
        return ret_artifacts
[docs]
class AnalyseLinkerMapPostprocess(RunPostprocess):
    """Calculate memory footprints.

    Combines the ELF artifact with the linker map artifact to break down the
    ``bytes`` column per function, per library and per object file.
    """

    DEFAULTS = {
        **RunPostprocess.DEFAULTS,
        # "to_df": True,
        "to_df": False,
        "to_file": True,
        "per_func": True,
        "per_object": True,
        "per_library": True,
        "ignore": [],
        "sum": False,
    }

    def __init__(self, features=None, config=None):
        super().__init__("analyse_linker_map", features=features, config=config)

    @property
    def to_df(self):
        """Get to_df property."""
        return str2bool(self.config["to_df"])

    @property
    def to_file(self):
        """Get to_file property."""
        return str2bool(self.config["to_file"])

    @property
    def per_func(self):
        """Get per_func property."""
        return str2bool(self.config["per_func"])

    @property
    def per_object(self):
        """Get per_object property."""
        return str2bool(self.config["per_object"])

    @property
    def per_library(self):
        """Get per_library property."""
        return str2bool(self.config["per_library"])

    @property
    def ignore(self):
        """Get ignore property."""
        value = self.config["ignore"]
        return value if isinstance(value, list) else str2list(value)

    @property
    def sum(self):
        """Get sum property."""
        return str2bool(self.config["sum"])

    def _export_footprint(self, report, ret_artifacts, footprint_df, by, sum_col, csv_name, dict_col):
        """Export one footprint breakdown (grouped by column `by`) and return its pie data.

        Depending on the configuration this adds the total to report column
        `sum_col`, appends the CSV artifact `csv_name` to `ret_artifacts` and
        adds the stringified per-group dict to report column `dict_col`.
        """
        if self.to_df and self.sum:
            summed = report.post_df.copy()
            summed[sum_col] = footprint_df["bytes"].sum()
            report.post_df = summed
        pie_data = generate_pie_data(footprint_df, x=by, y="bytes", topk=None)
        if self.to_file:
            ret_artifacts.append(
                Artifact(
                    csv_name,
                    content=pie_data.to_csv(index=False),
                    fmt=ArtifactFormat.TEXT,
                )
            )
        if self.to_df:
            detailed = report.post_df.copy()
            detailed[dict_col] = str(pie_data.groupby(by, dropna=False).sum().to_dict()["bytes"])
            report.post_df = detailed
        return pie_data

    def post_run(self, report, artifacts):
        """Called at the end of a run.

        Returns an empty list without doing anything unless the platform of the
        run is ``mlif``; otherwise returns the generated CSV artifacts.
        """
        if (report.pre_df["Platform"] != "mlif").any():
            return []
        ret_artifacts = []

        elf_lookup = lookup_artifacts(artifacts, name="generic_mlonmcu", fmt=ArtifactFormat.BIN, first_only=True)
        assert len(elf_lookup) == 1, "ELF artifact not found!"
        map_lookup = lookup_artifacts(artifacts, name="generic_mlonmcu.map", fmt=ArtifactFormat.TEXT, first_only=True)
        assert len(map_lookup) == 1, "Linker map artifact not found!"

        mem_footprint_df = parse_elf(elf_lookup[0].path)

        from mapfile_parser import mapfile

        parsed_map = mapfile.MapFile()
        parsed_map.readMapFile(map_lookup[0].path)
        symbol_map_df = pd.DataFrame(
            analyze_linker_map_helper(parsed_map),
            columns=[
                "segment",
                "section",
                "symbol",
                "object",
                "object_full",
                "library",
                "library_full",
            ],
        )

        if self.per_func:
            self._export_footprint(
                report,
                ret_artifacts,
                mem_footprint_df,
                by="func",
                sum_col="ROM code (Func sum)",
                csv_name="mem_footprint_per_func.csv",
                dict_col="ROM code (by func)",
            )

        if self.per_library:
            library_footprint_df = agg_library_footprint(mem_footprint_df, symbol_map_df, by="library", col="bytes")
            if self.ignore:
                library_footprint_df = library_footprint_df[~library_footprint_df["library"].isin(self.ignore)]
            per_library = self._export_footprint(
                report,
                ret_artifacts,
                library_footprint_df,
                by="library",
                sum_col="ROM code (Lib sum)",
                csv_name="mem_footprint_per_library.csv",
                dict_col="ROM code (by library)",
            )
            # TODO: generalize
            # NOTE(review): the source this was reconstructed from had lost its
            # indentation; this block is assumed NOT to depend on to_df - confirm.
            if "libmuriscvnn.a" in per_library["library"].unique():
                muriscvnn_bytes = per_library[per_library["library"] == "libmuriscvnn.a"]["bytes"].iloc[0]
                post_df = report.post_df.copy()
                post_df["ROM code (libmuriscvnn.a)"] = muriscvnn_bytes
                report.post_df = post_df

        if self.per_object:
            object_footprint_df = agg_library_footprint(mem_footprint_df, symbol_map_df, by="object", col="bytes")
            if self.ignore:
                object_footprint_df = object_footprint_df[~object_footprint_df["object"].isin(self.ignore)]
            self._export_footprint(
                report,
                ret_artifacts,
                object_footprint_df,
                by="object",
                sum_col="ROM code (Obj sum)",
                csv_name="mem_footprint_per_object.csv",
                dict_col="ROM code (by object)",
            )

        assert self.to_file or self.to_df, "Either to_file or to_df have to be true"
        return ret_artifacts
[docs]
class StageTimesGanttPostprocess(SessionPostprocess):
    """Write Mermaid markdown file for stage times."""

    DEFAULTS = {
        **SessionPostprocess.DEFAULTS,
    }

    def __init__(self, features=None, config=None):
        super().__init__("stage_times_gantt", features=features, config=config)

    def post_session(self, report):
        """Called at the end of a session.

        Builds a Mermaid gantt chart with one section per row of
        ``report.main_df`` and one task per stage, taken from the
        ``<stage> Start Time [s]`` / ``<stage> End Time [s]`` columns
        (written as integers after scaling by 1e3). Stages without a usable
        start or end value (column missing or NaN) are skipped.

        Returns a list containing the ``stage_times.mermaid`` artifact.
        """
        lines = [
            "gantt\n",
            "title Flow\n",
            "dateFormat x\n",
            "axisFormat %H:%M:%S\n",
        ]
        for i, row in report.main_df.iterrows():
            lines.append(f" section Run {i}\n")
            stage_times = defaultdict(dict)
            for key, value in row.items():
                if " Start Time [s]" in key:
                    key = key.replace(" Start Time [s]", "")
                    stage_times[key]["start"] = value
                if " End Time [s]" in key:
                    key = key.replace(" End Time [s]", "")
                    stage_times[key]["end"] = value
            for stage, times in stage_times.items():
                start = times.get("start")
                end = times.get("end")
                # A stage may lack one of the two columns or hold NaN; int()
                # would raise for those values, so such stages are left out.
                if start is None or end is None or pd.isna(start) or pd.isna(end):
                    continue
                lines.append(f" {stage} : {int(start * 1e3)}, {int(end * 1e3)}\n")
        artifact = Artifact("stage_times.mermaid", content="".join(lines), fmt=ArtifactFormat.TEXT)
        return [artifact]
[docs]
class ProfileFunctionsPostprocess(RunPostprocess):
"""Instr-trace based profiling of pcs/functions/objects/libraries."""
DEFAULTS = {
**RunPostprocess.DEFAULTS,
"per_func": True,
"per_object": False,
"per_library": False,
"topk": None,
"min_weight": None,
"to_df": False,
"to_file": True,
}
def __init__(self, features=None, config=None):
super().__init__("profile_functions", features=features, config=config)
@property
def per_pc(self):
"""Get per_pc property."""
value = self.config["per_pc"]
return str2bool(value)
@property
def per_func(self):
"""Get per_func property."""
value = self.config["per_func"]
return str2bool(value)
@property
def per_object(self):
"""Get per_object property."""
value = self.config["per_object"]
return str2bool(value)
@property
def per_library(self):
"""Get per_library property."""
value = self.config["per_library"]
return str2bool(value)
@property
def to_df(self):
"""Get to_df property."""
value = self.config["to_df"]
return str2bool(value)
@property
def topk(self):
"""Get topk property."""
value = self.config["topk"]
if value is None:
return None
return int(value)
@property
def min_weight(self):
"""Get min_weight property."""
value = self.config["min_weight"]
if value is None:
return None
return float(value)
@property
def to_file(self):
"""Get to_file property."""
value = self.config["to_file"]
return str2bool(value)
[docs]
def post_run(self, report, artifacts):
"""Called at the end of a run."""
ret_artifacts = []
elf_artifact = lookup_artifacts(artifacts, name="generic_mlonmcu", fmt=ArtifactFormat.BIN, first_only=True)
assert len(elf_artifact) == 1, "ELF artifact not found!"
elf_artifact = elf_artifact[0]
log_artifact = lookup_artifacts(artifacts, flags=("log_instrs",), fmt=ArtifactFormat.TEXT, first_only=True)
assert len(log_artifact) == 1, "To use analyse_instructions process, please enable feature log_instrs."
log_artifact = log_artifact[0]
func2pc_df, file2funcs_df, pc2locs_df = analyze_dwarf(elf_artifact.path)
func2pc_df[["start", "end"]] = func2pc_df["pc_range"].apply(pd.Series)
is_etiss = "etiss_pulpino" in log_artifact.flags or "etiss" in log_artifact.flags
assert is_etiss, "Only etiss traces supported currently"
if self.topk is not None:
assert NotImplementedError("topk")
if self.min_weight is not None:
assert NotImplementedError("min_weight")
if True:
def transform_df(df):
df["pc"] = df["pc"].apply(lambda x: int(x, 0))
df["pc"] = pd.to_numeric(df["pc"])
df.drop(columns=["rest"], inplace=True)
return df
log_artifact.uncache()
dfs = []
with pd.read_csv(
log_artifact.path, sep=":", names=["pc", "rest"], chunksize=2**22
) as reader: # TODO: expose chunksize
for chunk in reader:
df = transform_df(chunk)
dfs.append(df)
df = pd.concat(dfs)
total_num_instrs = len(df)
def func_pc_helper(x):
matches = func2pc_df[func2pc_df["start"] <= x]
matches = matches[matches["end"] >= x]
if len(matches) == 0:
return None
func = matches["func"].values[0]
return func
pc_counts = df["pc"].value_counts().sort_values(ascending=False).to_frame().reset_index()
pc_counts["func_name"] = pc_counts["pc"].apply(func_pc_helper)
symbol_map_df = None
if self.per_object or self.per_library:
map_artifact = lookup_artifacts(
artifacts, name="generic_mlonmcu.map", fmt=ArtifactFormat.TEXT, first_only=True
)
assert len(map_artifact) == 1, "Linker map artifact not found!"
map_artifact = map_artifact[0]
from mapfile_parser import mapfile
mapFile = mapfile.MapFile()
mapFile.readMapFile(map_artifact.path)
symbol_map = analyze_linker_map_helper(mapFile)
symbol_map_df = pd.DataFrame(
symbol_map,
columns=[
"segment",
"section",
"symbol",
"object",
"object_full",
"library",
"library_full",
],
)
def agg_runtime(runtime_df, symbol_map_df, by: str = "library", col: str = "count"):
    """Sum the ``col`` values of ``runtime_df`` per ``by`` group of the symbol map.

    A ``func_unmangled`` column (``unmangle_helper`` applied to ``func_name``)
    is added to ``runtime_df`` in place and used as the key for a left join
    against the ``symbol`` column of ``symbol_map_df``. Rows without a matching
    symbol are kept and end up in the NaN group (``dropna=False``).

    Returns a single-column DataFrame indexed by ``by``, sorted by ``col`` in
    descending order.
    """
    runtime_df["func_unmangled"] = runtime_df["func_name"].apply(unmangle_helper)
    by_function = runtime_df.set_index("func_unmangled")
    by_symbol = symbol_map_df.set_index("symbol")
    joined = by_function.join(by_symbol, how="left")
    selected = joined[[by, col]]
    totals = selected.groupby(by, as_index=True, dropna=False)[col].sum()
    return totals.sort_values(ascending=False).to_frame()
if self.per_pc:
pc_counts_ = pc_counts.groupby("pc")["count"].sum().sort_values(ascending=False).to_frame()
pc_counts_["rel_count"] = pc_counts_["count"] / total_num_instrs
if self.to_file:
artifact = Artifact(
"runtime_per_pc.csv",
content=pc_counts_[["count", "rel_count"]].to_csv(index=True),
fmt=ArtifactFormat.TEXT,
)
ret_artifacts.append(artifact)
if self.to_df:
post_df = report.post_df.copy()
post_df["RuntimePerPC"] = str(pc_counts_["count"].to_dict())
post_df["RelRuntimePerPC"] = str(pc_counts_["rel_count"].to_dict())
report.post_df = post_df
if self.per_func:
func_counts = pc_counts.groupby("func_name")["count"].sum().sort_values(ascending=False).to_frame()
func_counts["rel_count"] = func_counts["count"] / total_num_instrs
if self.to_file:
artifact = Artifact(
"runtime_per_func.csv",
content=func_counts[["count", "rel_count"]].to_csv(index=True),
fmt=ArtifactFormat.TEXT,
)
ret_artifacts.append(artifact)
if self.to_df:
post_df = report.post_df.copy()
post_df["RuntimePerFunc"] = str(func_counts["count"].to_dict())
post_df["RelRuntimePerFunc"] = str(func_counts["rel_count"].to_dict())
report.post_df = post_df
if self.per_object:
assert symbol_map_df is not None
func_counts = (
pc_counts.groupby("func_name")["count"].sum().sort_values(ascending=False).to_frame().reset_index()
)
object_counts = agg_runtime(func_counts, symbol_map_df, col="count", by="object")
object_counts["rel_count"] = object_counts["count"] / total_num_instrs
if self.to_file:
artifact = Artifact(
"runtime_per_object.csv",
content=object_counts[["count", "rel_count"]].to_csv(index=True),
fmt=ArtifactFormat.TEXT,
)
ret_artifacts.append(artifact)
if self.to_df:
post_df = report.post_df.copy()
post_df["RuntimePerObject"] = str(object_counts["count"].to_dict())
post_df["RelRuntimePerObject"] = str(object_counts["rel_count"].to_dict())
report.post_df = post_df
if self.per_library:
assert symbol_map_df is not None
func_counts = (
pc_counts.groupby("func_name")["count"].sum().sort_values(ascending=False).to_frame().reset_index()
)
library_counts = agg_runtime(func_counts, symbol_map_df, col="count", by="library")
library_counts["rel_count"] = library_counts["count"] / total_num_instrs
if self.to_file:
artifact = Artifact(
"runtime_per_library.csv",
content=library_counts[["count", "rel_count"]].to_csv(index=True),
fmt=ArtifactFormat.TEXT,
)
ret_artifacts.append(artifact)
if self.to_df:
post_df = report.post_df.copy()
post_df["RuntimePerLibrary"] = str(library_counts["count"].to_dict())
post_df["RelRuntimePerLibrary"] = str(library_counts["rel_count"].to_dict())
report.post_df = post_df
assert self.to_file or self.to_df, "Either to_file or to_df have to be true"
return ret_artifacts