Source code for juju.client._client4

# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.

from juju.client._definitions import *
from juju.client.facade import ReturnMapping, Type


class AllModelWatcherFacade(Type):
    """Client-side facade for version 4 of the ``AllModelWatcher`` API.

    Every request sent through this facade must carry the watcher's ``Id``;
    see :meth:`rpc`, which adds it to each outgoing message.
    """

    name = "AllModelWatcher"
    version = 4

    @ReturnMapping(AllWatcherNextResults)
    async def Next(self):
        """Next returns the current state of everything on the first call.

        NOTE(review): the generated description of what subsequent calls
        return is cut off in the schema text ("... and subsequent calls
        will"); confirm against the Juju API documentation.

        Returns -> AllWatcherNextResults
        """
        # map input types to rpc msg
        _params = dict()
        msg = dict(type="AllModelWatcher", request="Next", version=4, params=_params)
        reply = await self.rpc(msg)
        return reply

    @ReturnMapping(None)
    async def Stop(self):
        """Stop stops the watcher.

        Returns -> None
        """
        # map input types to rpc msg
        _params = dict()
        msg = dict(type="AllModelWatcher", request="Stop", version=4, params=_params)
        reply = await self.rpc(msg)
        return reply

    async def rpc(self, msg):
        """Patch rpc method to add Id.

        msg : dict -- the rpc message; it is mutated in place to carry "Id"
        Returns -> the reply from ``self.connection.rpc``
        Raises RuntimeError if this instance has no ``Id`` attribute.
        """
        if not hasattr(self, "Id"):
            raise RuntimeError('Missing "Id" field')
        # Use the instance's Id. The bare name ``id`` would refer to the
        # Python builtin function, not to the attribute checked just above.
        msg["Id"] = self.Id

        # Imported locally, as in the generated code this method comes from.
        from .facade import TypeEncoder

        reply = await self.connection.rpc(msg, encoder=TypeEncoder)
        return reply
