"""
Shared HTTP client for compact JSON-oriented LLM calls.
"""
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional, cast
import httpx
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)
from lalandre_core.utils.api_key_pool import APIKeyPool
from lalandre_core.utils.shared_key_pool import SharedKeyPoolProxy, build_clients_by_key
def coerce_json_object(value: Any) -> Optional[Dict[str, Any]]:
    """Safely coerce a runtime value to a JSON object with string keys.

    Non-dict inputs yield ``None``; entries whose keys are not strings are
    silently dropped from the result.
    """
    if not isinstance(value, dict):
        return None
    source = cast(dict[Any, Any], value)
    return {key: item for key, item in source.items() if isinstance(key, str)}
def _is_retryable(exc: BaseException) -> bool:
    """Return True for transient failures worth retrying.

    Retryable: request timeouts, connection failures, and HTTP responses
    with status 429 (rate limit) or any 5xx server error.
    """
    if isinstance(exc, (httpx.TimeoutException, httpx.ConnectError)):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status == 429 or status >= 500
    return False
@dataclass(frozen=True)
class JSONHTTPLLMClient:
    """Thin HTTP client for OpenAI-compatible JSON generation.

    Attributes:
        provider: Provider identifier; normalized to lowercase on init.
        model: Model name sent in the request payload; whitespace-stripped.
        base_url: API root; trailing slashes are stripped, and ``/v1`` is
            appended at request time when the URL does not already end with it.
        timeout_seconds: Per-request timeout, clamped to >= 0.1.
        max_output_tokens: Value of the ``max_tokens`` request field, clamped to >= 1.
        temperature: Sampling temperature, clamped to >= 0.0.
        api_key: Optional bearer token; blank or missing values become ``None``.
        system_prompt: System message prepended to every request.
        error_preview_chars: Max characters of a failed response body included
            in raised error messages, clamped to >= 40.

    The instance owns an ``httpx.Client`` connection pool; call :meth:`close`
    (or use the instance as a context manager) to release it.
    """

    provider: str
    model: str
    base_url: str
    timeout_seconds: float
    max_output_tokens: int
    temperature: float
    api_key: Optional[str] = None
    system_prompt: str = "Return valid JSON only."
    error_preview_chars: int = 240

    def __post_init__(self) -> None:
        """Normalize and clamp configuration, then build the pooled HTTP client."""
        normalized_provider = self.provider.strip().lower()
        normalized_base_url = self.base_url.strip().rstrip("/")
        # Frozen dataclass: all writes must go through object.__setattr__.
        object.__setattr__(self, "provider", normalized_provider)
        object.__setattr__(self, "model", self.model.strip())
        object.__setattr__(self, "base_url", normalized_base_url)
        object.__setattr__(self, "timeout_seconds", max(float(self.timeout_seconds), 0.1))
        object.__setattr__(self, "max_output_tokens", max(int(self.max_output_tokens), 1))
        object.__setattr__(self, "temperature", max(float(self.temperature), 0.0))
        api_key = self.api_key.strip() if isinstance(self.api_key, str) else None
        object.__setattr__(self, "api_key", api_key or None)
        object.__setattr__(self, "error_preview_chars", max(int(self.error_preview_chars), 40))
        object.__setattr__(
            self,
            "_client",
            httpx.Client(timeout=httpx.Timeout(self.timeout_seconds)),
        )

    def close(self) -> None:
        """Close the underlying HTTP connection pool.

        Without this the ``httpx.Client`` created in ``__post_init__`` leaks
        its sockets until garbage collection; call ``close()`` or use the
        instance as a context manager to release them deterministically.
        """
        self._client.close()  # type: ignore[attr-defined]

    def __enter__(self) -> "JSONHTTPLLMClient":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def generate(self, prompt: str) -> str:
        """Generate a JSON-formatted completion payload as raw string.

        Raises:
            RuntimeError: on HTTP status errors, connection failures, or
                timeouts; status errors include a short preview of the
                response body (``error_preview_chars`` long).
        """
        try:
            return self._generate_openai_compatible(prompt)
        except httpx.HTTPStatusError as exc:
            details = exc.response.text[: self.error_preview_chars]
            raise RuntimeError(f"HTTP {exc.response.status_code}: {details}") from exc
        except httpx.ConnectError as exc:
            raise RuntimeError(f"Connection error: {exc}") from exc
        except httpx.TimeoutException as exc:
            raise RuntimeError(f"Timeout: {exc}") from exc

    def _generate_openai_compatible(self, prompt: str) -> str:
        """POST a chat-completions request and extract the message content.

        Malformed or missing response fields degrade to ``""`` rather than
        raising, so callers always receive a string.
        """
        # Avoid doubling "/v1" when the configured base URL already ends with it.
        if self.base_url.endswith("/v1"):
            url = f"{self.base_url}/chat/completions"
        else:
            url = f"{self.base_url}/v1/chat/completions"
        payload: Dict[str, Any] = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_output_tokens,
            "response_format": {"type": "json_object"},
            "messages": [
                {
                    "role": "system",
                    "content": self.system_prompt,
                },
                {
                    "role": "user",
                    "content": prompt,
                },
            ],
        }
        response = self._post_json(url, payload)
        choices_raw = response.get("choices")
        if not isinstance(choices_raw, list) or not choices_raw:
            return ""
        first_choice = coerce_json_object(choices_raw[0])
        if first_choice is None:
            return ""
        message = coerce_json_object(first_choice.get("message"))
        if message is None:
            return ""
        content_raw = message.get("content", "")
        return content_raw if isinstance(content_raw, str) else ""

    # Retries transient failures (timeouts, connection errors, 429/5xx) up to
    # 4 attempts with exponential backoff between 1s and 16s, then re-raises.
    @retry(
        stop=stop_after_attempt(4),
        wait=wait_exponential(multiplier=1, min=1, max=16),
        retry=retry_if_exception(_is_retryable),
        reraise=True,
    )
    def _post_json(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST JSON and return the parsed response object, or {} if the body is not a JSON object."""
        headers: Dict[str, str] = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        response: httpx.Response = self._client.post(url, json=payload, headers=headers)  # type: ignore[attr-defined]
        response.raise_for_status()
        payload_obj = coerce_json_object(response.json())
        return payload_obj if payload_obj is not None else {}
class SharedKeyPoolJSONHTTPLLMClient:
    """Dispatch JSON HTTP LLM calls through a shared API key pool."""

    def __init__(
        self,
        *,
        key_pool: APIKeyPool,
        clients_by_key: Mapping[str, JSONHTTPLLMClient],
    ) -> None:
        # Every call is forwarded to the proxy, which picks the key to use.
        self._proxy = SharedKeyPoolProxy(
            key_pool=key_pool,
            clients_by_key=clients_by_key,
        )

    @classmethod
    def from_key_pool(
        cls,
        *,
        key_pool: APIKeyPool,
        provider: str,
        model: str,
        base_url: str,
        timeout_seconds: float,
        max_output_tokens: int,
        temperature: float,
        system_prompt: str = "Return valid JSON only.",
        error_preview_chars: int = 240,
    ) -> "SharedKeyPoolJSONHTTPLLMClient":
        """Build one JSON HTTP client per API key and wrap them in a shared pool."""

        def _make_client(key: str) -> JSONHTTPLLMClient:
            # One fully-configured client per pooled API key.
            return JSONHTTPLLMClient(
                provider=provider,
                model=model,
                base_url=base_url,
                timeout_seconds=timeout_seconds,
                api_key=key,
                max_output_tokens=max_output_tokens,
                temperature=temperature,
                system_prompt=system_prompt,
                error_preview_chars=error_preview_chars,
            )

        mapping = build_clients_by_key(key_pool=key_pool, factory=_make_client)
        return cls(key_pool=key_pool, clients_by_key=mapping)

    def generate(self, prompt: str) -> str:
        """Generate one JSON response using the next key selected by the pool."""
        return self._proxy.generate(prompt)

    def __getattr__(self, name: str) -> Any:
        # Anything not defined here is delegated to the underlying proxy.
        return getattr(self._proxy, name)