printing role without lmn-linuxmusterclient7

This commit is contained in:
Raphael Dannecker 2023-11-09 09:10:23 +01:00
parent 5f088511c4
commit 0e4073336a
32 changed files with 80 additions and 3246 deletions

View file

@ -52,6 +52,7 @@
ntp_serv: "{{ vault_ntp_serv }}" ## ntp.example.org
proxy: "{{ vault_proxy }}" ## http://firewall.example.org:3128
no_proxy: "{{ vault_no_proxy }}" ## firewall.example.org,server.example.org,idam.example.org,dw.example.org
printservers: ['10.190.1.1'] ## list of printservers
## PAM mount nextcloud, remove or leave empty to skip:
web_dav: "{{ vault_web_dav }}" ## https://nc.example.org/remote.php/dav/files/%(USER)
@ -139,6 +140,39 @@
APT::Periodic::Update-Package-Lists "1";
APT::Periodic::Unattended-Upgrade "1";
- name: Remove pam_mount sysvol mount
blockinfile:
dest: /etc/security/pam_mount.conf.xml
marker: "<!-- {mark} ANSIBLE MANAGED BLOCK (SysVol) -->"
block: |
<volume
fstype="cifs"
server="{{ smb_server }}"
path="sysvol/"
mountpoint="/srv/samba/%(USER)/sysvol"
options="sec=krb5i,cruid=%(USERUID),user=%(USER),gid=1010,file_mode=0770,dir_mode=0770,mfsymlinks"
><not><or><user>root</user><user>ansible</user><user>Debian-gdm</user><user>sddm</user><user>{{ localuser }}</user></or></not>
</volume>
state: absent
- name: disable rmlpr.timer
systemd:
name: rmlpr.timer
enabled: false
- name: Remove deprecated files and directories
file:
path: "{{ item }}"
state: absent
with_items:
- /etc/linuxmuster-linuxclient7
- /usr/lib/python3/dist-packages/linuxmusterLinuxclient7
- /usr/share/linuxmuster-linuxclient7
- /usr/local/bin/onLogin
- /etc/sudoers.d/90-lmn-sudotools
- /etc/systemd/system/rmlpr.service
- /etc/systemd/system/rmlpr.timer
## bookworm fixes/hacks:
- name: Work around sddm hang on shutdown
ansible.builtin.lineinfile:

View file

@ -0,0 +1,4 @@
%examusers ALL=(root) NOPASSWD: /usr/local/bin/install-printers.sh
%role-student ALL=(root) NOPASSWD: /usr/local/bin/install-printers.sh
%role-teacher ALL=(root) NOPASSWD: /usr/local/bin/install-printers.sh

View file

@ -1,4 +0,0 @@
%examusers ALL=(root) NOPASSWD: /usr/share/linuxmuster-linuxclient7/scripts/sudoTools
%role-student ALL=(root) NOPASSWD: /usr/share/linuxmuster-linuxclient7/scripts/sudoTools
%role-teacher ALL=(root) NOPASSWD: /usr/share/linuxmuster-linuxclient7/scripts/sudoTools

View file

@ -1,3 +0,0 @@
#
# linuxmuster-linuxclient7 is a library for use with Linuxmuster.net
#

View file

@ -1,57 +0,0 @@
import socket
from linuxmusterLinuxclient7 import logging, ldapHelper, realm, localUserHelper
def hostname():
"""
Get the hostname of the computer
:return: The hostname
:rtype: str
"""
return socket.gethostname().split('.', 1)[0]
def krbHostName():
"""
Get the krb hostname, eg. `COMPUTER01$`
:return: The krb hostname
:rtype: str
"""
return hostname().upper() + "$"
def readAttributes():
"""
Read all ldap attributes of the cumputer
:return: Tuple (success, dict of attributes)
:rtype: tuple
"""
return ldapHelper.searchOne("(sAMAccountName={}$)".format(hostname()))
def isInGroup(groupName):
"""
Check if the computer is part of an ldap group
:param groupName: The name of the group to check
:type grouName: str
:return: True or False
:rtype: bool
"""
rc, groups = localUserHelper.getGroupsOfLocalUser(krbHostName())
if not rc:
return False
return groupName in groups
def isInAD():
"""
Check if the computer is joined to an AD
:return: True or False
:rtype: bool
"""
rc, groups = localUserHelper.getGroupsOfLocalUser(krbHostName())
if not rc:
return False
return "domain computers" in groups

View file

@ -1,136 +0,0 @@
import configparser, re
from linuxmusterLinuxclient7 import logging, constants
def network():
"""
Get the network configuration in `/etc/linuxmuster-linuxclient7/network.conf`
:return: Tuple (success, dict of keys)
:rtype: tuple
"""
rc, rawNetworkConfig = _readNetworkConfig()
if not rc:
return False, None
if not _checkNetworkConfigVersion(rawNetworkConfig)[0]:
return False, None
networkConfig = {}
try:
networkConfig["serverHostname"] = rawNetworkConfig["serverHostname"]
networkConfig["domain"] = rawNetworkConfig["domain"]
networkConfig["realm"] = rawNetworkConfig["realm"]
except KeyError as e:
logging.error("Error when reading network.conf (2)")
logging.exception(e)
return False, None
return True, networkConfig
def writeNetworkConfig(newNetworkConfig):
"""
Write the network configuration in `/etc/linuxmuster-linuxclient7/network.conf`
:param newNetworkConfig: The new config
:type newNetworkConfig: dict
:return: True or False
:rtype: bool
"""
networkConfig = configparser.ConfigParser(interpolation=None)
try:
networkConfig["network"] = {}
networkConfig["network"]["version"] = str(_networkConfigVersion())
networkConfig["network"]["serverHostname"] = newNetworkConfig["serverHostname"]
networkConfig["network"]["domain"] = newNetworkConfig["domain"]
networkConfig["network"]["realm"] = newNetworkConfig["realm"]
except Exception as e:
logging.error("Error when preprocessing new network configuration!")
logging.exception(e)
return False
try:
logging.info("Writing new network Configuration")
with open(constants.networkConfigFilePath, 'w') as networkConfigFile:
networkConfig.write(networkConfigFile)
except Exception as e:
logging.error("Failed!")
logging.exception(e)
return False
return True
def upgrade():
"""
Upgrade the format of the network configuration in `/etc/linuxmuster-linuxclient7/network.conf`
This is done automatically on package upgrades.
:return: True or False
:rtype: bool
"""
return _upgradeNetworkConfig()
# --------------------
# - Helper functions -
# --------------------
def _readNetworkConfig():
configParser = configparser.ConfigParser()
configParser.read(constants.networkConfigFilePath)
try:
rawNetworkConfig = configParser["network"]
return True, rawNetworkConfig
except KeyError as e:
logging.error("Error when reading network.conf (1)")
logging.exception(e)
return False, None
return configParser
def _networkConfigVersion():
return 1
def _checkNetworkConfigVersion(rawNetworkConfig):
try:
networkConfigVersion = int(rawNetworkConfig["version"])
except KeyError as e:
logging.warning("The network.conf version could not be identified, assuming 0")
networkConfigVersion = 0
if networkConfigVersion != _networkConfigVersion():
logging.warning("The network.conf Version is a mismatch!")
return False, networkConfigVersion
return True, networkConfigVersion
def _upgradeNetworkConfig():
logging.info("Upgrading network config.")
rc, rawNetworkConfig = _readNetworkConfig()
if not rc:
return False
rc, networkConfigVersion = _checkNetworkConfigVersion(rawNetworkConfig)
if rc:
logging.info("No need to upgrade, already up-to-date.")
return True
logging.info("Upgrading network config from {0} to {1}".format(networkConfigVersion, _networkConfigVersion()))
if networkConfigVersion > _networkConfigVersion():
logging.error("Cannot upgrade from a newer version to an older one!")
return False
try:
if networkConfigVersion == 0:
newNetworkConfig = {}
newNetworkConfig["serverHostname"] = rawNetworkConfig["serverHostname"] + "." + rawNetworkConfig["domain"]
newNetworkConfig["domain"] = rawNetworkConfig["domain"]
newNetworkConfig["realm"] = rawNetworkConfig["domain"].upper()
return writeNetworkConfig(newNetworkConfig)
except Exception as e:
logging.error("Error when upgrading network config!")
logging.exception(e)
return False
return True

View file

@ -1,46 +0,0 @@
#!/usr/bin/python3
templateUser = "linuxadmin"
userTemplateDir = "/home/" + templateUser
defaultDomainAdminUser = "global-admin"
# {} will be substituted for the username
shareMountBasepath = "/home/{}/media"
hiddenShareMountBasepath = "/srv/samba/{}"
machineAccountSysvolMountPath = "/var/lib/samba/sysvol"
etcBaseDir = "/etc/linuxmuster-linuxclient7"
shareBaseDir = "/usr/share/linuxmuster-linuxclient7"
configFileTemplateDir = shareBaseDir + "/templates"
scriptDir = shareBaseDir + "/scripts"
networkConfigFilePath = etcBaseDir + "/network.conf"
# {} will be substituted for the username
tmpEnvironmentFilePath = "/home/{}/.linuxmuster-linuxclient7-environment.sh"
notTemplatableFiles = ["/etc/sssd/sssd.conf", "/etc/linuxmuster-linuxclient7/network.conf"]
# cleanup
obsoleteFiles = [
"/etc/profile.d/99-linuxmuster.sh",
"/etc/sudoers.d/linuxmuster",
"/etc/profile.d/linuxmuster-proxy.sh",
"/etc/bash_completion.d/99-linuxmuster-client-adsso.sh",
"/etc/profile.d/99-linuxmuster-client-adsso.sh",
"/etc/sudoers.d/linuxmuster-client-adsso",
"/usr/sbin/linuxmuster-client-adsso",
"/usr/sbin/linuxmuster-client-adsso-print-logs",
"/etc/systemd/system/linuxmuster-client-adsso.service",
"{}/.config/autostart/linuxmuster-client-adsso-autostart.desktop".format(userTemplateDir),
"/etc/cups/client.conf",
"/usr/share/linuxmuster-linuxclient7/templates/linuxmuster-client-adsso.service",
"/usr/share/linuxmuster-linuxclient7/templates/linuxmuster-client-adsso-autostart.desktop",
"/etc/security/pam_mount.conf.xml",
"{}/pam_mount.conf.xml".format(configFileTemplateDir)
]
obsoleteDirectories = [
"/etc/linuxmuster-client",
"/etc/linuxmuster-client-adsso",
"/usr/share/linuxmuster-client-adsso"
]

View file

@ -1,60 +0,0 @@
import os
from linuxmusterLinuxclient7 import constants, user, logging
def export(keyValuePair):
"""
Export an environment variable
:param keyValuePair: Key value pair in format `key=value`
:type keyValuePait: str
:return: True or False
:rtype: bool
"""
logging.debug("Saving export '{}' to tmp file".format(keyValuePair))
envList = keyValuePair.split("=")
if len(envList) == 2:
os.putenv(envList[0], envList[1])
return _appendToTmpEnvFile("export", keyValuePair)
def unset(key):
"""
Unset a previously exported environment variable
:param key: The key to unset
:type key: str
:return: True or False
:rtype: bool
"""
logging.debug("Saving unset '{}' to tmp file".format(key))
return _appendToTmpEnvFile("unset", key)
# --------------------
# - Helper functions -
# --------------------
def _isApplicable():
if not user.isInAD():
logging.error("Modifying environment variables of non-AD users is not supported by lmn-export and lmn-unset!")
return False
elif "LinuxmusterLinuxclient7EnvFixActive" not in os.environ or os.environ["LinuxmusterLinuxclient7EnvFixActive"] != "1":
logging.error("lmn-export and lmn-unset may only be used inside of linuxmuster-linuxclient7 hooks!")
return False
else:
return True
def _appendToTmpEnvFile(mode, keyValuePair):
if not _isApplicable():
return False
tmpEnvironmentFilePath = constants.tmpEnvironmentFilePath.format(user.username())
fileOpenMode = "a" if os.path.exists(tmpEnvironmentFilePath) else "w"
try:
with open(tmpEnvironmentFilePath, fileOpenMode) as tmpEnvironmentFile:
tmpEnvironmentFile.write("\n{0} '{1}'".format(mode, keyValuePair))
return True
except Exception as e:
logging.exception(e)
return False

