linuxmuster/linuxmuster-base7

View on GitHub
sbin/linuxmuster-import-devices

Summary

Maintainability
Test Coverage
#!/usr/bin/python3
#
# linuxmuster-import-devices
# thomas@linuxmuster.net
# 20230112
#

import configparser
import constants
import os
import re
import subprocess
import sys
import time
import getopt
import shutil
import csv

from os import listdir
from os.path import isfile, join
from pathlib import Path

from functions import getBootImage, getDevicesArray, getGrubOstype, getGrubPart
from functions import getStartconfOsValues, getStartconfOption
from functions import getStartconfPartnr, getStartconfPartlabel, getSubnetArray
from functions import isValidHostIpv4, getLinboVersion, printScript, readTextfile
from functions import setGlobalStartconfOption, subProc, writeTextfile


def usage():
    """Print the command line help of this script to stdout."""
    help_lines = (
        'Usage: linuxmuster-import-devices [options]',
        ' [options] may be:',
        ' -s <schoolname>,   --school=<schoolname>   : Select a school other than default-school.',
    )
    for line in help_lines:
        print(line)


# parse the command line; an unknown option prints the error plus the help
# text and ends the script with exit code 2
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:", ["school="])
except getopt.GetoptError as err:
    # err reads like "option -a not recognized"
    print(err)
    usage()
    sys.exit(2)

# school to work on; if the option is given more than once the last one wins
school = 'default-school'
for o, a in opts:
    if o == "-s" or o == "--school":
        school = a

# default school's devices.csv
devices = constants.WIMPORTDATA

# read the setup ini file; serverip is used later as the fallback linbo server
setup = configparser.ConfigParser(
    delimiters=('=',),
    inline_comment_prefixes=('#', ';'),
)
setup.read(constants.SETUPINI)
serverip = setup.get('setup', 'serverip')
domainname = setup.get('setup', 'domainname')

# start message
printScript(os.path.basename(__file__), 'begin')

# sophomorix-device goes first: a dry run as syntax check, and only if that
# returns a truthy result the real sync is started
printScript('Starting sophomorix-device syntax check:')
msg = 'sophomorix-device finished '
result = subProc('sophomorix-device --dry-run')
if result:
    printScript(msg + ' OK!')
    subProc('sophomorix-device --sync')
else:
    printScript(msg + ' errors detected!')
    sys.exit(1)


