#!/usr/bin/env python # -*- mode: python -*- # # Check PGP signed emails # # This script verifies the signature on incoming mail for a couple of things # - That the signature is valid, recent and is not replay # - The signer is in the LDAP directory and is in the right group # - The message contains no extra text that is not signed. # # Options: # -r Replay cache file, if unset replay checking is disabled # -k Colon seperated list of keyrings to use # -d LDAP search base DN # -l LDAP server # -g supplementary group membership # -p File of Phrases that must be in the plaintext. # -m Disallow PGP/MIME # -v Verbose mode # Typical Debian invokation may look like: # sigcheck -k /usr/share/keyrings/debian-keyring.gpg:/usr/share/keyrings/debian-keyring.pgp \ # -d ou=users,dc=debian,dc=org -l db.debian.org \ # -m debian.org -a admin@db.debian.org \ # -e /etc/userdir-ldap/templtes/error-reply -- test.sh import sys, traceback, time, os; import pwd, getopt; import email, email.parser from userdir_gpg import *; EX_TEMPFAIL = 75; EX_PERMFAIL = 65; # EX_DATAERR Error = 'Message Error'; # Configuration ReplayCacheFile = None; LDAPDn = None; LDAPServer = None; GroupMember = None; Phrases = None; AllowMIME = 1; Verbose = 0; def verbmsg(msg): if Verbose: sys.stderr.write(msg + "\n") # Match the key fingerprint against an LDAP directory def CheckLDAP(FingerPrint): import ldap; import userdir_ldap; # Connect to the ldap server global ErrTyp, ErrMsg; ErrType = EX_TEMPFAIL; ErrMsg = "An error occurred while performing the LDAP lookup:"; global l; l = userdir_ldap.connectLDAP(LDAPServer); l.simple_bind_s("",""); # Search for the matching key fingerprint verbmsg("Processing fingerprint %s" % FingerPrint) Attrs = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"keyfingerprint=" + FingerPrint); if len(Attrs) == 0: raise Error, "Key not found" if len(Attrs) != 1: raise Error, "Oddly your key fingerprint is assigned to more than one account.." gidnumber_found = 0; for key in Attrs[0][1].keys(): if (key == "gidNumber"): gidnumber_found = 1 if (gidnumber_found != 1): raise Error, "No gidnumber in attributes for fingerprint %s" % FingerPrint # Look for the group with the gid of the user GAttr = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"]) if len(GAttr) == 0: raise Error, "Database inconsistency found: main group for account not found in database" # See if the group membership is OK # Only if a group was given on the commandline if GroupMember is not None: Hit = 0; # Check primary group first if GAttr[0][1]["gid"][0] == GroupMember: Hit = 1 else: # Check supplementary groups for x in Attrs[0][1].get("supplementaryGid",[]): if x == GroupMember: Hit = 1; if Hit != 1: raise Error, "You don't have %s group permissions."%(GroupMember); # Start of main program # Process options (options, arguments) = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v"); for (switch, val) in options: if (switch == '-r'): ReplayCacheFile = val; elif (switch == '-k'): SetKeyrings(val.split(":")); elif (switch == '-d'): LDAPDn = val; elif (switch == '-l'): LDAPServer = val; elif (switch == '-g'): GroupMember = val; elif (switch == '-m'): AllowMIME = 0; elif (switch == '-v'): Verbose = 1; elif (switch == '-p'): Phrases = val; Now = time.strftime("%a, %d %b %Y %H:%M:%S",time.gmtime(time.time())); ErrMsg = "Indeterminate Error"; ErrType = EX_TEMPFAIL; MsgID = None; try: # Startup the replay cache ErrType = EX_TEMPFAIL; if ReplayCacheFile is not None: ErrMsg = "Failed to initialize the replay cache:"; RC = ReplayCache(ReplayCacheFile); # Get the email ErrType = EX_PERMFAIL; ErrMsg = "Failed to understand the email or find a signature:"; mail = email.parser.Parser().parse(sys.stdin); MsgID = mail["Message-ID"] print "Inspecting message %s"%MsgID; verbmsg("Processing message %s" % MsgID) Msg = GetClearSig(mail,1); if AllowMIME == 0 and Msg[1] != 0: raise Error, "PGP/MIME disallowed"; ErrMsg = "Message is not PGP signed:" if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1: raise Error, "No PGP signature"; # Check the signature ErrMsg = "Unable to check the signature or the signature was invalid:"; pgp = GPGCheckSig2(Msg[0]) if not pgp.ok: raise UDFormatError, pgp.why if pgp.text is None: raise UDFormatError, "Null signature text" # Check the signature against the replay cache if ReplayCacheFile is not None: RC.process(pgp.sig_info) # Do LDAP stuff if LDAPDn is not None: CheckLDAP(pgp.key_fpr) ErrMsg = "Verifying message:"; if Phrases is not None: F = open(Phrases,"r"); while 1: Line = F.readline(); if Line == "": break; if pgp.text.find(Line.strip()) == -1: raise Error,"Phrase '%s' was not found" % (Line.strip()) except: ErrMsg = "[%s] \"%s\" \"%s %s\"\n"%(Now,MsgID,ErrMsg,sys.exc_value); sys.stderr.write(ErrMsg); Trace = "==> %s: %s\n" %(sys.exc_type,sys.exc_value); List = traceback.extract_tb(sys.exc_traceback); if len(List) >= 1: Trace = Trace + "Python Stack Trace:\n"; for x in List: Trace = Trace + " %s %s:%u: %s\n" %(x[2],x[0],x[1],x[3]); #print Trace; sys.exit(EX_PERMFAIL); # For Main print "Message %s passed"%MsgID; sys.exit(0);