Source code for juju.controller

# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.

import json
import logging
from concurrent.futures import CancelledError
from pathlib import Path

import websockets

from . import errors, tag, utils, jasyncio
from .client import client, connector
from .errors import JujuAPIError
from .offerendpoints import ParseError as OfferParseError
from .offerendpoints import parse_offer_endpoint, parse_offer_url
from .user import User

log = logging.getLogger(__name__)


[docs]class RemoveError(Exception): def __init__(self, message): self.message = message
[docs]class Controller: def __init__( self, max_frame_size=None, bakery_client=None, jujudata=None, ): """Instantiate a new Controller. One of the connect_* methods will need to be called before this object can be used for anything interesting. If jujudata is None, jujudata.FileJujuData will be used. :param max_frame_size: See `juju.client.connection.Connection.MAX_FRAME_SIZE` :param bakery_client httpbakery.Client: The bakery client to use for macaroon authorization. :param jujudata JujuData: The source for current controller information. """ self._connector = connector.Connector( max_frame_size=max_frame_size, bakery_client=bakery_client, jujudata=jujudata, ) self._controller_name = None async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect()
[docs] async def connect(self, *args, **kwargs): """Connect to a Juju controller. This supports two calling conventions: The controller and (optionally) authentication information can be taken from the data files created by the Juju CLI. This convention will be used if a ``controller_name`` is specified, or if the ``endpoint`` is not. Otherwise, both the ``endpoint`` and authentication information (``username`` and ``password``, or ``bakery_client`` and/or ``macaroons``) are required. If a single positional argument is given, it will be assumed to be the ``controller_name``. Otherwise, the first positional argument, if any, must be the ``endpoint``. Available parameters are: :param str controller_name: Name of controller registered with the Juju CLI. :param str endpoint: The hostname:port of the controller to connect to. :param str username: The username for controller-local users (or None to use macaroon-based login.) :param str password: The password for controller-local users. :param str cacert: The CA certificate of the controller (PEM formatted). :param httpbakery.Client bakery_client: The macaroon bakery client to to use when performing macaroon-based login. Macaroon tokens acquired when logging will be saved to bakery_client.cookies. If this is None, a default bakery_client will be used. :param list macaroons: List of macaroons to load into the ``bakery_client``. :param int max_frame_size: The maximum websocket frame size to allow. :param specified_facades: Overwrite the facades with a series of specified facades. """ await self.disconnect() if 'endpoint' not in kwargs and len(args) < 2: if args and 'model_name' in kwargs: raise TypeError('connect() got multiple values for ' 'controller_name') elif args: controller_name = args[0] else: controller_name = kwargs.pop('controller_name', None) await self._connector.connect_controller(controller_name, **kwargs) else: if 'controller_name' in kwargs: raise TypeError('connect() got values for both ' 'controller_name and endpoint') if args and 'endpoint' in kwargs: raise TypeError('connect() got multiple values for endpoint') has_userpass = (len(args) >= 3 or {'username', 'password'}.issubset(kwargs)) has_macaroons = (len(args) >= 5 or not {'bakery_client', 'macaroons'}.isdisjoint(kwargs)) if not (has_userpass or has_macaroons): raise TypeError('connect() missing auth params') arg_names = [ 'endpoint', 'username', 'password', 'cacert', 'bakery_client', 'macaroons', 'max_frame_size', ] for i, arg in enumerate(args): kwargs[arg_names[i]] = arg if 'endpoint' not in kwargs: raise ValueError('endpoint is required ' 'if controller_name not given') if not ({'username', 'password'}.issubset(kwargs) or {'bakery_client', 'macaroons'}.intersection(kwargs)): raise ValueError('Authentication parameters are required ' 'if controller_name not given') await self._connector.connect(**kwargs) await self.update_endpoints()
[docs] async def update_endpoints(self): try: info = await self.info() self._connector._connection.endpoints = [ (e, info.results[0].cacert) for e in info.results[0].addresses ] except errors.JujuPermissionError: log.warning("This user doesn't have at least read access to the controller model, so endpoints are not updated after connection.") pass
[docs] async def connect_current(self): """ .. deprecated:: 0.7.3 Use :meth:`.connect()` instead. """ return await self.connect()
[docs] async def connect_controller(self, controller_name): """ .. deprecated:: 0.7.3 Use :meth:`.connect(controller_name)` instead. """ return await self.connect(controller_name)
async def _connect_direct(self, **kwargs): await self.disconnect() await self._connector.connect(**kwargs)
[docs] def is_connected(self): """Reports whether the Controller is currently connected.""" return self._connector.is_connected()
[docs] def connection(self): """Return the current Connection object. It raises an exception if the Controller is disconnected""" return self._connector.connection()
@property def controller_name(self): if not self._controller_name: try: self._controller_name = self._connector.jujudata.controller_name_by_endpoint( self._connector.connection().endpoint) except FileNotFoundError: raise errors.PylibjujuError("Unable to determine controller name. controllers.yaml not found.") return self._controller_name @property def controller_uuid(self): return self._connector.controller_uuid @property async def api_endpoints(self): """Get API endpoints :return list string: List of API Endpoints """ info = await self.info() return info.results[0].addresses
[docs] async def disconnect(self): """Shut down the watcher task and close websockets. """ await self._connector.disconnect(entity='controller')
[docs] async def add_credential(self, name=None, credential=None, cloud=None, owner=None, force=False): """Add or update a credential to the controller. :param str name: Name of new credential. If None, the default local credential is used. Name must be provided if a credential is given. :param CloudCredential credential: Credential to add. If not given, it will attempt to read from local data, if available. :param str cloud: Name of cloud to associate the credential with. Defaults to the same cloud as the controller. :param str owner: Username that will own the credential. Defaults to the current user. :param bool force: Force indicates whether the update should be forced. It's only supported for facade v3 or later. Defaults to false. :returns: Name of credential that was uploaded. """ if not cloud: cloud = await self.get_cloud() if not owner: owner = self.connection().info['user-info']['identity'] if credential and not name: raise errors.JujuError('Name must be provided for credential') if not credential: name, credential = self._connector.jujudata.load_credential(cloud, name) if credential is None: raise errors.JujuError( 'Unable to find credential: {}'.format(name)) if credential.auth_type == 'jsonfile' and 'file' in credential.attrs: # file creds have to be loaded before being sent to the controller try: # it might already be JSON json.loads(credential.attrs['file']) except json.JSONDecodeError: # not valid JSON, so maybe it's a file cred_path = Path(credential.attrs['file']) if cred_path.exists(): # make a copy cred_json = credential.to_json() credential = client.CloudCredential.from_json(cred_json) # inline the cred credential.attrs['file'] = cred_path.read_text() log.debug('Uploading credential %s', name) cloud_facade = client.CloudFacade.from_connection(self.connection()) tagged_credentials = [ client.TaggedCredential( tag=tag.credential(cloud, tag.untag('user-', owner), name), credential=credential, )] if cloud_facade.version >= 3: # UpdateCredentials was renamed to UpdateCredentialsCheckModels # in facade version 3. await cloud_facade.UpdateCredentialsCheckModels( credentials=tagged_credentials, force=force, ) else: await cloud_facade.UpdateCredentials(credentials=tagged_credentials) return name
[docs] async def add_cloud(self, name, cloud): """Add a cloud to this controller. :param str name: Name to give the new cloud. :param Cloud cloud: Cloud configuration. :return Cloud: Cloud that was created. """ log.debug('Adding cloud %s', name) cloud_facade = client.CloudFacade.from_connection(self.connection()) await cloud_facade.AddCloud(cloud=cloud, name=name) result = await self.cloud(name=name) return result.cloud
[docs] async def info(self): """Show Controller Info from connection :return ControllerAPIInfoResult """ log.debug('Getting information') uuids = await self.model_uuids() if 'controller' not in uuids: raise errors.JujuPermissionError('Requires access to controller model.') controller_facade = client.ControllerFacade.from_connection(self.connection()) params = [client.Entity(tag.model(uuids["controller"]))] return await controller_facade.ControllerAPIInfoForModels(entities=params)
[docs] async def remove_cloud(self, name): """Remove a cloud from this controller. :param str name: Name of the cloud to remove. """ log.debug('Removing cloud %s', name) cloud_facade = client.CloudFacade.from_connection(self.connection()) await cloud_facade.RemoveClouds(entities=[client.Entity(tag.cloud(name))])
[docs] async def add_model( self, model_name, cloud_name=None, credential_name=None, owner=None, config=None, region=None): """Add a model to this controller. :param str model_name: Name to give the new model. :param str cloud_name: Name of the cloud in which to create the model, e.g. 'aws'. Defaults to same cloud as controller. :param str credential_name: Name of the credential to use when creating the model. If not given, it will attempt to find a default credential. :param str owner: Username that will own the model. Defaults to the current user. :param dict config: Model configuration. :param str region: Region in which to create the model. :return Model: A connection to the newly created model. """ model_facade = client.ModelManagerFacade.from_connection( self.connection()) owner = owner or self.connection().info['user-info']['identity'] cloud_name = cloud_name or await self.get_cloud() try: # attempt to add/update the credential from local data if available credential_name = await self.add_credential( name=credential_name, cloud=cloud_name, owner=owner) except errors.JujuError: # if it's not available locally, assume it's on the controller pass if credential_name: credential = tag.credential( cloud_name, tag.untag('user-', owner), credential_name ) else: credential = None log.debug('Creating model %s', model_name) if not config or 'authorized-keys' not in config: config = config or {} config['authorized-keys'] = await utils.read_ssh_key() model_info = await model_facade.CreateModel( cloud_tag=tag.cloud(cloud_name), config=config, credential=credential, name=model_name, owner_tag=owner, region=region ) from juju.model import Model model = Model(jujudata=self._connector.jujudata) kwargs = self.connection().connect_params() kwargs['uuid'] = model_info.uuid model._info = model_info await model._connect_direct(**kwargs) return model
[docs] async def destroy_models(self, *models, destroy_storage=False, force=False, max_wait=None): """Destroy one or more models. :param str *models: Names or UUIDs of models to destroy :param bool destroy_storage: Whether or not to destroy storage when destroying the models. Defaults to false. :param bool force: Whether or not to force when destroying the models. Defaults to false. :param int max_wait : Max time in seconds to wait when destroying the models. """ uuids = await self.model_uuids() models = [uuids[model] if model in uuids else model for model in models] model_facade = client.ModelManagerFacade.from_connection( self.connection()) log.debug( 'Destroying model%s %s', '' if len(models) == 1 else 's', ', '.join(models) ) if model_facade.version >= 5: params = [ client.DestroyModelParams( model_tag=tag.model(model), destroy_storage=destroy_storage, force=force, max_wait=max_wait, ) for model in models] await model_facade.DestroyModels(models=params) else: params = [client.Entity(tag.model(model)) for model in models] await model_facade.DestroyModels(entities=params)
destroy_model = destroy_models
[docs] async def add_user(self, username, password=None, display_name=None): """Add a user to this controller. :param str username: Username :param str password: Password :param str display_name: Display name :returns: A :class:`~juju.user.User` instance """ if not display_name: display_name = username user_facade = client.UserManagerFacade.from_connection( self.connection()) users = [client.AddUser(display_name=display_name, username=username, password=password)] results = await user_facade.AddUser(users=users) secret_key = results.results[0].secret_key return await self.get_user(username, secret_key=secret_key)
[docs] async def remove_user(self, username): """Remove a user from this controller. """ client_facade = client.UserManagerFacade.from_connection( self.connection()) user = tag.user(username) await client_facade.RemoveUser(entities=[client.Entity(user)])
[docs] async def change_user_password(self, username, password): """Change the password for a user in this controller. :param str username: Username :param str password: New password """ user_facade = client.UserManagerFacade.from_connection( self.connection()) entity = client.EntityPassword(password=password, tag=tag.user(username)) return await user_facade.SetPassword(changes=[entity])
[docs] async def reset_user_password(self, username): """Reset user password. :param str username: Username :returns: A :class:`~juju.user.User` instance """ user_facade = client.UserManagerFacade.from_connection( self.connection()) entity = client.Entity(tag.user(username)) results = await user_facade.ResetPassword(entities=[entity]) secret_key = results.results[0].secret_key return await self.get_user(username, secret_key=secret_key)
[docs] async def destroy(self, destroy_all_models=False, destroy_storage=False): """Destroy this controller. :param bool destroy_all_models: Destroy all hosted models in the controller. :param bool destroy_storage: Destory all hosted storage in the controller. """ controller_facade = client.ControllerFacade.from_connection( self.connection()) return await controller_facade.DestroyController(destroy_models=destroy_all_models, destroy_storage=destroy_storage)
[docs] async def disable_user(self, username): """Disable a user. :param str username: Username """ user_facade = client.UserManagerFacade.from_connection( self.connection()) entity = client.Entity(tag.user(username)) return await user_facade.DisableUser(entities=[entity])
[docs] async def enable_user(self, username): """Re-enable a previously disabled user. """ user_facade = client.UserManagerFacade.from_connection( self.connection()) entity = client.Entity(tag.user(username)) return await user_facade.EnableUser(entities=[entity])
[docs] async def get_model_info(self, model_name=None, model_uuid=None): """Return a client.ModelInfo object for a given Model. Retrieves latest info for this Model from the api server. The return value is cached on the Model.info attribute so that the valued may be accessed again without another api call, if desired. This method is called automatically when the Model is connected, resulting in Model.info being initialized without requiring an explicit call to this method. """ if model_uuid is None and model_name is None: raise errors.JujuError("get_model_info requires either a name or a uuid for a model") facade = client.ModelManagerFacade.from_connection(self.connection()) if model_uuid is None: uuids = await self.model_uuids() try: model_uuid = uuids[model_name] except KeyError: raise errors.JujuError("{} is not among the models in the controller : {}".format(model_name, uuids)) entity = client.Entity(tag.model(model_uuid)) _model_info_results = await facade.ModelInfo(entities=[entity]) return _model_info_results.results[0].result
[docs] async def cloud(self, name=None): """Get Cloud :param str name: Cloud name. If not specified, the cloud where the controller lives on is returned. :returns: -> ~CloudResult """ if name is None: name = await self.get_cloud() entity = client.Entity(tag.cloud(name)) cloud_facade = client.CloudFacade.from_connection(self.connection()) cloud = await cloud_facade.Cloud(entities=[entity]) if len(cloud.results) == 0: log.error("No clouds found.") raise elif len(cloud.results) > 1: log.error("More than one cloud found.") raise return cloud.results[0]
[docs] async def clouds(self): """Get all the clouds in the controller :returns: -> ~CloudsResult """ cloud_facade = client.CloudFacade.from_connection(self.connection()) return await cloud_facade.Clouds()
[docs] async def get_cloud(self): """ Get the name of the cloud that this controller lives on. """ cloud_facade = client.CloudFacade.from_connection(self.connection()) result = await cloud_facade.Clouds() cloud = list(result.clouds.keys())[0] # only lives on one cloud return tag.untag('cloud-', cloud)
[docs] async def get_models(self, all=False, username=None): """ .. deprecated:: 0.7.0 Use :meth:`.list_models` instead. """ return await self.list_models(username, all)
[docs] async def model_uuids(self, username=None, all=False): """Return a mapping of model names to UUIDs the given user can access. :param str username: Optional username argument, defaults to current connected user. :param bool all: Flag to list all models, regardless of user accessibility (administrative users only) :returns: {str name : str UUID} """ model_manager_facade = client.ModelManagerFacade.from_connection(self.connection()) u_name = username if username else self.get_current_username() user = tag.user(u_name) user_model_list = await model_manager_facade.ListModelSummaries(user_tag=user, all_=all) model_summaries = [msr.result for msr in user_model_list.results] return {model_summary.name: model_summary.uuid for model_summary in model_summaries}
[docs] async def list_models(self, username=None, all=False): """Return list of names of the available models on this controller. Equivalent to ``sorted((await self.model_uuids()).keys())`` """ uuids = await self.model_uuids(username, all) return sorted(uuids.keys())
[docs] async def get_current_user(self, secret_key=None): """Returns the user object associated with the current connection. :param str secret_key: Issued by juju when add or reset user password :returns: A :class:`~juju.user.User` instance """ return await self.get_user(self.connection().username)
[docs] def get_current_username(self): """Returns the username associated with the current connection. :returns: :str: username of the connected user """ return self.connection().username
[docs] async def get_model(self, model): """Get a model by name or UUID. :param str model: Model name or UUID :returns Model: Connected Model instance. """ uuids = await self.model_uuids() if model in uuids: uuid = uuids[model] else: uuid = model from juju.model import Model model = Model() kwargs = self.connection().connect_params() kwargs['uuid'] = uuid await model._connect_direct(**kwargs) return model
[docs] async def get_user(self, username, secret_key=None): """Get a user by name. :param str username: Username :param str secret_key: Issued by juju when add or reset user password :returns: A :class:`~juju.user.User` instance """ client_facade = client.UserManagerFacade.from_connection( self.connection()) user = tag.user(username) args = [client.Entity(user)] try: response = await client_facade.UserInfo(entities=args, include_disabled=True) except errors.JujuError as e: if 'permission denied' in e.errors: # apparently, trying to get info for a nonexistent user returns # a "permission denied" error rather than an empty result set return None raise if response.results and response.results[0].result: return User(self, response.results[0].result, secret_key=secret_key) return None
[docs] async def get_users(self, include_disabled=False): """Return list of users that can connect to this controller. :param bool include_disabled: Include disabled users :returns: A list of :class:`~juju.user.User` instances """ client_facade = client.UserManagerFacade.from_connection( self.connection()) response = await client_facade.UserInfo(entities=None, include_disabled=include_disabled) return [User(self, r.result) for r in response.results]
[docs] async def grant(self, username, acl='login'): """Grant access level of the given user on the controller. Note that if the user already has higher permissions than the provided ACL, this will do nothing (see revoke for a way to remove permissions). :param str username: Username :param str acl: Access control ('login', 'add-model' or 'superuser') :returns: True if new access was granted, False if user already had requested access or greater. Raises JujuError if failed. """ controller_facade = client.ControllerFacade.from_connection( self.connection()) user = tag.user(username) changes = client.ModifyControllerAccess(acl, 'grant', user) try: await controller_facade.ModifyControllerAccess(changes=[changes]) return True except errors.JujuError as e: if 'user already has' in str(e): return False else: raise
[docs] async def revoke(self, username, acl='login'): """Removes some or all access of a user to from a controller If 'login' access is revoked, the user will no longer have any permissions on the controller. Revoking a higher privilege from a user without that privilege will have no effect. :param str username: username :param str acl: Access to remove ('login', 'add-model' or 'superuser') """ controller_facade = client.ControllerFacade.from_connection( self.connection()) user = tag.user(username) changes = client.ModifyControllerAccess(acl, 'revoke', user) return await controller_facade.ModifyControllerAccess(changes=[changes])
[docs] async def grant_model(self, username, model_uuid, acl='read'): """Grant a user access to a model. Note that if the user already has higher permissions than the provided ACL, this will do nothing (see revoke_model for a way to remove permissions). :param str username: Username :param str model_uuid: The UUID of the model to change. :param str acl: Access control ('read, 'write' or 'admin') """ model_facade = client.ModelManagerFacade.from_connection( self.connection()) user = tag.user(username) model = tag.model(model_uuid) changes = client.ModifyModelAccess(acl, 'grant', model, user) return await model_facade.ModifyModelAccess(changes=[changes])
[docs] async def revoke_model(self, username, model_uuid, acl='read'): """Revoke some or all of a user's access to a model. If 'read' access is revoked, the user will no longer have any permissions on the model. Revoking a higher privilege from a user without that privilege will have no effect. :param str username: Username to revoke :param str model_uuid: The UUID of the model to change. :param str acl: Access control ('read, 'write' or 'admin') """ model_facade = client.ModelManagerFacade.from_connection( self.connection()) user = tag.user(username) model = tag.model(model_uuid) changes = client.ModifyModelAccess(acl, 'revoke', model, user) return await model_facade.ModifyModelAccess(changes=[changes])
[docs] async def create_offer(self, model_uuid, endpoint, offer_name=None, application_name=None): """ Offer a deployed application using a series of endpoints for use by consumers. @param endpoint: holds the application and endpoint you want to offer @param offer_name: override the offer name to help the consumer @param application_name: overrides the application name in the endpoint """ # If we have both the offer_name and the application_name # then we're coming from bundle/overlays, so no need to parse the endpoint # Also we accept endpoints without a colon (:) in the overlays if offer_name and application_name: o_name = offer_name a_name = application_name eps = {endpoint: endpoint} else: try: offer = parse_offer_endpoint(endpoint) except OfferParseError as e: log.error(e.message) raise o_name = offer_name if offer_name else offer.application a_name = application_name if application_name else offer.application eps = {name: name for name in offer.endpoints} params = client.AddApplicationOffer() params.application_name = a_name params.endpoints = eps params.offer_name = o_name params.model_tag = tag.model(model_uuid) facade = client.ApplicationOffersFacade.from_connection(self.connection()) return await facade.Offer(offers=[params])
[docs] async def list_offers(self, model_name): """ Offers list information about applications' endpoints that have been shared and who is connected. """ params = client.OfferFilter() params.model_name = model_name facade = client.ApplicationOffersFacade.from_connection(self.connection()) return await facade.ListApplicationOffers(filters=[params])
[docs] async def remove_offer(self, model_uuid, offer, force=False): """ Remove offer for an application. Offers will also remove relations to those offers, use force to do so, without an error. """ url = None try: url = parse_offer_url(offer) except OfferParseError as e: log.error(e.message) raise if url is None: raise Exception offer_source = url.source if offer_source == "": offer_source = self.controller_name if not force: raise RemoveError("removing offer will also remove relations, use force and try again.") facade = client.ApplicationOffersFacade.from_connection(self.connection()) return await facade.DestroyOffers(force=force, offer_urls=[url.string()])
[docs] async def get_consume_details(self, endpoint): """ get_consume_details returns the details necessary to pass to another model to consume the specified offers represented by the urls. """ facade = client.ApplicationOffersFacade.from_connection(self.connection()) offers = await facade.GetConsumeDetails(offer_urls=client.OfferURLs(offer_urls=[endpoint])) if len(offers.results) != 1: raise JujuAPIError("expected to find one result") result = offers.results[0] if result.error is not None: raise JujuAPIError(result.error) return result
[docs] async def watch_model_summaries(self, callback, as_admin=False): """ Watch the controller for model summary updates. If as_admin is true, a call will be made as the admin to watch all models in the controller. If the user isn't a superuser they will get a permission error. """ stop_event = jasyncio.Event() async def _watcher(stop_event): try: facade = client.ControllerFacade.from_connection( self.connection()) watcher = client.ModelSummaryWatcherFacade.from_connection( self.connection()) if as_admin: result = await facade.WatchAllModelSummaries() watcher.Id = result.watcher_id else: result = await facade.WatchModelSummaries() log.debug("watcher id: {}".format(result.watcher_id)) watcher.Id = result.watcher_id while True: try: results = await utils.run_with_interrupt( watcher.Next(), stop_event, log=log) except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise except websockets.ConnectionClosed: break if stop_event.is_set(): try: await watcher.Stop() except websockets.ConnectionClosed: pass # can't stop on a closed conn break for summary in results.models: callback(summary) except CancelledError: pass except Exception: log.exception('Error in watcher') raise log.debug('Starting watcher task for model summaries') jasyncio.ensure_future(_watcher(stop_event)) return stop_event
[docs] async def add_secret_backends(self, id, name, backend_type, config): """ Add a new secret backend. Parameters ---------- id : string id for the backend name : string name of the backend backend-type : string config : dict dictionary with the backend configuration values Returns ------- list a list of errors if any """ facade = client.SecretBackendsFacade.from_connection(self.connection()) return await facade.AddSecretBackends([{ 'id': id, 'backend-type': backend_type, 'config': config, 'name': name, 'token-rotate-interval': config.get('token-rotate-interval', None), }])
[docs] async def list_secret_backends(self, reveal=False): """ Return the list of secret backends Parameters ---------- reveal : boolean include sensitive backend config content if true Returns ------- list a list of available secret backends """ facade = client.SecretBackendsFacade.from_connection(self.connection()) return await facade.ListSecretBackends(None, reveal)
[docs] async def remove_secret_backends(self, name, force=False): """ Remove a secrets backend. Parameters ---------- name : name of the backend force : true if the operation is foced Returns ------- error if any """ facade = client.SecretBackendsFacade.from_connection(self.connection()) return await facade.RemoveSecretBackends([{ 'name': name, 'force': force }])
[docs] async def update_secret_backends(self, name, config=None, force=False, name_change=None, token_rotate_interval=None): """ Update a backend. Parameters ---------- name : string the backend name config : dict key value dict with configuration parameters force : boolean true to force the upate process name_change : string new name for the backend token_rotate_interval : int token rotation interval """ facade = client.SecretBackendsFacade.from_connection(self.connection()) return await facade.UpdateSecretBackends([{ 'name': name, 'config': config, 'force': force, 'token-rotate-interval': token_rotate_interval, 'name-change': name_change, }])
[docs]class ConnectedController(Controller): def __init__( self, connection, max_frame_size=None, bakery_client=None, jujudata=None, ): super().__init__( max_frame_size=max_frame_size, bakery_client=bakery_client, jujudata=jujudata) self._conn = connection async def __aenter__(self): kwargs = self._conn.connect_params() kwargs.pop('uuid') await self._connect_direct(**kwargs) return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect()