# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.
import json
import logging
from concurrent.futures import CancelledError
from pathlib import Path
import websockets
from . import errors, tag, utils, jasyncio
from .client import client, connector
from .errors import JujuAPIError
from .offerendpoints import ParseError as OfferParseError
from .offerendpoints import parse_offer_endpoint, parse_offer_url
from .user import User
log = logging.getLogger(__name__)
class RemoveError(Exception):
    """Raised when an offer removal is refused (e.g. ``force`` not given).

    :param str message: Human readable reason. It is kept on
        ``self.message`` and also forwarded to :class:`Exception` so that
        ``str(exc)`` and ``exc.args`` carry it.
    """

    def __init__(self, message):
        # Without this call str(exc) would be empty and exc.args would be ()
        super().__init__(message)
        self.message = message
[docs]class Controller:
def __init__(self, max_frame_size=None, bakery_client=None, jujudata=None):
    """Create a Controller that is not connected yet.

    Call one of the connect methods before using the object for
    anything else.

    :param max_frame_size: See
        `juju.client.connection.Connection.MAX_FRAME_SIZE`
    :param bakery_client httpbakery.Client: The bakery client to use
        for macaroon authorization.
    :param jujudata JujuData: The source for current controller
        information. If None, jujudata.FileJujuData will be used.
    """
    connector_options = {
        'max_frame_size': max_frame_size,
        'bakery_client': bakery_client,
        'jujudata': jujudata,
    }
    self._connector = connector.Connector(**connector_options)
    # Filled in lazily by the ``controller_name`` property
    self._controller_name = None
async def __aenter__(self):
    """Connect when entering ``async with`` and return this controller."""
    await self.connect()
    return self
async def __aexit__(self, exc_type, exc, tb):
    """Disconnect when leaving ``async with``; exception info is ignored."""
    await self.disconnect()
async def connect(self, *args, **kwargs):
    """Connect to a Juju controller.

    Any existing connection is closed first. Two calling conventions
    are supported:

    The controller and (optionally) authentication information can be
    taken from the data files created by the Juju CLI. This convention
    is used if a ``controller_name`` is specified, or if the
    ``endpoint`` is not.

    Otherwise, both the ``endpoint`` and authentication information
    (``username`` and ``password``, or ``bakery_client`` and/or
    ``macaroons``) are required.

    If a single positional argument is given, it is taken to be the
    ``controller_name``. Otherwise the positional arguments are, in
    order: ``endpoint``, ``username``, ``password``, ``cacert``,
    ``bakery_client``, ``macaroons``, ``max_frame_size``.

    :param str controller_name: Name of controller registered with the
        Juju CLI.
    :param str endpoint: The hostname:port of the controller to connect to.
    :param str username: The username for controller-local users (or None
        to use macaroon-based login.)
    :param str password: The password for controller-local users.
    :param str cacert: The CA certificate of the controller
        (PEM formatted).
    :param httpbakery.Client bakery_client: The macaroon bakery client
        to use when performing macaroon-based login.
    :param list macaroons: List of macaroons to load into the
        ``bakery_client``.
    :param int max_frame_size: The maximum websocket frame size to allow.
    :param specified_facades: Overwrite the facades with a series of
        specified facades.
    :raises TypeError: on conflicting, surplus or missing arguments.
    """
    await self.disconnect()
    if 'endpoint' not in kwargs and len(args) < 2:
        # Juju CLI data convention: at most one positional argument
        if args and 'controller_name' in kwargs:
            raise TypeError('connect() got multiple values for '
                            'controller_name')
        if args:
            controller_name = args[0]
        else:
            controller_name = kwargs.pop('controller_name', None)
        await self._connector.connect_controller(controller_name, **kwargs)
    else:
        if 'controller_name' in kwargs:
            raise TypeError('connect() got values for both '
                            'controller_name and endpoint')
        if args and 'endpoint' in kwargs:
            raise TypeError('connect() got multiple values for endpoint')
        arg_names = (
            'endpoint',
            'username',
            'password',
            'cacert',
            'bakery_client',
            'macaroons',
            'max_frame_size',
        )
        if len(args) > len(arg_names):
            raise TypeError(
                'connect() takes at most {} positional arguments '
                '({} given)'.format(len(arg_names), len(args)))
        # Fold positionals into kwargs *before* validating, so a mix such
        # as connect(endpoint, username, password=...) is accepted.
        kwargs.update(zip(arg_names, args))
        has_userpass = {'username', 'password'}.issubset(kwargs)
        has_macaroons = not {'bakery_client',
                             'macaroons'}.isdisjoint(kwargs)
        if not (has_userpass or has_macaroons):
            raise TypeError('connect() missing auth params')
        await self._connector.connect(**kwargs)
    await self.update_endpoints()
