From: Julien Cristau Date: Fri, 11 Oct 2019 22:00:37 +0000 (+0200) Subject: sigcheck: pycodestyle fixes X-Git-Url: https://git.adam-barratt.org.uk/?p=mirror%2Fuserdir-ldap.git;a=commitdiff_plain;h=5878d0a3a48077c0f321d93256153728240cdf0e sigcheck: pycodestyle fixes --- diff --git a/sigcheck b/sigcheck index 6790836..cd65bc5 100755 --- a/sigcheck +++ b/sigcheck @@ -24,9 +24,15 @@ # -m debian.org -a admin@db.debian.org \ # -e /etc/userdir-ldap/templtes/error-reply -- test.sh -import sys, traceback, time, os -import pwd, getopt -import email, email.parser +import email +import email.parser +import getopt +import os +import pwd +import sys +import time +import traceback + from userdir_gpg import * EX_TEMPFAIL = 75 @@ -41,150 +47,154 @@ Phrases = None AllowMIME = 1 Verbose = 0 + class MessageError(Exception): - pass + pass + def verbmsg(msg): - if Verbose: - sys.stderr.write(msg + "\n") + if Verbose: + sys.stderr.write(msg + "\n") + # Match the key fingerprint against an LDAP directory def CheckLDAP(FingerPrint): - import ldap - import userdir_ldap - - # Connect to the ldap server - global ErrTyp, ErrMsg - ErrType = EX_TEMPFAIL - ErrMsg = "An error occurred while performing the LDAP lookup:" - global l - l = userdir_ldap.connectLDAP(LDAPServer) - l.simple_bind_s("","") - - # Search for the matching key fingerprint - verbmsg("Processing fingerprint %s" % FingerPrint) - Attrs = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"keyfingerprint=" + FingerPrint) - if len(Attrs) == 0: - raise MessageError("Key not found") - if len(Attrs) != 1: - raise MessageError("Oddly your key fingerprint is assigned to more than one account..") - - gidnumber_found = 0 - for key in Attrs[0][1].keys(): - if (key == "gidNumber"): - gidnumber_found = 1 - - if (gidnumber_found != 1): - raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint) - - # Look for the group with the gid of the user - GAttr = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"]) - if len(GAttr) == 0: - raise MessageError("Database inconsistency found: main group for account not found in database") - - # See if the group membership is OK - # Only if a group was given on the commandline - if GroupMember is not None: - Hit = 0 - # Check primary group first - if GAttr[0][1]["gid"][0] == GroupMember: - Hit = 1 - else: - # Check supplementary groups - for x in Attrs[0][1].get("supplementaryGid",[]): - if x == GroupMember: - Hit = 1 - if Hit != 1: - raise MessageError("You don't have %s group permissions."%(GroupMember)) - + import ldap + import userdir_ldap + + # Connect to the ldap server + global ErrTyp, ErrMsg + ErrType = EX_TEMPFAIL + ErrMsg = "An error occurred while performing the LDAP lookup:" + global l + l = userdir_ldap.connectLDAP(LDAPServer) + l.simple_bind_s("", "") + + # Search for the matching key fingerprint + verbmsg("Processing fingerprint %s" % FingerPrint) + Attrs = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "keyfingerprint=" + FingerPrint) + if len(Attrs) == 0: + raise MessageError("Key not found") + if len(Attrs) != 1: + raise MessageError("Oddly your key fingerprint is assigned to more than one account..") + + gidnumber_found = 0 + for key in Attrs[0][1].keys(): + if key == "gidNumber": + gidnumber_found = 1 + + if gidnumber_found != 1: + raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint) + + # Look for the group with the gid of the user + GAttr = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"]) + if len(GAttr) == 0: + raise MessageError("Database inconsistency found: main group for account not found in database") + + # See if the group membership is OK + # Only if a group was given on the commandline + if GroupMember is not None: + Hit = 0 + # Check primary group first + if GAttr[0][1]["gid"][0] == GroupMember: + Hit = 1 + else: + # Check supplementary groups + for x in Attrs[0][1].get("supplementaryGid", []): + if x == GroupMember: + Hit = 1 + if Hit != 1: + raise MessageError("You don't have %s group permissions." % GroupMember) + + # Start of main program # Process options -(options, arguments) = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v") +options, arguments = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v") for (switch, val) in options: - if (switch == '-r'): - ReplayCacheFile = val - elif (switch == '-k'): - SetKeyrings(val.split(":")) - elif (switch == '-d'): - LDAPDn = val - elif (switch == '-l'): - LDAPServer = val - elif (switch == '-g'): - GroupMember = val - elif (switch == '-m'): - AllowMIME = 0 - elif (switch == '-v'): - Verbose = 1 - elif (switch == '-p'): - Phrases = val - -Now = time.strftime("%a, %d %b %Y %H:%M:%S",time.gmtime(time.time())) + if (switch == '-r'): + ReplayCacheFile = val + elif (switch == '-k'): + SetKeyrings(val.split(":")) + elif (switch == '-d'): + LDAPDn = val + elif (switch == '-l'): + LDAPServer = val + elif (switch == '-g'): + GroupMember = val + elif (switch == '-m'): + AllowMIME = 0 + elif (switch == '-v'): + Verbose = 1 + elif (switch == '-p'): + Phrases = val + +Now = time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime(time.time())) ErrMsg = "Indeterminate Error" ErrType = EX_TEMPFAIL MsgID = None try: - # Startup the replay cache - ErrType = EX_TEMPFAIL - if ReplayCacheFile is not None: - ErrMsg = "Failed to initialize the replay cache:" - RC = ReplayCache(ReplayCacheFile) - - # Get the email - ErrType = EX_PERMFAIL - ErrMsg = "Failed to understand the email or find a signature:" - mail = email.parser.Parser().parse(sys.stdin) - MsgID = mail["Message-ID"] - - print "Inspecting message %s"%MsgID - verbmsg("Processing message %s" % MsgID) - Msg = GetClearSig(mail,1) - if AllowMIME == 0 and Msg[1] != 0: - raise MessageError("PGP/MIME disallowed") - - ErrMsg = "Message is not PGP signed:" - if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1: - raise MessageError("No PGP signature") - - # Check the signature - ErrMsg = "Unable to check the signature or the signature was invalid:" - pgp = GPGCheckSig2(Msg[0]) - - if not pgp.ok: - raise UDFormatError(pgp.why) - if pgp.text is None: - raise UDFormatError("Null signature text") - - # Check the signature against the replay cache - if ReplayCacheFile is not None: - RC.process(pgp.sig_info) - - # Do LDAP stuff - if LDAPDn is not None: - CheckLDAP(pgp.key_fpr) - - ErrMsg = "Verifying message:" - if Phrases is not None: - F = open(Phrases,"r") - while 1: - Line = F.readline() - if Line == "": break - if pgp.text.find(Line.strip()) == -1: - raise MessageError("Phrase '%s' was not found" % (Line.strip())) - -except: - ErrMsg = "[%s] \"%s\" \"%s %s\"\n"%(Now,MsgID,ErrMsg,sys.exc_value) - sys.stderr.write(ErrMsg) - - Trace = "==> %s: %s\n" %(sys.exc_type,sys.exc_value) - List = traceback.extract_tb(sys.exc_traceback) - if len(List) >= 1: - Trace = Trace + "Python Stack Trace:\n" - for x in List: - Trace = Trace + " %s %s:%u: %s\n" %(x[2],x[0],x[1],x[3]) - #print Trace - - sys.exit(EX_PERMFAIL) - -# For Main -print "Message %s passed"%MsgID + # Startup the replay cache + ErrType = EX_TEMPFAIL + if ReplayCacheFile is not None: + ErrMsg = "Failed to initialize the replay cache:" + RC = ReplayCache(ReplayCacheFile) + + # Get the email + ErrType = EX_PERMFAIL + ErrMsg = "Failed to understand the email or find a signature:" + mail = email.parser.Parser().parse(sys.stdin) + MsgID = mail["Message-ID"] + + print "Inspecting message %s" % MsgID + verbmsg("Processing message %s" % MsgID) + Msg = GetClearSig(mail, 1) + if AllowMIME == 0 and Msg[1] != 0: + raise MessageError("PGP/MIME disallowed") + + ErrMsg = "Message is not PGP signed:" + if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1: + raise MessageError("No PGP signature") + + # Check the signature + ErrMsg = "Unable to check the signature or the signature was invalid:" + pgp = GPGCheckSig2(Msg[0]) + + if not pgp.ok: + raise UDFormatError(pgp.why) + if pgp.text is None: + raise UDFormatError("Null signature text") + + # Check the signature against the replay cache + if ReplayCacheFile is not None: + RC.process(pgp.sig_info) + + # Do LDAP stuff + if LDAPDn is not None: + CheckLDAP(pgp.key_fpr) + + ErrMsg = "Verifying message:" + if Phrases is not None: + F = open(Phrases, "r") + while 1: + Line = F.readline() + if Line == "": + break + if pgp.text.find(Line.strip()) == -1: + raise MessageError("Phrase '%s' was not found" % (Line.strip())) + +except Exception: + ErrMsg = "[%s] \"%s\" \"%s %s\"\n" % (Now, MsgID, ErrMsg, sys.exc_value) + sys.stderr.write(ErrMsg) + + Trace = "==> %s: %s\n" % (sys.exc_type, sys.exc_value) + List = traceback.extract_tb(sys.exc_traceback) + if len(List) >= 1: + Trace = Trace + "Python Stack Trace:\n" + for x in List: + Trace = Trace + " %s %s:%u: %s\n" % (x[2], x[0], x[1], x[3]) + + sys.exit(EX_PERMFAIL) + +# For Main +print "Message %s passed" % MsgID sys.exit(0)