View file

@ -1,133 +0,0 @@
import os, shutil
from linuxmusterLinuxclient7 import logging
def removeLinesInFileContainingString(filePath, forbiddenStrings):
"""
Remove all lines containing a given string form a file.
:param filePath: The path to the file
:type filePath: str
:param forbiddenStrings: The string to search for
:type forbiddenStrings: str
:return: True on success, False otherwise
:rtype: bool
"""
if not isinstance(forbiddenStrings, list):
forbiddenStrings = [forbiddenStrings]
try:
with open(filePath, "r") as originalFile:
originalContents = originalFile.read()
except Exception as e:
logging.exception(e)
logging.warning("Could not read contents of original file")
return False
newContents = ""
for line in originalContents.split("\n"):
lineIsClean = True
for forbiddenString in forbiddenStrings:
lineIsClean = lineIsClean and not forbiddenString in line
if lineIsClean :
newContents += line + "\n"
try:
with open(filePath, "w") as originalFile:
originalFile.write(newContents)
except Exception as e:
logging.exception(e)
logging.warning("Could not write new contents to original file")
return False
return True
def deleteFile(filePath):
"""
Delete a file
:param filePath: The path of the file
:type filePath: str
:return: True on success, False otherwise
:rtype: bool
"""
try:
if os.path.exists(filePath):
os.unlink(filePath)
return True
except Exception as e:
logging.error("Failed!")
logging.exception(e)
return False
def deleteFilesWithExtension(directory, extension):
"""
Delete all files with a given extension in a given directory.
:param directory: The path of the directory
:type directory: str
:param extension: The file extension
:type extension: str
:return: True on success, False otherwise
:rtype: bool
"""
if directory.endswith("/"):
directory = directory[:-1]
if not os.path.exists(directory):
return True
existingFiles=os.listdir(directory)
for file in existingFiles:
if file.endswith(extension):
logging.info("* Deleting {}".format(file))
if not deleteFile("{}/{}".format(directory, file)):
logging.error("Failed!")
return False
return True
def deleteDirectory(directory):
"""
Recoursively delete a directory.
:param directory: The path of the directory
:type directory: bool
:return: True on success, False otherwise
:rtype: bool
"""
try:
shutil.rmtree(directory)
except:
return False
return True
def deleteAllInDirectory(directory):
"""
Delete all files in a given directory
:param directory: The path of the directory
:type directory: str
:return: True on success, False otherwise
:rtype: bool
"""
if directory.endswith("/"):
directory = directory[:-1]
if not os.path.exists(directory):
return True
existingFiles=os.listdir(directory)
for file in existingFiles:
fullFilePath = "{}/{}".format(directory, file)
if os.path.isdir(fullFilePath):
rc = deleteDirectory(fullFilePath)
else:
rc = deleteFile(fullFilePath)
if not rc:
logging.error("Failed!")
return False
return True

View file

@ -1,291 +0,0 @@
# Order of parsing: (overwriting each other)
# 1. Local (does not apply)
# 2. Site (does not apply)
# 3. Domain
# 4. OUs from top to bottom
import ldap, ldap.sasl, re, os.path
import xml.etree.ElementTree as ElementTree
from linuxmusterLinuxclient7 import logging, constants, config, user, ldapHelper, shares, computer, printers
def processAllPolicies():
"""
Process all applicable policies (equivalent to gpupdate on windows)
:return: True on success, False otherwise
:rtype: bool
"""
rc, policyDnList = _findApplicablePolicies()
if not rc:
logging.fatal("* Error when loading applicable GPOs! Shares and printers will not work.")
return False
for policyDn in policyDnList:
_parsePolicy(policyDn)
# --------------------
# - Helper functions -
# --------------------
def _parseGplinkSring(string):
# a gPLink strink looks like this:
# [LDAP://<link>;<status>][LDAP://<link>;<status>][...]
# The ragex matches <link> and <status> in two separate groups
# Note: "LDAP://" is matched as .*:// to prevent issues when the capitalization changes
pattern = re.compile("\\[.*:\\/\\/([^\\]]+)\\;([0-9]+)\\]")
return pattern.findall(string)
def _extractOUsFromDN(dn):
# NOT finished!
pattern = re.compile("OU=([^,]+),")
ouList = pattern.findall(dn)
# We need to parse from top to bottom
ouList.reverse()
return ouList
def _findApplicablePolicies():
policyDnList = []
""" Do this later!
# 1. Domain
rc, domainAdObject = ldapHelper.searchOne("(distinguishedName={})".format(ldapHelper.baseDn()))
if not rc:
return False, None
policyDNs.extend(_parseGplinkSring(domainAdObject["gPLink"]))
# 2. OU policies from top to bottom
rc, userAdObject = ldapHelper.searchOne("(sAMAccountName={})".format(user.username()))
if not rc:
return False, None
print(userAdObject["distinguishedName"])
"""
# For now, just parse policy sophomorix:school:<school name>
rc, schoolName = user.school()
if not rc:
return False, None
policyName = "sophomorix:school:{}".format(schoolName)
# find policy
rc, policyAdObject = ldapHelper.searchOne("(displayName={})".format(policyName))
if not rc:
return False, None
policyDnList.append((policyAdObject["distinguishedName"], 0))
return True, policyDnList
def _parsePolicy(policyDn):
logging.info("=== Parsing policy [{0};{1}] ===".format(policyDn[0], policyDn[1]))
# Check if the policy is disabled
if policyDn[1] == 1:
logging.info("===> Policy is disabled! ===")
return True
# Find policy in AD
rc, policyAdObject = ldapHelper.searchOne("(distinguishedName={})".format(policyDn[0]))
if not rc:
logging.error("===> Could not find poilcy in AD! ===")
return False, None
# mount the share the policy is on (probaply already mounted, just to be sure)
rc, localPolicyPath = shares.getMountpointOfRemotePath(policyAdObject["gPCFileSysPath"], hiddenShare = True, autoMount = True)
if not rc:
logging.error("===> Could not mount path of poilcy! ===")
return False, None
try:
# parse drives
# Skip Drive Policys (fvs change)
#_processDrivesPolicy(localPolicyPath)
# parse printers
_processPrintersPolicy(localPolicyPath)
except Exception as e:
logging.error("An error occured when parsing policy!")
logging.exception(e)
logging.info("===> Parsed policy [{0};{1}] ===".format(policyDn[0], policyDn[1]))
def _parseXmlFilters(filtersXmlNode):
if not filtersXmlNode.tag == "Filters":
logging.warning("Tried to parse a non-filter node as a filter!")
return []
filters = []
for xmlFilter in filtersXmlNode:
if xmlFilter.tag == "FilterGroup":
filters.append({
"name": xmlFilter.attrib["name"].split("\\")[1],
"bool": xmlFilter.attrib["bool"],
"userContext": xmlFilter.attrib["userContext"],
# userContext defines if the filter applies in user or computer context
"type": xmlFilter.tag
})
return filters
def _processFilters(policies):
filteredPolicies = []
for policy in policies:
if not len(policy["filters"]) > 0:
filteredPolicies.append(policy)
else:
filtersPassed = True
for filter in policy["filters"]:
logging.debug("Testing filter: {}".format(filter))
# TODO: check for AND and OR
if filter["bool"] == "AND":
filtersPassed = filtersPassed and _processFilter(filter)
elif filter["bool"] == "OR":
filtersPassed = filtersPassed or _processFilter(filter)
else:
logging.warning("Unknown boolean operation: {}! Cannot process filter!".format(filter["bool"]))
if filtersPassed:
filteredPolicies.append(policy)
return filteredPolicies
def _processFilter(filter):
if filter["type"] == "FilterGroup":
if filter["userContext"] == "1":
return user.isInGroup(filter["name"])
elif filter["userContext"] == "0":
return computer.isInGroup(filter["name"])
return False
def _parseXmlPolicy(policyFile):
if not os.path.isfile(policyFile):
logging.warning("==> XML policy file not found! ==")
return False, None
try:
tree = ElementTree.parse(policyFile)
return True, tree
except Exception as e:
logging.exception(e)
logging.error("==> Error while reading XML policy file! ==")
return False, None
def _processDrivesPolicy(policyBasepath):
logging.info("== Parsing a drive policy! ==")
policyFile = "{}/User/Preferences/Drives/Drives.xml".format(policyBasepath)
shareList = []
rc, tree = _parseXmlPolicy(policyFile)
if not rc:
logging.error("==> Error while reading Drives policy file, skipping! ==")
return
xmlDrives = tree.getroot()
if not xmlDrives.tag == "Drives":
logging.warning("==> Drive policy xml File is of invalid format, skipping! ==")
return
for xmlDrive in xmlDrives:
if xmlDrive.tag != "Drive" or ("disabled" in xmlDrive.attrib and xmlDrive.attrib["disabled"] == "1"):
continue
drive = {}
drive["filters"] = []
for xmlDriveProperty in xmlDrive:
if xmlDriveProperty.tag == "Properties":
try:
drive["label"] = xmlDriveProperty.attrib["label"]
drive["letter"] = xmlDriveProperty.attrib["letter"]
drive["path"] = xmlDriveProperty.attrib["path"]
drive["useLetter"] = xmlDriveProperty.attrib["useLetter"]
except Exception as e:
logging.warning("Exception when parsing a drive policy XML file")
logging.exception(e)
continue
if xmlDriveProperty.tag == "Filters":
drive["filters"] = _parseXmlFilters(xmlDriveProperty)
shareList.append(drive)
shareList = _processFilters(shareList)
logging.info("Found shares:")
for drive in shareList:
logging.info("* {:10}| {:5}| {:40}| {:5}".format(drive["label"], drive["letter"], drive["path"], drive["useLetter"]))
for drive in shareList:
if drive["useLetter"] == "1":
shareName = f"{drive['label']} ({drive['letter']}:)"
else:
shareName = drive["label"]
shares.mountShare(drive["path"], shareName=shareName)
logging.info("==> Successfully parsed a drive policy! ==")
def _processPrintersPolicy(policyBasepath):
logging.info("== Parsing a printer policy! ==")
policyFile = "{}/User/Preferences/Printers/Printers.xml".format(policyBasepath)
printerList = []
# test
rc, tree = _parseXmlPolicy(policyFile)
if not rc:
logging.error("==> Error while reading Printer policy file, skipping! ==")
return
xmlPrinters = tree.getroot()
if not xmlPrinters.tag == "Printers":
logging.warning("==> Printer policy xml File is of invalid format, skipping! ==")
return
for xmlPrinter in xmlPrinters:
if xmlPrinter.tag != "SharedPrinter" or ("disabled" in xmlPrinter.attrib and xmlPrinter.attrib["disabled"] == "1"):
continue
printer = {}
printer["filters"] = []
try:
printer["name"] = xmlPrinter.attrib["name"]
except Exception as e:
logging.warning("Exception when reading a printer name from a printer policy XML file")
logging.exception(e)
for xmlPrinterProperty in xmlPrinter:
if xmlPrinterProperty.tag == "Properties":
try:
rc, printerUrl = printers.translateSambaToIpp(xmlPrinterProperty.attrib["path"])
if rc:
printer["path"] = printerUrl
except Exception as e:
logging.warning("Exception when parsing a printer policy XML file")
logging.exception(e)
continue
if xmlPrinterProperty.tag == "Filters":
printer["filters"] = _parseXmlFilters(xmlPrinterProperty)
printerList.append(printer)
printerList = _processFilters(printerList)
logging.info("Found printers:")
for printer in printerList:
logging.info("* {0}\t\t| {1}\t| {2}".format(printer["name"], printer["path"], printer["filters"]))
printers.installPrinter(printer["path"], printer["name"])
logging.info("==> Successfully parsed a printer policy! ==")

View file