# functions begin
# write grub cfgs
def doGrubCfg(startconf, group, kopts):
    """Write the grub cfg of a group from the grub templates and its start.conf.

    The file constants.LINBOGRUBDIR/<group>.cfg is built from the template
    grub.cfg.global followed by one rendered grub.cfg.os template per
    operating system returned by getStartconfOsValues().

    Args:
        startconf: path of the group's start.conf.
        group: name of the group, used for the cfg file name and for the
            @@group@@ placeholder.
        kopts: kernel options inserted for the @@kopts@@ placeholder.

    Returns:
        A status string:
        'present'             - a cfg exists that does not contain
                                constants.MANAGEDSTR; it is left untouched,
        'not yet configured!' - no grub partition could be determined for the
                                cache, the forced netboot template was copied,
        'replaced'/'created'  - the cfg was written (it existed before / it
                                did not),
        'error!'              - a template could not be read, the cfg could
                                not be written or no os values were returned.
    """
    grubcfg = constants.LINBOGRUBDIR + '/' + group + '.cfg'
    rc, content = readTextfile(grubcfg)
    # a readable cfg without the managed marker must not be overwritten
    if rc and constants.MANAGEDSTR not in content:
        return 'present'
    # get grub partition name of cache
    cache = getStartconfOption(startconf, 'LINBO', 'Cache')
    partnr = getStartconfPartnr(startconf, cache)
    systemtype = getStartconfOption(startconf, 'LINBO', 'SystemType')
    cacheroot = getGrubPart(cache, systemtype)
    cachelabel = getStartconfPartlabel(startconf, partnr)
    # if cache is not defined provide a forced netboot cfg
    if cacheroot is None:
        netboottpl = constants.LINBOTPLDIR + '/grub.cfg.forced_netboot'
        subProc('cp ' + netboottpl + ' ' + grubcfg)
        return 'not yet configured!'
    # the return message has to be determined before the file is (re)written
    msg = 'replaced' if os.path.isfile(grubcfg) else 'created'
    # create global part for group cfg
    globaltpl = constants.LINBOTPLDIR + '/grub.cfg.global'
    rc, content = readTextfile(globaltpl)
    if not rc:
        return 'error!'
    replace_list = [('@@group@@', group), ('@@cachelabel@@', cachelabel),
                    ('@@cacheroot@@', cacheroot), ('@@kopts@@', kopts)]
    for placeholder, value in replace_list:
        content = content.replace(placeholder, value)
    # a failed write of the global part is an error, same as for the os parts
    if not writeTextfile(grubcfg, content, 'w'):
        return 'error!'
    # get os infos from group's start.conf
    oslists = getStartconfOsValues(startconf)
    if oslists is None:
        return 'error!'
    if not oslists:
        # nothing to append, so the os template is not needed at all
        return msg
    # the os template is the same for every os: read it only once
    ostpl = constants.LINBOTPLDIR + '/grub.cfg.os'
    rc, ostemplate = readTextfile(ostpl)
    if not rc:
        return 'error!'
    # append one rendered os part per os to the grub cfg
    for osname, partition, kernel, initrd, kappend, osnr in oslists:
        osroot = getGrubPart(partition, systemtype)
        ostype = getGrubOstype(osname)
        partnr = getStartconfPartnr(startconf, partition)
        oslabel = getStartconfPartlabel(startconf, partnr)
        # add root to kernel append
        if 'root=' not in kappend:
            kappend = kappend + ' root=' + partition
        replace_list = [('@@group@@', group), ('@@cachelabel@@', cachelabel),
                        ('@@cacheroot@@', cacheroot), ('@@osname@@', osname),
                        ('@@osnr@@', osnr), ('@@ostype@@', ostype),
                        ('@@oslabel@@', oslabel), ('@@osroot@@', osroot),
                        ('@@partnr@@', partnr), ('@@kernel@@', kernel),
                        ('@@initrd@@', initrd), ('@@kopts@@', kopts),
                        ('@@append@@', kappend)]
        content = ostemplate
        for placeholder, value in replace_list:
            # some values are not strings, so convert before replacing
            content = content.replace(placeholder, str(value))
        if not writeTextfile(grubcfg, content, 'a'):
            return 'error!'
    return msg


