# Copyright 2023 Canonical Ltd.
# Licensed under the Apache V2, see LICENCE file for details.
#
# Module that parses constraints
#
# The current version of juju core expects the client to take
# constraints given in the form "mem=10G foo=bar" and parse them into
# json that looks like {"mem": 10240, "foo": "bar"}. This module helps us
# accomplish that task.
#
# We do not attempt to duplicate the checking done in
# client/_client.py:Value here. That class will verify that the
# constraints keys are valid, and that we can successfully dump the
# constraints dict to json.
#
# Once https://bugs.launchpad.net/juju/+bug/1645402 is addressed, this
# module should be deprecated.
#
import re
from typing import Dict, List, Mapping, Optional, TypedDict, Union
from typing_extensions import NotRequired, Required
# Matches a memory size string: a positive integer (no leading zero)
# followed by exactly one of the unit letters M, G, T or P.
MEM = re.compile(r"^[1-9][0-9]*[MGTP]$")

# Multiplication factors to get Megabytes
# https://github.com/juju/juju/blob/master/constraints/constraints.go#L666
# Each unit is 1024 times the previous one, starting from M = 1.
FACTORS = {unit: 1024**power for power, unit in enumerate("MGTPEZY")}
# List of supported constraint keys, see
# http://github.com/cderici/juju/blob/2.9/core/constraints/constraints.go#L20-L39
SUPPORTED_KEYS = [
"arch",
"container",
"cpu_cores",
"cores",
"cpu_power",
"mem",
"root_disk",
"root_disk_source",
"tags",
"instance_role",
"instance_type",
"spaces",
"virt_type",
"zones",
"allocate_public_ip",
]
LIST_KEYS = {"tags", "spaces", "zones"}
SNAKE1 = re.compile(r"(.)([A-Z][a-z]+)")
SNAKE2 = re.compile("([a-z0-9])([A-Z])")
# A single parsed constraint value: an integer (plain numbers and memory
# sizes), a boolean ("true"/"false"), or the original string.
ParsedValue = Union[int, bool, str]


class ConstraintsDict(TypedDict, total=False):
    """Shape of a parsed constraints mapping.

    Every key is optional (``total=False``). The field names mirror the
    snake_case entries of ``SUPPORTED_KEYS``; the list-valued keys are the
    ones named in ``LIST_KEYS``.
    """

    allocate_public_ip: ParsedValue
    arch: ParsedValue
    container: ParsedValue
    cores: ParsedValue
    cpu_cores: ParsedValue
    cpu_power: ParsedValue
    instance_role: ParsedValue
    instance_type: ParsedValue
    mem: ParsedValue
    root_disk: ParsedValue
    # Was misspelled "root_dist_source"; the parser only ever emits
    # "root_disk_source" (see SUPPORTED_KEYS).
    root_disk_source: ParsedValue
    spaces: List[ParsedValue]
    tags: List[ParsedValue]
    virt_type: ParsedValue
    zones: List[ParsedValue]
def parse(constraints: Union[str, ConstraintsDict]) -> Optional[ConstraintsDict]:
    """Parse a constraints string into a dict of normalized constraints.

    Constraints must be expressed as a string containing only whitespace
    and key value pairs joined by an '=', e.g. ``"mem=10G cores=2"``.

    Args:
        constraints: the constraints string, or an already parsed dict
            (which is returned unchanged).

    Returns:
        ``None`` when ``constraints`` is empty or contains only whitespace,
        otherwise a dict mapping normalized keys to normalized values.
        Values of keys listed in ``LIST_KEYS`` are lists.

    Raises:
        ValueError: if an item contains no '=', or if ``normalize_key``
            rejects the key as unsupported.
    """
    if not constraints:
        return None

    if isinstance(constraints, dict):
        # Forwards compatibility: already parsed
        return constraints

    # split() without an argument collapses runs of whitespace and ignores
    # leading/trailing whitespace, so "mem=1G  cores=2 " does not produce
    # empty items that would be reported as malformed.
    items = constraints.split()
    if not items:
        return None

    normalized_constraints: ConstraintsDict = {}
    for s in items:
        if "=" not in s:
            raise ValueError("malformed constraint %s" % s)

        # Split on the first '=' only, so a value may itself contain '='.
        k, v = s.split("=", 1)
        key = normalize_key(k)
        # Decide list-vs-scalar on the *normalized* key: the raw key may be
        # spelled differently (e.g. "Tags") and would miss LIST_KEYS.
        normalized_constraints[key] = (
            normalize_list_value(v) if key in LIST_KEYS else normalize_value(v)
        )
    return normalized_constraints
