# discovery/client.py
import asyncio
import os
from typing import Callable, List, Optional, Union
from discovery import api, log, utils
from .engine.aiohttp import AIOHTTPEngine
from .engine.httpx import HTTPXEngine
from .exceptions import NoConsulLeaderException
from .utils import Service
class Consul:
    """Asynchronous facade over the Consul HTTP API wrappers.

    Every ``discovery.api`` endpoint wrapper is instantiated with the same
    engine and exposed as an attribute. On top of those wrappers the class
    offers helpers to look up the current leader, find services in the
    catalog and register / deregister a service, optionally re-registering
    it in the background when the leader changes.
    """

    def __init__(
        self,
        client: Optional[Union[AIOHTTPEngine, HTTPXEngine]] = None,
        *args,
        **kwargs,
    ):
        """Create the endpoint wrappers around a single engine.

        Args:
            client: Engine shared by every endpoint wrapper. When omitted
                (or falsy) an ``AIOHTTPEngine`` is built from ``*args`` and
                ``**kwargs``.
            *args: Positional arguments for the default ``AIOHTTPEngine``.
            **kwargs: Keyword arguments for the default ``AIOHTTPEngine``.
        """
        self.client = client or AIOHTTPEngine(*args, **kwargs)
        self.catalog = api.Catalog(client=self.client)
        self.config = api.Config(client=self.client)
        self.coordinate = api.Coordinate(client=self.client)
        self.events = api.Events(client=self.client)
        self.health = api.Health(client=self.client)
        self.kv = api.Kv(client=self.client)
        self.namespace = api.Namespace(client=self.client)
        self.query = api.Query(client=self.client)
        self.session = api.Session(client=self.client)
        self.snapshot = api.Snapshot(client=self.client)
        self.status = api.Status(client=self.client)
        self.txn = api.Txn(client=self.client)
        self.agent = api.Agent(client=self.client)
        self.connect = api.Connect(client=self.client)
        self.acl = api.Acl(client=self.client)
        self.operator = api.Operator(client=self.client)
        self.binding_rule = api.BindingRule(client=self.client)
        self.policy = api.Policy(client=self.client)
        self.role = api.Role(client=self.client)
        self.token = api.Token(client=self.client)
        self.check = api.Checks(client=self.client)
        self.services = api.Service(client=self.client)
        self.ca = api.CA(client=self.client)
        self.intentions = api.Intentions(client=self.client)
        # A second, independent Events wrapper exposed under the singular name.
        self.event = api.Events(client=self.client)
        # Seconds slept between two leader checks in ``_watch_connection``.
        self.reconnect_timeout = float(
            os.getenv("DISCOVERY_CLIENT_DEFAULT_TIMEOUT", 30)
        )
        self._leader_id: Optional[str] = None
        # Strong references to the running watcher tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._watch_tasks: set = set()

    @property
    def current_leader_id(self) -> Optional[str]:
        """Leader id stored by the last ``register`` call, or ``None``."""
        return self._leader_id

    async def leader_ip(self, *args, **kwargs) -> str:
        """Return the address part of the leader reported by ``status.leader``.

        The reported value must split on ``":"`` into exactly two parts
        (address and port); the first part is returned.

        Args:
            *args: Forwarded to ``self.status.leader``.
            **kwargs: Forwarded to ``self.status.leader``.

        Raises:
            NoConsulLeaderException: If the request fails or the answer does
                not have the expected ``address:port`` shape.
        """
        try:
            current_leader = await self.status.leader(*args, **kwargs)
            leader_ip, _ = current_leader.split(":")
        except Exception as err:
            raise NoConsulLeaderException(details=str(err)) from err
        return leader_ip

    async def leader_id(self, **kwargs) -> str:
        """Return the node id of the ``consul`` instance that is the leader.

        The leader address is compared with ``instance["Node"]["Address"]``
        of every instance of the ``consul`` service; the ``Node.ID`` of the
        first match is returned as a string.

        Args:
            **kwargs: Only two keys are read. ``leader_options`` is a mapping
                expanded into ``leader_ip``; ``instance_options`` is a mapping
                expanded into ``health.service_instances``.

        Raises:
            NoConsulLeaderException: If the leader cannot be determined, an
                instance lacks the expected ``Node`` fields, or no instance
                matches the leader address.
        """
        leader_ip = await self.leader_ip(**kwargs.get("leader_options", {}))
        instances = await self.health.service_instances(
            "consul", **kwargs.get("instance_options", {})
        )
        try:
            for instance in instances:
                node = instance["Node"]
                if node["Address"] == leader_ip:
                    return str(node["ID"])
        except Exception as err:
            raise NoConsulLeaderException(details=str(err)) from err
        # Reaching this point means no instance advertised the leader address.
        raise NoConsulLeaderException(
            details=(
                "no 'consul' service instance matches "
                f"leader address '{leader_ip}'"
            )
        )

    async def find_services(self, name: str, *args, **kwargs) -> List[dict]:
        """Return what ``catalog.list_nodes_for_service`` yields for ``name``.

        Args:
            name: Service name to look up.
            *args: Forwarded to ``catalog.list_nodes_for_service``.
            **kwargs: Forwarded to ``catalog.list_nodes_for_service``.
        """
        return await self.catalog.list_nodes_for_service(  # type: ignore
            name, *args, **kwargs
        )

    async def find_service(
        self, name: str, fn: Callable = utils.select_one_rr, *args, **kwargs
    ) -> Optional[dict]:
        """Look up ``name`` and reduce the result to one entry with ``fn``.

        Args:
            name: Service name to look up.
            fn: Callable applied to the ``find_services`` result; its return
                value is returned unchanged.
            *args: Forwarded to ``find_services``.
            **kwargs: Forwarded to ``find_services``.
        """
        response = await self.find_services(name, *args, **kwargs)
        return fn(response)  # type: ignore

    async def register(
        self,
        service: Service,
        enable_watch: bool = False,
        **kwargs,
    ) -> None:
        """Register ``service`` with the agent and remember the leader id.

        Args:
            service: Service definition; ``service.dict()`` is sent to the
                agent.
            enable_watch: When true, start a background task that watches for
                leader changes and re-registers the service.
            **kwargs: Passed to ``leader_id`` (which reads ``leader_options``
                and ``instance_options``) and, unchanged, to
                ``agent.service.register``.

        Raises:
            NoConsulLeaderException: If the leader cannot be determined.
        """
        self._leader_id = await self.leader_id(**kwargs)
        log.debug(f"Consul leader id: [current='{self.current_leader_id}']")
        # NOTE(review): the same kwargs (including leader_options /
        # instance_options) reach the agent call - confirm it tolerates them.
        await self.agent.service.register(service.dict(), **kwargs)
        if enable_watch:
            watcher = asyncio.get_running_loop().create_task(
                self._watch_connection(service, **kwargs),
                name="discovery-client-watch-connection",
            )
            # Hold the task until it completes so it is not garbage-collected.
            self._watch_tasks.add(watcher)
            watcher.add_done_callback(self._watch_tasks.discard)

    async def deregister(self, service_id: str, ns: Optional[str] = None) -> None:
        """Deregister ``service_id`` through ``agent.service.deregister``.

        Args:
            service_id: Identifier of the service to remove.
            ns: Forwarded as the second positional argument (presumably the
                namespace - confirm against ``agent.service.deregister``).
        """
        await self.agent.service.deregister(service_id, ns)

    async def reconnect(self, service: Service, *args, **kwargs) -> None:
        """Deregister ``service.id`` and register ``service`` again.

        Args:
            service: Service to re-register.
            *args: Forwarded to ``register``.
            **kwargs: Forwarded to ``register``.
        """
        await self.deregister(service.id)  # type: ignore
        await self.register(service, *args, **kwargs)

    async def _watch_connection(self, service: Service, **kwargs) -> None:
        """Poll the leader forever and re-register when it changes.

        Every ``reconnect_timeout`` seconds the leader id is resolved with the
        same options used at registration; if it differs from
        ``current_leader_id`` the service is reconnected. Failures are logged
        and retried on the next iteration.

        Args:
            service: Service to re-register on leader change.
            **kwargs: Passed to ``leader_id`` and ``reconnect``.
        """
        while True:
            await asyncio.sleep(self.reconnect_timeout)
            try:
                current_id = await self.leader_id(**kwargs)
                if current_id != self.current_leader_id:
                    log.debug(
                        f"Consul leader id changed: [current='{self.current_leader_id}', new='{current_id}']"
                    )
                    await self.reconnect(service, **kwargs)
            except Exception as err:
                log.info(
                    f"Failed to connect to Consul, trying again at {self.reconnect_timeout}/s"
                )
                # Keep the cause available for troubleshooting.
                log.debug(f"Consul watch iteration failed: {err!r}")

    def __repr__(self) -> str:
        """Return a short representation naming the engine in use."""
        return f"Consul(engine={self.client})"