ud-generate: deal with users without loginShell
[mirror/userdir-ldap.git] / sigcheck
1 #!/usr/bin/env python
2 # -*- mode: python -*-
3 #
4 # Check PGP signed emails
5 #
6 # This script verifies the signature on incoming mail for a couple of things
7 #   - That the signature is valid, recent and is not replay
8 #   - The signer is in the LDAP directory and is in the right group
9 #   - The message contains no extra text that is not signed.
10 #
11 # Options:
12 #  -r  Replay cache file, if unset replay checking is disabled
13 #  -k  Colon seperated list of keyrings to use
14 #  -d  LDAP search base DN
15 #  -l  LDAP server
16 #  -g  supplementary group membership
17 #  -p  File of Phrases that must be in the plaintext.
18 #  -m  Disallow PGP/MIME
19 #  -v  Verbose mode
20
21 # Typical Debian invokation may look like:
22 # sigcheck -k /usr/share/keyrings/debian-keyring.gpg:/usr/share/keyrings/debian-keyring.pgp \
23 #      -d ou=users,dc=debian,dc=org -l db.debian.org \
24 #      -m debian.org -a admin@db.debian.org \
25 #      -e /etc/userdir-ldap/templtes/error-reply -- test.sh
26
27 import email
28 import email.parser
29 import getopt
30 import os
31 import pwd
32 import sys
33 import time
34 import traceback
35
36 from userdir_gpg import *
37
38 EX_TEMPFAIL = 75
39 EX_PERMFAIL = 65      # EX_DATAERR
40
41 # Configuration
42 ReplayCacheFile = None
43 LDAPDn = None
44 LDAPServer = None
45 GroupMember = None
46 Phrases = None
47 AllowMIME = 1
48 Verbose = 0
49
50
51 class MessageError(Exception):
52     pass
53
54
55 def verbmsg(msg):
56     if Verbose:
57         sys.stderr.write(msg + "\n")
58
59
60 # Match the key fingerprint against an LDAP directory
61 def CheckLDAP(FingerPrint):
62     import ldap
63     import userdir_ldap
64
65     # Connect to the ldap server
66     global ErrTyp, ErrMsg
67     ErrType = EX_TEMPFAIL
68     ErrMsg = "An error occurred while performing the LDAP lookup:"
69     global l
70     l = userdir_ldap.connectLDAP(LDAPServer)
71     l.simple_bind_s("", "")
72
73     # Search for the matching key fingerprint
74     verbmsg("Processing fingerprint %s" % FingerPrint)
75     Attrs = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "keyfingerprint=" + FingerPrint)
76     if len(Attrs) == 0:
77         raise MessageError("Key not found")
78     if len(Attrs) != 1:
79         raise MessageError("Oddly your key fingerprint is assigned to more than one account..")
80
81     gidnumber_found = 0
82     for key in Attrs[0][1].keys():
83         if key == "gidNumber":
84             gidnumber_found = 1
85
86     if gidnumber_found != 1:
87         raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint)
88
89     # Look for the group with the gid of the user
90     GAttr = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"])
91     if len(GAttr) == 0:
92         raise MessageError("Database inconsistency found: main group for account not found in database")
93
94     # See if the group membership is OK
95     # Only if a group was given on the commandline
96     if GroupMember is not None:
97         Hit = 0
98         # Check primary group first
99         if GAttr[0][1]["gid"][0] == GroupMember:
100             Hit = 1
101         else:
102             # Check supplementary groups
103             for x in Attrs[0][1].get("supplementaryGid", []):
104                 if x == GroupMember:
105                     Hit = 1
106         if Hit != 1:
107             raise MessageError("You don't have %s group permissions." % GroupMember)
108
109
110 # Start of main program
111 # Process options
112 options, arguments = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v")
113 for (switch, val) in options:
114     if (switch == '-r'):
115         ReplayCacheFile = val
116     elif (switch == '-k'):
117         SetKeyrings(val.split(":"))
118     elif (switch == '-d'):
119         LDAPDn = val
120     elif (switch == '-l'):
121         LDAPServer = val
122     elif (switch == '-g'):
123         GroupMember = val
124     elif (switch == '-m'):
125         AllowMIME = 0
126     elif (switch == '-v'):
127         Verbose = 1
128     elif (switch == '-p'):
129         Phrases = val
130
131 Now = time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime(time.time()))
132 ErrMsg = "Indeterminate Error"
133 ErrType = EX_TEMPFAIL
134 MsgID = None
135 try:
136     # Startup the replay cache
137     ErrType = EX_TEMPFAIL
138     if ReplayCacheFile is not None:
139         ErrMsg = "Failed to initialize the replay cache:"
140         RC = ReplayCache(ReplayCacheFile)
141
142     # Get the email
143     ErrType = EX_PERMFAIL
144     ErrMsg = "Failed to understand the email or find a signature:"
145     mail = email.parser.Parser().parse(sys.stdin)
146     MsgID = mail["Message-ID"]
147
148     print "Inspecting message %s" % MsgID
149     verbmsg("Processing message %s" % MsgID)
150     Msg = GetClearSig(mail, 1)
151     if AllowMIME == 0 and Msg[1] != 0:
152         raise MessageError("PGP/MIME disallowed")
153
154     ErrMsg = "Message is not PGP signed:"
155     if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1:
156         raise MessageError("No PGP signature")
157
158     # Check the signature
159     ErrMsg = "Unable to check the signature or the signature was invalid:"
160     pgp = GPGCheckSig2(Msg[0])
161
162     if not pgp.ok:
163         raise UDFormatError(pgp.why)
164     if pgp.text is None:
165         raise UDFormatError("Null signature text")
166
167     # Check the signature against the replay cache
168     if ReplayCacheFile is not None:
169         RC.process(pgp.sig_info)
170
171     # Do LDAP stuff
172     if LDAPDn is not None:
173         CheckLDAP(pgp.key_fpr)
174
175     ErrMsg = "Verifying message:"
176     if Phrases is not None:
177         F = open(Phrases, "r")
178         while 1:
179             Line = F.readline()
180             if Line == "":
181                 break
182             if pgp.text.find(Line.strip()) == -1:
183                 raise MessageError("Phrase '%s' was not found" % (Line.strip()))
184
185 except Exception:
186     ErrMsg = "[%s] \"%s\" \"%s %s\"\n" % (Now, MsgID, ErrMsg, sys.exc_value)
187     sys.stderr.write(ErrMsg)
188
189     Trace = "==> %s: %s\n" % (sys.exc_type, sys.exc_value)
190     List = traceback.extract_tb(sys.exc_traceback)
191     if len(List) >= 1:
192         Trace = Trace + "Python Stack Trace:\n"
193         for x in List:
194             Trace = Trace + "   %s %s:%u: %s\n" % (x[2], x[0], x[1], x[3])
195
196     sys.exit(EX_PERMFAIL)
197
198 # For Main
199 print "Message %s passed" % MsgID
200 sys.exit(0)