@ -1,290 +0,0 @@
# Order of parsing: (overwriting each other)
# 1. Local (does not apply)
# 2. Site (does not apply)
# 3. Domain
# 4. OUs from top to bottom
import ldap, ldap.sasl, re, os.path
import xml.etree.ElementTree as ElementTree
from linuxmusterLinuxclient7 import logging, constants, config, user, ldapHelper, shares, computer, printers
def processAllPolicies():
"""
Process all applicable policies (equivalent to gpupdate on windows)
:return: True on success, False otherwise
:rtype: bool
"""
rc, policyDnList = _findApplicablePolicies()
if not rc:
logging.fatal("* Error when loading applicable GPOs! Shares and printers will not work.")
return False
for policyDn in policyDnList:
_parsePolicy(policyDn)
# --------------------
# - Helper functions -
# --------------------
def _parseGplinkSring(string):
# a gPLink strink looks like this:
# [LDAP://<link>;<status>][LDAP://<link>;<status>][...]
# The ragex matches <link> and <status> in two separate groups
# Note: "LDAP://" is matched as .*:// to prevent issues when the capitalization changes
pattern = re.compile("\\[.*:\\/\\/([^\\]]+)\\;([0-9]+)\\]")
return pattern.findall(string)
def _extractOUsFromDN(dn):
# NOT finished!
pattern = re.compile("OU=([^,]+),")
ouList = pattern.findall(dn)
# We need to parse from top to bottom
ouList.reverse()
return ouList
def _findApplicablePolicies():
policyDnList = []
""" Do this later!
# 1. Domain
rc, domainAdObject = ldapHelper.searchOne("(distinguishedName={})".format(ldapHelper.baseDn()))
if not rc:
return False, None
policyDNs.extend(_parseGplinkSring(domainAdObject["gPLink"]))
# 2. OU policies from top to bottom
rc, userAdObject = ldapHelper.searchOne("(sAMAccountName={})".format(user.username()))
if not rc:
return False, None
print(userAdObject["distinguishedName"])
"""
# For now, just parse policy sophomorix:school:<school name>
rc, schoolName = user.school()
if not rc:
return False, None
policyName = "sophomorix:school:{}".format(schoolName)
# find policy
rc, policyAdObject = ldapHelper.searchOne("(displayName={})".format(policyName))
if not rc:
return False, None
policyDnList.append((policyAdObject["distinguishedName"], 0))
return True, policyDnList
def _parsePolicy(policyDn):
logging.info("=== Parsing policy [{0};{1}] ===".format(policyDn[0], policyDn[1]))
# Check if the policy is disabled
if policyDn[1] == 1:
logging.info("===> Policy is disabled! ===")
return True
# Find policy in AD
rc, policyAdObject = ldapHelper.searchOne("(distinguishedName={})".format(policyDn[0]))
if not rc:
logging.error("===> Could not find poilcy in AD! ===")
return False, None
# mount the share the policy is on (probaply already mounted, just to be sure)
rc, localPolicyPath = shares.getMountpointOfRemotePath(policyAdObject["gPCFileSysPath"], hiddenShare = True, autoMount = True)
if not rc:
logging.error("===> Could not mount path of poilcy! ===")
return False, None
try:
# parse drives
_processDrivesPolicy(localPolicyPath)
# parse printers
_processPrintersPolicy(localPolicyPath)
except Exception as e:
logging.error("An error occured when parsing policy!")
logging.exception(e)
logging.info("===> Parsed policy [{0};{1}] ===".format(policyDn[0], policyDn[1]))
def _parseXmlFilters(filtersXmlNode):
if not filtersXmlNode.tag == "Filters":
logging.warning("Tried to parse a non-filter node as a filter!")
return []
filters = []
for xmlFilter in filtersXmlNode:
if xmlFilter.tag == "FilterGroup":
filters.append({
"name": xmlFilter.attrib["name"].split("\\")[1],
"bool": xmlFilter.attrib["bool"],
"userContext": xmlFilter.attrib["userContext"],
# userContext defines if the filter applies in user or computer context
"type": xmlFilter.tag
})
return filters
def _processFilters(policies):
filteredPolicies = []
for policy in policies:
if not len(policy["filters"]) > 0:
filteredPolicies.append(policy)
else:
filtersPassed = True
for filter in policy["filters"]:
logging.debug("Testing filter: {}".format(filter))
# TODO: check for AND and OR
if filter["bool"] == "AND":
filtersPassed = filtersPassed and _processFilter(filter)
elif filter["bool"] == "OR":
filtersPassed = filtersPassed or _processFilter(filter)
else:
logging.warning("Unknown boolean operation: {}! Cannot process filter!".format(filter["bool"]))
if filtersPassed:
filteredPolicies.append(policy)
return filteredPolicies
def _processFilter(filter):
if filter["type"] == "FilterGroup":
if filter["userContext"] == "1":
return user.isInGroup(filter["name"])
elif filter["userContext"] == "0":
return computer.isInGroup(filter["name"])
return False
def _parseXmlPolicy(policyFile):
if not os.path.isfile(policyFile):
logging.warning("==> XML policy file not found! ==")
return False, None
try:
tree = ElementTree.parse(policyFile)
return True, tree
except Exception as e:
logging.exception(e)
logging.error("==> Error while reading XML policy file! ==")
return False, None
def _processDrivesPolicy(policyBasepath):
logging.info("== Parsing a drive policy! ==")
policyFile = "{}/User/Preferences/Drives/Drives.xml".format(policyBasepath)
shareList = []
rc, tree = _parseXmlPolicy(policyFile)
if not rc:
logging.error("==> Error while reading Drives policy file, skipping! ==")
return
xmlDrives = tree.getroot()
if not xmlDrives.tag == "Drives":
logging.warning("==> Drive policy xml File is of invalid format, skipping! ==")
return
for xmlDrive in xmlDrives:
if xmlDrive.tag != "Drive" or ("disabled" in xmlDrive.attrib and xmlDrive.attrib["disabled"] == "1"):
continue
drive = {}
drive["filters"] = []
for xmlDriveProperty in xmlDrive:
if xmlDriveProperty.tag == "Properties":
try:
drive["label"] = xmlDriveProperty.attrib["label"]
drive["letter"] = xmlDriveProperty.attrib["letter"]
drive["path"] = xmlDriveProperty.attrib["path"]
drive["useLetter"] = xmlDriveProperty.attrib["useLetter"]
except Exception as e:
logging.warning("Exception when parsing a drive policy XML file")
logging.exception(e)
continue
if xmlDriveProperty.tag == "Filters":
drive["filters"] = _parseXmlFilters(xmlDriveProperty)
shareList.append(drive)
shareList = _processFilters(shareList)
logging.info("Found shares:")
for drive in shareList:
logging.info("* {:10}| {:5}| {:40}| {:5}".format(drive["label"], drive["letter"], drive["path"], drive["useLetter"]))
for drive in shareList:
if drive["useLetter"] == "1":
shareName = f"{drive['label']} ({drive['letter']}:)"
else:
shareName = drive["label"]
shares.mountShare(drive["path"], shareName=shareName)
logging.info("==> Successfully parsed a drive policy! ==")
def _processPrintersPolicy(policyBasepath):
logging.info("== Parsing a printer policy! ==")
policyFile = "{}/User/Preferences/Printers/Printers.xml".format(policyBasepath)
printerList = []
# test
rc, tree = _parseXmlPolicy(policyFile)
if not rc:
logging.error("==> Error while reading Printer policy file, skipping! ==")
return
xmlPrinters = tree.getroot()
if not xmlPrinters.tag == "Printers":
logging.warning("==> Printer policy xml File is of invalid format, skipping! ==")
return
for xmlPrinter in xmlPrinters:
if xmlPrinter.tag != "SharedPrinter" or ("disabled" in xmlPrinter.attrib and xmlPrinter.attrib["disabled"] == "1"):
continue
printer = {}
printer["filters"] = []
try:
printer["name"] = xmlPrinter.attrib["name"]
except Exception as e:
logging.warning("Exception when reading a printer name from a printer policy XML file")
logging.exception(e)
for xmlPrinterProperty in xmlPrinter:
if xmlPrinterProperty.tag == "Properties":
try:
rc, printerUrl = printers.translateSambaToIpp(xmlPrinterProperty.attrib["path"])
if rc:
printer["path"] = printerUrl
except Exception as e:
logging.warning("Exception when parsing a printer policy XML file")
logging.exception(e)
continue
if xmlPrinterProperty.tag == "Filters":
printer["filters"] = _parseXmlFilters(xmlPrinterProperty)
printerList.append(printer)
printerList = _processFilters(printerList)
logging.info("Found printers:")
for printer in printerList:
logging.info("* {0}\t\t| {1}\t| {2}".format(printer["name"], printer["path"], printer["filters"]))
printers.installPrinter(printer["path"], printer["name"])
logging.info("==> Successfully parsed a printer policy! ==")

View file

@ -1,219 +0,0 @@
#
# This is used to run hooks
#
from enum import Enum
import os, subprocess
from linuxmusterLinuxclient7 import logging, constants, user, config, computer, environment, setup, shares
class Type(Enum):
"""
Enum containing all hook types
"""
Boot = 0
"""The onBoot hook
"""
Shutdown = 1
"""The on Shutdown hook
"""
LoginAsRoot = 2
"""The onLoginAsRoot hook
"""
Login = 3
"""The onLogin hook
"""
SessionStarted = 4
"""The onSession started hook
"""
LogoutAsRoot = 5
LoginLogoutAsRoot = 6
remoteScriptNames = {
Type.Boot: "sysstart.sh",
Type.Login: "logon.sh",
Type.SessionStarted: "sessionstart.sh",
Type.Shutdown: "sysstop.sh"
}
_remoteScriptInUserContext = {
Type.Boot: False,
Type.Login: True,
Type.SessionStarted: True,
Type.Shutdown: False
}
def runLocalHook(hookType):
"""
Run all scripts in a local hookdir
:param hookType: The type of hook to run
:type hookType: hooks.Type
"""
logging.info("=== Running local hook on{0} ===".format(hookType.name))
hookDir = _getLocalHookDir(hookType)
if os.path.exists(hookDir):
_prepareEnvironment()
for fileName in sorted(os.listdir(hookDir)):
filePath = hookDir + "/" + fileName
_runHookScript(filePath)
logging.info("===> Finished running local hook on{0} ===".format(hookType.name))
def runRemoteHook(hookType):
"""
Run hookscript from sysvol
:param hookType: The type of hook to run
:type hookType: hooks.Type
"""
logging.info("=== Running remote hook on{0} ===".format(hookType.name))
rc, hookScripts = _getRemoteHookScripts(hookType)
if rc:
_prepareEnvironment()
_runHookScript(hookScripts[0])
_runHookScript(hookScripts[1])
logging.info("===> Finished running remote hook on{0} ===".format(hookType.name))
def runHook(hookType):
"""
Executes hooks.runLocalHook() and hooks.runRemoteHook()
:param hookType: The type of hook to run
:type hookType: hooks.Type
"""
runLocalHook(hookType)
runRemoteHook(hookType)
def getLocalHookScript(hookType):
"""Get the path of a local hookscript
:param hookType: The type of hook script to get the path for
:type hookType: hooks.Type
:return: The path
:rtype: str
"""
return "{0}/on{1}".format(constants.scriptDir,hookType.name)
def shouldHooksBeExecuted(overrideUsername=None):
"""Check if hooks should be executed
:param overrideUsername: Override the username to check, defaults to None
:type overrideUsername: str, optional
:return: True if hooks should be executed, fale otherwise
:rtype: bool
"""
# check if linuxmuster-linuxclient7 is setup
if not setup.isSetup():
logging.info("==== Linuxmuster-linuxclient7 is not setup, exiting ====")
return False
# check if the computer is joined
if not computer.isInAD():
logging.info("==== This Client is not joined to any domain, exiting ====")
return False
# Check if the user is an AD user
if overrideUsername == None:
overrideUsername = user.username()
if not user.isUserInAD(overrideUsername):
logging.info("==== {0} is not an AD user, exiting ====".format(user.username()))
return False
return True
# --------------------
# - Helper functions -
# --------------------
def _prepareEnvironment():
dictsAndPrefixes = {}
rc, networkConfig = config.network()
if rc:
dictsAndPrefixes["Network"] = networkConfig
rc, userConfig = user.readAttributes()
if rc:
dictsAndPrefixes["User"] = userConfig
rc, computerConfig = computer.readAttributes()
if rc:
dictsAndPrefixes["Computer"] = computerConfig
environment = _dictsToEnv(dictsAndPrefixes)
_writeEnvironment(environment)
def _getLocalHookDir(hookType):
return "{0}/on{1}.d".format(constants.etcBaseDir,hookType.name)
def _getRemoteHookScripts(hookType):
if not hookType in remoteScriptNames:
return False, None
rc, networkConfig = config.network()
if not rc:
logging.error("Could not execute server hooks because the network config could not be read")
return False, None
if _remoteScriptInUserContext[hookType]:
rc, attributes = user.readAttributes()
if not rc:
logging.error("Could not execute server hooks because the user config could not be read")
return False, None
else:
rc, attributes = computer.readAttributes()
if not rc:
logging.error("Could not execute server hooks because the computer config could not be read")
return False, None
try:
domain = networkConfig["domain"]
school = attributes["sophomorixSchoolname"]
scriptName = remoteScriptNames[hookType]
except:
logging.error("Could not execute server hooks because the computer/user config is missing attributes")
return False, None
rc, sysvolPath = shares.getLocalSysvolPath()
if not rc:
logging.error("Could not execute server hook {} because the sysvol could not be mounted!\n")
return False, None
hookScriptPathTemplate = "{0}/{1}/scripts/{2}/{3}/linux/{4}".format(sysvolPath, domain, school, "{}", scriptName)
return True, [hookScriptPathTemplate.format("lmn"), hookScriptPathTemplate.format("custom")]
# parameter must be a dict of {"prefix": dict}
def _dictsToEnv(dictsAndPrefixes):
environmentDict = {}
for prefix in dictsAndPrefixes:
for key in dictsAndPrefixes[prefix]:
if type(dictsAndPrefixes[prefix][key]) is list:
environmentDict[prefix + "_" + key] = "\n".join(dictsAndPrefixes[prefix][key])
else:
environmentDict[prefix + "_" + key] = dictsAndPrefixes[prefix][key]
return environmentDict
def _runHookScript(filePath):
if not os.path.isfile(filePath):
logging.warning("* File {0} should be executed as hook but does not exist!".format(filePath))
return
if not os.access(filePath, os.X_OK):
logging.warning("* File {0} is in hook dir but not executable!".format(filePath))
return
logging.info("== Executing script {0} ==".format(filePath))
result = subprocess.call([filePath])
logging.info("==> Script {0} finished with exit code {1} ==".format(filePath, result))
def _writeEnvironment(environment):
for key in environment:
os.putenv(key, environment[key])