async def update_endpoints(self):
    """Replace the connection's endpoint list with the controller's addresses.

    Each address reported by :meth:`info` is paired with the reported CA
    certificate. A permission error is logged and otherwise ignored.
    """
    try:
        details = (await self.info()).results[0]
        self._connector._connection.endpoints = [
            (address, details.cacert) for address in details.addresses
        ]
    except errors.JujuPermissionError:
        log.warning("This user doesn't have at least read access to the controller model, so endpoints are not updated after connection.")
async def connect_current(self):
    """Connect using the default (no-argument) form of :meth:`connect`.

    .. deprecated:: 0.7.3
       Use :meth:`.connect()` instead.
    """
    outcome = await self.connect()
    return outcome
async def connect_controller(self, controller_name):
    """Connect to the named controller via :meth:`connect`.

    .. deprecated:: 0.7.3
       Use :meth:`.connect(controller_name)` instead.
    """
    outcome = await self.connect(controller_name)
    return outcome
async def _connect_direct(self, **kwargs):
    """Drop any current connection, then connect with the given parameters.

    ``kwargs`` are handed unchanged to the connector's ``connect``.
    """
    await self.disconnect()
    await self._connector.connect(**kwargs)
def is_connected(self):
    """Tell whether this Controller currently has a live connection.

    The answer comes straight from the underlying connector.
    """
    connected = self._connector.is_connected()
    return connected
def connection(self):
    """Return the current Connection object.

    An exception is raised if the Controller is disconnected.
    """
    current = self._connector.connection()
    return current
@property
def controller_name(self):
    """Name of the connected controller, looked up once and then cached.

    The name is resolved from the local juju data by the endpoint of the
    current connection.

    :raises errors.PylibjujuError: if the lookup hits a missing file.
    """
    if self._controller_name:
        return self._controller_name
    try:
        jujudata = self._connector.jujudata
        self._controller_name = jujudata.controller_name_by_endpoint(
            self._connector.connection().endpoint)
    except FileNotFoundError:
        raise errors.PylibjujuError("Unable to determine controller name. controllers.yaml not found.")
    return self._controller_name
@property
def controller_uuid(self):
    """UUID of the controller, as reported by the underlying connector."""
    uuid = self._connector.controller_uuid
    return uuid
@property
async def api_endpoints(self):
    """Get API endpoints.

    Accessing the property yields a coroutine; await it for the result.

    :return list string: List of API Endpoints
    """
    first = (await self.info()).results[0]
    return first.addresses
async def disconnect(self):
    """Shut down the watcher task and close websockets."""
    connector_ = self._connector
    await connector_.disconnect(entity='controller')
async def add_credential(self, name=None, credential=None, cloud=None,
                         owner=None, force=False):
    """Add or update a credential on the controller.

    :param str name: Name of new credential. If None, the default
        local credential is used. Name must be provided if a credential
        is given.
    :param CloudCredential credential: Credential to add. If not given,
        it is loaded from the local juju data.
    :param str cloud: Name of cloud to associate the credential with.
        Defaults to the same cloud as the controller.
    :param str owner: Username that will own the credential. Defaults to
        the current user.
    :param bool force: Whether the update should be forced. Only sent
        when the cloud facade is version 3 or later. Defaults to false.
    :returns: Name of credential that was uploaded.
    :raises errors.JujuError: if a credential is given without a name,
        or no credential could be loaded locally.
    """
    cloud = cloud or await self.get_cloud()
    owner = owner or self.connection().info['user-info']['identity']

    if credential and not name:
        raise errors.JujuError('Name must be provided for credential')

    if not credential:
        jujudata = self._connector.jujudata
        name, credential = jujudata.load_credential(cloud, name)
        if credential is None:
            raise errors.JujuError(
                'Unable to find credential: {}'.format(name))

    is_json_file_cred = (credential.auth_type == 'jsonfile'
                         and 'file' in credential.attrs)
    if is_json_file_cred:
        # The controller needs the JSON content itself, not a local path
        try:
            json.loads(credential.attrs['file'])  # already inline JSON?
        except json.JSONDecodeError:
            # Not JSON, so treat the value as a path on this machine
            local_file = Path(credential.attrs['file'])
            if local_file.exists():
                # Work on a copy so the loaded credential keeps its path
                credential = client.CloudCredential.from_json(
                    credential.to_json())
                credential.attrs['file'] = local_file.read_text()

    log.debug('Uploading credential %s', name)
    cloud_facade = client.CloudFacade.from_connection(self.connection())
    credential_tag = tag.credential(cloud, tag.untag('user-', owner), name)
    tagged_credentials = [
        client.TaggedCredential(tag=credential_tag, credential=credential),
    ]
    if cloud_facade.version < 3:
        await cloud_facade.UpdateCredentials(credentials=tagged_credentials)
    else:
        # UpdateCredentials was renamed to UpdateCredentialsCheckModels
        # in facade version 3.
        await cloud_facade.UpdateCredentialsCheckModels(
            credentials=tagged_credentials, force=force,
        )
    return name