# write linbo start configuration file
def doLinboStartconf(group):
    """Check the start.conf of a group, fix its global options, write its grub cfg.

    If constants.LINBODIR/start.conf.<group> does not exist it is created as
    a copy of constants.LINBODIR/start.conf. The global options Server,
    KernelOptions and Group are rewritten where they differ from the wanted
    values, then doGrubCfg() is called and one status row is printed.

    Args:
        group: name of the linbo group.

    Returns:
        The falsy result of setGlobalStartconfOption() if one of the options
        could not be updated (no grub cfg is written and no row is printed in
        that case), True otherwise.
    """
    startconf = constants.LINBODIR + '/start.conf.' + group
    # provide unconfigured start.conf if there is none for this group
    if os.path.isfile(startconf):
        if getStartconfOption(startconf, 'LINBO', 'Cache') is None:
            msg1 = 'not yet configured!'
        else:
            msg1 = 'present'
    else:
        msg1 = 'not yet configured!'
        subProc('cp ' + constants.LINBODIR + '/start.conf ' + startconf)
    # read values from start.conf
    group_s = getStartconfOption(startconf, 'LINBO', 'Group')
    serverip_s = getStartconfOption(startconf, 'LINBO', 'Server')
    kopts_s = getStartconfOption(startconf, 'LINBO', 'KernelOptions')
    # get alternative server ip from kernel options
    try:
        serverip_k = re.findall(
            r'server=[^ ]*', kopts_s, re.IGNORECASE)[0].split('=')[1]
    except (TypeError, IndexError):
        # TypeError: no kernel options at all (None)
        # IndexError: kernel options without a server= entry
        serverip_k = None
    # a valid server ip from the kernel options wins over the one from setup.ini
    if serverip_k is not None and isValidHostIpv4(serverip_k):
        serverip_r = serverip_k
    else:
        serverip_r = serverip
    # default kernel options if the start.conf does not define any
    kopts_r = 'splash quiet' if kopts_s is None else kopts_s
    # change global startconf options if necessary; the group entry always
    # has to be the group the start.conf file belongs to
    wanted_options = (('Server', serverip_s, serverip_r),
                      ('KernelOptions', kopts_s, kopts_r),
                      ('Group', group_s, group))
    for option, current, wanted in wanted_options:
        if current != wanted:
            rc = setGlobalStartconfOption(startconf, option, wanted)
            if not rc:
                return rc
    # process grub cfgs
    msg2 = doGrubCfg(startconf, group, kopts_r)
    # format row in columns for output
    row = [group, msg1, msg2]
    printScript("  {: <15} | {: <20} | {: <20}".format(*row))
    return True


# write dhcp subnet devices config
def writeDhcpDevicesConfig(school='default-school'):
    """Write the dhcp host declarations of a school and the include file.

    /etc/dhcp/devices/<school>.conf is rebuilt from the devices returned by
    getDevicesArray(), grouped by the subnets from getSubnetArray('0') plus a
    final group for devices with dynamic ip. Afterwards constants.DHCPDEVCONF
    is rebuilt with one include statement per entry in /etc/dhcp/devices.

    Args:
        school: name of the school whose devices are processed.

    Returns:
        True if both files were written, False if an exception occurred while
        writing (the exception is printed).
    """
    printScript('', 'begin')
    printScript('Working on dhcp configuration for devices')
    baseConfigFilePath = constants.DHCPDEVCONF
    devicesConfigBasedir = "/etc/dhcp/devices"
    Path(devicesConfigBasedir).mkdir(parents=True, exist_ok=True)

    cfgfile = devicesConfigBasedir + "/" + school + ".conf"
    # both files are rebuilt from scratch, so remove what a former run left
    for stale in (cfgfile, baseConfigFilePath):
        if os.path.isfile(stale):
            os.unlink(stale)
    try:
        # open devices/<school>.conf for append
        with open(cfgfile, 'a') as outfile:
            # iterate over the defined subnets, dynamic ip hosts come last
            subnets = getSubnetArray('0')
            subnets.append(['DHCP'])
            for item in subnets:
                subnet = item[0]
                # the headline is only written if the subnet has devices
                headline = False
                for device_array in getDevicesArray(fieldnrs='1,2,3,4,7,8,10', subnet=subnet, stype=True, school=school):
                    if not headline:
                        # write corresponding subnet as a comment
                        if subnet == 'DHCP':
                            outfile.write('# dynamic ip hosts\n')
                            printScript('* dynamic ip hosts:')
                        else:
                            outfile.write('# subnet ' + subnet + '\n')
                            printScript('* in subnet ' + subnet + ':')
                        headline = True
                    hostname, group, mac, ip, dhcpopts, computertype, pxeflag, systemtype = device_array
                    if systemtype is None:
                        systemtype = ''
                    # keep the console table aligned
                    if len(computertype) > 15:
                        computertype = computertype[0:15]
                    # format row in columns for output
                    row = [hostname, ip, computertype, pxeflag, systemtype]
                    printScript(
                        "  {: <15} | {: <15} | {: <15} | {: <1} | {: <6}".format(*row))
                    outfile.write(_dhcpHostDeclaration(
                        hostname, group, mac, ip, dhcpopts, pxeflag, systemtype))

        # open devices.conf for append; sorted to get a reproducible file
        with open(baseConfigFilePath, 'a') as outfile:
            for devicesConf in sorted(listdir(devicesConfigBasedir)):
                outfile.write(
                    "include \"{0}/{1}\";\n".format(devicesConfigBasedir, devicesConf))

    except Exception as error:
        print(error)
        return False
    return True


