X-Git-Url: https://git.adam-barratt.org.uk/?a=blobdiff_plain;f=sigcheck;h=2561f1ce59d3bf3e7747bded10a68f1b4230706d;hb=71597847edb263c9837155db7ace4af5acce726e;hp=57c4a9034767831be1a3eec86c308a0d37614e3b;hpb=7e02135e1259847ac592fc58a408452f2e9e41af;p=mirror%2Fuserdir-ldap.git diff --git a/sigcheck b/sigcheck index 57c4a90..2561f1c 100755 --- a/sigcheck +++ b/sigcheck @@ -26,11 +26,11 @@ import sys, traceback, time, os; import pwd, getopt; +import email, email.parser from userdir_gpg import *; EX_TEMPFAIL = 75; EX_PERMFAIL = 65; # EX_DATAERR -Error = 'Message Error'; # Configuration ReplayCacheFile = None; @@ -41,6 +41,9 @@ Phrases = None; AllowMIME = 1; Verbose = 0; +class MessageError(Exception): + pass + def verbmsg(msg): if Verbose: sys.stderr.write(msg + "\n") @@ -48,22 +51,23 @@ def verbmsg(msg): # Match the key fingerprint against an LDAP directory def CheckLDAP(FingerPrint): import ldap; + import userdir_ldap; # Connect to the ldap server global ErrTyp, ErrMsg; ErrType = EX_TEMPFAIL; ErrMsg = "An error occurred while performing the LDAP lookup:"; global l; - l = ldap.open(LDAPServer); + l = userdir_ldap.connectLDAP(LDAPServer); l.simple_bind_s("",""); # Search for the matching key fingerprint verbmsg("Processing fingerprint %s" % FingerPrint) Attrs = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"keyfingerprint=" + FingerPrint); if len(Attrs) == 0: - raise Error, "Key not found" + raise MessageError("Key not found") if len(Attrs) != 1: - raise Error, "Oddly your key fingerprint is assigned to more than one account.." + raise MessageError("Oddly your key fingerprint is assigned to more than one account..") gidnumber_found = 0; for key in Attrs[0][1].keys(): @@ -71,16 +75,16 @@ def CheckLDAP(FingerPrint): gidnumber_found = 1 if (gidnumber_found != 1): - raise Error, "No gidnumber in attributes for fingerprint %s" % FingerPrint + raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint) # Look for the group with the gid of the user GAttr = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"]) if len(GAttr) == 0: - raise Error, "Database inconsistency found: main group for account not found in database" + raise MessageError("Database inconsistency found: main group for account not found in database") # See if the group membership is OK # Only if a group was given on the commandline - if GroupMember != None: + if GroupMember is not None: Hit = 0; # Check primary group first if GAttr[0][1]["gid"][0] == GroupMember: @@ -91,7 +95,7 @@ def CheckLDAP(FingerPrint): if x == GroupMember: Hit = 1; if Hit != 1: - raise Error, "You don't have %s group permissions."%(GroupMember); + raise MessageError("You don't have %s group permissions."%(GroupMember)) # Start of main program # Process options @@ -121,58 +125,51 @@ MsgID = None; try: # Startup the replay cache ErrType = EX_TEMPFAIL; - if ReplayCacheFile != None: + if ReplayCacheFile is not None: ErrMsg = "Failed to initialize the replay cache:"; RC = ReplayCache(ReplayCacheFile); - RC.Clean(); - + # Get the email ErrType = EX_PERMFAIL; ErrMsg = "Failed to understand the email or find a signature:"; - Email = mimetools.Message(sys.stdin,0); - MsgID = Email.getheader("Message-ID"); + mail = email.parser.Parser().parse(sys.stdin); + MsgID = mail["Message-ID"] + print "Inspecting message %s"%MsgID; verbmsg("Processing message %s" % MsgID) - Msg = GetClearSig(Email,1); - # print Msg + Msg = GetClearSig(mail,1); if AllowMIME == 0 and Msg[1] != 0: - raise Error, "PGP/MIME disallowed"; + raise MessageError("PGP/MIME disallowed") ErrMsg = "Message is not PGP signed:" if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1: - raise Error, "No PGP signature"; + raise MessageError("No PGP signature") # Check the signature ErrMsg = "Unable to check the signature or the signature was invalid:"; - Res = GPGCheckSig(Msg[0]); + pgp = GPGCheckSig2(Msg[0]) - if Res[0] != None: - raise Error, Res[0]; - - if Res[3] == None: - raise Error, "Null signature text"; + if not pgp.ok: + raise UDFormatError(pgp.why) + if pgp.text is None: + raise UDFormatError("Null signature text") # Check the signature against the replay cache - if ReplayCacheFile != None: - ErrMsg = "The replay cache rejected your message. Check your clock!"; - Rply = RC.Check(Res[1]); - if Rply != None: - raise Error, Rply; - RC.Add(Res[1]); - RC.close(); + if ReplayCacheFile is not None: + RC.process(pgp.sig_info) # Do LDAP stuff - if LDAPDn != None: - CheckLDAP(Res[2][1]); - + if LDAPDn is not None: + CheckLDAP(pgp.key_fpr) + ErrMsg = "Verifying message:"; - if Phrases != None: + if Phrases is not None: F = open(Phrases,"r"); while 1: Line = F.readline(); if Line == "": break; - if Res[3].find(Line.strip()) == -1: - raise Error,"Phrase '%s' was not found" % (Line.strip()) + if pgp.text.find(Line.strip()) == -1: + raise MessageError("Phrase '%s' was not found" % (Line.strip())) except: ErrMsg = "[%s] \"%s\" \"%s %s\"\n"%(Now,MsgID,ErrMsg,sys.exc_value);