def normalize_key(orig_key: str) -> str:
    """Return the snake_case form of a constraint key.

    Surrounding whitespace is removed, '-' becomes '_', camelCase is
    converted to snake_case and the result is lower-cased.

    Raises:
        ValueError: if the normalized key is not in ``SUPPORTED_KEYS``.
    """
    # Our _client lib wants "_" in place of "-"
    candidate = orig_key.strip().replace("-", "_")

    # Convert camelCase to snake_case: apply both patterns in order, each
    # inserting "_" between its two captured groups.
    for pattern in (SNAKE1, SNAKE2):
        candidate = pattern.sub(r"\1_\2", candidate)
    candidate = candidate.lower()

    if candidate in SUPPORTED_KEYS:
        return candidate
    raise ValueError("unknown constraint in %s" % orig_key)
def normalize_value(value: str) -> Union[int, bool, str]:
    """Convert a raw constraint value to an int, a bool, or a string.

    After stripping surrounding whitespace:

    * a memory size matching ``MEM`` (e.g. "10G") becomes an integer number
      of megabytes, using ``FACTORS``;
    * a string of decimal digits becomes an ``int``;
    * "true" / "false" (any letter case) become ``True`` / ``False``;
    * anything else is returned as the stripped string.
    """
    value = value.strip()

    if MEM.match(value):
        # Translate aliases to Megabytes. e.g. 1G = 1024
        return int(value[:-1]) * FACTORS[value[-1:]]

    # isdecimal() rather than isdigit(): isdigit() is also true for
    # characters such as superscript digits, which int() cannot parse and
    # would raise ValueError on.
    if value.isdecimal():
        return int(value)

    lowered = value.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False

    return value
def normalize_list_value(value: str) -> List[ParsedValue]:
    """Split a comma separated value and normalize every item.

    The whole string is stripped first; each item is then passed through
    ``normalize_value``.
    """
    items = value.strip().split(",")
    return list(map(normalize_value, items))
# Pattern for one comma separated item of a storage constraint string.
# Each match sets at most one of the named groups: pool, count, or
# size together with size_exp.
STORAGE = re.compile(
    # original regex:
    # '(?:(?:^|(?<=,))(?:|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P<count>-?[0-9]+)|(?:(?P<size>-?[0-9]+(?:\\.[0-9]+)?)(?P<size_exp>[MGTPEZY])(?:i?B)?))(?:$|,))'
    # with formatting and explanation -- note that this regex is used with re.finditer:
    "(?:"
    "(?:^|(?<=,))"  # start of string, or the previous character is ','
    "(?:"  # match one of the following (the first alternative is empty, so an empty item also matches):
    "|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)"  # * pool: one or more letters, then any number of letters, digits, '-' or '?'
    # ------- NOTE(review): inside the character class '?' is a literal question mark -- confirm that is intended
    "|(?P<count>-?[0-9]+)"  # * count: an optional minus sign followed by one or more digits
    "|(?:"  # * size (number) and size_exp (units):
    "(?P<size>-?[0-9]+(?:\\.[0-9]+)?)"  # -- * an optional minus sign followed by one or more digits, optionally with decimal point and more digits
    "(?P<size_exp>[MGTPEZY])(?:i?B)?)"  # -- * one of MGTPEZY, optionally followed by iB or B, for example 1M or 2.0MB or -3.3MiB
    ")"
    "(?:$|,)"  # end of string or ','
    ")"
)
[docs]class StorageConstraintDict(TypedDict):
count: Required[int] # >= 1
pool: NotRequired[str]
size: NotRequired[int]
def parse_storage_constraint(constraint: str) -> StorageConstraintDict:
    """Parse one storage constraint string, e.g. a comma separated
    combination of pool name, count and size.

    The result always contains ``count`` (1 unless a positive count is
    given; zero or negative counts are replaced by 1). ``pool`` and ``size``
    are only present when the string supplies them; ``size`` is the number
    multiplied by the ``FACTORS`` entry for its unit, truncated to an int.

    Raises:
        ValueError: if more than one pool is given.
    """
    result: StorageConstraintDict = {"count": 1}

    # finditer only yields the items that STORAGE matches; each match
    # carries at most one of pool / count / size.
    for match in STORAGE.finditer(constraint):
        fields = match.groupdict()

        if fields["pool"]:
            if "pool" in result:
                raise ValueError("pool already specified")
            result["pool"] = fields["pool"]

        if fields["count"]:
            # Clamp non-positive counts to 1.
            result["count"] = max(int(fields["count"]), 1)

        if fields["size"]:
            factor = FACTORS[fields["size_exp"]]
            result["size"] = int(float(fields["size"]) * factor)

    return result