async def add_cloud(self, name, cloud):
    """Add a cloud to this controller.

    :param str name: Name to give the new cloud.
    :param Cloud cloud: Cloud configuration.
    :return Cloud: The cloud as read back from the controller after
        the add.
    """
    log.debug('Adding cloud %s', name)
    facade = client.CloudFacade.from_connection(self.connection())
    await facade.AddCloud(cloud=cloud, name=name)
    created = await self.cloud(name=name)
    return created.cloud
async def info(self):
    """Show Controller Info from connection.

    :return ControllerAPIInfoResult
    :raises errors.JujuPermissionError: if no model named ``controller``
        is among the models returned by :meth:`model_uuids`.
    """
    log.debug('Getting information')
    known = await self.model_uuids()
    if 'controller' not in known:
        raise errors.JujuPermissionError('Requires access to controller model.')
    facade = client.ControllerFacade.from_connection(self.connection())
    controller_model = client.Entity(tag.model(known["controller"]))
    return await facade.ControllerAPIInfoForModels(entities=[controller_model])
async def remove_cloud(self, name):
    """Remove a cloud from this controller.

    :param str name: Name of the cloud to remove.
    """
    log.debug('Removing cloud %s', name)
    facade = client.CloudFacade.from_connection(self.connection())
    target = client.Entity(tag.cloud(name))
    await facade.RemoveClouds(entities=[target])
async def add_model(
        self, model_name, cloud_name=None, credential_name=None,
        owner=None, config=None, region=None):
    """Add a model to this controller.

    :param str model_name: Name to give the new model.
    :param str cloud_name: Name of the cloud in which to create the
        model, e.g. 'aws'. Defaults to same cloud as controller.
    :param str credential_name: Name of the credential to use when
        creating the model. If not given, it will attempt to find a
        default credential.
    :param str owner: Username that will own the model. Defaults to
        the current user.
    :param dict config: Model configuration. The caller's dict is never
        modified; when ``authorized-keys`` is missing it is added to a
        copy.
    :param str region: Region in which to create the model.
    :return Model: A connection to the newly created model.
    """
    model_facade = client.ModelManagerFacade.from_connection(
        self.connection())

    owner = owner or self.connection().info['user-info']['identity']
    cloud_name = cloud_name or await self.get_cloud()

    try:
        # attempt to add/update the credential from local data if available
        credential_name = await self.add_credential(
            name=credential_name,
            cloud=cloud_name,
            owner=owner)
    except errors.JujuError:
        # if it's not available locally, assume it's on the controller
        pass

    if credential_name:
        credential = tag.credential(
            cloud_name,
            tag.untag('user-', owner),
            credential_name
        )
    else:
        credential = None

    log.debug('Creating model %s', model_name)

    if not config or 'authorized-keys' not in config:
        # Copy first: writing into the caller's dict would leak the key
        # into whatever else the caller does with that config.
        config = dict(config or {})
        config['authorized-keys'] = await utils.read_ssh_key()

    model_info = await model_facade.CreateModel(
        cloud_tag=tag.cloud(cloud_name),
        config=config,
        credential=credential,
        name=model_name,
        owner_tag=owner,
        region=region
    )
    # Imported here rather than at module level (kept as in the original)
    from juju.model import Model
    model = Model(jujudata=self._connector.jujudata)
    # Reuse this controller's connection parameters, retargeted at the
    # new model's UUID.
    kwargs = self.connection().connect_params()
    kwargs['uuid'] = model_info.uuid
    model._info = model_info
    await model._connect_direct(**kwargs)

    return model
