From a3b06718a2814ff5ddc0f87cc72f236f85165bf0 Mon Sep 17 00:00:00 2001
From: leduc
Date: Sun, 2 Mar 2025 22:40:55 +0100
Subject: [PATCH] first commit

---
 README.md                   | 130 ++++++-
 benchmark_results/.DS_Store | Bin 0 -> 6148 bytes
 devbook.md                  | 222 +++++++++++
 lboard.py                   | 144 +++++++
 main.py                     | 732 ++++++++++++++++++++++++++++++++++++
 requirements.txt            |   6 +
 6 files changed, 1232 insertions(+), 2 deletions(-)
 create mode 100644 benchmark_results/.DS_Store
 create mode 100644 devbook.md
 create mode 100644 lboard.py
 create mode 100644 main.py
 create mode 100644 requirements.txt

diff --git a/README.md b/README.md
index 7e1deab..6cca745 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,129 @@
-# codebench
+# Codebench - Ollama Model Benchmark Tool
 
-Custom benchmarks for LLMs
\ No newline at end of file
+A Python-based benchmarking tool for testing and comparing different Ollama models on coding tasks.
+
+## Features
+
+- Test multiple Ollama models against common coding problems
+- Measure performance metrics (tokens/sec, response time)
+- Track success rates across different coding challenges
+- Support for local and remote Ollama servers
+- Detailed test results and leaderboard generation
+- CPU information tracking for benchmarks
+
+## Prerequisites
+
+- Python 3.8+
+- Ollama server (local or remote)
+- Together API key (optional, for advanced code analysis)
+
+## Installation
+
+1. Clone the repository:
+```bash
+git clone https://github.com/yourusername/codebench.git
+cd codebench
+```
+
+2. Install required packages:
+```bash
+pip install -r requirements.txt
+```
+
+3. (Optional) Set up Together API:
+```bash
+export TOGETHER_API_KEY='your_api_key_here'
+```
+
+## Usage
+Basic usage:
+
+```bash
+python3 main.py
+```
+
+Available options:
+
+```bash
+python main.py --server [local|z60] --model [model_name] --number [count|all] --verbose
+```
+
+## Arguments
+
+- --server : Choose Ollama server (default: local)
+- --model : Test specific model only
+- --number : Number of models to test
+- --verbose : Enable detailed output
+
+## Supported Tests
+The tool currently tests models on these coding challenges:
+
+1. Fibonacci Sequence
+2. Binary Search
+3. Palindrome Check
+4. Anagram Detection
+
+## Test Process & Validation
+
+### Code Generation
+1. Each model is prompted with specific coding tasks
+2. Generated code is extracted from the model's response
+3. 
Initial syntax validation is performed + +### Test Validation +For each test case: +- Input values are provided to the function +- Output is compared with expected results +- Test results are marked as āœ… (pass) or āŒ (fail) + +Example test cases: +```plaintext +Fibonacci: +- Input: 6 Expected: 8 +- Input: 0 Expected: 0 +- Input: -1 Expected: -1 + +Binary Search: +- Input: ([1,2,3,4,5], 3) Expected: 2 +- Input: ([], 1) Expected: -1 +- Input: ([1], 1) Expected: 0 +``` + +## Output +Results are saved in the benchmark_results directory with the following naming convention: + +```plaintext +[CPU_Model]_[Server_Address].json + ``` + +Example: + +```plaintext +Apple_M1_Pro_localhost_11434.json + ``` + +## Server Configuration +Default servers are configured in the code: + +- Local: http://localhost:11434 +- Z60: http://192.168.196.60:11434 + +## Example Output +```plaintext +šŸ† Final Model Leaderboard: + +codellama:13b + Overall Success Rate: 95.8% (23/24 cases) + Average Tokens/sec: 145.23 + Average Duration: 2.34s + Test Results: + - Fibonacci: āœ… 6/6 cases (100.0%) + - Binary Search: āœ… 6/6 cases (100.0%) + ``` + + +## Contributing +Feel free to submit issues and enhancement requests! + +## License +[Your chosen license] \ No newline at end of file diff --git a/benchmark_results/.DS_Store b/benchmark_results/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T090% success AND success_rate > 5*duration +- Prevents duplicate model entries in metrics + +### Data Visualization +# Development Notes + +## Project Structure +- `main.py`: Core benchmarking functionality +- `lboard.py`: Leaderboard visualization and results analysis +- `benchmark_results/`: Directory containing JSON benchmark results + +## Visualization Features +- Blue bars: Tokens per second performance +- Red + markers: Overall success rate (%) +- Green - markers: Total duration (seconds) +- Green model names: Models with >90% success rate +- Triple y-axis plot for easy metric comparison + +## Running the Leaderboard +```bash +# View latest results +python lboard.py + +# View specific results file +python lboard.py path/to/results.json +``` +- Single plot per model (no duplicates) +- Color-coded bars based on performance +- Success rate indicators (red +) +- Duration indicators (green -) +- Dynamic axis scaling +- Combined legend for all metrics + +### Output Format +- Detailed test results per model +- Individual test case counts +- Validation status indicators +- Overall success rates +- Performance metrics +- Plot position information + +Model: model_name +ā”œā”€ Tokens/sec: XX.XX +ā”œā”€ Total Duration: XX.XXs +ā”œā”€ Test Results: +ā”‚ ā”œā”€ Test1: X/Y cases (ZZ.Z%) [validation status] +ā”‚ ā”œā”€ Test2: X/Y cases (ZZ.Z%) [validation status] +ā””ā”€ Overall Success Rate: X/Y (ZZ.Z%) + +### Error Handling +- Handles missing test results +- Processes validation failures appropriately +- Maintains consistent case counting +- Prevents data duplication +- Ensures accurate success rate calculations \ No newline at end of file diff --git a/lboard.py b/lboard.py new file mode 100644 index 0000000..1419646 --- /dev/null +++ b/lboard.py @@ -0,0 +1,144 @@ +import json +import os +import argparse +import glob +import matplotlib.pyplot as plt + +def get_latest_json_file(directory): + 
json_files = glob.glob(os.path.join(directory, '*.json')) + print(f"Found JSON files: {json_files}") + latest_file = max(json_files, key=os.path.getmtime) if json_files else None + return latest_file +def calculate_model_stats(model_result): + """Calculate average stats for a model from its test results.""" + test_results = model_result['test_results'] + + # Calculate overall success rate (average of all test success rates) + success_rates = [test['success_rate'] for test in test_results.values()] + overall_success_rate = sum(success_rates) / len(success_rates) + + return { + 'model': model_result['model'], + 'overall_success_rate': overall_success_rate, + 'tokens_per_second': model_result['tokens_per_second'], + 'total_duration': model_result['total_duration'], + 'test_results': test_results + } + +def plot_model_comparison(model_stats): + """Plot model comparison with dual y-axes for tokens/sec and success rate.""" + models = [stat['model'] for stat in model_stats] + token_speeds = [stat['tokens_per_second'] for stat in model_stats] + success_rates = [stat['overall_success_rate'] for stat in model_stats] + durations = [stat['total_duration'] for stat in model_stats] + + # Create figure and primary axis + fig, ax1 = plt.subplots(figsize=(15, 8)) + + # Plot tokens/sec bars on primary y-axis with lighter blue and more transparency + bars = ax1.bar(models, token_speeds, color='royalblue', alpha=0.3) + ax1.set_ylabel('Tokens per Second', color='blue') + ax1.tick_params(axis='y', labelcolor='blue') + + # Create secondary y-axis for success rate + ax2 = ax1.twinx() + ax2.plot(models, success_rates, 'r+', markersize=15, label='Success Rate', linestyle='None') + ax2.set_ylabel('Success Rate (%)', color='red') + ax2.tick_params(axis='y', labelcolor='red') + ax2.set_ylim(0, 100) + + # Create third y-axis for duration + ax3 = ax1.twinx() + ax3.spines['right'].set_position(('outward', 60)) # Move third axis outward + ax3.plot(models, durations, 'g_', markersize=15, label='Duration', linestyle='None') + ax3.set_ylabel('Duration (s)', color='green') + ax3.tick_params(axis='y', labelcolor='green') + + # Customize x-axis labels with proper rotation + ax1.set_xticks(range(len(models))) + ax1.set_xticklabels(models, rotation=45, ha='right', rotation_mode='anchor') + for i, model in enumerate(models): + # Shorten model names by removing common suffixes + short_name = model.replace(':latest', '').replace('-uncensored', '') + ax1.get_xticklabels()[i].set_text(short_name) + if success_rates[i] > 90: + ax1.get_xticklabels()[i].set_color('green') + + # Adjust layout to prevent label cutoff + plt.subplots_adjust(bottom=0.25, left=0.1, right=0.85) + + ''' + # Add value labels + for i, bar in enumerate(bars): + ax1.text(i, token_speeds[i], f'{token_speeds[i]:.1f}', + ha='center', va='bottom', color='black') + ax2.text(i, success_rates[i], f'{success_rates[i]:.1f}%', + ha='center', va='bottom', color='black') + ax3.text(i, durations[i], f'{durations[i]:.1f}s', + ha='center', va='top', color='black') + ''' + plt.title('Model Performance Comparison') + plt.tight_layout() + + plt.show() + plt.savefig('benchmark_results/model_comparison.png') + print("\nPlot saved as 'benchmark_results/model_comparison.png'") + +def print_leaderboard(benchmark_data): + """Print leaderboard from benchmark results.""" + if not benchmark_data.get('benchmarks'): + print("No benchmark data to display") + return + + # Get the latest benchmark results + latest_benchmark = benchmark_data['benchmarks'][-1] + model_results = 
latest_benchmark['results'] + + # Calculate stats and sort models + model_stats = [calculate_model_stats(model) for model in model_results] + sorted_stats = sorted(model_stats, + key=lambda x: (x['overall_success_rate'], x['tokens_per_second']), + reverse=True) + + print(f"\nšŸ† Final Model Leaderboard:") + for stats in sorted_stats: + print(f"\n{stats['model']}") + print(f" Overall Success Rate: {stats['overall_success_rate']:.1f}%") + print(f" Average Tokens/sec: {stats['tokens_per_second']:.2f}") + print(f" Average Duration: {stats['total_duration']:.2f}s") + print(f" Test Results:") + + for test_name, test_result in stats['test_results'].items(): + status = 'āœ…' if test_result['success_rate'] == 100 else 'āŒ' + print(f" - {test_name}: {status} {test_result['success_rate']:.1f}%") + + # Generate visualization + plot_model_comparison(sorted_stats) + +def main(): + parser = argparse.ArgumentParser(description='Display benchmark leaderboard') + parser.add_argument('filepath', nargs='?', help='Path to benchmark results JSON file') + parser.add_argument('--file', type=str, help='Path to benchmark results JSON file (alternative way)') + args = parser.parse_args() + + try: + # Use filepath if provided, then --file, otherwise find latest + if args.filepath: + json_file = args.filepath + elif args.file: + json_file = args.file + else: + json_file = get_latest_json_file('benchmark_results') + if not json_file: + print("No benchmark results found") + return + + with open(json_file, 'r') as f: + benchmark_data = json.load(f) + print(f"Using benchmark file: {json_file}") + print_leaderboard(benchmark_data) + except Exception as e: + print(f"Error loading benchmark data: {e}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..8bbc57a --- /dev/null +++ b/main.py @@ -0,0 +1,732 @@ +from tabnanny import verbose +import ollama +import time +from typing import List, Dict, Any +import json +from statistics import mean +import re +import ast +import argparse +import requests +import os +from together import Together +from cpuinfo import get_cpu_info +import subprocess + + +# ANSI color codes +SUCCESS = '\033[38;5;78m' # Soft mint green for success +ERROR = '\033[38;5;203m' # Soft coral red for errors +INFO = '\033[38;5;75m' # Sky blue for info +HEADER = '\033[38;5;147m' # Soft purple for headers +WARNING = '\033[38;5;221m' # Warm gold for warnings +EMPHASIS = '\033[38;5;159m' # Cyan for emphasis +MUTED = '\033[38;5;246m' # Subtle gray for less important text +ENDC = '\033[0m' +BOLD = '\033[1m' + +# Replace existing color usages +GREEN = SUCCESS +RED = ERROR +BLUE = INFO +YELLOW = WARNING +WHITE = MUTED + +# Server configurations +SERVERS = { + 'local': 'http://localhost:11434', + 'z60': 'http://192.168.196.60:11434' +} + +class Timer: + def __init__(self): + self.start_time = None + self.end_time = None + + def start(self): + self.start_time = time.time() + + def stop(self): + self.end_time = time.time() + + def elapsed_time(self): + if self.start_time is None: + return 0 + if self.end_time is None: + return time.time() - self.start_time + return self.end_time - self.start_time + +def extract_code_from_response(response: str) -> str: + """Extract Python code from a markdown-formatted string.""" + code_blocks = re.findall(r'```python\n(.*?)```', response, re.DOTALL) + if code_blocks: + return code_blocks[0].strip() + return response + +def is_valid_python(code: str) -> bool: + """Check if the code is valid Python 
syntax.""" + try: + ast.parse(code) + return True + except SyntaxError: + return False + +def analyze_failed_code(code: str, test_case: tuple, expected: any, actual: any, function_name: str, model: str) -> bool: + """Analyze why code failed using Together API. Returns True if Together thinks the code should work.""" + prompt = f"""Analyze this Python code and explain why it failed the test case. Format your response EXACTLY as follows: + +ASSESSMENT: [Write a one-line assessment: either "SHOULD PASS" or "SHOULD FAIL" followed by a brief reason] + +ANALYSIS: +[Detailed analysis of why the code failed and how to fix it] + +Code: +{code} + +Test case: +Input: {test_case} +Expected output: {expected} +Actual output: {actual} +Function name required: {function_name} +Model: {model}""" + + try: + TOGETHER_API_KEY = os.environ["TOGETHER_API_KEY"] + together_client = Together(api_key=TOGETHER_API_KEY) + response = together_client.chat.completions.create( + model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", + messages=[ + {"role": "system", "content": "You are a Python expert analyzing code failures. Always format your response with ASSESSMENT and ANALYSIS sections."}, + {"role": "user", "content": prompt} + ], + max_tokens=1000, + temperature=0.7, + top_p=0.7, + top_k=50, + repetition_penalty=1, + stop=["<|eot_id|>", "<|eom_id|>"] + ) + + analysis = response.choices[0].message.content + should_pass = "SHOULD PASS" in analysis.upper() + if verbose: print(f"\n{BLUE}[{model}] Together Analysis:{ENDC}") + if verbose: print(f"{GREEN if should_pass else RED}{analysis}{ENDC}") + return should_pass + except Exception as e: + print(f"\n{RED}Error getting Together API analysis: {e}{ENDC}") + return False + +def validate_with_debug(code: str, function_name: str, test_cases: List[tuple], model: str) -> tuple[bool, str, List[bool]]: + """Validate code with detailed debug information. Returns (success, debug_info, test_results)""" + debug_info = [] + test_results = [] # Track individual test case results + test_outputs = [] # Store test outputs for combined display + + try: + # Create a local namespace + namespace = {} + debug_info.append(f"Executing code:\n{code}") + + try: + # Redirect stdout to capture prints from the executed code + import io + import sys + stdout = sys.stdout + sys.stdout = io.StringIO() + + # Execute the code + exec(code, namespace) + + # Restore stdout + sys.stdout = stdout + + except Exception as e: + if 'sys' in locals(): # Restore stdout if it was changed + sys.stdout = stdout + if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}") + return False, f"Error executing code: {str(e)}", test_results + + if function_name not in namespace: + if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}") + together_opinion = analyze_failed_code(code, "N/A", f"Function named '{function_name}'", + f"Found functions: {list(namespace.keys())}", function_name, model) + print(f"\nTests passed: āŒ Together opinion: {'āœ…' if together_opinion else 'āŒ'}") + return False, f"Function '{function_name}' not found in code. 
Available names: {list(namespace.keys())}", test_results + + function = namespace[function_name] + debug_info.append(f"Function {function_name} found") + + # Run test cases + all_passed = True + for i, (test_input, expected) in enumerate(test_cases): + try: + # Redirect stdout for each test case + stdout = sys.stdout + sys.stdout = io.StringIO() + + if isinstance(test_input, tuple): + result = function(*test_input) + else: + result = function(test_input) + + # Restore stdout + sys.stdout = stdout + + # Store result but don't print individually + test_outputs.append(str(result)) + test_passed = result == expected + test_results.append(test_passed) + + if not test_passed: + if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}") + print(f"\n{RED}Test case {i+1} failed:{ENDC}") + print(f"Input: {test_input} Expected: {expected} Got: {result}") + + together_opinion = analyze_failed_code(code, test_input, expected, result, function_name, model) + print(f"Tests passed: āŒ Together opinion: {'āœ…' if together_opinion else 'āŒ'}") + + all_passed = False + continue + + debug_info.append(f"Test case {i+1} passed: {test_input} ā†’ {result}") + except Exception as e: + if 'sys' in locals(): # Restore stdout if it was changed + sys.stdout = stdout + test_outputs.append(f"Error: {str(e)}") + if verbose: print(f"\n{RED}Failed code:{ENDC}\n{code}") + print(f"\n{RED}{str(e)} in test case {i+1} Input: {test_input} Expected: {expected}") + + together_opinion = analyze_failed_code(code, test_input, expected, f"Error: {str(e)}", function_name, model) + print(f"Tests passed: āŒ Together opinion: {'āœ…' if together_opinion else 'āŒ'}") + + test_results.append(False) + all_passed = False + continue + finally: + if 'sys' in locals(): # Always restore stdout + sys.stdout = stdout + + # Print all test outputs on one line + # print(f"{WHITE}{BOLD}Test outputs: {join(test_outputs)}{ENDC}") + print(f"{WHITE}Test outputs: {', '.join(test_outputs)}{ENDC}") + + if all_passed: + print(f"Tests passed: āœ…") + return True, "All tests passed!\n" + "\n".join(debug_info), test_results + print(f"Tests passed: āŒ") + return False, "Some tests failed", test_results + except Exception as e: + if 'sys' in locals(): # Restore stdout if it was changed + sys.stdout = stdout + print(f"\n{RED}Error in validate_with_debug: {str(e)}{ENDC}") + return False, f"Unexpected error: {str(e)}", test_results + +def test_fibonacci(): + question = """Write a Python function named EXACTLY 'fibonacci' (not fibonacci_dp or any other name) that returns the nth Fibonacci number. +The function signature must be: def fibonacci(n) + +Requirements: +1. Handle edge cases: + - For n = 0, return 0 + - For n = 1 or n = 2, return 1 + - For negative numbers, return -1 +2. For n > 2: F(n) = F(n-1) + F(n-2) +3. Use dynamic programming or memoization for efficiency +4. Do NOT use any print statements - just return the values + +Example sequence: 0,1,1,2,3,5,8,13,21,... 
+Example calls: +- fibonacci(6) returns 8 +- fibonacci(0) returns 0 +- fibonacci(-1) returns -1""" + + test_cases = [ + (0, 0), # Edge case: n = 0 + (1, 1), # Edge case: n = 1 + (2, 1), # Edge case: n = 2 + (6, 8), # Regular case + (10, 55), # Larger number + (-1, -1), # Edge case: negative input + ] + + def validate(code: str) -> bool: + success, debug_info, test_results = validate_with_debug(code, 'fibonacci', test_cases, "N/A") + return success + + return (question, validate, test_cases) + +def test_binary_search(): + question = """Write a Python function named EXACTLY 'binary_search' that performs binary search on a sorted list. +The function signature must be: def binary_search(arr, target) + +Requirements: +1. The function takes two arguments: + - arr: a sorted list of integers + - target: the integer to find +2. Return the index of the target if found +3. Return -1 if the target is not in the list +4. Do NOT use any print statements - just return the values + +Example: +- binary_search([1,2,3,4,5], 3) returns 2 +- binary_search([1,2,3,4,5], 6) returns -1""" + + test_cases = [ + (([1,2,3,4,5], 3), 2), # Regular case: target in middle + (([1,2,3,4,5], 1), 0), # Edge case: target at start + (([1,2,3,4,5], 5), 4), # Edge case: target at end + (([1,2,3,4,5], 6), -1), # Edge case: target not in list + (([], 1), -1), # Edge case: empty list + (([1], 1), 0), # Edge case: single element list + ] + + def validate(code: str) -> bool: + success, debug_info, test_results = validate_with_debug(code, 'binary_search', test_cases, "N/A") + return success + + return (question, validate, test_cases) + +def test_palindrome(): + question = """Write a Python function named EXACTLY 'is_palindrome' that checks if a string is a palindrome. +The function signature must be: def is_palindrome(s) + +Requirements: +1. The function takes one argument: + - s: a string to check +2. Return True if the string is a palindrome, False otherwise +3. Ignore case (treat uppercase and lowercase as the same) +4. Ignore non-alphanumeric characters (spaces, punctuation) +5. Do NOT use any print statements - just return the values + +Example: +- is_palindrome("A man, a plan, a canal: Panama") returns True +- is_palindrome("race a car") returns False""" + + test_cases = [ + ("A man, a plan, a canal: Panama", True), # Regular case with punctuation + ("race a car", False), # Regular case, not palindrome + ("", True), # Edge case: empty string + ("a", True), # Edge case: single character + ("Was it a car or a cat I saw?", True), # Complex case with punctuation + ("hello", False), # Simple case, not palindrome + ] + + def validate(code: str) -> bool: + success, debug_info, test_results = validate_with_debug(code, 'is_palindrome', test_cases, "N/A") + return success + + return (question, validate, test_cases) + +def test_anagram(): + question = """Write a Python function named EXACTLY 'are_anagrams' that checks if two strings are anagrams. +The function signature must be: def are_anagrams(str1, str2) + +Requirements: +1. The function takes two arguments: + - str1: first string + - str2: second string +2. Return True if the strings are anagrams, False otherwise +3. Ignore case (treat uppercase and lowercase as the same) +4. Ignore spaces +5. Consider only alphanumeric characters +6. 
Do NOT use any print statements - just return the values + +Example: +- are_anagrams("listen", "silent") returns True +- are_anagrams("hello", "world") returns False""" + + test_cases = [ + (("listen", "silent"), True), # Regular case + (("hello", "world"), False), # Not anagrams + (("", ""), True), # Edge case: empty strings + (("a", "a"), True), # Edge case: single char + (("Debit Card", "Bad Credit"), True), # Case and space test + (("Python", "Java"), False), # Different lengths + ] + + def validate(code: str) -> bool: + success, debug_info, test_results = validate_with_debug(code, 'are_anagrams', test_cases, "N/A") + return success + + return (question, validate, test_cases) + +# List of all test cases +CODING_QUESTIONS = [ + test_fibonacci(), + test_binary_search(), + test_palindrome(), + test_anagram() +] + +# Add test names as constants +TEST_NAMES = { + "Write a Python func": "Fibonacci", + "Write a Python func": "Binary Search", + "Write a Python func": "Palindrome", + "Write a Python func": "Anagram Check" +} + +def get_test_name(question: str) -> str: + """Get a friendly name for the test based on the question.""" + if "fibonacci" in question.lower(): + return "Fibonacci" + elif "binary_search" in question.lower(): + return "Binary Search" + elif "palindrome" in question.lower(): + return "Palindrome" + elif "anagram" in question.lower(): + return "Anagram Check" + return question[:20] + "..." + +def get_model_stats(model_name: str, question_tuple: tuple, server_url: str) -> Dict: + """ + Get performance statistics for a specific model and validate the response. + """ + question, validator, test_cases = question_tuple + timer = Timer() + results = { + 'model': model_name, + 'total_duration': 0, + 'tokens_per_second': 0, + 'code_valid': False, + 'tests_passed': False, + 'error': None, + 'test_results': [] + } + + try: + timer.start() + print(f'{WHITE}Requesting code from {server_url} with {model_name}{ENDC}') + response = requests.post( + f"{server_url}/api/chat", + json={ + "model": model_name, + "messages": [{'role': 'user', 'content': question}], + "stream": False + } + ).json() + timer.stop() + + # Get performance metrics from response + total_tokens = response.get('eval_count', 0) + total_duration = response.get('total_duration', 0) + total_response_time = float(total_duration) / 1e9 + + results['total_duration'] = total_response_time + if total_tokens > 0 and total_response_time > 0: + results['tokens_per_second'] = total_tokens / total_response_time + + # Print concise performance metrics + print(f"Total Duration (s): {total_response_time:.2f} / Total Tokens: {total_tokens} / Tokens per Second: {results['tokens_per_second']:.2f}") + + # Extract code from response + if 'message' in response and 'content' in response['message']: + code = extract_code_from_response(response['message']['content']) + + # Validate code + results['code_valid'] = is_valid_python(code) + + if results['code_valid']: + print(f"Code validation: āœ…") + # Get validation results + print(f'{WHITE}Running tests...{ENDC}') + for test_case in CODING_QUESTIONS: + if test_case[0] == question: # Found matching test case + function_name = get_function_name_from_question(question) + test_cases = test_case[2] # Get test cases from tuple + success, debug_info, test_results = validate_with_debug(code, function_name, test_cases, model_name) # Changed model to model_name + results['tests_passed'] = success + results['test_results'] = test_results + break + else: + print(f"Code Validation: āŒ") + + else: + 
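+            # No 'message'/'content' in the Ollama response; keep the raw payload for debugging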
results['error'] = f"Unexpected response format: {response}" + + except Exception as e: + print(f"\n{RED}Error in get_model_stats: {str(e)}{ENDC}") + results['error'] = str(e) + + return results + +def get_function_name_from_question(question: str) -> str: + """Extract function name from question.""" + if "fibonacci" in question.lower(): + return "fibonacci" + elif "binary_search" in question.lower(): + return "binary_search" + elif "palindrome" in question.lower(): + return "is_palindrome" + elif "anagram" in question.lower(): + return "are_anagrams" + return "" + +def run_model_benchmark(model: str, server_url: str, num_runs: int = 4) -> Dict: + """ + Run multiple benchmarks for a model and calculate average metrics. + """ + metrics = [] + + for i in range(num_runs): + print(f"\n{YELLOW}[{model}] Run {i+1}/{num_runs}:{ENDC}") + + run_results = {} + for question_tuple in CODING_QUESTIONS: + test_name = get_test_name(question_tuple[0]) + print(f"\n{BOLD}Testing {test_name}...{ENDC}") + try: + result = get_model_stats(model, question_tuple, server_url) + # Fix: Count actual passed cases from test results + result['passed_cases'] = len([r for r in result.get('test_results', []) if r]) + result['total_cases'] = len(question_tuple[2]) + run_results[test_name] = result + except Exception as e: + print(f"Error in run {i+1}: {e}") + continue + + if run_results: + metrics.append(run_results) + + # Take only the last 3 runs for averaging + metrics = metrics[-3:] + num_runs_used = len(metrics) # Actual number of runs used + + if not metrics: + return {} + + # Aggregate results + aggregated = { + 'model': model, + 'total_duration': mean([m[list(m.keys())[0]]['total_duration'] for m in metrics if m]), + 'tokens_per_second': mean([m[list(m.keys())[0]]['tokens_per_second'] for m in metrics if m]), + 'test_results': {} + } + + # Calculate results per test + for test_name in metrics[-1].keys(): + # Sum up actual passed cases for this test across runs + passed_cases = sum(m[test_name]['passed_cases'] for m in metrics) + # Calculate total possible cases (6 cases Ɨ number of actual runs) + total_possible_cases = 6 * num_runs_used + + success_rate = (passed_cases / total_possible_cases * 100) + status = 'āœ…' if success_rate == 100 else 'āŒ' + print(f"{test_name}: {status} ({passed_cases}/{total_possible_cases} cases)") + + aggregated['test_results'][test_name] = { + 'success_rate': success_rate, + 'passed_cases': passed_cases, + 'total_cases': total_possible_cases, + 'success_cases_rate': passed_cases / total_possible_cases, # Add success cases rate + 'avg_duration': mean([m[test_name]['total_duration'] for m in metrics]), + 'avg_tokens_sec': mean([m[test_name]['tokens_per_second'] for m in metrics]) + } + + # Calculate overall success rate across all tests + total_passed = sum(t['passed_cases'] for t in aggregated['test_results'].values()) + total_cases = sum(t['total_cases'] for t in aggregated['test_results'].values()) + aggregated['overall_success_rate'] = (total_passed / total_cases * 100) if total_cases > 0 else 0 + aggregated['overall_success_cases_rate'] = (total_passed / total_cases) if total_cases > 0 else 0 + + return aggregated + +def print_leaderboard(results: List[Dict]): + """Print leaderboard of model results.""" + if not results: + print("No results to display") + return + + # Sort by success rate first, then by tokens per second + sorted_results = sorted(results, key=lambda x: ( + sum(t['passed_cases'] for t in x['test_results'].values()) / sum(t['total_cases'] for t in 
x['test_results'].values()) if sum(t['total_cases'] for t in x['test_results'].values()) > 0 else 0, + x['tokens_per_second'] + ), reverse=True) + + print(f"\n{HEADER}{BOLD}šŸ† Final Model Leaderboard:{ENDC}") + for i, result in enumerate(sorted_results, 1): + # Calculate stats for each model + total_passed = sum(t['passed_cases'] for t in result['test_results'].values()) + total_cases = sum(t['total_cases'] for t in result['test_results'].values()) + success_rate = (total_passed / total_cases * 100) if total_cases > 0 else 0 + + print(f"\n{BOLD}{YELLOW}{result['model']}{ENDC}") + print(f" {BOLD}Overall Success Rate:{ENDC} {success_rate:.1f}% ({total_passed}/{total_cases} cases)") + print(f" {BOLD}Average Tokens/sec:{ENDC} {result['tokens_per_second']:.2f}") + print(f" {BOLD}Average Duration:{ENDC} {result['total_duration']:.2f}s") + print(f" {BOLD}Test Results:{ENDC}") + for test_name, test_result in result['test_results'].items(): + status = 'āœ…' if test_result['success_rate'] == 100 else 'āŒ' + print(f" - {test_name}: {status} {test_result['passed_cases']}/{test_result['total_cases']} cases ({test_result['success_rate']:.1f}%)") + +def get_available_models(server_url: str) -> List[str]: + """Get list of available models from the specified Ollama server.""" + try: + response = requests.get(f"{server_url}/api/tags").json() + return [model['name'] for model in response['models']] + except Exception as e: + print(f"{RED}Error getting model list from {server_url}: {e}{ENDC}") + return [] + +def get_model_details(model_name): + try: + result = subprocess.run( + ["ollama", "show", model_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding='utf-8', + errors='replace' + ) + + if result.returncode != 0: + print(f"Error: {result.stderr.strip()}") + return None + + if not result.stdout.strip(): + print(f"No details available for model: {model_name}") + return None + + raw_output = result.stdout.strip() + lines = raw_output.split('\n') + current_section = None + + for line in lines: + line = line.rstrip() + if line and not line.startswith(' '): # Section headers + current_section = line.strip() + print(f"\n {current_section}") + elif line and current_section: # Section content + # Split by multiple spaces and filter out empty parts + parts = [part for part in line.split(' ') if part.strip()] + if len(parts) >= 2: + key, value = parts[0].strip(), parts[-1].strip() + # Ensure consistent spacing for alignment + print(f" {key:<16} {value}") + elif len(parts) == 1: + # Handle single-value lines (like license text) + print(f" {parts[0].strip()}") + + return None # No need to return formatted details anymore + + except Exception as e: + print(f"An error occurred while getting model details: {e}") + return None + +def update_server_results(server_url: str, results: List[Dict]) -> None: + try: + # Get CPU brand and format it for filename + cpu_info = get_cpu_info() + cpu_brand = cpu_info.get('brand_raw', 'Unknown_CPU').replace(' ', '_') + timestamp = time.strftime("%Y%m%d_%H%M%S") + + # Create a unique filename for this server's results + server_id = server_url.replace('http://', '').replace(':', '_').replace('/', '_') + results_dir = "benchmark_results" + + os.makedirs(results_dir, exist_ok=True) + + # Include CPU brand in filename + filename = os.path.join(results_dir, f"{cpu_brand}_{server_id}.json") + + # Load existing results or create new file + try: + with open(filename, 'r') as f: + existing_data = json.load(f) + except FileNotFoundError: + existing_data = { + 'server_url': 
server_url, + 'benchmarks': [] + } + + # Add new results with timestamp and ensure overall success rate is included + benchmark_entry = { + 'timestamp': timestamp, + 'results': [] + } + + # Add overall success rate to each model's results + for result in results: + total_passed = sum(t['passed_cases'] for t in result['test_results'].values()) + total_cases = sum(t['total_cases'] for t in result['test_results'].values()) + result['overall_success_rate'] = (total_passed / total_cases * 100) if total_cases > 0 else 0 + benchmark_entry['results'].append(result) + + existing_data['benchmarks'].append(benchmark_entry) + + # Save updated results + with open(filename, 'w') as f: + json.dump(existing_data, f, indent=2) + print(f"{GREEN}Successfully saved results to {filename}{ENDC}") + except Exception as e: + print(f"{RED}Failed to save results: {str(e)}{ENDC}") + +def main(): + parser = argparse.ArgumentParser(description='Run Ollama model benchmarks') + parser.add_argument('--server', choices=['local', 'z60'], default='local', + help='Choose Ollama server (default: local)') + parser.add_argument('--model', type=str, help='Specific model to benchmark') + parser.add_argument('--number', type=str, help='Number of models to benchmark (number or "all")') + parser.add_argument('--verbose', action='store_true', help='Enable verbose output') + args = parser.parse_args() + + server_url = SERVERS[args.server] + + print() + print(f"{HEADER}{BOLD}CPU Information:{ENDC}") + cpu_info = get_cpu_info() + for key, value in cpu_info.items(): + print(f"{MUTED}{key}: {value}{ENDC}") + + print() + print(f"{INFO}Using Ollama server at {server_url}...{ENDC}") + + # Get available models or use specified model + if args.model: + models = [args.model] + else: + models = get_available_models(server_url) + + if not models: + print(f"{RED}No models found on server {server_url}. Exiting.{ENDC}") + return + + # Handle number of models to test + if args.number and args.number.lower() != 'all': + try: + num_models = int(args.number) + if num_models > 0: + models = models[:num_models] + else: + print(f"{WARNING}Invalid number of models. Using all available models.{ENDC}") + except ValueError: + print(f"{WARNING}Invalid number format. Using all available models.{ENDC}") + + print(f"{INFO}Testing {len(models)} models :{ENDC}") + for i, model in enumerate(models, 1): + print(f"{YELLOW}{i}. {model}{ENDC}") + + # Run benchmarks + all_results = [] + + for model in models: + print(f"\n{HEADER}{BOLD}Benchmarking {model}...{ENDC}") + details = get_model_details(model) + if details: + print(f"\n{INFO}Model Details:{ENDC}") + if "details" in details: + for section, items in details["details"].items(): + print(f"\n{BOLD}{section}{ENDC}") + for key, value in items.items(): + print(f" {key}: {value}") + else: + print(json.dumps(details, indent=2)) + result = run_model_benchmark(model, server_url) + if 'error' not in result: + all_results.append(result) + + # Print and save results + print_leaderboard(all_results) + update_server_results(server_url, all_results) + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a69f6ba --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +requests>=2.31.0 +together>=0.2.8 +ollama>=0.1.6 +python-dotenv>=1.0.0 +GPUtil==1.4.0 +py-cpuinfo \ No newline at end of file
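For reference, a minimal sketch of inspecting a saved results file outside of `lboard.py`, assuming the JSON layout written by `update_server_results()` in `main.py` above (the file name follows the README example and will differ per machine):

```python
import json

# Example path following the README's [CPU_Model]_[Server_Address].json convention;
# point this at a file that actually exists in benchmark_results/.
path = "benchmark_results/Apple_M1_Pro_localhost_11434.json"

with open(path) as f:
    data = json.load(f)

# Each entry in 'benchmarks' is one invocation of main.py; take the most recent one.
latest = data["benchmarks"][-1]
print(f"Server: {data['server_url']}  Timestamp: {latest['timestamp']}")

for result in latest["results"]:
    # passed_cases/total_cases are aggregated per test by run_model_benchmark()
    passed = sum(t["passed_cases"] for t in result["test_results"].values())
    total = sum(t["total_cases"] for t in result["test_results"].values())
    rate = (passed / total * 100) if total else 0.0
    print(f"{result['model']}: {rate:.1f}% ({passed}/{total} cases), "
          f"{result['tokens_per_second']:.2f} tok/s, {result['total_duration']:.2f}s")
```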