Viewing file: api.py (4.91 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import json from importlib import import_module from typing import Any, Callable, Dict, List, Tuple
from uaclient.api import errors from uaclient.api.data_types import APIData, APIResponse, ErrorWarningObject from uaclient.config import UAConfig from uaclient.data_types import IncorrectFieldTypeError from uaclient.messages import API_UNKNOWN_ARG, WARN_NEW_VERSION_AVAILABLE from uaclient.version import check_for_new_version
VALID_ENDPOINTS = [ "u.pro.attach.auto.configure_retry_service.v1", "u.pro.attach.auto.full_auto_attach.v1", "u.pro.attach.auto.should_auto_attach.v1", "u.pro.attach.magic.initiate.v1", "u.pro.attach.magic.revoke.v1", "u.pro.attach.magic.wait.v1", "u.pro.packages.summary.v1", "u.pro.packages.updates.v1", "u.pro.security.fix.cve.execute.v1", "u.pro.security.fix.cve.plan.v1", "u.pro.security.fix.usn.execute.v1", "u.pro.security.fix.usn.plan.v1", "u.pro.security.status.livepatch_cves.v1", "u.pro.security.status.reboot_required.v1", "u.pro.status.enabled_services.v1", "u.pro.status.is_attached.v1", "u.pro.version.v1", "u.security.package_manifest.v1", "u.unattended_upgrades.status.v1", "u.apt_news.current_news.v1", ]
def _process_options( options: List[str], fields: List[str] ) -> Tuple[Dict[str, Any], List[ErrorWarningObject]]: kwargs = {} warnings = []
for option in options: try: k, v = option.split("=") except ValueError: raise errors.APIBadArgsFormat(arg=option)
if not k or not v: raise errors.APIBadArgsFormat(arg=option)
if k not in fields: warnings.append( ErrorWarningObject( title=API_UNKNOWN_ARG.format(arg=k).msg, code=API_UNKNOWN_ARG.name, meta={}, ) )
kwargs[k] = v
return kwargs, warnings
def _process_data( data: str, fields: List[str] ) -> Tuple[Dict[str, Any], List[ErrorWarningObject]]: kwargs = {} warnings = []
try: json_data = json.loads(data) except json.decoder.JSONDecodeError: raise errors.APIJSONDataFormatError(data=data)
for k, v in json_data.items(): if not k or not v: raise errors.APIBadArgsFormat(arg="{}:{}".format(k, v))
if k not in fields: warnings.append( ErrorWarningObject( title=API_UNKNOWN_ARG.format(arg=k).msg, code=API_UNKNOWN_ARG.name, meta={}, ) )
kwargs[k] = v
return kwargs, warnings
def call_api( endpoint_path: str, options: List[str], data: str, cfg: UAConfig ) -> APIResponse:
if endpoint_path not in VALID_ENDPOINTS: return errors.error_out( errors.APIInvalidEndpoint(endpoint=endpoint_path) )
module = import_module("uaclient.api." + endpoint_path) endpoint = module.endpoint
option_warnings = []
if endpoint.options_cls: fields = [f.key for f in endpoint.options_cls.fields] try: if options: kwargs, warnings = _process_options(options, fields) elif data: kwargs, warnings = _process_data(data, fields) else: kwargs, warnings = {}, [] option_warnings.extend(warnings) except errors.APIError as e: return errors.error_out(e)
try: options = endpoint.options_cls.from_dict(kwargs) except IncorrectFieldTypeError as e: return errors.error_out( errors.APIMissingArg(arg=e.key, endpoint=endpoint_path) )
try: result = endpoint.fn(options, cfg) except Exception as e: return errors.error_out(e)
else: if options or data: return errors.error_out( errors.APINoArgsForEndpoint(endpoint=endpoint_path) ) try: result = endpoint.fn(cfg) except Exception as e: return errors.error_out(e)
new_version = check_for_new_version() if new_version: option_warnings.append( ErrorWarningObject( title=WARN_NEW_VERSION_AVAILABLE.format( version=new_version ).msg, code=WARN_NEW_VERSION_AVAILABLE.name, meta={}, ) )
return APIResponse( _schema_version=endpoint.version, warnings=result.warnings + option_warnings, data=APIData( type=endpoint.name, attributes=result, meta=result.meta, ), )
class APIEndpoint: def __init__( self, version: str, name: str, fn: Callable, options_cls, ): self.version = version self.name = name self.fn = fn self.options_cls = options_cls
|