def _dhcpHostDeclaration(hostname, group, mac, ip, dhcpopts, pxeflag, systemtype):
    """Return the dhcp host declaration of one device as a string."""
    host_decl_tpl = """host @@hostname@@ {
  option host-name "@@hostname@@";
  hardware ethernet @@mac@@;
"""
    # begin host declaration
    host_decl = host_decl_tpl.replace(
        '@@mac@@', mac).replace('@@hostname@@', hostname)
    # fixed ip
    if ip != 'DHCP':
        host_decl = host_decl + '  fixed-address ' + ip + ';\n'
    # only for pxe clients
    if int(pxeflag) != 0:
        # get grub bootimage dependent on the systemtype
        bootimage = getBootImage(systemtype)
        # linbo pxe boot
        host_decl = host_decl + '  option extensions-path "' + group + '";\n  option nis-domain "' + group + '";\n'
        # a filename given in the device's dhcp options takes precedence
        if 'filename' not in dhcpopts and bootimage is not None:
            host_decl = host_decl + '  filename "boot/grub/' + bootimage + '";\n'
        # dhcp options have to be 5 chars minimum to get processed
        if len(dhcpopts) > 4:
            for opt in dhcpopts.split(','):
                host_decl = host_decl + '  ' + opt + ';\n'
    # finish host declaration
    return host_decl + '}\n'


# Create necessary host-based symlinks
def doSchoolSpecificGroupLinksAndGetPxeGroups(school='default-school'):
    """Write the link list of a school's pxe devices and return their groups.

    For every device returned by getDevicesArray() with pxeflag 1, 2 or 3 two
    rows "<link source>;<link target>" are written to
    constants.LINBODIR/boot/links/<school>.csv: one for the start.conf link
    and one for the host specific grub cfg link. The links themselves are not
    created here.

    Args:
        school: name of the school whose devices are processed.

    Returns:
        List of the groups of these devices, each group once, in order of
        first appearance.
    """
    links_dir = '/'.join((constants.LINBODIR, 'boot', 'links'))
    Path(links_dir).mkdir(parents=True, exist_ok=True)
    links_csv = links_dir + "/" + school + ".csv"
    # clean up what a former run left
    if os.path.isfile(links_csv):
        os.unlink(links_csv)

    groups_seen = []
    with open(links_csv, "w+") as handle:
        writer = csv.writer(handle, delimiter=';',
                            quotechar='"', quoting=csv.QUOTE_MINIMAL)
        pxe_devices = getDevicesArray(
            fieldnrs='1,2,3,4,10', subnet='all', pxeflag='1,2,3', school=school)

        for host, group, mac, ip, pxeflag in pxe_devices:
            # collect groups with pxe for later use
            if group not in groups_seen:
                groups_seen.append(group)

            # format row in columns for output
            printScript("  {: <15} | {: <15}".format(host, group))

            # start.conf link: named after the ip, or after the lowercase mac
            # for devices with dynamic ip
            suffix = mac.lower() if ip == 'DHCP' else ip
            writer.writerow(['start.conf.' + group,
                             constants.LINBODIR + '/start.conf-' + suffix])

            # grub cfg link: named after the host
            writer.writerow(['../' + group + '.cfg',
                             constants.LINBOGRUBDIR + '/hostcfg/' + host + '.cfg'])

    return groups_seen


