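# Excerpt of a perf metric-validation helper class: a cross-reference listing
# of the lines that mention `self`, so method bodies, module-level code, and
# some statements are abridged.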
    def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='',
                 fullrulefname='', workload='true', metrics=''):
        self.rulefname = rulefname
        self.reportfname = reportfname
        self.rules = None
        self.collectlist: str = metrics
        self.metrics = self.__set_metrics(metrics)
        self.skiplist = set()
        self.tolerance = t

        self.workloads = [x for x in workload.split(",") if x]
        self.wlidx = 0  # index of the current workload
        self.allresults = dict()  # metric results of all workloads
        self.allignoremetrics = dict()  # metrics with no results or negative results
        self.allfailtests = dict()
        self.alltotalcnt = dict()
        self.allpassedcnt = dict()
        self.allerrlist = dict()

        self.results = dict()  # metric results of the current workload
        self.ignoremetrics = set()  # metrics with no results or negative results; a negative result counts as a failed test
        self.failtests = dict()
        self.totalcnt = 0
        self.passedcnt = 0

        self.errlist = list()

        self.pctgmetrics = set()  # metrics matched by the generated percentage rule

        self.datafname = datafname
        self.debug = debug
        self.fullrulefname = fullrulefname
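
    # Per-workload state (results, ignoremetrics, failtests, counters, errlist)
    # is reset by _init_data() before each workload runs and archived into the
    # all* dicts by _storewldata(), keyed by the workload's index in
    # self.workloads.
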
    def __set_metrics(self, metrics=''):

    def read_json(self, filename: str) -> dict:

    def json_dump(self, data, output_file):

    def get_results(self, idx: int = 0):
        return self.results[idx]

    def get_bounds(self, lb, ub, error, alias={}, ridx: int = 0) -> list:

        If lb is missing, use 0.0; if ub is missing, use float('inf'); if error is missing, use self.tolerance.

        vall = self.get_value(alias[ub], ridx)

        t = get_bound_value(error, self.tolerance, ridx)
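
    # Per the docstring above, a rule that omits its bounds degrades
    # gracefully: lb falls back to 0.0, ub to float('inf'), and the error
    # threshold to the `t` tolerance passed to __init__.
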
    def get_value(self, name: str, ridx: int = 0) -> list:

        Get the value of the metric from self.results.
        If the result of this metric is not provided, the metric name will be added to self.ignoremetrics and self.errlist.

        @returns: list with the value found in self.results; the list is empty when the value is not found.

        data = self.results[ridx] if ridx in self.results else self.results[0]
        if name not in self.ignoremetrics:

                self.ignoremetrics.add(name)
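
    # Unresolvable metrics are remembered: once a name is added to
    # self.ignoremetrics, get_value() stops retrying it (see the guard above).
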
    def check_bound(self, val, lb, ub, err):

    def pos_val_test(self):

        Metrics with a negative value are added to self.failtests['PositiveValueTest'] and self.ignoremetrics.

        for name, val in self.get_results().items():

            self.second_test(rerun, second_results)

        self.failtests['PositiveValueTest']['Total Tests'] = tcnt
        self.failtests['PositiveValueTest']['Passed Tests'] = pcnt

        self.ignoremetrics.update(negmetric.keys())

            self.failtests['PositiveValueTest']['Failed Tests'].append({'NegativeValue': negmessage})
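
    # Negative readings get a second chance: the offending metrics are re-run
    # via second_test(), and only those that stay negative are recorded in
    # 'Failed Tests' and added to self.ignoremetrics.
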
    def evaluate_formula(self, formula: str, alias: dict, ridx: int = 0):

        v = self.get_value(s, ridx)
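
    # evaluate_formula() appears to substitute each alias token `s` in the
    # formula with its collected value `v` (via get_value) before computing
    # the result; the substitution and evaluation details are abridged here.
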
    def relationship_test(self, rule: dict):

        lbv, ubv, t = self.get_bounds(rule['RangeLower'], rule['RangeUpper'],
                                      rule['ErrorThreshold'], alias, ridx=rule['RuleIndex'])
        val, f = self.evaluate_formula(rule['Formula'], alias, ridx=rule['RuleIndex'])

            self.failtests['RelationshipTest']['Failed Tests'].append(
                {'RuleIndex': rule['RuleIndex'], 'Description': f})
        elif not self.check_bound(val, lbv, ubv, t):

            self.failtests['RelationshipTest']['Failed Tests'].append(
                {'RuleIndex': rule['RuleIndex'], 'Formula': f,
                 'RangeLower': lb, 'LowerBoundValue': self.get_value(lb),
                 'RangeUpper': ub, 'UpperBoundValue': self.get_value(ub),

            self.passedcnt += 1
            self.failtests['RelationshipTest']['Passed Tests'] += 1
        self.totalcnt += 1
        self.failtests['RelationshipTest']['Total Tests'] += 1
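
    # Flow of a relationship rule: resolve the metric aliases, fetch the
    # bounds and error threshold, evaluate the rule's formula, and let
    # check_bound() decide pass/fail; each rule counts as exactly one test in
    # the RelationshipTest counters.
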
    def single_test(self, rule: dict):

        This test updates self.totalcnt and records failed tests in self.failtests['SingleMetricTest'].

        lbv, ubv, t = self.get_bounds(rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'])

            result = self.get_value(m['Name'])
            if (len(result) > 0 and self.check_bound(result[0], lbv, ubv, t)) or m['Name'] in self.skiplist:

            self.second_test(rerun, second_results)

                if self.check_bound(val, lbv, ubv, t):

                    self.results[0][name] = val

        self.totalcnt += totalcnt
        self.passedcnt += passcnt
        self.failtests['SingleMetricTest']['Total Tests'] += totalcnt
        self.failtests['SingleMetricTest']['Passed Tests'] += passcnt

            self.failtests['SingleMetricTest']['Failed Tests'].append({'RuleIndex': rule['RuleIndex'],
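
    # Flow of a single-metric rule: every metric in the rule is checked
    # against the same bounds; metrics on self.skiplist pass unconditionally,
    # and out-of-bound metrics are re-collected once via second_test() before
    # a failure is recorded.
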
    def create_report(self):

        for i in range(0, len(self.workloads)):
            reportstats = {"Total Rule Count": self.alltotalcnt[i], "Passed Rule Count": self.allpassedcnt[i]}
            data = {"Metric Validation Statistics": reportstats, "Tests in Category": self.allfailtests[i],
                    "Errors": self.allerrlist[i]}
            alldata.append({"Workload": self.workloads[i], "Report": data})

        if self.debug:
            allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]}
                      for i in range(0, len(self.workloads))]
            self.json_dump(allres, self.datafname)
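
    # The report is one JSON entry per workload: rule-count statistics plus
    # per-category failure details and any collection errors; in debug mode the
    # raw per-workload results are also dumped to datafname.
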
    def check_rule(self, testtype, metric_list):

            if m['Name'] not in self.metrics:

    def convert(self, data: list, metricvalues: dict):

    def _run_perf(self, metric, workload: str):

    def collect_perf(self, workload: str):

        self.results = dict()

        if self.collectlist != "":
            collectlist[0] = {x for x in self.collectlist.split(",")}
        else:
            collectlist[0] = set(list(self.metrics))

        for rule in self.rules:

            data = self._run_perf(metric, wl)
            if idx not in self.results:
                self.results[idx] = dict()
            self.convert(data, self.results[idx])
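
    # What gets collected is driven by collectlist: either the user-supplied
    # comma-separated metric names (self.collectlist) or the full self.metrics
    # set; each perf run is converted into self.results under a per-rule index.
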
    def second_test(self, collectlist, second_results):
        workload = self.workloads[self.wlidx]

        data = self._run_perf(metric, workload)
        self.convert(data, second_results)
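
    # second_test() re-runs only the requested metrics against the current
    # workload and converts the fresh samples into second_results, backing the
    # retry logic in pos_val_test() and single_test().
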
    def parse_perf_metrics(self):

            self.metrics.add(name)

            self.pctgmetrics.add(name.lower())

    def remove_unsupported_rules(self, rules):

            if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
    def create_rules(self):

        data = self.read_json(self.rulefname)

        self.skiplist = set([name.lower() for name in data['SkipList']])
        self.rules = self.remove_unsupported_rules(rules)

        pctgrule = {
                    'ErrorThreshold': self.tolerance,

                    'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
        self.rules.append(pctgrule)

        for r in self.rules:

        if self.debug:
            data = {'RelationshipRules': self.rules,
                    'SupportedMetrics': [{"MetricName": name} for name in self.metrics]}
            self.json_dump(data, self.fullrulefname)
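
    # Besides the rules loaded from rulefname, create_rules() appends one
    # generated rule (pctgrule) that covers every metric in self.pctgmetrics,
    # reusing the global tolerance as its error threshold; only fragments of
    # that rule's dict survive in this excerpt.
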
    def _storewldata(self, key):

        @param key: key into the aggregate dictionaries (the index of the workload in self.workloads).

        self.allresults[key] = self.results
        self.allignoremetrics[key] = self.ignoremetrics
        self.allfailtests[key] = self.failtests
        self.alltotalcnt[key] = self.totalcnt
        self.allpassedcnt[key] = self.passedcnt
        self.allerrlist[key] = self.errlist

    def _init_data(self):

        self.results = dict()
        self.ignoremetrics = set()
        self.errlist = list()
        self.failtests = {k: {'Total Tests': 0, 'Passed Tests': 0, 'Failed Tests': []} for k in testtypes}
        self.totalcnt = 0
        self.passedcnt = 0

    def test(self):

        if not self.collectlist:
            self.parse_perf_metrics()
        self.create_rules()
        for i in range(0, len(self.workloads)):
            self.wlidx = i
            self._init_data()
            self.collect_perf(self.workloads[i])

            self.pos_val_test()
            for r in self.rules:

                if not self.check_rule(testtype, r['Metrics']):

                    self.relationship_test(r)

                    self.single_test(r)

                    self.errlist.append("Unsupported Test Type from rule: " + str(r['RuleIndex']))
            self._storewldata(i)
            print("Workload: ", self.workloads[i])
            print("Total metrics collected: ", self.failtests['PositiveValueTest']['Total Tests'])
            print("Non-negative metric count: ", self.failtests['PositiveValueTest']['Passed Tests'])
            print("Total Test Count: ", self.totalcnt)
            print("Passed Test Count: ", self.passedcnt)

        self.create_report()
        return sum(self.alltotalcnt.values()) != sum(self.allpassedcnt.values())
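
# A minimal driver sketch. The class name Validator and the file/workload
# strings below are assumptions for illustration (this excerpt does not show
# how the real script constructs the object); the call sequence, though, is
# visible above: build the object, then test() iterates every workload and
# returns a truthy value when any test failed.
import sys  # assumed import for this sketch

v = Validator(rulefname='metric_relationship_rules.json',  # hypothetical path
              reportfname='validation_report.json',        # hypothetical path
              workload='perf bench futex hash')            # hypothetical workload
sys.exit(v.test())  # exit code 1 if any rule failed, 0 if all passed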