# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.
import asyncio
import ipaddress
import logging
import typing
from backports.datetime_fromisoformat import datetime_fromisoformat
from juju.utils import block_until, juju_ssh_key_paths
from . import model, tag
from .annotationhelper import _get_annotations, _set_annotations
from .client import client
from .errors import JujuError
log = logging.getLogger(__name__)
[docs]class Machine(model.ModelEntity):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs] async def destroy(self, force=False):
"""Remove this machine from the model.
Blocks until the machine is actually removed.
"""
if self.connection.is_using_old_client:
# Then we'll use the DestroyMachines from client.ClientFacade
facade = client.ClientFacade.from_connection(self.connection)
await facade.DestroyMachines(force=force, machine_names=[self.id])
else:
facade = client.MachineManagerFacade.from_connection(self.connection)
await facade.DestroyMachineWithParams(
force=force, machine_tags=[tag.machine(self.id)]
)
log.debug("Destroying machine %s", self.id)
return await self.model._wait("machine", self.id, "remove")
remove = destroy
[docs] async def get_annotations(self):
"""Get annotations on this machine.
:return dict: The annotations for this application
"""
return await _get_annotations(self.tag, self.connection)
[docs] async def set_annotations(self, annotations):
"""Set annotations on this machine.
:param annotations map[string]string: the annotations as key/value
pairs.
"""
return await _set_annotations(self.tag, annotations, self.connection)
def _format_addr(self, addr):
"""Validate and format IP address.
:param addr: IPv6 or IPv4 address
:type addr: str
:returns: Address string, optionally encapsulated in brackets ([])
:rtype: str
:raises: ValueError
"""
ipaddr = ipaddress.ip_address(addr)
if isinstance(ipaddr, ipaddress.IPv6Address):
fmt = "[{}]"
else:
fmt = "{}"
return fmt.format(ipaddr)
[docs] async def scp_to(
self,
source,
destination,
user="ubuntu",
proxy=False,
scp_opts="",
wait_for_active=False,
timeout=None,
):
"""Transfer files to this machine.
:param str source: Local path of file(s) to transfer
:param str destination: Remote destination of transferred files
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param scp_opts: Additional options to the `scp` command
:type scp_opts: str or list
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError("proxy option is not implemented")
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
try:
# if dns_name is an IP address format it appropriately
address = self._format_addr(self.dns_name)
except ValueError:
# otherwise we assume it to be a DNS resolvable string
address = self.dns_name
destination = f"{user}@{address}:{destination}"
await self._scp(source, destination, scp_opts)
[docs] async def scp_from(
self,
source,
destination,
user="ubuntu",
proxy=False,
scp_opts="",
wait_for_active=False,
timeout=None,
):
"""Transfer files from this machine.
:param str source: Remote path of file(s) to transfer
:param str destination: Local destination of transferred files
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param scp_opts: Additional options to the `scp` command
:type scp_opts: str or list
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError("proxy option is not implemented")
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
try:
# if dns_name is an IP address format it appropriately
address = self._format_addr(self.dns_name)
except ValueError:
# otherwise we assume it to be a DNS resolvable string
address = self.dns_name
source = f"{user}@{address}:{source}"
await self._scp(source, destination, scp_opts)
async def _scp(self, source, destination, scp_opts):
"""Execute an scp command. Requires a fully qualified source and
destination.
"""
_, id_path = juju_ssh_key_paths()
cmd = ["scp", "-i", id_path, "-o", "StrictHostKeyChecking=no", "-q", "-B"]
cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts)
cmd.extend([source, destination])
# There's a bit of a gap between the time that the machine is assigned an IP and the ssh
# service is up and listening, which creates a race for the ssh command. So we retry a
# couple of times until either we run out of attempts, or the ssh command succeeds to
# mitigate that effect.
# TODO (cderici): refactor the ssh and scp subcommand processing into a single method.
retry_backoff = 2
retries = 10
for _ in range(retries):
process = await asyncio.create_subprocess_exec(*cmd)
await process.wait()
if process.returncode == 0:
break
await asyncio.sleep(retry_backoff)
if process.returncode != 0:
raise JujuError(f"command failed after {retries} attempts: {cmd}")
[docs] async def ssh(
self,
command,
user="ubuntu",
proxy=False,
ssh_opts=None,
wait_for_active=False,
timeout=None,
):
"""Execute a command over SSH on this machine.
:param str command: Command to execute
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param str ssh_opts: Additional options to the `ssh` command
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError("proxy option is not implemented")
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
address = self.dns_name
destination = f"{user}@{address}"
_, id_path = juju_ssh_key_paths()
cmd = [
"ssh",
"-i",
id_path,
"-o",
"StrictHostKeyChecking=no",
"-q",
destination,
]
if ssh_opts:
cmd.extend(ssh_opts.split() if isinstance(ssh_opts, str) else ssh_opts)
cmd.extend([command])
# There's a bit of a gap between the time that the machine is assigned an IP and the ssh
# service is up and listening, which creates a race for the ssh command. So we retry a
# couple of times until either we run out of attempts, or the ssh command succeeds to
# mitigate that effect.
retry_backoff = 2
retries = 10
for _ in range(retries):
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
break
await asyncio.sleep(retry_backoff)
if process.returncode != 0:
raise JujuError(
f"command failed: {cmd} after {retries} attempts, with {stderr.decode()}"
)
# stdout is a bytes-like object, returning a string might be more useful
return stdout.decode()
@property
def addresses(self) -> typing.List[dict]:
"""Returns the machine addresses."""
return self.safe_data["addresses"] or []
@property
def agent_status(self):
"""Returns the current Juju agent status string."""
return self.safe_data["agent-status"]["current"]
@property
def agent_status_since(self):
"""Get the time when the `agent_status` was last updated."""
return datetime_fromisoformat(self.safe_data["agent-status"]["since"])
@property
def agent_version(self):
"""Get the version of the Juju machine agent.
May return None if the agent is not yet available.
"""
version = self.safe_data["agent-status"]["version"]
if version:
return client.Number.from_json(version)
else:
return None
@property
def status(self):
"""Returns the current machine provisioning status string."""
return self.safe_data["instance-status"]["current"]
@property
def status_message(self):
"""Returns the current machine provisioning status message."""
return self.safe_data["instance-status"]["message"]
@property
def status_since(self):
"""Get the time when the `status` was last updated."""
return datetime_fromisoformat(self.safe_data["instance-status"]["since"])
@property
def dns_name(self):
"""Get the DNS name for this machine. This is a best guess based on the
addresses available in current data.
May return None if no suitable address is found.
"""
ordered_scopes = ["public", "local-cloud", "local-fan"]
ordered_addresses = [
address
for scope in ordered_scopes
for address in self.addresses
if scope == address["scope"]
]
for address in ordered_addresses:
scope = address["scope"]
for check_scope in ordered_scopes:
if scope == check_scope:
return address["value"]
return None
@property
def hostname(self):
"""Get the hostname for this machine as reported by the machine agent
running on it. This is only supported on 2.8.10+ controllers.
May return None if no hostname information is available.
"""
if "hostname" in self.safe_data and self.safe_data["hostname"] != "":
return self.safe_data["hostname"]
return None
@property
def series(self):
"""Returns the series of the current machine"""
return self.safe_data["series"]
@property
def tag(self):
return tag.machine(self.id)