256 lines
9.5 KiB
Python
256 lines
9.5 KiB
Python
import os, pwd, sys, shutil, re, subprocess, shutil
|
|
from linuxmusterLinuxclient7 import logging, constants, user, config, computer
|
|
from pathlib import Path
|
|
|
|
def mountShare(networkPath, shareName = None, hiddenShare = False, username = None):
|
|
"""
|
|
Mount a given path of a samba share
|
|
|
|
:param networkPath: Network path of the share
|
|
:type networkPath: str
|
|
:param shareName: The name of the share (name of the folder the share is being mounted to)
|
|
:type shareName: str
|
|
:param hiddenShare: If the share sould be visible in Nautilus
|
|
:type hiddenShare: bool
|
|
:param username: The user in whoms context the share should be mounted
|
|
:type username: str
|
|
:return: Tuple: (success, mountpoint)
|
|
:rtype: tuple
|
|
"""
|
|
networkPath = networkPath.replace("\\", "/")
|
|
username = _getDefaultUsername(username)
|
|
shareName = _getDefaultShareName(networkPath, shareName)
|
|
|
|
if user.isRoot():
|
|
return _mountShare(username, networkPath, shareName, hiddenShare, True)
|
|
else:
|
|
mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)
|
|
# This will call _mountShare() directly with root privileges
|
|
return _mountShareWithoutRoot(networkPath, shareName, hiddenShare), mountpoint
|
|
|
|
def getMountpointOfRemotePath(remoteFilePath, hiddenShare = False, username = None, autoMount = True):
|
|
"""
|
|
Get the local path of a remote samba share path.
|
|
This function automatically checks if the shares is already mounted.
|
|
It optionally automatically mounts the top path of the remote share:
|
|
If the remote path is `//server/sysvol/linuxmuster.lan/Policies` it mounts `//server/sysvol`
|
|
|
|
:param remoteFilePath: Remote path
|
|
:type remoteFilePath: str
|
|
:param hiddenShare: If the share sould be visible in Nautilus
|
|
:type hiddenShare: bool
|
|
:param username: The user in whoms context the share should be mounted
|
|
:type username: str
|
|
:parama autoMount: If the share should be mouted automatically if it is not already mounted
|
|
:type autoMount: bool
|
|
:return: Tuple: (success, mountpoint)
|
|
:rtype: tuple
|
|
"""
|
|
remoteFilePath = remoteFilePath.replace("\\", "/")
|
|
username = _getDefaultUsername(username)
|
|
|
|
# get basepath fo remote file path
|
|
# this turns //server/sysvol/linuxmuster.lan/Policies into //server/sysvol
|
|
pattern = re.compile("(^\\/\\/[^\\/]+\\/[^\\/]+)")
|
|
match = pattern.search(remoteFilePath)
|
|
|
|
if match is None:
|
|
logging.error("Cannot get local file path of {} beacuse it is not a valid path!".format(remoteFilePath))
|
|
return False, None
|
|
|
|
shareBasepath = match.group(0)
|
|
|
|
if autoMount:
|
|
rc, mointpoint = mountShare(shareBasepath, hiddenShare=hiddenShare, username=username)
|
|
if not rc:
|
|
return False, None
|
|
|
|
# calculate local path
|
|
shareMountpoint = _getShareMountpoint(shareBasepath, username, hiddenShare, shareName=None)
|
|
localFilePath = remoteFilePath.replace(shareBasepath, shareMountpoint)
|
|
|
|
return True, localFilePath
|
|
|
|
def unmountAllSharesOfUser(username):
|
|
"""
|
|
Unmount all shares of a given user and safely delete the mountpoints and the parent directory.
|
|
|
|
:param username: The username of the user
|
|
:type username: str
|
|
:return: True or False
|
|
:rtype: bool
|
|
"""
|
|
logging.info("=== Trying to unmount all shares of user {0} ===".format(username))
|
|
for basedir in [constants.shareMountBasepath, constants.hiddenShareMountBasepath]:
|
|
shareMountBasedir = basedir.format(username)
|
|
|
|
try:
|
|
mountedShares = os.listdir(shareMountBasedir)
|
|
except FileNotFoundError:
|
|
logging.info("Mount basedir {} does not exist -> nothing to unmount".format(shareMountBasedir))
|
|
continue
|
|
|
|
for share in mountedShares:
|
|
_unmountShare("{0}/{1}".format(shareMountBasedir, share))
|
|
|
|
if len(os.listdir(shareMountBasedir)) > 0:
|
|
logging.warning("* Mount basedir {} is not empty so not removed!".format(shareMountBasedir))
|
|
return False
|
|
else:
|
|
# Delete the directory
|
|
logging.info("Deleting {0}...".format(shareMountBasedir))
|
|
try:
|
|
os.rmdir(shareMountBasedir)
|
|
except Exception as e:
|
|
logging.error("FAILED!")
|
|
logging.exception(e)
|
|
return False
|
|
|
|
logging.info("===> Finished unmounting all shares of user {0} ===".format(username))
|
|
return True
|
|
|
|
def getLocalSysvolPath():
|
|
"""
|
|
Get the local mountpoint of the sysvol
|
|
|
|
:return: Full path of the mountpoint
|
|
:rtype: str
|
|
"""
|
|
rc, networkConfig = config.network()
|
|
if not rc:
|
|
return False, None
|
|
|
|
networkPath = f"//{networkConfig['serverHostname']}/sysvol"
|
|
return getMountpointOfRemotePath(networkPath, True)
|
|
|
|
# --------------------
|
|
# - Helper functions -
|
|
# --------------------
|
|
|
|
# useCruidOfExecutingUser:
|
|
# defines if the ticket cache of the user executing the mount command should be used.
|
|
# If set to False, the cache of the user with the given username will be used.
|
|
# This parameter influences the `cruid` mount option.
|
|
def _mountShare(username, networkPath, shareName, hiddenShare, useCruidOfExecutingUser=False):
|
|
|
|
mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)
|
|
|
|
mountCommandOptions = f"file_mode=0700,dir_mode=0700,sec=krb5,nodev,nosuid,mfsymlinks,nobrl,vers=3.0,user={username}"
|
|
rc, networkConfig = config.network()
|
|
domain = None
|
|
|
|
if rc:
|
|
domain = networkConfig["domain"]
|
|
mountCommandOptions += f",domain={domain.upper()}"
|
|
|
|
try:
|
|
pwdInfo = pwd.getpwnam(username)
|
|
uid = pwdInfo.pw_uid
|
|
gid = pwdInfo.pw_gid
|
|
mountCommandOptions += f",gid={gid},uid={uid}"
|
|
|
|
if not useCruidOfExecutingUser:
|
|
mountCommandOptions += f",cruid={uid}"
|
|
|
|
except KeyError:
|
|
uid = -1
|
|
gid = -1
|
|
logging.warning("Uid could not be found! Continuing anyway!")
|
|
|
|
mountCommand = [shutil.which("mount.cifs"), "-o", mountCommandOptions, networkPath, mountpoint]
|
|
|
|
logging.debug(f"Trying to mount '{networkPath}' to '{mountpoint}'")
|
|
logging.debug("* Creating directory...")
|
|
|
|
try:
|
|
Path(mountpoint).mkdir(parents=True, exist_ok=False)
|
|
except FileExistsError:
|
|
# Test if a share is already mounted there
|
|
if _directoryIsMountpoint(mountpoint):
|
|
logging.debug("* The mountpoint is already mounted.")
|
|
return True, mountpoint
|
|
else:
|
|
logging.warning("* The target directory already exists, proceeding anyway!")
|
|
|
|
logging.debug("* Executing '{}' ".format(" ".join(mountCommand)))
|
|
logging.debug("* Trying to mount...")
|
|
if not subprocess.call(mountCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0:
|
|
logging.fatal(f"* Error mounting share {networkPath} to {mountpoint}!\n")
|
|
return False, None
|
|
|
|
logging.debug("* Success!")
|
|
|
|
# hide the shares parent dir (/home/%user/media) in case it is not a hidden share
|
|
if not hiddenShare:
|
|
try:
|
|
hiddenFilePath = f"{mountpoint}/../../.hidden"
|
|
logging.debug(f"* hiding parent dir {hiddenFilePath}")
|
|
hiddenFile = open(hiddenFilePath, "w+")
|
|
hiddenFile.write(mountpoint.split("/")[-2])
|
|
hiddenFile.close()
|
|
except:
|
|
logging.warning(f"Could not hide parent dir of share {mountpoint}")
|
|
|
|
return True, mountpoint
|
|
|
|
def _unmountShare(mountpoint):
|
|
# check if mountpoint exists
|
|
if (not os.path.exists(mountpoint)) or (not os.path.isdir(mountpoint)):
|
|
logging.warning(f"* Could not unmount {mountpoint}, it does not exist.")
|
|
|
|
# Try to unmount share
|
|
logging.info("* Trying to unmount {0}...".format(mountpoint))
|
|
if not subprocess.call(["umount", mountpoint]) == 0:
|
|
logging.warning("* Failed!")
|
|
if _directoryIsMountpoint(mountpoint):
|
|
logging.warning("* It is still mounted! Exiting!")
|
|
# Do not delete in this case! We might delete userdata!
|
|
return
|
|
logging.info("* It is not mounted! Continuing!")
|
|
|
|
# check if the mountpoint is empty
|
|
if len(os.listdir(mountpoint)) > 0:
|
|
logging.warning("* mountpoint {} is not empty so not removed!".format(mountpoint))
|
|
return
|
|
|
|
# Delete the directory
|
|
logging.info("* Deleting {0}...".format(mountpoint))
|
|
try:
|
|
os.rmdir(mountpoint)
|
|
except Exception as e:
|
|
logging.error("* FAILED!")
|
|
logging.exception(e)
|
|
|
|
def _getDefaultUsername(username=None):
|
|
if username == None:
|
|
if user.isRoot():
|
|
username = computer.hostname().upper() + "$"
|
|
else:
|
|
username = user.username()
|
|
return username
|
|
|
|
def _getDefaultShareName(networkPath, shareName=None):
|
|
if shareName is None:
|
|
shareName = networkPath.split("/")[-1]
|
|
return shareName
|
|
|
|
def _mountShareWithoutRoot(networkPath, name, hidden):
|
|
mountCommand = ["sudo", "/usr/share/linuxmuster-linuxclient7/scripts/sudoTools", "mount-share", "--path", networkPath, "--name", name]
|
|
|
|
if hidden:
|
|
mountCommand.append("--hidden")
|
|
|
|
return subprocess.call(mountCommand) == 0
|
|
|
|
def _getShareMountpoint(networkPath, username, hidden, shareName = None):
|
|
logging.debug(f"Calculating mountpoint of {networkPath}")
|
|
|
|
shareName = _getDefaultShareName(networkPath, shareName)
|
|
|
|
if hidden:
|
|
return "{0}/{1}".format(constants.hiddenShareMountBasepath.format(username), shareName)
|
|
else:
|
|
return "{0}/{1}".format(constants.shareMountBasepath.format(username), shareName)
|
|
|
|
def _directoryIsMountpoint(dir):
|
|
return subprocess.call(["mountpoint", "-q", dir]) == 0
|