Source code for mlonmcu.models.frontend
#
# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering.
#
# This file is part of MLonMCU.
# See https://github.com/tum-ei-eda/mlonmcu.git for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import time
import tempfile
import multiprocessing
from pathlib import Path
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict, Union
import numpy as np
from mlonmcu.feature.features import get_matching_features
from mlonmcu.models.model import (
ModelFormats,
Model,
Program,
ExampleProgram,
EmbenchProgram,
EmbenchIoTProgram,
EmbenchDSPProgram,
TaclebenchProgram,
PolybenchProgram,
CoremarkProgram,
DhrystoneProgram,
MathisProgram,
MibenchProgram,
OpenASIPProgram,
RVVBenchProgram,
ISSBenchProgram,
CryptoBenchProgram,
CmsisDSPProgram,
CmsisNNProgram,
)
from mlonmcu.models.lookup import lookup_models
from mlonmcu.feature.type import FeatureType
from mlonmcu.config import filter_config, str2bool
from mlonmcu.artifact import Artifact, ArtifactFormat
from mlonmcu.setup import utils
from mlonmcu.target.metrics import Metrics
from mlonmcu.logging import get_logger
logger = get_logger()
[docs]
def check_integrity(algorithm: str, value: str, file_path: Union[str, Path], check: bool = True):
import hashlib
hash_func = hashlib.new(algorithm)
with open(file_path, "rb") as file:
# Read the file in chunks of 8192 bytes
while chunk := file.read(8192):
hash_func.update(chunk)
file_hash = hash_func.hexdigest()
hash_matches = file_hash == value
if check:
assert hash_matches, f"Model hash ({algorithm}) missmatch: {file_hash} vs. {value}"
return hash_matches
[docs]
class Frontend(ABC):
FEATURES = {"validate"}
DEFAULTS = {
"use_inout_data": False,
# the following should be configured using gen_data feature
"gen_data": False,
"gen_data_fill_mode": None,
"gen_data_file": None,
"gen_data_number": None,
"gen_data_fmt": None,
# the following should be configured using gen_ref_data feature
"gen_ref_data": False,
"gen_ref_data_mode": None,
"gen_ref_data_file": None,
"gen_ref_data_fmt": None,
"gen_ref_labels": False,
"gen_ref_labels_mode": None,
"gen_ref_labels_file": None,
"gen_ref_labels_fmt": None,
}
REQUIRED = set()
OPTIONAL = set()
def __init__(self, name, input_formats=None, output_formats=None, features=None, config=None):
self.name = name
self.input_formats = input_formats if input_formats else []
self.output_formats = output_formats if output_formats else []
self.config = config if config else {}
self.features = self.process_features(features)
self.config = filter_config(self.config, self.name, self.DEFAULTS, self.OPTIONAL, self.REQUIRED)
def __repr__(self):
probs = []
if self.name:
probs.append(self.name)
if self.features and len(self.features) > 0:
probs.append(str(self.features))
if self.config and len(self.config) > 0:
probs.append(str(self.config))
return "Frontend(" + ",".join(probs) + ")"
@property
def use_inout_data(self):
value = self.config["use_inout_data"]
return str2bool(value)
@property
def gen_data(self):
value = self.config["gen_data"]
return str2bool(value)
@property
def gen_data_fill_mode(self):
value = self.config["gen_data_fill_mode"]
assert value in ["random", "ones", "zeros", "file", "dataset"]
return value
@property
def gen_data_file(self):
return self.config["gen_data_file"]
@property
def gen_data_number(self):
return int(self.config["gen_data_number"])
@property
def gen_data_fmt(self):
value = self.config["gen_data_fmt"]
assert value in ["npy", "npz"]
return value
@property
def gen_ref_data(self):
value = self.config["gen_ref_data"]
return str2bool(value)
@property
def gen_ref_data_mode(self):
value = self.config["gen_ref_data_mode"]
assert value in ["file", "model"]
return value
@property
def gen_ref_data_file(self):
return self.config["gen_ref_data_file"]
@property
def gen_ref_data_fmt(self):
value = self.config["gen_ref_data_fmt"]
assert value in ["npy", "npz"]
return value
@property
def gen_ref_labels(self):
value = self.config["gen_ref_labels"]
return str2bool(value)
@property
def gen_ref_labels_mode(self):
value = self.config["gen_ref_labels_mode"]
assert value in ["file", "model"]
return value
@property
def gen_ref_labels_file(self):
return self.config["gen_ref_labels_file"]
@property
def gen_ref_labels_fmt(self):
value = self.config["gen_ref_labels_fmt"]
assert value in ["npy", "npz", "txt", "csv"]
return value
[docs]
def inference(self, model: Model, input_data: Dict[str, np.array]):
raise NotImplementedError
[docs]
def supports_formats(self, ins=None, outs=None):
"""Returs true if the frontend can handle at least one combination of input and output formats."""
assert ins is not None or outs is not None, "Please provide a list of input formats, outputs formats or both"
ret = True
if ins:
if not isinstance(ins, list):
ins = [ins]
supported = any(fmt in self.input_formats for fmt in ins)
ret = ret and supported
if outs:
if not isinstance(outs, list):
outs = [outs]
supported = any(fmt in self.output_formats for fmt in ins)
ret = ret and supported
return ret
[docs]
def lookup_models(self, names, config=None, context=None):
return lookup_models(names, frontends=[self], config=config, context=context)
[docs]
def process_features(self, features):
if features is None:
return []
features = get_matching_features(features, FeatureType.FRONTEND)
for feature in features:
assert ( # If this assertion occurs, continue with the next frontend instead of failing
# (TODO: create custom exception type)
feature.name
in self.FEATURES
), f"Incompatible feature: {feature.name}"
# Instead we might introduce self.compatible and set it to true at this line
feature.used = True
feature.add_frontend_config(self.name, self.config)
feature.update_formats(self.name, self.input_formats, self.output_formats)
return features
[docs]
@abstractmethod
# def produce_artifacts(self, model):
def produce_artifacts(self, model):
pass
[docs]
def generate_input_data(self, input_names, input_types, input_shapes, input_ranges, input_quant_details, in_paths):
# TODO: drop self and move method out of frontends.py, support non-tflite models
assert self.gen_data
inputs_data = []
if self.gen_data_fill_mode in ["zeros", "ones", "random"]:
for i in range(self.gen_data_number):
data = {}
NEW = True
for ii, input_name in enumerate(input_names):
assert input_name in input_types, f"Unknown dtype for input: {input_name}"
dtype = input_types[input_name]
quant = input_quant_details.get(input_name, None)
rng = input_ranges.get(input_name, None)
gen_dtype = dtype
if quant:
_, _, ty = quant
assert "float" in ty, "Input already quantized?"
if NEW:
gen_dtype = ty
assert input_name in input_shapes, f"Unknown shape for input: {input_name}"
shape = input_shapes[input_name]
if self.gen_data_fill_mode == "zeros":
arr = np.zeros(shape, dtype=gen_dtype)
elif self.gen_data_fill_mode == "ones":
arr = np.ones(shape, dtype=gen_dtype)
elif self.gen_data_fill_mode == "random":
DIST = "uniform"
if DIST == "uniform":
UPPER = None
LOWER = None
if rng is not None:
assert len(rng) == 2, "Range should be a tuple (lower, upper)"
LOWER, UPPER = rng
if "float" in gen_dtype:
if UPPER is None:
# UPPER = 1.0
UPPER = 0.5
if LOWER is None:
# LOWER = -1.0
LOWER = -0.5
elif "int" in gen_dtype:
dtype_info = (np.iinfo(gen_dtype),)
if UPPER is None:
UPPER = dtype_info.max
else:
assert UPPER <= dtype_info.max, "Out of dtype bound"
if LOWER is None:
LOWER = dtype_info.min
else:
assert LOWER >= dtype_info.min, "Out of dtype bound"
else:
raise RuntimeError(f"Unsupported dtype: {gen_dtype}")
assert LOWER <= UPPER
RANGE = UPPER - LOWER
assert RANGE > 0
arr = np.random.uniform(LOWER, UPPER, shape)
arr = arr.astype(gen_dtype)
# input("?=")
# if "float" in dtype:
# arr = np.random.rand(*shape).astype(dtype)
# elif "int" in dtype:
# arr = np.random.randint(np.iinfo(dtype).min,
# np.iinfo(dtype).max, size=shape, dtype=dtype)
# else:
# assert False
# Quantize if required
# if gen_dtype != dtype:
if quant:
assert "int" in dtype
# assert quant
scale, shift, ty = quant
arr = (arr / scale) + shift
arr = np.around(arr)
arr = arr.astype(dtype)
# input("!=")
else:
raise RuntimeError(f"Unsupported distribution: {DIST}")
else:
assert False
data[input_name] = arr
assert len(data) > 0
inputs_data.append(data)
elif self.gen_data_fill_mode == "file":
if self.gen_data_file == "auto":
assert len(in_paths) > 0, "in_paths is empty"
if len(in_paths) == 1:
if in_paths[0].is_dir():
files = list(in_paths[0].iterdir())
else:
assert in_paths[0].is_file(), "in_paths not found"
files = [in_paths[0]]
else:
files = in_paths
temp = {}
NEW = True
for file in files:
if not isinstance(file, Path):
file = Path(file)
assert file.is_file(), f"Not found: {file}"
basename, ext = file.stem, file.suffix
if ext == ".bin":
if "_" in basename:
i, ii = basename.split("_", 1)
i = int(i)
ii = int(ii)
else:
i = int(basename)
ii = 0
with open(file, "rb") as f:
data = f.read()
if i not in temp:
temp[i] = {}
temp[i][ii] = data
elif ext in [".npy", ".npz"]:
raise NotImplementedError
else:
raise RuntimeError(f"Unsupported ext: {ext}")
assert len(temp) > 0
for i in range(min(self.gen_data_number, len(temp))):
assert i in temp
data = {}
for ii, input_name in enumerate(input_names):
assert ii in temp[i]
assert input_name in input_types, f"Unknown dtype for input: {input_name}"
dtype = input_types[input_name]
quant = input_quant_details.get(input_name, None)
rng = input_ranges.get(input_name, None)
gen_dtype = dtype
if quant:
_, _, ty, _ = quant
# assert "float" in ty, "Input already quantized?"
if NEW:
gen_dtype = ty
arr = np.frombuffer(temp[i][ii], dtype=gen_dtype)
assert input_name in input_shapes, f"Unknown shape for input: {input_name}"
shape = input_shapes[input_name]
arr = np.reshape(arr, shape)
# Quantize if required
# if gen_dtype != dtype:
if True:
assert "int" in dtype
# assert quant
scale, shift, ty, qrng = quant
if qrng is not None:
assert len(qrng) == 2, "Range should be a tuple (lower, upper)"
lower, upper = qrng
assert lower <= upper
CLIP_INPUTS = True
if CLIP_INPUTS:
arr = np.clip(arr, lower, upper)
else:
assert np.min(arr) >= lower or np.isclose(
np.min(arr), lower
), "Range missmatch (lower)"
assert np.max(arr) <= upper or np.isclose(
np.max(arr), upper
), "Range missmatch (upper)"
arr = (arr / scale) + shift
arr = np.around(arr)
arr = arr.astype(dtype)
# input("!=")
if rng is not None:
# TODO: Move shared code!
assert len(rng) == 2, "Range should be a tuple (lower, upper)"
lower, upper = rng
assert lower <= upper
CLIP_INPUTS = True
if CLIP_INPUTS:
arr = np.clip(arr, lower, upper)
else:
assert np.min(arr) >= lower or np.isclose(np.min(arr), lower), "Range missmatch (lower)"
assert np.max(arr) <= upper or np.isclose(np.max(arr), upper), "Range missmatch (upper)"
data[input_name] = arr
inputs_data.append(data)
else:
assert self.gen_data_file is not None, "Missing value for gen_data_file"
file = Path(self.gen_data_file)
assert file.is_file(), f"File not found: {file}"
# for i, input_name in enumerate(input_names):
elif self.gen_data_fill_mode == "dataset":
raise NotImplementedError
else:
raise RuntimeError(f"unsupported fill_mode: {self.gen_data_fill_mode}")
return inputs_data
[docs]
def generate_output_ref_data(
self, inputs_data, model, out_paths, output_names, output_types, output_shapes, output_quant_details
):
assert self.gen_ref_data
outputs_data = []
if self.gen_ref_data_mode == "model":
assert len(inputs_data) > 0
for i, input_data in enumerate(inputs_data):
# input("321?")
output_data = self.inference(model, input_data, quant=False, dequant=True)
outputs_data.append(output_data)
# input("321!")
elif self.gen_ref_data_mode == "file":
if self.gen_ref_data_file == "auto":
assert len(out_paths) > 0, "out_paths is empty"
if len(out_paths) == 1:
if out_paths[0].is_dir():
files = list(out_paths[0].iterdir())
else:
files = out_paths
temp = {}
assert len(inputs_data) <= len(
files
), f"Missing output data for provided inputs. (Expected: {len(inputs_data)}, Got: {len(files)})"
for file in files:
if not isinstance(file, Path):
file = Path(file)
assert file.is_file()
basename, ext = file.stem, file.suffix
if ext == ".bin":
if "_" in basename:
i, ii = basename.split("_", 1)
i = int(i)
ii = int(ii)
else:
i = int(basename)
ii = 0
with open(file, "rb") as f:
data = f.read()
if i not in temp:
temp[i] = {}
temp[i][ii] = data
elif ext in [".npy", ".npz"]:
raise NotImplementedError
else:
raise RuntimeError(f"Unsupported ext: {ext}")
# TODO: handle case where there are more output samples than input samples?
for i in range(len(temp)):
assert i in temp
data = {}
for ii, output_name in enumerate(output_names):
assert ii in temp[i]
assert output_name in output_types, f"Unknown dtype for output: {output_name}"
dtype = output_types[output_name]
dequant = output_quant_details.get(output_name, None)
if dequant:
_, _, ty, _ = dequant
dtype = ty
arr = np.frombuffer(temp[i][ii], dtype=dtype)
assert output_name in output_shapes, f"Unknown shape for output: {output_name}"
shape = output_shapes[output_name]
arr = np.reshape(arr, shape)
data[output_name] = arr
outputs_data.append(data)
else:
assert self.gen_ref_data_file is not None, "Missing value for gen_ref_data_file"
file = Path(self.gen_data_file)
assert file.is_file(), f"File not found: {file}"
raise NotImplementedError
else:
raise RuntimeError(f"unsupported fill_mode: {self.gen_ref_data_mode}")
return outputs_data
[docs]
def generate_ref_labels(
self, inputs_data, model, out_labels_paths, output_names, output_types, output_shapes, output_quant_details
):
assert self.gen_ref_labels
labels = []
if self.gen_ref_labels_mode == "model":
assert len(inputs_data) > 0
for i, input_data in enumerate(inputs_data):
output_data = self.inference(model, input_data, quant=False, dequant=True)
assert len(output_data) == 1, "Does not support multi-output classification"
output_data = output_data[list(output_data)[0]]
top_label = np.argmax(output_data)
labels.append(top_label)
elif self.gen_ref_labels_mode == "file":
if self.gen_ref_labels_file == "auto":
assert len(out_labels_paths) > 0, "labels_paths is empty"
assert len(out_labels_paths) == 1
file = Path(out_labels_paths[0])
else:
assert self.gen_ref_labels_file is not None, "Missing value for gen_ref_labels_file"
file = Path(self.gen_ref_labels_file)
assert file.is_file(), f"File not found: {file}"
ext = file.suffix
assert len(ext) > 1
fmt = ext[1:].lower()
if fmt == "csv":
import pandas as pd
labels_df = pd.read_csv(file, sep=",")
assert "i" in labels_df.columns
assert "label_idx" in labels_df.columns
assert len(inputs_data) <= len(labels_df)
labels_df.sort_values("i", inplace=True)
labels = list(labels_df["label_idx"].astype(int))[: len(inputs_data)]
else:
raise NotImplementedError(f"Fmt not supported: {fmt}")
else:
raise RuntimeError(f"unsupported fill_mode: {self.gen_ref_labels_mode}")
return labels
[docs]
def generate_model_info(
self,
input_names,
output_names,
input_shapes,
output_shapes,
input_types,
output_types,
input_ranges,
output_ranges,
input_quant_details,
output_quant_details,
):
model_info_dict = {
"input_names": input_names,
"output_names": output_names,
"input_shapes": list(input_shapes.values()),
"output_shapes": list(output_shapes.values()),
"input_types": list(input_types.values()),
"output_types": list(output_types.values()),
"input_ranges": list(input_ranges.values()),
"output_ranges": list(output_ranges.values()),
"input_quant_details": list(input_quant_details.values()),
"output_quant_details": list(output_quant_details.values()),
}
# nested version
# model_info_dict = {
# "inputs": [
# {
# "name": "input_1",
# "shape": [1, 1014],
# "type": "int8",
# }
# ],
# "outputs": [
# {
# "name": "output",
# "shape": [1, 10],
# "type": "int8",
# }
# ],
# }
return model_info_dict # TODO: turn into class
[docs]
def process_metadata(self, model, cfg=None):
model_dir = Path(model.paths[0]).parent.resolve()
metadata = model.metadata
in_paths = []
out_paths = []
labels_paths = []
input_shapes = {}
output_shapes = {}
input_types = {}
output_types = {}
input_ranges = {}
output_ranges = {}
input_quant_details = {}
output_quant_details = {}
if metadata is not None and "network_parameters" in metadata:
network = metadata["network_parameters"]
assert "input_nodes" in network
ins = network["input_nodes"]
for inp in ins:
name = inp.get("name", None)
shape = inp.get("shape", None)
ty = inp.get("dtype", None)
if ty is None:
ty = inp.get("type", None) # legacy
rng = inp.get("range", None)
quantize = inp.get("quantize", None)
if name and shape:
input_shapes[name] = shape
if name and ty:
input_types[name] = ty
if name and rng:
input_ranges[name] = rng
if name and quantize:
quant_scale = quantize.get("scale", None)
quant_zero_shift = quantize.get("zero_shift", None)
quant_dtype = quantize.get("dtype", None)
quant_range = quantize.get("range", None)
quant_details = [quant_scale, quant_zero_shift, quant_dtype, quant_range]
input_quant_details[name] = quant_details
if self.use_inout_data or (
self.gen_data and self.gen_data_fill_mode == "file" and self.gen_data_file == "auto"
):
if "example_input" in inp and "path" in inp["example_input"]:
in_data_dir = Path(inp["example_input"]["path"])
# TODO: this will only work with relative paths to model dir! (Fallback to parent directories?)
in_path = model_dir / in_data_dir
assert (
in_path.is_dir()
), f"Input data directory defined in model metadata does not exist: {in_path}"
in_paths.append(in_path)
assert "output_nodes" in network
outs = network["output_nodes"]
for outp in outs:
name = outp.get("name", None)
shape = outp.get("shape", None)
ty = outp.get("dtype", None)
if ty is None:
ty = outp.get("type", None) # legacy
rng = outp.get("range", None)
dequantize = outp.get("dequantize", None)
if name and shape:
output_shapes[name] = shape
if name and ty:
output_types[name] = ty
if name and rng:
output_ranges[name] = rng
if name and dequantize:
quant_scale = dequantize.get("scale", None)
quant_zero_shift = dequantize.get("zero_shift", None)
quant_dtype = dequantize.get("dtype", None)
quant_range = dequantize.get("range", None)
quant_details = [quant_scale, quant_zero_shift, quant_dtype, quant_range]
output_quant_details[name] = quant_details
if self.use_inout_data or (
self.gen_ref_data and self.gen_ref_data_mode == "file" and self.gen_ref_data_file == "auto"
):
if "test_output_path" in outp:
out_data_dir = Path(outp["test_output_path"])
out_path = model_dir / out_data_dir
assert (
out_path.is_dir()
), f"Output data directory defined in model metadata does not exist: {out_path}"
out_paths.append(out_path)
if self.gen_ref_labels and self.gen_ref_labels_mode == "file" and self.gen_ref_labels_file == "auto":
if "test_labels_file" in outp:
labels_file = Path(outp["test_labels_file"])
labels_path = model_dir / labels_file
assert (
labels_path.is_file()
), f"Labels file defined in model metadata does not exist: {labels_path}"
labels_paths.append(labels_path)
else:
fallback_in_path = model_dir / "input"
if fallback_in_path.is_dir():
in_paths.append(fallback_in_path)
fallback_out_path = model_dir / "output"
if fallback_out_path.is_dir():
out_paths.append(fallback_out_path)
fallback_labels_path = model_dir / "output_labels.csv"
if fallback_labels_path.is_file():
labels_paths.append(fallback_labels_path)
if model.inputs_path:
logger.info("Overriding default model input data with user path")
in_paths = [model.inputs_path]
if model.outputs_path:
logger.info("Overriding default model output data with user path")
out_paths = [model.outputs_path]
if model.output_labels_path: # TODO
logger.info("Overriding default model output labels with user path")
labels_paths = [model.output_labels_path]
if metadata is not None and "backends" in metadata:
assert cfg is not None
backend_options = metadata["backends"]
for backend in backend_options:
if backend_options[backend] is not None:
flattened = {f"{backend}.{key}": value for key, value in backend_options[backend].items()}
cfg.update(flattened)
if len(input_shapes) > 0:
assert len(input_types) in [len(input_shapes), 0]
input_names = list(input_shapes.keys())
elif len(input_types) > 0:
input_names = list(input_types.keys())
else:
input_names = []
if metadata is None:
try:
(
input_names,
input_shapes,
input_types,
input_quant_details,
output_names,
output_shapes,
output_types,
output_quant_details,
) = self.extract_model_info(model)
except NotImplementedError:
logger.warning("Model info could not be extracted.")
# Detect model support code (Allow overwrite in metadata YAML)
if model.support_path:
support_path = model.support_path
else:
support_path = model_dir / "support"
if support_path.is_dir():
assert cfg is not None
# TODO: onlu overwrite if unset?
if cfg.get("mlif.model_support_dir", None) is not None:
cfg.update({"mlif.model_support_dir": support_path})
# cfg.update({"espidf.model_support_dir": support_path})
# cfg.update({"zephyr.model_support_dir": support_path})
if len(in_paths) > 0:
cfg.update({"mlif.input_data_path": in_paths})
# cfg.update({"espidf.input_data_path": in_paths})
# cfg.update({"zephyr.input_data_path": in_paths})
if len(out_paths) > 0:
cfg.update({"mlif.output_data_path": out_paths})
# cfg.update({"espidf.output_data_path": out_paths})
# cfg.update({"zephyr.output_data_path": out_paths})
if len(labels_paths) > 0:
cfg.update({"mlif.output_labels_path": labels_paths})
if len(input_shapes) > 0:
cfg.update({f"{model.name}.input_shapes": input_shapes})
if len(output_shapes) > 0:
cfg.update({f"{model.name}.output_shapes": output_shapes})
if len(input_types) > 0:
cfg.update({f"{model.name}.input_types": input_types})
if len(output_types) > 0:
cfg.update({f"{model.name}.output_types": output_types})
# flattened version
if len(output_shapes) > 0:
assert len(output_types) in [len(output_shapes), 0]
output_names = list(output_shapes.keys())
elif len(output_shapes) > 0:
output_names = list(output_types.keys())
else:
output_names = []
artifacts = []
inputs_data = None
gen_model_info = True # TODO: move to self (configurable)
if gen_model_info:
model_info_dict = self.generate_model_info(
input_names,
output_names,
input_shapes,
output_shapes,
input_types,
output_types,
input_ranges,
output_ranges,
input_quant_details,
output_quant_details,
)
import yaml
content = yaml.dump(model_info_dict)
model_info_artifact = Artifact(
"model_info.yml", content=content, fmt=ArtifactFormat.TEXT, flags=("model_info",)
)
artifacts.append(model_info_artifact)
if self.gen_data:
inputs_data = self.generate_input_data(
input_names, input_types, input_shapes, input_ranges, input_quant_details, in_paths
)
fmt = self.gen_data_fmt
if fmt == "npy":
with tempfile.TemporaryDirectory() as tmpdirname:
tempfilename = Path(tmpdirname) / "inputs.npy"
np.save(tempfilename, inputs_data)
with open(tempfilename, "rb") as f:
raw = f.read()
elif fmt == "npz":
raise NotImplementedError
else:
raise RuntimeError(f"Unsupported fmt: {fmt}")
assert raw
inputs_data_artifact = Artifact(f"inputs.{fmt}", raw=raw, fmt=ArtifactFormat.BIN, flags=("inputs", fmt))
artifacts.append(inputs_data_artifact)
if self.gen_ref_data:
outputs_ref_data = self.generate_output_ref_data(
inputs_data, model, out_paths, output_names, output_types, output_shapes, output_quant_details
)
fmt = self.gen_data_fmt
if fmt == "npy":
with tempfile.TemporaryDirectory() as tmpdirname:
tempfilename = Path(tmpdirname) / "outputs_ref.npy"
np.save(tempfilename, outputs_ref_data)
with open(tempfilename, "rb") as f:
raw = f.read()
elif fmt == "npz":
raise NotImplementedError
else:
raise RuntimeError(f"Unsupported fmt: {fmt}")
assert raw
outputs_ref_artifact = Artifact(
f"outputs_ref.{fmt}", raw=raw, fmt=ArtifactFormat.BIN, flags=("outputs_ref", fmt)
)
artifacts.append(outputs_ref_artifact)
if self.gen_ref_labels:
labels_ref = self.generate_ref_labels(
inputs_data, model, labels_paths, output_names, output_types, output_shapes, output_quant_details
)
fmt = self.gen_ref_labels_fmt
if fmt == "npy":
with tempfile.TemporaryDirectory() as tmpdirname:
tempfilename = Path(tmpdirname) / "labels.npy"
np.save(tempfilename, labels_ref)
with open(tempfilename, "rb") as f:
raw = f.read()
elif fmt == "npz":
raise NotImplementedError
elif fmt == "txt":
raise NotImplementedError
elif fmt == "csv":
raise NotImplementedError
else:
raise RuntimeError(f"Unsupported fmt: {fmt}")
assert raw
labels_ref_artifact = Artifact(
f"labels_ref.{fmt}", raw=raw, fmt=ArtifactFormat.BIN, flags=("labels_ref", fmt)
)
artifacts.append(labels_ref_artifact)
return artifacts
[docs]
def generate(self, model) -> Tuple[dict, dict]:
artifacts = []
count = len(model.paths)
assert count == len(model.formats)
assert count > 0, f"'{self.name}' frontend expects at least one model"
max_ins = len(self.input_formats)
assert count <= max_ins, f"'{self.name}' frontend did not expect more than {max_ins} models"
formats = model.formats
assert self.supports_formats(formats), f"Invalid model format for '{self.name}' frontend"
artifacts = self.produce_artifacts(model)
if not isinstance(artifacts, list):
artifacts = [artifacts]
assert len(artifacts) > 0, f"'{self.name}' frontend should produce at least one model"
max_outs = len(self.output_formats)
assert len(artifacts) <= max_outs, f"'{self.name}' frontend should not return more than {max_outs}"
# If we want to use the same instance of this Frontend in parallel, we need to get rid of self.artifacts...
return {"default": artifacts}, {}
[docs]
def generate_artifacts(self, model) -> List[Artifact]:
start_time = time.time()
artifacts, metrics = self.generate(model)
# TODO: do something with out?
end_time = time.time()
diff = end_time - start_time
if len(metrics) == 0:
metrics = {"default": Metrics()}
for name, metrics_ in metrics.items():
if name == "default":
metrics_.add("Load Stage Time [s]", diff, True)
content = metrics_.to_csv(include_optional=True)
artifact = Artifact("load_metrics.csv", content=content, fmt=ArtifactFormat.TEXT, flags=["metrics"])
if name not in artifacts:
artifacts[name] = []
artifacts[name].append(artifact)
self.artifacts = artifacts
return artifacts
[docs]
def export_artifacts(self, path):
assert len(self.artifacts) > 0, "No artifacts found, please run generate_artifacts() first"
if not isinstance(path, Path):
path = Path(path)
is_dir = len(path.suffix) == 0
if is_dir:
assert (
path.is_dir()
), "The supplied path does not exists." # Make sure it actually exists (we do not create it by default)
for artifact in self.artifacts:
artifact.export(path)
else:
raise NotImplementedError
[docs]
def add_platform_config(self, platform, config):
config.update(self.get_platform_config(platform))
[docs]
class SimpleFrontend(Frontend):
"""An abstract frontend with equivalent input and output formats."""
# Assumption: only raw model data
def __init__(self, name, fmt, features=None, config=None):
super().__init__(
name,
input_formats=[fmt],
output_formats=[fmt],
features=features,
config=config,
)
[docs]
def produce_artifacts(self, model):
assert len(self.input_formats) == len(self.output_formats) == len(model.paths) == 1
artifacts = []
name = model.name
assert "/" not in name
path = model.paths[0]
ext = self.input_formats[0].extension
with open(path, "rb") as handle: # TODO: is an onnx model raw data or text?
raw = handle.read()
artifacts.append(Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"]))
return artifacts
# TODO: move to frontends.py
# TODO: frontend parsed metadata instead of lookup.py?
# TODO: how to find inout_data?
[docs]
class TfLiteFrontend(SimpleFrontend):
FEATURES = Frontend.FEATURES | {
"visualize",
"split_layers",
"tflite_analyze",
"gen_data",
"gen_ref_data",
"gen_ref_labels",
}
DEFAULTS = {
**Frontend.DEFAULTS,
"visualize_enable": False,
"visualize_script": None,
"split_layers": False,
"pack_script": None,
"analyze_enable": False,
"analyze_script": None,
"check_integrity": False,
}
REQUIRED = Frontend.REQUIRED
OPTIONAL = SimpleFrontend.OPTIONAL | {"tflite_analyze.script"}
def __init__(self, features=None, config=None):
super().__init__(
"tflite",
ModelFormats.TFLITE,
features=features,
config=config,
)
[docs]
def process_metadata(self, model, cfg=None):
ret = super().process_metadata(model, cfg=cfg)
metadata = model.metadata
if self.check_integrity:
checked = False
if metadata is not None:
network_data = metadata.get("network")
if network_data is not None:
hash_data = network_data.get("hash")
if hash_data is not None:
algorithm = hash_data.get("algorithm", "sha1")
value = hash_data.get("value", None)
check_integrity(algorithm, value, model.paths[0])
checked = True
if not checked:
logger.debug("Skipping integrity check (missing hash)")
return ret
@property
def check_integrity(self):
value = self.config["check_integrity"]
return str2bool(value)
@property
def visualize_enable(self):
value = self.config["visualize_enable"]
return str2bool(value)
@property
def split_layers(self):
value = self.config["split_layers"]
return str2bool(value)
@property
def visualize_script(self):
return self.config["visualize_script"]
@property
def pack_script(self):
return self.config["pack_script"]
@property
def analyze_enable(self):
value = self.config["analyze_enable"]
return str2bool(value)
@property
def analyze_script(self):
return self.config["analyze_script"]
[docs]
def extract_model_info(self, model: Model):
import tensorflow as tf
model_path = str(model.paths[0])
interpreter = tf.lite.Interpreter(model_path=model_path)
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
input_names = []
input_shapes = {}
input_types = {}
input_quant_details = {}
output_names = []
output_shapes = {}
output_types = {}
output_quant_details = {}
for inp in input_details:
name = str(inp["name"])
input_names.append(name)
input_shapes[name] = inp["shape"].tolist()
input_types[name] = np.dtype(inp["dtype"]).name
if "quantization" in inp:
scale, zero_point = inp["quantization"]
quant = [scale, zero_point, "float32"]
input_quant_details[name] = quant
for outp in output_details:
name = str(outp["name"])
output_names.append(name)
output_shapes[name] = outp["shape"].tolist()
output_types[name] = np.dtype(outp["dtype"]).name
if "quantization" in outp:
scale, zero_point = outp["quantization"]
quant = [scale, zero_point, "float32"]
output_quant_details[name] = quant
return (
input_names,
input_shapes,
input_types,
input_quant_details,
output_names,
output_shapes,
output_types,
output_quant_details,
)
[docs]
def inference(self, model: Model, input_data: Dict[str, np.array], quant=False, dequant=False, verbose=False):
import tensorflow as tf
model_path = str(model.paths[0])
interpreter = tf.lite.Interpreter(model_path=model_path)
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
interpreter.allocate_tensors()
if verbose:
print()
print("Input details:")
print(input_details)
print()
print("Output details:")
print(output_details)
print()
assert len(input_details) == 1, "Multi-inputs not yet supported"
input_type = input_details[0]["dtype"]
input_name = input_details[0]["name"]
input_shape = input_details[0]["shape"]
assert input_name in input_data, f"Input {input_name} fot found in data"
np_features = input_data[input_name]
if quant and input_type == np.int8:
input_scale, input_zero_point = input_details[0]["quantization"]
if verbose:
print("Input scale:", input_scale)
print("Input zero point:", input_zero_point)
print()
np_features = (np_features / input_scale) + input_zero_point
np_features = np.around(np_features)
np_features = np_features.astype(input_type)
np_features = np_features.reshape(input_shape)
interpreter.set_tensor(input_details[0]["index"], np_features)
interpreter.invoke()
output = interpreter.get_tensor(output_details[0]["index"])
# If the output type is int8 (quantized model), rescale data
assert len(output_details) == 1, "Multi-outputs not yet supported"
output_type = output_details[0]["dtype"]
output_name = output_details[0]["name"]
if dequant and output_type == np.int8:
output_scale, output_zero_point = output_details[0]["quantization"]
if verbose:
print("Raw output scores:", output)
print("Output scale:", output_scale)
print("Output zero point:", output_zero_point)
print()
output = output_scale * (output.astype(np.float32) - output_zero_point)
if verbose:
# Print the results of inference
print("Inference output:", output, type(output))
return {output_name: output}
[docs]
def produce_artifacts(self, model):
assert len(self.input_formats) == len(model.paths) == 1
artifacts = []
name = model.name
# assert "/" not in name
if "/" in name:
name = name.rsplit("/", 1)[-1]
path = model.paths[0]
ext = self.input_formats[0].extension
with open(path, "rb") as handle:
raw = handle.read()
artifacts.append(Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"]))
if self.analyze_enable:
with tempfile.TemporaryDirectory() as tmpdirname:
out_file = str(Path(tmpdirname) / "tflite_analyze.csv")
args = [
path,
"--csv",
out_file,
"--ops",
"--estimate-macs",
"--estimate-rom",
"--estimate-ram",
]
assert self.analyze_script is not None
assert Path(self.analyze_script).is_file(), f"Script {self.analyze_script} not found."
utils.python(self.analyze_script, *args)
with open(out_file, "r") as handle:
tflite_analyze_csv = handle.read()
tflite_analyze_artifact = Artifact(
"tflite_analyze.csv",
content=tflite_analyze_csv,
fmt=ArtifactFormat.TEXT,
)
artifacts.append(tflite_analyze_artifact)
if self.visualize_enable:
assert self.visualize_script is not None
in_file = model.paths[0]
ext = "html"
with tempfile.TemporaryDirectory() as tmpdirname:
out_file = str(Path(tmpdirname) / f"tflite_visualize.{ext}")
utils.python(self.visualize_script, in_file, out_file)
with open(out_file, "r") as handle:
tflite_visualize_text = handle.read()
tflite_visualize_artifact = Artifact(
f"tflite_visualize.{ext}",
content=tflite_visualize_text,
fmt=ArtifactFormat.TEXT,
)
artifacts.append(tflite_visualize_artifact)
if self.visualize_enable and self.analyze_enable:
assert len(self.output_formats) == 3
elif self.visualize_enable or self.analyze_enable:
assert len(self.output_formats) == 2
else:
assert len(self.output_formats) == 1
return artifacts
[docs]
def generate(self, model) -> Tuple[dict, dict]:
if self.split_layers:
artifacts = {}
name = model.name
path = model.paths[0]
formats = model.formats
assert self.supports_formats(formats), f"Invalid model format for '{self.name}' frontend"
ret = self.produce_artifacts(model)
if not isinstance(ret, list):
ret = [ret]
assert len(ret) > 0, f"'{self.name}' frontend should produce at least one model"
max_outs = len(self.output_formats)
assert len(ret) <= max_outs, f"'{self.name}' frontend should not return more than {max_outs}"
artifacts["default"] = ret
with tempfile.TemporaryDirectory() as tmpdirname:
def get_num_layers(file):
tflite_pack_args = [path, "--count-layers", "--noop"]
out = utils.execute(self.pack_script, *tflite_pack_args)
matches = re.compile(r"Found\s(\d+)\slayers.").findall(out)
assert len(matches) == 1
num = int(matches[0])
return num
replace = False
# replace = True
drop = False
# drop = True
def gen_layer_files(file, dest):
results = []
num_layers = get_num_layers(file)
assert num_layers > 0
keep = None
if replace:
assert keep is not None and len(keep) == 1
for i in range(num_layers):
if keep and i not in keep:
continue
out_name = f"layer{i}"
out_file = Path(dest) / out_name
tflite_pack_args = [path, "-k", str(i), "--out", out_file]
utils.execute(self.pack_script, *tflite_pack_args)
assert out_file.is_file()
results.append(out_file)
return results
layer_files = gen_layer_files(path, tmpdirname)
for i, layer_file in enumerate(layer_files):
subrun = f"layer{i}"
layer_name = f"{name}_{subrun}"
layer_model = Model(layer_name, [layer_file])
ret = self.produce_artifacts(layer_model)
if not isinstance(ret, list):
ret = [ret]
assert len(ret) > 0, f"'{self.name}' frontend should produce at least one model"
max_outs = len(self.output_formats)
assert len(ret) <= max_outs, f"'{self.name}' frontend should not return more than {max_outs}"
if replace:
subrun = "default"
artifacts[subrun] = ret
if drop:
del artifacts["default"]
return artifacts, {}
else:
return super().generate(model)
[docs]
class RelayFrontend(SimpleFrontend):
FEATURES = Frontend.FEATURES | {"relayviz"}
DEFAULTS = {**Frontend.DEFAULTS, "visualize_graph": False, "relayviz_plotter": "term"}
REQUIRED = Frontend.REQUIRED | {"tvm.build_dir", "tvm.pythonpath"}
def __init__(self, features=None, config=None):
super().__init__(
"relay",
ModelFormats.RELAY,
features=features,
config=config,
)
@property
def visualize_graph(self):
value = self.config["visualize_graph"]
return str2bool(value)
@property
def relayviz_plotter(self):
return self.config["relayviz_plotter"]
@property
def tvm_build_dir(self):
return self.config["tvm.build_dir"]
@property
def tvm_pythonpath(self):
return self.config["tvm.pythonpath"]
[docs]
def produce_artifacts(self, model):
assert len(self.input_formats) == len(model.paths) == 1
artifacts = []
name = model.name
path = model.paths[0]
ext = self.input_formats[0].extension
with open(path, "rb") as handle: # TODO: is an onnx model raw data or text?
raw = handle.read()
artifacts.append(Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"]))
if not self.visualize_graph:
assert len(self.output_formats) == 1
else:
assert len(self.output_formats) == 2
def _relayviz(in_file, out_file, plotter_name, env={}):
import sys
import os
sys.path.append(env["PYTHONPATH"])
os.environ["TVM_LIBRARY_PATH"] = env["TVM_LIBRARY_PATH"]
from tvm import parser
from tvm.contrib import relay_viz
from tvm.contrib.relay_viz.terminal import TermPlotter
from tvm.contrib.relay_viz.dot import DotPlotter
if plotter_name == "term":
plotter_cls = TermPlotter
elif plotter_name == "dot":
plotter_cls = DotPlotter
else:
raise RuntimeError(f"Invalid plotter name: {plotter_name}")
with open(in_file, "r", encoding="utf-8") as relay_text:
text = relay_text.read()
mod = parser.fromtext(text)
plotter_inst = plotter_cls()
viz = relay_viz.RelayVisualizer(mod, plotter=plotter_inst)
out_file_base = os.path.splitext(out_file)[0]
viz.render(filename=out_file_base)
in_file = model.paths[0]
ext = "" if self.relayviz_plotter == "term" else "pdf"
with tempfile.TemporaryDirectory() as tmpdirname:
out_file = str(Path(tmpdirname) / (f"relayviz.{ext}" if len(ext) > 0 else "relayviz"))
proc = multiprocessing.Process(
target=_relayviz,
args=[in_file, out_file, self.relayviz_plotter],
kwargs={"env": {"PYTHONPATH": self.tvm_pythonpath, "TVM_LIBRARY_PATH": self.tvm_build_dir}},
)
proc.start()
proc.join()
if self.relayviz_plotter == "term":
with open(out_file, "r") as handle:
relayviz_text = handle.read()
relayviz_artifact = Artifact(
"relayviz.txt",
content=relayviz_text,
fmt=ArtifactFormat.TEXT,
)
else:
with open(out_file, "rb") as handle:
relayviz_data = handle.read()
relayviz_artifact = Artifact(
f"relayviz.{ext}",
raw=relayviz_data,
fmt=ArtifactFormat.RAW,
)
artifacts.append(relayviz_artifact)
return artifacts
[docs]
class PackedFrontend(Frontend): # Inherit from TFLiteFrontend? -> how to do constructor?
FEATURES = Frontend.FEATURES | {"packing", "packed"}
DEFAULTS = {
**Frontend.DEFAULTS,
"ignore_existing": True,
"fake_pack": False, # Pretend that every compatible tensor is packable
# (best case scenerio, TODO: rename to force_pack?)
"use_packed": True,
"check": False, # Unimplemented
}
REQUIRED = {"packer.exe"} # TODO move to feature?
def __init__(self, features=None, config=None):
super().__init__(name="packed", features=features, config=config)
if self.fake_pack or self.ignore_existing:
# assert self.use_packed
self.input_formats = [ModelFormats.TFLITE]
else:
self.input_formats = [ModelFormats.PACKED, ModelFormats.TFLITE]
# if self.use_packed:
self.output_formats = [
ModelFormats.PACKED,
ModelFormats.TFLITE,
] # Always copy over the input model as intermediate artifact for reproducability
# TODO: add a Frontend.DEFAULT which can be used to turn off this behavoir (keep intermediate?)
# else:
# self.output_formats = [ModelFormats.TFLITE]
# Order of formats ir irrelevant here, hot for artifacts, the first one will always be the main object
@property
def ignore_existing(self):
value = self.config["ignore_existing"]
return str2bool(value)
@property
def fake_pack(self):
value = self.config["fake_pack"]
return str2bool(value)
@property
def use_packed(self):
value = self.config["use_packed"]
return str2bool(value)
@property
def check(self):
value = self.config["check"]
return str2bool(value)
[docs]
def produce_artifacts(self, model):
tflite_data = None
packed_data = None
name = model.name
if self.fake_pack or self.ignore_existing: # -> ins: TFLITE
# assert self.use_packed
tflite_path = model.paths[0]
with open(tflite_path, "rb") as handle:
tflite_data = handle.read()
else: # -> ins: TFLITE, PACKED
for path, fmt in zip(model.paths, model.formats):
data_in = None
with open(path, "rb") as handle:
data_in = handle.read()
if fmt == ModelFormats.PACKED:
packed_data = data_in
break
elif fmt == ModelFormats.TFLITE:
# tflite_data_in = data_in
break
else:
raise RuntimeError(f"Unexpected model format: {fmt}")
if packed_data is None:
# Do packing
with tempfile.TemporaryDirectory() as tmpdirname:
logger.debug("Using temporary directory for packing results: %s", tmpdirname)
packer_exe = self.config["packer_exe"]
assert packer_exe is not None
in_file = Path(tmpdirname) / "in.tflite"
with open(in_file, "wb") as handle:
handle.write(tflite_data)
out_file = Path(tmpdirname) / "out.tflm"
utils.execute(packer_exe, in_file, out_file)
with open(out_file, "rb") as handle:
packed_data = handle.read()
if self.check:
raise NotImplementedError
tflite_artifact = Artifact(
f"{name}.tflite",
raw=tflite_data,
fmt=ArtifactFormat.RAW,
flags=["model"],
optional=self.use_packed,
)
packed_artifact = Artifact(
f"{name}.tflm",
raw=packed_data,
fmt=ArtifactFormat.RAW,
optional=not self.use_packed,
)
if self.use_packed:
return [packed_artifact, tflite_artifact]
else:
return [tflite_artifact, packed_artifact]
[docs]
class ONNXFrontend(SimpleFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"onnx",
ModelFormats.ONNX,
features=features,
config=config,
)
[docs]
class MLIRFrontend(SimpleFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"mlir",
ModelFormats.MLIR,
features=features,
config=config,
)
[docs]
class PBFrontend(SimpleFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"pb",
ModelFormats.PB,
features=features,
config=config,
)
[docs]
class PaddleFrontend(SimpleFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"paddle",
ModelFormats.PADDLE,
features=features,
config=config,
)
[docs]
class BenchFrontend(SimpleFrontend):
def __init__(self, name: str, prog_cls: Program, features=None, config=None):
super().__init__(
name,
ModelFormats.NONE,
features=features,
config=config,
)
self.prog_cls = prog_cls
[docs]
def generate(self, model) -> Tuple[dict, dict]:
artifacts = [Artifact("dummy_model", raw=bytes(), fmt=ArtifactFormat.RAW, flags=["model", "dummy"])]
return {"default": artifacts}, {}
[docs]
def lookup_models(self, names, config=None, context=None):
ret = []
for name in names:
name = name.replace(f"{self.name}/", "")
if name in self.supported_names:
hint = self.prog_cls(
name,
alt=f"{self.name}/{name}",
config=config,
)
ret.append(hint)
return ret
[docs]
def get_platform_config(self, platform):
ret = {}
if platform == "mlif":
ret["template"] = self.name
return ret
[docs]
class SingleBenchFrontend(BenchFrontend):
@property
def supported_names(self):
return [self.name]
[docs]
class ExampleFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"example",
ExampleProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# return ["hello_world", "foobar"]
return [
"hello_world",
"foobar",
"load_filter",
"basic_hash",
"temp_crypto",
]
[docs]
class EmbenchFrontend(BenchFrontend):
REQUIRED = {"embench.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"embench",
EmbenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
"edn",
"md5sum",
"nettle-sha256",
"nettle-aes",
"ud",
"matmult-int",
"aha-mont64",
"huffbench",
"cubic",
"nbody",
"sglib-combined",
"crc32",
"wikisort",
"slre",
"qrduino",
"minver",
"picojpeg",
"tarfind",
"st",
"nsichneu",
"statemate",
"primecount",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["EMBENCH_DIR"] = Path(self.config["embench.src_dir"])
return ret
[docs]
class EmbenchIoTFrontend(BenchFrontend):
REQUIRED = {"embench_iot.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"embench_iot",
EmbenchIoTProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
# "cubic",
# "nbody",
# "minver",
# "st",
# "primecount", ???
"aha-mont64",
"crc32",
"depthconv",
"edn",
"huffbench",
"matmult-int",
"md5sum",
"nettle-aes",
"nettle-sha256",
"nsichneu",
"picojpeg",
"qrduino",
"sglib-combined",
"slre",
"statemate",
"tarfind",
"ud",
"wikisort",
"xgboost",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["EMBENCH_IOT_DIR"] = Path(self.config["embench_iot.src_dir"])
return ret
[docs]
class EmbenchDSPFrontend(BenchFrontend):
REQUIRED = {"embench_dsp.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"embench_dsp",
EmbenchDSPProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
"biquad_cascade_df2T_f32_sos3_n1",
"biquad_cascade_df2T_f32_sos3_n128",
"dct4_2048_f32",
"dct4_512_f32",
"fir_f32_taps256_n1",
"fir_f32_taps256_n128",
"lms_f32_taps256_n1",
"lms_f32_taps256_n128",
"rfft2048_f32",
"rfft512_f32",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["EMBENCH_DSP_DIR"] = Path(self.config["embench_dsp.src_dir"])
return ret
[docs]
class TaclebenchFrontend(BenchFrontend):
REQUIRED = {"taclebench.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"taclebench",
TaclebenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
"test/test3",
"test/cover",
"test/duff",
"app/powerwindow",
"app/lift",
"kernel/deg2rad",
"kernel/matrix1",
"kernel/binarysearch",
"kernel/pm",
"kernel/sha",
"kernel/filterbank",
"kernel/md5",
"kernel/fir2dim",
"kernel/fft",
"kernel/minver",
"kernel/lms",
"kernel/bitcount",
"kernel/st",
"kernel/bsort",
"kernel/bitonic",
"kernel/iir",
"kernel/prime",
"kernel/jfdctint",
"kernel/recursion",
"kernel/complex_updates",
"kernel/cosf",
"kernel/insertsort",
"kernel/fac",
"kernel/rad2deg",
"kernel/isqrt",
"kernel/cubic",
"kernel/ludcmp",
"kernel/quicksort",
"kernel/countnegative",
"sequential/epic",
"sequential/huff_dec",
"sequential/fmref",
"sequential/h264_dec",
"sequential/dijkstra",
"sequential/adpcm_dec",
"sequential/adpcm_enc",
"sequential/gsm_dec",
"sequential/rijndael_dec",
"sequential/g723_enc",
"sequential/huff_enc",
"sequential/statemate",
"sequential/susan",
"sequential/gsm_enc",
"sequential/ndes",
"sequential/audiobeam",
"sequential/rijndael_enc",
"sequential/cjpeg_transupp",
"sequential/ammunition",
"sequential/mpeg2",
"sequential/anagram",
"sequential/cjpeg_wrbmp",
"sequential/petrinet",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["TACLEBENCH_DIR"] = Path(self.config["taclebench.src_dir"])
return ret
[docs]
class PolybenchFrontend(BenchFrontend):
DEFAULTS = {
**Frontend.DEFAULTS,
"dataset": "large", # mini/small/medium/large/extralarge
}
REQUIRED = {"polybench.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"polybench",
PolybenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
"linear-algebra/solvers/gramschmidt",
"linear-algebra/solvers/ludcmp",
"linear-algebra/solvers/trisolv",
"linear-algebra/solvers/durbin",
"linear-algebra/solvers/lu",
"linear-algebra/solvers/cholesky",
"linear-algebra/kernels/atax",
"linear-algebra/kernels/3mm",
"linear-algebra/kernels/mvt",
"linear-algebra/kernels/2mm",
"linear-algebra/kernels/bicg",
"linear-algebra/kernels/doitgen",
"linear-algebra/blas/trmm",
"linear-algebra/blas/gemver",
"linear-algebra/blas/syrk",
"linear-algebra/blas/gesummv",
"linear-algebra/blas/syr2k",
"linear-algebra/blas/symm",
"linear-algebra/blas/gemm",
"stencils/fdtd-2d",
"stencils/seidel-2d",
"stencils/adi",
"stencils/jacobi-1d",
"stencils/jacobi-2d",
"stencils/heat-3d",
"datamining/covariance",
"datamining/correlation",
"medley/deriche",
"medley/nussinov",
"medley/floyd-warshall",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["POLYBENCH_DIR"] = Path(self.config["polybench.src_dir"])
ret["POLYBENCH_DATASET"] = self.config["dataset"].upper() + "_DATASET"
return ret
[docs]
class CoremarkFrontend(SingleBenchFrontend):
REQUIRED = set()
def __init__(self, features=None, config=None):
super().__init__(
"coremark",
CoremarkProgram,
features=features,
config=config,
)
[docs]
class DhrystoneFrontend(SingleBenchFrontend):
REQUIRED = set()
def __init__(self, features=None, config=None):
super().__init__(
"dhrystone",
DhrystoneProgram,
features=features,
config=config,
)
[docs]
class MathisFrontend(BenchFrontend):
REQUIRED = set()
def __init__(self, features=None, config=None):
super().__init__(
"mathis",
MathisProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"to_upper",
"add8",
"add16",
"gather_add8",
"gather_add16",
"scatter_add8",
"scatter_add16",
"dot8",
"dot16",
"saxpy8",
"saxpy16",
"matmul8",
"matmul16",
"matmul8_a",
"matmul16_a",
"transposed_matmul8",
"transposed_matmul16",
"transposed_matmul8_a",
"transposed_matmul16_a",
"transposed_matmul8_b",
"transposed_matmul16_b",
]
[docs]
class MibenchFrontend(BenchFrontend):
REQUIRED = {"mibench.src_dir"}
def __init__(self, features=None, config=None):
super().__init__(
"mibench",
MibenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
# TODO: automatic lookup
return [
"telecomm/FFT",
"telecomm/CRC32",
"automotive/susan",
"automotive/basicmath_small",
"automotive/basicmath_large",
"automotive/bitcount_small",
"automotive/bitcount_large",
"automotive/qsort_small",
"automotive/qsort_large",
"security/sha",
"security/rijndael",
"network/dijkstra",
"office/stringsearch",
]
[docs]
def get_platform_defs(self, platform):
ret = {}
if platform == "mlif":
ret["MIBENCH_DIR"] = Path(self.config["mibench.src_dir"])
return ret
[docs]
class LayerGenFrontend(Frontend):
"""Example input:
1,1024 1 fully_connected ,10,relu
1,1024 1 fully_connected ,100,relu
1,1024 1 fully_connected ,2,relu
1,1024 1 fully_connected ,4,relu
1,1024 1 fully_connected ,8,relu
1,1024 1 fully_connected ,16,relu
1,1024 1 fully_connected ,32,relu
1,1024 1 fully_connected ,64,relu
1,1024 1 fully_connected ,128,relu
1,1024 1 fully_connected ,256,relu
1,1024 1 fully_connected ,512,relu
"""
FEATURES = Frontend.FEATURES
DEFAULTS = {
**Frontend.DEFAULTS,
"fmt": "tflite", # TODO: relay
}
REQUIRED = Frontend.REQUIRED | {"layergen.exe"}
def __init__(self, features=None, config=None):
super().__init__(
"layergen",
input_formats=[ModelFormats.TEXT],
output_formats=[ModelFormats.TFLITE, ModelFormats.RELAY],
features=features,
config=config,
)
@property
def fmt(self):
value = self.config["fmt"]
value = value.upper()
assert value in ["TFLITE", "RELAY"]
return value
@property
def layergen_exe(self):
return Path(self.config["layergen.exe"])
# def produce_artifacts(self, model):
# artifacts = {}
# name = model.name
# path = model.paths[0]
# ext = ModelFormats[self.fmt].extension
# print("ext", ext)
# with open(path, "r") as handle:
# content = handle.read()
# lines = content.strip().split("\n")
# print("lines", lines, list(filter(None, lines)))
# assert len(lines) > 0, "Empty file not allowed."
# def helper(args):
# args = args.split(" ")
# with tempfile.TemporaryDirectory() as tmpdirname:
# out = Path(tmpdirname) / f"out.{ext}"
# utils.python(self.layergen_exe, self.fmt.lower(), out, *args, cwd=tmpdirname)
# # TODO: log output
# with open(out, "rb") as handle:
# raw = handle.read()
# return raw
# if len(lines) > 1:
# for i, args in enumerate(lines):
# name = f"model{i}"
# raw = helper(args)
# artifact = Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"])
# artifacts[name] = [artifact]
# else:
# artifacts["default"] = []
# return artifacts
[docs]
def generate(self, model) -> Tuple[dict, dict]:
artifacts = {}
name = model.name
path = model.paths[0]
ext = ModelFormats[self.fmt].extension
with open(path, "r") as handle:
content = handle.read()
lines = content.strip().split("\n")
assert len(lines) > 0, "Empty file not allowed."
def helper(args):
args = args.split(" ")
with tempfile.TemporaryDirectory() as tmpdirname:
out = Path(tmpdirname) / f"out.{ext}"
utils.python(self.layergen_exe, self.fmt.lower(), out, *args, cwd=tmpdirname)
# TODO: log output
with open(out, "rb") as handle:
raw = handle.read()
return raw
if len(lines) > 1:
for i, args in enumerate(lines):
name = f"model{i}"
raw = helper(args)
artifact = Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"])
artifacts[name] = [artifact]
# TODO: fix this
artifacts["default"] = artifacts["model0"] # Dummy model because default artifacts can not be empty
else:
name = "default"
raw = helper(lines[0])
artifact = Artifact(f"{name}.{ext}", raw=raw, fmt=ArtifactFormat.RAW, flags=["model"])
artifacts[name] = [artifact]
return artifacts, {}
[docs]
class OpenASIPFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"openasip",
OpenASIPProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"sha256",
"aes",
"crc",
]
[docs]
class RVVBenchFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"rvv_bench",
RVVBenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"instructions_scalar",
"instructions_rvv",
"memcpy",
"memset",
"utf8_count",
"strlen",
"mergelines",
"mandelbrot",
"chacha20",
"poly1305",
"ascii_to_utf16",
"ascii_to_uft32",
"byteswap",
"LUT4",
"LUT6",
"hist",
"base64_encode",
]
[docs]
class ISSBenchFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"iss_bench",
ISSBenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"large_block",
"branch_heavy",
"mem_heavy",
]
[docs]
class CryptoBenchFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"crypto_bench",
CryptoBenchProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"mceliece348864_encrypt",
"mceliece348864_encrypt_rvv",
"mceliece348864_decrypt",
"mceliece348864_decrypt_rvv",
"mceliece348864_keypair",
"mceliece348864_keypair_rvv",
"hqc-128",
"hqc-192",
"hqc-256",
]
[docs]
class CmsisDSPFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"cmsis_dsp",
CmsisDSPProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"arm_abs_q15",
"arm_abs_q31",
]
[docs]
def get_platform_config(self, platform):
ret = {}
if platform == "mlif":
ret["template"] = "cmsis_dsp_bench"
return ret
[docs]
class CmsisNNFrontend(BenchFrontend):
def __init__(self, features=None, config=None):
super().__init__(
"cmsis_nn",
CmsisNNProgram,
features=features,
config=config,
)
@property
def supported_names(self):
return [
"arm_nn_activation_s16_tanh",
"arm_nn_activation_s16_sigmoid",
]
[docs]
def get_platform_config(self, platform):
ret = {}
if platform == "mlif":
ret["template"] = "cmsis_nn_bench"
return ret