async def destroy_models(self, *models, destroy_storage=False, force=False, max_wait=None):
    """Destroy one or more models.

    :param str *models: Names or UUIDs of models to destroy. Names known
        to :meth:`model_uuids` are translated to UUIDs; anything else is
        used as given.
    :param bool destroy_storage: Whether or not to destroy storage when
        destroying the models. Defaults to false.
    :param bool force: Whether or not to force when destroying the models.
        Defaults to false.
    :param int max_wait: Max time in seconds to wait when destroying the
        models.
    """
    known = await self.model_uuids()
    models = [known.get(entry, entry) for entry in models]

    model_facade = client.ModelManagerFacade.from_connection(
        self.connection())

    plural = '' if len(models) == 1 else 's'
    log.debug('Destroying model%s %s', plural, ', '.join(models))

    if model_facade.version < 5:
        # Older facades only take plain entities, without the options
        entities = [client.Entity(tag.model(uuid)) for uuid in models]
        await model_facade.DestroyModels(entities=entities)
        return

    requests = []
    for uuid in models:
        requests.append(client.DestroyModelParams(
            model_tag=tag.model(uuid),
            destroy_storage=destroy_storage,
            force=force,
            max_wait=max_wait,
        ))
    await model_facade.DestroyModels(models=requests)


# Singular alias for destroy_models
destroy_model = destroy_models
async def add_user(self, username, password=None, display_name=None):
    """Add a user to this controller.

    :param str username: Username
    :param str password: Password
    :param str display_name: Display name; falls back to ``username``.
    :returns: A :class:`~juju.user.User` instance
    """
    shown_name = display_name if display_name else username
    facade = client.UserManagerFacade.from_connection(self.connection())
    request = client.AddUser(display_name=shown_name,
                             username=username,
                             password=password)
    response = await facade.AddUser(users=[request])
    # The secret key from the add result travels with the User object
    key = response.results[0].secret_key
    return await self.get_user(username, secret_key=key)
async def remove_user(self, username):
    """Remove a user from this controller.

    :param str username: Username
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    target = client.Entity(tag.user(username))
    await facade.RemoveUser(entities=[target])
async def change_user_password(self, username, password):
    """Change the password for a user in this controller.

    :param str username: Username
    :param str password: New password
    :returns: The result of the facade's ``SetPassword`` call.
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    change = client.EntityPassword(password=password, tag=tag.user(username))
    outcome = await facade.SetPassword(changes=[change])
    return outcome
async def reset_user_password(self, username):
    """Reset user password.

    :param str username: Username
    :returns: A :class:`~juju.user.User` instance carrying the secret
        key returned by the reset.
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    target = client.Entity(tag.user(username))
    response = await facade.ResetPassword(entities=[target])
    key = response.results[0].secret_key
    return await self.get_user(username, secret_key=key)
async def destroy(self, destroy_all_models=False, destroy_storage=False):
    """Destroy this controller.

    :param bool destroy_all_models: Destroy all hosted models in the
        controller.
    :param bool destroy_storage: Destroy all hosted storage in the
        controller.
    :returns: The result of the facade's ``DestroyController`` call.
    """
    facade = client.ControllerFacade.from_connection(self.connection())
    outcome = await facade.DestroyController(
        destroy_models=destroy_all_models,
        destroy_storage=destroy_storage,
    )
    return outcome
async def disable_user(self, username):
    """Disable a user.

    :param str username: Username
    :returns: The result of the facade's ``DisableUser`` call.
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    target = client.Entity(tag.user(username))
    outcome = await facade.DisableUser(entities=[target])
    return outcome
