Source code for juju.user

# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.

import logging

from backports.datetime_fromisoformat import datetime_fromisoformat

from . import errors, tag
from .client import client

log = logging.getLogger(__name__)


[docs]class User: def __init__(self, controller, user_info, secret_key=None): self.controller = controller self._user_info = user_info self._secret_key = secret_key @property def tag(self): return tag.user(self.username) @property def username(self): return self._user_info.username @property def display_name(self): return self._user_info.display_name @property def last_connection(self): return datetime_fromisoformat(self._user_info.last_connection) @property def access(self): """Identifies the controller access levels of this user""" return self._user_info.access @property def date_created(self): return self._user_info.date_created @property def enabled(self): return not self._user_info.disabled @property def disabled(self): return self._user_info.disabled @property def created_by(self): return self._user_info.created_by @property def secret_key(self): return self._secret_key
[docs] async def set_password(self, password): """Update this user's password.""" await self.controller.change_user_password(self.username, password) self._user_info.password = password
[docs] async def modify_model_access(self, acl, action, model_name): """Grants or revokes the given access level for this user for a given model :param str acl: Model access levels (see access module) :param str action: grant/revoke :param str model_name: Name of the model :return bool: True if access changed, Error if user already has it """ modelmanager_facade = client.ModelManagerFacade.from_connection( self.controller.connection() ) models = await self.controller.model_uuids() if model_name not in models: raise errors.JujuError(f"Unable to find model : {model_name}") changes = client.ModifyModelAccess( acl, action, tag.model(models[model_name]), self.tag ) await modelmanager_facade.ModifyModelAccess(changes=[changes]) return True
[docs] async def modify_controller_access(self, acl, action): """Grants or revokes the given access level for this user on the current controller :param str acl: Controller access levels (see access module) :param str action: grant/revoke :return bool: True if access changed, Error if user already has it """ controller_facade = client.ControllerFacade.from_connection( self.controller.connection() ) changes = client.ModifyControllerAccess(acl, action, self.tag) await controller_facade.ModifyControllerAccess(changes=[changes]) new_access = acl if action == "revoke": new_access = "" self._user_info.access = new_access return True
[docs] async def modify_offer_access(self, acl, action, offer_url): """Grants or revokes the given access level for this user on a given offer :param str acl: Controller access levels (see access module) :param str action: grant/revoke :param str offer_url: url for the offer :return bool: True if access changed, Error if user already has it """ application_offers_facade = client.ApplicationOffersFacade.from_connection( self.controller.connection() ) changes = client.ModifyOfferAccess(acl, action, offer_url, self.tag) await application_offers_facade.ModifyOfferAccess(changes=[changes]) return True
[docs] async def grant_or_revoke(self, acl, action, **kwargs): """Grants or revokes the given access level of this user on model, offer or controller, depending on the access level (see the access module) :param str acl: Access control level :param str action: 'grant' or 'revoke' Depending on the access level, the available keyword parameters are: :param str model_name: name of the model if acl is one of model access levels :param str offer_url: url for the offer if acl is one of offer access levels :return: True if access changed, False if user already has it """ try: if "model_name" in kwargs: return await self.modify_model_access(acl, action, kwargs["model_name"]) elif "offer_url" in kwargs: return await self.modify_offer_access(acl, action, kwargs["offer_url"]) else: return await self.modify_controller_access(acl, action) except errors.JujuError as e: if "user already has" in str(e): return False else: raise
[docs] async def grant(self, acl, **kwargs): """Grant the given access level of this user on model, offer or controller, depending on the access level (see the access module) :param str acl: Access control level Depending on the access level, the available keyword parameters are: :param str model_name: name of the model if acl is one of model access levels :param str offer_url: url for the offer if acl is one of offer access levels :return: None or Error """ return await self.grant_or_revoke(acl, "grant", **kwargs)
[docs] async def revoke(self, acl="login", **kwargs): """The opposite of user.grant(). Revokes the given access level of this user on model, offer or controller, depending on the given access level. :param str acl: Access control level (see access module) Available keyword parameters are: :param str model_name: name of the model if acl is one of model access levels :param str offer_url: url for the offer if acl is one of offer access levels """ return await self.grant_or_revoke(acl, "revoke", **kwargs)
[docs] async def disable(self): """Disable this user.""" await self.controller.disable_user(self.username) self._user_info.disabled = True
[docs] async def enable(self): """Re-enable this user.""" await self.controller.enable_user(self.username) self._user_info.disabled = False