Source code for juju.model

# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.

import base64
import collections
import hashlib
import json
import logging
import os
import sys
import re
import stat
import tempfile
import warnings
import weakref
import zipfile
from concurrent.futures import CancelledError
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import yaml
import websockets

from . import provisioner, tag, utils, jasyncio
from .annotationhelper import _get_annotations, _set_annotations
from .bundle import BundleHandler, get_charm_series, is_local_charm
from .charmhub import CharmHub
from .client import client, connector
from .client.overrides import Caveat, Macaroon
from .constraints import parse as parse_constraints
from .controller import Controller, ConnectedController
from .delta import get_entity_class, get_entity_delta
from .errors import JujuAPIError, JujuError, JujuModelConfigError, JujuBackupError
from .errors import JujuModelError, JujuAppError, JujuUnitError, JujuAgentError, JujuMachineError, PylibjujuError, JujuNotSupportedError
from .exceptions import DeadEntityException
from .names import is_valid_application
from .offerendpoints import ParseError as OfferParseError
from .offerendpoints import parse_local_endpoint, parse_offer_url
from .origin import Channel, Source
from .placement import parse as parse_placement
from .secrets import create_secret_data, read_secret_data
from .tag import application as application_tag
from .url import URL, Schema
from .version import DEFAULT_ARCHITECTURE

log = logging.getLogger(__name__)


class _Observer:
    """Wrapper around an observer callable.

    This wrapper allows filter criteria to be associated with the
    callable so that it's only called for changes that meet the criteria.

    """
    def __init__(self, callable_, entity_type, action, entity_id, predicate):
        self.callable_ = callable_
        self.entity_type = entity_type
        self.action = action
        self.entity_id = entity_id
        self.predicate = predicate
        if self.entity_id:
            self.entity_id = str(self.entity_id)
            if not self.entity_id.startswith('^'):
                self.entity_id = '^' + self.entity_id
            if not self.entity_id.endswith('$'):
                self.entity_id += '$'

    async def __call__(self, delta, old, new, model):
        await self.callable_(delta, old, new, model)

    def cares_about(self, delta):
        """Return True if this observer "cares about" (i.e. wants to be
        called) for this delta.

        """
        if (self.entity_id and delta.get_id() and
                not re.match(self.entity_id, str(delta.get_id()))):
            return False

        if self.entity_type and self.entity_type != delta.entity:
            return False

        if self.action and self.action != delta.type:
            return False

        if self.predicate and not self.predicate(delta):
            return False

        return True



class ModelObserver:
    """ Base class for creating observers that react to changes in a model.
    """
    async def __call__(self, delta, old, new, model):
        handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
        method = getattr(self, handler_name, self.on_change)
        await method(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Generic model-change handler.

        This should be overridden in a subclass.

        :param delta: :class:`juju.client.overrides.Delta`
        :param old: :class:`juju.model.ModelEntity`
        :param new: :class:`juju.model.ModelEntity`
        :param model: :class:`juju.model.Model`

        """
        pass

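# Illustrative sketch (not part of the library): a ModelObserver subclass
# that reacts to unit deltas. Handler names follow the 'on_<entity>_<action>'
# convention used by __call__ above; 'model' is assumed to be an
# already-connected Model instance.
#
#     class MyObserver(ModelObserver):
#         async def on_unit_change(self, delta, old, new, model):
#             print(delta.get_id(), 'changed')
#
#     model.add_observer(MyObserver())
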

class ModelState:
    """Holds the state of the model, including the delta history of all
    entities in the model.

    """
    def __init__(self, model):
        self.model = model
        self.state = dict()

    def _live_entity_map(self, entity_type):
        """Return an id:Entity map of all the living entities of
        type ``entity_type``.

        """
        return {
            entity_id: self.get_entity(entity_type, entity_id)
            for entity_id, history in self.state.get(entity_type, {}).items()
            if history[-1] is not None
        }

    @property
    def applications(self):
        """Return a map of application-name:Application for all applications
        currently in the model.

        """
        return self._live_entity_map('application')

    @property
    def remote_applications(self):
        """Return a map of application-name:Application for all remote
        applications currently in the model.

        """
        return self._live_entity_map('remoteApplication')

    @property
    def application_offers(self):
        """Return a map of application-name:Application for all application
        offers currently in the model.

        """
        return self._live_entity_map('applicationOffer')

    @property
    def machines(self):
        """Return a map of machine-id:Machine for all machines currently in
        the model.

        """
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Return a map of unit-id:Unit for all units currently in
        the model.

        """
        return self._live_entity_map('unit')

    @property
    def subordinate_units(self):
        """Return a map of unit-id:Unit for all subordinate units"""
        return {u_name: u for u_name, u in self.units.items() if u.is_subordinate}

    @property
    def relations(self):
        """Return a map of relation-id:Relation for all relations currently
        in the model.

        """
        return self._live_entity_map('relation')

    def entity_history(self, entity_type, entity_id):
        """Return the history deque for an entity.

        """
        return self.state[entity_type][entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Return the data dict for an entity at a specific index of its
        history.

        """
        return self.entity_history(entity_type, entity_id)[history_index]

    def apply_delta(self, delta):
        """Apply delta to our state and return a copy of the affected object
        as it was before and after the update, e.g.:

            old_obj, new_obj = self.apply_delta(delta)

        old_obj may be None if the delta is for the creation of a new object,
        e.g. a new application or unit is deployed.

        new_obj will never be None, but may be dead (new_obj.dead == True)
        if the object was deleted as a result of the delta being applied.

        """
        history = (
            self.state
            .setdefault(delta.entity, {})
            .setdefault(delta.get_id(), collections.deque())
        )

        history.append(delta.data)
        if delta.type == 'remove':
            history.append(None)

        entity = self.get_entity(delta.entity, delta.get_id())
        return entity.previous(), entity

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Return an object instance for the given entity_type and id.

        By default the object state matches the most recent state from
        Juju. To get an instance of the object in an older state, pass
        history_index, an index into the history deque for the entity.

        """
        if history_index < 0 and history_index != -1:
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                return None

        try:
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        entity_class = get_entity_class(entity_type)
        return entity_class(
            entity_id, self.model, history_index=history_index,
            connected=connected)


class ModelEntity:
    """An object in the Model tree"""

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :param history_index int: The index of this object's state in the
            model's history deque for this entity
        :param connected bool: Flag indicating whether this object gets
            live updates from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection()
        self._status = 'unknown'

    def __repr__(self):
        return '<{} entity_id="{}">'.format(type(self).__name__,
                                            self.entity_id)

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        """
        try:
            return self.safe_data[name]
        except KeyError:
            name = name.replace('_', '-')
            if name in self.safe_data:
                return self.safe_data[name]
            else:
                raise

    def __bool__(self):
        return bool(self.data)

    def on_change(self, callable_):
        """Add a change observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'change', self.entity_id)

    def on_remove(self, callable_):
        """Add a remove observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'remove', self.entity_id)

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        """
        # Allow the overriding of entity names from the type instead of from
        # the class name. Useful because Model and ModelInfo clash and we
        # really want ModelInfo to be called Model.
        if hasattr(self.__class__, "type_name_override") and callable(self.__class__.type_name_override):
            return self.__class__.type_name_override()

        def first_lower(s):
            if len(s) == 0:
                return s
            else:
                return s[0].lower() + s[1:]

        return first_lower(self.__class__.__name__)

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        non-latest state in history, e.g. if the object was obtained by
        calling .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        """
        return (
            self.data is None or
            self.model.state.entity_data(
                self.entity_type, self.entity_id, -1) is None
        )

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    @property
    def safe_data(self):
        """The data dictionary for this entity.

        If this `ModelEntity` points to the dead state, it will
        raise `DeadEntityException`.

        """
        if self.data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return self.data

    def previous(self):
        """Return a copy of this object as it was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive live
        updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        connected = (
            new_index == len(self.model.state.entity_history(
                self.entity_type, self.entity_id)) - 1
        )
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives live updates
        from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)

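# Illustrative sketch (not part of the library): walking an entity's delta
# history via the accessors above. 'unit' is assumed to be a Unit obtained
# from a connected Model, e.g. model.units['mysql/0'].
#
#     prior = unit.previous()       # state before the last delta, or None
#     if prior is not None and prior.workload_status != unit.workload_status:
#         print('workload status changed to', unit.workload_status)
#     current = unit.latest()       # always the live, connected object
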

class DeployTypeResult:
    """DeployTypeResult represents the result of a deployment type after a
    resolution.

    """

    def __init__(self, identifier, origin, app_name, is_local=False, is_bundle=False):
        self.identifier = identifier
        self.origin = origin
        self.app_name = app_name
        self.is_local = is_local
        self.is_bundle = is_bundle


class LocalDeployType:
    """LocalDeployType deals with local only deployments.

    """

    async def resolve(self, charm_path, architecture, app_name=None,
                      channel=None, series=None, revision=None,
                      entity_url=None, force=False, model_conf=None):
        """resolve attempts to resolve a local charm or bundle using the url
        and architecture. If information is missing, it will attempt to
        backfill that information, before sending the result back.

        -- revision flag is ignored for local charms

        """
        entity_path = Path(charm_path)
        if entity_path.suffix == '.yaml':
            bundle_path = entity_path
        else:
            bundle_path = entity_path / 'bundle.yaml'

        origin = client.CharmOrigin(source="local", architecture=architecture)
        if not (entity_path.is_dir() or entity_path.is_file()):
            raise JujuError('{} path not found'.format(entity_url))

        is_bundle = bundle_path.exists()
        if app_name is None:
            if is_bundle:
                bundle_with_overlays = [b for b in yaml.safe_load_all(bundle_path.read_text())]
                app_name = bundle_with_overlays[0].get('name', '')
            else:
                app_name = utils.get_local_charm_metadata(entity_path)["name"]

        return DeployTypeResult(
            identifier=charm_path,
            origin=origin,
            app_name=app_name,
            is_local=True,
            is_bundle=is_bundle,
        )


class CharmhubDeployType:
    """CharmhubDeployType defines a class for resolving and deploying charmhub
    charms and bundles.

    """

    def __init__(self, charm_resolver):
        self.charm_resolver = charm_resolver

    async def resolve(self, url, architecture, app_name=None, channel=None,
                      series=None, revision=None, entity_url=None,
                      force=False, model_conf=None):
        """resolve attempts to resolve charmhub charms or bundles. A request
        to the charmhub API is required to correctly determine the charm url
        and underlying origin.

        """
        if revision and not channel:
            raise JujuError('specifying a revision requires a channel for future upgrades. Please use --channel')

        ch = Channel('latest', 'stable')
        if channel is not None:
            ch = Channel.parse(channel).normalize()
        base = client.Base()
        if series:
            base.channel = ch.normalize().compute_base_channel(series=series)
            base.name = 'ubuntu'

        origin = client.CharmOrigin(source=Source.CHARM_HUB.value,
                                    architecture=architecture,
                                    risk=ch.risk,
                                    track=ch.track,
                                    base=base,
                                    revision=revision,
                                    )
        charm_url, origin = await self.charm_resolver(url, origin, force, series, model_conf)

        is_bundle = origin.type_ == "bundle"
        if is_bundle and revision and channel:
            raise JujuError('revision and channel are mutually exclusive when deploying a bundle. Please choose one.')

        if app_name is None:
            app_name = charm_url.name

        return DeployTypeResult(
            identifier=str(charm_url),
            app_name=app_name,
            origin=origin,
            is_bundle=is_bundle,
        )


class Model:
    """
    The main API for interacting with a Juju model.
    """

    def __init__(
        self,
        max_frame_size=None,
        bakery_client=None,
        jujudata=None,
    ):
        """Instantiate a new Model.

        The connect method will need to be called before this
        object can be used for anything interesting.

        If jujudata is None, jujudata.FileJujuData will be used.

        :param max_frame_size: See
            `juju.client.connection.Connection.MAX_FRAME_SIZE`
        :param bakery_client httpbakery.Client: The bakery client to use
            for macaroon authorization.
        :param jujudata JujuData: The source for current controller
            information

        """
        self._connector = connector.Connector(
            max_frame_size=max_frame_size,
            bakery_client=bakery_client,
            jujudata=jujudata,
        )
        self._observers = weakref.WeakValueDictionary()
        self.state = ModelState(self)
        self._info = None
        self._mode = None
        self._watch_stopping = jasyncio.Event()
        self._watch_stopped = jasyncio.Event()
        self._watch_received = jasyncio.Event()
        self._watch_stopped.set()
        self._charmhub = CharmHub(self)
        self.deploy_types = {
            Schema.LOCAL: LocalDeployType(),
            Schema.CHARM_HUB: CharmhubDeployType(self._resolve_charm),
        }

    def is_connected(self):
        """Reports whether the Model is currently connected."""
        return self._connector.is_connected()

    def connection(self):
        """Return the current Connection object.

        It raises an exception if the Model is disconnected.
        """
        return self._connector.connection()

    async def get_controller(self):
        """Return a Controller instance for the currently connected model.

        :return Controller:
        """
        from juju.controller import Controller
        controller = Controller(jujudata=self._connector.jujudata)
        kwargs = self.connection().connect_params()
        kwargs.pop('uuid')
        await controller._connect_direct(**kwargs)
        return controller

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.disconnect()

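    # Illustrative sketch (not part of the library): using Model as an async
    # context manager so disconnect() is always called. With no arguments,
    # __aenter__ connects to the current model from the local Juju CLI data.
    #
    #     async def main():
    #         async with Model() as model:     # __aenter__ calls connect()
    #             print(sorted(model.applications))
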
[docs] async def connect(self, *args, **kwargs): """Connect to a juju model. This supports two calling conventions: The model and (optionally) authentication information can be taken from the data files created by the Juju CLI. This convention will be used if a ``model_name`` is specified, or if the ``endpoint`` and ``uuid`` are not. Otherwise, all of the ``endpoint``, ``uuid``, and authentication information (``username`` and ``password``, or ``bakery_client`` and/or ``macaroons``) are required. If a single positional argument is given, it will be assumed to be the ``model_name``. Otherwise, the first positional argument, if any, must be the ``endpoint``. Available parameters are: :param model_name: Format [controller:][user/]model :param str endpoint: The hostname:port of the controller to connect to. :param str uuid: The model UUID to connect to. :param str username: The username for controller-local users (or None to use macaroon-based login.) :param str password: The password for controller-local users. :param str cacert: The CA certificate of the controller (PEM formatted). :param httpbakery.Client bakery_client: The macaroon bakery client to to use when performing macaroon-based login. Macaroon tokens acquired when logging will be saved to bakery_client.cookies. If this is None, a default bakery_client will be used. :param list macaroons: List of macaroons to load into the ``bakery_client``. :param int max_frame_size: The maximum websocket frame size to allow. :param specified_facades: Overwrite the facades with a series of specified facades. """ is_debug_log_conn = 'debug_log_conn' in kwargs if not is_debug_log_conn: await self.disconnect() model_name = model_uuid = None if 'endpoint' not in kwargs and len(args) < 2: # Then we're using the model_name to pick the model if args and 'model_name' in kwargs: raise TypeError('connect() got multiple values for model_name') elif args: model_name = args[0] else: model_name = kwargs.pop('model_name', None) model_uuid = await self._connector.connect_model(model_name, **kwargs) else: # Then we're using the endpoint to pick the model if 'model_name' in kwargs: raise TypeError('connect() got values for both ' 'model_name and endpoint') if args and 'endpoint' in kwargs: raise TypeError('connect() got multiple values for endpoint') if len(args) < 2 and 'uuid' not in kwargs: raise TypeError('connect() missing value for uuid') has_userpass = (len(args) >= 4 or {'username', 'password'}.issubset(kwargs)) has_macaroons = (len(args) >= 6 or not {'bakery_client', 'macaroons'}.isdisjoint(kwargs)) if not (has_userpass or has_macaroons): raise TypeError('connect() missing auth params') arg_names = [ 'endpoint', 'uuid', 'username', 'password', 'cacert', 'bakery_client', 'macaroons', 'max_frame_size', ] for i, arg in enumerate(args): kwargs[arg_names[i]] = arg model_uuid = kwargs['uuid'] if not {'endpoint', 'uuid'}.issubset(kwargs): raise ValueError('endpoint and uuid are required ' 'if model_name not given') if not ({'username', 'password'}.issubset(kwargs) or {'bakery_client', 'macaroons'}.intersection(kwargs)): raise ValueError('Authentication parameters are required ' 'if model_name not given') await self._connector.connect(**kwargs) if not is_debug_log_conn: await self._after_connect(model_name, model_uuid)
    async def connect_model(self, model_name, **kwargs):
        """
        .. deprecated:: 0.6.2
           Use ``connect(model_name=model_name)`` instead.
        """
        return await self.connect(model_name=model_name, **kwargs)

    async def connect_current(self):
        """
        .. deprecated:: 0.6.2
           Use ``connect()`` instead.
        """
        return await self.connect()

    async def connect_to(self, connection):
        conn_params = connection.connect_params()
        await self._connect_direct(**conn_params)

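    # Illustrative sketch (not part of the library): the two calling
    # conventions accepted by connect(). The controller/model names, the
    # endpoint, uuid and credentials below are placeholders.
    #
    #     m = Model()
    #     await m.connect(model_name='mycontroller:admin/default')
    #
    #     # or, bypassing the local Juju CLI data files entirely:
    #     await m.connect(endpoint='10.0.0.1:17070', uuid='<model-uuid>',
    #                     username='admin', password='<password>',
    #                     cacert='<PEM-encoded CA certificate>')
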
async def _connect_direct(self, **kwargs): if self.info: uuid = self.info.uuid elif 'uuid' in kwargs: uuid = kwargs['uuid'] else: raise PylibjujuError("Unable to find uuid for the model") await self.disconnect() await self._connector.connect(**kwargs) await self._after_connect(model_uuid=uuid) async def _after_connect(self, model_name=None, model_uuid=None): self._watch() # Wait for the first packet of data from the AllWatcher, # which contains all information on the model. # TODO this means that we can't do anything until # we've received all the model data, which might be # a whole load of unneeded data if all the client wants # to do is make one RPC call. async def watch_received_waiter(): await self._watch_received.wait() waiter = jasyncio.create_task(watch_received_waiter()) # If we just wait for the _watch_received event and the _all_watcher task # fails (e.g. because API fails like migration is in progress), then # we'll hang because the _watch_received will never be set # Instead, we watch for two things, 1) _watch_received, 2) _all_watcher done # If _all_watcher is done before the _watch_received, then we should see # (and raise) an exception coming from the _all_watcher # Otherwise (i.e. _watch_received is set), then we're good to go done, pending = await jasyncio.wait([waiter, self._watcher_task], return_when=jasyncio.FIRST_COMPLETED) if self._watcher_task in done: # Cancel the _watch_received.wait waiter.cancel() # If there's no exception, then why did the _all_watcher broke its loop? if not self._watcher_task.exception(): raise JujuError("AllWatcher task is finished abruptly without an exception.") raise self._watcher_task.exception() if self._info is None: # TODO (cderici): See if this can be optimized away, or at least # be done lazily (i.e. not everytime after_connect, but whenever # self.info is needed -- which here can be bypassed if model_uuid # is known) async with ConnectedController(self.connection()) as contr: self._info = await contr.get_model_info(model_name, model_uuid) log.debug('Got ModelInfo: %s', vars(self.info)) self.uuid = self.info.uuid
    async def disconnect(self):
        """Shut down the watcher task and close websockets.

        """
        if not self._watch_stopped.is_set():
            log.debug('Stopping watcher task')
            self._watch_stopping.set()
            # If the _all_watcher task has already finished, check for an
            # exception and raise it; otherwise the _watch_stopped flag
            # should get set.
            if self._watcher_task.done() and self._watcher_task.exception():
                raise self._watcher_task.exception()
            await self._watch_stopped.wait()
            self._watch_stopping.clear()

        if self.is_connected():
            await self._connector.disconnect(entity='model')
            self._info = None

    async def add_local_charm_dir(self, charm_dir, series):
        """Upload a local charm to the model.

        This will automatically generate an archive from
        the charm dir.

        :param charm_dir: Path to the charm directory
        :param series: Charm series

        """
        charm_dir = Path(charm_dir)
        if charm_dir.suffix == '.charm':
            fn = charm_dir
        else:
            fn = tempfile.NamedTemporaryFile().name
            CharmArchiveGenerator(str(charm_dir)).make_archive(fn)
        with open(str(fn), 'rb') as fh:
            func = partial(
                self.add_local_charm, fh, series, os.stat(str(fn)).st_size)
            loop = jasyncio.get_running_loop()
            charm_url = await loop.run_in_executor(None, func)

        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
        return charm_url

    def add_local_charm(self, charm_file, series="", size=None):
        """Upload a local charm archive to the model.

        Returns the 'local:...' url that should be used to deploy the charm.

        :param charm_file: Path to charm zip archive
        :param series: Charm series
        :param size: Size of the archive, in bytes
        :return str: 'local:...' url for deploying the charm
        :raises: :class:`JujuError` if the upload fails

        Uses an https endpoint at the same host:port as the wss.
        Supports large file uploads.

        .. warning::

           This method will block. Consider using
           :meth:`add_local_charm_dir` instead.

        """
        conn, headers, path_prefix = self.connection().https_connection()
        path = "%s/charms?series=%s" % (path_prefix, series)
        headers['Content-Type'] = 'application/zip'
        if size:
            headers['Content-Length'] = size
        conn.request("POST", path, charm_file, headers)
        response = conn.getresponse()
        result = response.read().decode()
        if not response.status == 200:
            raise JujuError(result)
        result = json.loads(result)
        return result['charm-url']

    def all_units_idle(self):
        """Return True if all units are idle.

        """
        for unit in self.units.values():
            unit_status = unit.data['agent-status']['current']
            if unit_status != 'idle':
                return False
        return True

    async def reset(self, force=False):
        """Reset the model to a clean state.

        :param bool force: Force-terminate machines.

        This returns only after the model has reached a clean state. "Clean"
        means no applications or machines exist in the model.

        """
        log.debug('Resetting model')
        for app in self.applications.values():
            await app.destroy()
        await self.block_until(
            lambda: len(self.applications) == 0
        )
        for machine in self.machines.values():
            await machine.destroy(force=force)
        await self.block_until(
            lambda: len(self.machines) == 0
        )

    async def create_storage_pool(self, name, provider_type, attributes=""):
        """Create or define a storage pool.

        :param str name: a pool name
        :param str provider_type: provider type (defaults to "kubernetes"
            for Kubernetes models)
        :param str attributes: attributes for configuration as
            space-separated pairs, e.g. tags, size, path, etc.
        :return:
        """
        _attrs = [splt.split("=") for splt in attributes.split()]

        storage_facade = client.StorageFacade.from_connection(self.connection())
        return await storage_facade.CreatePool(pools=[client.StoragePool(
            name=name,
            provider=provider_type,
            attrs=dict(_attrs)
        )])

    async def remove_storage_pool(self, name):
        """Remove an existing storage pool.

        :param str name:
        :return:
        """
        storage_facade = client.StorageFacade.from_connection(self.connection())
        return await storage_facade.RemovePool(pools=[client.StoragePoolDeleteArg(name)])

    async def update_storage_pool(self, name, attributes=""):
        """Update storage pool attributes.

        :param name:
        :param attributes: "key=value key=value ..."
        :return:
        """
        _attrs = dict([splt.split("=") for splt in attributes.split()])
        if len(_attrs) == 0:
            raise JujuError("Expected at least one attribute to update")

        storage_facade = client.StorageFacade.from_connection(self.connection())
        return await storage_facade.UpdatePool(pools=[client.StoragePool(
            name=name,
            attrs=_attrs,
        )])

    async def list_storage(self, filesystem=False, volume=False):
        """Lists storage details.

        :param bool filesystem: List filesystem storage
        :param bool volume: List volume storage
        :return:
        """
        storage_facade = client.StorageFacade.from_connection(self.connection())
        if filesystem and volume:
            raise JujuError("--filesystem and --volume can not be used together")
        if filesystem:
            _res = await storage_facade.ListFilesystems(filters=[client.FilesystemFilter()])
        elif volume:
            _res = await storage_facade.ListVolumes(filters=[client.VolumeFilter()])
        else:
            _res = await storage_facade.ListStorageDetails(filters=[client.StorageFilter()])
        err = _res.results[0].error
        res = _res.results[0].result
        if err is not None:
            raise JujuError(err.message)
        return [details.serialize() for details in res]

    async def show_storage_details(self, *storage_ids):
        """Shows storage instance information.

        :param []str storage_ids:
        :return:
        """
        if not storage_ids:
            raise JujuError("Expected at least one storage ID")
        storage_facade = client.StorageFacade.from_connection(self.connection())
        res = await storage_facade.StorageDetails(entities=[client.Entity(tag.storage(s)) for s in storage_ids])
        return [s.result.serialize() for s in res.results]

    async def list_storage_pools(self):
        """List storage pools.

        :return:
        """
        # TODO (cderici): Filter on pool type, name.
        storage_facade = client.StorageFacade.from_connection(self.connection())
        res = await storage_facade.ListPools(filters=[client.StoragePoolFilter()])
        err = res.results[0].error
        if err:
            raise JujuError(err.message)
        return [p.serialize() for p in res.results[0].storage_pools]

    async def remove_storage(self, *storage_ids, force=False, destroy_storage=False):
        """Removes storage from the model.

        :param bool force: Remove storage even if it is currently attached
        :param bool destroy_storage: Remove the storage and destroy it
        :param []str storage_ids:
        :return:
        """
        if not storage_ids:
            raise JujuError("Expected at least one storage ID")
        storage_facade = client.StorageFacade.from_connection(self.connection())
        ret = await storage_facade.Remove(storage=[client.RemoveStorageInstance(
            destroy_storage=destroy_storage,
            force=force,
            tag=s,
        ) for s in storage_ids])
        if ret.results[0].error:
            raise JujuError(ret.results[0].error.message)

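    # Illustrative sketch (not part of the library): a typical storage-pool
    # workflow built from the methods above. The pool name, provider and
    # attribute strings are placeholders.
    #
    #     await model.create_storage_pool('fast-pool', 'ebs',
    #                                     'volume-type=gp3')
    #     pools = await model.list_storage_pools()
    #     await model.update_storage_pool('fast-pool', 'volume-type=io2')
    #     await model.remove_storage_pool('fast-pool')
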
    async def remove_application(
            self,
            app_name,
            block_until_done=False,
            force=False,
            destroy_storage=False,
            no_wait=False,
            timeout=None
    ):
        """Removes the given application from the model.

        :param str app_name: Name of the application
        :param bool force: Completely remove an application and all its
            dependencies. (=false)
        :param bool destroy_storage: Destroy storage attached to application
            unit. (=false)
        :param bool no_wait: Rush through application removal without waiting
            for each individual step to complete (=false)
        :param bool block_until_done: Ensure the app is removed from the
            model when returned
        :param int timeout: Raise asyncio.exceptions.TimeoutError if the
            application is not removed within the timeout period.
        """
        if app_name not in self.applications:
            raise JujuError("Given application doesn't seem to appear in the "
                            "model: %s\nCurrent applications are: %s" %
                            (app_name, self.applications))

        await self.applications[app_name].remove(
            destroy_storage=destroy_storage,
            force=force,
            no_wait=no_wait,
        )
        if block_until_done:
            await self.block_until(lambda: app_name not in self.applications,
                                   timeout=timeout)

    async def block_until(self, *conditions, timeout=None, wait_period=0.5):
        """Return only after all conditions are true.

        Raises `websockets.ConnectionClosed` if disconnected.
        """
        def _disconnected():
            return not (self.is_connected() and self.connection().is_open)

        def done():
            return _disconnected() or all(c() for c in conditions)

        await utils.block_until(done, timeout=timeout,
                                wait_period=wait_period)
        if _disconnected():
            raise websockets.ConnectionClosed(1006, 'no reason')

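    # Illustrative sketch (not part of the library): block_until takes plain
    # callables, so lambdas over the model's state maps are the usual
    # pattern. The application name is a placeholder.
    #
    #     await model.block_until(
    #         lambda: 'ubuntu' in model.applications,
    #         lambda: all(unit.workload_status == 'active'
    #                     for unit in model.applications['ubuntu'].units),
    #         timeout=600)
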
@property def tag(self): return tag.model(self.uuid) @property def applications(self): """Return a map of application-name:Application for all applications currently in the model. """ return self.state.applications @property def remote_applications(self): """Return a map of application-name:Application for all remote applications currently in the model. """ return self.state.remote_applications @property def application_offers(self): """Return a map of application-name:Application for all applications offers currently in the model. """ return self.state.application_offers @property def machines(self): """Return a map of machine-id:Machine for all machines currently in the model. """ return self.state.machines @property def units(self): """Return a map of unit-id:Unit for all units currently in the model. """ return self.state.units @property def subordinate_units(self): """Return a map of unit-id:Unit for all subordiante units currently in the model. """ return self.state.subordinate_units @property def relations(self): """Return a list of all Relations currently in the model. """ return list(self.state.relations.values()) @property def charmhub(self): """Return a charmhub repository for requesting charm information using the charm-hub-url model config. """ return self._charmhub @property def name(self): """Return the name of this model """ if self._info is None: raise JujuModelError("Model is not connected") return self._info.name @property def info(self): """Return the cached client.ModelInfo object for this Model. If Model.get_info() has not been called, this will return None. """ return self._info @property def strict_mode(self): return self._mode is not None and "strict" in self._mode
    def add_observer(
            self, callable_, entity_type=None, action=None, entity_id=None,
            predicate=None):
        """Register an "on-model-change" callback

        Once the model is connected, ``callable_``
        will be called each time the model changes. ``callable_`` should
        be Awaitable and accept the following positional arguments:

            delta - An instance of :class:`juju.delta.EntityDelta`
                containing the raw delta data recv'd from the Juju
                websocket.

            old_obj - If the delta modifies an existing object in the
                model, old_obj will be a copy of that object, as it was
                before the delta was applied. Will be None if the delta
                creates a new entity in the model.

            new_obj - A copy of the new or updated object, after the
                delta is applied. Will be None if the delta removes an
                entity from the model.

            model - The :class:`Model` itself.

        Events for which ``callable_`` is called can be specified by passing
        entity_type, action, and/or entity_id filter criteria, e.g.::

            add_observer(
                myfunc,
                entity_type='application', action='add', entity_id='ubuntu')

        For more complex filtering conditions, pass a predicate function. It
        will be called with a delta as its only argument. If the predicate
        function returns True, the ``callable_`` will be called.

        """
        observer = _Observer(
            callable_, entity_type, action, entity_id, predicate)
        self._observers[observer] = callable_

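    # Illustrative sketch (not part of the library): registering a coroutine
    # observer with a predicate so it only fires for completed actions.
    #
    #     async def on_action_done(delta, old, new, model):
    #         print('action', delta.get_id(), 'finished')
    #
    #     model.add_observer(
    #         on_action_done, entity_type='action', action='change',
    #         predicate=lambda delta: delta.data['status'] == 'completed')
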
def _watch(self): """Start an asynchronous watch against this model. See :meth:`add_observer` to register an onchange callback. """ def _post_step(obj): # Once we get the model, ensure we're running in the correct state # as a post step. if isinstance(obj, ModelInfo) and obj.safe_data is not None: model_config = obj.safe_data["config"] if "mode" in model_config: self._mode = model_config["mode"] async def _all_watcher(): # First attempt to get the model config so we know what mode the # library should be running in. model_config = await self.get_config() if "mode" in model_config: self._mode = model_config["mode"]["value"] try: allwatcher = client.AllWatcherFacade.from_connection( self.connection()) while not self._watch_stopping.is_set(): try: results = await utils.run_with_interrupt( allwatcher.Next(), self._watch_stopping, log=log) except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise if self._watch_stopping.is_set(): # this shouldn't ever actually happen, because # the event should trigger before the controller # has a chance to tell us the watcher is stopped # but handle it gracefully, just in case break # controller stopped our watcher for some reason # but we're not actually stopping, so just restart it log.warning( 'Watcher: watcher stopped, restarting') del allwatcher.Id continue except websockets.ConnectionClosed: monitor = self.connection().monitor if monitor.status == monitor.ERROR: # closed unexpectedly, try to reopen log.warning( 'Watcher: connection closed, reopening') await self.connection().reconnect() if monitor.status != monitor.CONNECTED: # reconnect failed; abort and shutdown log.error('Watcher: automatic reconnect ' 'failed; stopping watcher') break del allwatcher.Id continue else: # closed on request, go ahead and shutdown break if self._watch_stopping.is_set(): try: await allwatcher.Stop() except websockets.ConnectionClosed: pass # can't stop on a closed conn break for delta in results.deltas: entity = None try: entity = get_entity_delta(delta) except KeyError: if self.strict_mode: raise JujuError("unknown delta type '{}'".format(delta.entity)) if not self.strict_mode and entity is None: continue old_obj, new_obj = self.state.apply_delta(entity) await self._notify_observers(entity, old_obj, new_obj) # Post step ensure that we can handle any settings # that need to be correctly set as a post step. _post_step(new_obj) self._watch_received.set() except CancelledError: pass except Exception: log.exception('Error in watcher') raise finally: self._watch_stopped.set() log.debug('Starting watcher task') self._watch_received.clear() self._watch_stopping.clear() self._watch_stopped.clear() self._watcher_task = jasyncio.create_task(_all_watcher()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state :param delta: The raw change from the watcher (:class:`juju.client.overrides.Delta`) :param old_obj: The object in the model that this delta updates. May be None. :param new_obj: The object in the model that is created or updated by applying this delta. """ if new_obj and not old_obj: delta.type = 'add' log.debug( 'Model changed: %s %s %s', delta.entity, delta.type, delta.get_id()) for o in self._observers: if o.cares_about(delta): jasyncio.ensure_future(o(delta, old_obj, new_obj, self)) async def _wait(self, entity_type, entity_id, action, predicate=None): """ Block the calling routine until a given action has happened to the given entity :param entity_type: The entity's type. 
:param entity_id: The entity's id. :param action: the type of action (e.g., 'add', 'change', or 'remove') :param predicate: optional callable that must take as an argument a delta, and must return a boolean, indicating whether the delta contains the specific action we're looking for. For example, you might check to see whether a 'change' has a 'completed' status. See the _Observer class for details. """ q = jasyncio.Queue() async def callback(delta, old, new, model): await q.put(delta.get_id()) self.add_observer(callback, entity_type, action, entity_id, predicate) entity_id = await q.get() # object might not be in the entity_map if we were waiting for a # 'remove' action return self.state._live_entity_map(entity_type).get(entity_id) async def _wait_for_new(self, entity_type, entity_id): """Wait for a new object to appear in the Model and return it. Waits for an object of type ``entity_type`` with id ``entity_id`` to appear in the model. This is similar to watching for the object using ``block_until``, but uses the watcher rather than polling. """ # if the entity is already in the model, just return it if entity_id in self.state._live_entity_map(entity_type): return self.state._live_entity_map(entity_type)[entity_id] return await self._wait(entity_type, entity_id, None)
    async def wait_for_action(self, action_id):
        """Given an action, wait for it to complete."""

        if action_id.startswith("action-"):
            # if we've been passed action.tag, transform it into the
            # id that the api deltas will use.
            action_id = action_id[7:]

        def predicate(delta):
            return delta.data['status'] in ('completed', 'failed')

        return await self._wait('action', action_id, None, predicate)

    async def get_annotations(self):
        """Get annotations on this model.

        :return dict: The annotations for this model
        """
        return await _get_annotations(self.tag, self.connection())

    async def set_annotations(self, annotations):
        """Set annotations on this model.

        :param annotations map[string]string: the annotations as key/value
            pairs.
        """
        return await _set_annotations(self.tag, annotations, self.connection())

[docs] async def add_machine( self, spec=None, constraints=None, disks=None, series=None): """Start a new, empty machine and optionally a container, or add a container to a machine. :param str spec: Machine specification Examples:: (None) - starts a new machine 'lxd' - starts a new machine with one lxd container 'lxd:4' - starts a new lxd container on machine 4 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision a machine with ssh and the private key used for authentication 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS 'maas2.name' - acquire machine maas2.name on MAAS :param dict constraints: Machine constraints, which can contain the the following keys:: arch : str container : str cores : int cpu_power : int instance_type : str mem : int root_disk : int spaces : list(str) tags : list(str) virt_type : str Example:: constraints={ 'mem': 256 * MB, 'tags': ['virtual'], } :param list disks: List of disk constraint dictionaries, which can contain the following keys:: count : int pool : str size : int Example:: disks=[{ 'pool': 'rootfs', 'size': 10 * GB, 'count': 1, }] :param str series: Series, e.g. 'xenial' Supported container types are: lxd, kvm When deploying a container to an existing machine, constraints cannot be used. """ params = client.AddMachineParams() if spec: if spec.startswith("ssh:"): placement, target, private_key_path = spec.split(":") user, host = target.split("@") sshProvisioner = provisioner.SSHProvisioner( host=host, user=user, private_key_path=private_key_path, ) params = sshProvisioner.provision_machine() else: placement = parse_placement(spec) if placement: params.placement = placement[0] params.jobs = ['JobHostUnits'] if constraints: params.constraints = client.Value.from_json(constraints) if disks: params.disks = [ client.Constraints.from_json(o) for o in disks] if series: params.series = series # Submit the request. client_facade = client.MachineManagerFacade.from_connection(self.connection()) results = await client_facade.AddMachines(params=[params]) error = results.machines[0].error if error: raise ValueError("Error adding machine: %s" % error.message) machine_id = results.machines[0].machine if spec: if spec.startswith("ssh:"): # Need to run this after AddMachines has been called, # as we need the machine_id await sshProvisioner.install_agent( self.connection(), params.nonce, machine_id, ) log.debug('Added new machine %s', machine_id) return await self._wait_for_new('machine', machine_id)
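    # Illustrative sketch (not part of the library): adding a machine with
    # constraints and then placing an LXD container on it. The constraint
    # values are placeholders.
    #
    #     machine = await model.add_machine(
    #         constraints={'arch': 'amd64', 'cores': 2})
    #     container = await model.add_machine(
    #         spec='lxd:{}'.format(machine.id))
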
    async def add_relation(self, relation1, relation2):
        """
        .. deprecated:: 2.9.9
           Use ``relate()`` instead
        """
        return await self.relate(relation1, relation2)

[docs] async def integrate(self, relation1, relation2): """Add a relation between two applications. :param str relation1: '<application>[:<relation_name>]' :param str relation2: '<application>[:<relation_name>]' """ # attempt to validate any url that are passed in. endpoints = [] remote_endpoint = None for ep in [relation1, relation2]: try: url = parse_offer_url(ep) except OfferParseError: pass else: if remote_endpoint is not None: raise JujuError("move than one remote endpoints not supported") remote_endpoint = url endpoints.append(url.application) continue try: parse_local_endpoint(ep) except OfferParseError: raise else: endpoints.append(ep) if len(endpoints) != 2: raise JujuError("error validating one of the endpoints") facade_cls = client.ApplicationFacade if remote_endpoint is not None: if facade_cls.best_facade_version(self.connection()) < 5: # old clients don't support cross model capability raise JujuError("cannot add relation to {}: remote endpoints not supported".format(remote_endpoint.string())) if remote_endpoint.has_empty_source(): async with ConnectedController(self.connection()) as current: remote_endpoint.source = current.controller_name # consume the remote endpoint await self.consume(remote_endpoint.string(), application_alias=remote_endpoint.application) log.debug( 'Adding relation %s <-> %s', endpoints[0], endpoints[1]) def _find_relation(*specs): for rel in self.relations: if rel.matches(*specs): return rel return None app_facade = facade_cls.from_connection(self.connection()) try: result = await app_facade.AddRelation(endpoints=endpoints, via_cidrs=None) except JujuAPIError as e: if 'relation already exists' not in e.message: raise rel = _find_relation(endpoints[0], endpoints[1]) if rel: return rel raise JujuError('Relation {} {} exists but not in model'.format( endpoints[0], endpoints[1])) specs = ['{}:{}'.format(app, data['name']) for app, data in result.endpoints.items()] await self.block_until(lambda: _find_relation(*specs) is not None) return _find_relation(*specs)
    async def relate(self, relation1, relation2):
        """The relate function is deprecated in favor of integrate. The logic
        is the same.
        """
        log.warning("relate is deprecated and will be removed. Use integrate instead.")
        return await self.integrate(relation1, relation2)

    async def add_space(self, name, cidrs=None, public=True):
        """Add a new network space.

        Adds a new space with the given name and associates the given
        (optional) list of existing subnet CIDRs with it.

        :param str name: Name of the space
        :param List[str] cidrs: Optional list of existing subnet CIDRs
        """
        space_facade = client.SpacesFacade.from_connection(self.connection())
        spaces = [{
            "cidrs": cidrs,
            "space-tag": tag.space(name),
            "public": public,
        }]
        return await space_facade.CreateSpaces(spaces=spaces)

    async def add_ssh_key(self, user, key):
        """Add a public SSH key to this model.

        :param str user: The username of the user
        :param str key: The public ssh key

        """
        key_facade = client.KeyManagerFacade.from_connection(self.connection())
        return await key_facade.AddKeys(ssh_keys=[key], user=user)

    add_ssh_keys = add_ssh_key

    async def get_backups(self):
        """Retrieve metadata for backups in this model.

        :return [dict]: List of metadata for the stored backups
        """
        backups_facade = client.BackupsFacade.from_connection(self.connection())
        _backups_metadata = await backups_facade.List()
        backups_metadata = _backups_metadata.serialize()
        if 'list' not in backups_metadata:
            raise JujuAPIError("Unexpected response metadata : %s" % backups_metadata)
        return backups_metadata['list']

    async def create_backup(self, notes=None):
        """Create a backup of this model.

        :param str notes: A note to store with the backup
        :return str, dict: Filename for the downloaded archive file,
            Extra metadata for the created backup

        """
        backups_facade = client.BackupsFacade.from_connection(self.connection())
        results = await backups_facade.Create(notes=notes)
        if results is None:
            raise JujuAPIError("unable to create a backup")
        backup_metadata = results.serialize()
        if 'error' in backup_metadata:
            raise JujuBackupError("unable to create a backup, got %s from Juju API" % backup_metadata)

        backup_id = backup_metadata['filename']
        file_name = self.download_backup(backup_id)
        return file_name, backup_metadata

    async def debug_log(
            self, target=sys.stdout, no_tail=False, exclude_module=[],
            include_module=[], include=[], level="", limit=sys.maxsize,
            lines=10, exclude=[]):
        """Get log messages for this model.

        :param bool no_tail: Stop after returning existing log messages
        :param list exclude_module: Do not show log messages for these
            logging modules
        :param list include_module: Only show log messages for these
            logging modules
        :param list include: Only show log messages for these entities
        :param str level: Log level to show, valid options are 'TRACE',
            'DEBUG', 'INFO', 'WARNING', 'ERROR'
        :param int limit: Return at most this many of the most recent
            (possibly filtered) lines
        :param int lines: Yield this many of the most recent lines, and
            keep yielding
        :param list exclude: Do not show log messages for these entities

        """
        if not self.is_connected():
            await self.connect()

        params = {
            'no_tail': no_tail,
            'exclude_module': exclude_module,
            'include_module': include_module,
            'include': include,
            'level': level,
            'limit': limit,
            'lines': lines,
            'exclude': exclude,
        }
        await self.connect(debug_log_conn=target, debug_log_params=params)

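    # Illustrative sketch (not part of the library): streaming warning-level
    # (and above) log lines for a single entity to stdout. The entity name is
    # a placeholder.
    #
    #     await model.debug_log(level='WARNING',
    #                           include=['unit-ubuntu-0'],
    #                           no_tail=True)
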
[docs] async def deploy( self, entity_url, application_name=None, bind=None, channel=None, config=None, constraints=None, force=False, num_units=1, overlays=[], base=None, resources=None, series=None, revision=None, storage=None, to=None, devices=None, trust=False, attach_storage=[]): """Deploy a new service or bundle. :param str entity_url: Charm or bundle to deploy. Charm url or file path :param str application_name: Name to give the service :param dict bind: <charm endpoint>:<network space> pairs :param str channel: Charm store channel from which to retrieve the charm or bundle, e.g. 'edge' :param dict config: Charm configuration dictionary :param constraints: Service constraints :type constraints: :class:`juju.Constraints` :param bool force: Allow charm to be deployed to a machine running an unsupported series :param int num_units: Number of units to deploy :param [] overlays: Bundles to overlay on the primary bundle, applied in order :param str base: The base on which to deploy :param dict resources: <resource name>:<file path> pairs :param str series: Series on which to deploy DEPRECATED: use --base (with Juju 3.1) :param int revision: specifying a revision requires a channel for future upgrades for charms. For bundles, revision and channel are mutually exclusive. :param dict storage: Storage constraints TODO how do these look? :param to: Placement directive as a string. For example: '23' - place on machine 23 'lxd:7' - place in new lxd container on machine 7 '24/lxd/3' - place in container 3 on machine 24 If None, a new machine is provisioned. :param devices: charm device constraints :param bool trust: Trust signifies that the charm should be deployed with access to trusted credentials. Hooks run by the charm can access cloud credentials and other trusted access credentials. :param str[] attach_storage: Existing storage to attach to the deployed unit (not available on k8s models) """ if storage: storage = { k: client.Constraints(**v) for k, v in storage.items() } if trust and (self.info.agent_version < client.Number.from_json('2.4.0')): raise NotImplementedError("trusted is not supported on model version {}".format(self.info.agent_version)) if not all([isinstance(st, str) for st in attach_storage]): raise JujuError("Expected attach_storage to be a list of strings, given {}".format(attach_storage)) # Ensure what we pass in, is a string. 
entity = str(entity_url) if is_local_charm(entity): if entity.startswith("local:"): entity = entity[6:] architecture = await self._resolve_architecture() schema = Schema.LOCAL else: if client.CharmsFacade.best_facade_version(self.connection()) < 3: url = URL.parse(entity, default_store=Schema.CHARM_STORE) else: url = URL.parse(entity) entity = str(url) architecture = await self._resolve_architecture(url) schema = url.schema name = url.name if schema not in self.deploy_types: raise JujuError("unknown deploy type {}, expected charmhub or local".format(schema)) model_conf = await self.get_config() res = await self.deploy_types[schema].resolve(entity, architecture, application_name, channel, series, revision, entity_url, force, model_conf) if res.identifier is None: raise JujuError('unknown charm or bundle {}'.format(entity_url)) identifier = res.identifier charm_series = series charm_origin = res.origin if base: charm_origin.base = utils.parse_base_arg(base) server_side_deploy = False if res.is_bundle: handler = BundleHandler(self, trusted=trust, forced=force) await handler.fetch_plan(entity, charm_origin, overlays=overlays) await handler.execute_plan() extant_apps = {app for app in self.applications} pending_apps = handler.applications - extant_apps if pending_apps: # new apps will usually be in the model by now, but if some # haven't made it yet we'll need to wait on them to be added await jasyncio.gather(*[ jasyncio.ensure_future( self._wait_for_new('application', app_name)) for app_name in pending_apps ]) return [app for name, app in self.applications.items() if name in handler.applications] else: if overlays: raise JujuError("options provided but not supported when deploying a charm: overlays=%s" % overlays) # XXX: we're dropping local resources here, but we don't # actually support them yet anyway if not res.is_local: add_charm_res = await self._add_charm(identifier, charm_origin) if isinstance(add_charm_res, dict): # This is for backwards compatibility for older # versions where AddCharm returns a dictionary charm_origin = add_charm_res.get('charm_origin', charm_origin) else: charm_origin = add_charm_res.charm_origin if Schema.CHARM_HUB.matches(schema): if client.ApplicationFacade.best_facade_version(self.connection()) >= 19: server_side_deploy = True else: # TODO (cderici): this is an awkward workaround for basically not calling # the AddPendingResources in case this is a server side deploy. # If that's the case, then the store resources (and revisioned local # resources) are handled at the server side if this is a server side deploy # (local uploads are handled right after we get the pendingIDs returned # from the facade call). resources = await self._add_charmhub_resources(res.app_name, identifier, add_charm_res.charm_origin) is_sub = await self.charmhub.is_subordinate(name) if is_sub: if num_units > 1: raise JujuError("cannot use num_units with subordinate application") num_units = 0 else: # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath(os.path.expanduser(identifier)) metadata = utils.get_local_charm_metadata(charm_dir) charm_series = charm_series or await get_charm_series(metadata, self) base = utils.get_local_charm_base(charm_series, charm_dir, client.Base) charm_origin.base = base if not application_name: application_name = metadata['name'] if not application_name: application_name = metadata['name'] if base is None and charm_series is None: raise JujuError( "Either series or base is needed to deploy the " "charm at {}. 
".format(charm_dir)) identifier = await self.add_local_charm_dir(charm_dir, charm_series) resources = await self.add_local_resources(application_name, identifier, metadata, resources=resources) if config is None: config = {} if trust: config["trust"] = True return await self._deploy( charm_url=identifier, application=res.app_name, series=charm_series, config=config, constraints=constraints, endpoint_bindings=bind, resources=resources, storage=storage, channel=channel, num_units=num_units, placement=parse_placement(to), devices=devices, charm_origin=charm_origin, attach_storage=attach_storage, force=force, server_side_deploy=server_side_deploy, )
async def _add_charm(self, charm_url, origin): """_add_charm sends the given origin and the url to the Juju API too add the charm to the state. Either calls the CharmsFacade.AddCharm for (> version 2), or the ClientFacade.AddCharm (for older versions). :param str charm_url: the url of the charm to be added :param client.CharmOrigin origin: the origin for the charm to be added :returns client.CharmOriginResult """ # client facade is deprecated with in Juju, and smaller, more focused # facades have been created and we'll use that if it's available. charms_cls = client.CharmsFacade if charms_cls.best_facade_version(self.connection()) > 2: charms_facade = charms_cls.from_connection(self.connection()) return await charms_facade.AddCharm(charm_origin=origin, url=charm_url, force=False) client_facade = client.ClientFacade.from_connection(self.connection()) return await client_facade.AddCharm(channel=str(origin.risk), url=charm_url, force=False) async def _resolve_charm(self, url, origin, force=False, series=None, model_config=None): """Calls Charms.ResolveCharms to resolve all the fields of the charm_origin and also the url and the supported_series :param str url: The url of the charm :param client.CharmOrigin origin: The manually constructed origin based on what we know about the charm and the deployment so far Returns the confirmed origin returned by the Juju API to be used in calls like ApplicationFacade.Deploy :returns url.URL, client.CharmOrigin, [str] """ charms_cls = client.CharmsFacade if charms_cls.best_facade_version(self.connection()) < 3: raise JujuError("resolve charm") charms_facade = charms_cls.from_connection(self.connection()) # TODO (cderici): following part can be refactored out, since the # origin should be set (including the base) before calling this, # though all tests need to run (in earlier versions too) before # committing to make sure there's no regression source = Source.CHARM_HUB.value resolve_origin = {'source': source, 'architecture': origin.architecture, 'track': origin.track, 'risk': origin.risk, 'base': origin.base, 'revision': origin.revision, } resp = await charms_facade.ResolveCharms(resolve=[{ 'reference': str(url), 'charm-origin': resolve_origin }]) if len(resp.results) != 1: raise JujuError("expected one result, received {}".format(resp.results)) result = resp.results[0] if result.error: raise JujuError(f'resolving {url} : {result.error.message}') # TODO (cderici) : supported_bases supported_series = result.get('supported_series', result.unknown_fields['supported-series']) resolved_origin = result.charm_origin charm_url = URL.parse(result.url) # run the series selector to get a series for the base selected_series = utils.series_selector(series, charm_url, model_config, supported_series, force) result.charm_origin.base = utils.get_base_from_origin_or_channel(resolved_origin, selected_series) charm_url.series = selected_series return charm_url, resolved_origin async def _resolve_architecture(self, url=None): """_resolve_architecture returns the architecture for a given charm url. If the charm url is absent, or doesn't specific an arch, we return the default architecture from the model. 
:param str url: the client.URL to determine the arch for :returns str architecture for the given url """ if url is not None and url.architecture: return url.architecture constraints = await self.get_constraints() if 'arch' in constraints: return constraints['arch'] return DEFAULT_ARCHITECTURE async def _add_charmhub_resources(self, application, entity_url, origin, overrides=None): """_add_charmhub_resources is called by the deploy to add pending resources requested by the charm being deployed. It calls the ResourcesFacade.AddPendingResources. :param str application: the name of the application :param client.CharmURL entity_url: url for the charm that we add resources for :param client.CharmOrigin origin: origin for the charm that we add resources for :returns [string]string resource_map that is a map of resources to their assigned pendingIDs. """ charm_facade = client.CharmsFacade.from_connection(self.connection()) res = await charm_facade.CharmInfo(entity_url) resources = [] for resource in res.meta.resources.values(): resources.append({ 'description': resource.description, 'name': resource.name, 'path': resource.path, 'type_': resource.type_, 'origin': 'store', 'revision': -1, }) if overrides: names = {r['name'] for r in resources} unknown = overrides.keys() - names if unknown: raise JujuError('Unrecognized resource{}: {}'.format( 's' if len(unknown) > 1 else '', ', '.join(unknown))) for resource in resources: if resource['name'] in overrides: resource['revision'] = overrides[resource['name']] resources_facade = client.ResourcesFacade.from_connection( self.connection()) response = await resources_facade.AddPendingResources( application_tag=tag.application(application), charm_url=entity_url, charm_origin=origin, resources=[client.CharmResource(**resource) for resource in resources], ) # response.pending_ids always exists but it can itself be None # see juju/client/_definitions.py for AddPendingResourcesResult resource_map = {resource['name']: pid for resource, pid in zip(resources, response.pending_ids or {})} return resource_map
[docs] async def add_local_resources(self, application, entity_url, metadata, resources): """_add_local_resources is called by the deploy to add pending local resources requested by the charm being deployed. It calls the ResourcesFacade.AddPendingResources. After getting the pending IDs from the controller it sends an HTTP PUT request to actually upload local resources. :param str application: the name of the application :param client.CharmURL entity_url: url for the charm that we add resources for :param [string]string metadata: metadata for the charm that we add resources for :param [string] resources: the paths for the local files (or oci-images) to be added as local resources :returns [string]string resource_map that is a map of resources to their assigned pendingIDs. """ if not resources: return None resource_map = dict() for name, path in resources.items(): resource_type = metadata["resources"][name]["type"] if resource_type not in {"oci-image", "file"}: log.info("Resource {} of type {} is not supported".format(name, resource_type)) continue charmresource = { 'description': '', 'fingerprint': '', 'name': name, 'path': Path(path).name, 'revision': 0, 'size': 0, 'type_': resource_type, 'origin': 'upload', } resources_facade = client.ResourcesFacade.from_connection( self.connection()) response = await resources_facade.AddPendingResources( application_tag=tag.application(application), charm_url=entity_url, resources=[client.CharmResource(**charmresource)]) pending_id = response.pending_ids[0] resource_map[name] = pending_id if resource_type == "oci-image": # TODO Docker Image validation and support for local images. docker_image_details = { 'registrypath': path, 'username': '', 'password': '', } data = yaml.dump(docker_image_details) else: p = Path(path) data = p.read_text() if p.exists() else '' self._upload(data, path, application, name, resource_type, pending_id) return resource_map
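# Illustrative sketch (not part of the module): local file and oci-image
# resources are normally attached by passing `resources` to Model.deploy,
# which calls add_local_resources internally. The charm and resource names
# below are hypothetical.
#
#     await model.deploy(
#         './charms/my-charm',
#         resources={'config-file': './files/app-config.yaml'},
#     )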
def _upload(self, data, path, app_name, res_name, res_type, pending_id): conn, headers, path_prefix = self.connection().https_connection() query = "?pendingid={}".format(pending_id) url = "{}/applications/{}/resources/{}{}".format(path_prefix, app_name, res_name, query) if res_type == "oci-image": disp = "multipart/form-data; filename=\"{}\"".format(path) else: disp = "form-data; filename=\"{}\"".format(path) headers['Content-Type'] = 'application/octet-stream' headers['Content-Length'] = len(data) headers['Content-Sha384'] = hashlib.sha384(bytes(data, 'utf-8')).hexdigest() headers['Content-Disposition'] = disp conn.request('PUT', url, data, headers) response = conn.getresponse() result = response.read().decode() if not response.status == 200: raise JujuError(result) async def _deploy(self, charm_url, application, series, config, constraints, endpoint_bindings, resources, storage, channel=None, num_units=None, placement=None, devices=None, charm_origin=None, attach_storage=[], force=False, server_side_deploy=False): """Logic shared between `Model.deploy` and `BundleHandler.deploy`. """ log.info('Deploying %s', charm_url) trust = config.get('trust', False) # stringify all config values for API, and convert to YAML config = {k: str(v) for k, v in config.items()} config = yaml.dump({application: config}, default_flow_style=False) app_facade = client.ApplicationFacade.from_connection(self.connection()) if server_side_deploy: # Call DeployFromRepository app = client.DeployFromRepositoryArg( applicationname=application, attachstorage=attach_storage, charmname=charm_url, configyaml=config, cons=parse_constraints(constraints), devices=devices, dryrun=False, placement=placement, storage=storage, trust=trust, base=charm_origin.base, channel=channel, endpoint_bindings=endpoint_bindings, force=force, num_units=num_units, resources=resources, revision=charm_origin.revision, ) result = await app_facade.DeployFromRepository([app]) # Collect the errors errors = [] for r in result.results: if r.errors: errors.extend([e.message for e in r.errors]) # Upload pending local resources if any for _result in result.results: for pending_upload_resource in getattr(_result, 'pendingresourceuploads', []): _path = pending_upload_resource.filename p = Path(_path) data = p.read_text() if p.exists() else '' self._upload(data, _path, application, pending_upload_resource.name, 'file', '') else: app = client.ApplicationDeploy( charm_url=charm_url, application=application, series=series, channel=channel, charm_origin=charm_origin, config_yaml=config, constraints=parse_constraints(constraints), endpoint_bindings=endpoint_bindings, num_units=num_units, resources=resources, storage=storage, placement=placement, devices=devices, attach_storage=attach_storage, force=force, ) result = await app_facade.Deploy(applications=[app]) errors = [r.error.message for r in result.results if r.error] if errors: raise JujuError('\n'.join(errors)) return await self._wait_for_new('application', application)
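# Illustrative usage sketch (not part of the module): the public Model.deploy
# entry point above ultimately funnels into _deploy. Assuming `model` is a
# connected Model, deploying a hypothetical local charm directory with config,
# trust and a single unit might look like:
#
#     model = Model()
#     await model.connect()
#     app = await model.deploy(
#         './charms/my-charm',              # hypothetical local charm path
#         application_name='my-app',
#         config={'log-level': 'debug'},    # hypothetical config option
#         trust=True,
#         num_units=1,
#     )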
[docs] async def destroy_unit(self, unit_id, destroy_storage=False, dry_run=False, force=False, max_wait=None): """Destroy units by name. """ return await self.destroy_units(unit_id, destroy_storage=destroy_storage, dry_run=dry_run, force=force, max_wait=max_wait )
[docs] async def destroy_units(self, *unit_names, destroy_storage=False, dry_run=False, force=False, max_wait=None): """Destroy several units at once. """ connection = self.connection() app_facade = client.ApplicationFacade.from_connection(connection) units_to_destroy = [] for unit_id in unit_names: unit_tag = tag.unit(unit_id) if unit_tag is None: log.error("Error converting %s to a valid unit tag", unit_id) raise JujuUnitError("Error converting %s to a valid unit tag", unit_id) units_to_destroy.append({ 'unit-tag': unit_tag, 'destroy-storage': destroy_storage, 'force': force, 'max-wait': max_wait, 'dry-run': dry_run, }) log.debug('Destroying units %s', unit_names) return await app_facade.DestroyUnit(units=units_to_destroy)
[docs] def download_backup(self, archive_id, target_filename=None):
    """Download a backup archive file.

    :param str archive_id: The id of the archive to download
    :param str (optional) target_filename: A custom name for the target file
    :return str: Path to the archive file
    """
    conn, headers, path_prefix = self.connection().https_connection()
    path = "%s/backups" % path_prefix
    headers['Content-Type'] = 'application/json'
    args = {'id': archive_id}
    conn.request("GET", path, json.dumps(args, indent=2), headers=headers)
    response = conn.getresponse()
    result = response.read()
    if not response.status == 200:
        raise JujuBackupError("unable to download backup ID %s -- got %s from the Juju API with HTTP response code %s" % (archive_id, result, response.status))

    if target_filename:
        file_name = target_filename
    else:
        # check if archive_id is a filename
        if re.match(r'.*\.tar\.gz', archive_id):
            # if so, use the same ID generated & sent by the Juju API
            archive_id = re.compile('[0-9]+').findall(archive_id)[0]
        file_name = "juju-backup-%s.tar.gz" % archive_id

    with open(str(file_name), 'wb') as f:
        try:
            f.write(result)
        except (OSError, IOError) as e:
            raise JujuBackupError("backup ID %s was fetched, but writing it failed: %s" % (archive_id, e))

    log.info("Backup archive downloaded to %s" % file_name)
    return file_name
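# Illustrative sketch (not part of the module): downloading a backup archive
# by id; the id below is hypothetical. Note that download_backup is
# synchronous, as it uses the raw HTTPS connection directly.
#
#     path = model.download_backup('20230101-120000.0123abcd-0000-4000-8000-00001111aaaa')
#     print('backup written to', path)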
[docs] async def get_config(self): """Return the configuration settings for this model. :returns: A ``dict`` mapping keys to `ConfigValue` instances, which have `source` and `value` attributes. """ config_facade = client.ModelConfigFacade.from_connection( self.connection() ) result = await config_facade.ModelGet() config = result.config for key, value in config.items(): config[key] = client.ConfigValue.from_json(value) return config
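# Illustrative sketch (not part of the module): reading model configuration,
# assuming `model` is a connected Model; the key is just an example.
#
#     config = await model.get_config()
#     cv = config['automatically-retry-hooks']   # a client.ConfigValue
#     print(cv.value, cv.source)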
[docs] async def get_constraints(self): """Return the machine constraints for this model. :returns: A ``dict`` of constraints. """ constraints = {} facade_cls = client.ModelConfigFacade facade = facade_cls.from_connection(self.connection()) result = await facade.GetModelConstraints() # GetModelConstraints returns GetConstraintsResults which has a # 'constraints' attribute. If no constraints have been set # GetConstraintsResults.constraints is None. Otherwise # GetConstraintsResults.constraints has an attribute for each possible # constraint, each of these in turn will be None if they have not been # set. if result.constraints: constraint_types = [a for a in dir(result.constraints) if a in client.Value._toSchema.keys()] for constraint in constraint_types: value = getattr(result.constraints, constraint) if value is not None: constraints[constraint] = getattr(result.constraints, constraint) return constraints
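# Illustrative sketch (not part of the module): reading model constraints,
# assuming `model` is a connected Model. The result is {} when no constraints
# have been set.
#
#     constraints = await model.get_constraints()
#     # e.g. {'arch': 'amd64', 'mem': 8192} when constraints are set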
[docs] async def get_machines(self): """Return list of machines in this model. """ return list(self.state.machines.keys())
[docs] async def get_spaces(self): """Return list of all known spaces, including associated subnets. Returns a List of :class:`~juju._definitions.Space` instances. """ space_facade = client.SpacesFacade.from_connection(self.connection()) response = await space_facade.ListSpaces() return response.results
[docs] async def get_ssh_key(self, raw_ssh=False):
    """Return known SSH keys for this model.

    :param bool raw_ssh: if True, returns the raw ssh key, else its fingerprint
    """
    key_facade = client.KeyManagerFacade.from_connection(self.connection())
    entity = {'tag': tag.model(self.info.uuid)}
    entities = client.Entities([entity])
    return await key_facade.ListKeys(entities=entities, mode=raw_ssh)
get_ssh_keys = get_ssh_key
[docs] async def remove_backup(self, backup_id): """Delete a backup. :param str backup_id: The id of the backup to remove """ backups_facade = client.BackupsFacade.from_connection(self.connection()) return await backups_facade.Remove([backup_id])
[docs] async def remove_backups(self, backup_ids): """Delete the given backups. :param [str] backup_ids: The list of ids of the backups to remove """ backups_facade = client.BackupsFacade.from_connection(self.connection()) return await backups_facade.Remove(backup_ids)
[docs] async def remove_ssh_key(self, user, key):
    """Remove a public SSH key from this model.

    :param str key: Full ssh key
    :param str user: Juju user to which the key is registered
    """
    key_facade = client.KeyManagerFacade.from_connection(self.connection())
    key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
    key = hashlib.md5(key).hexdigest()
    key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
    await key_facade.DeleteKeys(ssh_keys=[key], user=user)
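# Illustrative sketch (not part of the module): the same MD5 fingerprint
# transformation used by remove_ssh_key above, shown standalone with a
# stand-in key blob instead of a real public key.
#
#     import base64, hashlib
#     blob = base64.b64encode(b'example-key-material')          # stand-in blob
#     digest = hashlib.md5(base64.b64decode(blob)).hexdigest()
#     fingerprint = ':'.join(a + b for a, b in zip(digest[::2], digest[1::2]))
#     # fingerprint is the colon-separated form expected by KeyManager.DeleteKeys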
remove_ssh_keys = remove_ssh_key
[docs] def restore_backup( self, backup_id, bootstrap=False, constraints=None, archive=None, upload_tools=False): """Restore a backup archive to a new controller. :param str backup_id: Id of backup to restore :param bool bootstrap: Bootstrap a new state machine :param constraints: Model constraints :type constraints: :class:`juju.Constraints` :param str archive: Path to backup archive to restore :param bool upload_tools: Upload tools if bootstrapping a new machine """ raise DeprecationWarning("juju restore-backup is deprecated in favor of the stand-alone 'juju-restore' tool: https://github.com/juju/juju-restore")
[docs] async def set_config(self, config): """Set configuration keys on this model. :param dict config: Mapping of config keys to either string values or `ConfigValue` instances, as returned by `get_config`. """ config_facade = client.ModelConfigFacade.from_connection( self.connection() ) new_conf = {} for key, value in config.items(): if isinstance(value, client.ConfigValue): new_conf[key] = value.value elif isinstance(value, str): new_conf[key] = value else: raise JujuModelConfigError("Expected either a string or a ConfigValue as a config value, found : %s of type %s" % (value, type(value))) await config_facade.ModelSet(config=new_conf)
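# Illustrative sketch (not part of the module): setting model configuration
# with plain strings or ConfigValue instances round-tripped from get_config,
# assuming `model` is a connected Model; the keys are just examples.
#
#     await model.set_config({'automatically-retry-hooks': 'false'})
#     current = await model.get_config()
#     await model.set_config({'logging-config': current['logging-config']})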
[docs] async def set_constraints(self, constraints): """Set machine constraints on this model. :param dict config: Mapping of model constraints """ facade_cls = client.ModelConfigFacade facade = facade_cls.from_connection(self.connection()) await facade.SetModelConstraints( application='', constraints=constraints)
[docs] async def get_action_output(self, action_uuid, wait=None): """ Get the results of an action by ID. :param str action_uuid: Id of the action :param int wait: Time in seconds to wait for action to complete. :return dict: Output from action :raises: :class:`JujuError` if invalid action_uuid """ action = await self._get_completed_action(action_uuid, wait=wait) # ActionResult.output is None if the action produced no output return {} if action.output is None else action.output
async def _get_completed_action(self, action_uuid, wait=None):
    """Get the completed internal _definitions.Action object.

    :param str action_uuid: Id of the action
    :param int wait: Time in seconds to wait for action to complete.
    :return: The completed internal _definitions.Action object
    :raises: :class:`JujuError` if invalid action_uuid
    """
    action_facade = client.ActionFacade.from_connection(
        self.connection()
    )
    entity = [{'tag': tag.action(action_uuid)}]

    # Cannot use self.wait_for_action as the action event has probably
    # already happened and self.wait_for_action works by processing
    # model deltas and checking if they match our type. If the action
    # has already occurred then the delta has gone.
    async def _wait_for_action_status():
        while True:
            action_output = await action_facade.Actions(entities=entity)
            if action_output.results[0].status in ('completed', 'failed'):
                return
            else:
                await jasyncio.sleep(1)
    await jasyncio.wait_for(
        _wait_for_action_status(),
        timeout=wait)
    action_results = await action_facade.Actions(entities=entity)
    return action_results.results[0]
[docs] async def get_action_status(self, uuid_or_prefix=None, name=None): """Get the status of all actions, filtered by ID, ID prefix, or name. :param str uuid_or_prefix: Filter by action uuid or prefix :param str name: Filter by action name """ results = {} action_results = [] action_facade = client.ActionFacade.from_connection( self.connection() ) if name: name_results = await action_facade.FindActionsByNames(names=[name]) action_results.extend(name_results.actions[0].actions) if uuid_or_prefix: # Collect list of actions matching uuid or prefix matching_actions = await action_facade.FindActionTagsByPrefix( prefixes=[uuid_or_prefix]) entities = [] for actions in matching_actions.matches.values(): entities = [{'tag': a.tag} for a in actions] # Get action results matching action tags uuid_results = await action_facade.Actions(entities=entities) action_results.extend(uuid_results.results) for a in action_results: results[tag.untag('action-', a.action.tag)] = a.status return results
[docs] async def get_status(self, filters=None, utc=False):
    """Return the status of the model.

    :param list filters: Optional list of application, unit, or machine
        names to include, which can use wildcards ('*').
    :param bool utc: Display time as UTC in RFC3339 format
    """
    client_facade = client.ClientFacade.from_connection(self.connection())
    return await client_facade.FullStatus(patterns=filters)
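# Illustrative sketch (not part of the module): inspecting the FullStatus
# result, assuming `model` is a connected Model; the filter pattern and the
# application name are hypothetical.
#
#     status = await model.get_status(filters=['my-app*'])
#     for name, app in status.applications.items():
#         print(name, app.status.status)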
[docs] async def get_metrics(self, *tags): """Retrieve metrics. :param str *tags: Tags of entities from which to retrieve metrics. No tags retrieves the metrics of all units in the model. :return: Dictionary of unit_name:metrics """ log.debug("Retrieving metrics for %s", ', '.join(tags) if tags else "all units") metrics_facade = client.MetricsDebugFacade.from_connection( self.connection()) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities=entities) metrics = collections.defaultdict(list) for entity_metrics in metrics_result.results: error = entity_metrics.error if error: if "is not a valid tag" in error: raise ValueError(error.message) else: raise Exception(error.message) for metric in entity_metrics.metrics: metrics[metric.unit].append(vars(metric)) return metrics
[docs] async def create_offer(self, endpoint, offer_name=None, application_name=None):
    """
    Offer a deployed application using a series of endpoints for use by
    consumers.

    @param endpoint: holds the application and endpoint you want to offer
    @param offer_name: override the offer name to help the consumer
    """
    async with ConnectedController(self.connection()) as controller:
        return await controller.create_offer(self.info.uuid, endpoint,
                                             offer_name=offer_name,
                                             application_name=application_name)
[docs] async def list_offers(self):
    """
    List information about the offers in this model: the application
    endpoints that have been shared and who is connected to them.
    """
    async with ConnectedController(self.connection()) as controller:
        return await controller.list_offers(self.name)
[docs] async def remove_offer(self, endpoint, force=False):
    """
    Remove an offer for an application.

    Removing an offer will also remove any relations to that offer; use
    force to do so without an error.
    """
    async with ConnectedController(self.connection()) as controller:
        return await controller.remove_offer(self.info.uuid, endpoint, force)
[docs] async def consume(self, endpoint, application_alias="", controller_name=None, controller=None):
    """
    Adds a remote offer to the model. Relations can be created later using
    "juju relate".

    If consuming a relation from a model on a different controller, the
    controller name must be included in the endpoint. The controller_name
    argument is deprecated.
    """
    if controller and controller_name:
        raise JujuError("cannot set both controller_name and controller")
    try:
        offer = parse_offer_url(endpoint)
    except OfferParseError as e:
        log.error(e.message)
        raise
    if offer.has_endpoint():
        raise JujuError("remote offer {} should not include an endpoint".format(endpoint))
    if offer.user == "":
        offer.user = self.info.username
        endpoint = offer.string()
    if controller_name:
        log.warning(
            'controller_name argument will soon be deprecated, controller '
            'should be specified in endpoint url')
        if offer.source == "":
            offer.source = controller_name
    if offer.source:
        source = await self._get_source_api(offer)
    else:
        if controller:
            source = controller
        else:
            source = Controller()
            kwargs = self.connection().connect_params()
            kwargs["uuid"] = None
            await source._connect_direct(**kwargs)

    consume_details = await source.get_consume_details(offer.as_local().string())

    # Only disconnect when the Controller object was created within this
    # function. We don't want to disconnect the object passed by the user
    # in the controller argument.
    if not controller:
        await source.disconnect()

    if consume_details is None or consume_details.offer is None:
        raise JujuAPIError("missing consuming offer url for {}".format(offer.string()))

    offer_url = parse_offer_url(consume_details.offer.offer_url)
    offer_url.source = offer.source
    consume_details.offer.offer_url = offer_url.string()
    consume_details.offer.application_alias = application_alias

    arg = _create_consume_args(consume_details.offer,
                               consume_details.macaroon,
                               consume_details.external_controller)

    facade = client.ApplicationFacade.from_connection(self.connection())
    results = await facade.Consume(args=[arg])
    if len(results.results) != 1:
        raise JujuAPIError("expected 1 result, received {}".format(len(results.results)))
    if results.results[0].error is not None:
        raise JujuAPIError(results.results[0].error)
    local_name = offer_url.application
    if application_alias != "":
        local_name = application_alias
    return local_name
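# Illustrative sketch (not part of the module): consuming an offer and then
# relating to it by its local (SAAS) name, assuming `model` is a connected
# Model; the offer URL, alias, and endpoint names are hypothetical.
#
#     saas_name = await model.consume('admin/other-model.mysql',
#                                     application_alias='db')
#     await model.add_relation('my-app:database', saas_name)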
[docs] async def remove_saas(self, name): """ Removing a consumed (SAAS) application will terminate any relations that application has, potentially leaving any related local applications in a non-functional state. """ if not is_valid_application(name): raise JujuError("invalid SAAS application name {}".format(name)) arg = client.DestroyConsumedApplicationParams() arg.application_tag = application_tag(name) facade = client.ApplicationFacade.from_connection(self.connection()) return await facade.DestroyConsumedApplications(applications=[arg])
[docs] async def export_bundle(self, filename=None): """ Exports the current model configuration as a reusable bundle. """ facade = client.BundleFacade.from_connection(self.connection()) result = await facade.ExportBundle() if result.error is not None: raise JujuAPIError(result.error) if filename is None: return result.result try: with open(str(filename), "w") as file: file.write(result.result) except IOError: raise
[docs] async def add_secret(self, name, data_args, file="", info=""): """Adds a secret with a list of key values Equivalent to the cli command: juju add-secret [options] <name> [key[#base64|#file]=value...] :param name str: The name of the secret to be added. :param data_args []str: The key value pairs to be added into the secret. :param file str: A path to a yaml file containing secret key values. :param info str: The secret description. """ data = create_secret_data(data_args) if file: data_from_file = read_secret_data(file) for k, v in data_from_file.items(): # Caution: key/value pairs in files overwrite the ones in the args. data[k] = v if client.SecretsFacade.best_facade_version(self.connection()) < 2: raise JujuNotSupportedError("user secrets") secretsFacade = client.SecretsFacade.from_connection(self.connection()) results = await secretsFacade.CreateSecrets([{ 'content': {'data': data}, 'description': info, 'label': name, }]) if len(results.results) != 1: raise JujuAPIError(f"expected 1 result, got {len(results.results)}") result = results.results[0] if result.error is not None: raise JujuAPIError(result.error.message) return result.result
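# Illustrative sketch (not part of the module): adding a user secret with
# key=value pairs in the same form the CLI accepts (optionally with #base64
# or #file suffixes); the names and values are hypothetical.
#
#     result = await model.add_secret(
#         'db-credentials',
#         ['username=admin', 'password=s3cret'],
#         info='credentials for the db relation',
#     )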
[docs] async def update_secret(self, name, data_args=[], new_name="", file="", info=""):
    """Update a secret with a list of key values, or info.

    Equivalent to the cli command:
    juju update-secret [options] <name> [key[#base64|#file]=value...]

    :param name str: The name of the secret to be updated.
    :param data_args []str: The key value pairs to be updated in the secret.
    :param new_name str: The new name for the secret.
    :param file str: A path to a yaml file containing secret key values.
    :param info str: The secret description.
    """
    data = create_secret_data(data_args)
    if file:
        data_from_file = read_secret_data(file)
        for k, v in data_from_file.items():
            # Caution: key/value pairs in files overwrite the ones in the args.
            data[k] = v
    if client.SecretsFacade.best_facade_version(self.connection()) < 2:
        raise JujuNotSupportedError("user secrets")
    secretsFacade = client.SecretsFacade.from_connection(self.connection())
    results = await secretsFacade.UpdateSecrets([{
        'content': {'data': data},
        'description': info,
        'existing-label': name,
        'label': new_name,
    }])
    if len(results.results) != 1:
        raise JujuAPIError(f"expected 1 result, got {len(results.results)}")
    result_error = results.results[0]
    if result_error.error is not None:
        raise JujuAPIError(result_error.error)
[docs] async def list_secrets(self, filter="", show_secrets=False): """ Returns the list of available secrets. """ facade = client.SecretsFacade.from_connection(self.connection()) results = await facade.ListSecrets({ 'filter': filter, 'show-secrets': show_secrets, }) return results.results
[docs] async def remove_secret(self, secret_name, revision=-1): """Remove an existing secret. :param secret_name str: ID|name of the secret to remove. :param revision int: remove the specified revision. """ if client.SecretsFacade.best_facade_version(self.connection()) < 2: raise JujuNotSupportedError("user secrets") remove_secret_arg = { 'label': secret_name, } if revision >= 0: remove_secret_arg['revisions'] = [revision] secretsFacade = client.SecretsFacade.from_connection(self.connection()) results = await secretsFacade.RemoveSecrets([remove_secret_arg]) if len(results.results) != 1: raise JujuAPIError(f"expected 1 result, got {len(results.results)}") result_error = results.results[0] if result_error.error is not None: raise JujuAPIError(result_error.error)
[docs] async def grant_secret(self, secret_name, application, *applications): """Grants access to a secret to the specified applications. :param secret_name str: ID|name of the secret. :param application str: name of an application for which the access is granted :param applications []str: names of more applications to associate the secret with """ if client.SecretsFacade.best_facade_version(self.connection()) < 2: raise JujuNotSupportedError("user secrets") secretsFacade = client.SecretsFacade.from_connection(self.connection()) results = await secretsFacade.GrantSecret( applications=[application] + list(applications), label=secret_name) if len(results.results) != 1: raise JujuAPIError(f"expected 1 result, got {len(results.results)}") result_error = results.results[0] if result_error.error is not None: raise JujuAPIError(result_error.error)
[docs] async def revoke_secret(self, secret_name, application, *applications):
    """Revoke access to a secret.

    Revoke applications' access to view the value of a specified secret.

    :param secret_name str: ID|name of the secret.
    :param application str: name of an application for which the access
    to the secret is revoked
    :param applications []str: names of additional applications to
    disassociate from the secret
    """
    if client.SecretsFacade.best_facade_version(self.connection()) < 2:
        raise JujuNotSupportedError("user secrets")
    secretsFacade = client.SecretsFacade.from_connection(self.connection())
    results = await secretsFacade.RevokeSecret(
        applications=[application] + list(applications),
        label=secret_name)
    if len(results.results) != 1:
        raise JujuAPIError(f"expected 1 result, got {len(results.results)}")
    result_error = results.results[0]
    if result_error.error is not None:
        raise JujuAPIError(result_error.error)
async def _get_source_api(self, url): controller = Controller() if url.has_empty_source(): async with ConnectedController(self.connection()) as current: if current.controller_name is not None: controller_name = current.controller_name else: controller_name = url.source await controller.connect(controller_name=controller_name) return controller
[docs] async def wait_for_idle(self, apps=None, raise_on_error=True, raise_on_blocked=False,
                              wait_for_active=False, timeout=10 * 60, idle_period=15,
                              check_freq=0.5, status=None, wait_for_at_least_units=None,
                              wait_for_exact_units=None):
    """Wait for applications in the model to settle into an idle state.

    :param List[str] apps: Optional list of specific app names to wait on.
        If given, all apps must be present in the model and idle, while other
        apps in the model can still be busy. If not given, all apps currently
        in the model must be idle.

    :param bool raise_on_error: If True, then any unit or app going into
        "error" status immediately raises either a JujuAppError or a
        JujuUnitError. Note that machine or agent failures will always raise
        an exception (either JujuMachineError or JujuAgentError), regardless
        of this param. The default is True.

    :param bool raise_on_blocked: If True, then any unit or app going into
        "blocked" status immediately raises either a JujuAppError or a
        JujuUnitError. The default is False.

    :param bool wait_for_active: If True, also wait for all unit workload
        statuses to be "active". Deprecated; use ``status="active"`` instead.
        The default is False.

    :param float timeout: How long to wait, in seconds, for the model to
        settle before raising an asyncio.TimeoutError. If None, will wait
        forever. The default is 10 minutes.

    :param float idle_period: How long, in seconds, the agent statuses of all
        units of all apps need to be `idle`. This delay is used to ensure that
        any pending hooks have a chance to start, to avoid false positives.
        The default is 15 seconds.

    :param float check_freq: How frequently, in seconds, to check the model.
        The default is every half-second.

    :param str status: The workload status to wait for. The default is None,
        meaning no particular workload status is waited for.

    :param int wait_for_at_least_units: The minimum number of units that need
        to go into the idle state. wait_for_idle will return after that many
        units are available (across all the given applications). The default
        is 1 unit.

    :param int wait_for_exact_units: The exact number of units to be expected
        before going into the idle state. (e.g. useful for scaling down).
        When set, takes precedence over the `wait_for_at_least_units`
        parameter.
""" if wait_for_active: warnings.warn("wait_for_active is deprecated; use status", DeprecationWarning) status = "active" _wait_for_units = wait_for_at_least_units if wait_for_at_least_units is not None else 1 timeout = timedelta(seconds=timeout) if timeout is not None else None idle_period = timedelta(seconds=idle_period) start_time = datetime.now() # Type check against the common error of passing a str for apps if apps is not None and (not isinstance(apps, list) or any(not isinstance(o, str) for o in apps)): raise JujuError(f'Expected a List[str] for apps, given {apps}') apps = apps or self.applications idle_times = {} units_ready = set() # The units that are in the desired state last_log_time = None log_interval = timedelta(seconds=30) def _raise_for_status(entities, status): if not entities: return for entity_name, error_type in (("Machine", JujuMachineError), ("Agent", JujuAgentError), ("Unit", JujuUnitError), ("App", JujuAppError)): errored = entities.get(entity_name, []) if not errored: continue raise error_type("{}{} in {}: {}".format( entity_name, "s" if len(errored) > 1 else "", status, ", ".join(errored), )) if wait_for_exact_units is not None: assert isinstance(wait_for_exact_units, int) and wait_for_exact_units >= 0, \ 'Invalid value for wait_for_exact_units : %s' % wait_for_exact_units while True: # The list 'busy' is what keeps this loop going, # i.e. it'll stop when busy is empty after all the # units are scanned busy = [] errors = {} blocks = {} for app_name in apps: if app_name not in self.applications: busy.append(app_name + " (missing)") continue app = self.applications[app_name] app_status = await app.get_status() if raise_on_error and app_status == "error": errors.setdefault("App", []).append(app.name) if raise_on_blocked and app_status == "blocked": blocks.setdefault("App", []).append(app.name) # Check if wait_for_exact_units flag is used if wait_for_exact_units is not None: if len(app.units) != wait_for_exact_units: busy.append(app.name + " (waiting for exactly %s units, current : %s)" % (wait_for_exact_units, len(app.units))) continue # If we have less # of units then required, then wait a bit more elif len(app.units) < _wait_for_units: busy.append(app.name + " (not enough units yet - %s/%s)" % (len(app.units), _wait_for_units)) continue # User is waiting for at least a certain # of units, and we have enough elif wait_for_at_least_units and len(units_ready) >= _wait_for_units: # So no need to keep looking, we have the desired number of units ready to go, # exit the loop. Don't just return here, though, we might still have some # errors to raise at the end break for unit in app.units: if raise_on_error and unit.machine is not None and unit.machine.status == "error": errors.setdefault("Machine", []).append(unit.machine.id) continue if raise_on_error and unit.agent_status == "error": errors.setdefault("Agent", []).append(unit.name) continue if raise_on_error and unit.workload_status == "error": errors.setdefault("Unit", []).append(unit.name) continue if raise_on_blocked and unit.workload_status == "blocked": blocks.setdefault("Unit", []).append(unit.name) continue # TODO (cderici): we need two versions of wait_for_idle, one for waiting on # individual units, another one for waiting for an application. 
# The convoluted logic below is the result of trying to do both at the same # time need_to_wait_more_for_a_particular_status = status and (unit.workload_status != status) app_is_in_desired_status = (not status) or (app_status == status) if not need_to_wait_more_for_a_particular_status and \ unit.agent_status == "idle" and \ (wait_for_at_least_units or app_is_in_desired_status): # A unit is ready if either: # 1) Don't need to wait more for a particular status and the agent is "idle" # 2) We're looking for a particular status and the unit's workload, # as well as the application, is in that status. If the user wants to # see only a particular number of units in that state -- i.e. a subset of # the units is needed, then we don't care about the application status # (because e.g. app can be in 'waiting' while unit.0 is 'active' and unit.1 # is 'waiting') # Either way, the unit is ready, start measuring the time period that # it needs to stay in that state (i.e. idle_period) units_ready.add(unit.name) now = datetime.now() idle_start = idle_times.setdefault(unit.name, now) if now - idle_start < idle_period: busy.append("{} [{}] {}: {}".format(unit.name, unit.agent_status, unit.workload_status, unit.workload_status_message)) else: idle_times.pop(unit.name, None) busy.append("{} [{}] {}: {}".format(unit.name, unit.agent_status, unit.workload_status, unit.workload_status_message)) _raise_for_status(errors, "error") _raise_for_status(blocks, "blocked") if not busy: break busy = "\n ".join(busy) if timeout is not None and datetime.now() - start_time > timeout: raise jasyncio.TimeoutError("Timed out waiting for model:\n" + busy) if last_log_time is None or datetime.now() - last_log_time > log_interval: log.info("Waiting for model:\n " + busy) last_log_time = datetime.now() await jasyncio.sleep(check_freq)
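# Illustrative sketch (not part of the module): a common deploy-and-wait
# pattern, assuming `model` is a connected Model; the charm and app name
# are just examples.
#
#     await model.deploy('ubuntu', application_name='ubuntu')
#     await model.wait_for_idle(apps=['ubuntu'], status='active',
#                               timeout=15 * 60, idle_period=30)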
def _create_consume_args(offer, macaroon, controller_info):
    """
    Convert the typed objects that have been normalised into the overridden
    typed definitions, producing a client.ConsumeApplicationArg.

    @param offer: takes an offer and serialises it into a valid type
    @param macaroon: takes a macaroon and serialises it into a valid type
    @param controller_info: takes controller information and serialises it
        into a valid type.
    """
    endpoints = []
    for ep in offer.endpoints:
        endpoints.append(client.RemoteEndpoint(interface=ep.interface,
                                               limit=ep.limit,
                                               name=ep.name,
                                               role=ep.role))
    users = []
    for u in offer.users:
        users.append(client.OfferUserDetails(access=u.access,
                                             display_name=u.display_name,
                                             user=u.user))
    external_controller = client.ExternalControllerInfo(addrs=controller_info.addrs,
                                                        ca_cert=controller_info.ca_cert,
                                                        controller_alias=controller_info.controller_alias,
                                                        controller_tag=controller_info.controller_tag)
    caveats = []
    for c in macaroon.unknown_fields["caveats"]:
        caveats.append(Caveat(cid=c["cid"]))
    macaroon = Macaroon(signature=macaroon.unknown_fields["signature"],
                        caveats=caveats,
                        location=macaroon.unknown_fields["location"],
                        identifier=macaroon.unknown_fields["identifier"])

    arg = client.ConsumeApplicationArg()
    arg.application_description = offer.application_description
    arg.endpoints = endpoints
    arg.offer_name = offer.offer_name
    arg.offer_url = offer.offer_url
    arg.offer_uuid = offer.offer_uuid
    arg.source_model_tag = offer.source_model_tag
    arg.users = users
    arg.application_alias = offer.application_alias
    arg.external_controller = external_controller
    arg.macaroon = macaroon
    return arg
[docs]class CharmArchiveGenerator: """ Create a Zip archive of a local charm directory for upload to a controller. This is used automatically by `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_. """ def __init__(self, path): self.path = os.path.abspath(os.path.expanduser(path))
[docs] def make_archive(self, path): """Create archive of directory and write to ``path``. :param path: Path to archive Ignored:: * build/* - This is used for packing the charm itself and any similar tasks. * */.* - Hidden files are all ignored for now. This will most likely be changed into a specific ignore list (.bzr, etc) """ zf = zipfile.ZipFile(str(path), 'w', zipfile.ZIP_DEFLATED) for dirpath, dirnames, filenames in os.walk(self.path): relative_path = dirpath[len(self.path) + 1:] if relative_path and not self._ignore(relative_path): zf.write(dirpath, relative_path) for dirname in dirnames: archive_name = os.path.join(relative_path, dirname) real_path = os.path.join(dirpath, dirname) if os.path.islink(real_path): self._check_link(real_path) self._write_symlink(zf, os.readlink(real_path), archive_name) for name in filenames: archive_name = os.path.join(relative_path, name) if not self._ignore(archive_name): real_path = os.path.join(dirpath, name) self._check_type(real_path) if os.path.islink(real_path): self._check_link(real_path) self._write_symlink( zf, os.readlink(real_path), archive_name) else: zf.write(real_path, archive_name) zf.close() return path
def _check_type(self, path):
    """Check that the path is a regular file or directory."""
    s = os.stat(str(path))
    if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
        return path
    raise ValueError("Invalid Charm at %s: %s" % (
        path, "Invalid file type for a charm"))

def _check_link(self, path):
    link_path = os.readlink(path)
    if link_path[0] == "/":
        raise ValueError(
            "Invalid Charm at %s: %s" % (
                path, "Absolute links are invalid"))

    path_dir = os.path.dirname(path)
    link_path = os.path.join(path_dir, link_path)
    if not link_path.startswith(os.path.abspath(self.path)):
        raise ValueError(
            "Invalid charm at %s: %s" % (
                path, "Only internal symlinks are allowed"))

def _write_symlink(self, zf, link_target, link_path):
    """Package symlinks with appropriate zipfile metadata."""
    info = zipfile.ZipInfo()
    info.filename = link_path
    info.create_system = 3
    # Magic code for symlinks / py2/3 compat
    # 2716663808 = (stat.S_IFLNK | 0o755) << 16
    info.external_attr = 2716663808
    zf.writestr(info, link_target)

def _ignore(self, path):
    if path == "build" or path.startswith("build/"):
        return True
    if path.startswith('.'):
        return True
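# Illustrative sketch (not part of the module): packing a local charm
# directory into a zip archive by hand; the paths are hypothetical.
# Model.add_local_charm_dir performs this step automatically.
#
#     gen = CharmArchiveGenerator('./charms/my-charm')
#     gen.make_archive('/tmp/my-charm.zip')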
[docs]class ModelInfo(ModelEntity): @property def tag(self): return tag.model(self.uuid)
[docs] @staticmethod def type_name_override(): return "model"