async def enable_user(self, username):
    """Re-enable a previously disabled user.

    :param str username: Username
    :returns: The result of the facade's ``EnableUser`` call.
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    target = client.Entity(tag.user(username))
    outcome = await facade.EnableUser(entities=[target])
    return outcome
async def get_model_info(self, model_name=None, model_uuid=None):
    """Fetch the info record of one model from the api server.

    The model is identified by UUID, or by name when no UUID is given;
    a name is translated through :meth:`model_uuids`.

    :param str model_name: Name of the model.
    :param str model_uuid: UUID of the model; takes precedence over the
        name.
    :returns: The ``result`` of the first entry in the facade's
        ``ModelInfo`` response.
    :raises errors.JujuError: if neither argument is given, or the name
        is not among the controller's models.
    """
    if model_name is None and model_uuid is None:
        raise errors.JujuError("get_model_info requires either a name or a uuid for a model")
    facade = client.ModelManagerFacade.from_connection(self.connection())

    if model_uuid is None:
        uuids = await self.model_uuids()
        try:
            model_uuid = uuids[model_name]
        except KeyError:
            raise errors.JujuError("{} is not among the models in the controller : {}".format(model_name, uuids))

    target = client.Entity(tag.model(model_uuid))
    response = await facade.ModelInfo(entities=[target])
    return response.results[0].result
async def cloud(self, name=None):
    """Get Cloud

    :param str name: Cloud name. If not specified, the cloud where
        the controller lives on is returned.
    :returns: -> ~CloudResult
    :raises errors.JujuError: if the facade does not return exactly one
        result for the requested cloud.
    """
    if name is None:
        name = await self.get_cloud()
    entity = client.Entity(tag.cloud(name))
    cloud_facade = client.CloudFacade.from_connection(self.connection())
    cloud = await cloud_facade.Cloud(entities=[entity])
    # A bare ``raise`` here (no exception being handled) would surface as
    # an unrelated RuntimeError, so raise an explicit, descriptive error.
    if len(cloud.results) == 0:
        log.error("No clouds found.")
        raise errors.JujuError("No clouds found.")
    elif len(cloud.results) > 1:
        log.error("More than one cloud found.")
        raise errors.JujuError("More than one cloud found.")
    return cloud.results[0]
async def clouds(self):
    """Get all the clouds in the controller

    :returns: -> ~CloudsResult
    """
    facade = client.CloudFacade.from_connection(self.connection())
    everything = await facade.Clouds()
    return everything
async def get_cloud(self):
    """Get the name of the cloud that this controller lives on.

    The first key of the facade's ``Clouds`` result is used, with its
    ``cloud-`` tag prefix removed.
    """
    facade = client.CloudFacade.from_connection(self.connection())
    listing = await facade.Clouds()
    cloud_tags = list(listing.clouds)
    # only lives on one cloud, so the first entry is the one
    return tag.untag('cloud-', cloud_tags[0])
async def get_models(self, all=False, username=None):
    """Return the model names, delegating to :meth:`list_models`.

    .. deprecated:: 0.7.0
       Use :meth:`.list_models` instead.
    """
    names = await self.list_models(username, all)
    return names
async def model_uuids(self, username=None, all=False):
    """Return a mapping of model names to UUIDs the given user can access.

    :param str username: Optional username argument, defaults to
        current connected user.
    :param bool all: Flag to list all models, regardless of
        user accessibility (administrative users only)
    :returns: {str name : str UUID}
    """
    facade = client.ModelManagerFacade.from_connection(self.connection())
    if username:
        who = username
    else:
        who = self.get_current_username()
    listing = await facade.ListModelSummaries(user_tag=tag.user(who), all_=all)
    summaries = [entry.result for entry in listing.results]
    name_to_uuid = {}
    for summary in summaries:
        name_to_uuid[summary.name] = summary.uuid
    return name_to_uuid
async def list_models(self, username=None, all=False):
    """Return the sorted names of the available models on this controller.

    The names are the keys of :meth:`model_uuids` called with the same
    arguments.
    """
    name_to_uuid = await self.model_uuids(username, all)
    return sorted(name_to_uuid)
async def get_current_user(self, secret_key=None):
    """Returns the user object associated with the current connection.

    :param str secret_key: Issued by juju when add or reset user
        password. It is forwarded to :meth:`get_user`.
    :returns: A :class:`~juju.user.User` instance
    """
    # secret_key must be passed on, otherwise the argument is silently lost
    return await self.get_user(self.connection().username,
                               secret_key=secret_key)
def get_current_username(self):
    """Returns the username associated with the current connection.

    :returns: :str: username of the connected user
    """
    active = self.connection()
    return active.username
async def get_model(self, model):
    """Get a model by name or UUID.

    :param str model: Model name or UUID. A value that is not a known
        model name is used as the UUID as given.
    :returns Model: Connected Model instance.
    """
    known = await self.model_uuids()
    target_uuid = known.get(model, model)

    # Imported here rather than at module level (kept as in the original)
    from juju.model import Model
    connected = Model()
    # Reuse this controller's connection parameters, retargeted at the model
    params = self.connection().connect_params()
    params['uuid'] = target_uuid
    await connected._connect_direct(**params)
    return connected
async def get_user(self, username, secret_key=None):
    """Get a user by name.

    :param str username: Username
    :param str secret_key: Issued by juju when add or reset user
        password
    :returns: A :class:`~juju.user.User` instance, or None when the
        lookup reports 'permission denied' or yields no result.
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    wanted = [client.Entity(tag.user(username))]
    try:
        response = await facade.UserInfo(entities=wanted, include_disabled=True)
    except errors.JujuError as e:
        # apparently, trying to get info for a nonexistent user returns
        # a "permission denied" error rather than an empty result set
        if 'permission denied' not in e.errors:
            raise
        return None
    if not (response.results and response.results[0].result):
        return None
    return User(self, response.results[0].result, secret_key=secret_key)