class ApplicationOffersFacade(Type):
    """Client-side facade for version 4 of the ``ApplicationOffers`` API."""

    name = "ApplicationOffers"
    version = 4

    @ReturnMapping(ApplicationOffersResults)
    async def ApplicationOffers(self, bakery_version=None, offer_urls=None):
        """Fetch details about the remote applications matching the given URLs.

        bakery_version : int
        offer_urls : typing.Sequence[str]
        Returns -> ApplicationOffersResults
        """
        # Validate arguments in declaration order; None always passes.
        if not (bakery_version is None or isinstance(bakery_version, int)):
            raise Exception(
                f"Expected bakery_version to be a int, received: {type(bakery_version)}"
            )
        if not (offer_urls is None or isinstance(offer_urls, (bytes, str, list))):
            raise Exception(
                f"Expected offer_urls to be a Sequence, received: {type(offer_urls)}"
            )

        # Build the complete rpc message, parameters included.
        request = {
            "type": "ApplicationOffers",
            "request": "ApplicationOffers",
            "version": 4,
            "params": {
                "bakery-version": bakery_version,
                "offer-urls": offer_urls,
            },
        }
        return await self.rpc(request)

    @ReturnMapping(ErrorResults)
    async def DestroyOffers(self, force=None, offer_urls=None):
        """Remove the offers named by the given URLs, forcing if necessary.

        force : bool
        offer_urls : typing.Sequence[str]
        Returns -> ErrorResults
        """
        if not (force is None or isinstance(force, bool)):
            raise Exception(f"Expected force to be a bool, received: {type(force)}")
        if not (offer_urls is None or isinstance(offer_urls, (bytes, str, list))):
            raise Exception(
                f"Expected offer_urls to be a Sequence, received: {type(offer_urls)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "DestroyOffers",
            "version": 4,
            "params": {"force": force, "offer-urls": offer_urls},
        }
        return await self.rpc(request)

    @ReturnMapping(QueryApplicationOffersResults)
    async def FindApplicationOffers(self, filters=None):
        """Fetch details about the remote applications matching the filter.

        filters : typing.Sequence[~OfferFilter]
        Returns -> QueryApplicationOffersResults
        """
        if not (filters is None or isinstance(filters, (bytes, str, list))):
            raise Exception(
                f"Expected filters to be a Sequence, received: {type(filters)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "FindApplicationOffers",
            "version": 4,
            "params": {"Filters": filters},
        }
        return await self.rpc(request)

    @ReturnMapping(ConsumeOfferDetailsResults)
    async def GetConsumeDetails(self, offer_urls=None, user_tag=None):
        """Fetch what another model needs in order to consume the offers.

        The returned details are those to pass to another model so that the
        specified user can consume the offers represented by the given URLs.

        offer_urls : OfferURLs
        user_tag : str
        Returns -> ConsumeOfferDetailsResults
        """
        if not (offer_urls is None or isinstance(offer_urls, (dict, OfferURLs))):
            raise Exception(
                f"Expected offer_urls to be a OfferURLs, received: {type(offer_urls)}"
            )
        if not (user_tag is None or isinstance(user_tag, (bytes, str))):
            raise Exception(
                f"Expected user_tag to be a str, received: {type(user_tag)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "GetConsumeDetails",
            "version": 4,
            "params": {"offer-urls": offer_urls, "user-tag": user_tag},
        }
        return await self.rpc(request)

    @ReturnMapping(QueryApplicationOffersResults)
    async def ListApplicationOffers(self, filters=None):
        """Fetch deployed details about the offers matching the filter.

        The results include details about the deployed applications, such as
        connection count.

        filters : typing.Sequence[~OfferFilter]
        Returns -> QueryApplicationOffersResults
        """
        if not (filters is None or isinstance(filters, (bytes, str, list))):
            raise Exception(
                f"Expected filters to be a Sequence, received: {type(filters)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "ListApplicationOffers",
            "version": 4,
            "params": {"Filters": filters},
        }
        return await self.rpc(request)

    @ReturnMapping(ErrorResults)
    async def ModifyOfferAccess(self, changes=None):
        """Change the application offer access granted to users.

        changes : typing.Sequence[~ModifyOfferAccess]
        Returns -> ErrorResults
        """
        if not (changes is None or isinstance(changes, (bytes, str, list))):
            raise Exception(
                f"Expected changes to be a Sequence, received: {type(changes)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "ModifyOfferAccess",
            "version": 4,
            "params": {"changes": changes},
        }
        return await self.rpc(request)

    @ReturnMapping(ErrorResults)
    async def Offer(self, offers=None):
        """Make application endpoints available for consumption at a URL.

        offers : typing.Sequence[~AddApplicationOffer]
        Returns -> ErrorResults
        """
        if not (offers is None or isinstance(offers, (bytes, str, list))):
            raise Exception(
                f"Expected offers to be a Sequence, received: {type(offers)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "Offer",
            "version": 4,
            "params": {"Offers": offers},
        }
        return await self.rpc(request)

    @ReturnMapping(RemoteApplicationInfoResults)
    async def RemoteApplicationInfo(self, bakery_version=None, offer_urls=None):
        """Fetch information about the requested remote application.

        This call currently has no client side API; it exists only for the
        Dashboard at this stage.

        bakery_version : int
        offer_urls : typing.Sequence[str]
        Returns -> RemoteApplicationInfoResults
        """
        if not (bakery_version is None or isinstance(bakery_version, int)):
            raise Exception(
                f"Expected bakery_version to be a int, received: {type(bakery_version)}"
            )
        if not (offer_urls is None or isinstance(offer_urls, (bytes, str, list))):
            raise Exception(
                f"Expected offer_urls to be a Sequence, received: {type(offer_urls)}"
            )

        request = {
            "type": "ApplicationOffers",
            "request": "RemoteApplicationInfo",
            "version": 4,
            "params": {
                "bakery-version": bakery_version,
                "offer-urls": offer_urls,
            },
        }
        return await self.rpc(request)
