linuxmuster/linuxmuster-linuxclient7

View on GitHub
usr/lib/python3/dist-packages/linuxmusterLinuxclient7/shares.py

Summary

Maintainability
A
3 hrs
Test Coverage
import os, pwd, sys, shutil, re, subprocess, shutil
from linuxmusterLinuxclient7 import logging, constants, user, config, computer
from pathlib import Path

def mountShare(networkPath, shareName = None, hiddenShare = False, username = None):
    """
    Mount a given path of a samba share

    :param networkPath: Network path of the share
    :type networkPath: str
    :param shareName: The name of the share (name of the folder the share is being mounted to)
    :type shareName: str
    :param hiddenShare: If the share sould be visible in Nautilus
    :type hiddenShare: bool
    :param username: The user in whoms context the share should be mounted
    :type username: str
    :return: Tuple: (success, mountpoint)
    :rtype: tuple
    """
    networkPath = networkPath.replace("\\", "/")
    username = _getDefaultUsername(username)
    shareName = _getDefaultShareName(networkPath, shareName)

    if user.isRoot():
        return _mountShare(username, networkPath, shareName, hiddenShare, True)
    else:
        mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)
        # This will call _mountShare() directly with root privileges
        return _mountShareWithoutRoot(networkPath, shareName, hiddenShare), mountpoint

def getMountpointOfRemotePath(remoteFilePath, hiddenShare = False, username = None, autoMount = True):
    """
    Get the local path of a remote samba share path.
    This function automatically checks if the shares is already mounted.
    It optionally automatically mounts the top path of the remote share:
    If the remote path is `//server/sysvol/linuxmuster.lan/Policies` it mounts `//server/sysvol`

    :param remoteFilePath: Remote path
    :type remoteFilePath: str
    :param hiddenShare: If the share sould be visible in Nautilus
    :type hiddenShare: bool
    :param username: The user in whoms context the share should be mounted
    :type username: str
    :parama autoMount: If the share should be mouted automatically if it is not already mounted
    :type autoMount: bool
    :return: Tuple: (success, mountpoint)
    :rtype: tuple
    """
    remoteFilePath = remoteFilePath.replace("\\", "/")
    username = _getDefaultUsername(username)

    # get basepath fo remote file path
    # this turns //server/sysvol/linuxmuster.lan/Policies into //server/sysvol
    pattern = re.compile("(^\\/\\/[^\\/]+\\/[^\\/]+)")
    match = pattern.search(remoteFilePath)

    if match is None:
        logging.error("Cannot get local file path of {} beacuse it is not a valid path!".format(remoteFilePath))
        return False, None

    shareBasepath = match.group(0)

    if autoMount:
       rc, mointpoint = mountShare(shareBasepath, hiddenShare=hiddenShare, username=username)
       if not rc:
           return False, None
    
    # calculate local path
    shareMountpoint = _getShareMountpoint(shareBasepath, username, hiddenShare, shareName=None)
    localFilePath = remoteFilePath.replace(shareBasepath, shareMountpoint)

    return True, localFilePath

def unmountAllSharesOfUser(username):
    """
    Unmount all shares of a given user and safely delete the mountpoints and the parent directory.

    :param username: The username of the user
    :type username: str
    :return: True or False
    :rtype: bool
    """
    logging.info("=== Trying to unmount all shares of user {0} ===".format(username))
    for basedir in [constants.shareMountBasepath, constants.hiddenShareMountBasepath]:
        shareMountBasedir = basedir.format(username)

        try:
            mountedShares = os.listdir(shareMountBasedir)
        except FileNotFoundError:
            logging.info("Mount basedir {} does not exist -> nothing to unmount".format(shareMountBasedir))
            continue

        for share in mountedShares:
            _unmountShare("{0}/{1}".format(shareMountBasedir, share))
    
        if len(os.listdir(shareMountBasedir)) > 0:
            logging.warning("* Mount basedir {} is not empty so not removed!".format(shareMountBasedir))
            return False
        else:
            # Delete the directory
            logging.info("Deleting {0}...".format(shareMountBasedir))
            try:
                os.rmdir(shareMountBasedir)
            except Exception as e:
                logging.error("FAILED!")
                logging.exception(e)
                return False

    logging.info("===> Finished unmounting all shares of user {0} ===".format(username))
    return True

def getLocalSysvolPath():
    """
    Get the local mountpoint of the sysvol

    :return: Full path of the mountpoint
    :rtype: str
    """
    rc, networkConfig = config.network()
    if not rc:
        return False, None

    networkPath = f"//{networkConfig['serverHostname']}/sysvol"
    return getMountpointOfRemotePath(networkPath, True)

# --------------------
# - Helper functions -
# --------------------