def parse_storage_constraints(
    constraints: Optional[Mapping[str, Union[str, StorageConstraintDict]]] = None,
) -> Dict[str, StorageConstraintDict]:
    """Parse a mapping of storage label to storage constraint.

    String values are parsed with ``parse_storage_constraint``; dict values
    are taken as already parsed and stored as-is. ``None`` yields an empty
    dict.

    Raises:
        ValueError: if a value is neither a string nor a dict.
    """
    result: Dict[str, StorageConstraintDict] = {}
    if constraints is None:
        return result

    for label, storage_constraint in constraints.items():
        if isinstance(storage_constraint, dict):
            # Already parsed: pass through untouched.
            result[label] = storage_constraint
            continue

        if not isinstance(storage_constraint, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise ValueError(
                f"Unexpected constraint {storage_constraint!r}"
                f" for label {label!r} in {constraints}"
            )

        result[label] = parse_storage_constraint(storage_constraint)

    return result
# Pattern for a whole device constraint string: an optional count, a
# required type, and optional attributes.
DEVICE = re.compile(
    # original regex:
    # '^(?P<count>[0-9]+)?(?:^|,)(?P<type>[^,]+)(?:$|,(?!$))(?P<attrs>(?:[^=]+=[^;]+)+)*$'
    # with formatting and explanation -- note this regex is used with re.match:
    "^"  # start of string
    "(?P<count>[0-9]+)?"  # count is 1+ digits, and is optional
    "(?:^|,)"  # match start of string or a comma
    # -- so type can be at the start or comma separated from count
    "(?P<type>[^,]+)"  # type is 1+ anything not a comma (including digits), and is required
    "(?:$|,(?!$))"  # match end of string | or a non-trailing comma
    # -- so type can be at the end or followed by attrs
    "(?P<attrs>(?:[^=]+=[^;]+)+)*"  # attrs is any number of semicolon separated key=value items
    # -- value can have spare '=' inside, possibly not intended
    # -- attrs will be matched with ATTR.finditer afterwards in parse_device_constraint
    "$"  # end of string
)
# One key=value attribute, optionally preceded by ';': key is everything up
# to the first '=', value is everything up to the next ';'.
ATTR = re.compile(";?(?P<key>[^=]+)=(?P<value>[^;]+)")
[docs]class DeviceConstraintDict(TypedDict):
count: Required[int]
type: Required[str]
attributes: NotRequired[Dict[str, str]]
def parse_device_constraint(constraint: str) -> DeviceConstraintDict:
    """Parse one device constraint string.

    The result always has ``count`` (1 when no count is given, or when the
    given count is not positive) and ``type``. ``attributes`` is only added
    when the string carries key=value attributes.

    Raises:
        ValueError: if the string does not match the ``DEVICE`` pattern.
    """
    match = DEVICE.match(constraint)
    if match is None:
        raise ValueError("device constraint does not match")

    raw_count = match.group("count")
    device: DeviceConstraintDict = {
        # Missing or non-positive counts fall back to 1.
        "count": max(int(raw_count), 1) if raw_count else 1,
        "type": match.group("type"),
    }

    raw_attrs = match.group("attrs")
    if raw_attrs:
        # Break the captured attrs text into individual key=value pairs.
        device["attributes"] = {
            attr.group("key"): attr.group("value")
            for attr in ATTR.finditer(raw_attrs)
        }

    return device