import ollama
import time
from typing import List, Dict, Any
import json
from statistics import mean
import re
import ast
import argparse
import requests
import os
import glob
import matplotlib.pyplot as plt
from together import Together
from cpuinfo import get_cpu_info
import subprocess
from tools import get_tools_compatible_models, print_tools_compatibility_table

# Global verbosity flag; set from the --verbose CLI flag in main().
verbose = False

# ANSI color codes
SUCCESS = '\033[38;5;78m'   # Soft mint green for success
ERROR = '\033[38;5;203m'    # Soft coral red for errors
INFO = '\033[38;5;75m'      # Sky blue for info
HEADER = '\033[38;5;147m'   # Soft purple for headers
WARNING = '\033[38;5;221m'  # Warm gold for warnings
EMPHASIS = '\033[38;5;159m' # Cyan for emphasis
MUTED = '\033[38;5;246m'    # Subtle gray for less important text
ENDC = '\033[0m'
BOLD = '\033[1m'

# Backwards-compatible aliases for the older color names used below
GREEN = SUCCESS
RED = ERROR
BLUE = INFO
YELLOW = WARNING
WHITE = MUTED

# Server configurations
SERVERS = {
    'local': 'http://localhost:11434',
    'remote': 'http://192.168.196.60:11434'
}

class Timer:
    def __init__(self):
        self.start_time = None
        self.end_time = None

    def start(self):
        self.start_time = time.time()

    def stop(self):
        self.end_time = time.time()

    def elapsed_time(self):
        if self.start_time is None:
            return 0
        if self.end_time is None:
            return time.time() - self.start_time
        return self.end_time - self.start_time

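# Illustrative usage of Timer (not executed here); elapsed_time() can also be
# read before stop() is called to get a running total:
#
#     timer = Timer()
#     timer.start()
#     ...                      # timed work
#     timer.stop()
#     print(f"{timer.elapsed_time():.2f}s")
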
def extract_code_from_response(response: str) -> str:
    """Extract Python code from a markdown-formatted string."""
    code_blocks = re.findall(r'```python\n(.*?)```', response, re.DOTALL)
    if code_blocks:
        return code_blocks[0].strip()
    return response

def is_valid_python(code: str) -> bool:
    """Check if the code is valid Python syntax."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False

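# Illustrative example (not executed here) of how the two helpers above are
# combined later in this script: pull the first ```python fenced block out of a
# model reply, then syntax-check it before any tests are run.
#
#     reply = "Here you go:\n```python\ndef fibonacci(n):\n    return n\n```"
#     code = extract_code_from_response(reply)   # -> "def fibonacci(n): ..."
#     if is_valid_python(code):
#         ...  # hand the code to validate_with_debug()
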
def analyze_failed_code(code: str, test_case: tuple, expected: Any, actual: Any, function_name: str, model: str) -> bool:
    """Analyze why code failed using the Together API. Returns True if Together thinks the code should work."""
    prompt = f"""Analyze this Python code and explain why it failed the test case. Format your response EXACTLY as follows:

ASSESSMENT: [Write a one-line assessment: either "SHOULD PASS" or "SHOULD FAIL" followed by a brief reason]

ANALYSIS:
[Detailed analysis of why the code failed and how to fix it]

Code:
{code}

Test case:
Input: {test_case}
Expected output: {expected}
Actual output: {actual}
Function name required: {function_name}
Model: {model}"""

    try:
        TOGETHER_API_KEY = os.environ["TOGETHER_API_KEY"]
        together_client = Together(api_key=TOGETHER_API_KEY)
        response = together_client.chat.completions.create(
            model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
            messages=[
                {"role": "system", "content": "You are a Python expert analyzing code failures. Always format your response with ASSESSMENT and ANALYSIS sections."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1000,
            temperature=0.7,
            top_p=0.7,
            top_k=50,
            repetition_penalty=1,
            stop=["<|eot_id|>", "<|eom_id|>"]
        )

        analysis = response.choices[0].message.content
        should_pass = "SHOULD PASS" in analysis.upper()
        if verbose:
            print(f"\n{BLUE}{'='*50}{ENDC}")
            print(f"{BLUE}[{model}] TOGETHER API CODE ANALYSIS:{ENDC}")
            print(f"{BLUE}{'='*50}{ENDC}")
            print(f"{GREEN if should_pass else RED}{analysis}{ENDC}")
            print(f"{BLUE}{'='*50}{ENDC}")
        return should_pass
    except Exception as e:
        print(f"\n{RED}Error getting Together API analysis: {e}{ENDC}")
        return False

def validate_with_debug(code: str, function_name: str, test_cases: List[tuple], model: str) -> tuple[bool, str, List[bool]]:
    """Validate code with detailed debug information. Returns (success, debug_info, test_results)."""
    debug_info = []
    test_results = []  # Track individual test case results
    test_outputs = []  # Store test outputs for combined display

    try:
        # Create a local namespace
        namespace = {}
        debug_info.append(f"Executing code:\n{code}")

        try:
            # Redirect stdout to capture prints from the executed code
            import io
            import sys
            stdout = sys.stdout
            sys.stdout = io.StringIO()

            # Execute the code
            exec(code, namespace)

            # Restore stdout
            sys.stdout = stdout

        except Exception as e:
            if 'sys' in locals():  # Restore stdout if it was changed
                sys.stdout = stdout
            if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}")
            return False, f"Error executing code: {str(e)}", test_results

        if function_name not in namespace:
            if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}")
            together_opinion = analyze_failed_code(code, "N/A", f"Function named '{function_name}'",
                                                   f"Found functions: {list(namespace.keys())}", function_name, model)
            print(f"\nTests passed: ❌ Together opinion: {'✅' if together_opinion else '❌'}")
            return False, f"Function '{function_name}' not found in code. Available names: {list(namespace.keys())}", test_results

        function = namespace[function_name]
        debug_info.append(f"Function {function_name} found")

        # Run test cases
        all_passed = True
        for i, (test_input, expected) in enumerate(test_cases):
            try:
                # Redirect stdout for each test case
                stdout = sys.stdout
                sys.stdout = io.StringIO()

                if isinstance(test_input, tuple):
                    result = function(*test_input)
                else:
                    result = function(test_input)

                # Restore stdout
                sys.stdout = stdout

                # Store result but don't print individually
                test_outputs.append(str(result))
                test_passed = result == expected
                test_results.append(test_passed)

                if not test_passed:
                    if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}")
                    print(f"\n{RED}Test case {i+1} failed:{ENDC}")
                    print(f"Input: {test_input} Expected: {expected} Got: {result}")

                    together_opinion = analyze_failed_code(code, test_input, expected, result, function_name, model)
                    print(f"Tests passed: ❌ Together opinion: {'✅' if together_opinion else '❌'}")

                    all_passed = False
                    continue

                debug_info.append(f"Test case {i+1} passed: {test_input} → {result}")
            except Exception as e:
                if 'sys' in locals():  # Restore stdout if it was changed
                    sys.stdout = stdout
                test_outputs.append(f"Error: {str(e)}")
                if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}")
                print(f"\n{RED}{str(e)} in test case {i+1} Input: {test_input} Expected: {expected}{ENDC}")

                together_opinion = analyze_failed_code(code, test_input, expected, f"Error: {str(e)}", function_name, model)
                print(f"Tests passed: ❌ Together opinion: {'✅' if together_opinion else '❌'}")

                test_results.append(False)
                all_passed = False
                continue
            finally:
                if 'sys' in locals():  # Always restore stdout
                    sys.stdout = stdout

        # Print all test outputs on one line
        print(f"{WHITE}Test outputs: {', '.join(test_outputs)}{ENDC}")

        if all_passed:
            print(f"Tests passed: ✅")
            return True, "All tests passed!\n" + "\n".join(debug_info), test_results
        print(f"Tests passed: ❌")
        return False, "Some tests failed", test_results
    except Exception as e:
        if 'sys' in locals():  # Restore stdout if it was changed
            sys.stdout = stdout
        print(f"\n{RED}Error in validate_with_debug: {str(e)}{ENDC}")
        return False, f"Unexpected error: {str(e)}", test_results

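# Illustrative direct call (not executed here); the model argument is only used
# to label the Together analysis output, so "N/A" is fine for ad-hoc checks:
#
#     code = ("def is_palindrome(s):\n"
#             "    s = ''.join(c.lower() for c in s if c.isalnum())\n"
#             "    return s == s[::-1]")
#     ok, debug_info, per_case = validate_with_debug(
#         code, 'is_palindrome', [("racecar", True), ("hello", False)], "N/A")
#     # ok -> True, per_case -> [True, True]
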
def test_fibonacci():
    question = """Write a Python function named EXACTLY 'fibonacci' (not fibonacci_dp or any other name) that returns the nth Fibonacci number.
The function signature must be: def fibonacci(n)

Requirements:
1. Handle edge cases:
   - For n = 0, return 0
   - For n = 1 or n = 2, return 1
   - For negative numbers, return -1
2. For n > 2: F(n) = F(n-1) + F(n-2)
3. Use dynamic programming or memoization for efficiency
4. Do NOT use any print statements - just return the values

Example sequence: 0,1,1,2,3,5,8,13,21,...
Example calls:
- fibonacci(6) returns 8
- fibonacci(0) returns 0
- fibonacci(-1) returns -1"""

    test_cases = [
        (0, 0),     # Edge case: n = 0
        (1, 1),     # Edge case: n = 1
        (2, 1),     # Edge case: n = 2
        (6, 8),     # Regular case
        (10, 55),   # Larger number
        (-1, -1),   # Edge case: negative input
    ]

    def validate(code: str) -> bool:
        success, debug_info, test_results = validate_with_debug(code, 'fibonacci', test_cases, "N/A")
        return success

    return (question, validate, test_cases)

def test_binary_search():
    question = """Write a Python function named EXACTLY 'binary_search' that performs binary search on a sorted list.
The function signature must be: def binary_search(arr, target)

Requirements:
1. The function takes two arguments:
   - arr: a sorted list of integers
   - target: the integer to find
2. Return the index of the target if found
3. Return -1 if the target is not in the list
4. Do NOT use any print statements - just return the values

Example:
- binary_search([1,2,3,4,5], 3) returns 2
- binary_search([1,2,3,4,5], 6) returns -1"""

    test_cases = [
        (([1,2,3,4,5], 3), 2),    # Regular case: target in middle
        (([1,2,3,4,5], 1), 0),    # Edge case: target at start
        (([1,2,3,4,5], 5), 4),    # Edge case: target at end
        (([1,2,3,4,5], 6), -1),   # Edge case: target not in list
        (([], 1), -1),            # Edge case: empty list
        (([1], 1), 0),            # Edge case: single element list
    ]

    def validate(code: str) -> bool:
        success, debug_info, test_results = validate_with_debug(code, 'binary_search', test_cases, "N/A")
        return success

    return (question, validate, test_cases)

def test_palindrome():
    question = """Write a Python function named EXACTLY 'is_palindrome' that checks if a string is a palindrome.
The function signature must be: def is_palindrome(s)

Requirements:
1. The function takes one argument:
   - s: a string to check
2. Return True if the string is a palindrome, False otherwise
3. Ignore case (treat uppercase and lowercase as the same)
4. Ignore non-alphanumeric characters (spaces, punctuation)
5. Do NOT use any print statements - just return the values

Example:
- is_palindrome("A man, a plan, a canal: Panama") returns True
- is_palindrome("race a car") returns False"""

    test_cases = [
        ("A man, a plan, a canal: Panama", True),   # Regular case with punctuation
        ("race a car", False),                      # Regular case, not palindrome
        ("", True),                                 # Edge case: empty string
        ("a", True),                                # Edge case: single character
        ("Was it a car or a cat I saw?", True),     # Complex case with punctuation
        ("hello", False),                           # Simple case, not palindrome
    ]

    def validate(code: str) -> bool:
        success, debug_info, test_results = validate_with_debug(code, 'is_palindrome', test_cases, "N/A")
        return success

    return (question, validate, test_cases)

def test_anagram():
    question = """Write a Python function named EXACTLY 'are_anagrams' that checks if two strings are anagrams.
The function signature must be: def are_anagrams(str1, str2)

Requirements:
1. The function takes two arguments:
   - str1: first string
   - str2: second string
2. Return True if the strings are anagrams, False otherwise
3. Ignore case (treat uppercase and lowercase as the same)
4. Ignore spaces
5. Consider only alphanumeric characters
6. Do NOT use any print statements - just return the values

Example:
- are_anagrams("listen", "silent") returns True
- are_anagrams("hello", "world") returns False"""

    test_cases = [
        (("listen", "silent"), True),          # Regular case
        (("hello", "world"), False),           # Not anagrams
        (("", ""), True),                      # Edge case: empty strings
        (("a", "a"), True),                    # Edge case: single char
        (("Debit Card", "Bad Credit"), True),  # Case and space test
        (("Python", "Java"), False),           # Different lengths
    ]

    def validate(code: str) -> bool:
        success, debug_info, test_results = validate_with_debug(code, 'are_anagrams', test_cases, "N/A")
        return success

    return (question, validate, test_cases)

# List of all test cases
CODING_QUESTIONS = [
    test_fibonacci(),
    test_binary_search(),
    test_palindrome(),
    test_anagram()
]

# Friendly display names for the tests, keyed by the keyword that identifies
# each question (see get_test_name below).
TEST_NAMES = {
    "fibonacci": "Fibonacci",
    "binary_search": "Binary Search",
    "palindrome": "Palindrome",
    "anagram": "Anagram Check"
}

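# Sketch of how an additional benchmark question could be registered
# (hypothetical example; it follows the tuple shape returned by the test_*
# factories above and is not part of the current question set):
#
#     def test_reverse_string():
#         question = "Write a Python function named EXACTLY 'reverse_string' ..."
#         test_cases = [("abc", "cba"), ("", "")]
#         def validate(code: str) -> bool:
#             success, _, _ = validate_with_debug(code, 'reverse_string', test_cases, "N/A")
#             return success
#         return (question, validate, test_cases)
#
# The result would be appended to CODING_QUESTIONS; get_test_name() and
# get_function_name_from_question() below would need a matching branch, and
# run_model_benchmark() currently assumes 6 cases per question.
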
def get_test_name(question: str) -> str:
    """Get a friendly name for the test based on the question."""
    if "fibonacci" in question.lower():
        return "Fibonacci"
    elif "binary_search" in question.lower():
        return "Binary Search"
    elif "palindrome" in question.lower():
        return "Palindrome"
    elif "anagram" in question.lower():
        return "Anagram Check"
    return question[:20] + "..."

def get_model_stats(model_name: str, question_tuple: tuple, server_url: str) -> Dict:
    """
    Get performance statistics for a specific model and validate the response.
    """
    question, validator, test_cases = question_tuple
    timer = Timer()
    results = {
        'model': model_name,
        'total_duration': 0,
        'tokens_per_second': 0,
        'code_valid': False,
        'tests_passed': False,
        'error': None,
        'test_results': []
    }

    try:
        timer.start()
        print(f'{WHITE}Requesting code from {server_url} with {model_name}{ENDC}')
        response = requests.post(
            f"{server_url}/api/chat",
            json={
                "model": model_name,
                "messages": [{'role': 'user', 'content': question}],
                "stream": False
            },
            headers={'Content-Type': 'application/json'}
        ).json()
        timer.stop()

        # Get performance metrics from the response
        total_tokens = response.get('eval_count', 0)
        total_duration = response.get('total_duration', 0)
        total_response_time = float(total_duration) / 1e9

        results['total_duration'] = total_response_time
        if total_tokens > 0 and total_response_time > 0:
            results['tokens_per_second'] = total_tokens / total_response_time

        # Print concise performance metrics
        print(f"Total Duration (s): {total_response_time:.2f} / Total Tokens: {total_tokens} / Tokens per Second: {results['tokens_per_second']:.2f}")

        # Extract code from response
        if 'message' in response and 'content' in response['message']:
            code = extract_code_from_response(response['message']['content'])

            # Validate code
            results['code_valid'] = is_valid_python(code)

            if results['code_valid']:
                print(f"Code validation: ✅")
                # Get validation results
                print(f'{WHITE}Running tests...{ENDC}')
                for test_case in CODING_QUESTIONS:
                    if test_case[0] == question:  # Found matching test case
                        function_name = get_function_name_from_question(question)
                        test_cases = test_case[2]  # Get test cases from tuple
                        success, debug_info, test_results = validate_with_debug(code, function_name, test_cases, model_name)
                        results['tests_passed'] = success
                        results['test_results'] = test_results
                        break
            else:
                print(f"Code validation: ❌")

        else:
            results['error'] = f"Unexpected response format: {response}"

    except Exception as e:
        print(f"\n{RED}Error in get_model_stats: {str(e)}{ENDC}")
        results['error'] = str(e)

    return results

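# Note on the metrics above: Ollama's /api/chat response reports 'eval_count'
# in tokens and 'total_duration' in nanoseconds, hence the division by 1e9.
# Worked example: eval_count=300 and total_duration=6_000_000_000 ns
# -> 6.00 s total and 300 / 6.0 = 50 tokens per second.
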
def get_function_name_from_question(question: str) -> str:
    """Extract function name from question."""
    if "fibonacci" in question.lower():
        return "fibonacci"
    elif "binary_search" in question.lower():
        return "binary_search"
    elif "palindrome" in question.lower():
        return "is_palindrome"
    elif "anagram" in question.lower():
        return "are_anagrams"
    return ""

def run_model_benchmark(model: str, server_url: str, num_runs: int = 4) -> Dict:
    """
    Run multiple benchmarks for a model and calculate average metrics.

    Only the last three runs are aggregated, so with the default num_runs=4 the
    first run effectively acts as a warm-up.
    """
    # Check function calling (tools) compatibility
    from tools import test_model_tools_support
    supports_tools, error = test_model_tools_support(model)
    if supports_tools:
        print(f"\n{SUCCESS}Function Calling (Tools): ✅ Supported{ENDC}")
    else:
        error_msg = f" ({error})" if error else ""
        print(f"\n{ERROR}Function Calling (Tools): ❌ Not Supported{error_msg}{ENDC}")

    metrics = []

    for i in range(num_runs):
        print(f"\n{YELLOW}[{model}] Run {i+1}/{num_runs}:{ENDC}")

        run_results = {}
        for question_tuple in CODING_QUESTIONS:
            test_name = get_test_name(question_tuple[0])
            print(f"\n{BOLD}Testing {test_name}...{ENDC}")
            try:
                result = get_model_stats(model, question_tuple, server_url)
                # Count actual passed cases from the test results
                result['passed_cases'] = len([r for r in result.get('test_results', []) if r])
                result['total_cases'] = len(question_tuple[2])
                run_results[test_name] = result
            except Exception as e:
                print(f"Error in run {i+1}: {e}")
                continue

        if run_results:
            metrics.append(run_results)

    # Take only the last 3 runs for averaging
    metrics = metrics[-3:]
    num_runs_used = len(metrics)  # Actual number of runs used

    if not metrics:
        return {}

    # Aggregate results
    aggregated = {
        'model': model,
        'total_duration': mean([m[list(m.keys())[0]]['total_duration'] for m in metrics if m]),
        'tokens_per_second': mean([m[list(m.keys())[0]]['tokens_per_second'] for m in metrics if m]),
        'test_results': {}
    }

    # Calculate results per test
    for test_name in metrics[-1].keys():
        # Each test has 6 cases and we use the last 3 runs
        cases_per_run = 6
        total_cases_this_test = cases_per_run * len(metrics)  # 6 cases × number of runs used

        # Sum up actual passed cases from the test results
        total_passed_this_test = 0
        for m in metrics:
            test_results = m[test_name].get('test_results', [])
            passed_in_run = len([r for r in test_results if r])
            total_passed_this_test += passed_in_run

        success_rate = (total_passed_this_test / total_cases_this_test * 100)
        status = '✅' if success_rate == 100 else '❌'

        # Print cumulative results header and results
        if test_name == list(metrics[-1].keys())[0]:
            print(f"\n{BOLD}Cumulative Results for each code question:{ENDC}")

        print(f"{test_name}: {status} ({total_passed_this_test}/{total_cases_this_test} cases)")

        aggregated['test_results'][test_name] = {
            'success_rate': success_rate,
            'passed_cases': total_passed_this_test,
            'total_cases': total_cases_this_test,
            'success_cases_rate': total_passed_this_test / total_cases_this_test,
            'avg_duration': mean([m[test_name]['total_duration'] for m in metrics]),
            'avg_tokens_sec': mean([m[test_name]['tokens_per_second'] for m in metrics])
        }

    # Calculate overall success rate and add min/max metrics
    total_passed = sum(t['passed_cases'] for t in aggregated['test_results'].values())
    total_cases = sum(t['total_cases'] for t in aggregated['test_results'].values())
    aggregated['overall_success_rate'] = (total_passed / total_cases * 100) if total_cases > 0 else 0
    aggregated['overall_success_cases_rate'] = (total_passed / total_cases) if total_cases > 0 else 0

    # Add min and max metrics for both duration and tokens/sec
    avg_durations = [t['avg_duration'] for t in aggregated['test_results'].values()]
    avg_tokens_sec = [t['avg_tokens_sec'] for t in aggregated['test_results'].values()]
    aggregated['min_avg_duration'] = min(avg_durations) if avg_durations else 0
    aggregated['max_avg_duration'] = max(avg_durations) if avg_durations else 0
    aggregated['min_tokens_per_second'] = min(avg_tokens_sec) if avg_tokens_sec else 0
    aggregated['max_tokens_per_second'] = max(avg_tokens_sec) if avg_tokens_sec else 0

    return aggregated

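# Worked example of the aggregation above (with the default num_runs=4):
# only the last 3 runs are kept, so each question contributes
# 6 cases x 3 runs = 18 cases, and a model that passes 17 of them scores
# 17/18 ≈ 94.4% for that question.
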
def print_leaderboard(results: List[Dict]):
    """Print leaderboard of model results."""
    if not results:
        print("No results to display")
        return

    # Sort by success rate first, then by tokens per second
    sorted_results = sorted(results, key=lambda x: (
        sum(t['passed_cases'] for t in x['test_results'].values()) / sum(t['total_cases'] for t in x['test_results'].values()) if sum(t['total_cases'] for t in x['test_results'].values()) > 0 else 0,
        x['tokens_per_second']
    ), reverse=True)

    print(f"\n{HEADER}{BOLD}🏆 Final Model Leaderboard:{ENDC}")
    for i, result in enumerate(sorted_results, 1):
        # Calculate stats for each model
        total_passed = sum(t['passed_cases'] for t in result['test_results'].values())
        total_cases = sum(t['total_cases'] for t in result['test_results'].values())
        success_rate = (total_passed / total_cases * 100) if total_cases > 0 else 0

        print(f"\n{BOLD}{YELLOW}{result['model']}{ENDC}")
        print(f" {BOLD}Overall Success Rate:{ENDC} {success_rate:.1f}% ({total_passed}/{total_cases} cases)")
        print(f" {BOLD}Average Tokens/sec:{ENDC} {result['tokens_per_second']:.2f} ({result['min_tokens_per_second']:.2f} - {result['max_tokens_per_second']:.2f})")
        print(f" {BOLD}Average Duration:{ENDC} {result['total_duration']:.2f}s")
        print(f" {BOLD}Min/Max Avg Duration:{ENDC} {result['min_avg_duration']:.2f}s / {result['max_avg_duration']:.2f}s")
        print(f" {BOLD}Test Results:{ENDC}")
        for test_name, test_result in result['test_results'].items():
            status = '✅' if test_result['success_rate'] == 100 else '❌'
            print(f" - {test_name}: {status} {test_result['passed_cases']}/{test_result['total_cases']} cases ({test_result['success_rate']:.1f}%)")

def get_available_models(server_url: str) -> List[str]:
    """Get list of available models from the specified Ollama server."""
    try:
        response = requests.get(f"{server_url}/api/tags").json()
        return [model['name'] for model in response['models']]
    except Exception as e:
        print(f"{RED}Error getting model list from {server_url}: {e}{ENDC}")
        return []

def check_model_exists_locally(model_name: str, server_url: str) -> bool:
    """Check if a model exists locally on the specified Ollama server."""
    available_models = get_available_models(server_url)
    return model_name in available_models

def download_model(model_name: str) -> bool:
    """Download a model using ollama pull command.

    Args:
        model_name: Name of the model to download

    Returns:
        bool: True if download was successful, False otherwise
    """
    print(f"\n{INFO}Model '{model_name}' not found locally. Downloading...{ENDC}")
    try:
        # Run ollama pull command and capture output
        process = subprocess.Popen(
            ["ollama", "pull", model_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )

        # Print output in real-time
        print(f"{INFO}Download progress:{ENDC}")
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                print(f"{MUTED}{output.strip()}{ENDC}")

        # Check if download was successful
        return_code = process.poll()
        if return_code == 0:
            print(f"\n{SUCCESS}Successfully downloaded model '{model_name}'.{ENDC}")
            return True
        else:
            error = process.stderr.read()
            print(f"\n{ERROR}Failed to download model '{model_name}': {error}{ENDC}")
            return False
    except Exception as e:
        print(f"\n{ERROR}Error downloading model '{model_name}': {e}{ENDC}")
        return False

def get_model_details(model_name):
    """Print the output of `ollama show <model>` in a readable layout. Returns None."""
    try:
        result = subprocess.run(
            ["ollama", "show", model_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding='utf-8',
            errors='replace'
        )

        if result.returncode != 0:
            print(f"Error: {result.stderr.strip()}")
            return None

        if not result.stdout.strip():
            print(f"No details available for model: {model_name}")
            return None

        raw_output = result.stdout.strip()
        lines = raw_output.split('\n')
        current_section = None

        for line in lines:
            line = line.rstrip()
            if line and not line.startswith(' '):  # Section headers
                current_section = line.strip()
                print(f"\n {current_section}")
            elif line and current_section:  # Section content
                # Split by multiple spaces and filter out empty parts
                parts = [part for part in line.split('  ') if part.strip()]
                if len(parts) >= 2:
                    key, value = parts[0].strip(), parts[-1].strip()
                    # Ensure consistent spacing for alignment
                    print(f" {key:<16} {value}")
                elif len(parts) == 1:
                    # Handle single-value lines (like license text)
                    print(f" {parts[0].strip()}")

        return None  # Details are printed directly; nothing to return

    except Exception as e:
        print(f"An error occurred while getting model details: {e}")
        return None

def update_server_results(server_url: str, results: List[Dict]) -> str:
    """Append benchmark results to this server's JSON file and write a log; returns the JSON path."""
    try:
        # Get CPU brand and format it for filename
        cpu_info = get_cpu_info()
        cpu_brand = cpu_info.get('brand_raw', 'Unknown_CPU').replace(' ', '_')
        timestamp = time.strftime("%Y%m%d_%H%M%S")

        # Create a unique filename for this server's results
        server_id = server_url.replace('http://', '').replace(':', '_').replace('/', '_')
        results_dir = "benchmark_results"

        os.makedirs(results_dir, exist_ok=True)

        # Include CPU brand in filename
        base_filename = f"{cpu_brand}_{server_id}"
        json_filename = os.path.join(results_dir, f"{base_filename}.json")
        log_filename = os.path.join(results_dir, f"{base_filename}.log")

        # Load existing results or create new file
        try:
            with open(json_filename, 'r') as f:
                existing_data = json.load(f)
        except FileNotFoundError:
            existing_data = {
                'server_url': server_url,
                'benchmarks': []
            }

        # Add new results with timestamp and ensure overall success rate is included
        benchmark_entry = {
            'timestamp': timestamp,
            'results': []
        }

        # Add overall success rate to each model's results
        for result in results:
            total_passed = sum(t['passed_cases'] for t in result['test_results'].values())
            total_cases = sum(t['total_cases'] for t in result['test_results'].values())
            result['overall_success_rate'] = (total_passed / total_cases * 100) if total_cases > 0 else 0
            result['min_avg_duration'] = min(t['avg_duration'] for t in result['test_results'].values()) if result['test_results'] else 0
            result['max_avg_duration'] = max(t['avg_duration'] for t in result['test_results'].values()) if result['test_results'] else 0
            benchmark_entry['results'].append(result)

        existing_data['benchmarks'].append(benchmark_entry)

        # Save updated results
        with open(json_filename, 'w') as f:
            json.dump(existing_data, f, indent=2)

        print(f"{GREEN}Successfully saved results to {json_filename}{ENDC}")

        # Save console output to log file
        with open(log_filename, 'w') as f:
            # Redirect stdout to capture the leaderboard output
            import io
            import sys
            stdout = sys.stdout
            str_output = io.StringIO()
            sys.stdout = str_output

            # Print CPU info
            print("CPU Information:")
            for key, value in cpu_info.items():
                print(f"{key}: {value}")
            print("\nBenchmark Results:")
            print_leaderboard(results)

            # Restore stdout and get the captured output
            sys.stdout = stdout
            log_content = str_output.getvalue()

            # Write to log file
            f.write(f"Benchmark Run: {timestamp}\n")
            f.write(f"Server: {server_url}\n\n")
            f.write(log_content)

        print(f"{GREEN}Console output saved to {log_filename}{ENDC}")

        return json_filename

    except Exception as e:
        print(f"{RED}Failed to save results: {str(e)}{ENDC}")
        return None

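# Shape of the JSON written above (abridged; field names taken from this
# function and from run_model_benchmark; concrete values are placeholders):
#
#     {
#       "server_url": "http://localhost:11434",
#       "benchmarks": [
#         {
#           "timestamp": "20250101_120000",
#           "results": [
#             {"model": "...", "tokens_per_second": ..., "total_duration": ...,
#              "overall_success_rate": ..., "test_results": {"Fibonacci": {...}, ...}}
#           ]
#         }
#       ]
#     }
#
# plot_benchmark_results() below reads this structure back via calculate_model_stats().
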
def plot_benchmark_results(json_file=None):
    """
    Plot benchmark results using the same functionality as lboard.py

    Args:
        json_file: Path to the JSON file with benchmark results. If None, uses the latest file.
    """
    try:
        # If no file specified, find the latest
        if not json_file:
            json_file = get_latest_json_file('benchmark_results')
            if not json_file:
                print(f"{RED}No benchmark results found{ENDC}")
                return

        with open(json_file, 'r') as f:
            benchmark_data = json.load(f)

        print(f"{INFO}Using benchmark file: {json_file}{ENDC}")

        # Get all benchmark results and combine them
        all_model_results = []
        model_names = set()

        # Process all benchmarks, keeping only the latest result for each model
        for benchmark in benchmark_data['benchmarks']:
            for model_result in benchmark.get('results', []):
                model_name = model_result.get('model')
                if model_name and model_name not in model_names:
                    all_model_results.append(model_result)
                    model_names.add(model_name)
                elif model_name in model_names:
                    # Replace existing model with newer version
                    for i, existing_model in enumerate(all_model_results):
                        if existing_model.get('model') == model_name:
                            all_model_results[i] = model_result
                            break

        # Calculate stats and sort models
        model_stats = [calculate_model_stats(model) for model in all_model_results]
        sorted_stats = sorted(model_stats,
                              key=lambda x: (x['overall_success_rate'], x['tokens_per_second']),
                              reverse=True)

        print(f"\n🏆 Final Model Leaderboard:")
        for stats in sorted_stats:
            print(f"\n{stats['model']}")
            print(f" Overall Success Rate: {stats['overall_success_rate']:.1f}%")
            print(f" Average Tokens/sec: {stats['tokens_per_second']:.2f} ({stats['min_tokens_per_second']:.2f} - {stats['max_tokens_per_second']:.2f})")
            print(f" Average Duration: {stats['total_duration']:.2f}s")
            print(f" Min/Max Avg Duration: {stats['min_avg_duration']:.2f}s / {stats['max_avg_duration']:.2f}s")
            print(f" Test Results:")

            for test_name, test_result in stats['test_results'].items():
                status = '✅' if test_result['success_rate'] == 100 else '❌'
                print(f" - {test_name}: {status} {test_result['passed_cases']}/{test_result['total_cases']} cases ({test_result['success_rate']:.1f}%)")

        # Generate visualization
        plot_model_comparison(sorted_stats)
    except Exception as e:
        print(f"{RED}Error loading benchmark data: {e}{ENDC}")

def calculate_model_stats(model_result):
    """Calculate average stats for a model from its test results."""
    test_results = model_result['test_results']

    # Calculate overall success rate (average of all test success rates)
    success_rates = [test['success_rate'] for test in test_results.values()]
    overall_success_rate = sum(success_rates) / len(success_rates)

    # Handle the case where some test results might not have avg_duration or avg_tokens_sec
    # This is for backward compatibility with older benchmark results
    min_avg_duration = max_avg_duration = None
    min_tokens_per_second = max_tokens_per_second = None

    # First try to get these values from the model_result directly (new format)
    if 'min_avg_duration' in model_result and 'max_avg_duration' in model_result:
        min_avg_duration = model_result['min_avg_duration']
        max_avg_duration = model_result['max_avg_duration']

    if 'min_tokens_per_second' in model_result and 'max_tokens_per_second' in model_result:
        min_tokens_per_second = model_result['min_tokens_per_second']
        max_tokens_per_second = model_result['max_tokens_per_second']

    # If not available in the model_result, try to calculate from test_results (old format)
    if min_avg_duration is None or max_avg_duration is None:
        try:
            min_avg_duration = min(test.get('avg_duration', float('inf')) for test in test_results.values() if 'avg_duration' in test)
            max_avg_duration = max(test.get('avg_duration', 0) for test in test_results.values() if 'avg_duration' in test)
            # If no test has avg_duration, use total_duration as fallback
            if min_avg_duration == float('inf') or max_avg_duration == 0:
                min_avg_duration = max_avg_duration = model_result['total_duration']
        except (ValueError, KeyError):
            # If calculation fails, use total_duration as fallback
            min_avg_duration = max_avg_duration = model_result['total_duration']

    if min_tokens_per_second is None or max_tokens_per_second is None:
        try:
            min_tokens_per_second = min(test.get('avg_tokens_sec', float('inf')) for test in test_results.values() if 'avg_tokens_sec' in test)
            max_tokens_per_second = max(test.get('avg_tokens_sec', 0) for test in test_results.values() if 'avg_tokens_sec' in test)
            # If no test has avg_tokens_sec, use tokens_per_second as fallback
            if min_tokens_per_second == float('inf') or max_tokens_per_second == 0:
                min_tokens_per_second = max_tokens_per_second = model_result['tokens_per_second']
        except (ValueError, KeyError):
            # If calculation fails, use tokens_per_second as fallback
            min_tokens_per_second = max_tokens_per_second = model_result['tokens_per_second']

    return {
        'model': model_result['model'],
        'overall_success_rate': overall_success_rate,
        'tokens_per_second': model_result['tokens_per_second'],
        'total_duration': model_result['total_duration'],
        'min_avg_duration': min_avg_duration,
        'max_avg_duration': max_avg_duration,
        'min_tokens_per_second': min_tokens_per_second,
        'max_tokens_per_second': max_tokens_per_second,
        'test_results': test_results
    }

def plot_model_comparison(model_stats):
    """Plot model comparison with dual y-axes for tokens/sec and success rate."""
    models = [stat['model'] for stat in model_stats]
    token_speeds = [stat['tokens_per_second'] for stat in model_stats]
    success_rates = [stat['overall_success_rate'] for stat in model_stats]
    durations = [stat['total_duration'] for stat in model_stats]

    # Create figure and primary axis
    fig, ax1 = plt.subplots(figsize=(15, 8))

    # Plot tokens/sec bars using min and max values
    for i, stat in enumerate(model_stats):
        min_tokens = stat['min_tokens_per_second']
        max_tokens = stat['max_tokens_per_second']

        # Plot lower part (0 to min) with slightly darker blue
        ax1.bar(i, min_tokens, color='royalblue', alpha=0.4)
        # Plot upper part (min to max) with lighter blue
        bar_height = max_tokens - min_tokens
        ax1.bar(i, bar_height, bottom=min_tokens, color='royalblue', alpha=0.3)

    ax1.set_ylabel('Tokens per Second', color='blue')
    ax1.tick_params(axis='y', labelcolor='blue')
    # Set y-axis range for tokens per second
    max_token_speed = max(stat['max_tokens_per_second'] for stat in model_stats)
    ax1.set_ylim(0, max(100, max_token_speed * 1.1))  # Add 10% padding above max value

    # Set x-axis labels
    ax1.set_xticks(range(len(models)))
    ax1.set_xticklabels(models, rotation=45, ha='right', rotation_mode='anchor')

    # Create secondary y-axis for success rate
    ax2 = ax1.twinx()
    ax2.plot(models, success_rates, 'r+', markersize=15, label='Success Rate', linestyle='None')
    ax2.set_ylabel('Success Rate (%)', color='red')
    ax2.tick_params(axis='y', labelcolor='red')
    ax2.set_ylim(0, 100)

    # Create third y-axis for duration
    ax3 = ax1.twinx()
    ax3.spines['right'].set_position(('outward', 60))  # Move third axis outward
    # Add min and max duration markers
    min_durations = [stat['min_avg_duration'] for stat in model_stats]
    max_durations = [stat['max_avg_duration'] for stat in model_stats]
    # Plot duration ranges with vertical lines and markers
    for i, (min_d, max_d) in enumerate(zip(min_durations, max_durations)):
        ax3.plot([i, i], [min_d, max_d], 'g-', linewidth=1)  # Vertical line
        ax3.plot(i, min_d, 'g-', markersize=10)  # Min marker
        ax3.plot(i, max_d, 'g-', markersize=10)  # Max marker

    ax3.set_ylabel('Duration (s)', color='green')
    ax3.tick_params(axis='y', labelcolor='green')

    # Customize x-axis labels with proper rotation
    ax1.set_xticks(range(len(models)))
    ax1.set_xticklabels(models, rotation=45, ha='right', rotation_mode='anchor')
    for i, model in enumerate(models):
        # Shorten model names by removing common suffixes
        short_name = model.replace(':latest', '').replace('-uncensored', '')
        ax1.get_xticklabels()[i].set_text(short_name)
        # Highlight models with success rate > 95% AND success rate / duration >= 5
        if success_rates[i] > 95 and (success_rates[i] / durations[i] >= 5):
            ax1.get_xticklabels()[i].set_color('green')

    # Adjust layout to prevent label cutoff
    plt.subplots_adjust(bottom=0.25, left=0.1, right=0.85)

    plt.title('Model Performance Comparison')
    plt.tight_layout()

    # Save the figure before showing it
    output_path = 'benchmark_results/model_comparison.png'
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"{INFO}Plot saved as '{output_path}'{ENDC}")

    # Show the figure (optional - can be removed for headless environments)
    plt.show()

def get_latest_json_file(directory):
    """Find the latest JSON file in the specified directory."""
    json_files = glob.glob(os.path.join(directory, '*.json'))
    print(f"{INFO}Found JSON files: {json_files}{ENDC}")
    latest_file = max(json_files, key=os.path.getmtime) if json_files else None
    return latest_file

def main():
    parser = argparse.ArgumentParser(description='Run Ollama model benchmarks')
    parser.add_argument('--server', choices=['local', 'remote'], default='local',
                        help='Choose Ollama server (default: local)')
    parser.add_argument('--model', type=str, help='Specific model to benchmark')
    parser.add_argument('--number', type=str, help='Number of models to benchmark (number or "all")')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--plot-only', action='store_true',
                        help='Skip benchmarking and just plot graphs from existing results')
    parser.add_argument('--no-plot', action='store_true',
                        help='Run benchmarking without plotting graphs at the end')
    parser.add_argument('--file', type=str,
                        help='Specify a benchmark results file to use for plotting (only with --plot-only)')
    parser.add_argument('--funcall', type=str, nargs='?', const='all',
                        help='Check function calling (tools) compatibility of models. Specify a model name or "all" for all models')
    args = parser.parse_args()

    # Set global verbose flag
    global verbose
    verbose = args.verbose

    # Handle plot-only mode
    if args.plot_only:
        print(f"{INFO}Running in plot-only mode...{ENDC}")
        plot_benchmark_results(args.file)
        return

    # Handle function calling compatibility check mode
    if args.funcall is not None:
        server_url = SERVERS[args.server]
        print(f"{INFO}Checking function calling (tools) compatibility...{ENDC}")

        if args.funcall.lower() == 'all':
            # Check all available models
            compatibility = get_tools_compatible_models(server_url=server_url)
            print_tools_compatibility_table(compatibility)
        else:
            # Check a single model
            from tools import test_model_tools_support
            model_name = args.funcall
            print(f"{INFO}Checking function calling compatibility for {model_name}...{ENDC}")
            supports_tools, error = test_model_tools_support(model_name)

            if supports_tools:
                print(f"{SUCCESS}✅ {model_name}: Supports function calling (tools){ENDC}")
            else:
                error_msg = f" ({error})" if error else ""
                print(f"{ERROR}❌ {model_name}: Does not support function calling (tools){error_msg}{ENDC}")
        return

    server_url = SERVERS[args.server]

    print()
    print(f"{HEADER}{BOLD}CPU Information:{ENDC}")
    cpu_info = get_cpu_info()
    for key, value in cpu_info.items():
        print(f"{MUTED}{key}: {value}{ENDC}")

    print()
    print(f"{INFO}Using Ollama server at {server_url}...{ENDC}")

    # Get available models or use specified model
    if args.model:
        # Check if the specified model exists locally
        if not check_model_exists_locally(args.model, server_url):
            # If not, try to download it
            if download_model(args.model):
                # Verify the model is now available
                if check_model_exists_locally(args.model, server_url):
                    models = [args.model]
                else:
                    print(f"{WARNING}Model '{args.model}' was downloaded but not found on server. Please check manually.{ENDC}")
                    return
            else:
                print(f"{RED}Could not download model '{args.model}'. Exiting.{ENDC}")
                return
        else:
            print(f"{SUCCESS}Using locally available model: {args.model}{ENDC}")
            models = [args.model]
    else:
        models = get_available_models(server_url)

    if not models:
        print(f"{RED}No models found on server {server_url}. Exiting.{ENDC}")
        return

    # Handle number of models to test
    if args.number and args.number.lower() != 'all':
        try:
            num_models = int(args.number)
            if num_models > 0:
                models = models[:num_models]
            else:
                print(f"{WARNING}Invalid number of models. Using all available models.{ENDC}")
        except ValueError:
            print(f"{WARNING}Invalid number format. Using all available models.{ENDC}")

    print(f"{INFO}Testing {len(models)} models:{ENDC}")
    for i, model in enumerate(models, 1):
        print(f"{YELLOW}{i}. {model}{ENDC}")

    # Run benchmarks
    all_results = []

    for model in models:
        print(f"\n{HEADER}{BOLD}Benchmarking {model}...{ENDC}")
        details = get_model_details(model)
        if details:
            print(f"\n{INFO}Model Details:{ENDC}")
            if "details" in details:
                for section, items in details["details"].items():
                    print(f"\n{BOLD}{section}{ENDC}")
                    for key, value in items.items():
                        print(f" {key}: {value}")
            else:
                print(json.dumps(details, indent=2))
        result = run_model_benchmark(model, server_url)
        if result and 'error' not in result:
            all_results.append(result)

    # Print and save results
    print_leaderboard(all_results)
    json_file = update_server_results(server_url, all_results)

    # Plot results unless --no-plot is specified
    if not args.no_plot:
        print(f"{INFO}Generating performance plot...{ENDC}")
        plot_benchmark_results(json_file)

if __name__ == "__main__":
    main()