async def get_users(self, include_disabled=False):
    """Return list of users that can connect to this controller.

    :param bool include_disabled: Include disabled users
    :returns: A list of :class:`~juju.user.User` instances
    """
    facade = client.UserManagerFacade.from_connection(self.connection())
    response = await facade.UserInfo(entities=None, include_disabled=include_disabled)
    users = []
    for entry in response.results:
        users.append(User(self, entry.result))
    return users
async def grant(self, username, acl='login'):
    """Grant access level of the given user on the controller.

    Note that if the user already has higher permissions than the
    provided ACL, this will do nothing (see revoke for a way to
    remove permissions).

    :param str username: Username
    :param str acl: Access control ('login', 'add-model' or 'superuser')
    :returns: True if new access was granted, False if user already had
        requested access or greater. Raises JujuError if failed.
    """
    facade = client.ControllerFacade.from_connection(self.connection())
    change = client.ModifyControllerAccess(acl, 'grant', tag.user(username))
    try:
        await facade.ModifyControllerAccess(changes=[change])
    except errors.JujuError as e:
        # "already has" is the one failure reported as False, not raised
        if 'user already has' not in str(e):
            raise
        return False
    return True
async def revoke(self, username, acl='login'):
    """Remove some or all access of a user from a controller.

    If 'login' access is revoked, the user will no longer have any
    permissions on the controller. Revoking a higher privilege from
    a user without that privilege will have no effect.

    :param str username: username
    :param str acl: Access to remove ('login', 'add-model' or 'superuser')
    :returns: The result of the facade's ``ModifyControllerAccess`` call.
    """
    facade = client.ControllerFacade.from_connection(self.connection())
    change = client.ModifyControllerAccess(acl, 'revoke', tag.user(username))
    outcome = await facade.ModifyControllerAccess(changes=[change])
    return outcome
async def grant_model(self, username, model_uuid, acl='read'):
    """Grant a user access to a model.

    Note that if the user already has higher permissions than the
    provided ACL, this will do nothing (see revoke_model for a way to
    remove permissions).

    :param str username: Username
    :param str model_uuid: The UUID of the model to change.
    :param str acl: Access control ('read', 'write' or 'admin')
    :returns: The result of the facade's ``ModifyModelAccess`` call.
    """
    facade = client.ModelManagerFacade.from_connection(self.connection())
    user_tag = tag.user(username)
    model_tag = tag.model(model_uuid)
    change = client.ModifyModelAccess(acl, 'grant', model_tag, user_tag)
    outcome = await facade.ModifyModelAccess(changes=[change])
    return outcome
async def revoke_model(self, username, model_uuid, acl='read'):
    """Revoke some or all of a user's access to a model.

    If 'read' access is revoked, the user will no longer have any
    permissions on the model. Revoking a higher privilege from
    a user without that privilege will have no effect.

    :param str username: Username to revoke
    :param str model_uuid: The UUID of the model to change.
    :param str acl: Access control ('read', 'write' or 'admin')
    :returns: The result of the facade's ``ModifyModelAccess`` call.
    """
    facade = client.ModelManagerFacade.from_connection(self.connection())
    user_tag = tag.user(username)
    model_tag = tag.model(model_uuid)
    change = client.ModifyModelAccess(acl, 'revoke', model_tag, user_tag)
    outcome = await facade.ModifyModelAccess(changes=[change])
    return outcome
