From 95a2edcec85a3ee7fbe7eb3829948238b235e998 Mon Sep 17 00:00:00 2001 From: jgg <> Date: Sun, 15 Oct 2000 02:55:54 +0000 Subject: [PATCH] Sig checker --- sigcheck | 160 +++++++++++++++++++++++++++++++++++++++++++++++++ userdir_gpg.py | 66 ++++++++++++++++++-- 2 files changed, 222 insertions(+), 4 deletions(-) create mode 100755 sigcheck diff --git a/sigcheck b/sigcheck new file mode 100755 index 0000000..e20f68f --- /dev/null +++ b/sigcheck @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# -*- mode: python -*- +# +# Check PGP signed emails +# +# This script verifies the signature on incoming mail for a couple of things +# - That the signature is valid, recent and is not replay +# - The signer is in the LDAP directory and is in the right group +# - The message contains no extra text that is not signed. +# +# Options: +# -r Replay cache file, if unset replay checking is disabled +# -k Colon seperated list of keyrings to use +# -d LDAP search base DN +# -l LDAP server +# -g supplementary group membership +# -p File of Phrases that must be in the plaintext. +# -m Disallow PGP/MIME + +# Typical Debian invokation may look like: +# ./gpgwrapper -k /usr/share/keyrings/debian-keyring.gpg:/usr/share/keyrings/debian-keyring.pgp \ +# -d ou=users,dc=debian,dc=org -l db.debian.org \ +# -m debian.org -a admin@db.debian.org \ +# -e /etc/userdir-ldap/templtes/error-reply -- test.sh + +import sys, traceback, time, os; +import string, pwd, getopt; +from userdir_gpg import *; + +EX_TEMPFAIL = 75; +EX_PERMFAIL = 65; # EX_DATAERR +Error = 'Message Error'; + +# Configuration +ReplayCacheFile = None; +LDAPDn = None; +LDAPServer = None; +GroupMember = None; +Phrases = None; +AllowMIME = 1; + +# Match the key fingerprint against an LDAP directory +def CheckLDAP(FingerPrint): + import ldap; + + # Connect to the ldap server + global ErrTyp, ErrMsg; + ErrType = EX_TEMPFAIL; + ErrMsg = "An error occured while performing the LDAP lookup:"; + global l; + l = ldap.open(LDAPServer); + l.simple_bind_s("",""); + + # Search for the matching key fingerprint + Attrs = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"keyfingerprint=" + FingerPrint); + if len(Attrs) == 0: + raise Error, "Key not found" + if len(Attrs) != 1: + raise Error, "Oddly your key fingerprint is assigned to more than one account.." + + # See if the group membership is OK + if GroupMember != None: + Hit = 0; + for x in Attrs[0][1].get("supplementarygid",[]): + if x == GroupMember: + Hit = 1; + if Hit != 1: + raise Error, "You don't have %s group permissions."%(GroupMember); + +# Start of main program +# Process options +(options, arguments) = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:"); +for (switch, val) in options: + if (switch == '-r'): + ReplayCacheFile = val; + elif (switch == '-k'): + SetKeyrings(string.split(val,":")); + elif (switch == '-d'): + LDAPDn = val; + elif (switch == '-l'): + LDAPServer = val; + elif (switch == '-g'): + GroupMember = val; + elif (switch == '-m'): + AllowMIME = 0; + elif (switch == '-p'): + Phrases = val; + +Now = time.strftime("%a, %d %b %Y %H:%M:%S",time.gmtime(time.time())); +ErrMsg = "Indeterminate Error"; +ErrType = EX_TEMPFAIL; +MsgID = None; +try: + # Startup the replay cache + ErrType = EX_TEMPFAIL; + if ReplayCacheFile != None: + ErrMsg = "Failed to initialize the replay cache:"; + RC = ReplayCache(ReplayCacheFile); + RC.Clean(); + + # Get the email + ErrType = EX_PERMFAIL; + ErrMsg = "Failed to understand the email or find a signature:"; + Email = mimetools.Message(sys.stdin,0); + MsgID = Email.getheader("Message-ID"); + Msg = GetClearSig(Email,1); + if AllowMIME == 0 and Msg[1] != 0: + raise Error, "PGP/MIME disallowed"; + + ErrMsg = "Message is not PGP signed:" + if string.find(Msg[0],"-----BEGIN PGP SIGNED MESSAGE-----") == -1: + raise Error, "No PGP signature"; + + # Check the signature + ErrMsg = "Unable to check the signature or the signature was invalid:"; + Res = GPGCheckSig(Msg[0]); + + if Res[0] != None: + raise Error, Res[0]; + + if Res[3] == None: + raise Error, "Null signature text"; + + # Check the signature against the replay cache + if ReplayCacheFile != None: + ErrMsg = "The replay cache rejected your message. Check your clock!"; + Rply = RC.Check(Res[1]); + if Rply != None: + raise Error, Rply; + RC.Add(Res[1]); + + # Do LDAP stuff + if LDAPDn != None: + CheckLDAP(Res[2][1]); + + ErrMsg = "Verifying message:"; + if Phrases != None: + F = open(Phrases,"r"); + while 1: + Line = F.readline(); + if Line == "": break; + if string.find(Res[3],string.strip(Line)) == -1: + raise Error,"Phrase '%s' was not found"%(string.strip(Line)); + +except: + ErrMsg = "[%s] \"%s\" \"%s %s\"\n"%(Now,MsgID,ErrMsg,sys.exc_value); + sys.stderr.write(ErrMsg); + + Trace = "==> %s: %s\n" %(sys.exc_type,sys.exc_value); + List = traceback.extract_tb(sys.exc_traceback); + if len(List) >= 1: + Trace = Trace + "Python Stack Trace:\n"; + for x in List: + Trace = Trace + " %s %s:%u: %s\n" %(x[2],x[0],x[1],x[3]); + #print Trace; + + sys.exit(EX_PERMFAIL); + +# For Main +sys.exit(0); diff --git a/userdir_gpg.py b/userdir_gpg.py index b181abe..69ebe34 100644 --- a/userdir_gpg.py +++ b/userdir_gpg.py @@ -49,7 +49,10 @@ def SetKeyrings(Rings): # present in the signature text! The return result is a tuple, the first # element is the text itself the second is a mime flag indicating if the # result should be mime processed after sig checking. -def GetClearSig(Msg): +# +# Paranoid will check the message text to make sure that all the plaintext is +# in fact signed (bounded by a PGP packet) +def GetClearSig(Msg,Paranoid = 0): Error = 'MIME Error'; # See if this is a MIME encoded multipart signed message if Msg.gettype() == "multipart/signed": @@ -65,6 +68,16 @@ def GetClearSig(Msg): mf = multifile.MultiFile(SkMessage) mf.push(Msg.getparam("boundary")); + # Check the first bit of the message.. + if Paranoid != 0: + Pos = mf.tell(); + while 1: + x = mf.readline(); + if not x: break; + if len(string.strip(x)) != 0: + raise Error,"Unsigned text in message (at start)"; + mf.seek(Pos); + # Get the first part of the multipart message if not mf.next(): raise Error, "Invalid pgp/mime encoding [no section]"; @@ -73,7 +86,7 @@ def GetClearSig(Msg): Signed = StringIO.StringIO(); Signed.write(mf.read()); InnerMsg = mimetools.Message(Signed); - + # Make sure it is the right type if InnerMsg.gettype() != "text/plain": raise Error, "Invalid pgp/mime encoding [wrong plaintext type]"; @@ -86,6 +99,17 @@ def GetClearSig(Msg): raise Error, "Invalid pgp/mime encoding [wrong signature type]"; Signature = string.joinfields(mf.readlines(),''); + # Check the last bit of the message.. + if Paranoid != 0: + mf.pop(); + Pos = mf.tell(); + while 1: + x = mf.readline(); + if not x: break; + if len(string.strip(x)) != 0: + raise Error,"Unsigned text in message (at end)"; + mf.seek(Pos); + # Append the PGP boundary header and the signature text to re-form the # original signed block [needs to convert to \r\n] Output = "-----BEGIN PGP SIGNED MESSAGE-----\r\n"; @@ -96,8 +120,42 @@ def GetClearSig(Msg): Output = Output + string.replace(Signed.getvalue(),"\n---","\n- ---") + Signature; return (Output,1); else: - # Just return the message body - return (string.joinfields(Msg.fp.readlines(),''),0); + if Paranoid == 0: + # Just return the message body + return (string.joinfields(Msg.fp.readlines(),''),0); + + Body = ""; + State = 1; + for x in Msg.fp.readlines(): + Body = Body + x; + Tmp = string.strip(x); + if len(Tmp) == 0: + continue; + + # Leading up to the signature + if State == 1: + if Tmp == "-----BEGIN PGP SIGNED MESSAGE-----": + State = 2; + else: + raise Error,"Unsigned text in message (at start)"; + continue; + + # In the signature plain text + if State == 2: + if Tmp == "-----BEGIN PGP SIGNATURE-----": + State = 3; + continue; + + # In the signature + if State == 3: + if Tmp == "-----END PGP SIGNATURE-----": + State = 4; + continue; + + # Past the end + if State == 4: + raise Error,"Unsigned text in message (at end)"; + return (Body,0); # This opens GPG in 'write filter' mode. It takes Message and sends it # to GPGs standard input, pipes the standard output to a temp file along -- 2.20.1