View file

@ -1,220 +0,0 @@
import os, subprocess, shutil
from linuxmusterLinuxclient7 import logging, setup, realm, user, constants, printers, fileHelper
def prepareForImage(unattended=False):
"""Prepare the computer for creating an image
:param unattended: If set to True, all questions will be answered with yes, defaults to False
:type unattended: bool, optional
:return: True on success, False otherwise
:rtype: bool
"""
logging.info("#### Image preparation ####")
try:
if not _testDomainJoin(unattended):
return False
if not _upgradeSystem(unattended):
return False
if not _clearCaches(unattended):
return False
if not _clearUserHomes(unattended):
return False
if not _clearUserCache(unattended):
return False
if not _clearPrinters(unattended):
return False
if not _clearLogs(unattended):
return False
if not _emptyTrash(unattended):
return False
except KeyboardInterrupt:
print()
logging.info("Cancelled.")
return False
print()
logging.info("#### Image preparation done ####")
logging.info("#### You may create an Image now :) ####")
print()
return True
# --------------------
# - Helper functions -
# --------------------
def _askStep(step, printPlaceholder=True):
if printPlaceholder:
print()
response = input("Do you want to {}? (y/n): ".format(step))
result = response in ["y", "Y", "j", "J"]
if result:
print()
return result
def _testDomainJoin(unattended=False):
if not unattended and not _askStep("test if the domain join works"):
return True
return setup.status()
def _upgradeSystem(unattended=False):
if not unattended and not _askStep("update this computer now"):
return True
# Perform an update
logging.info("Updating this computer now...")
if subprocess.call(["apt", "update"]) != 0:
logging.error("apt update failed!")
return False
if subprocess.call(["apt", "dist-upgrade", "-y"]) != 0:
logging.error("apt dist-upgrade failed!")
return False
if subprocess.call(["apt", "autoremove", "-y"]) != 0:
logging.error("apt autoremove failed!")
return False
if subprocess.call(["apt", "clean", "-y"]) != 0:
logging.error("apt clean failed!")
return False
return True
def _clearCaches(unattended=False):
if not unattended and not _askStep("clear journalctl and apt caches now"):
return True
logging.info("Cleaning caches..")
logging.info("* apt")
fileHelper.deleteAllInDirectory("/var/lib/apt/lists/")
logging.info("* journalctl")
subprocess.call(["journalctl", "--flush", "--rotate"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
subprocess.call(["journalctl", "--vacuum-time=1s"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logging.info("Done.")
return True
def _checkLoggedInUsers():
result = subprocess.run("who -s | awk '{ print $1 }'", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get logged in users!")
return False, None
loggedInUsers = list(filter(None, result.stdout.split("\n")))
for loggedInUser in loggedInUsers:
if user.isUserInAD(loggedInUser):
logging.error("User {} is still logged in, please log out first! Aborting!".format(loggedInUser))
return False
return True
def _clearUserCache(unattended=False):
if not unattended and not _askStep("clear all cached users now"):
return True
if not _checkLoggedInUsers():
return False
realm.clearUserCache()
logging.info("Done.")
return realm.clearUserCache()
def _unmountAllCifsMounts():
logging.info("Unmounting all CIFS mounts!")
if subprocess.call(["umount", "-a", "-t", "cifs", "-l"]) != 0:
logging.info("Failed!")
return False
# double check (just to be sure)
result = subprocess.run("mount", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get mounts!")
return False
if ("cifs" in result.stdout) or ("CIFS" in result.stdout):
logging.error("There are still shares mounted!")
logging.info("Use \"mount | grep cifs\" to view them.")
return False
return True
def _clearUserHomes(unattended=False):
print("\nCAUTION! This will delete all userhomes of AD users!")
if not unattended and not _askStep("clear all user homes now", False):
return True
if not _checkLoggedInUsers():
return False
if not _unmountAllCifsMounts():
logging.info("Aborting deletion of user homes to prevent deleting data on the server.")
return False
userHomes = os.listdir("/home")
logging.info("Deleting all user homes now!")
for userHome in userHomes:
if not user.isUserInAD(userHome):
logging.info("* {} [SKIPPED]".format(userHome))
continue
logging.info("* {}".format(userHome))
try:
shutil.rmtree("/home/{}".format(userHome))
except Exception as e:
logging.error("* FAILED!")
logging.exception(e)
try:
shutil.rmtree(constants.hiddenShareMountBasepath.format(userHome))
except:
pass
logging.info("Done.")
return True
def _clearPrinters(unattended=False):
print("\nCAUTION! This will delete all printers of {}!".format(constants.templateUser))
print("This makes sure that local printers do not conflict with remote printers defined by GPOs.")
if not unattended and not _askStep("remove all local printers of {}".format(constants.templateUser), False):
return True
if not printers.uninstallAllPrintersOfUser(constants.templateUser):
return False
return True
def _clearLogs(unattended=False):
if not unattended and not _askStep("clear the syslog"):
return True
if not fileHelper.deleteFile("/var/log/syslog"):
return False
subprocess.call(["sudo", "service", "rsyslog", "restart"])
return True
def _emptyTrash(unattended=False):
if not unattended and not _askStep("clear the Trash of linuxadmin"):
return True
if not fileHelper.deleteAllInDirectory("/home/{}/.local/share/Trash".format(constants.templateUser)):
return False
return True

View file

@ -1,52 +0,0 @@
from krb5KeytabUtil import Krb5KeytabUtil
from linuxmusterLinuxclient7 import computer, config, logging
def patchKeytab():
"""
Patches the `/etc/krb5.keytab` file. It inserts the correct hostname of the current computer.
:return: True on success, False otherwise
:rtype: bool
"""
krb5KeytabFilePath = "/etc/krb5.keytab"
logging.info("Patching {}".format(krb5KeytabFilePath))
krb5KeytabUtil = Krb5KeytabUtil(krb5KeytabFilePath)
try:
krb5KeytabUtil.read()
except:
logging.error("Error reading {}".format(krb5KeytabFilePath))
return False
for entry in krb5KeytabUtil.keytab.entries:
oldData = entry.principal.components[-1].data
if len(entry.principal.components) == 1:
newData = computer.hostname().upper() + "$"
entry.principal.components[0].data = newData
elif len(entry.principal.components) == 2 and (entry.principal.components[0].data == "host" or entry.principal.components[0].data == "RestrictedKrbHost"):
rc, networkConfig = config.network()
if not rc:
continue
newData = ""
domain = networkConfig["domain"]
if domain in entry.principal.components[1].data:
newData = computer.hostname().lower() + "." + domain
else:
newData = computer.hostname().upper()
entry.principal.components[1].data = newData
logging.debug("{} was changed to {}".format(oldData, entry.principal.components[-1].data))
logging.info("Trying to overwrite {}".format(krb5KeytabFilePath))
try:
result = krb5KeytabUtil.write()
except:
result = False
if not result:
logging.error("Error overwriting {}".format(krb5KeytabFilePath))
return result

View file

@ -1,148 +0,0 @@
import ldap, ldap.sasl, sys, getpass, subprocess
from linuxmusterLinuxclient7 import logging, constants, config, user, computer
_currentLdapConnection = None
def serverUrl():
"""
Returns the server URL
:return: The server URL
:rtype: str
"""
rc, networkConfig = config.network()
if not rc:
return False, None
serverHostname = networkConfig["serverHostname"]
return 'ldap://{0}'.format(serverHostname)
def baseDn():
"""
Returns the base DN
:return: The baseDN
:rtype: str
"""
rc, networkConfig = config.network()
if not rc:
return None
domain = networkConfig["domain"]
return "dc=" + domain.replace(".", ",dc=")
def conn():
"""
Returns the ldap connection object
:return: The ldap connection object
:rtype: ldap.ldapobject.LDAPObject
"""
global _currentLdapConnection
if _connect():
return _currentLdapConnection
return None
def searchOne(filter):
"""Searches the LDAP with a filter and returns the first found object
:param filter: A valid ldap filter
:type filter: str
:return: Tuple (success, ldap object as dict)
:rtype: tuple
"""
if conn() == None:
logging.error("Cannot talk to LDAP")
return False, None
try:
rawResult = conn().search_s(
baseDn(),
ldap.SCOPE_SUBTREE,
filter
)
except Exception as e:
logging.error("Error executing LDAP search!")
logging.exception(e)
return False, None
try:
result = {}
if len(rawResult) <= 0 or rawResult[0][0] == None:
logging.debug(f"Search \"{filter}\" did not return any objects")
return False, None
for k in rawResult[0][1]:
if rawResult[0][1][k] != None:
rawAttribute = rawResult[0][1][k]
try:
if len(rawAttribute) == 1:
result[k] = str(rawAttribute[0].decode())
elif len(rawAttribute) > 0:
result[k] = []
for rawItem in rawAttribute:
result[k].append(str(rawItem.decode()))
except UnicodeDecodeError:
continue
return True, result
except Exception as e:
logging.error("Error while reading ldap search results!")
logging.exception(e)
return False, None
def isObjectInGroup(objectDn, groupName):
"""
Check if a given object is in a given group
:param objectDn: The DN of the object
:type objectDn: str
:param groupName: The name of the group
:type groupName: str
:return: True if it is a member, False otherwise
:rtype: bool
"""
logging.debug("= Testing if object {0} is a member of group {1} =".format(objectDn, groupName))
rc, groupAdObject = searchOne("(&(member:1.2.840.113556.1.4.1941:={0})(sAMAccountName={1}))".format(objectDn, groupName))
logging.debug("=> Result: {} =".format(rc))
return rc
# --------------------
# - Helper functions -
# --------------------
def _connect():
global _currentLdapConnection
if not user.isInAD() and not (user.isRoot() or not computer.isInAD()):
logging.warning("Cannot perform LDAP search: User is not in AD!")
_currentLdapConnection = None
return False
if not _currentLdapConnection == None:
return True
try:
sasl_auth = ldap.sasl.sasl({} ,'GSSAPI')
_currentLdapConnection = ldap.initialize(serverUrl(), trace_level=0)
# TODO:
# conn.set_option(ldap.OPT_X_TLS_CACERTFILE, '/path/to/ca.pem')
# conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
# conn.start_tls_s()
_currentLdapConnection.set_option(ldap.OPT_REFERRALS,0)
_currentLdapConnection.protocol_version = ldap.VERSION3
_currentLdapConnection.sasl_interactive_bind_s("", sasl_auth)
except Exception as e:
_currentLdapConnection = None
logging.error("Cloud not bind to ldap!")
logging.exception(e)
return False
return True

View file

@ -1,19 +0,0 @@
import subprocess
from linuxmusterLinuxclient7 import logging
def getGroupsOfLocalUser(username):
"""
Get all groups of a local user
:param username: The username of the user
:type username: str
:return: Tuple (success, list of groups)
:rtype: tuple
"""
try:
groups = subprocess.check_output(["id", "-Gnz", username])
stringList=[x.decode('utf-8') for x in groups.split(b"\x00")]
return True, stringList
except Exception as e:
logging.warning("Exception when querying groups of user {}, it probaply does not exist".format(username))
return False, None

View file

@ -1,130 +0,0 @@
import logging, os, traceback, re, sys, subprocess
from enum import Enum
from linuxmusterLinuxclient7 import user, config
class Level(Enum):
DEBUG = 0
INFO = 1
WARNING = 2
ERROR = 3
FATAL = 4
def debug(message):
"""
Do a debug log.
:param message: The message to log
:type message: str
"""
_log(Level.DEBUG, message)
def info(message):
"""
Do an info log.
:param message: The message to log
:type message: str
"""
_log(Level.INFO, message)
def warning(message):
"""
Do a warning log.
:param message: The message to log
:type message: str
"""
_log(Level.WARNING, message)
def error(message):
"""
Do an error log.
:param message: The message to log
:type message: str
"""
_log(Level.ERROR, message)
def fatal(message):
"""
Do a fatal log. If used in onLogin hook, this will create a dialog containing the message.
:param message: The message to log
:type message: str
"""
_log(Level.FATAL, message)
def exception(exception):
"""
Log an exception
:param exception: The exception to log
:type exception: Exception
"""
error("=== An exception occurred ===")
error(str(exception))
# Only use for debugging! This will cause ugly error dialogs in X11
#traceback.print_tb(exception.__traceback__)
error("=== end exception ===")
def printLogs(compact=False,anonymize=False):
"""
Print logs of linuxmuster-linuxclient7 from `/var/log/syslog`.
:param compact: If set to True, some stuff like time and date will be removed. Defaults to False
:type compact: bool, optional
:param anonymize: If set to True, domain/realm/serverHostname will be replaced by linuxmuster.lan. Defaults to False
:type anonymize: bool, optional
"""
print("===========================================")
print("=== Linuxmuster-linuxclient7 logs begin ===")
(rc, networkConfig) = config.network()
if rc:
domain = networkConfig["domain"]
serverHostname = networkConfig["serverHostname"]
realm= networkConfig["realm"]
with open("/var/log/syslog") as logfile:
startPattern = re.compile("^.*linuxmuster-linuxclient7[^>]+======$")
endPattern = re.compile("^.*linuxmuster-linuxclient7.*======>.*$")
currentlyInsideOfLinuxmusterLinuxclient7Log = False
for line in logfile:
line = line.replace("\n", "")
if startPattern.fullmatch(line):
currentlyInsideOfLinuxmusterLinuxclient7Log = True
print("\n")
if currentlyInsideOfLinuxmusterLinuxclient7Log:
if compact:
# "^([^ ]+[ ]+){4}" matches "Apr 6 14:39:23 somehostname"
line = re.sub("^([^ ]+[ ]+){4}", "", line)
if anonymize and rc:
line = re.sub(serverHostname, "server.linuxmuster.lan", line)
line = re.sub(domain, "linuxmuster.lan", line)
line = re.sub(realm, "LINUXMUSTER.LAN", line)
print(line)
if endPattern.fullmatch(line):
currentlyInsideOfLinuxmusterLinuxclient7Log = False
print("\n")
print("=== Linuxmuster-linuxclient7 logs end ===")
print("=========================================")
# --------------------
# - Helper functions -
# --------------------
def _log(level, message):
#if level == Level.DEBUG:
# return
if level == Level.FATAL:
sys.stderr.write(message)
print("[{0}] {1}".format(level.name, message))
message = message.replace("'", "")
subprocess.call(["logger", "-t", "linuxmuster-linuxclient7", f"[{level.name}] {message}"])

View file

@ -1,129 +0,0 @@
import os, subprocess, re
from linuxmusterLinuxclient7 import logging, user
def installPrinter(networkPath, name=None, username=None):
"""
Installs a networked printer for a user
:param networkPath: The network path of the printer
:type networkPath: str
:param name: The name for the printer, defaults to None
:type name: str, optional
:param username: The username of the user whom the is installed printer for. Defaults to the executing user
:type username: str, optional
:return: True on success, False otherwise
:rtype: bool
"""
if username == None:
username = user.username()
if user.isRoot():
return _installPrinter(username, networkPath, name)
else:
# This will call installPrinter() again with root privileges
return _installPrinterWithoutRoot(networkPath, name)
pass
def uninstallAllPrintersOfUser(username):
"""
Uninstalls all printers of a given user
:param username: The username of the user
:type username: str
:return: True on success, False otherwise
:rtype: bool
"""
logging.info("Uninstalling all printers of {}".format(username))
rc, installedPrinters = _getInstalledPrintersOfUser(username)
if not rc:
logging.error("Error getting printers!")
return False
for installedPrinter in installedPrinters:
if not _uninstallPrinter(installedPrinter):
return False
return True
def translateSambaToIpp(networkPath):
"""
Translates a samba url, like `\\server\PRINTER-01`, to an ipp url like `ipp://server/printers/PRINTER-01`.
:param networkPath: The samba url
:type networkPath: str
:return: An ipp url
:rtype: str
"""
networkPath = networkPath.replace("\\", "/")
# path has to be translated: \\server\EW-FARBLASER -> ipp://server/printers/EW-farblaser
pattern = re.compile("\\/\\/([^/]+)\\/(.*)")
result = pattern.findall(networkPath)
if len(result) != 1 or len(result[0]) != 2:
logging.error("Cannot convert printer network path from samba to ipp, as it is invalid: {}".format(networkPath))
return False, None
ippNetworkPath = "ipp://{0}/printers/{1}".format(result[0][0], result[0][1])
return True, ippNetworkPath
# --------------------
# - Helper functions -
# --------------------
def _installPrinter(username, networkPath, name):
logging.info("Install Printer {0} on {1}".format(name, networkPath))
installCommand = ["timeout", "10", "lpadmin", "-p", name, "-E", "-v", networkPath, "-m", "everywhere", "-u", f"allow:{username}"]
logging.debug("* running '{}'".format(" ".join(installCommand)))
p = subprocess.call(installCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p == 0:
logging.debug("* Success Install Printer!")
return True
elif p == 124:
logging.fatal(f"* Timeout error while installing printer {name} on {networkPath}")
else:
logging.fatal(f"* Error installing printer {name} on {networkPath}!")
return False
def _installPrinterWithoutRoot(networkPath, name):
return subprocess.call(["sudo", "/usr/share/linuxmuster-linuxclient7/scripts/sudoTools", "install-printer", "--path", networkPath, "--name", name]) == 0
def _getInstalledPrintersOfUser(username):
logging.info(f"Getting installed printers of {username}")
command = f"lpstat -U {username} -p"
#logging.debug("running '{}'".format(command))
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if not result.returncode == 0:
logging.info("No Printers installed.")
return True, []
rawInstalledPrinters = list(filter(None, result.stdout.split("\n")))
installedPrinters = []
for rawInstalledPrinter in rawInstalledPrinters:
rawInstalledPrinterList = list(filter(None, rawInstalledPrinter.split(" ")))
if len(rawInstalledPrinterList) < 2:
continue
installedPrinter = rawInstalledPrinterList[1]
installedPrinters.append(installedPrinter)
return True, installedPrinters
def _uninstallPrinter(name):
logging.info("Uninstall Printer {}".format(name))
uninstallCommand = ["timeout", "10", "lpadmin", "-x", name]
logging.debug("* running '{}'".format(" ".join(uninstallCommand)))
p = subprocess.call(uninstallCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p == 0:
logging.debug("* Success Uninstall Printer!")
return True
elif p == 124:
logging.fatal(f"* Timeout error while installing printer {name}")
else:
logging.fatal(f"* Error Uninstalling Printer {name}!")
return False

View file

@ -1,195 +0,0 @@
import os, sys, subprocess, configparser
from linuxmusterLinuxclient7 import logging, computer
def join(domain, user):
"""
Joins the computer to an AD
:param domain: The domain to join
:type domain: str
:param user: The admin user used for joining
:type user: str
:return: True on success, False otherwise
:rtype: bool
"""
# join the domain using the kerberos ticket
joinCommand = ["realm", "join", "-v", domain, "-U", user]
if subprocess.call(joinCommand) != 0:
print()
logging.error('Failed! Did you enter the correct password?')
return False
logging.info("It looks like the domain was joined successfully.")
return True
def leave(domain):
"""
Leave a domain
:param domain: The domain to leave
:type domain: str
:return: True on success, False otherwise
:rtype: bool
"""
leaveCommand = ["realm", "leave", domain]
return subprocess.call(leaveCommand) == 0
def leaveAll():
"""
Leaves all joined domains
:return: True on success, False otherwise
:rtype: bool
"""
logging.info("Cleaning / leaving all domain joins")
rc, joinedDomains = getJoinedDomains()
if not rc:
return False
for joinedDomain in joinedDomains:
logging.info("* {}".format(joinedDomain))
if not leave(joinedDomain):
logging.error("-> Failed! Aborting!")
return False
logging.info("-> Done!")
return True
def isJoined():
"""
Checks if the computer is joined to a domain
:return: True if it is joined to one or more domains, False otherwise
:rtype: bool
"""
rc, joinedDomains = getJoinedDomains()
if not rc:
return False
else:
return len(joinedDomains) > 0
def pullKerberosTicketForComputerAccount():
"""
Pulls a kerberos ticket using the computer account from `/etc/krb5.keytab`
:return: True on success, False otherwise
:rtype: bool
"""
return subprocess.call(["kinit", "-k", computer.krbHostName()]) == 0
def verifyDomainJoin():
"""
Checks if the domain join actually works.
:return: True if it does, False otherwise
:rtype: bool
"""
logging.info("Testing if the domain join actually works")
if not isJoined():
logging.error("No domain is joined!")
return False
logging.info("* Checking if the group \"domain users\" exists")
if subprocess.call(["getent", "group", "domain users"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) != 0:
logging.error("The \"domain users\" group does not exists! Users wont be able to log in!")
logging.error("This is sometimes related to /etc/nsswitch.conf.")
return False
# Try to get a kerberos ticket for the computer account
logging.info("* Trying to get a kerberos ticket for the Computer Account")
if not pullKerberosTicketForComputerAccount():
logging.error("Could not get a kerberos ticket for the Computer Account!")
logging.error("Logins of non-cached users WILL NOT WORK!")
logging.error("Please try to re-join the Domain.")
return False
logging.info("The domain join is working!")
return True
def getJoinedDomains():
"""
Returns all joined domains
:return: Tuple (success, list of joined domians)
:rtype: tuple
"""
result = subprocess.run("realm list --name-only", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to read domain joins!")
return False, None
return True, list(filter(None, result.stdout.split("\n")))
def discoverDomains():
"""
Searches for avialable domains on the current network
:return: Tuple (success, list of available domains)
:rtype: tuple
"""
result = subprocess.run("realm discover --name-only", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to discover available domains!")
return False, None
return True, list(filter(None, result.stdout.split("\n")))
def getDomainConfig(domain):
"""
Looks up all relevant properties of a domain:
- domain controller IP
- domain name
:param domain: The domain to check
:type domain: str
:return: Tuple (success, dict with domain config)
:rtype: tuple
"""
result = subprocess.run("adcli info '{}'".format(domain), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get details of domain {}!".format(domain))
return False, None
rawConfig = _readConfigFromString(result.stdout)
try:
rawDomainConfig = rawConfig["domain"]
except KeyError:
logging.error("Error when reading domain details")
return False, None
domainConfig = {}
try:
domainConfig["domain-controller"] = rawDomainConfig["domain-controller"]
domainConfig["domain-name"] = rawDomainConfig["domain-name"]
except KeyError:
logging.error("Error when reading domain details (2)")
return False, None
return True, domainConfig
def clearUserCache():
"""
Clears the local user cache
:return: True on success, False otherwise
:rtype: bool
"""
# clean sssd cache
logging.info("Cleaning sssd cache.")
subprocess.call(["sssctl", "cache-remove", "--stop", "--start", "--override"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
# --------------------
# - Helper functions -
# --------------------
def _readConfigFromString(string):
configParser = configparser.ConfigParser()
configParser.read_string(string)
return configParser

View file

@ -1,364 +0,0 @@
import os, re, sys, configparser, subprocess, shutil
from pathlib import Path
from linuxmusterLinuxclient7 import logging, constants, hooks, shares, config, user, templates, realm, fileHelper, printers, computer
def setup(domain=None, user=None):
"""
Sets up the client to be able to act in a linuxmuster environment
:param domain: The domain to join, defaults to the first discovered domain
:type domain: str, optional
:param user: The admin user for the join, defaults to global-admin
:type user: str, optional
:return: True on success, False otherwise
:rtype: bool
"""
logging.info('#### linuxmuster-linuxclient7 setup ####')
if not realm.clearUserCache():
return False
if not _cleanOldDomainJoins():
return False
rc, domain = _findDomain(domain)
if not rc:
return False
if user == None:
user = constants.defaultDomainAdminUser
if not _prepareNetworkConfiguration(domain):
return False
if not _deleteObsoleteFiles():
return False
if not templates.applyAll():
return False
if not _preparePam():
return False
if not _prepareServices():
return False
# Actually join domain!
print()
logging.info(f"#### Joining domain {domain} ####")
if not realm.join(domain, user):
return False
# copy server ca certificate in place
# This will also make sure that the domain join actually worked;
# mounting the sysvol will fail otherwise.
if not _installCaCertificate(domain, user):
return False
if not _adjustSssdConfiguration(domain):
return False
# run a final test
if not realm.verifyDomainJoin():
return False
print("\n\n")
logging.info(f"#### SUCCESSFULLY joined domain {domain} ####")
return True
def status():
"""
Checks the status of the client
:return: True on success, False otherwise
:rtype: bool
"""
logging.info('#### linuxmuster-linuxclient7 status ####')
if not isSetup():
logging.info("Not setup!")
return False
logging.info("Linuxmuster-linuxclient7 is setup!")
logging.info("Testing if domain is joined...")
logging.info("Checking joined domains")
rc, joinedDomains = realm.getJoinedDomains()
if not rc:
return False
print()
logging.info("Joined domains:")
for joinedDomain in joinedDomains:
logging.info(f"* {joinedDomain}")
print()
if len(joinedDomains) > 0 and not realm.verifyDomainJoin():
print()
# Give a little explination to our users :)
print("\n===============================================================================================")
print("This Computer is joined to a domain, but it was not possible to authenticate")
print("to the domain controller. There is an error with your domain join! The login WILL NOT WORK!")
print("Please try to re-join the domain using 'linuxmuster-linuxclient7 setup' and create a new image.")
print("===============================================================================================\n")
return False
elif len(joinedDomains) <= 0:
print()
logging.info('#### This client is not joined to any domain. ####')
print("#### To join a domain, run \"linuxmuster-linuxclient7 setup\" ####")
print()
logging.info('#### linuxmuster-linuxclient7 is fully setup and working! ####')
return True
def upgrade():
"""
Performs an upgrade of the linuxmuster-linuxclient7. This is executed after the package is updated.
:return: True on success, False otherwise
:rtype: bool
"""
if not isSetup():
logging.info("linuxmuster-linuxclient7 does not seem to be setup -> no upgrade is needed")
return True
logging.info('#### linuxmuster-linuxclient7 upgrade ####')
if not config.upgrade():
return False
if not _deleteObsoleteFiles():
return False
if not templates.applyAll():
return False
if not _prepareServices():
return False
rc, joinedDomains = realm.getJoinedDomains()
if not rc:
return False
for domain in joinedDomains:
_adjustSssdConfiguration(domain)
logging.info('#### linuxmuster-linuxclient7 upgrade SUCCESSFULL ####')
return True
def clean():
"""Removes all sensitive files like keys and leaves all domain joins.
"""
logging.info("#### linuxmuster-linuxclient7 clean ####")
realm.clearUserCache()
_cleanOldDomainJoins()
# clean /etc/pam.d/common-session
logging.info("Cleaning /etc/pam.d/common-session to prevent logon brick")
fileHelper.removeLinesInFileContainingString("/etc/pam.d/common-session", ["pam_mkhomedir.so", "pam_exec.so", "pam_mount.so", "linuxmuster.net", "linuxmuster-linuxclient7", "linuxmuster-client-adsso"])
logging.info('#### linuxmuster-linuxclient7 clean SUCCESSFULL ####')
def isSetup():
"""
Checks if the client is setup.
:return: True if setup, False otherwise
:rtype: bool
"""
return os.path.isfile(constants.networkConfigFilePath)
# --------------------
# - Helper functions -
# --------------------
def _cleanOldDomainJoins():
# stop sssd
logging.info("Stopping sssd")
if subprocess.call(["service", "sssd", "stop"]) != 0:
logging.error("Failed!")
return False
# Clean old domain join data
logging.info("Deleting old kerberos tickets.")
subprocess.call(["kdestroy"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if not realm.leaveAll():
return False
# delete krb5.keytab file, if existent
logging.info('Deleting krb5.keytab if it exists ... ')
if not fileHelper.deleteFile("/etc/krb5.keytab"):
return False
# delete old CA Certificate
logging.info('Deleting old CA certificate if it exists ... ')
if not fileHelper.deleteFilesWithExtension("/var/lib/samba/private/tls", ".pem"):
return False
# remove network.conf
logging.info(f"Deleting {constants.networkConfigFilePath} if exists ...")
if not fileHelper.deleteFile(constants.networkConfigFilePath):
return False
return True
def _findDomain(domain=None):
logging.info("Trying to discover available domains...")
rc, availableDomains = realm.discoverDomains()
if not rc or len(availableDomains) < 1:
logging.error("Could not discover any domain!")
return False, None
if domain == None:
domain = availableDomains[0]
logging.info(f"Using first discovered domain {domain}")
elif domain in availableDomains:
logging.info(f"Using domain {domain}")
else:
print("\n")
logging.error(f"Could not find domain {domain}!")
return False, None
return True, domain
def _prepareNetworkConfiguration(domain):
logging.info("Preparing network configuration")
rc, domainConfig = realm.getDomainConfig(domain)
if not rc:
logging.error("Could not read domain configuration")
return False
newNetworkConfig = {}
newNetworkConfig["serverHostname"] = domainConfig["domain-controller"]
newNetworkConfig["domain"] = domainConfig["domain-name"]
newNetworkConfig["realm"] = domainConfig["domain-name"].upper()
config.writeNetworkConfig(newNetworkConfig)
return True
def _preparePam():
# enable necessary pam modules
logging.info('Updating pam configuration ... ')
subprocess.call(['pam-auth-update', '--package', '--enable', 'libpam-mount', 'pwquality', 'sss', '--force'])
## mkhomedir was injected in template not using pam-auth-update
subprocess.call(['pam-auth-update', '--package', '--remove', 'krb5', 'mkhomedir', '--force'])
return True
def _prepareServices():
logging.info("Raloading systctl daemon")
subprocess.call(["systemctl", "daemon-reload"])
logging.info('Enabling services:')
services = ['linuxmuster-linuxclient7', 'smbd', 'nmbd', 'sssd']
for service in services:
logging.info('* %s' % service)
subprocess.call(['systemctl','enable', service + '.service'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logging.info('Restarting services:')
services = ['smbd', 'nmbd', 'systemd-timesyncd']
for service in services:
logging.info('* %s' % service)
subprocess.call(['systemctl', 'restart' , service + '.service'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
def _installCaCertificate(domain, user):
logging.info('Installing server ca certificate ... ')
# try to mount the share
rc, sysvolMountpoint = shares.getLocalSysvolPath()
if not rc:
logging.error("Failed to mount sysvol!")
return False
cacertPath = f"{sysvolMountpoint}/{domain}/tls/cacert.pem"
cacertTargetPath = f"/var/lib/samba/private/tls/{domain}.pem"
logging.info("Copying CA certificate from server to client!")
try:
Path(Path(cacertTargetPath).parent.absolute()).mkdir(parents=True, exist_ok=True)
shutil.copyfile(cacertPath, cacertTargetPath)
except Exception as e:
logging.error("Failed!")
logging.exception(e)
return False
# make sure the file was successfully copied
if not os.path.isfile(cacertTargetPath):
logging.error('Failed to copy over CA certificate!')
return False
# unmount sysvol
shares.unmountAllSharesOfUser(computer.krbHostName())
return True
def _adjustSssdConfiguration(domain):
logging.info("Adjusting sssd.conf")
sssdConfigFilePath = '/etc/sssd/sssd.conf'
sssdConfig = configparser.ConfigParser(interpolation=None)
sssdConfig.read(sssdConfigFilePath)
# accept usernames without domain
sssdConfig[f"domain/{domain}"]["use_fully_qualified_names"] = "False"
# override homedir
sssdConfig[f"domain/{domain}"]["override_homedir"] = "/home/%u"
# Don't validate KVNO! Otherwise the Login will fail when the KVNO stored
# in /etc/krb5.keytab does not match the one in the AD (msDS-KeyVersionNumber)
sssdConfig[f"domain/{domain}"]["krb5_validate"] = "False"
sssdConfig[f"domain/{domain}"]["ad_gpo_access_control"] = "permissive"
sssdConfig[f"domain/{domain}"]["ad_gpo_ignore_unreadable"] = "True"
# Don't renew the machine password, as this will break the domain join
# See: https://github.com/linuxmuster/linuxmuster-linuxclient7/issues/27
sssdConfig[f"domain/{domain}"]["ad_maximum_machine_account_password_age"] = "0"
# Make sure usernames are not case sensitive
sssdConfig[f"domain/{domain}"]["case_sensitive"] = "False"
try:
logging.info("Writing new Configuration")
with open(sssdConfigFilePath, 'w') as sssdConfigFile:
sssdConfig.write(sssdConfigFile)
except Exception as e:
logging.error("Failed!")
logging.exception(e)
return False
logging.info("Restarting sssd")
if subprocess.call(["service", "sssd", "restart"]) != 0:
logging.error("Failed!")
return False
return True
def _deleteObsoleteFiles():
# files
logging.info("Deleting obsolete files")
for obsoleteFile in constants.obsoleteFiles:
logging.info(f"* {obsoleteFile}")
fileHelper.deleteFile(obsoleteFile)
# directories
logging.info("Deleting obsolete directories")
for obsoleteDirectory in constants.obsoleteDirectories:
logging.info(f"* {obsoleteDirectory}")
fileHelper.deleteDirectory(obsoleteDirectory)
return True

View file

@ -1,256 +0,0 @@
import os, pwd, sys, shutil, re, subprocess, shutil
from linuxmusterLinuxclient7 import logging, constants, user, config, computer
from pathlib import Path
def mountShare(networkPath, shareName = None, hiddenShare = False, username = None):
"""
Mount a given path of a samba share
:param networkPath: Network path of the share
:type networkPath: str
:param shareName: The name of the share (name of the folder the share is being mounted to)
:type shareName: str
:param hiddenShare: If the share sould be visible in Nautilus
:type hiddenShare: bool
:param username: The user in whoms context the share should be mounted
:type username: str
:return: Tuple: (success, mountpoint)
:rtype: tuple
"""
networkPath = networkPath.replace("\\", "/")
username = _getDefaultUsername(username)
shareName = _getDefaultShareName(networkPath, shareName)
if user.isRoot():
return _mountShare(username, networkPath, shareName, hiddenShare, True)
else:
mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)
# This will call _mountShare() directly with root privileges
return _mountShareWithoutRoot(networkPath, shareName, hiddenShare), mountpoint
def getMountpointOfRemotePath(remoteFilePath, hiddenShare = False, username = None, autoMount = True):
"""
Get the local path of a remote samba share path.
This function automatically checks if the shares is already mounted.
It optionally automatically mounts the top path of the remote share:
If the remote path is `//server/sysvol/linuxmuster.lan/Policies` it mounts `//server/sysvol`
:param remoteFilePath: Remote path
:type remoteFilePath: str
:param hiddenShare: If the share sould be visible in Nautilus
:type hiddenShare: bool
:param username: The user in whoms context the share should be mounted
:type username: str
:parama autoMount: If the share should be mouted automatically if it is not already mounted
:type autoMount: bool
:return: Tuple: (success, mountpoint)
:rtype: tuple
"""
remoteFilePath = remoteFilePath.replace("\\", "/")
username = _getDefaultUsername(username)
# get basepath fo remote file path
# this turns //server/sysvol/linuxmuster.lan/Policies into //server/sysvol
pattern = re.compile("(^\\/\\/[^\\/]+\\/[^\\/]+)")
match = pattern.search(remoteFilePath)
if match is None:
logging.error("Cannot get local file path of {} beacuse it is not a valid path!".format(remoteFilePath))
return False, None
shareBasepath = match.group(0)
if autoMount:
rc, mointpoint = mountShare(shareBasepath, hiddenShare=hiddenShare, username=username)
if not rc:
return False, None
# calculate local path
shareMountpoint = _getShareMountpoint(shareBasepath, username, hiddenShare, shareName=None)
localFilePath = remoteFilePath.replace(shareBasepath, shareMountpoint)
return True, localFilePath
def unmountAllSharesOfUser(username):
"""
Unmount all shares of a given user and safely delete the mountpoints and the parent directory.
:param username: The username of the user
:type username: str
:return: True or False
:rtype: bool
"""
logging.info("=== Trying to unmount all shares of user {0} ===".format(username))
for basedir in [constants.shareMountBasepath, constants.hiddenShareMountBasepath]:
shareMountBasedir = basedir.format(username)
try:
mountedShares = os.listdir(shareMountBasedir)
except FileNotFoundError:
logging.info("Mount basedir {} does not exist -> nothing to unmount".format(shareMountBasedir))
continue
for share in mountedShares:
_unmountShare("{0}/{1}".format(shareMountBasedir, share))
if len(os.listdir(shareMountBasedir)) > 0:
logging.warning("* Mount basedir {} is not empty so not removed!".format(shareMountBasedir))
return False
else:
# Delete the directory
logging.info("Deleting {0}...".format(shareMountBasedir))
try:
os.rmdir(shareMountBasedir)
except Exception as e:
logging.error("FAILED!")
logging.exception(e)
return False
logging.info("===> Finished unmounting all shares of user {0} ===".format(username))
return True
def getLocalSysvolPath():
"""
Get the local mountpoint of the sysvol
:return: Full path of the mountpoint
:rtype: str
"""
rc, networkConfig = config.network()
if not rc:
return False, None
networkPath = f"//{networkConfig['serverHostname']}/sysvol"
return getMountpointOfRemotePath(networkPath, True)
# --------------------
# - Helper functions -
# --------------------
# useCruidOfExecutingUser:
# defines if the ticket cache of the user executing the mount command should be used.
# If set to False, the cache of the user with the given username will be used.
# This parameter influences the `cruid` mount option.
def _mountShare(username, networkPath, shareName, hiddenShare, useCruidOfExecutingUser=False):
mountpoint = _getShareMountpoint(networkPath, username, hiddenShare, shareName)
mountCommandOptions = f"file_mode=0700,dir_mode=0700,sec=krb5,nodev,nosuid,mfsymlinks,nobrl,vers=3.0,user={username}"
rc, networkConfig = config.network()
domain = None
if rc:
domain = networkConfig["domain"]
mountCommandOptions += f",domain={domain.upper()}"
try:
pwdInfo = pwd.getpwnam(username)
uid = pwdInfo.pw_uid
gid = pwdInfo.pw_gid
mountCommandOptions += f",gid={gid},uid={uid}"
if not useCruidOfExecutingUser:
mountCommandOptions += f",cruid={uid}"
except KeyError:
uid = -1
gid = -1
logging.warning("Uid could not be found! Continuing anyway!")
mountCommand = [shutil.which("mount.cifs"), "-o", mountCommandOptions, networkPath, mountpoint]
logging.debug(f"Trying to mount '{networkPath}' to '{mountpoint}'")
logging.debug("* Creating directory...")
try:
Path(mountpoint).mkdir(parents=True, exist_ok=False)
except FileExistsError:
# Test if a share is already mounted there
if _directoryIsMountpoint(mountpoint):
logging.debug("* The mountpoint is already mounted.")
return True, mountpoint
else:
logging.warning("* The target directory already exists, proceeding anyway!")
logging.debug("* Executing '{}' ".format(" ".join(mountCommand)))
logging.debug("* Trying to mount...")
if not subprocess.call(mountCommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0:
logging.fatal(f"* Error mounting share {networkPath} to {mountpoint}!\n")
return False, None
logging.debug("* Success!")
# hide the shares parent dir (/home/%user/media) in case it is not a hidden share
if not hiddenShare:
try:
hiddenFilePath = f"{mountpoint}/../../.hidden"
logging.debug(f"* hiding parent dir {hiddenFilePath}")
hiddenFile = open(hiddenFilePath, "w+")
hiddenFile.write(mountpoint.split("/")[-2])
hiddenFile.close()
except:
logging.warning(f"Could not hide parent dir of share {mountpoint}")
return True, mountpoint
def _unmountShare(mountpoint):
# check if mountpoint exists
if (not os.path.exists(mountpoint)) or (not os.path.isdir(mountpoint)):
logging.warning(f"* Could not unmount {mountpoint}, it does not exist.")
# Try to unmount share
logging.info("* Trying to unmount {0}...".format(mountpoint))
if not subprocess.call(["umount", mountpoint]) == 0:
logging.warning("* Failed!")
if _directoryIsMountpoint(mountpoint):
logging.warning("* It is still mounted! Exiting!")
# Do not delete in this case! We might delete userdata!
return
logging.info("* It is not mounted! Continuing!")
# check if the mountpoint is empty
if len(os.listdir(mountpoint)) > 0:
logging.warning("* mountpoint {} is not empty so not removed!".format(mountpoint))
return
# Delete the directory
logging.info("* Deleting {0}...".format(mountpoint))
try:
os.rmdir(mountpoint)
except Exception as e:
logging.error("* FAILED!")
logging.exception(e)
def _getDefaultUsername(username=None):
if username == None:
if user.isRoot():
username = computer.hostname().upper() + "$"
else:
username = user.username()
return username
def _getDefaultShareName(networkPath, shareName=None):
if shareName is None:
shareName = networkPath.split("/")[-1]
return shareName
def _mountShareWithoutRoot(networkPath, name, hidden):
mountCommand = ["sudo", "/usr/share/linuxmuster-linuxclient7/scripts/sudoTools", "mount-share", "--path", networkPath, "--name", name]
if hidden:
mountCommand.append("--hidden")
return subprocess.call(mountCommand) == 0
def _getShareMountpoint(networkPath, username, hidden, shareName = None):
logging.debug(f"Calculating mountpoint of {networkPath}")
shareName = _getDefaultShareName(networkPath, shareName)
if hidden:
return "{0}/{1}".format(constants.hiddenShareMountBasepath.format(username), shareName)
else:
return "{0}/{1}".format(constants.shareMountBasepath.format(username), shareName)
def _directoryIsMountpoint(dir):
return subprocess.call(["mountpoint", "-q", dir]) == 0

View file

@ -1,128 +0,0 @@
import os, codecs, sys, shutil, subprocess
from pathlib import Path
from linuxmusterLinuxclient7 import logging, constants, hooks, config
def applyAll():
"""
Applies all templates from `/usr/share/linuxmuster-linuxclient7/templates`
:return: True on success, False otherwise
:rtype: bool
"""
logging.info('Applying all configuration templates:')
templateDir = constants.configFileTemplateDir
for templateFile in os.listdir(templateDir):
templatePath = templateDir + '/' + templateFile
logging.info('* ' + templateFile + ' ...')
if not _apply(templatePath):
logging.error("Aborting!")
return False
# reload sctemctl
logging.info('Reloading systemctl ... ')
if not subprocess.call(["systemctl", "daemon-reload"]) == 0:
logging.error("Failed!")
return False
return True
# --------------------
# - Helper functions -
# --------------------
def _apply(templatePath):
try:
# read template file
rc, fileData = _readTextfile(templatePath)
if not rc:
logging.error('Failed!')
return False
fileData = _resolveVariables(fileData)
# get target path
firstLine = fileData.split('\n')[0]
targetFilePath = firstLine.partition(' ')[2]
# remove first line (the target file path)
fileData = fileData[fileData.find('\n'):]
# never ever overwrite sssd.conf, this will lead to issues!
# sssd.conf is written by `realm join`!
if targetFilePath in constants.notTemplatableFiles:
logging.warning("Skipping forbidden file {}".format(targetFilePath))
return True
# create target directory
Path(Path(targetFilePath).parent.absolute()).mkdir(parents=True, exist_ok=True)
# remove comment lines beginning with # from .xml files
if targetFilePath.endswith('.xml'):
fileData = _stripComment(fileData)
# write config file
logging.debug("-> to {}".format(targetFilePath))
with open(targetFilePath, 'w') as targetFile:
targetFile.write(fileData)
return True
except Exception as e:
logging.error('Failed!')
logging.exception(e)
return False
def _resolveVariables(fileData):
# replace placeholders with values
rc, networkConfig = config.network()
if not rc:
return False, None
# network
fileData = fileData.replace('@@serverHostname@@', networkConfig["serverHostname"])
fileData = fileData.replace('@@domain@@', networkConfig["domain"])
fileData = fileData.replace('@@realm@@', networkConfig["realm"])
# constants
fileData = fileData.replace('@@userTemplateDir@@', constants.userTemplateDir)
fileData = fileData.replace('@@hiddenShareMountBasepath@@', constants.hiddenShareMountBasepath.format("%(USER)"))
# hooks
fileData = fileData.replace('@@hookScriptBoot@@', hooks.getLocalHookScript(hooks.Type.Boot))
fileData = fileData.replace('@@hookScriptShutdown@@', hooks.getLocalHookScript(hooks.Type.Shutdown))
fileData = fileData.replace('@@hookScriptLoginLogoutAsRoot@@', hooks.getLocalHookScript(hooks.Type.LoginLogoutAsRoot))
fileData = fileData.replace('@@hookScriptSessionStarted@@', hooks.getLocalHookScript(hooks.Type.SessionStarted))
return fileData
# read textfile in variable
def _readTextfile(filePath):
if not os.path.isfile(filePath):
return False, None
try:
infile = codecs.open(filePath ,'r', encoding='utf-8', errors='ignore')
content = infile.read()
infile.close()
return True, content
except Exception as e:
logging.info('Cannot read ' + filePath + '!')
logging.exception(e)
return False, None
# remove lines beginning with #
def _stripComment(fileData):
filedata_stripped = ''
for line in fileData.split('\n'):
if line[:1] == '#':
continue
else:
if filedata_stripped == '':
filedata_stripped = line
else:
filedata_stripped = filedata_stripped + '\n' + line
return filedata_stripped

View file

@ -1,158 +0,0 @@
import ldap, ldap.sasl, sys, getpass, subprocess, pwd, os, os.path
from pathlib import Path
from linuxmusterLinuxclient7 import logging, constants, config, user, ldapHelper, shares, fileHelper, computer, localUserHelper
def readAttributes():
"""
Reads all attributes of the current user from ldap
:return: Tuple (success, dict of user attributes)
:rtype: tuple
"""
if not user.isInAD():
return False, None
return ldapHelper.searchOne(f"(sAMAccountName={user.username()})")
def school():
"""
Gets the school of the current user from the AD
:return: The short name of the school
:rtype: str
"""
rc, userdata = readAttributes()
if not rc:
return False, None
return True, userdata["sophomorixSchoolname"]
def username():
"""
Returns the user of the current user
:return: The username of the current user
:rtype: str
"""
return getpass.getuser().lower()
def isUserInAD(user):
"""
Checks if a given user is an AD user.
:param user: The username of the user to check
:type user: str
:return: True if the user is in the AD, False if it is a local user
:rtype: bool
"""
if not computer.isInAD():
return False
rc, groups = localUserHelper.getGroupsOfLocalUser(user)
if not rc:
return False
return "domain users" in groups
def isInAD():
"""Checks if the current user is an AD user.
:return: True if the user is in the AD, False if it is a local user
:rtype: bool
"""
return isUserInAD(username())
def isRoot():
"""
Checks if the current user is root
:return: True if the current user is root, False otherwise
:rtype: bool
"""
return os.geteuid() == 0
def isInGroup(groupName):
"""
Checks if the current user is part of a given group
:param groupName: The name of the group
:type groupName: str
:return: True if the user is part of the group, False otherwise
:rtype: bool
"""
rc, groups = localUserHelper.getGroupsOfLocalUser(username())
if not rc:
return False
return groupName in groups
def cleanTemplateUserGtkBookmarks():
"""Remove gtk bookmarks of the template user from the current users `~/.config/gtk-3.0/bookmarks` file.
"""
logging.info("Cleaning {} gtk bookmarks".format(constants.templateUser))
gtkBookmarksFile = "/home/{0}/.config/gtk-3.0/bookmarks".format(user.username())
if not os.path.isfile(gtkBookmarksFile):
logging.warning("Gtk bookmarks file not found, skipping!")
return
fileHelper.removeLinesInFileContainingString(gtkBookmarksFile, constants.templateUser)
def getHomeShareMountpoint():
"""
Returns the mountpoint of the users serverhome.
:return: The monutpoint of the users serverhome
:rtype: str
"""
rc, homeShareName = _getHomeShareName()
if rc:
basePath = constants.shareMountBasepath.format(username())
return True, f"{basePath}/{homeShareName}"
return False, None
def mountHomeShare():
"""
Mounts the serverhome of the current user
:return: True on success, False otherwise
:rtype: bool
"""
rc1, userAttributes = readAttributes()
rc2, shareName = _getHomeShareName(userAttributes)
if rc1 and rc2:
try:
homeShareServerPath = userAttributes["homeDirectory"]
res = shares.mountShare(homeShareServerPath, shareName=shareName, hiddenShare=False, username=username())
return res
except Exception as e:
logging.error("Could not mount home dir of user")
logging.exception(e)
return False, None
# --------------------
# - Helper functions -
# --------------------
def _getHomeShareName(userAttributes=None):
if userAttributes is None:
rc, userAttributes = readAttributes()
else:
rc = True
if rc:
try:
usernameString = username()
shareName = f"{usernameString} ({userAttributes['homeDrive']})"
return True, shareName
except Exception as e:
logging.error("Could not mount home dir of user")
logging.exception(e)
return False, None

View file

@ -1 +1 @@
[[ "${UID}" -gt 10000 ]] && /usr/local/bin/onLogin
[[ "${UID}" -gt 10000 ]] && sudo /usr/local/bin/install-printers.sh

View file

@ -1,33 +0,0 @@
#!/usr/bin/python3
# DO NOT MODIFY THIS SCRIPT!
# For custom scripts use the hookdir /etc/linuxmuster-linuxclient7/onLogin.d
# This schript is called in user context when a user logs in
try:
import os, sys
#import traceback
from linuxmusterLinuxclient7 import logging, hooks, shares, user, constants, gpo, computer, environment
logging.info("====== onLogin started ======")
# mount sysvol
rc, sysvolPath = shares.getLocalSysvolPath()
if rc:
environment.export(f"SYSVOL={sysvolPath}")
# process GPOs
gpo.processAllPolicies()
logging.info("======> onLogin end ======")
except Exception as e:
try:
#traceback.print_exc()
logging.exception(e)
except:
print("A fatal error occured!")
# We need to catch all exceptions and return 0 in any case!
# If we do not return 0, login will FAIL FOR EVERYONE!
sys.exit(0)

View file

@ -1,38 +0,0 @@
#!/usr/bin/python3
# DO NOT MODIFY THIS SCRIPT!
# For custom scripts use the hookdirs
# /etc/linuxmuster-linuxclient7/onLoginAsRoot.d
# and /etc/linuxmuster-linuxclient7/onLogoutAsRoot.d
# This schript is called in root context when a user logs in or out
try:
import os, sys
from linuxmusterLinuxclient7 import logging, hooks, constants, user, shares, printers, computer, realm
pamType = os.getenv("PAM_TYPE")
pamUser = os.getenv("PAM_USER")
#PAM_RHOST, PAM_RUSER, PAM_SERVICE, PAM_TTY, PAM_USER and PAM_TYPE
logging.info("====== onLoginLogoutAsRoot started with PAM_TYPE={0} PAM_RHOST={1} PAM_RUSER={2} PAM_SERVICE={3} PAM_TTY={4} PAM_USER={5} ======".format(pamType, os.getenv("PAM_RHOST"), os.getenv("PAM_RUSER"), os.getenv("PAM_SERVICE"), os.getenv("PAM_TTY"), pamUser))
# check if whe should execute
if not hooks.shouldHooksBeExecuted(pamUser):
logging.info("======> onLoginLogoutAsRoot end ====")
sys.exit(0)
elif pamType == "close_session":
# cleanup
printers.uninstallAllPrintersOfUser(pamUser)
logging.info("======> onLoginLogoutAsRoot end ======")
except Exception as e:
try:
logging.exception(e)
except:
print("A fatal error occured!")
# We need to catch all exceptions and return 0 in any case!
# If we do not return 0, login will FAIL FOR EVERYONE!
sys.exit(0)

View file

@ -1,6 +0,0 @@
[Unit]
Description=Remove all printers
[Service]
Type=simple
ExecStart=sh -c 'lpstat -p && for printer in $(lpstat -p | cut -f 2 -d " "); do lpadmin -x $printer; done || echo no printer found.'

View file

@ -1,8 +0,0 @@
[Unit]
Description=Remove all printers on boot
[Timer]
OnBootSec=10
[Install]
WantedBy=timers.target

View file

@ -1,50 +0,0 @@
#!/usr/bin/python3
#
# Script to do some things that require root permissions as a normal user
# Currently used for:
# - mounting shares
# - installing printers
#
import os, sys, argparse
from linuxmusterLinuxclient7 import shares, printers, constants
parser = argparse.ArgumentParser(description="Script to do some things that require root permissions as a normal user")
subparsers = parser.add_subparsers(title='Tasks', metavar="<task>", help="The task to execute", dest="task", required=True)
subparserCache = subparsers.add_parser('install-printer', help='install a printer')
requiredGroupCache = subparserCache.add_argument_group('required arguments')
requiredGroupCache.add_argument("--path", help="The network path of the printer", required=True)
requiredGroupCache.add_argument("--name", help="The name of the printer", required=True)
subparserCache = subparsers.add_parser('mount-share', help='mount a network share')
subparserCache.add_argument("--hidden", help="Hide this share", action='store_true')
requiredGroupCache = subparserCache.add_argument_group('required arguments')
requiredGroupCache.add_argument("--path", help="The network path of the share", required=True)
requiredGroupCache.add_argument("--name", help="The name of the share", required=True)
args = parser.parse_args()
task = args.task
if not os.geteuid() == 0:
print("This script has to be run using sudo!")
exit(1)
username = os.getenv("SUDO_USER")
if task == "install-printer":
if printers.installPrinter(args.path, name=args.name, username=username):
sys.exit(0)
else:
sys.exit(1)
pass
elif task == "mount-share":
if shares._mountShare(username, args.path, args.name, args.hidden, False):
sys.exit(0)
else:
sys.exit(1)
exit(0)

View file

@ -3,7 +3,6 @@
apt:
name:
- cups
- python3-ldap
state: latest
- name: Disable cups printer browsing
@ -19,66 +18,20 @@
state: stopped
enabled: no
- name: Configure pam_mount sysvol mount
blockinfile:
dest: /etc/security/pam_mount.conf.xml
marker: "<!-- {mark} ANSIBLE MANAGED BLOCK (SysVol) -->"
block: |
<volume
fstype="cifs"
server="{{ smb_server }}"
path="sysvol/"
mountpoint="/srv/samba/%(USER)/sysvol"
options="sec=krb5i,cruid=%(USERUID),user=%(USER),gid=1010,file_mode=0770,dir_mode=0770,mfsymlinks"
><not><or><user>root</user><user>ansible</user><user>Debian-gdm</user><user>sddm</user><user>{{ localuser }}</user></or></not>
</volume>
insertafter: "<!-- Volume definitions -->"
- name: Create /etc/linuxmuster-linuxclient7 Directory
file:
path: /etc/linuxmuster-linuxclient7
state: directory
mode: 0755
- name: install linuxmuster-linuxclient network.conf
- name: install install-printers.sh
template:
src: network.conf.j2
dest: /etc/linuxmuster-linuxclient7/network.conf
mode: 0644
- name: install linuxmuster-linuxclient python libs
copy :
src: linuxmusterLinuxclient7
dest: /usr/lib/python3/dist-packages
- name: Create /usr/share/linuxmuster-linuxclient7/scripts Directory
file:
path: /usr/share/linuxmuster-linuxclient7/scripts
state: directory
src: install-printers.sh.j2
dest: /usr/local/bin/install-printers.sh
mode: 0755
- name: install linuxmuster-scripts
- name: install lmn-install-printers sudoers
copy:
src: scripts/sudoTools
dest: /usr/share/linuxmuster-linuxclient7/scripts/
mode: 0755
- name: install lmn-sudotools
copy:
src: 90-lmn-sudotools
src: 90-lmn-install-printers
dest: /etc/sudoers.d/
mode: 0660
owner: root
group: root
- name: install onLogin script
copy :
src: onLogin
dest: /usr/local/bin/
mode: 0755
owner: root
group: root
- name: install lmn-printer.sh in /etc/profile.d/
copy:
src: lmn-printer.sh
@ -86,18 +39,3 @@
mode: 0644
owner: root
group: root
- name: Provide service and timer to remove all printers on boot
copy:
src: "{{ item }}"
dest: "/etc/systemd/system/{{ item }}"
mode: 0644
with_items:
- rmlpr.service
- rmlpr.timer
- name: enable rmlpr.timer
systemd:
name: rmlpr.timer
enabled: true

View file

@ -0,0 +1,36 @@
#!/usr/bin/bash
set -eu
printservers="{{ printservers | join(' ') }}"
hostname=$(hostname)
hostgroup=$(id -Gn "${hostname^^}$")
usergroup=$(id -Gn "${SUDO_USER}")
installedprinters=$(lpstat -p | cut -f 2 -d" " | sed -z 's/\n/ /g' )
echo "Hostgroups: ${hostgroup}"
echo "Usergroups: ${usergroup}"
echo "Installed Printers: ${installedprinters}"
echo
for printer in $installedprinters; do
if ! $(echo "${hostgroup}" | grep -w -q "${printer}") && ! $(echo "${usergroup}" | grep -w -q "${printer}") ; then
lpadmin -x "${printer}"
fi
done
for printserver in $printservers; do
echo "checking Server: $printserver"
printers=$(lpstat -h "${printserver}" -U "${SUDO_USER}" -v | cut -f 3 -d" " | sed 's/:$//g' | sed -z 's/\n/ /g' )
echo "Available Printers: $printers"
for printer in $printers; do
if $(echo "${hostgroup}" | grep -w -q "${printer}") || $(echo "${usergroup}" | grep -w -q "${printer}") ; then
if ! $(echo "${installedprinters}" | grep -w -q "${printer}"); then
echo "Adding ${printer}"
timeout 10 lpadmin -p "${printer}" -E -v "ipp://${printserver}/printers/${printer}" -m everywhere || echo "Printer ${printer} could not be added"
installedprinters+=" ${printer}"
fi
fi
done
done

View file

@ -1,5 +0,0 @@
[network]
serverHostname = server
domain = pn.steinbeis.schule
realm = PN.STEINBEIS.SCHULE
version = 1