# Source code for lalandre_core.http.llm_client

"""
Shared HTTP client for compact JSON-oriented LLM calls.
"""

from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional, cast

import httpx
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential,
)

from lalandre_core.utils.api_key_pool import APIKeyPool
from lalandre_core.utils.shared_key_pool import SharedKeyPoolProxy, build_clients_by_key


def coerce_json_object(value: Any) -> Optional[Dict[str, Any]]:
    """Safely coerce a runtime value to a JSON object with string keys.

    Returns ``None`` when *value* is not a dict; otherwise returns a new
    dict holding only the entries whose keys are strings.
    """
    if not isinstance(value, dict):
        return None
    source = cast(dict[Any, Any], value)
    # Non-string keys cannot occur in a JSON object, so they are dropped.
    return {key: entry for key, entry in source.items() if isinstance(key, str)}
def _is_retryable(exc: BaseException) -> bool:
    """Decide whether a failed HTTP call is worth retrying."""
    # Transport-level failures (timeouts, refused connections) always retry.
    if isinstance(exc, (httpx.TimeoutException, httpx.ConnectError)):
        return True
    # Retry rate limiting (429) and any server-side error (5xx).
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status == 429 or status >= 500
    return False
@dataclass(frozen=True)
class JSONHTTPLLMClient:
    """Thin HTTP client for OpenAI-compatible JSON generation."""

    # Provider label, normalized to lowercase in __post_init__.
    provider: str
    # Model identifier sent verbatim in the request payload.
    model: str
    # API root; a trailing "/" is stripped, "/v1" is appended if missing.
    base_url: str
    # Per-request timeout; clamped to a minimum of 0.1 seconds.
    timeout_seconds: float
    # Upper bound on completion tokens; clamped to at least 1.
    max_output_tokens: int
    # Sampling temperature; clamped to be non-negative.
    temperature: float
    # Optional bearer token; blank strings are normalized to None.
    api_key: Optional[str] = None
    # System message instructing the model to emit JSON.
    system_prompt: str = "Return valid JSON only."
    # Max chars of an error response body echoed into RuntimeError messages.
    error_preview_chars: int = 240

    def __post_init__(self) -> None:
        # The dataclass is frozen, so normalized values must be written
        # back through object.__setattr__ rather than plain assignment.
        normalized_provider = self.provider.strip().lower()
        normalized_base_url = self.base_url.strip().rstrip("/")
        object.__setattr__(self, "provider", normalized_provider)
        object.__setattr__(self, "model", self.model.strip())
        object.__setattr__(self, "base_url", normalized_base_url)
        # Clamp numeric settings into sane ranges.
        object.__setattr__(self, "timeout_seconds", max(float(self.timeout_seconds), 0.1))
        object.__setattr__(self, "max_output_tokens", max(int(self.max_output_tokens), 1))
        object.__setattr__(self, "temperature", max(float(self.temperature), 0.0))
        # Treat empty/whitespace-only keys as "no key".
        api_key = self.api_key.strip() if isinstance(self.api_key, str) else None
        object.__setattr__(self, "api_key", api_key or None)
        object.__setattr__(self, "error_preview_chars", max(int(self.error_preview_chars), 40))
        # One reusable httpx.Client per instance (connection pooling).
        # NOTE(review): the client is never closed explicitly; presumably
        # instances live for the process lifetime — confirm with callers.
        object.__setattr__(
            self,
            "_client",
            httpx.Client(timeout=httpx.Timeout(self.timeout_seconds)),
        )

    def generate(self, prompt: str) -> str:
        """Generate a JSON-formatted completion payload as raw string.

        Raises:
            RuntimeError: on HTTP status errors (with a truncated body
                preview), connection errors, or timeouts — httpx
                exceptions are never leaked to callers of this method.
        """
        try:
            return self._generate_openai_compatible(prompt)
        except httpx.HTTPStatusError as exc:
            # Truncate the body so error messages stay log-friendly.
            details = exc.response.text[: self.error_preview_chars]
            raise RuntimeError(f"HTTP {exc.response.status_code}: {details}") from exc
        except httpx.ConnectError as exc:
            raise RuntimeError(f"Connection error: {exc}") from exc
        except httpx.TimeoutException as exc:
            raise RuntimeError(f"Timeout: {exc}") from exc

    def _generate_openai_compatible(self, prompt: str) -> str:
        """POST a chat-completions request and extract the message content.

        Returns the assistant message content, or "" when the response
        does not have the expected OpenAI-style shape.
        """
        # Avoid doubling "/v1" when the configured base URL already has it.
        if self.base_url.endswith("/v1"):
            url = f"{self.base_url}/chat/completions"
        else:
            url = f"{self.base_url}/v1/chat/completions"
        payload: Dict[str, Any] = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_output_tokens,
            # Ask the server to constrain output to a JSON object.
            "response_format": {"type": "json_object"},
            "messages": [
                {
                    "role": "system",
                    "content": self.system_prompt,
                },
                {
                    "role": "user",
                    "content": prompt,
                },
            ],
        }
        response = self._post_json(url, payload)
        # Defensively walk choices[0].message.content; any shape mismatch
        # yields "" instead of raising.
        choices_raw = response.get("choices")
        if not isinstance(choices_raw, list) or not choices_raw:
            return ""
        first_choice = coerce_json_object(choices_raw[0])
        if first_choice is None:
            return ""
        message = coerce_json_object(first_choice.get("message"))
        if message is None:
            return ""
        content_raw = message.get("content", "")
        return content_raw if isinstance(content_raw, str) else ""

    # Retries transient failures (timeouts, connection errors, 429/5xx)
    # up to 4 attempts with exponential backoff capped at 16s, then
    # re-raises the last exception for generate() to translate.
    @retry(
        stop=stop_after_attempt(4),
        wait=wait_exponential(multiplier=1, min=1, max=16),
        retry=retry_if_exception(_is_retryable),
        reraise=True,
    )
    def _post_json(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST *payload* to *url* and return the response as a JSON object."""
        headers: Dict[str, str] = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        # _client is injected via object.__setattr__ in __post_init__,
        # so the type checker cannot see it — hence the ignore.
        response: httpx.Response = self._client.post(url, json=payload, headers=headers)  # type: ignore[attr-defined]
        # Raises HTTPStatusError on 4xx/5xx, which feeds the retry predicate.
        response.raise_for_status()
        payload_obj = coerce_json_object(response.json())
        return payload_obj if payload_obj is not None else {}
class SharedKeyPoolJSONHTTPLLMClient:
    """Dispatch JSON HTTP LLM calls through a shared API key pool."""

    def __init__(
        self,
        *,
        key_pool: APIKeyPool,
        clients_by_key: Mapping[str, JSONHTTPLLMClient],
    ) -> None:
        # The proxy owns key selection; this wrapper only delegates to it.
        self._proxy = SharedKeyPoolProxy(
            key_pool=key_pool,
            clients_by_key=clients_by_key,
        )

    @classmethod
    def from_key_pool(
        cls,
        *,
        key_pool: APIKeyPool,
        provider: str,
        model: str,
        base_url: str,
        timeout_seconds: float,
        max_output_tokens: int,
        temperature: float,
        system_prompt: str = "Return valid JSON only.",
        error_preview_chars: int = 240,
    ) -> "SharedKeyPoolJSONHTTPLLMClient":
        """Build one JSON HTTP client per API key and wrap them in a shared pool."""

        def _make_client(key: str) -> JSONHTTPLLMClient:
            # One fully configured client per key; only the key differs.
            return JSONHTTPLLMClient(
                provider=provider,
                model=model,
                base_url=base_url,
                timeout_seconds=timeout_seconds,
                api_key=key,
                max_output_tokens=max_output_tokens,
                temperature=temperature,
                system_prompt=system_prompt,
                error_preview_chars=error_preview_chars,
            )

        return cls(
            key_pool=key_pool,
            clients_by_key=build_clients_by_key(
                key_pool=key_pool,
                factory=_make_client,
            ),
        )

    def generate(self, prompt: str) -> str:
        """Generate one JSON response using the next key selected by the pool."""
        return self._proxy.generate(prompt)

    def __getattr__(self, name: str) -> Any:
        # Anything not defined on this wrapper falls through to the proxy.
        return getattr(self._proxy, name)