# look up all links for all schools and place them in the correct place
def doAllGroupLinks():
    """Recreate the start.conf and grub cfg symlinks of all schools.

    All symlinks named start.conf-* directly in constants.LINBODIR and *.cfg
    directly in constants.LINBOGRUBDIR/hostcfg are removed. Then every *.csv
    file in constants.LINBODIR/boot/links is read and for each row
    "<link source>;<link target>" the symlink <link target> pointing to
    <link source> is created.

    Rows with fewer than two fields are skipped. If a link target already
    exists, a message is printed and the row is skipped.
    """
    # delete old config links; raw strings pass the backslashes to the shell
    # unchanged without using invalid python escape sequences
    subProc("find " + constants.LINBODIR + r" -maxdepth 1 -name start.conf-\* -type l -exec rm '{}' \;")
    subProc("find " + constants.LINBOGRUBDIR + r"/hostcfg -maxdepth 1 -name \*.cfg -type l -exec rm '{}' \;")

    linksConfBasedir = constants.LINBODIR + "/boot/links"
    # sorted, so the result does not depend on the directory order
    for schoolLinksConf in sorted(listdir(linksConfBasedir)):
        schoolLinksConfPath = linksConfBasedir + "/" + schoolLinksConf
        if not os.path.isfile(schoolLinksConfPath) or not schoolLinksConf.endswith(".csv"):
            continue

        with open(schoolLinksConfPath, newline='') as csvfile:
            csvReader = csv.reader(csvfile, delimiter=';', quotechar='"')
            for row in csvReader:
                # blank or incomplete lines do not describe a link
                if len(row) < 2:
                    continue
                try:
                    os.symlink(row[0], row[1])
                except FileExistsError:
                    # the target was not removed above (not a symlink) or is
                    # listed more than once; go on with the remaining links
                    printScript('  skipped, already exists: ' + row[1])

# functions end


# write dhcp devices.conf
writeDhcpDevicesConfig(school=school)


# linbo stuff
linbo_version = int(getLinboVersion().split('.')[0])
printScript('', 'begin')
printScript('Working on linbo/grub configuration for devices:')

# write this school's link list, then recreate the links of all schools
pxe_groups = doSchoolSpecificGroupLinksAndGetPxeGroups(school=school)
doAllGroupLinks()

# write pxe configs for collected groups, printed as a table
printScript('', 'begin')
printScript('Working on linbo/grub configuration for groups:')
printScript("  {: <15} | {: <20} | {: <20}".format(
    ' ', 'linbo start.conf', 'grub cfg'))
printScript("  {: <15}+{: <20}+{: <20}".format('-' * 16, '-' * 22, '-' * 21))
for group in pxe_groups:
    doLinboStartconf(group)


# execute post hooks: every executable file in the hook directory, in sorted
# order so that the sequence is reproducible
hookpath = constants.POSTDEVIMPORT
hookscripts = sorted(f for f in listdir(hookpath) if isfile(
    join(hookpath, f)) and os.access(join(hookpath, f), os.X_OK))
# names of the hooks that could not be run or ended with an error
failed_hooks = []
if hookscripts:
    printScript('', 'begin')
    printScript('Executing post hooks:')
    for h in hookscripts:
        hookscript = hookpath + '/' + h
        msg = '* ' + h + ' '
        printScript(msg, '', False, False, True)
        try:
            output = subprocess.check_output(hookscript).decode('utf-8')
        except subprocess.CalledProcessError as error:
            # a failing hook must not keep the dhcp service from being
            # restarted below, the configuration has already been rewritten
            failed_hooks.append(h)
            output = (error.output or b'').decode('utf-8', errors='replace')
            print('failed with exit code ' + str(error.returncode))
        except OSError as error:
            # the hook could not be started at all
            failed_hooks.append(h)
            output = ''
            print('failed: ' + str(error))
        if output != '':
            print(output)

# restart services
printScript('', 'begin')
printScript('Finally restarting dhcp service.')
subProc('service isc-dhcp-server restart')

# end message
printScript(os.path.basename(__file__), 'end')

# still report failed hooks to the caller through the exit code
if failed_hooks:
    sys.exit(1)