class ModelGenerationFacade(Type):
    """Client-side facade for version 4 of the ``ModelGeneration`` API."""

    name = "ModelGeneration"
    version = 4

    @ReturnMapping(ErrorResult)
    async def AbortBranch(self, branch=None):
        """Abort the input branch, marking it complete.

        No changes are made applicable to the whole model, and no units may
        be assigned to the branch when aborting.

        branch : str
        Returns -> ErrorResult
        """
        # None is accepted; anything else must be text or bytes.
        if not (branch is None or isinstance(branch, (bytes, str))):
            raise Exception(f"Expected branch to be a str, received: {type(branch)}")

        request = {
            "type": "ModelGeneration",
            "request": "AbortBranch",
            "version": 4,
            "params": {"branch": branch},
        }
        return await self.rpc(request)

    @ReturnMapping(ErrorResult)
    async def AddBranch(self, branch=None):
        """Add a new branch with the input name to the model.

        branch : str
        Returns -> ErrorResult
        """
        if not (branch is None or isinstance(branch, (bytes, str))):
            raise Exception(f"Expected branch to be a str, received: {type(branch)}")

        request = {
            "type": "ModelGeneration",
            "request": "AddBranch",
            "version": 4,
            "params": {"branch": branch},
        }
        return await self.rpc(request)

    @ReturnMapping(BranchResults)
    async def BranchInfo(self, branches=None, detailed=None):
        """Fetch details of the branch identified by the input argument.

        The details include the units on the branch and the configuration
        disjoint with the master generation. An error is returned if no
        in-flight branch matching the input is found.

        branches : typing.Sequence[str]
        detailed : bool
        Returns -> BranchResults
        """
        if not (branches is None or isinstance(branches, (bytes, str, list))):
            raise Exception(
                f"Expected branches to be a Sequence, received: {type(branches)}"
            )
        if not (detailed is None or isinstance(detailed, bool)):
            raise Exception(
                f"Expected detailed to be a bool, received: {type(detailed)}"
            )

        request = {
            "type": "ModelGeneration",
            "request": "BranchInfo",
            "version": 4,
            "params": {"branches": branches, "detailed": detailed},
        }
        return await self.rpc(request)

    @ReturnMapping(IntResult)
    async def CommitBranch(self, branch=None):
        """Commit the input branch and mark it complete.

        Committing makes the branch's changes applicable to the whole model.

        branch : str
        Returns -> IntResult
        """
        if not (branch is None or isinstance(branch, (bytes, str))):
            raise Exception(f"Expected branch to be a str, received: {type(branch)}")

        request = {
            "type": "ModelGeneration",
            "request": "CommitBranch",
            "version": 4,
            "params": {"branch": branch},
        }
        return await self.rpc(request)

    @ReturnMapping(BoolResult)
    async def HasActiveBranch(self, branch=None):
        """Report whether the model has an "in-flight" branch of this name.

        branch : str
        Returns -> BoolResult
        """
        if not (branch is None or isinstance(branch, (bytes, str))):
            raise Exception(f"Expected branch to be a str, received: {type(branch)}")

        request = {
            "type": "ModelGeneration",
            "request": "HasActiveBranch",
            "version": 4,
            "params": {"branch": branch},
        }
        return await self.rpc(request)

    @ReturnMapping(BranchResults)
    async def ListCommits(self):
        """Fetch the commits, i.e. only branches with generation_id above 0.

        Returns -> BranchResults
        """
        # This request takes no parameters.
        request = {
            "type": "ModelGeneration",
            "request": "ListCommits",
            "version": 4,
            "params": {},
        }
        return await self.rpc(request)

    @ReturnMapping(GenerationResult)
    async def ShowCommit(self, generation_id=None):
        """Fetch the details of a commit given by its generation id.

        An error is returned if no branch can be found corresponding to the
        generation id, or if the generation id given is below 1.

        generation_id : int
        Returns -> GenerationResult
        """
        if not (generation_id is None or isinstance(generation_id, int)):
            raise Exception(
                f"Expected generation_id to be a int, received: {type(generation_id)}"
            )

        request = {
            "type": "ModelGeneration",
            "request": "ShowCommit",
            "version": 4,
            "params": {"generation-id": generation_id},
        }
        return await self.rpc(request)

    @ReturnMapping(ErrorResults)
    async def TrackBranch(self, branch=None, entities=None, num_units=None):
        """Mark the input units and/or applications as tracking the branch.

        Tracking causes them to realise changes made under that branch.

        branch : str
        entities : typing.Sequence[~Entity]
        num_units : int
        Returns -> ErrorResults
        """
        if not (branch is None or isinstance(branch, (bytes, str))):
            raise Exception(f"Expected branch to be a str, received: {type(branch)}")
        if not (entities is None or isinstance(entities, (bytes, str, list))):
            raise Exception(
                f"Expected entities to be a Sequence, received: {type(entities)}"
            )
        if not (num_units is None or isinstance(num_units, int)):
            raise Exception(
                f"Expected num_units to be a int, received: {type(num_units)}"
            )

        request = {
            "type": "ModelGeneration",
            "request": "TrackBranch",
            "version": 4,
            "params": {
                "branch": branch,
                "entities": entities,
                "num-units": num_units,
            },
        }
        return await self.rpc(request)
