Files
local-deep-research/tests/database/test_sqlcipher_utils.py
LearningCircuit 507d0d35ea fix: use env_registry for cipher_memory_security instead of raw os.environ
Addresses djpetti's review comment to use the project's environment
variable setting infrastructure (EnumSetting + get_env_setting) instead
of plain os.environ.get() for cipher_memory_security.
2026-02-10 18:41:32 +01:00

496 lines
18 KiB
Python

"""Tests for database/sqlcipher_utils.py."""
import os
import pytest
from unittest.mock import Mock, patch
class TestGetSqlcipherSettings:
"""Tests for get_sqlcipher_settings function."""
def test_returns_default_values(self):
"""Test that default values are returned when no env vars set."""
from local_deep_research.database.sqlcipher_utils import (
get_sqlcipher_settings,
DEFAULT_KDF_ITERATIONS,
DEFAULT_PAGE_SIZE,
DEFAULT_HMAC_ALGORITHM,
DEFAULT_KDF_ALGORITHM,
)
with patch.dict(os.environ, {}, clear=True):
# Clear any existing env vars that might affect the test
for key in ["LDR_DB_KDF_ITERATIONS", "LDR_DB_PAGE_SIZE"]:
os.environ.pop(key, None)
settings = get_sqlcipher_settings()
assert settings["kdf_iterations"] == DEFAULT_KDF_ITERATIONS
assert settings["page_size"] == DEFAULT_PAGE_SIZE
assert settings["hmac_algorithm"] == DEFAULT_HMAC_ALGORITHM
assert settings["kdf_algorithm"] == DEFAULT_KDF_ALGORITHM
def test_respects_env_var_kdf_iterations(self):
"""Test that LDR_DB_KDF_ITERATIONS env var is respected."""
from local_deep_research.database.sqlcipher_utils import (
get_sqlcipher_settings,
)
with patch.dict(os.environ, {"LDR_DB_KDF_ITERATIONS": "100000"}):
settings = get_sqlcipher_settings()
assert settings["kdf_iterations"] == 100000
def test_respects_env_var_page_size(self):
"""Test that LDR_DB_PAGE_SIZE env var is respected."""
from local_deep_research.database.sqlcipher_utils import (
get_sqlcipher_settings,
)
with patch.dict(os.environ, {"LDR_DB_PAGE_SIZE": "8192"}):
settings = get_sqlcipher_settings()
assert settings["page_size"] == 8192
def test_respects_env_var_hmac_algorithm(self):
"""Test that LDR_DB_HMAC_ALGORITHM env var is respected."""
from local_deep_research.database.sqlcipher_utils import (
get_sqlcipher_settings,
)
with patch.dict(os.environ, {"LDR_DB_HMAC_ALGORITHM": "HMAC_SHA256"}):
settings = get_sqlcipher_settings()
assert settings["hmac_algorithm"] == "HMAC_SHA256"
def test_returns_dict_type(self):
"""Test that settings returns a dictionary."""
from local_deep_research.database.sqlcipher_utils import (
get_sqlcipher_settings,
)
settings = get_sqlcipher_settings()
assert isinstance(settings, dict)
assert "kdf_iterations" in settings
assert "page_size" in settings
assert "hmac_algorithm" in settings
assert "kdf_algorithm" in settings
class TestSetSqlcipherKey:
"""Tests for set_sqlcipher_key function."""
def test_executes_pragma_key_command(self):
"""Test that PRAGMA key is executed with hex-encoded password."""
from local_deep_research.database.sqlcipher_utils import (
set_sqlcipher_key,
)
mock_cursor = Mock()
with patch(
"local_deep_research.database.sqlcipher_utils._get_key_from_password"
) as mock_get_key:
mock_get_key.return_value = b"\x01\x02\x03"
set_sqlcipher_key(mock_cursor, "testpass")
# Check that execute was called with PRAGMA key
mock_cursor.execute.assert_called_once()
call_args = mock_cursor.execute.call_args[0][0]
assert "PRAGMA key" in call_args
assert "x'" in call_args
class TestApplySqlcipherPragmas:
"""Tests for apply_sqlcipher_pragmas function."""
def test_applies_core_pragmas(self):
"""Test that core PRAGMA settings are applied."""
from local_deep_research.database.sqlcipher_utils import (
apply_sqlcipher_pragmas,
)
mock_cursor = Mock()
apply_sqlcipher_pragmas(mock_cursor, creation_mode=False)
# Check that core pragmas were executed
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
assert any("cipher_page_size" in arg for arg in call_args_list)
assert any("cipher_hmac_algorithm" in arg for arg in call_args_list)
assert any("kdf_iter" in arg for arg in call_args_list)
def test_applies_creation_mode_pragmas(self):
"""Test that additional pragmas are applied in creation mode."""
from local_deep_research.database.sqlcipher_utils import (
apply_sqlcipher_pragmas,
)
mock_cursor = Mock()
apply_sqlcipher_pragmas(mock_cursor, creation_mode=True)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
assert any("cipher_memory_security" in arg for arg in call_args_list)
class TestApplyPerformancePragmas:
"""Tests for apply_performance_pragmas function."""
def test_applies_default_performance_pragmas(self):
"""Test that default performance pragmas are applied."""
from local_deep_research.database.sqlcipher_utils import (
apply_performance_pragmas,
)
mock_cursor = Mock()
apply_performance_pragmas(mock_cursor)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
assert any("temp_store = MEMORY" in arg for arg in call_args_list)
assert any("busy_timeout" in arg for arg in call_args_list)
assert any("cache_size" in arg for arg in call_args_list)
assert any("journal_mode" in arg for arg in call_args_list)
assert any("synchronous" in arg for arg in call_args_list)
def test_respects_cache_size_env_var(self):
"""Test that LDR_DB_CACHE_SIZE_MB env var is respected."""
from local_deep_research.database.sqlcipher_utils import (
apply_performance_pragmas,
)
mock_cursor = Mock()
with patch.dict(os.environ, {"LDR_DB_CACHE_SIZE_MB": "128"}):
apply_performance_pragmas(mock_cursor)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
# 128 MB = -131072 KB (negative for KB interpretation)
cache_call = [arg for arg in call_args_list if "cache_size" in arg][
0
]
assert "-131072" in cache_call
def test_respects_journal_mode_env_var(self):
"""Test that LDR_DB_JOURNAL_MODE env var is respected."""
from local_deep_research.database.sqlcipher_utils import (
apply_performance_pragmas,
)
mock_cursor = Mock()
with patch.dict(os.environ, {"LDR_DB_JOURNAL_MODE": "DELETE"}):
apply_performance_pragmas(mock_cursor)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
journal_call = [
arg for arg in call_args_list if "journal_mode" in arg
][0]
assert "DELETE" in journal_call
class TestVerifySqlcipherConnection:
"""Tests for verify_sqlcipher_connection function."""
def test_returns_true_for_valid_connection(self):
"""Test that True is returned for valid connection."""
from local_deep_research.database.sqlcipher_utils import (
verify_sqlcipher_connection,
)
mock_cursor = Mock()
mock_cursor.fetchone.return_value = (1,)
result = verify_sqlcipher_connection(mock_cursor)
assert result is True
def test_returns_false_for_invalid_connection(self):
"""Test that False is returned for invalid connection."""
from local_deep_research.database.sqlcipher_utils import (
verify_sqlcipher_connection,
)
mock_cursor = Mock()
mock_cursor.execute.side_effect = Exception("Connection error")
result = verify_sqlcipher_connection(mock_cursor)
assert result is False
def test_returns_false_for_wrong_result(self):
"""Test that False is returned when SELECT 1 returns wrong value."""
from local_deep_research.database.sqlcipher_utils import (
verify_sqlcipher_connection,
)
mock_cursor = Mock()
mock_cursor.fetchone.return_value = (0,)
result = verify_sqlcipher_connection(mock_cursor)
assert result is False
class TestConstants:
"""Tests for module constants."""
def test_default_kdf_iterations_is_reasonable(self):
"""Test that default KDF iterations is a reasonable security value."""
from local_deep_research.database.sqlcipher_utils import (
DEFAULT_KDF_ITERATIONS,
)
# Should be at least 100000 for security
assert DEFAULT_KDF_ITERATIONS >= 100000
assert isinstance(DEFAULT_KDF_ITERATIONS, int)
def test_default_page_size_is_power_of_two(self):
"""Test that default page size is a power of 2."""
from local_deep_research.database.sqlcipher_utils import (
DEFAULT_PAGE_SIZE,
)
# Page size should be a power of 2
assert DEFAULT_PAGE_SIZE > 0
assert (DEFAULT_PAGE_SIZE & (DEFAULT_PAGE_SIZE - 1)) == 0
def test_pbkdf2_placeholder_salt_exists(self):
"""Test that the PBKDF2 placeholder salt is defined."""
from local_deep_research.database.sqlcipher_utils import (
PBKDF2_PLACEHOLDER_SALT,
)
assert PBKDF2_PLACEHOLDER_SALT is not None
assert isinstance(PBKDF2_PLACEHOLDER_SALT, bytes)
class TestCreateSqlcipherConnection:
"""Tests for create_sqlcipher_connection function."""
def test_raises_import_error_when_sqlcipher_unavailable(self):
"""Test that ImportError is raised when sqlcipher3 not available."""
from local_deep_research.database.sqlcipher_utils import (
create_sqlcipher_connection,
)
with patch(
"local_deep_research.database.sqlcipher_compat.get_sqlcipher_module",
side_effect=ImportError("No module"),
):
with pytest.raises(
ImportError, match="sqlcipher3 is not available"
):
create_sqlcipher_connection("/tmp/test.db", "password")
def test_creates_connection_with_correct_password(self):
"""Test that connection is created with correct password handling."""
from local_deep_research.database.sqlcipher_utils import (
create_sqlcipher_connection,
)
mock_sqlcipher = Mock()
mock_conn = Mock()
mock_cursor = Mock()
mock_cursor.fetchone.return_value = (1,)
mock_conn.cursor.return_value = mock_cursor
mock_sqlcipher.connect.return_value = mock_conn
with patch(
"local_deep_research.database.sqlcipher_compat.get_sqlcipher_module",
return_value=mock_sqlcipher,
):
with patch(
"local_deep_research.database.sqlcipher_utils.set_sqlcipher_key"
) as mock_set_key:
create_sqlcipher_connection("/tmp/test.db", "mypassword")
mock_sqlcipher.connect.assert_called_once_with("/tmp/test.db")
mock_set_key.assert_called_once()
def test_raises_value_error_on_verification_failure(self):
"""Test that ValueError is raised when connection verification fails."""
from local_deep_research.database.sqlcipher_utils import (
create_sqlcipher_connection,
)
mock_sqlcipher = Mock()
mock_conn = Mock()
mock_cursor = Mock()
mock_cursor.fetchone.return_value = (0,) # Wrong result
mock_conn.cursor.return_value = mock_cursor
mock_sqlcipher.connect.return_value = mock_conn
with patch(
"local_deep_research.database.sqlcipher_compat.get_sqlcipher_module",
return_value=mock_sqlcipher,
):
with patch(
"local_deep_research.database.sqlcipher_utils.set_sqlcipher_key"
):
with pytest.raises(ValueError, match="Failed to establish"):
create_sqlcipher_connection("/tmp/test.db", "badpassword")
class TestSetSqlcipherRekey:
"""Tests for set_sqlcipher_rekey function — CRITICAL security fix."""
def test_rekey_uses_pbkdf2_derived_key(self):
"""Test that rekey uses _get_key_from_password, not raw UTF-8 hex."""
from local_deep_research.database.sqlcipher_utils import (
set_sqlcipher_rekey,
)
mock_cursor = Mock()
# Remove SQLAlchemy-like interface so we take the raw cursor path
mock_cursor.execute.__self__ = None
del mock_cursor.execute.__self__
with patch(
"local_deep_research.database.sqlcipher_utils._get_key_from_password"
) as mock_get_key:
mock_get_key.return_value = b"\xab\xcd\xef"
set_sqlcipher_rekey(mock_cursor, "newpass")
# Verify _get_key_from_password was called with the new password
mock_get_key.assert_called_once_with("newpass")
# Verify the hex key from PBKDF2 is used, not raw UTF-8 hex
call_args = mock_cursor.execute.call_args[0][0]
assert "PRAGMA rekey" in call_args
assert "abcdef" in call_args # hex of b"\xab\xcd\xef"
def test_rekey_key_format_matches_set_key(self):
"""Test that rekey produces same hex format as set_key."""
from local_deep_research.database.sqlcipher_utils import (
set_sqlcipher_key,
set_sqlcipher_rekey,
)
mock_cursor = Mock()
fake_key = b"\x01\x02\x03\x04"
with patch(
"local_deep_research.database.sqlcipher_utils._get_key_from_password",
return_value=fake_key,
):
set_sqlcipher_key(mock_cursor, "testpass")
key_sql = mock_cursor.execute.call_args[0][0]
mock_cursor.reset_mock()
# For rekey, ensure we use the raw cursor path
if hasattr(mock_cursor.execute, "__self__"):
del mock_cursor.execute.__self__
set_sqlcipher_rekey(mock_cursor, "testpass")
rekey_sql = mock_cursor.execute.call_args[0][0]
# Both should use the same hex encoding of the PBKDF2 key
hex_key = fake_key.hex()
assert f"x'{hex_key}'" in key_sql
assert f"x'{hex_key}'" in rekey_sql
class TestCipherMemorySecurityEnvVar:
"""Tests for configurable cipher_memory_security."""
def test_defaults_to_on(self):
"""Test that cipher_memory_security defaults to ON."""
from local_deep_research.database.sqlcipher_utils import (
apply_sqlcipher_pragmas,
)
mock_cursor = Mock()
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY", None)
apply_sqlcipher_pragmas(mock_cursor, creation_mode=True)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
mem_calls = [
arg for arg in call_args_list if "cipher_memory_security" in arg
]
assert len(mem_calls) == 1
assert "ON" in mem_calls[0]
def test_respects_off_env_var(self):
"""Test that LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=OFF is respected."""
from local_deep_research.database.sqlcipher_utils import (
apply_sqlcipher_pragmas,
)
mock_cursor = Mock()
with patch.dict(
os.environ, {"LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY": "OFF"}
):
apply_sqlcipher_pragmas(mock_cursor, creation_mode=True)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
mem_calls = [
arg for arg in call_args_list if "cipher_memory_security" in arg
]
assert len(mem_calls) == 1
assert "OFF" in mem_calls[0]
def test_invalid_value_falls_back_to_on(self):
"""Test that invalid value falls back to ON."""
from local_deep_research.database.sqlcipher_utils import (
apply_sqlcipher_pragmas,
)
mock_cursor = Mock()
with patch.dict(
os.environ, {"LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY": "INVALID"}
):
apply_sqlcipher_pragmas(mock_cursor, creation_mode=True)
call_args_list = [
call[0][0] for call in mock_cursor.execute.call_args_list
]
mem_calls = [
arg for arg in call_args_list if "cipher_memory_security" in arg
]
assert len(mem_calls) == 1
assert "ON" in mem_calls[0]
class TestTTLKeyCache:
"""Tests for TTL-bounded key cache."""
def test_cache_returns_same_key_for_same_password(self):
"""Test that cache returns consistent results."""
from local_deep_research.database.sqlcipher_utils import (
_get_key_from_password,
_key_cache,
)
# Clear cache before test
_key_cache.clear()
key1 = _get_key_from_password("test_cache_password")
key2 = _get_key_from_password("test_cache_password")
assert key1 == key2
def test_different_passwords_produce_different_keys(self):
"""Test that different passwords give different keys."""
from local_deep_research.database.sqlcipher_utils import (
_get_key_from_password,
_key_cache,
)
_key_cache.clear()
key1 = _get_key_from_password("password_a_unique")
key2 = _get_key_from_password("password_b_unique")
assert key1 != key2