async def create_offer(self, model_uuid, endpoint, offer_name=None, application_name=None):
    """Offer a deployed application's endpoints for use by consumers.

    :param str model_uuid: UUID of the model holding the application.
    :param str endpoint: holds the application and endpoint you want to
        offer
    :param str offer_name: override the offer name to help the consumer
    :param str application_name: overrides the application name in the
        endpoint
    :returns: The result of the facade's ``Offer`` call.
    """
    if offer_name and application_name:
        # Both names given means we're coming from bundle/overlays, so
        # the endpoint is not parsed; overlays may also pass endpoints
        # without a colon (:).
        chosen_offer = offer_name
        chosen_app = application_name
        endpoint_map = {endpoint: endpoint}
    else:
        try:
            parsed = parse_offer_endpoint(endpoint)
        except OfferParseError as e:
            log.error(e.message)
            raise
        chosen_offer = offer_name or parsed.application
        chosen_app = application_name or parsed.application
        endpoint_map = {}
        for ep_name in parsed.endpoints:
            endpoint_map[ep_name] = ep_name

    params = client.AddApplicationOffer()
    params.application_name = chosen_app
    params.endpoints = endpoint_map
    params.offer_name = chosen_offer
    params.model_tag = tag.model(model_uuid)

    facade = client.ApplicationOffersFacade.from_connection(self.connection())
    return await facade.Offer(offers=[params])
async def list_offers(self, model_name):
    """List application offers, filtered by model name.

    Offers list information about applications' endpoints that have been
    shared and who is connected.

    :param str model_name: Name of the model to filter on.
    :returns: The result of the facade's ``ListApplicationOffers`` call.
    """
    by_model = client.OfferFilter()
    by_model.model_name = model_name
    facade = client.ApplicationOffersFacade.from_connection(self.connection())
    offers = await facade.ListApplicationOffers(filters=[by_model])
    return offers
async def remove_offer(self, model_uuid, offer, force=False):
    """Remove offer for an application.

    Removing an offer also removes relations to it, so the call is
    refused with :class:`RemoveError` unless ``force`` is true.

    :param str model_uuid: Accepted but not used by this method.
    :param str offer: Offer URL to remove.
    :param bool force: Must be true for the removal to be attempted.
    :returns: The result of the facade's ``DestroyOffers`` call.
    """
    try:
        parsed_url = parse_offer_url(offer)
    except OfferParseError as e:
        log.error(e.message)
        raise
    if parsed_url is None:
        raise Exception

    if parsed_url.source == "":
        # The value is not used afterwards, but reading the property is
        # kept because it can raise when the controller name cannot be
        # resolved.
        _ = self.controller_name

    if not force:
        raise RemoveError("removing offer will also remove relations, use force and try again.")

    facade = client.ApplicationOffersFacade.from_connection(self.connection())
    return await facade.DestroyOffers(force=force, offer_urls=[parsed_url.string()])
async def get_consume_details(self, endpoint):
    """Return the details needed by another model to consume an offer.

    :param str endpoint: URL of the offer to look up.
    :returns: The single result entry for the offer.
    :raises JujuAPIError: if the facade does not return exactly one
        result, or that result carries an error.
    """
    facade = client.ApplicationOffersFacade.from_connection(self.connection())
    wanted = client.OfferURLs(offer_urls=[endpoint])
    offers = await facade.GetConsumeDetails(offer_urls=wanted)
    if len(offers.results) != 1:
        raise JujuAPIError("expected to find one result")
    only = offers.results[0]
    if only.error is not None:
        raise JujuAPIError(only.error)
    return only
