Source code for juju.client.facade

# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.

import argparse
import builtins
import functools
import json
import keyword
import pprint
import re
import textwrap
import typing
from collections import defaultdict
from glob import glob
from pathlib import Path
from typing import Any, Mapping, Sequence, TypeVar

import typing_inspect

from . import codegen

_marker = object()

JUJU_VERSION = re.compile(r'[0-9]+\.[0-9-]+[\.\-][0-9a-z]+(\.[0-9]+)?')
# Workaround for https://bugs.launchpad.net/juju/+bug/1683906
NAUGHTY_CLASSES = ['ClientFacade', 'Client', 'ModelStatusInfo']


# Map basic types to Python's typing with a callable
SCHEMA_TO_PYTHON = {
    'string': str,
    'integer': int,
    'float': float,
    'number': float,
    'boolean': bool,
    'object': Any,
}


# Friendly warning message to stick at the top of generated files.
HEADER = """\
# DO NOT CHANGE THIS FILE! This file is auto-generated by facade.py.
# Changes will be overwritten/lost when the file is regenerated.

"""


# Classes and helper functions that we'll write to _client.py
LOOKUP_FACADE = '''
def lookup_facade(name, version):
    """
    Given a facade name and version, attempt to pull that facade out
    of the correct client<version>.py file.

    """
    for _version in range(int(version), 0, -1):
        try:
            facade = getattr(CLIENTS[str(_version)], name)
            return facade
        except (KeyError, AttributeError):
            continue
    else:
        raise ImportError("No supported version for facade: "
                          "{}".format(name))

'''

TYPE_FACTORY = '''
class TypeFactory:
    @classmethod
    def from_connection(cls, connection):
        """
        Given a connected Connection object, return an initialized and
        connected instance of an API Interface matching the name of
        this class.

        @param connection: initialized Connection object.

        """
        facade_name = cls.__name__
        if not facade_name.endswith('Facade'):
            raise TypeError('Unexpected class name: {}'.format(facade_name))
        facade_name = facade_name[:-len('Facade')]
        version = connection.facades.get(facade_name)
        if version is None:
            raise Exception('No facade {} in facades {}'.format(facade_name,
                                                                connection.facades))

        c = lookup_facade(cls.__name__, version)
        c = c()
        c.connect(connection)

        return c

    @classmethod
    def best_facade_version(cls, connection):
        """
        Returns the best facade version for a given facade. This helps
        provide different functionality for different facade versions.

        @param connection: initialized Connection object.
        """
        facade_name = cls.__name__
        if not facade_name.endswith('Facade'):
            raise TypeError('Unexpected class name: {}'.format(facade_name))
        facade_name = facade_name[:-len('Facade')]
        return connection.facades.get(facade_name)


'''

CLIENT_TABLE = '''
CLIENTS = {{
    {clients}
}}

'''
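# Illustrative only (not part of the original source): when rendered by
# write_client() below, CLIENT_TABLE expands to something like
#
#   CLIENTS = {
#       "1": _client1,
#       "2": _client2,
#   }
#
# with one entry per facade version discovered in the loaded schemas.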


class KindRegistry(dict):
    def register(self, name, version, obj):
        self[name] = {version: {
            "object": obj,
        }}

    def lookup(self, name, version=None):
        """If version is omitted, max version is used"""
        versions = self.get(name)
        if not versions:
            return None
        if version:
            return versions[version]
        return versions[max(versions)]

    def getObj(self, name, version=None):
        result = self.lookup(name, version)
        if result:
            obj = result["object"]
            return obj
        return None

class TypeRegistry(dict):
    def __init__(self, schema):
        self.schema = schema

    def get(self, name):
        # Two way mapping
        refname = self.schema.referenceName(name)
        if refname not in self:
            result = TypeVar(refname)
            self[refname] = result
            self[result] = refname
        return self[refname]

    def getRefType(self, ref):
        return self.get(ref)

    def objType(self, obj):
        kind = obj.get('type')
        if not kind:
            raise ValueError("%s has no type" % obj)
        result = SCHEMA_TO_PYTHON.get(kind)
        if not result:
            raise ValueError("%s has type %s" % (obj, kind))
        return result

    def refType(self, obj):
        return self.getRefType(obj["$ref"])

CLASSES = {}
factories = codegen.Capture()

def booler(v):
    if isinstance(v, str):
        if v == "false":
            return False
    return bool(v)

basic_types = [str, bool, int, float]

type_mapping = {
    'str': '(bytes, str)',
    'Sequence': '(bytes, str, list)',
    'Union': 'dict',
    'Mapping': 'dict',
}

def name_to_py(name):
    result = name.replace("-", "_")
    result = result.lower()
    if keyword.iskeyword(result) or result in dir(builtins):
        result += "_"
    return result

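# Illustrative examples (not in the original source), based on the behaviour
# of name_to_py above: schema member names become safe Python identifiers.
#
#   name_to_py("application-name")  ->  "application_name"
#   name_to_py("class")             ->  "class_"   # keyword collision
#   name_to_py("type")              ->  "type_"    # builtin collision
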
def var_type_to_py(kind):
    return 'None'

def kind_to_py(kind):
    if kind is None or kind is typing.Any:
        return 'None', '', False

    name = ""
    if typing_inspect.is_generic_type(kind):
        origin = typing_inspect.get_origin(kind)
        name = origin.__name__
    else:
        name = kind.__name__

    if (kind in basic_types or type(kind) in basic_types):
        return name, type_mapping.get(name) or name, True
    if (name in type_mapping):
        return name, type_mapping[name], True

    suffix = name.lstrip("~")
    return suffix, "(dict, {})".format(suffix), True

def strcast(kind, keep_builtins=False):
    if (kind in basic_types or type(kind) in basic_types) and keep_builtins is False:
        return kind.__name__
    if str(kind).startswith('~'):
        return str(kind)[1:]
    if kind is typing.Any:
        return 'Any'
    try:
        if issubclass(kind, typing.GenericMeta):
            return str(kind)[1:]
    except AttributeError:
        pass
    return kind

class Args(list):
    def __init__(self, schema, defs):
        self.schema = schema
        self.defs = defs
        if defs:
            rtypes = schema.registry.getObj(schema.types[defs])
            if len(rtypes) == 1:
                if not self.do_explode(rtypes[0][1]):
                    for name, rtype in rtypes:
                        self.append((name, rtype))
            else:
                for name, rtype in rtypes:
                    self.append((name, rtype))

    def do_explode(self, kind):
        if kind is Any:
            return False
        if kind in basic_types or type(kind) is typing.TypeVar:
            return False
        if typing_inspect.is_generic_type(kind) and issubclass(typing_inspect.get_origin(kind), Sequence):
            return False
        if typing_inspect.is_generic_type(kind) and issubclass(typing_inspect.get_origin(kind), Mapping):
            return False
        self.clear()
        self.extend(Args(self.schema, kind))
        return True

    def PyToSchemaMapping(self):
        m = {}
        for n, rt in self:
            m[name_to_py(n)] = n
        return m

    def SchemaToPyMapping(self):
        m = {}
        for n, tr in self:
            m[n] = name_to_py(n)
        return m

    def _format(self, name, rtype, typed=True):
        if typed:
            return "{} : {}".format(
                name_to_py(name),
                strcast(rtype)
            )
        else:
            return name_to_py(name)

    def _get_arg_str(self, typed=False, joined=", "):
        if self:
            parts = []
            for item in self:
                parts.append(self._format(item[0], item[1], typed))
            if joined:
                return joined.join(parts)
            return parts
        return ''

    def as_kwargs(self):
        if self:
            parts = []
            for item in self:
                var_name = name_to_py(item[0])
                var_type = var_type_to_py(item[1])
                parts.append('{}={}'.format(var_name, var_type))
            return ', '.join(parts)
        return ''

    def as_validation(self):
        """
        as_validation returns a series of validation statements for every item
        in the Args.
        """
        parts = []
        for item in self:
            var_name = name_to_py(item[0])
            var_type, var_sub_type, ok = kind_to_py(item[1])
            if ok:
                parts.append(buildValidation(var_name, var_type, var_sub_type))
        return '\n'.join(parts)

    def typed(self):
        return self._get_arg_str(True)

    def __str__(self):
        return self._get_arg_str(False)

    def get_doc(self):
        return self._get_arg_str(True, "\n")

def buildValidation(name, instance_type, instance_sub_type, ident=None):
    INDENT = ident or "    "
    source = """{ident}if {name} is not None and not isinstance({name}, {instance_sub_type}):
{ident}    raise Exception("Expected {name} to be a {instance_type}, received: {{}}".format(type({name})))
""".format(ident=INDENT,
           name=name,
           instance_type=instance_type,
           instance_sub_type=instance_sub_type)
    return source

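# Illustrative only (not in the original source): with the default indent,
# buildValidation("tag_", "str", "(bytes, str)") renders roughly as
#
#   if tag_ is not None and not isinstance(tag_, (bytes, str)):
#       raise Exception("Expected tag_ to be a str, received: {}".format(type(tag_)))
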
def buildTypes(schema, capture):
    INDENT = "    "
    for kind in sorted((k for k in schema.types if not isinstance(k, str)),
                       key=lambda x: str(x)):
        name = schema.types[kind]
        if name in capture and name not in NAUGHTY_CLASSES:
            continue
        args = Args(schema, kind)
        # Write Factory class for _client.py
        make_factory(name)
        # Write actual class
        source = ["""
class {}(Type):
    _toSchema = {}
    _toPy = {}

    def __init__(self{}{}, **unknown_fields):
        '''
{}
        '''""".format(
            name,
            # pprint these to get stable ordering across regens
            pprint.pformat(args.PyToSchemaMapping(), width=999),
            pprint.pformat(args.SchemaToPyMapping(), width=999),
            ", " if args else "",
            args.as_kwargs(),
            textwrap.indent(args.get_doc(), INDENT * 2))]

        if not args:
            source.append("{}self.unknown_fields = unknown_fields".format(INDENT * 2))
        else:
            # do the validation first, before setting the variables
            for arg in args:
                arg_name = name_to_py(arg[0])
                arg_type = arg[1]
                arg_type_name = strcast(arg_type)
                if arg_type in basic_types or arg_type is typing.Any:
                    source.append("{}{}_ = {}".format(INDENT * 2, arg_name, arg_name))
                elif type(arg_type) is typing.TypeVar:
                    source.append("{}{}_ = {}.from_json({}) "
                                  "if {} else None".format(INDENT * 2,
                                                           arg_name,
                                                           arg_type_name,
                                                           arg_name,
                                                           arg_name))
                elif typing_inspect.is_generic_type(arg_type) and issubclass(typing_inspect.get_origin(arg_type), Sequence):
                    parameters = typing_inspect.get_parameters(arg_type)
                    value_type = (
                        parameters[0]
                        if len(parameters)
                        else None
                    )
                    if type(value_type) is typing.TypeVar:
                        source.append(
                            "{}{}_ = [{}.from_json(o) "
                            "for o in {} or []]".format(INDENT * 2,
                                                        arg_name,
                                                        strcast(value_type),
                                                        arg_name))
                    else:
                        source.append("{}{}_ = {}".format(INDENT * 2,
                                                          arg_name,
                                                          arg_name))
                elif typing_inspect.is_generic_type(arg_type) and issubclass(typing_inspect.get_origin(arg_type), Mapping):
                    parameters = typing_inspect.get_parameters(arg_type)
                    value_type = (
                        parameters[0]
                        if len(parameters)
                        else None
                    )
                    if type(value_type) is typing.TypeVar:
                        source.append(
                            "{}{}_ = {{k: {}.from_json(v) "
                            "for k, v in ({} or dict()).items()}}".format(
                                INDENT * 2,
                                arg_name,
                                strcast(value_type),
                                arg_name))
                    else:
                        source.append("{}{}_ = {}".format(INDENT * 2,
                                                          arg_name,
                                                          arg_name))
                else:
                    source.append("{}{}_ = {}".format(INDENT * 2,
                                                      arg_name,
                                                      arg_name))

            if len(args) > 0:
                source.append('\n{}# Validate arguments against known Juju API types.'.format(INDENT * 2))
            for arg in args:
                arg_name = "{}_".format(name_to_py(arg[0]))
                arg_type, arg_sub_type, ok = kind_to_py(arg[1])
                if ok:
                    source.append('{}'.format(buildValidation(arg_name,
                                                              arg_type,
                                                              arg_sub_type,
                                                              ident=INDENT * 2)))

            for arg in args:
                arg_name = name_to_py(arg[0])
                source.append('{}self.{} = {}_'.format(INDENT * 2, arg_name, arg_name))

            # Ensure that we take the kwargs (unknown_fields) and put it on the
            # Results/Params so we can inspect it.
            source.append("{}self.unknown_fields = unknown_fields".format(INDENT * 2))

        source = "\n".join(source)
        capture.clear(name)
        capture[name].write(source)
        capture[name].write("\n\n")
        if name is None:
            print(source)
        co = compile(source, __name__, "exec")
        ns = _getns(schema)
        exec(co, ns)
        cls = ns[name]
        CLASSES[name] = cls

def retspec(schema, defs):
    # return specs
    # only return 1, so if there is more than one type
    # we need to include a union
    # In truth there is only 1 return
    # Error or the expected Type
    if not defs:
        return None
    if defs in basic_types:
        return strcast(defs, False)
    return strcast(defs, False)

def ReturnMapping(cls):
    # Annotate the method with a return Type
    # so the value can be cast
    def decorator(f):
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            nonlocal cls
            reply = await f(*args, **kwargs)
            if cls is None:
                return reply
            if 'error' in reply:
                cls = CLASSES['Error']
            if typing_inspect.is_generic_type(cls) and issubclass(typing_inspect.get_origin(cls), Sequence):
                parameters = typing_inspect.get_parameters(cls)
                result = []
                item_cls = parameters[0]
                for item in reply:
                    result.append(item_cls.from_json(item))
                    """
                    if 'error' in item:
                        cls = CLASSES['Error']
                    else:
                        cls = item_cls
                    result.append(cls.from_json(item))
                    """
            else:
                result = cls.from_json(reply['response'])

            return result
        return wrapper
    return decorator

def makeFunc(cls, name, description, params, result, _async=True):
    INDENT = "    "
    args = Args(cls.schema, params)
    assignments = []
    toschema = args.PyToSchemaMapping()
    for arg in args._get_arg_str(False, False):
        assignments.append("{}_params[\'{}\'] = {}".format(INDENT,
                                                           toschema[arg],
                                                           arg))
    assignments = "\n".join(assignments)
    res = retspec(cls.schema, result)
    source = """

@ReturnMapping({rettype})
{_async}def {name}(self{argsep}{args}):
    '''
{docstring}
    Returns -> {res}
    '''
{validation}
    # map input types to rpc msg
    _params = dict()
    msg = dict(type='{cls.name}',
               request='{name}',
               version={cls.version},
               params=_params)
{assignments}
    reply = {_await}self.rpc(msg)
    return reply

"""

    if description != "":
        description = "{}\n\n".format(description)
    doc_string = "{}{}".format(description, args.get_doc())
    fsource = source.format(_async="async " if _async else "",
                            name=name,
                            argsep=", " if args else "",
                            args=args.as_kwargs(),
                            res=res,
                            validation=args.as_validation(),
                            rettype=result.__name__ if result else None,
                            docstring=textwrap.indent(doc_string, INDENT),
                            cls=cls,
                            assignments=assignments,
                            _await="await " if _async else "")
    ns = _getns(cls.schema)
    exec(fsource, ns)
    func = ns[name]
    return func, fsource

def makeRPCFunc(cls):
    source = """

async def rpc(self, msg):
    '''
    Patch rpc method to add Id.
    '''
    if not hasattr(self, 'Id'):
        raise RuntimeError('Missing "Id" field')
    msg['Id'] = id

    from .facade import TypeEncoder
    reply = await self.connection.rpc(msg, encoder=TypeEncoder)
    return reply

"""
    ns = _getns(cls.schema)
    exec(source, ns)
    func = ns["rpc"]
    return func, source

def buildMethods(cls, capture):
    properties = cls.schema['properties']
    for methodname in sorted(properties):
        method, source = _buildMethod(cls, methodname)
        setattr(cls, methodname, method)
        capture["{}Facade".format(cls.__name__)].write(source, depth=1)

def _buildMethod(cls, name):
    params = None
    result = None
    method = cls.schema['properties'][name]
    description = ""
    if 'description' in method:
        description = method['description']
    if 'properties' in method:
        prop = method['properties']
        spec = prop.get('Params')
        if spec:
            params = cls.schema.types.get(spec['$ref'])
        spec = prop.get('Result')
        if spec:
            if '$ref' in spec:
                result = cls.schema.types.get(spec['$ref'])
            else:
                result = SCHEMA_TO_PYTHON[spec['type']]
    return makeFunc(cls, name, description, params, result)

def buildWatcherRPCMethods(cls, capture):
    properties = cls.schema['properties']
    if "Next" in properties and "Stop" in properties:
        method, source = makeRPCFunc(cls)
        setattr(cls, "rpc", method)
        capture["{}Facade".format(cls.__name__)].write(source, depth=1)

def buildFacade(schema):
    cls = type(schema.name, (Type,),
               dict(name=schema.name,
                    version=schema.version,
                    schema=schema))
    source = """
class {name}Facade(Type):
    name = '{name}'
    version = {version}
    schema = {schema}
    """.format(name=schema.name,
               version=schema.version,
               schema=textwrap.indent(pprint.pformat(schema), "    "))
    return cls, source

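# Illustrative only (not in the original source): for a hypothetical schema
# named "Client" at version 6, the template in buildFacade() renders roughly as
#
#   class ClientFacade(Type):
#       name = 'Client'
#       version = 6
#       schema = ...  # the pretty-printed schema dict
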
class TypeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Type):
            return obj.serialize()
        return json.JSONEncoder.default(self, obj)

class Type:
    def connect(self, connection):
        self.connection = connection

    def __repr__(self):
        return "{}({})".format(self.__class__, self.__dict__)

    def __eq__(self, other):
        if not isinstance(other, Type):
            return NotImplemented
        return self.__dict__ == other.__dict__

    async def rpc(self, msg):
        result = await self.connection.rpc(msg, encoder=TypeEncoder)
        return result

    @classmethod
    def from_json(cls, data):
        def _parse_nested_list_entry(expr, result_dict):
            if isinstance(expr, str):
                if '>' in expr or '>=' in expr:
                    # something like juju >= 2.9.31
                    i = expr.index('>')
                    _key = expr[:i].strip()
                    _value = expr[i:].strip()
                    result_dict[_key] = _value
                else:
                    # this is a simple entry
                    result_dict[expr] = ''
            elif isinstance(expr, dict):
                for _, v in expr.items():
                    _parse_nested_list_entry(v, result_dict)
            elif isinstance(expr, list):
                for v in expr:
                    _parse_nested_list_entry(v, result_dict)
            else:
                raise TypeError(f"Unexpected type of entry in assumes expression: {expr}")

        if isinstance(data, cls):
            return data
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                raise
        if isinstance(data, dict):
            d = {}
            for k, v in (data or {}).items():
                d[cls._toPy.get(k, k)] = v

            try:
                return cls(**d)
            except TypeError:
                raise
        if isinstance(data, list):
            # check: https://juju.is/docs/sdk/assumes
            # assumes are in the form of a list
            d = {}
            _parse_nested_list_entry(data, d)
            return cls(**d)
        return None

    def serialize(self):
        d = {}
        for attr, tgt in self._toSchema.items():
            d[tgt] = getattr(self, attr)
        return d

    def to_json(self):
        return json.dumps(self.serialize(), cls=TypeEncoder, sort_keys=True)

    # treat subscript gets as JSON representation
    def __getitem__(self, key):
        attr = self._toPy[key]
        return getattr(self, attr)

    # treat subscript sets as JSON representation
    def __setitem__(self, key, value):
        attr = self._toPy[key]
        setattr(self, attr, value)

    # legacy: generated definitions used to not correctly
    # create typed objects and would use dict instead (from JSON)
    # so we emulate some dict methods.
    def get(self, key, default=None):
        try:
            attr = self._toPy[key]
        except KeyError:
            return default
        return getattr(self, attr, default)

class Schema(dict):
    def __init__(self, schema):
        self.name = schema['Name']
        self.version = schema['Version']
        self.update(schema['Schema'])
        self.registry = KindRegistry()
        self.types = TypeRegistry(self)

    def referenceName(self, ref):
        if ref.startswith("#/definitions/"):
            ref = ref.rsplit("/", 1)[-1]
        return ref

    def buildDefinitions(self):
        # here we are building the types out
        # anything in definitions is a type
        # but these may contain references themselves
        # so we dfs to the bottom and build upwards
        # when a type is already in the registry
        defs = self.get('definitions')
        if not defs:
            return

        definitions = {}
        for d, data in defs.items():
            if d in self.registry and d not in NAUGHTY_CLASSES:
                continue
            if data.get("type") != "object":
                continue
            definitions[d] = data
        for d, definition in definitions.items():
            node = self.buildObject(definition, d)
            self.registry.register(d, self.version, node)
            self.types.getRefType(d)

    def buildObject(self, node, name=None):
        # we don't need to build types recursively here
        # they are all in definitions already
        # we only want to include the type reference
        # which we can derive from the name
        struct = []
        add = struct.append
        props = node.get("properties")
        pprops = node.get("patternProperties")

        if props:
            # Sort these so the __init__ arg list for each Type remains
            # consistently ordered across regens of client.py
            for p in sorted(props):
                prop = props[p]
                if "$ref" in prop:
                    add((p, self.types.refType(prop)))
                else:
                    kind = prop['type']
                    if kind == "array":
                        add((p, self.buildArray(prop)))
                    elif kind == "object":
                        struct.extend(self.buildObject(prop, p))
                    else:
                        add((p, self.types.objType(prop)))
        if pprops:
            if ".*" not in pprops:
                raise ValueError(
                    "Cannot handle actual pattern in patternProperties %s" %
                    pprops)
            pprop = pprops[".*"]
            if "$ref" in pprop:
                add((name, Mapping[str, self.types.refType(pprop)]))
                return struct
            ppkind = pprop["type"]
            if ppkind == "array":
                add((name, Mapping[str, self.buildArray(pprop)]))
            else:
                add((name, Mapping[str, SCHEMA_TO_PYTHON[ppkind]]))

        if not struct and node.get('additionalProperties', False):
            add((name, SCHEMA_TO_PYTHON.get('object')))

        return struct

    def buildArray(self, obj):
        # return a sequence from an array in the schema
        if "$ref" in obj:
            return Sequence[self.types.refType(obj)]
        else:
            kind = obj.get("type")
            if kind and kind == "array":
                items = obj['items']
                return self.buildArray(items)
            else:
                return Sequence[self.types.objType(obj)]

def _getns(schema):
    ns = {'Type': Type,
          'typing': typing,
          'ReturnMapping': ReturnMapping
          }
    # Copy our types into the globals of the method
    for facade in schema.registry:
        ns[facade] = schema.registry.getObj(facade)
    return ns


def make_factory(name):
    if name in factories:
        del factories[name]
    factories[name].write("class {}(TypeFactory):\n    pass\n\n".format(name))

def write_facades(captures, options):
    """
    Write the Facades to the appropriate _client<version>.py

    """
    for version in sorted(captures.keys()):
        filename = "{}/_client{}.py".format(options.output_dir, version)
        with open(filename, "w") as f:
            f.write(HEADER)
            f.write("from juju.client.facade import Type, ReturnMapping\n")
            f.write("from juju.client._definitions import *\n\n")
            for key in sorted(
                    [k for k in captures[version].keys() if "Facade" in k]):
                print(captures[version][key], file=f)

    # Return the last (most recent) version for use in other routines.
    return version

def write_definitions(captures, options):
    """
    Write auxiliary (non-versioned) classes to _definitions.py.

    The auxiliary classes currently get written redundantly into each
    capture object, so we can look in one of them -- we just use the
    last one from the loop above.

    """
    with open("{}/_definitions.py".format(options.output_dir), "w") as f:
        f.write(HEADER)
        f.write("from juju.client.facade import Type, ReturnMapping\n\n")
        for key in sorted(
                [k for k in captures.keys() if "Facade" not in k]):
            print(captures[key], file=f)

def write_client(captures, options):
    """
    Write the TypeFactory classes to _client.py, along with some
    imports and tables so that we can look up versioned Facades.

    """
    with open("{}/_client.py".format(options.output_dir), "w") as f:
        f.write(HEADER)
        f.write("from juju.client._definitions import *\n\n")
        clients = ", ".join("_client{}".format(v) for v in captures)
        # from juju.client import _client2, _client1, _client3 ...
        f.write("\nfrom juju.client import " + clients + "\n\n")
        # CLIENTS = { ....
        f.write(CLIENT_TABLE.format(clients=",\n    ".join(
            ['"{}": _client{}'.format(v, v) for v in captures])))
        f.write(LOOKUP_FACADE)
        f.write(TYPE_FACTORY)
        for key in sorted([k for k in factories.keys() if "Facade" in k]):
            print(factories[key], file=f)

def generate_definitions(schemas):
    # Build all of the auxiliary (unversioned) classes
    # TODO: get rid of some of the excess trips through loops in the
    # called functions.
    definitions = codegen.Capture()

    for juju_version in sorted(schemas.keys()):
        for schema in schemas[juju_version]:
            schema.buildDefinitions()

    # ensure we write the latest ones first, so that earlier revisions
    # get dropped.
    for juju_version in sorted(schemas.keys(), reverse=True):
        for schema in schemas[juju_version]:
            buildTypes(schema, definitions)

    return definitions

def generate_facades(schemas):
    captures = defaultdict(codegen.Capture)

    # Build the Facade classes
    for juju_version in sorted(schemas.keys()):
        for schema in schemas[juju_version]:
            cls, source = buildFacade(schema)
            cls_name = "{}Facade".format(schema.name)

            captures[schema.version].clear(cls_name)
            # Make the factory class for _client.py
            make_factory(cls_name)
            # Make the actual class
            captures[schema.version][cls_name].write(source)
            # Build the methods for each Facade class.
            buildMethods(cls, captures[schema.version])
            # Build the override RPC method if the Facade is a watcher.
            buildWatcherRPCMethods(cls, captures[schema.version])
            # Mark this Facade class as being done for this version --
            # helps mitigate some excessive looping.
            CLASSES[schema.name] = cls

    return captures

def load_schemas(options):
    schemas = {}
    for p in sorted(glob(options.schema)):
        if 'latest' in p:
            juju_version = 'latest'
        else:
            try:
                juju_version = re.search(JUJU_VERSION, p).group()
            except AttributeError:
                print("Cannot extract a juju version from {}".format(p))
                print("Schemas must include a juju version in the filename")
                raise SystemExit(1)

        new_schemas = json.loads(Path(p).read_text("utf-8"))
        schemas[juju_version] = [Schema(s) for s in new_schemas]
    return schemas


def setup():
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--schema", default="juju/client/schemas*")
    parser.add_argument("-o", "--output_dir", default="juju/client")
    options = parser.parse_args()
    return options


def main():
    options = setup()
    schemas = load_schemas(options)

    # Generate some text blobs
    definitions = generate_definitions(schemas)
    captures = generate_facades(schemas)

    # ... and write them out
    write_definitions(definitions, options)
    write_facades(captures, options)
    write_client(captures, options)

if __name__ == '__main__':
    main()
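
# Hypothetical usage sketch (not part of the original source): regenerate the
# client modules from a set of downloaded schema files. The schema glob below
# is an assumption; pass whatever pattern matches your schema JSON files.
#
#   python -m juju.client.facade -s "juju/client/schemas-juju-*.json" -o juju/client/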