Files
local-deep-research/examples/benchmarks/run_resumable_parallel_benchmark.py
LearningCircuit 0c6635ecc2 feat: Add pre-commit hook to enforce pathlib usage (issue #640) (#656)
* feat: Add pre-commit hook to enforce pathlib usage (issue #640)

- Created check-pathlib-usage.py pre-commit hook using AST parsing
- Detects os.path usage and suggests pathlib alternatives
- Fixed os.path.normpath usage in auth/routes.py to use PurePosixPath
- Added hook configuration to .pre-commit-config.yaml

The hook provides helpful suggestions for replacing os.path calls with
their pathlib equivalents for better cross-platform compatibility.

Co-Authored-By: djpetti <djpetti@users.noreply.github.com>

* feat: Add missing pathlib pre-commit hook script

Co-Authored-By: djpetti <djpetti@users.noreply.github.com>

* refactor: Migrate core src modules from os.path to pathlib

- Fixed web/app_factory.py, config/llm_config.py, metrics/token_counter.py
- Fixed utilities/es_utils.py, web/routes/benchmark_routes.py
- Fixed web/routes/settings_routes.py, web_search_engines/engines/search_engine_local.py
- Replaced os.path.join() with Path() / syntax
- Replaced os.path.exists() with Path().exists()
- Replaced os.path.basename() with Path().name
- Replaced os.path.dirname() with Path().parent

Part of the migration to modern pathlib API for better cross-platform
compatibility and cleaner code.

Co-Authored-By: djpetti <djpetti@users.noreply.github.com>

* refactor: Migrate from os.path to pathlib in src and tests (issue #640)

Replaced os.path usage with pathlib.Path throughout:
- src/local_deep_research/benchmarks: All os.path.join, exists, dirname, basename, abspath replaced
- tests directory: Complete migration of all test files
- Improved cross-platform compatibility and code readability
- Kept os.path.expandvars in env_settings.py (no pathlib equivalent)

Part of pre-commit hook enforcement for pathlib usage.
Remaining work: examples/ and scripts/ directories.

Co-Authored-By: djpetti

* fix: Complete migration from os.path to pathlib.Path (issue #640)

Completed manual migration of all os.path usage to pathlib.Path across:
- scripts/ directory (3 files)
- examples/ directory (25 files total)
  - examples/benchmarks/ (8 files)
  - examples/optimization/ (16 files)
  - examples/show_env_vars.py
- src/local_deep_research/settings/env_settings.py

Changes made:
- Replaced os.path.join() with Path() / syntax
- Replaced os.path.exists() with Path().exists()
- Replaced os.path.dirname() with Path().parent
- Replaced os.path.basename() with Path().name or Path().stem
- Replaced os.path.abspath() with Path().resolve()
- Replaced os.makedirs() with Path().mkdir(parents=True, exist_ok=True)
- Added pathlib import where needed

Note: Kept os.path.expandvars in env_settings.py as there is no pathlib
equivalent. Added comment explaining this limitation.

This completes the pathlib migration for issue #640.

Co-Authored-By: djpetti

* fix: Allow os.path.expandvars in pathlib pre-commit hook

Updated the check-pathlib-usage.py pre-commit hook to skip checking
os.path.expandvars since it has no pathlib equivalent.

Changes:
- Added exception for expandvars in both visit_Attribute and visit_Call methods
- Added comment in equivalents dictionary noting expandvars is allowed
- This allows env_settings.py to use os.path.expandvars without failing checks

This resolves the pre-commit CI failure while maintaining the pathlib
enforcement for all other os.path methods.

Co-Authored-By: djpetti

---------

Co-authored-by: djpetti
2025-08-17 22:52:35 +02:00

582 lines
19 KiB
Python

#!/usr/bin/env python
"""
Run SimpleQA and BrowseComp benchmarks in parallel with resume capability.
This script can resume interrupted benchmarks by reading existing results
and continuing from where it left off.
Usage:
# Start new benchmark
pdm run python examples/benchmarks/run_resumable_parallel_benchmark.py
# Resume interrupted benchmark
pdm run python examples/benchmarks/run_resumable_parallel_benchmark.py \
--resume-from benchmark_results/parallel_benchmark_20250513_235221
"""
import argparse
import concurrent.futures
import json
import os
import sys
import time
from datetime import datetime, UTC
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from loguru import logger
from local_deep_research.api import quick_summary
from local_deep_research.benchmarks.datasets import load_dataset
from local_deep_research.benchmarks.graders import (
extract_answer_from_response,
grade_results,
)
from local_deep_research.benchmarks.metrics import (
calculate_metrics,
generate_report,
)
from local_deep_research.benchmarks.runners import format_query
# Add the src directory to the Python path
project_root = str(Path(__file__).parent.parent.parent.resolve())
logger.enable("local_deep_research")
def load_existing_results(results_file: str) -> Dict[str, Dict]:
"""Load existing results from JSONL file."""
results = {}
if Path(results_file).exists():
logger.info(f"Loading existing results from: {results_file}")
with open(results_file, "r") as f:
for line in f:
if line.strip():
try:
result = json.loads(line)
# Use ID field as key
result_id = result.get("id", "")
if result_id:
results[result_id] = result
except json.JSONDecodeError:
logger.warning(
f"Skipping invalid JSON line: {line[:50]}..."
)
logger.info(f"Loaded {len(results)} existing results")
return results
def find_latest_results_file(
output_dir: str, dataset_type: str
) -> Optional[str]:
"""Find the most recent results file for a dataset."""
# First try dataset subdirectory
dataset_dir = str(Path(output_dir) / dataset_type)
if Path(dataset_dir).exists():
pattern = f"{dataset_type}_*_results.jsonl"
files = list(Path(dataset_dir).glob(pattern))
if files:
# Sort by filename (includes timestamp) and return the latest
return str(sorted(files)[-1])
# Then try root directory
pattern = f"{dataset_type}_*_results.jsonl"
files = list(Path(output_dir).glob(pattern))
if files:
return str(sorted(files)[-1])
return None
def run_resumable_benchmark(
dataset_type: str,
num_examples: int,
output_dir: str,
search_config: Dict[str, Any],
evaluation_config: Optional[Dict[str, Any]] = None,
resume_from: Optional[str] = None,
) -> Dict[str, Any]:
"""Run a benchmark with resume capability."""
# Create output directory if needed
os.makedirs(output_dir, exist_ok=True)
# Load dataset
dataset = load_dataset(
dataset_type=dataset_type,
num_examples=num_examples,
seed=None, # Random seed for truly random sampling
)
# Determine output files
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
results_file = str(
Path(output_dir) / f"{dataset_type}_{timestamp}_results.jsonl"
)
evaluation_file = str(
Path(output_dir) / f"{dataset_type}_{timestamp}_evaluation.jsonl"
)
report_file = str(
Path(output_dir) / f"{dataset_type}_{timestamp}_report.md"
)
# Load existing results if resuming
existing_results = {}
if resume_from:
existing_results_file = find_latest_results_file(
resume_from, dataset_type
)
if existing_results_file:
existing_results = load_existing_results(existing_results_file)
logger.info(
f"Found {len(existing_results)} existing results for {dataset_type}"
)
# Process examples
all_results = []
new_results_count = 0
reused_results_count = 0
error_count = 0
for i, example in enumerate(dataset):
# Extract ID and question
example_id = example.get("id", f"example_{i}")
# Extract question and answer based on dataset type
if dataset_type.lower() == "simpleqa":
question = example.get("problem", "")
correct_answer = example.get("answer", "")
else: # browsecomp
question = example.get("problem", "")
correct_answer = example.get("correct_answer", "") or example.get(
"answer", ""
)
# Check if we have existing result
existing_result = existing_results.get(example_id)
if existing_result and existing_result.get("response"):
# Reuse existing result
logger.info(
f"Reusing existing result for example {i + 1}/{len(dataset)}: {example_id}"
)
all_results.append(existing_result)
reused_results_count += 1
# Write to new results file
with open(results_file, "a") as f:
f.write(json.dumps(existing_result) + "\n")
else:
# Process new example
logger.info(
f"Processing new example {i + 1}/{len(dataset)}: {question[:50]}..."
)
try:
# Format query
formatted_query = format_query(question, dataset_type)
# Time the search
start_time = time.time()
# Get response from LDR
search_result = quick_summary(
query=formatted_query,
iterations=search_config.get("iterations", 3),
questions_per_iteration=search_config.get(
"questions_per_iteration", 3
),
search_tool=search_config.get("search_tool", "searxng"),
search_strategy=search_config.get(
"search_strategy", "source_based"
),
)
processing_time = time.time() - start_time
# Extract response
response = search_result.get("summary", "")
extracted = extract_answer_from_response(response, dataset_type)
# Create result
result = {
"id": example_id,
"problem": question,
"correct_answer": correct_answer,
"response": response,
"extracted_answer": extracted["extracted_answer"],
"confidence": extracted["confidence"],
"processing_time": processing_time,
"sources": search_result.get("sources", []),
"search_config": search_config,
}
all_results.append(result)
new_results_count += 1
# Write to file immediately
with open(results_file, "a") as f:
f.write(json.dumps(result) + "\n")
except Exception as e:
logger.exception("Error processing example")
error_count += 1
# Create error result
error_result = {
"id": example_id,
"problem": question,
"correct_answer": correct_answer,
"error": str(e),
"processing_time": 0,
}
all_results.append(error_result)
new_results_count += 1
# Write error result
with open(results_file, "a") as f:
f.write(json.dumps(error_result) + "\n")
logger.info(
f"Completed {dataset_type}: {new_results_count} new, {reused_results_count} reused, {error_count} errors"
)
# Run evaluation on all results
logger.info(f"Running evaluation for {dataset_type}")
try:
evaluation_results = grade_results(
results_file=results_file,
output_file=evaluation_file,
dataset_type=dataset_type,
evaluation_config=evaluation_config,
)
logger.info(
f"Evaluation results for {dataset_type}: {evaluation_results}"
)
# Calculate metrics
metrics = calculate_metrics(evaluation_file)
logger.info(f"Metrics for {dataset_type}: {metrics}")
# Generate report
generate_report(metrics, evaluation_file, report_file, dataset_type)
return {
"accuracy": metrics.get("accuracy", 0),
"metrics": metrics,
"new_results": new_results_count,
"reused_results": reused_results_count,
"total_results": len(all_results),
"errors": error_count,
}
except Exception as e:
logger.exception("Error during evaluation")
return {
"accuracy": 0,
"metrics": {},
"new_results": new_results_count,
"reused_results": reused_results_count,
"total_results": len(all_results),
"errors": error_count,
"evaluation_error": str(e),
}
def run_simpleqa_benchmark_wrapper(args: Tuple) -> Dict[str, Any]:
"""Wrapper for running SimpleQA benchmark in parallel."""
num_examples, output_dir, resume_from, search_config, evaluation_config = (
args
)
logger.info(f"Starting SimpleQA benchmark with {num_examples} examples")
start_time = time.time()
results = run_resumable_benchmark(
dataset_type="simpleqa",
num_examples=num_examples,
output_dir=str(Path(output_dir) / "simpleqa"),
search_config=search_config,
evaluation_config=evaluation_config,
resume_from=resume_from,
)
duration = time.time() - start_time
logger.info(f"SimpleQA benchmark completed in {duration:.1f} seconds")
return results
def run_browsecomp_benchmark_wrapper(args: Tuple) -> Dict[str, Any]:
"""Wrapper for running BrowseComp benchmark in parallel."""
num_examples, output_dir, resume_from, search_config, evaluation_config = (
args
)
logger.info(f"Starting BrowseComp benchmark with {num_examples} examples")
start_time = time.time()
# BrowseComp needs more iterations
browsecomp_config = {**search_config, "iterations": 3}
results = run_resumable_benchmark(
dataset_type="browsecomp",
num_examples=num_examples,
output_dir=str(Path(output_dir) / "browsecomp"),
search_config=browsecomp_config,
evaluation_config=evaluation_config,
resume_from=resume_from,
)
duration = time.time() - start_time
logger.info(f"BrowseComp benchmark completed in {duration:.1f} seconds")
return results
def setup_llm_environment(
model=None, provider=None, endpoint_url=None, api_key=None
):
"""Set up environment variables for LLM configuration."""
if model:
os.environ["LDR_LLM_MODEL"] = model
logger.info(f"Using LLM model: {model}")
if provider:
os.environ["LDR_LLM_PROVIDER"] = provider
logger.info(f"Using LLM provider: {provider}")
if endpoint_url:
os.environ["OPENAI_ENDPOINT_URL"] = endpoint_url
os.environ["LDR_LLM_OPENAI_ENDPOINT_URL"] = endpoint_url
os.environ["LDR_LLM_OLLAMA_URL"] = endpoint_url
logger.info(f"Using endpoint URL: {endpoint_url}")
if api_key:
# Set the appropriate environment variable based on provider
if provider == "openai":
os.environ["OPENAI_API_KEY"] = api_key
os.environ["LDR_LLM_OPENAI_API_KEY"] = api_key
elif provider == "openai_endpoint":
os.environ["OPENAI_ENDPOINT_API_KEY"] = api_key
os.environ["LDR_LLM_OPENAI_ENDPOINT_API_KEY"] = api_key
elif provider == "anthropic":
os.environ["ANTHROPIC_API_KEY"] = api_key
os.environ["LDR_LLM_ANTHROPIC_API_KEY"] = api_key
logger.info("API key configured")
def main():
parser = argparse.ArgumentParser(
description="Run SimpleQA and BrowseComp benchmarks in parallel with resume capability"
)
parser.add_argument(
"--examples",
type=int,
default=20,
help="Number of examples for each benchmark (default: 20)",
)
parser.add_argument(
"--resume-from",
help="Path to previous benchmark results directory to resume from",
)
# LLM configuration options
parser.add_argument(
"--model",
help="Model name for the LLM (e.g., 'google/gemini-2.0-flash-001')",
)
parser.add_argument(
"--provider",
help="Provider for the LLM (e.g., 'anthropic', 'openai', 'openai_endpoint')",
)
parser.add_argument(
"--endpoint-url",
help="Custom endpoint URL (e.g., 'https://openrouter.ai/api/v1')",
)
parser.add_argument("--api-key", help="API key for the LLM provider")
parser.add_argument(
"--datasets",
choices=["simpleqa", "browsecomp", "both"],
default="both",
help="Which datasets to run (default: both)",
)
args = parser.parse_args()
# Determine output directory
if args.resume_from:
# Create new directory but link to old results
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
output_dir = str(
Path(project_root)
/ "benchmark_results"
/ f"resumed_benchmark_{timestamp}"
)
Path(output_dir).mkdir(parents=True, exist_ok=True)
logger.info(
f"Resuming from {args.resume_from}, new results in {output_dir}"
)
else:
# Create new timestamp directory
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
output_dir = str(
Path(project_root)
/ "benchmark_results"
/ f"parallel_benchmark_{timestamp}"
)
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Starting new benchmark in: {output_dir}")
# Display start information
print(f"Starting parallel benchmarks with {args.examples} examples each")
print(f"Results will be saved to: {output_dir}")
if args.resume_from:
print(f"Resuming from previous run: {args.resume_from}")
# Set up LLM environment if specified
setup_llm_environment(
model=args.model,
provider=args.provider,
endpoint_url=args.endpoint_url,
api_key=args.api_key,
)
# Set up configurations
search_config = {
"iterations": 8, # Same as original 96% benchmark
"questions_per_iteration": 5, # Same as original 96% benchmark
"search_tool": "searxng",
"search_strategy": "focused_iteration", # Same as original 96% benchmark
# performance
}
# Add model configurations if provided
if args.model:
search_config["model_name"] = args.model
if args.provider:
search_config["provider"] = args.provider
if args.endpoint_url:
search_config["openai_endpoint_url"] = args.endpoint_url
evaluation_config = {
"provider": "ANTHROPIC",
"model_name": "claude-3-7-sonnet-20250219",
"temperature": 0,
}
# Start time for total execution
total_start_time = time.time()
# Run benchmarks based on user selection
futures = []
if args.datasets in ["simpleqa", "both"]:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
simpleqa_future = executor.submit(
run_simpleqa_benchmark_wrapper,
(
args.examples,
output_dir,
args.resume_from,
search_config,
evaluation_config,
),
)
futures.append(("simpleqa", simpleqa_future))
if args.datasets in ["browsecomp", "both"]:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
browsecomp_future = executor.submit(
run_browsecomp_benchmark_wrapper,
(
args.examples,
output_dir,
args.resume_from,
search_config,
evaluation_config,
),
)
futures.append(("browsecomp", browsecomp_future))
# Get results from completed futures
simpleqa_results = None
browsecomp_results = None
for dataset_name, future in futures:
try:
result = future.result()
if dataset_name == "simpleqa":
simpleqa_results = result
print(
f"SimpleQA benchmark completed: {result['new_results']} new, {result['reused_results']} reused"
)
elif dataset_name == "browsecomp":
browsecomp_results = result
print(
f"BrowseComp benchmark completed: {result['new_results']} new, {result['reused_results']} reused"
)
except Exception:
logger.exception("Error in benchmark")
# Calculate total time
total_duration = time.time() - total_start_time
# Print summary
print("\n" + "=" * 50)
print(" PARALLEL BENCHMARK SUMMARY ")
print("=" * 50)
print(f"Total duration: {total_duration:.1f} seconds")
print(f"Examples per benchmark: {args.examples}")
if args.resume_from:
print(f"Resumed from: {args.resume_from}")
if simpleqa_results:
print("\nSimpleQA:")
print(f" - Accuracy: {simpleqa_results.get('accuracy', 'N/A')}")
print(f" - New results: {simpleqa_results['new_results']}")
print(f" - Reused results: {simpleqa_results['reused_results']}")
print(f" - Errors: {simpleqa_results.get('errors', 0)}")
else:
print("\nSimpleQA: Failed or no results")
if browsecomp_results:
print("\nBrowseComp:")
print(f" - Accuracy: {browsecomp_results.get('accuracy', 'N/A')}")
print(f" - New results: {browsecomp_results['new_results']}")
print(f" - Reused results: {browsecomp_results['reused_results']}")
print(f" - Errors: {browsecomp_results.get('errors', 0)}")
else:
print("\nBrowseComp: Failed or no results")
print(f"\nResults saved to: {output_dir}")
print("=" * 50)
# Save summary
try:
summary = {
"timestamp": timestamp,
"examples_per_benchmark": args.examples,
"total_duration": total_duration,
"resumed_from": args.resume_from,
"simpleqa": simpleqa_results,
"browsecomp": browsecomp_results,
"model": args.model,
"provider": args.provider,
}
with open(
Path(output_dir) / "parallel_benchmark_summary.json", "w"
) as f:
json.dump(summary, f, indent=2)
except Exception:
logger.exception("Error saving summary")
return 0
if __name__ == "__main__":
sys.exit(main())