Viewing file: vultr.py (7.15 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Author: Eric Benner <[email protected]> # # This file is part of cloud-init. See LICENSE file for license information.
import json import logging from functools import lru_cache
from requests import exceptions
from cloudinit import dmi, net, subp, url_helper, util from cloudinit.net.dhcp import NoDHCPLeaseError from cloudinit.net.ephemeral import EphemeralDHCPv4
# Get LOG LOG = logging.getLogger(__name__)
@lru_cache() def get_metadata( distro, url, timeout, retries, sec_between, agent, tmp_dir=None ): # Bring up interface (and try until one works) exception = RuntimeError("Failed to DHCP")
# Seek iface with DHCP for iface in get_interface_list(): try: with EphemeralDHCPv4( distro, iface=iface, connectivity_url_data={"url": url}, ): # Fetch the metadata v1 = read_metadata(url, timeout, retries, sec_between, agent)
metadata = json.loads(v1) refactor_metadata(metadata) return metadata except ( NoDHCPLeaseError, subp.ProcessExecutionError, RuntimeError, exceptions.RequestException, ) as exc: LOG.error("DHCP Exception: %s", exc) exception = exc raise exception
# Refactor metadata into acceptable format def refactor_metadata(metadata): metadata["instance-id"] = metadata["instance-v2-id"] metadata["local-hostname"] = metadata["hostname"] region = metadata["region"]["regioncode"] if "countrycode" in metadata["region"]: region = metadata["region"]["countrycode"] metadata["region"] = region.lower()
# Get interface list, sort, and clean def get_interface_list(): ifaces = [] for iface in net.find_candidate_nics(): # Skip dummy if "dummy" in iface: continue ifaces.append(iface)
return ifaces
# Read the system information from SMBIOS def get_sysinfo(): return { "manufacturer": dmi.read_dmi_data("system-manufacturer"), "subid": dmi.read_dmi_data("system-serial-number"), }
# Assumes is Vultr is already checked def is_baremetal(): if get_sysinfo()["manufacturer"] != "Vultr": return True return False
# Confirm is Vultr def is_vultr(): # VC2, VDC, and HFC use DMI sysinfo = get_sysinfo()
if sysinfo["manufacturer"] == "Vultr": return True
# Baremetal requires a kernel parameter if "vultr" in util.get_cmdline().split(): return True
return False
# Read Metadata endpoint def read_metadata(url, timeout, retries, sec_between, agent): url = "%s/v1.json" % url
# Announce os details so we can handle non Vultr origin # images and provide correct vendordata generation. headers = {"Metadata-Token": "cloudinit", "User-Agent": agent}
response = url_helper.readurl( url, timeout=timeout, retries=retries, headers=headers, sec_between=sec_between, )
if not response.ok(): raise RuntimeError( "Failed to connect to %s: Code: %s" % url, response.code )
return response.contents.decode()
# Wrapped for caching @lru_cache() def get_interface_map(): return net.get_interfaces_by_mac()
# Convert macs to nics def get_interface_name(mac): macs_to_nic = get_interface_map()
if mac not in macs_to_nic: return None
return macs_to_nic.get(mac)
# Generate network configs def generate_network_config(interfaces): network = { "version": 1, "config": [ { "type": "nameserver", "address": ["108.61.10.10", "2001:19f0:300:1704::6"], } ], }
# Prepare interface 0, public if len(interfaces) > 0: public = generate_interface(interfaces[0], primary=True) network["config"].append(public)
# Prepare additional interfaces, private for i in range(1, len(interfaces)): interface = interfaces[i]
# Skip interfaces set not to be configured if interface.get("unconfigured"): continue
private = generate_interface(interface) network["config"].append(private)
return network
def generate_interface(interface, primary=False): interface_name = get_interface_name(interface["mac"]) if not interface_name: raise RuntimeError( "Interface: %s could not be found on the system" % interface["mac"] )
netcfg = { "name": interface_name, "type": "physical", "mac_address": interface["mac"], }
if primary: netcfg["accept-ra"] = 1 netcfg["subnets"] = [ {"type": "dhcp", "control": "auto"}, {"type": "ipv6_slaac", "control": "auto"}, ]
if not primary: netcfg["subnets"] = [ { "type": "static", "control": "auto", "address": interface["ipv4"]["address"], "netmask": interface["ipv4"]["netmask"], } ]
generate_interface_routes(interface, netcfg) generate_interface_additional_addresses(interface, netcfg)
# Add config to template return netcfg
def generate_interface_routes(interface, netcfg): # Options that may or may not be used if "mtu" in interface: netcfg["mtu"] = interface["mtu"]
if "accept-ra" in interface: netcfg["accept-ra"] = interface["accept-ra"]
if "routes" in interface: netcfg["subnets"][0]["routes"] = interface["routes"]
def generate_interface_additional_addresses(interface, netcfg): # Check for additional IP's additional_count = len(interface["ipv4"]["additional"]) if "ipv4" in interface and additional_count > 0: for additional in interface["ipv4"]["additional"]: add = { "type": "static", "control": "auto", "address": additional["address"], "netmask": additional["netmask"], }
if "routes" in additional: add["routes"] = additional["routes"]
netcfg["subnets"].append(add)
# Check for additional IPv6's additional_count = len(interface["ipv6"]["additional"]) if "ipv6" in interface and additional_count > 0: for additional in interface["ipv6"]["additional"]: add = { "type": "static6", "control": "auto", "address": "%s/%s" % (additional["network"], additional["prefix"]), }
if "routes" in additional: add["routes"] = additional["routes"]
netcfg["subnets"].append(add)
# Make required adjustments to the network configs provided def add_interface_names(netcfg): for interface in netcfg["config"]: if interface["type"] != "physical": continue interface_name = get_interface_name(interface["mac_address"]) if not interface_name: raise RuntimeError( "Interface: %s could not be found on the system" % interface["mac_address"] ) interface["name"] = interface_name
|