async def watch_model_summaries(self, callback, as_admin=False):
    """Watch the controller for model summary updates.

    A background task is started that calls ``callback`` once for every
    model summary delivered by the watcher.

    If as_admin is true, a call will be made as the admin to watch
    all models in the controller. If the user isn't a superuser they
    will get a permission error.

    :param callback: Called with each summary; its return value is
        ignored.
    :param bool as_admin: Watch all models instead of the user's own.
    :returns: The event to set in order to stop the watcher task.
    """
    stop_event = jasyncio.Event()

    async def _watcher(halt):
        try:
            controller_facade = client.ControllerFacade.from_connection(
                self.connection())
            watcher = client.ModelSummaryWatcherFacade.from_connection(
                self.connection())
            if as_admin:
                started = await controller_facade.WatchAllModelSummaries()
            else:
                started = await controller_facade.WatchModelSummaries()
                log.debug("watcher id: {}".format(started.watcher_id))
            watcher.Id = started.watcher_id

            while True:
                try:
                    results = await utils.run_with_interrupt(
                        watcher.Next(), halt, log=log)
                except JujuAPIError as e:
                    # Only a "watcher was stopped" error is tolerated
                    if 'watcher was stopped' not in str(e):
                        raise
                except websockets.ConnectionClosed:
                    break
                if halt.is_set():
                    try:
                        await watcher.Stop()
                    except websockets.ConnectionClosed:
                        pass  # can't stop on a closed conn
                    break
                for summary in results.models:
                    callback(summary)
        except CancelledError:
            pass
        except Exception:
            log.exception('Error in watcher')
            raise

    log.debug('Starting watcher task for model summaries')
    jasyncio.ensure_future(_watcher(stop_event))
    return stop_event
async def add_secret_backends(self, id, name, backend_type, config):
    """Add a new secret backend.

    :param str id: id for the backend
    :param str name: name of the backend
    :param str backend_type: type of the backend
    :param dict config: dictionary with the backend configuration
        values; its ``token-rotate-interval`` entry, if any, is also
        sent as the backend's rotate interval.
    :returns: The result of the facade's ``AddSecretBackends`` call.
    """
    facade = client.SecretBackendsFacade.from_connection(self.connection())
    backend = {
        'id': id,
        'backend-type': backend_type,
        'config': config,
        'name': name,
        'token-rotate-interval': config.get('token-rotate-interval'),
    }
    outcome = await facade.AddSecretBackends([backend])
    return outcome
async def list_secret_backends(self, reveal=False):
    """Return the list of secret backends.

    :param bool reveal: include sensitive backend config content if true
    :returns: The result of the facade's ``ListSecretBackends`` call.
    """
    facade = client.SecretBackendsFacade.from_connection(self.connection())
    backends = await facade.ListSecretBackends(None, reveal)
    return backends
async def remove_secret_backends(self, name, force=False):
    """Remove a secrets backend.

    :param str name: name of the backend
    :param bool force: true if the operation is forced
    :returns: The result of the facade's ``RemoveSecretBackends`` call.
    """
    facade = client.SecretBackendsFacade.from_connection(self.connection())
    removal = {'name': name, 'force': force}
    outcome = await facade.RemoveSecretBackends([removal])
    return outcome
async def update_secret_backends(self, name, config=None, force=False, name_change=None, token_rotate_interval=None):
    """Update a backend.

    :param str name: the backend name
    :param dict config: key value dict with configuration parameters
    :param bool force: true to force the update process
    :param str name_change: new name for the backend
    :param int token_rotate_interval: token rotation interval
    :returns: The result of the facade's ``UpdateSecretBackends`` call.
    """
    facade = client.SecretBackendsFacade.from_connection(self.connection())
    update = {
        'name': name,
        'config': config,
        'force': force,
        'token-rotate-interval': token_rotate_interval,
        'name-change': name_change,
    }
    outcome = await facade.UpdateSecretBackends([update])
    return outcome
class ConnectedController(Controller):
    """Controller that connects using the parameters of an existing connection.

    Entering ``async with`` connects with the given connection's
    ``connect_params()`` minus the ``uuid`` entry; leaving it disconnects.
    """

    def __init__(self, connection, max_frame_size=None, bakery_client=None,
                 jujudata=None):
        """Remember ``connection`` and set up the base Controller.

        :param connection: Connection whose parameters are reused.
        :param max_frame_size: Passed to :class:`Controller`.
        :param bakery_client: Passed to :class:`Controller`.
        :param jujudata: Passed to :class:`Controller`.
        """
        super().__init__(
            max_frame_size=max_frame_size,
            bakery_client=bakery_client,
            jujudata=jujudata,
        )
        self._conn = connection

    async def __aenter__(self):
        params = self._conn.connect_params()
        # Drop the uuid entry before connecting
        del params['uuid']
        await self._connect_direct(**params)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.disconnect()