import functools
import logging
import os
import re
import string
import sys
from collections import namedtuple
from itertools import chain
from operator import attrgetter
from operator import itemgetter

import yaml
try:
    from functools import cached_property
except ImportError:
    from cached_property import cached_property

from . import types
from ..const import COMPOSE_SPEC as VERSION
from ..const import COMPOSEFILE_V1 as V1
from ..utils import build_string_dict
from ..utils import json_hash
from ..utils import parse_bytes
from ..utils import parse_nanoseconds_int
from ..utils import splitdrive
from ..version import ComposeVersion
from .environment import env_vars_from_file
from .environment import Environment
from .environment import split_env
from .errors import CircularReference
from .errors import ComposeFileNotFound
from .errors import ConfigurationError
from .errors import DuplicateOverrideFileFound
from .errors import VERSION_EXPLANATION
from .interpolation import interpolate_environment_variables
from .sort_services import get_container_name_from_network_mode
from .sort_services import get_service_name_from_network_mode
from .sort_services import sort_service_dicts
from .types import MountSpec
from .types import parse_extra_hosts
from .types import parse_restart_spec
from .types import SecurityOpt
from .types import ServiceLink
from .types import ServicePort
from .types import VolumeFromSpec
from .types import VolumeSpec
from .validation import match_named_volumes
from .validation import validate_against_config_schema
from .validation import validate_config_section
from .validation import validate_cpu
from .validation import validate_credential_spec
from .validation import validate_depends_on
from .validation import validate_extends_file_path
from .validation import validate_healthcheck
from .validation import validate_ipc_mode
from .validation import validate_links
from .validation import validate_network_mode
from .validation import validate_pid_mode
from .validation import validate_service_constraints
from .validation import validate_top_level_object
from .validation import validate_ulimits

DOCKER_CONFIG_KEYS = [
    'cap_add',
    'cap_drop',
    'cgroup_parent',
    'command',
    'cpu_count',
    'cpu_percent',
    'cpu_period',
    'cpu_quota',
    'cpu_rt_period',
    'cpu_rt_runtime',
    'cpu_shares',
    'cpus',
    'cpuset',
    'detach',
    'device_cgroup_rules',
    'devices',
    'dns',
    'dns_search',
    'dns_opt',
    'domainname',
    'entrypoint',
    'env_file',
    'environment',
    'extra_hosts',
    'group_add',
    'hostname',
    'healthcheck',
    'image',
    'ipc',
    'isolation',
    'labels',
    'links',
    'mac_address',
    'mem_limit',
    'mem_reservation',
    'memswap_limit',
    'mem_swappiness',
    'net',
    'oom_score_adj',
    'oom_kill_disable',
    'pid',
    'ports',
    'privileged',
    'read_only',
    'restart',
    'runtime',
    'secrets',
    'security_opt',
    'shm_size',
    'pids_limit',
    'stdin_open',
    'stop_signal',
    'sysctls',
    'tty',
    'user',
    'userns_mode',
    'volume_driver',
    'volumes',
    'volumes_from',
    'working_dir',
]

ALLOWED_KEYS = DOCKER_CONFIG_KEYS + [
    'blkio_config',
    'build',
    'container_name',
    'credential_spec',
    'dockerfile',
    'init',
    'log_driver',
    'log_opt',
    'logging',
    'network_mode',
    'platform',
    'profiles',
    'scale',
    'stop_grace_period',
]

DOCKER_VALID_URL_PREFIXES = (
    'http://',
    'https://',
    'git://',
    'github.com/',
    'git@',
)

SUPPORTED_FILENAMES = [
    'docker-compose.yml',
    'docker-compose.yaml',
    'compose.yml',
    'compose.yaml',
]

DEFAULT_OVERRIDE_FILENAMES = (
    'docker-compose.override.yml',
    'docker-compose.override.yaml',
    'compose.override.yml',
    'compose.override.yaml',
)

log = logging.getLogger(__name__)

class ConfigDetails(namedtuple('_ConfigDetails', 'working_dir config_files environment')):
    """
    :param working_dir: the directory to use for relative paths in the config
    :type working_dir: string
    :param config_files: list of configuration files to load
    :type config_files: list of :class:`ConfigFile`
    :param environment: computed environment values for this project
    :type environment: :class:`environment.Environment`
    """

    def __new__(cls, working_dir, config_files, environment=None):
        if environment is None:
            environment = Environment.from_env_file(working_dir)
        return super().__new__(
            cls, working_dir, config_files, environment
        )

class ConfigFile(namedtuple('_ConfigFile', 'filename config')):
    """
    :param filename: filename of the config file
    :type filename: string
    :param config: contents of the config file
    :type config: :class:`dict`
    """

    @classmethod
    def from_filename(cls, filename):
        return cls(filename, load_yaml(filename))

    @cached_property
    def config_version(self):
        version = self.config.get('version', None)
        if isinstance(version, dict):
            return V1
        return ComposeVersion(version) if version else self.version

    @cached_property
    def version(self):
        version = self.config.get('version', None)
        if not version:
            # no version is specified in the config file
            services = self.config.get('services', None)
            networks = self.config.get('networks', None)
            volumes = self.config.get('volumes', None)
            if services or networks or volumes:
                # validate V2/V3 structure
                for section in ['services', 'networks', 'volumes']:
                    validate_config_section(
                        self.filename, self.config.get(section, {}), section)
                return VERSION

            # validate V1 structure
            validate_config_section(
                self.filename, self.config, 'services')
            return V1

        if isinstance(version, dict):
            log.warning('Unexpected type for "version" key in "{}". Assuming '
                        '"version" is the name of a service, and defaulting to '
                        'Compose file version {}.'.format(self.filename, V1))
            return V1

        if not isinstance(version, str):
            raise ConfigurationError(
                'Version in "{}" is invalid - it should be a string.'
                .format(self.filename))

        version_pattern = re.compile(r"^[1-3]+(\.\d+)?$")
        if not version_pattern.match(version):
            raise ConfigurationError(
                'Version "{}" in "{}" is invalid.'
                .format(version, self.filename))

        if version.startswith("1"):
            raise ConfigurationError(
                'Version in "{}" is invalid. {}'
                .format(self.filename, VERSION_EXPLANATION)
            )

        return VERSION
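
    # Note (illustrative): under the unified compose spec, any valid
    # "2.x"/"3.x" version string resolves to COMPOSE_SPEC here, so a file
    # declaring `version: "3.8"` gets `version == VERSION`, while the declared
    # value itself remains available through `config_version` above.
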
    def get_service(self, name):
        return self.get_service_dicts()[name]

    def get_service_dicts(self):
        if self.version == V1:
            return self.config
        return self.config.get('services', {})

    def get_volumes(self):
        return {} if self.version == V1 else self.config.get('volumes', {})

    def get_networks(self):
        return {} if self.version == V1 else self.config.get('networks', {})

    def get_secrets(self):
        return {} if self.version == V1 else self.config.get('secrets', {})

    def get_configs(self):
        return {} if self.version == V1 else self.config.get('configs', {})

class Config(namedtuple('_Config', 'config_version version services volumes networks secrets configs')):
    """
    :param config_version: configuration file version
    :type config_version: int
    :param version: configuration version
    :type version: int
    :param services: List of service description dictionaries
    :type services: :class:`list`
    :param volumes: Dictionary mapping volume names to description dictionaries
    :type volumes: :class:`dict`
    :param networks: Dictionary mapping network names to description dictionaries
    :type networks: :class:`dict`
    :param secrets: Dictionary mapping secret names to description dictionaries
    :type secrets: :class:`dict`
    :param configs: Dictionary mapping config names to description dictionaries
    :type configs: :class:`dict`
    """

class ServiceConfig(namedtuple('_ServiceConfig', 'working_dir filename name config')):

    @classmethod
    def with_abs_paths(cls, working_dir, filename, name, config):
        if not working_dir:
            raise ValueError("No working_dir for ServiceConfig.")

        return cls(
            os.path.abspath(working_dir),
            os.path.abspath(filename) if filename else filename,
            name,
            config)

def find(base_dir, filenames, environment, override_dir=None):
    if filenames == ['-']:
        return ConfigDetails(
            os.path.abspath(override_dir) if override_dir else os.getcwd(),
            [ConfigFile(None, yaml.safe_load(sys.stdin))],
            environment
        )

    if filenames:
        filenames = [os.path.join(base_dir, f) for f in filenames]
    else:
        # search for compose files in the base dir and its parents
        filenames = get_default_config_files(base_dir)
        if not filenames and not override_dir:
            # none found in base_dir and no override_dir defined
            raise ComposeFileNotFound(SUPPORTED_FILENAMES)
        if not filenames:
            # search for compose files in the project directory and its parents
            filenames = get_default_config_files(override_dir)
            if not filenames:
                raise ComposeFileNotFound(SUPPORTED_FILENAMES)

    log.debug("Using configuration files: {}".format(",".join(filenames)))
    return ConfigDetails(
        override_dir if override_dir else os.path.dirname(filenames[0]),
        [ConfigFile.from_filename(f) for f in filenames],
        environment
    )

def validate_config_version(config_files):
    main_file = config_files[0]
    validate_top_level_object(main_file)

    for next_file in config_files[1:]:
        validate_top_level_object(next_file)

        if main_file.version != next_file.version:
            raise ConfigurationError(
                "Version mismatch: file {} specifies version {} but "
                "extension file {} uses version {}".format(
                    main_file.filename,
                    main_file.version,
                    next_file.filename,
                    next_file.version))

def get_default_config_files(base_dir):
    (candidates, path) = find_candidates_in_parent_dirs(SUPPORTED_FILENAMES, base_dir)

    if not candidates:
        return None

    winner = candidates[0]

    if len(candidates) > 1:
        log.warning("Found multiple config files with supported names: %s",
                    ", ".join(candidates))
        log.warning("Using %s\n", winner)

    return [os.path.join(path, winner)] + get_default_override_file(path)

def get_default_override_file(path):
    override_files_in_path = [
        os.path.join(path, override_filename)
        for override_filename in DEFAULT_OVERRIDE_FILENAMES
        if os.path.exists(os.path.join(path, override_filename))
    ]
    if len(override_files_in_path) > 1:
        raise DuplicateOverrideFileFound(override_files_in_path)
    return override_files_in_path

def find_candidates_in_parent_dirs(filenames, path):
    """
    Given a directory path to start, looks for filenames in the directory,
    and then each parent directory successively, until found.

    Returns tuple (candidates, path).
    """
    candidates = [filename for filename in filenames
                  if os.path.exists(os.path.join(path, filename))]

    if not candidates:
        parent_dir = os.path.join(path, '..')
        if os.path.abspath(parent_dir) != os.path.abspath(path):
            return find_candidates_in_parent_dirs(filenames, parent_dir)

    return (candidates, path)

def check_swarm_only_config(service_dicts):
    warning_template = (
        "Some services ({services}) use the '{key}' key, which will be ignored. "
        "Compose does not support '{key}' configuration - use "
        "`docker stack deploy` to deploy to a swarm."
    )
    key = 'configs'
    services = [s for s in service_dicts if s.get(key)]
    if services:
        log.warning(
            warning_template.format(
                services=", ".join(sorted(s['name'] for s in services)),
                key=key
            )
        )

def load(config_details, interpolate=True):
    """Load the configuration from a working directory and a list of
    configuration files.  Files are loaded in order, and merged on top
    of each other to create the final configuration.

    Return a fully interpolated, extended and validated configuration.
    """

    # validate against latest version and if fails do it against v1 schema
    validate_config_version(config_details.config_files)

    processed_files = [
        process_config_file(config_file, config_details.environment, interpolate=interpolate)
        for config_file in config_details.config_files
    ]
    config_details = config_details._replace(config_files=processed_files)

    main_file = config_details.config_files[0]
    volumes = load_mapping(
        config_details.config_files, 'get_volumes', 'Volume'
    )
    networks = load_mapping(
        config_details.config_files, 'get_networks', 'Network'
    )
    secrets = load_mapping(
        config_details.config_files, 'get_secrets', 'Secret', config_details.working_dir
    )
    configs = load_mapping(
        config_details.config_files, 'get_configs', 'Config', config_details.working_dir
    )
    service_dicts = load_services(config_details, main_file, interpolate=interpolate)

    if main_file.version != V1:
        for service_dict in service_dicts:
            match_named_volumes(service_dict, volumes)

    check_swarm_only_config(service_dicts)

    return Config(main_file.config_version, main_file.version,
                  service_dicts, volumes, networks, secrets, configs)

def load_mapping(config_files, get_func, entity_type, working_dir=None):
    mapping = {}

    for config_file in config_files:
        for name, config in getattr(config_file, get_func)().items():
            mapping[name] = config or {}
            if not config:
                continue

            external = config.get('external')
            if external:
                validate_external(entity_type, name, config, config_file.version)
                if isinstance(external, dict):
                    config['name'] = external.get('name')
                elif not config.get('name'):
                    config['name'] = name

            if 'labels' in config:
                config['labels'] = parse_labels(config['labels'])

            if 'file' in config:
                config['file'] = expand_path(working_dir, config['file'])

            if 'driver_opts' in config:
                config['driver_opts'] = build_string_dict(
                    config['driver_opts']
                )
                device = format_device_option(entity_type, config)
                if device:
                    config['driver_opts']['device'] = device

    return mapping

def format_device_option(entity_type, config):
    if entity_type != 'Volume':
        return
    # default driver is 'local'
    driver = config.get('driver', 'local')
    if driver != 'local':
        return
    o = config['driver_opts'].get('o')
    device = config['driver_opts'].get('device')
    if o and o == 'bind' and device:
        fullpath = os.path.abspath(os.path.expanduser(device))
        return fullpath

def validate_external(entity_type, name, config, version):
    for k in config.keys():
        if entity_type == 'Network' and k == 'driver':
            continue
        if k not in ['external', 'name']:
            raise ConfigurationError(
                "{} {} declared as external but specifies additional attributes "
                "({}).".format(
                    entity_type, name, ', '.join(k for k in config if k != 'external')))

def load_services(config_details, config_file, interpolate=True):
    def build_service(service_name, service_dict, service_names):
        service_config = ServiceConfig.with_abs_paths(
            config_details.working_dir,
            config_file.filename,
            service_name,
            service_dict)
        resolver = ServiceExtendsResolver(
            service_config, config_file, environment=config_details.environment
        )
        service_dict = process_service(resolver.run())

        service_config = service_config._replace(config=service_dict)
        validate_service(service_config, service_names, config_file)
        service_dict = finalize_service(
            service_config,
            service_names,
            config_file.version,
            config_details.environment,
            interpolate
        )
        return service_dict

    def build_services(service_config):
        service_names = service_config.keys()
        return sort_service_dicts([
            build_service(name, service_dict, service_names)
            for name, service_dict in service_config.items()
        ])

    def merge_services(base, override):
        all_service_names = set(base) | set(override)
        return {
            name: merge_service_dicts_from_files(
                base.get(name, {}),
                override.get(name, {}),
                config_file.version)
            for name in all_service_names
        }

    service_configs = [
        file.get_service_dicts() for file in config_details.config_files
    ]

    service_config = functools.reduce(merge_services, service_configs)

    return build_services(service_config)

def interpolate_config_section(config_file, config, section, environment):
    return interpolate_environment_variables(
        config_file.version,
        config,
        section,
        environment
    )

def process_config_section(config_file, config, section, environment, interpolate):
    validate_config_section(config_file.filename, config, section)
    if interpolate:
        return interpolate_environment_variables(
            config_file.version,
            config,
            section,
            environment)
    else:
        return config

def process_config_file(config_file, environment, service_name=None, interpolate=True):
    services = process_config_section(
        config_file,
        config_file.get_service_dicts(),
        'service',
        environment,
        interpolate,
    )

    if config_file.version > V1:
        processed_config = dict(config_file.config)
        processed_config['services'] = services
        processed_config['volumes'] = process_config_section(
            config_file,
            config_file.get_volumes(),
            'volume',
            environment,
            interpolate,
        )
        processed_config['networks'] = process_config_section(
            config_file,
            config_file.get_networks(),
            'network',
            environment,
            interpolate,
        )
        processed_config['secrets'] = process_config_section(
            config_file,
            config_file.get_secrets(),
            'secret',
            environment,
            interpolate,
        )
        processed_config['configs'] = process_config_section(
            config_file,
            config_file.get_configs(),
            'config',
            environment,
            interpolate,
        )
    else:
        processed_config = services

    config_file = config_file._replace(config=processed_config)
    validate_against_config_schema(config_file, config_file.version)

    if service_name and service_name not in services:
        raise ConfigurationError(
            "Cannot extend service '{}' in {}: Service not found".format(
                service_name, config_file.filename))

    return config_file

class ServiceExtendsResolver:
    def __init__(self, service_config, config_file, environment, already_seen=None):
        self.service_config = service_config
        self.working_dir = service_config.working_dir
        self.already_seen = already_seen or []
        self.config_file = config_file
        self.environment = environment

    @property
    def signature(self):
        return self.service_config.filename, self.service_config.name

    def detect_cycle(self):
        if self.signature in self.already_seen:
            raise CircularReference(self.already_seen + [self.signature])

    def run(self):
        self.detect_cycle()

        if 'extends' in self.service_config.config:
            service_dict = self.resolve_extends(*self.validate_and_construct_extends())
            return self.service_config._replace(config=service_dict)

        return self.service_config

    def validate_and_construct_extends(self):
        extends = self.service_config.config['extends']
        if not isinstance(extends, dict):
            extends = {'service': extends}

        config_path = self.get_extended_config_path(extends)
        service_name = extends['service']

        if config_path == os.path.abspath(self.config_file.filename):
            try:
                service_config = self.config_file.get_service(service_name)
            except KeyError:
                raise ConfigurationError(
                    "Cannot extend service '{}' in {}: Service not found".format(
                        service_name, config_path)
                )
        else:
            extends_file = ConfigFile.from_filename(config_path)
            validate_config_version([self.config_file, extends_file])
            extended_file = process_config_file(
                extends_file, self.environment, service_name=service_name
            )
            service_config = extended_file.get_service(service_name)

        return config_path, service_config, service_name

    def resolve_extends(self, extended_config_path, service_dict, service_name):
        resolver = ServiceExtendsResolver(
            ServiceConfig.with_abs_paths(
                os.path.dirname(extended_config_path),
                extended_config_path,
                service_name,
                service_dict),
            self.config_file,
            already_seen=self.already_seen + [self.signature],
            environment=self.environment
        )

        service_config = resolver.run()
        other_service_dict = process_service(service_config)
        validate_extended_service_dict(
            other_service_dict,
            extended_config_path,
            service_name)

        return merge_service_dicts(
            other_service_dict,
            self.service_config.config,
            self.config_file.version)

    def get_extended_config_path(self, extends_options):
        """The service we are extending either has a value for 'file' set,
        which we need to obtain a full path to, or we are extending from
        a service defined in our own file.
        """
        filename = self.service_config.filename
        validate_extends_file_path(
            self.service_config.name,
            extends_options,
            filename)
        if 'file' in extends_options:
            return expand_path(self.working_dir, extends_options['file'])
        return filename

def resolve_environment(service_dict, environment=None, interpolate=True):
    """Unpack any environment variables from an env_file, if set.
    Interpolate environment values if set.
    """
    env = {}
    for env_file in service_dict.get('env_file', []):
        env.update(env_vars_from_file(env_file, interpolate))

    env.update(parse_environment(service_dict.get('environment')))
    return dict(resolve_env_var(k, v, environment) for k, v in env.items())
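
# Example (illustrative sketch, using a plain dict in place of an Environment
# object): values from 'environment' override env_file entries, and a key
# mapped to None falls back to the provided environment:
#   resolve_environment({'environment': {'A': None, 'B': 'b'}}, {'A': 'x'})
#   -> {'A': 'x', 'B': 'b'}
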
def resolve_build_args(buildargs, environment):
    args = parse_build_arguments(buildargs)
    return dict(resolve_env_var(k, v, environment) for k, v in args.items())

def validate_extended_service_dict(service_dict, filename, service):
    error_prefix = "Cannot extend service '{}' in {}:".format(service, filename)

    if 'links' in service_dict:
        raise ConfigurationError(
            "%s services with 'links' cannot be extended" % error_prefix)

    if 'volumes_from' in service_dict:
        raise ConfigurationError(
            "%s services with 'volumes_from' cannot be extended" % error_prefix)

    if 'net' in service_dict:
        if get_container_name_from_network_mode(service_dict['net']):
            raise ConfigurationError(
                "%s services with 'net: container' cannot be extended" % error_prefix)

    if 'network_mode' in service_dict:
        if get_service_name_from_network_mode(service_dict['network_mode']):
            raise ConfigurationError(
                "%s services with 'network_mode: service' cannot be extended" % error_prefix)

    if 'depends_on' in service_dict:
        raise ConfigurationError(
            "%s services with 'depends_on' cannot be extended" % error_prefix)

def validate_service(service_config, service_names, config_file):
    def build_image():
        args = sys.argv[1:]
        if 'pull' in args:
            return False

        if '--no-build' in args:
            return False

        return True

    service_dict, service_name = service_config.config, service_config.name
    validate_service_constraints(service_dict, service_name, config_file)

    if build_image():
        # We only care about valid paths when actually building images
        validate_paths(service_dict)

    validate_cpu(service_config)
    validate_ulimits(service_config)
    validate_ipc_mode(service_config, service_names)
    validate_network_mode(service_config, service_names)
    validate_pid_mode(service_config, service_names)
    validate_depends_on(service_config, service_names)
    validate_links(service_config, service_names)
    validate_healthcheck(service_config)
    validate_credential_spec(service_config)

    if not service_dict.get('image') and has_uppercase(service_name):
        raise ConfigurationError(
            "Service '{name}' contains uppercase characters which are not valid "
            "as part of an image name. Either use a lowercase service name or "
            "use the `image` field to set a custom name for the service image."
            .format(name=service_name))

def process_service(service_config):
    working_dir = service_config.working_dir
    service_dict = dict(service_config.config)

    if 'env_file' in service_dict:
        service_dict['env_file'] = [
            expand_path(working_dir, path)
            for path in to_list(service_dict['env_file'])
        ]

    if 'build' in service_dict:
        process_build_section(service_dict, working_dir)

    if 'volumes' in service_dict and service_dict.get('volume_driver') is None:
        service_dict['volumes'] = resolve_volume_paths(working_dir, service_dict)

    if 'sysctls' in service_dict:
        service_dict['sysctls'] = build_string_dict(parse_sysctls(service_dict['sysctls']))

    if 'labels' in service_dict:
        service_dict['labels'] = parse_labels(service_dict['labels'])

    service_dict = process_depends_on(service_dict)

    for field in ['dns', 'dns_search', 'tmpfs']:
        if field in service_dict:
            service_dict[field] = to_list(service_dict[field])

    service_dict = process_security_opt(process_blkio_config(process_ports(
        process_healthcheck(service_dict)
    )))

    return service_dict

def process_build_section(service_dict, working_dir):
    if isinstance(service_dict['build'], str):
        service_dict['build'] = resolve_build_path(working_dir, service_dict['build'])
    elif isinstance(service_dict['build'], dict):
        if 'context' in service_dict['build']:
            path = service_dict['build']['context']
            service_dict['build']['context'] = resolve_build_path(working_dir, path)
        if 'labels' in service_dict['build']:
            service_dict['build']['labels'] = parse_labels(service_dict['build']['labels'])

def process_ports(service_dict):
    if 'ports' not in service_dict:
        return service_dict

    ports = []
    for port_definition in service_dict['ports']:
        if isinstance(port_definition, ServicePort):
            ports.append(port_definition)
        else:
            ports.extend(ServicePort.parse(port_definition))
    service_dict['ports'] = ports
    return service_dict

def process_depends_on(service_dict):
    if 'depends_on' in service_dict and not isinstance(service_dict['depends_on'], dict):
        service_dict['depends_on'] = {
            svc: {'condition': 'service_started'}
            for svc in service_dict['depends_on']
        }
    return service_dict
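
# Example (illustrative): the shorthand list form of depends_on is expanded
# into the mapping form:
#   process_depends_on({'depends_on': ['db', 'redis']})['depends_on']
#   -> {'db': {'condition': 'service_started'},
#       'redis': {'condition': 'service_started'}}
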
def process_blkio_config(service_dict):
    if not service_dict.get('blkio_config'):
        return service_dict

    for field in ['device_read_bps', 'device_write_bps']:
        if field in service_dict['blkio_config']:
            for v in service_dict['blkio_config'].get(field, []):
                rate = v.get('rate', 0)
                v['rate'] = parse_bytes(rate)
                if v['rate'] is None:
                    raise ConfigurationError('Invalid format for bytes value: "{}"'.format(rate))

    for field in ['device_read_iops', 'device_write_iops']:
        if field in service_dict['blkio_config']:
            for v in service_dict['blkio_config'].get(field, []):
                try:
                    v['rate'] = int(v.get('rate', 0))
                except ValueError:
                    raise ConfigurationError(
                        'Invalid IOPS value: "{}". Must be a positive integer.'.format(v.get('rate'))
                    )

    return service_dict

def process_healthcheck(service_dict):
    if 'healthcheck' not in service_dict:
        return service_dict

    hc = service_dict['healthcheck']

    if 'disable' in hc:
        del hc['disable']
        hc['test'] = ['NONE']

    for field in ['interval', 'timeout', 'start_period']:
        if field not in hc or isinstance(hc[field], int):
            continue
        hc[field] = parse_nanoseconds_int(hc[field])

    return service_dict
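
# Example (illustrative sketch, assuming parse_nanoseconds_int converts
# '1m30s' to 90 * 10**9): duration strings are normalized to integer
# nanoseconds, and 'disable: true' is rewritten as the NONE test:
#   {'healthcheck': {'disable': True, 'interval': '1m30s'}}
#   -> {'healthcheck': {'test': ['NONE'], 'interval': 90000000000}}
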
def finalize_service_volumes(service_dict, environment):
    if 'volumes' in service_dict:
        finalized_volumes = []
        normalize = environment.get_boolean('COMPOSE_CONVERT_WINDOWS_PATHS')
        win_host = environment.get_boolean('COMPOSE_FORCE_WINDOWS_HOST')
        for v in service_dict['volumes']:
            if isinstance(v, dict):
                finalized_volumes.append(MountSpec.parse(v, normalize, win_host))
            else:
                finalized_volumes.append(VolumeSpec.parse(v, normalize, win_host))

        duplicate_mounts = []
        mounts = [v.as_volume_spec() if isinstance(v, MountSpec) else v
                  for v in finalized_volumes]
        for mount in mounts:
            if list(map(attrgetter('internal'), mounts)).count(mount.internal) > 1:
                duplicate_mounts.append(mount.repr())

        if duplicate_mounts:
            raise ConfigurationError("Duplicate mount points: [%s]" % (
                ', '.join(duplicate_mounts)))

        service_dict['volumes'] = finalized_volumes

    return service_dict

def finalize_service(service_config, service_names, version, environment, interpolate=True):
    service_dict = dict(service_config.config)

    if 'environment' in service_dict or 'env_file' in service_dict:
        service_dict['environment'] = resolve_environment(service_dict, environment, interpolate)
        service_dict.pop('env_file', None)

    if 'volumes_from' in service_dict:
        service_dict['volumes_from'] = [
            VolumeFromSpec.parse(vf, service_names, version)
            for vf in service_dict['volumes_from']
        ]

    service_dict = finalize_service_volumes(service_dict, environment)

    if 'net' in service_dict:
        network_mode = service_dict.pop('net')
        container_name = get_container_name_from_network_mode(network_mode)
        if container_name and container_name in service_names:
            service_dict['network_mode'] = 'service:{}'.format(container_name)
        else:
            service_dict['network_mode'] = network_mode

    if 'networks' in service_dict:
        service_dict['networks'] = parse_networks(service_dict['networks'])

    if 'restart' in service_dict:
        service_dict['restart'] = parse_restart_spec(service_dict['restart'])

    if 'secrets' in service_dict:
        service_dict['secrets'] = [
            types.ServiceSecret.parse(s) for s in service_dict['secrets']
        ]

    if 'configs' in service_dict:
        service_dict['configs'] = [
            types.ServiceConfig.parse(c) for c in service_dict['configs']
        ]

    normalize_build(service_dict, service_config.working_dir, environment)

    service_dict['name'] = service_config.name
    return normalize_v1_service_format(service_dict)

def normalize_v1_service_format(service_dict):
    if 'log_driver' in service_dict or 'log_opt' in service_dict:
        if 'logging' not in service_dict:
            service_dict['logging'] = {}
        if 'log_driver' in service_dict:
            service_dict['logging']['driver'] = service_dict['log_driver']
            del service_dict['log_driver']
        if 'log_opt' in service_dict:
            service_dict['logging']['options'] = service_dict['log_opt']
            del service_dict['log_opt']

    if 'dockerfile' in service_dict:
        service_dict['build'] = service_dict.get('build', {})
        service_dict['build'].update({
            'dockerfile': service_dict.pop('dockerfile')
        })

    return service_dict
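
# Example (illustrative): legacy V1 keys are folded into their V2+ forms:
#   normalize_v1_service_format(
#       {'log_driver': 'syslog', 'dockerfile': 'Dockerfile.dev'})
#   -> {'logging': {'driver': 'syslog'},
#       'build': {'dockerfile': 'Dockerfile.dev'}}
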
def merge_service_dicts_from_files(base, override, version):
    """When merging services from multiple files we need to merge the
    `extends` field. This is not handled by `merge_service_dicts()` which
    is used to perform the `extends`.
    """
    new_service = merge_service_dicts(base, override, version)
    if 'extends' in override:
        new_service['extends'] = override['extends']
    elif 'extends' in base:
        new_service['extends'] = base['extends']
    return new_service

class MergeDict(dict):
    """A dict-like object responsible for merging two dicts into one."""

    def __init__(self, base, override):
        self.base = base
        self.override = override

    def needs_merge(self, field):
        return field in self.base or field in self.override

    def merge_field(self, field, merge_func, default=None):
        if not self.needs_merge(field):
            return

        self[field] = merge_func(
            self.base.get(field, default),
            self.override.get(field, default))

    def merge_mapping(self, field, parse_func=None):
        if not self.needs_merge(field):
            return

        if parse_func is None:
            def parse_func(m):
                return m or {}

        self[field] = parse_func(self.base.get(field))
        self[field].update(parse_func(self.override.get(field)))

    def merge_sequence(self, field, parse_func):
        def parse_sequence_func(seq):
            return to_mapping((parse_func(item) for item in seq), 'merge_field')

        if not self.needs_merge(field):
            return

        merged = parse_sequence_func(self.base.get(field, []))
        merged.update(parse_sequence_func(self.override.get(field, [])))
        self[field] = [item.repr() for item in sorted(merged.values())]

    def merge_scalar(self, field):
        if self.needs_merge(field):
            self[field] = self.override.get(field, self.base.get(field))
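
# Example (illustrative): MergeDict accumulates merged fields without mutating
# either input; the override side wins for scalars:
#   md = MergeDict({'image': 'a:1', 'user': 'web'}, {'image': 'a:2'})
#   md.merge_scalar('image')  # md['image'] == 'a:2'
#   md.merge_scalar('user')   # md['user'] == 'web' (only base defines it)
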
def merge_service_dicts(base, override, version):
    md = MergeDict(base, override)

    md.merge_mapping('environment', parse_environment)
    md.merge_mapping('labels', parse_labels)
    md.merge_mapping('ulimits', parse_flat_dict)
    md.merge_mapping('sysctls', parse_sysctls)
    md.merge_mapping('depends_on', parse_depends_on)
    md.merge_mapping('storage_opt', parse_flat_dict)
    md.merge_sequence('links', ServiceLink.parse)
    md.merge_sequence('secrets', types.ServiceSecret.parse)
    md.merge_sequence('configs', types.ServiceConfig.parse)
    md.merge_sequence('security_opt', types.SecurityOpt.parse)
    md.merge_mapping('extra_hosts', parse_extra_hosts)

    md.merge_field('networks', merge_networks, default={})
    for field in ['volumes', 'devices']:
        md.merge_field(field, merge_path_mappings)

    for field in [
        'cap_add', 'cap_drop', 'expose', 'external_links',
        'volumes_from', 'device_cgroup_rules', 'profiles',
    ]:
        md.merge_field(field, merge_unique_items_lists, default=[])

    for field in ['dns', 'dns_search', 'env_file', 'tmpfs']:
        md.merge_field(field, merge_list_or_string)

    md.merge_field('logging', merge_logging, default={})
    merge_ports(md, base, override)
    md.merge_field('blkio_config', merge_blkio_config, default={})
    md.merge_field('healthcheck', merge_healthchecks, default={})
    md.merge_field('deploy', merge_deploy, default={})

    for field in set(ALLOWED_KEYS) - set(md):
        md.merge_scalar(field)

    if version == V1:
        legacy_v1_merge_image_or_build(md, base, override)
    elif md.needs_merge('build'):
        md['build'] = merge_build(md, base, override)

    return dict(md)

def merge_unique_items_lists(base, override):
    override = (str(o) for o in override)
    base = (str(b) for b in base)
    return sorted(set(chain(base, override)))
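
# Example (illustrative): items are stringified, deduplicated and sorted:
#   merge_unique_items_lists(['b', 'a'], ['a', 'c'])  -> ['a', 'b', 'c']
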
def merge_healthchecks(base, override):
    if override.get('disabled') is True:
        return override
    result = base.copy()
    result.update(override)
    return result

def merge_ports(md, base, override):
    def parse_sequence_func(seq):
        acc = [s for item in seq for s in ServicePort.parse(item)]
        return to_mapping(acc, 'merge_field')

    field = 'ports'

    if not md.needs_merge(field):
        return

    merged = parse_sequence_func(md.base.get(field, []))
    merged.update(parse_sequence_func(md.override.get(field, [])))
    md[field] = [item for item in sorted(merged.values(), key=attrgetter("target"))]

def merge_build(output, base, override):
    def to_dict(service):
        build_config = service.get('build', {})
        if isinstance(build_config, str):
            return {'context': build_config}
        return build_config

    md = MergeDict(to_dict(base), to_dict(override))
    md.merge_scalar('context')
    md.merge_scalar('dockerfile')
    md.merge_scalar('network')
    md.merge_scalar('target')
    md.merge_scalar('shm_size')
    md.merge_scalar('isolation')
    md.merge_mapping('args', parse_build_arguments)
    md.merge_field('cache_from', merge_unique_items_lists, default=[])
    md.merge_mapping('labels', parse_labels)
    md.merge_mapping('extra_hosts', parse_extra_hosts)
    return dict(md)

def merge_deploy(base, override):
    md = MergeDict(base or {}, override or {})
    md.merge_scalar('mode')
    md.merge_scalar('endpoint_mode')
    md.merge_scalar('replicas')
    md.merge_mapping('labels', parse_labels)
    md.merge_mapping('update_config')
    md.merge_mapping('rollback_config')
    md.merge_mapping('restart_policy')
    if md.needs_merge('resources'):
        resources_md = MergeDict(md.base.get('resources') or {},
                                 md.override.get('resources') or {})
        resources_md.merge_mapping('limits')
        resources_md.merge_field('reservations', merge_reservations, default={})
        md['resources'] = dict(resources_md)
    if md.needs_merge('placement'):
        placement_md = MergeDict(md.base.get('placement') or {},
                                 md.override.get('placement') or {})
        placement_md.merge_scalar('max_replicas_per_node')
        placement_md.merge_field('constraints', merge_unique_items_lists, default=[])
        placement_md.merge_field('preferences', merge_unique_objects_lists, default=[])
        md['placement'] = dict(placement_md)

    return dict(md)

def merge_networks(base, override):
    merged_networks = {}
    all_network_names = set(base) | set(override)
    base = {k: {} for k in base} if isinstance(base, list) else base
    override = {k: {} for k in override} if isinstance(override, list) else override
    for network_name in all_network_names:
        md = MergeDict(base.get(network_name) or {},
                       override.get(network_name) or {})
        md.merge_field('aliases', merge_unique_items_lists, [])
        md.merge_field('link_local_ips', merge_unique_items_lists, [])
        md.merge_scalar('priority')
        md.merge_scalar('ipv4_address')
        md.merge_scalar('ipv6_address')
        merged_networks[network_name] = dict(md)
    return merged_networks
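
# Example (illustrative): the list shorthand is normalized to a mapping before
# merging:
#   merge_networks(['front'], {'front': {'aliases': ['web']}})
#   -> {'front': {'aliases': ['web']}}
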
def merge_reservations(base, override):
    md = MergeDict(base, override)
    md.merge_scalar('cpus')
    md.merge_scalar('memory')
    md.merge_sequence('generic_resources', types.GenericResource.parse)
    md.merge_field('devices', merge_unique_objects_lists, default=[])
    return dict(md)

def merge_unique_objects_lists(base, override):
    # deduplicate by content hash, then sort deterministically by that hash
    result = {json_hash(i): i for i in base + override}
    return [result[k] for k in sorted(result)]

def merge_blkio_config(base, override):
    md = MergeDict(base, override)
    md.merge_scalar('weight')

    def merge_blkio_limits(base, override):
        get_path = itemgetter('path')
        index = {get_path(b): b for b in base}
        index.update((get_path(o), o) for o in override)

        return sorted(index.values(), key=get_path)

    for field in [
        "device_read_bps", "device_read_iops",
        "device_write_bps", "device_write_iops",
        "weight_device",
    ]:
        md.merge_field(field, merge_blkio_limits, default=[])

    return dict(md)

def merge_logging(base, override):
    md = MergeDict(base, override)
    md.merge_scalar('driver')
    if md.get('driver') == base.get('driver') or base.get('driver') is None:
        md.merge_mapping('options', lambda m: m or {})
    elif override.get('options'):
        md['options'] = override.get('options', {})
    return dict(md)

def legacy_v1_merge_image_or_build(output, base, override):
    output.pop('image', None)
    output.pop('build', None)
    if 'image' in override:
        output['image'] = override['image']
    elif 'build' in override:
        output['build'] = override['build']
    elif 'image' in base:
        output['image'] = base['image']
    elif 'build' in base:
        output['build'] = base['build']

def merge_environment(base, override):
    env = parse_environment(base)
    env.update(parse_environment(override))
    return env

def merge_labels(base, override):
    labels = parse_labels(base)
    labels.update(parse_labels(override))
    return labels

def split_kv(kvpair):
    if '=' in kvpair:
        return kvpair.split('=', 1)
    else:
        return kvpair, ''
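
# Example (illustrative): a bare key is given an empty string value:
#   split_kv('com.example.foo=bar')  -> ['com.example.foo', 'bar']
#   split_kv('com.example.flag')     -> ('com.example.flag', '')
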
def parse_dict_or_list(split_func, type_name, arguments):
    if not arguments:
        return {}

    if isinstance(arguments, list):
        return dict(split_func(e) for e in arguments)

    if isinstance(arguments, dict):
        return dict(arguments)

    raise ConfigurationError(
        "%s \"%s\" must be a list or mapping." % (type_name, arguments)
    )


parse_build_arguments = functools.partial(parse_dict_or_list, split_env, 'build arguments')
parse_environment = functools.partial(parse_dict_or_list, split_env, 'environment')
parse_labels = functools.partial(parse_dict_or_list, split_kv, 'labels')
parse_networks = functools.partial(parse_dict_or_list, lambda k: (k, None), 'networks')
parse_sysctls = functools.partial(parse_dict_or_list, split_kv, 'sysctls')
parse_depends_on = functools.partial(
    parse_dict_or_list, lambda k: (k, {'condition': 'service_started'}), 'depends_on'
)
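
# Example (illustrative): each parser accepts either the list or the mapping
# form of its section:
#   parse_labels(['a=1', 'b'])  -> {'a': '1', 'b': ''}
#   parse_labels({'a': '1'})    -> {'a': '1'}
#   parse_depends_on(['db'])    -> {'db': {'condition': 'service_started'}}
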
def parse_flat_dict(d):
    if not d:
        return {}

    if isinstance(d, dict):
        return dict(d)

    raise ConfigurationError("Invalid type: expected mapping")

def resolve_env_var(key, val, environment):
    if val is not None:
        return key, val
    elif environment and key in environment:
        return key, environment[key]
    else:
        return key, None
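
# Example (illustrative, using a plain dict for the environment argument):
#   resolve_env_var('A', 'set', {'A': 'host'})  -> ('A', 'set')   # explicit wins
#   resolve_env_var('A', None, {'A': 'host'})   -> ('A', 'host')  # fall back to env
#   resolve_env_var('B', None, {})              -> ('B', None)
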
def resolve_volume_paths(working_dir, service_dict):
    return [
        resolve_volume_path(working_dir, volume)
        for volume in service_dict['volumes']
    ]

def resolve_volume_path(working_dir, volume):
    if isinstance(volume, dict):
        if volume.get('source', '').startswith(('.', '~')) and volume['type'] == 'bind':
            volume['source'] = expand_path(working_dir, volume['source'])
        return volume

    container_path, mount_params = split_path_mapping(volume)

    if mount_params is not None:
        host_path, mode = mount_params
        if host_path is None:
            return container_path
        if host_path.startswith('.'):
            host_path = expand_path(working_dir, host_path)
        host_path = os.path.expanduser(host_path)
        return "{}:{}{}".format(host_path, container_path, (':' + mode if mode else ''))

    return container_path

def normalize_build(service_dict, working_dir, environment):

    if 'build' in service_dict:
        build = {}
        # Shortcut where specifying a string is treated as the build context
        if isinstance(service_dict['build'], str):
            build['context'] = service_dict.pop('build')
        else:
            build.update(service_dict['build'])
            if 'args' in build:
                build['args'] = build_string_dict(
                    resolve_build_args(build.get('args'), environment)
                )

        service_dict['build'] = build

def resolve_build_path(working_dir, build_path):
    if is_url(build_path):
        return build_path
    return expand_path(working_dir, build_path)

def is_url(build_path):
    return build_path.startswith(DOCKER_VALID_URL_PREFIXES)

def validate_paths(service_dict):
    if 'build' in service_dict:
        build = service_dict.get('build', {})

        if isinstance(build, str):
            build_path = build
        elif isinstance(build, dict) and 'context' in build:
            build_path = build['context']
        else:
            # We have a build section but no context, so nothing to validate
            return

        if (
            not is_url(build_path) and
            (not os.path.exists(build_path) or not os.access(build_path, os.R_OK))
        ):
            raise ConfigurationError(
                "build path %s either does not exist, is not accessible, "
                "or is not a valid URL." % build_path)

def merge_path_mappings(base, override):
    d = dict_from_path_mappings(base)
    d.update(dict_from_path_mappings(override))
    return path_mappings_from_dict(d)

def dict_from_path_mappings(path_mappings):
    if path_mappings:
        return dict(split_path_mapping(v) for v in path_mappings)
    else:
        return {}

def path_mappings_from_dict(d):
    return [join_path_mapping(v) for v in sorted(d.items())]

def split_path_mapping(volume_path):
    """
    Ascertain if the volume_path contains a host path as well as a container
    path. Using splitdrive so windows absolute paths won't cause issues with
    splitting on ':'.
    """
    if isinstance(volume_path, dict):
        return (volume_path.get('target'), volume_path)
    drive, volume_config = splitdrive(volume_path)

    if ':' in volume_config:
        (host, container) = volume_config.split(':', 1)
        container_drive, container_path = splitdrive(container)
        mode = None
        if ':' in container_path:
            container_path, mode = container_path.rsplit(':', 1)

        return (container_drive + container_path, (drive + host, mode))
    else:
        return (volume_path, None)
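
# Example (illustrative sketch, assuming splitdrive returns an empty drive for
# POSIX paths): the returned mapping is keyed on the container path:
#   split_path_mapping('/data:/var/lib/data:ro')
#   -> ('/var/lib/data', ('/data', 'ro'))
#   split_path_mapping('/var/lib/data')
#   -> ('/var/lib/data', None)
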
def process_security_opt(service_dict):
    security_opts = service_dict.get('security_opt', [])
    result = []
    for value in security_opts:
        result.append(SecurityOpt.parse(value))
    if result:
        service_dict['security_opt'] = result
    return service_dict

def join_path_mapping(pair):
    (container, host) = pair
    if isinstance(host, dict):
        return host
    elif host is None:
        return container
    else:
        host, mode = host
        result = ":".join((host, container))
        if mode:
            result += ":" + mode
        return result
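
# Example (illustrative): join_path_mapping inverts split_path_mapping for
# string volumes:
#   join_path_mapping(('/var/lib/data', ('/data', 'ro')))
#   -> '/data:/var/lib/data:ro'
#   join_path_mapping(('/var/lib/data', None))
#   -> '/var/lib/data'
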
def expand_path(working_dir, path):
    return os.path.abspath(os.path.join(working_dir, os.path.expanduser(path)))

def merge_list_or_string(base, override):
    return to_list(base) + to_list(override)

def to_list(value):
    if value is None:
        return []
    elif isinstance(value, str):
        return [value]
    else:
        return value
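
# Example (illustrative):
#   to_list(None)        -> []
#   to_list('8080')      -> ['8080']
#   to_list(['a', 'b'])  -> ['a', 'b']
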
def to_mapping(sequence, key_field):
    return {getattr(item, key_field): item for item in sequence}

def has_uppercase(name):
    return any(char in string.ascii_uppercase for char in name)

def load_yaml(filename, encoding=None, binary=True):
    try:
        with open(filename, 'rb' if binary else 'r', encoding=encoding) as fh:
            return yaml.safe_load(fh)
    except (OSError, yaml.YAMLError, UnicodeDecodeError) as e:
        if encoding is None:
            # Sometimes the user's locale sets an encoding that doesn't match
            # the YAML files. In such cases, retry once with the "default"
            # UTF-8 encoding
            return load_yaml(filename, encoding='utf-8-sig', binary=False)
        error_name = getattr(e, '__module__', '') + '.' + e.__class__.__name__
        raise ConfigurationError("{}: {}".format(error_name, e))