class SSHClientFacade(Type):
    """Client-side facade for version 4 of the ``SSHClient`` API."""

    name = "SSHClient"
    version = 4

    @ReturnMapping(SSHAddressesResults)
    async def AllAddresses(self, entities=None):
        """Report all addresses that might have SSH listening, per entity.

        The result is sorted with public addresses first. Machines and units
        are supported as entity types.

        entities : typing.Sequence[~Entity]
        Returns -> SSHAddressesResults
        """
        # None is accepted; otherwise the generated Sequence check applies.
        if not (entities is None or isinstance(entities, (bytes, str, list))):
            raise Exception(
                f"Expected entities to be a Sequence, received: {type(entities)}"
            )

        request = {
            "type": "SSHClient",
            "request": "AllAddresses",
            "version": 4,
            "params": {"entities": entities},
        }
        return await self.rpc(request)

    @ReturnMapping(CloudSpecResult)
    async def ModelCredentialForSSH(self):
        """Fetch a cloud spec for ssh purposes.

        This facade call is only used for k8s models.

        Returns -> CloudSpecResult
        """
        # This request takes no parameters.
        request = {
            "type": "SSHClient",
            "request": "ModelCredentialForSSH",
            "version": 4,
            "params": {},
        }
        return await self.rpc(request)

    @ReturnMapping(SSHAddressResults)
    async def PrivateAddress(self, entities=None):
        """Report the preferred private network address for each entity.

        Machines and units are supported.

        entities : typing.Sequence[~Entity]
        Returns -> SSHAddressResults
        """
        if not (entities is None or isinstance(entities, (bytes, str, list))):
            raise Exception(
                f"Expected entities to be a Sequence, received: {type(entities)}"
            )

        request = {
            "type": "SSHClient",
            "request": "PrivateAddress",
            "version": 4,
            "params": {"entities": entities},
        }
        return await self.rpc(request)

    @ReturnMapping(SSHProxyResult)
    async def Proxy(self):
        """Report whether SSH connections should be proxied.

        The answer applies to the model associated with the API connection:
        whether SSH connections should go through the controller hosts.

        Returns -> SSHProxyResult
        """
        # This request takes no parameters.
        request = {
            "type": "SSHClient",
            "request": "Proxy",
            "version": 4,
            "params": {},
        }
        return await self.rpc(request)

    @ReturnMapping(SSHAddressResults)
    async def PublicAddress(self, entities=None):
        """Report the preferred public network address for each entity.

        Machines and units are supported.

        entities : typing.Sequence[~Entity]
        Returns -> SSHAddressResults
        """
        if not (entities is None or isinstance(entities, (bytes, str, list))):
            raise Exception(
                f"Expected entities to be a Sequence, received: {type(entities)}"
            )

        request = {
            "type": "SSHClient",
            "request": "PublicAddress",
            "version": 4,
            "params": {"entities": entities},
        }
        return await self.rpc(request)

    @ReturnMapping(SSHPublicKeysResults)
    async def PublicKeys(self, entities=None):
        """Fetch the public SSH hosts for one or more entities.

        Machines and units are supported.

        entities : typing.Sequence[~Entity]
        Returns -> SSHPublicKeysResults
        """
        if not (entities is None or isinstance(entities, (bytes, str, list))):
            raise Exception(
                f"Expected entities to be a Sequence, received: {type(entities)}"
            )

        request = {
            "type": "SSHClient",
            "request": "PublicKeys",
            "version": 4,
            "params": {"entities": entities},
        }
        return await self.rpc(request)