# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.
import ipaddress
import logging
import typing
import pyrfc3339
from . import jasyncio, model, tag
from .annotationhelper import _get_annotations, _set_annotations
from .client import client
from .errors import JujuError
from juju.utils import juju_ssh_key_paths, block_until
log = logging.getLogger(__name__)
[docs]class Machine(model.ModelEntity):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs] async def destroy(self, force=False):
"""Remove this machine from the model.
Blocks until the machine is actually removed.
"""
if self.connection.is_using_old_client:
# Then we'll use the DestroyMachines from client.ClientFacade
facade = client.ClientFacade.from_connection(self.connection)
await facade.DestroyMachines(force=force, machine_names=[self.id])
else:
facade = client.MachineManagerFacade.from_connection(self.connection)
await facade.DestroyMachineWithParams(force=force, machine_tags=[tag.machine(self.id)])
log.debug('Destroying machine %s', self.id)
return await self.model._wait('machine', self.id, 'remove')
remove = destroy
[docs] async def get_annotations(self):
"""Get annotations on this machine.
:return dict: The annotations for this application
"""
return await _get_annotations(self.tag, self.connection)
[docs] async def set_annotations(self, annotations):
"""Set annotations on this machine.
:param annotations map[string]string: the annotations as key/value
pairs.
"""
return await _set_annotations(self.tag, annotations, self.connection)
def _format_addr(self, addr):
"""Validate and format IP address.
:param addr: IPv6 or IPv4 address
:type addr: str
:returns: Address string, optionally encapsulated in brackets ([])
:rtype: str
:raises: ValueError
"""
ipaddr = ipaddress.ip_address(addr)
if isinstance(ipaddr, ipaddress.IPv6Address):
fmt = '[{}]'
else:
fmt = '{}'
return fmt.format(ipaddr)
[docs] async def scp_to(self, source, destination, user='ubuntu', proxy=False,
scp_opts='', wait_for_active=False, timeout=None):
"""Transfer files to this machine.
:param str source: Local path of file(s) to transfer
:param str destination: Remote destination of transferred files
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param scp_opts: Additional options to the `scp` command
:type scp_opts: str or list
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError('proxy option is not implemented')
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
try:
# if dns_name is an IP address format it appropriately
address = self._format_addr(self.dns_name)
except ValueError:
# otherwise we assume it to be a DNS resolvable string
address = self.dns_name
destination = '{}@{}:{}'.format(user, address, destination)
await self._scp(source, destination, scp_opts)
[docs] async def scp_from(self, source, destination, user='ubuntu', proxy=False,
scp_opts='', wait_for_active=False, timeout=None):
"""Transfer files from this machine.
:param str source: Remote path of file(s) to transfer
:param str destination: Local destination of transferred files
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param scp_opts: Additional options to the `scp` command
:type scp_opts: str or list
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError('proxy option is not implemented')
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
try:
# if dns_name is an IP address format it appropriately
address = self._format_addr(self.dns_name)
except ValueError:
# otherwise we assume it to be a DNS resolvable string
address = self.dns_name
source = '{}@{}:{}'.format(user, address, source)
await self._scp(source, destination, scp_opts)
async def _scp(self, source, destination, scp_opts):
""" Execute an scp command. Requires a fully qualified source and
destination.
"""
_, id_path = juju_ssh_key_paths()
cmd = [
'scp',
'-i', id_path,
'-o', 'StrictHostKeyChecking=no',
'-q',
'-B'
]
cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts)
cmd.extend([source, destination])
# There's a bit of a gap between the time that the machine is assigned an IP and the ssh
# service is up and listening, which creates a race for the ssh command. So we retry a
# couple of times until either we run out of attempts, or the ssh command succeeds to
# mitigate that effect.
# TODO (cderici): refactor the ssh and scp subcommand processing into a single method.
retry_backoff = 2
retries = 10
for _ in range(retries):
process = await jasyncio.create_subprocess_exec(*cmd)
await process.wait()
if process.returncode == 0:
break
await jasyncio.sleep(retry_backoff)
if process.returncode != 0:
raise JujuError(f"command failed after {retries} attempts: {cmd}")
[docs] async def ssh(
self, command, user='ubuntu', proxy=False, ssh_opts=None, wait_for_active=False, timeout=None):
"""Execute a command over SSH on this machine.
:param str command: Command to execute
:param str user: Remote username
:param bool proxy: Proxy through the Juju API server
:param str ssh_opts: Additional options to the `ssh` command
:param bool wait_for_active: Wait until the machine is ready to take in ssh commands.
:param int timeout: Time in seconds to wait until the machine becomes ready.
"""
if proxy:
raise NotImplementedError('proxy option is not implemented')
if wait_for_active:
await block_until(lambda: self.addresses, timeout=timeout)
address = self.dns_name
destination = "{}@{}".format(user, address)
_, id_path = juju_ssh_key_paths()
cmd = [
'ssh',
'-i', id_path,
'-o', 'StrictHostKeyChecking=no',
'-q',
destination
]
if ssh_opts:
cmd.extend(ssh_opts.split() if isinstance(ssh_opts, str) else ssh_opts)
cmd.extend([command])
# There's a bit of a gap between the time that the machine is assigned an IP and the ssh
# service is up and listening, which creates a race for the ssh command. So we retry a
# couple of times until either we run out of attempts, or the ssh command succeeds to
# mitigate that effect.
retry_backoff = 2
retries = 10
for _ in range(retries):
process = await jasyncio.create_subprocess_exec(
*cmd, stdout=jasyncio.subprocess.PIPE, stderr=jasyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode == 0:
break
await jasyncio.sleep(retry_backoff)
if process.returncode != 0:
raise JujuError(f"command failed: {cmd} after {retries} attempts, with {stderr.decode()}")
# stdout is a bytes-like object, returning a string might be more useful
return stdout.decode()
@property
def addresses(self) -> typing.List[str]:
"""Returns the machine addresses.
"""
return self.safe_data['addresses'] or []
@property
def agent_status(self):
"""Returns the current Juju agent status string.
"""
return self.safe_data['agent-status']['current']
@property
def agent_status_since(self):
"""Get the time when the `agent_status` was last updated.
"""
return pyrfc3339.parse(self.safe_data['agent-status']['since'])
@property
def agent_version(self):
"""Get the version of the Juju machine agent.
May return None if the agent is not yet available.
"""
version = self.safe_data['agent-status']['version']
if version:
return client.Number.from_json(version)
else:
return None
@property
def status(self):
"""Returns the current machine provisioning status string.
"""
return self.safe_data['instance-status']['current']
@property
def status_message(self):
"""Returns the current machine provisioning status message.
"""
return self.safe_data['instance-status']['message']
@property
def status_since(self):
"""Get the time when the `status` was last updated.
"""
return pyrfc3339.parse(self.safe_data['instance-status']['since'])
@property
def dns_name(self):
"""Get the DNS name for this machine. This is a best guess based on the
addresses available in current data.
May return None if no suitable address is found.
"""
ordered_addresses = []
ordered_scopes = ['public', 'local-cloud', 'local-fan']
for scope in ordered_scopes:
for address in self.addresses:
if scope == address['scope']:
ordered_addresses.append(address)
for address in ordered_addresses:
scope = address['scope']
for check_scope in ordered_scopes:
if scope == check_scope:
return address['value']
return None
@property
def hostname(self):
"""Get the hostname for this machine as reported by the machine agent
running on it. This is only supported on 2.8.10+ controllers.
May return None if no hostname information is available.
"""
if 'hostname' in self.safe_data and self.safe_data['hostname'] != '':
return self.safe_data['hostname']
return None
@property
def series(self):
"""Returns the series of the current machine
"""
return self.safe_data['series']
@property
def tag(self):
return tag.machine(self.id)