mirror of
https://github.com/LearningCircuit/local-deep-research.git
synced 2026-06-15 19:46:56 +03:00
test: delete 5 ui_tests/test_uuid* / test_trace_error / test_mixed_id script tests (#4274)
PUNCHLIST Tier 1 flagged all 5 of these files as either REAL_NETWORK or NO_ASSERT diagnostic scripts: - tests/ui_tests/test_uuid_research.py — Requires live server at http://127.0.0.1:5000 plus real OLLAMA+searxng. No assertions, returns True/False. FLAKY/REAL_NETWORK. - tests/ui_tests/test_uuid_fresh_db.py — Same dependencies; also requires live Ollama with llama3.2:3b model. No assertions. - tests/ui_tests/test_trace_error.py — Only prints status codes and response data; no assertions. Hard-coded dependency on 'test_uuid_fresh_bew2zgek' user existing in auth DB. - tests/ui_tests/test_mixed_id_handling.py — No assertions, only prints. Requires live server plus pre-existing 'testuser' and 'test_uuid_fresh_bew2zgek' users. - tests/ui_tests/test_direct_uuid_insert.py — Top-level function (not pytest test) that returns True/False; uses print() and bare returns without assertions. Hard dependency on pre-existing 'test_uuid_fresh' user. These were never pytest tests. They're diagnostic scripts for debugging UUID/auth-DB issues, run manually by developers. Deleting removes them from automated test discovery; anyone who needs the diagnostic flow can re-create it or run `gh search` for the historical version.
This commit is contained in:
@@ -1,126 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test direct database insertion with UUID
|
||||
"""
|
||||
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Allow unencrypted databases for testing
|
||||
os.environ["LDR_BOOTSTRAP_ALLOW_UNENCRYPTED"] = "true"
|
||||
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
str(Path(__file__).parent.parent.parent.resolve()),
|
||||
)
|
||||
|
||||
from sqlalchemy import inspect
|
||||
|
||||
from local_deep_research.database.auth_db import get_auth_db_session
|
||||
from local_deep_research.database.encrypted_db import db_manager
|
||||
from local_deep_research.database.models.auth import User
|
||||
from local_deep_research.database.models.research import ResearchHistory
|
||||
|
||||
|
||||
def test_direct_uuid_insertion():
|
||||
"""Test inserting research with UUID directly into database"""
|
||||
|
||||
# Get the most recent test user
|
||||
auth_db = get_auth_db_session()
|
||||
test_user = (
|
||||
auth_db.query(User)
|
||||
.filter(User.username.like("test_uuid_fresh_%"))
|
||||
.order_by(User.created_at.desc())
|
||||
.first()
|
||||
)
|
||||
auth_db.close()
|
||||
|
||||
if not test_user:
|
||||
print("No test user found")
|
||||
return False
|
||||
|
||||
username = test_user.username
|
||||
print(f"Using user: {username}")
|
||||
|
||||
# Open their database
|
||||
engine = db_manager.open_user_database(username, "T3st!Secure#2024$LDR")
|
||||
if not engine:
|
||||
print("Failed to open database")
|
||||
return False
|
||||
|
||||
# Check table schema
|
||||
inspector = inspect(engine)
|
||||
columns = inspector.get_columns("research_history")
|
||||
print("\nTable schema:")
|
||||
for col in columns:
|
||||
print(f" {col['name']}: {col['type']}")
|
||||
|
||||
# Get session factory
|
||||
Session = db_manager.Session
|
||||
|
||||
with Session() as session:
|
||||
try:
|
||||
# Create a new research with UUID
|
||||
research_id = str(uuid.uuid4())
|
||||
print(f"\nCreating research with UUID: {research_id}")
|
||||
|
||||
research = ResearchHistory(
|
||||
id=research_id,
|
||||
query="Direct UUID test",
|
||||
mode="quick",
|
||||
status="in_progress",
|
||||
created_at=datetime.utcnow().isoformat(),
|
||||
progress_log=[
|
||||
{"time": datetime.utcnow().isoformat(), "progress": 0}
|
||||
],
|
||||
research_meta={"test": "direct_uuid"},
|
||||
)
|
||||
|
||||
session.add(research)
|
||||
session.commit()
|
||||
|
||||
print("✅ Successfully created research with UUID!")
|
||||
|
||||
# Verify it was saved
|
||||
saved_research = (
|
||||
session.query(ResearchHistory).filter_by(id=research_id).first()
|
||||
)
|
||||
if saved_research:
|
||||
print("\nVerified in database:")
|
||||
print(f" ID: {saved_research.id}")
|
||||
print(f" Type: {type(saved_research.id).__name__}")
|
||||
print(f" Query: {saved_research.query}")
|
||||
|
||||
# List all research entries
|
||||
all_research = session.query(ResearchHistory).all()
|
||||
print(f"\nTotal research entries: {len(all_research)}")
|
||||
for r in all_research:
|
||||
print(f" - {r.id} ({type(r.id).__name__}): {r.query}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error creating research: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
session.rollback()
|
||||
return False
|
||||
|
||||
db_manager.close_user_database(username)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 60)
|
||||
print("Direct UUID Insertion Test")
|
||||
print("=" * 60)
|
||||
|
||||
success = test_direct_uuid_insertion()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("✅ Test passed" if success else "❌ Test failed")
|
||||
print("=" * 60)
|
||||
@@ -1,130 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test handling of mixed integer and UUID IDs
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
os.environ["LDR_BOOTSTRAP_ALLOW_UNENCRYPTED"] = "true"
|
||||
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
str(Path(__file__).parent.parent.parent.resolve()),
|
||||
)
|
||||
|
||||
import requests
|
||||
|
||||
from local_deep_research.database.encrypted_db import db_manager
|
||||
from local_deep_research.database.models.research import ResearchHistory
|
||||
|
||||
|
||||
def test_mixed_id_handling():
|
||||
"""Test that the API can handle both integer and UUID IDs"""
|
||||
|
||||
print("Testing Mixed ID Handling")
|
||||
print("=" * 60)
|
||||
|
||||
# Test different users
|
||||
test_cases = [
|
||||
("testuser", "T3st!Secure#2024$LDR", "User with integer IDs"),
|
||||
(
|
||||
"test_uuid_fresh_bew2zgek",
|
||||
"T3st!Secure#2024$LDR",
|
||||
"User with UUID IDs",
|
||||
),
|
||||
]
|
||||
|
||||
for username, password, description in test_cases:
|
||||
print(f"\n{description}: {username}")
|
||||
print("-" * 40)
|
||||
|
||||
# Login
|
||||
session = requests.Session()
|
||||
resp = session.get("http://127.0.0.1:5000/auth/login")
|
||||
csrf = None
|
||||
for line in resp.text.split("\n"):
|
||||
if "csrf_token" in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf = line[start:end]
|
||||
break
|
||||
|
||||
login_data = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
"csrf_token": csrf,
|
||||
}
|
||||
|
||||
login_resp = session.post(
|
||||
"http://127.0.0.1:5000/auth/login", data=login_data
|
||||
)
|
||||
print(f"Login: {login_resp.status_code}")
|
||||
|
||||
if login_resp.status_code != 200:
|
||||
print("Login failed, skipping...")
|
||||
continue
|
||||
|
||||
# Test history endpoint
|
||||
print("\nTesting /api/history:")
|
||||
hist = session.get("http://127.0.0.1:5000/api/history")
|
||||
print(f" Status: {hist.status_code}")
|
||||
|
||||
if hist.status_code == 200:
|
||||
data = hist.json()
|
||||
items = data.get("items", [])
|
||||
print(f" Items: {len(items)}")
|
||||
if items:
|
||||
first_id = items[0].get("id")
|
||||
print(
|
||||
f" First ID: {first_id} (type: {type(first_id).__name__})"
|
||||
)
|
||||
else:
|
||||
print(f" Error: {hist.text[:200]}...")
|
||||
|
||||
# Check database directly
|
||||
print("\nChecking database directly:")
|
||||
try:
|
||||
engine = db_manager.open_user_database(username, password)
|
||||
if engine:
|
||||
session_db = db_manager.get_session(username)
|
||||
|
||||
# Get first research entry
|
||||
first_research = session_db.query(ResearchHistory).first()
|
||||
if first_research:
|
||||
print(
|
||||
f" First research ID: {first_research.id} (type: {type(first_research.id).__name__})"
|
||||
)
|
||||
|
||||
# Test specific endpoint
|
||||
research_id = first_research.id
|
||||
print(f"\nTesting /api/research/{research_id}:")
|
||||
|
||||
detail_resp = session.get(
|
||||
f"http://127.0.0.1:5000/api/research/{research_id}"
|
||||
)
|
||||
print(f" Status: {detail_resp.status_code}")
|
||||
|
||||
if detail_resp.status_code == 200:
|
||||
detail_data = detail_resp.json()
|
||||
print(f" ID in response: {detail_data.get('id')}")
|
||||
else:
|
||||
print(f" Error: {detail_resp.text[:200]}...")
|
||||
|
||||
# Test status endpoint
|
||||
print(f"\nTesting /api/research/{research_id}/status:")
|
||||
status_resp = session.get(
|
||||
f"http://127.0.0.1:5000/api/research/{research_id}/status"
|
||||
)
|
||||
print(f" Status: {status_resp.status_code}")
|
||||
|
||||
session_db.close()
|
||||
db_manager.close_user_database(username)
|
||||
except Exception as e:
|
||||
print(f" Database error: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_mixed_id_handling()
|
||||
@@ -1,73 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Trace the exact error when accessing history for UUID user
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
os.environ["LDR_BOOTSTRAP_ALLOW_UNENCRYPTED"] = "true"
|
||||
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
str(Path(__file__).parent.parent.parent.resolve()),
|
||||
)
|
||||
|
||||
from local_deep_research.web.app_factory import create_app
|
||||
|
||||
|
||||
def test_history_error():
|
||||
"""Test to trace the exact error"""
|
||||
|
||||
# Create the Flask app
|
||||
app, socket_service = create_app()
|
||||
|
||||
with app.test_client() as client:
|
||||
# Login as user with UUID tables
|
||||
login_resp = client.get("/auth/login")
|
||||
csrf_token = None
|
||||
for line in login_resp.data.decode().split("\n"):
|
||||
if "csrf_token" in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf_token = line[start:end]
|
||||
break
|
||||
|
||||
login_data = {
|
||||
"username": "test_uuid_fresh_bew2zgek",
|
||||
"password": "T3st!Secure#2024$LDR",
|
||||
"csrf_token": csrf_token,
|
||||
}
|
||||
|
||||
resp = client.post(
|
||||
"/auth/login", data=login_data, follow_redirects=True
|
||||
)
|
||||
print(f"Login status: {resp.status_code}")
|
||||
|
||||
# Try to access history
|
||||
print("\nAccessing /api/history...")
|
||||
hist_resp = client.get("/api/history")
|
||||
print(f"Status: {hist_resp.status_code}")
|
||||
|
||||
if hist_resp.status_code != 200:
|
||||
print(f"Error response: {hist_resp.data.decode()}")
|
||||
|
||||
# Also try the /history/api endpoint
|
||||
print("\nAccessing /history/api...")
|
||||
hist2_resp = client.get("/history/api")
|
||||
print(f"Status: {hist2_resp.status_code}")
|
||||
|
||||
if hist2_resp.status_code == 200:
|
||||
import json
|
||||
|
||||
data = json.loads(hist2_resp.data)
|
||||
print(f"Success! Got {len(data.get('items', []))} items")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Tracing History Error")
|
||||
print("=" * 60)
|
||||
|
||||
test_history_error()
|
||||
@@ -1,219 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test UUID research with a completely fresh database by dropping existing tables.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
# Allow unencrypted databases for testing
|
||||
os.environ["LDR_BOOTSTRAP_ALLOW_UNENCRYPTED"] = "true"
|
||||
|
||||
# Import after setting environment
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
str(Path(__file__).parent.parent.parent.resolve()),
|
||||
)
|
||||
|
||||
from sqlalchemy import inspect, text
|
||||
|
||||
from local_deep_research.database.auth_db import get_auth_db_session
|
||||
from local_deep_research.database.encrypted_db import db_manager
|
||||
from local_deep_research.database.models import ResearchHistory
|
||||
from local_deep_research.database.models.auth import User
|
||||
|
||||
# Base URL for the application
|
||||
BASE_URL = "http://127.0.0.1:5000"
|
||||
|
||||
|
||||
def drop_and_recreate_research_tables(username):
|
||||
"""Drop and recreate research tables to ensure UUID schema"""
|
||||
print(f"\nDropping and recreating tables for user {username}...")
|
||||
|
||||
engine = db_manager.connections.get(username)
|
||||
if not engine:
|
||||
print("No engine found for user")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Drop the research_history table if it exists
|
||||
with engine.connect() as conn:
|
||||
# Drop dependent tables first
|
||||
conn.execute(text("DROP TABLE IF EXISTS research_resources"))
|
||||
conn.execute(text("DROP TABLE IF EXISTS research_history"))
|
||||
conn.commit()
|
||||
|
||||
print("Dropped existing research tables")
|
||||
|
||||
# Recreate tables with correct schema
|
||||
ResearchHistory.__table__.create(engine)
|
||||
print("Created research_history table with UUID schema")
|
||||
|
||||
# Verify the schema
|
||||
inspector = inspect(engine)
|
||||
columns = inspector.get_columns("research_history")
|
||||
for col in columns:
|
||||
if col["name"] == "id":
|
||||
print(f"ID column type: {col['type']}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error recreating tables: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def test_uuid_with_fresh_schema():
|
||||
"""Test research creation with fresh UUID schema"""
|
||||
|
||||
# Create a session for maintaining cookies
|
||||
session = requests.Session()
|
||||
|
||||
# Generate random user credentials
|
||||
username = f"test_uuid_fresh_{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
|
||||
password = "T3st!Secure#2024$LDR"
|
||||
|
||||
print(f"Testing with new user: {username}")
|
||||
|
||||
# Step 1: Register new user
|
||||
print("\n1. Registering new user...")
|
||||
|
||||
# Get registration page for CSRF token
|
||||
reg_page = session.get(f"{BASE_URL}/auth/register")
|
||||
csrf_token = None
|
||||
|
||||
# Extract CSRF token
|
||||
for line in reg_page.text.split("\n"):
|
||||
if 'name="csrf_token"' in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf_token = line[start:end]
|
||||
break
|
||||
|
||||
# Register the user
|
||||
reg_data = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
"confirm_password": password,
|
||||
"acknowledge": "true",
|
||||
"csrf_token": csrf_token,
|
||||
}
|
||||
|
||||
reg_response = session.post(f"{BASE_URL}/auth/register", data=reg_data)
|
||||
print(f"Registration status: {reg_response.status_code}")
|
||||
|
||||
if reg_response.status_code != 200:
|
||||
print("Registration failed")
|
||||
return False
|
||||
|
||||
# Step 2: Drop and recreate tables with UUID schema
|
||||
# First need to get the database connection
|
||||
auth_db = get_auth_db_session()
|
||||
user = auth_db.query(User).filter_by(username=username).first()
|
||||
auth_db.close()
|
||||
|
||||
if user:
|
||||
# Open the database to establish connection
|
||||
engine = db_manager.open_user_database(username, password)
|
||||
if engine:
|
||||
# Now drop and recreate tables
|
||||
drop_and_recreate_research_tables(username)
|
||||
|
||||
# Step 3: Submit a research request
|
||||
print("\n3. Submitting research request...")
|
||||
|
||||
# Get fresh CSRF token
|
||||
home_page = session.get(f"{BASE_URL}/")
|
||||
csrf_token = None
|
||||
|
||||
for line in home_page.text.split("\n"):
|
||||
if 'name="csrf_token"' in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf_token = line[start:end]
|
||||
break
|
||||
|
||||
# Prepare research request
|
||||
research_data = {
|
||||
"query": f"Test UUID research for {username}",
|
||||
"mode": "quick",
|
||||
"model_provider": "OLLAMA",
|
||||
"model": "llama3.2:3b",
|
||||
"search_engine": "searxng",
|
||||
"iterations": 1,
|
||||
"questions_per_iteration": 2,
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json", "X-CSRF-Token": csrf_token}
|
||||
|
||||
research_response = session.post(
|
||||
f"{BASE_URL}/api/start_research", json=research_data, headers=headers
|
||||
)
|
||||
|
||||
print(f"Research submission status: {research_response.status_code}")
|
||||
|
||||
if research_response.status_code == 200:
|
||||
result = research_response.json()
|
||||
print(f"Research response: {json.dumps(result, indent=2)}")
|
||||
|
||||
research_id = result.get("research_id")
|
||||
print(f"\nResearch ID: {research_id}")
|
||||
print(f"Research ID type: {type(research_id)}")
|
||||
|
||||
# Check if it's a UUID format
|
||||
if (
|
||||
isinstance(research_id, str)
|
||||
and len(research_id) == 36
|
||||
and research_id.count("-") == 4
|
||||
):
|
||||
print("✅ Research ID is in UUID format!")
|
||||
else:
|
||||
print("❌ Research ID is NOT in UUID format!")
|
||||
|
||||
# Step 4: Verify in database
|
||||
print("\n4. Verifying in database...")
|
||||
|
||||
db_session = db_manager.get_session(username)
|
||||
if db_session:
|
||||
research = (
|
||||
db_session.query(ResearchHistory)
|
||||
.filter_by(id=research_id)
|
||||
.first()
|
||||
)
|
||||
if research:
|
||||
print("Found research in DB:")
|
||||
print(
|
||||
f" ID: {research.id} (type: {type(research.id).__name__})"
|
||||
)
|
||||
print(f" Query: {research.query}")
|
||||
print(f" Status: {research.status}")
|
||||
else:
|
||||
print("Research not found in database")
|
||||
db_session.close()
|
||||
|
||||
# Clean up
|
||||
db_manager.close_user_database(username)
|
||||
|
||||
return True
|
||||
|
||||
print(f"Research submission failed: {research_response.text}")
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 60)
|
||||
print("UUID Fresh Database Test")
|
||||
print("=" * 60)
|
||||
|
||||
success = test_uuid_with_fresh_schema()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("✅ Test completed" if success else "❌ Test failed")
|
||||
print("=" * 60)
|
||||
@@ -1,235 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test UUID research functionality with a fresh user database.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
import requests
|
||||
|
||||
# Allow unencrypted databases for testing
|
||||
os.environ["LDR_BOOTSTRAP_ALLOW_UNENCRYPTED"] = "true"
|
||||
|
||||
# Base URL for the application
|
||||
BASE_URL = "http://127.0.0.1:5000"
|
||||
|
||||
|
||||
# Generate random username for fresh database
|
||||
def generate_random_username():
|
||||
return f"test_uuid_{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
|
||||
|
||||
|
||||
def test_uuid_research():
|
||||
"""Test research creation with UUID primary keys"""
|
||||
|
||||
# Create a session for maintaining cookies
|
||||
session = requests.Session()
|
||||
|
||||
# Generate random user credentials
|
||||
username = generate_random_username()
|
||||
password = "T3st!Secure#2024$LDR" # pragma: allowlist secret
|
||||
|
||||
print(f"Testing with new user: {username}")
|
||||
|
||||
# Step 1: Register new user (creates fresh database)
|
||||
print("\n1. Registering new user...")
|
||||
|
||||
# Get registration page for CSRF token
|
||||
reg_page = session.get(f"{BASE_URL}/auth/register")
|
||||
csrf_token = None
|
||||
|
||||
# Extract CSRF token
|
||||
for line in reg_page.text.split("\n"):
|
||||
if 'name="csrf_token"' in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf_token = line[start:end]
|
||||
break
|
||||
|
||||
if not csrf_token:
|
||||
print("Failed to get CSRF token")
|
||||
return False
|
||||
|
||||
# Register the user
|
||||
reg_data = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
"confirm_password": password,
|
||||
"acknowledge": "true",
|
||||
"csrf_token": csrf_token,
|
||||
}
|
||||
|
||||
reg_response = session.post(f"{BASE_URL}/auth/register", data=reg_data)
|
||||
print(f"Registration status: {reg_response.status_code}")
|
||||
|
||||
if reg_response.status_code != 200:
|
||||
print(f"Registration failed: {reg_response.text}")
|
||||
return False
|
||||
|
||||
# Step 2: Submit a research request
|
||||
print("\n2. Submitting research request...")
|
||||
|
||||
# Get fresh CSRF token for API request
|
||||
home_page = session.get(f"{BASE_URL}/")
|
||||
csrf_token = None
|
||||
|
||||
for line in home_page.text.split("\n"):
|
||||
if 'name="csrf_token"' in line and "value=" in line:
|
||||
start = line.find('value="') + 7
|
||||
end = line.find('"', start)
|
||||
csrf_token = line[start:end]
|
||||
break
|
||||
|
||||
# Prepare research request
|
||||
research_data = {
|
||||
"query": f"Test UUID research for {username}",
|
||||
"mode": "quick",
|
||||
"model_provider": "OLLAMA",
|
||||
"model": "llama3.2:3b",
|
||||
"search_engine": "searxng",
|
||||
"iterations": 1,
|
||||
"questions_per_iteration": 2,
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json", "X-CSRF-Token": csrf_token}
|
||||
|
||||
research_response = session.post(
|
||||
f"{BASE_URL}/api/start_research", json=research_data, headers=headers
|
||||
)
|
||||
|
||||
print(f"Research submission status: {research_response.status_code}")
|
||||
|
||||
if research_response.status_code == 200:
|
||||
result = research_response.json()
|
||||
print(f"Research response: {json.dumps(result, indent=2)}")
|
||||
|
||||
research_id = result.get("research_id")
|
||||
print(f"\nResearch ID: {research_id}")
|
||||
print(f"Research ID type: {type(research_id)}")
|
||||
|
||||
# Check if it's a UUID format (should be a string with dashes)
|
||||
if (
|
||||
isinstance(research_id, str)
|
||||
and len(research_id) == 36
|
||||
and research_id.count("-") == 4
|
||||
):
|
||||
print("✅ Research ID is in UUID format!")
|
||||
else:
|
||||
print("❌ Research ID is NOT in UUID format!")
|
||||
|
||||
# Step 3: Check research status
|
||||
print("\n3. Checking research status...")
|
||||
|
||||
status_response = session.get(
|
||||
f"{BASE_URL}/api/research/{research_id}/status"
|
||||
)
|
||||
print(f"Status check response: {status_response.status_code}")
|
||||
|
||||
if status_response.status_code == 200:
|
||||
status_data = status_response.json()
|
||||
print(f"Research status: {status_data.get('status')}")
|
||||
|
||||
# Step 4: List research history
|
||||
print("\n4. Checking research history...")
|
||||
|
||||
history_response = session.get(f"{BASE_URL}/api/history")
|
||||
print(f"History response: {history_response.status_code}")
|
||||
|
||||
if history_response.status_code == 200:
|
||||
history_data = history_response.json()
|
||||
items = history_data.get("items", [])
|
||||
print(f"Found {len(items)} research items")
|
||||
|
||||
for item in items[:3]: # Show first 3
|
||||
item_id = item.get("id")
|
||||
print(f" - ID: {item_id} (type: {type(item_id).__name__})")
|
||||
print(f" Query: {item.get('query')}")
|
||||
print(f" Status: {item.get('status')}")
|
||||
|
||||
return True
|
||||
|
||||
print(f"Research submission failed: {research_response.text}")
|
||||
return False
|
||||
|
||||
|
||||
# Now let's also check the database directly
|
||||
def check_database_directly():
|
||||
"""Check the database schema directly"""
|
||||
print("\n5. Checking database schema directly...")
|
||||
|
||||
try:
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
str(Path(__file__).parent.parent.parent.resolve()),
|
||||
)
|
||||
|
||||
from sqlalchemy import inspect
|
||||
|
||||
from local_deep_research.database.auth_db import get_auth_db_session
|
||||
from local_deep_research.database.encrypted_db import db_manager
|
||||
from local_deep_research.database.models.auth import User
|
||||
from local_deep_research.database.models.research import (
|
||||
ResearchHistory,
|
||||
)
|
||||
|
||||
# Get the last created user
|
||||
auth_db = get_auth_db_session()
|
||||
latest_user = (
|
||||
auth_db.query(User).order_by(User.created_at.desc()).first()
|
||||
)
|
||||
auth_db.close()
|
||||
|
||||
if latest_user:
|
||||
print(f"\nChecking database for user: {latest_user.username}")
|
||||
|
||||
# Open their database
|
||||
engine = db_manager.open_user_database(
|
||||
latest_user.username, "T3st!Secure#2024$LDR"
|
||||
)
|
||||
if engine:
|
||||
# Inspect the research_history table schema
|
||||
inspector = inspect(engine)
|
||||
columns = inspector.get_columns("research_history")
|
||||
|
||||
print("\nresearch_history table schema:")
|
||||
for col in columns:
|
||||
if col["name"] == "id":
|
||||
print(
|
||||
f" - {col['name']}: {col['type']} (primary_key: {col.get('primary_key', False)})"
|
||||
)
|
||||
|
||||
# Get actual research entries
|
||||
session = db_manager.get_session(latest_user.username)
|
||||
researches = session.query(ResearchHistory).limit(5).all()
|
||||
|
||||
print(f"\nFound {len(researches)} research entries:")
|
||||
for r in researches:
|
||||
print(f" - ID: {r.id} (type: {type(r.id).__name__})")
|
||||
print(f" Query: {r.query[:50]}...")
|
||||
|
||||
session.close()
|
||||
db_manager.close_user_database(latest_user.username)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error checking database: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 60)
|
||||
print("UUID Research Test")
|
||||
print("=" * 60)
|
||||
|
||||
success = test_uuid_research()
|
||||
|
||||
# Also check database directly
|
||||
check_database_directly()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("✅ Test completed" if success else "❌ Test failed")
|
||||
print("=" * 60)
|
||||
Reference in New Issue
Block a user