# useCruidOfExecutingUser:
#  defines if the ticket cache of the user executing the mount command should be used.
#  If set to False, the cache of the user with the given username will be used.
#  This parameter influences the `cruid` mount option.
def _mountShare(username, networkPath, shareName, hiddenShare, useCruidOfExecutingUser=False):

    mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)

    mountCommandOptions = f"file_mode=0700,dir_mode=0700,sec=krb5,nodev,nosuid,mfsymlinks,nobrl,vers=3.0,user={username}"
    rc, networkConfig = config.network()
    domain = None

    if rc:
        domain = networkConfig["domain"]
        mountCommandOptions += f",domain={domain.upper()}"

    try:
        pwdInfo = pwd.getpwnam(username)
        uid = pwdInfo.pw_uid
        gid = pwdInfo.pw_gid
        mountCommandOptions += f",gid={gid},uid={uid}"

        if not useCruidOfExecutingUser:
            mountCommandOptions += f",cruid={uid}"

    except KeyError:
        uid = -1
        gid = -1
        logging.warning("Uid could not be found! Continuing anyway!")

    mountCommand = [shutil.which("mount.cifs"), "-o", mountCommandOptions, networkPath, mountpoint]

    logging.debug(f"Trying to mount '{networkPath}' to '{mountpoint}'")
    logging.debug("* Creating directory...")

    try:
        Path(mountpoint).mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        # Test if a share is already mounted there
        if _directoryIsMountpoint(mountpoint):
            logging.debug("* The mountpoint is already mounted.")
            return True, mountpoint
        else:
            logging.warning("* The target directory already exists, proceeding anyway!")

    logging.debug("* Executing '{}' ".format(" ".join(mountCommand)))
    logging.debug("* Trying to mount...")
    if not subprocess.call(mountCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0:
        logging.fatal(f"* Error mounting share {networkPath} to {mountpoint}!\n")
        return False, None

    logging.debug("* Success!")

    # hide the shares parent dir (/home/%user/media) in case it is not a hidden share
    if not hiddenShare:
        try:
            hiddenFilePath = f"{mountpoint}/../../.hidden"
            logging.debug(f"* hiding parent dir {hiddenFilePath}")
            hiddenFile = open(hiddenFilePath, "w+")
            hiddenFile.write(mountpoint.split("/")[-2])
            hiddenFile.close()
        except:
            logging.warning(f"Could not hide parent dir of share {mountpoint}")

    return True, mountpoint

def _unmountShare(mountpoint):
    # check if mountpoint exists
    if (not os.path.exists(mountpoint)) or (not os.path.isdir(mountpoint)):
        logging.warning(f"* Could not unmount {mountpoint}, it does not exist.")

    # Try to unmount share
    logging.info("* Trying to unmount {0}...".format(mountpoint))
    if not subprocess.call(["umount", mountpoint]) == 0:
        logging.warning("* Failed!")
        if _directoryIsMountpoint(mountpoint):
            logging.warning("* It is still mounted! Exiting!")
            # Do not delete in this case! We might delete userdata!
            return
        logging.info("* It is not mounted! Continuing!")

    # check if the mountpoint is empty
    if len(os.listdir(mountpoint)) > 0:
        logging.warning("* mountpoint {} is not empty so not removed!".format(mountpoint))
        return

    # Delete the directory
    logging.info("* Deleting {0}...".format(mountpoint))
    try:
        os.rmdir(mountpoint)
    except Exception as e:
        logging.error("* FAILED!")
        logging.exception(e)

def _getDefaultUsername(username=None):
    if username == None:
        if user.isRoot():
            username = computer.hostname().upper() + "$"
        else:
            username = user.username()
    return username

def _getDefaultShareName(networkPath, shareName=None):
    if shareName is None:
        shareName = networkPath.split("/")[-1]
    return shareName

def _mountShareWithoutRoot(networkPath, name, hidden):
    mountCommand = ["sudo", "/usr/share/linuxmuster-linuxclient7/scripts/sudoTools", "mount-share", "--path", networkPath, "--name", name]

    if hidden:
        mountCommand.append("--hidden")

    return subprocess.call(mountCommand) == 0

def _getShareMountpoint(networkPath, username, hidden, shareName = None):
    logging.debug(f"Calculating mountpoint of {networkPath}")

    shareName = _getDefaultShareName(networkPath, shareName)

    if hidden:
        return "{0}/{1}".format(constants.hiddenShareMountBasepath.format(username), shareName)
    else:
        return "{0}/{1}".format(constants.shareMountBasepath.format(username), shareName)

def _directoryIsMountpoint(dir):
    return subprocess.call(["mountpoint", "-q", dir]) == 0