1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import email
20 import os.path
21 import unittest
22 import time
23
24 import dkim
25
26
28 """Get the content of the given test data file.
29
30 The files live in dkim/tests/data.
31 """
# The path is built from this module's own directory plus 'data' and the
# filename argument, so the lookup does not depend on the working directory.
32 path = os.path.join(os.path.dirname(__file__), 'data', filename)
# Opened in binary mode: the file contents are returned as bytes.
33 with open(path, 'rb') as f:
34 return f.read()
35
36
38
# A three-byte value is expected back from dkim.fold unchanged.
40 self.assertEqual(
41 b"foo", dkim.fold(b"foo"))
42
44
45
# 25 repetitions of b"foo" (75 bytes): the expected result breaks after the
# first 72 bytes with CRLF followed by a single space.
46 self.assertEqual(
47 b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25))
48
# Same 75-byte input, but with linesep=b"\n" the expected break is a bare LF
# followed by a single space.
50 self.assertEqual(
51 b"foo" * 24 + b"\n foo", dkim.fold(b"foo" * 25, linesep=b"\n"))
52
53
54
56 """End-to-end signature and verification tests."""
57
65
def dnsfunc(self, domain, timeout=5):
    """Canned DNS lookup handed to dkim.verify via its ``dnsfunc`` argument.

    Args:
        domain: Record name to look up; it is decoded as ASCII and must
            match a table key exactly (keys end with a trailing dot).
        timeout: Accepted so the signature matches what callers pass;
            it is not used.

    Returns:
        The canned record for ``domain``, or None when ``domain`` cannot
        be decoded as ASCII.

    A name that is missing from the table fails the running test, with
    the name itself as the failure message.
    """
    # Adjacent literals are concatenated, so each record is one unbroken
    # string with no embedded newlines.
    sample_dns = (
        "k=rsa; s=email;"
        "p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T"
        "b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="
    )
    gmail_dns = (
        "k=rsa; "
        "p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh"
        "+eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ"
        "s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb"
        "hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct"
        "MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H"
        "Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"
    )
    _dns_responses = {
        'example._domainkey.canonical.com.': sample_dns,
        'test._domainkey.example.com.': read_test_data("test.txt"),
        '20120113._domainkey.gmail.com.': gmail_dns,
    }
    try:
        domain = domain.decode('ascii')
    except UnicodeDecodeError:
        # A name that is not pure ASCII is reported as "no record".
        return None
    # assertIn reports both the name and the table when the lookup misses.
    self.assertIn(domain, _dns_responses, domain)
    return _dns_responses[domain]
89
# Canned DNS lookup body; the enclosing def line is not part of this listing.
# NOTE(review): presumably the stub referenced elsewhere as self.dnsfunc2 -- confirm.
91 sample_dns = """\
92 k=rsa; \
93 p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
94 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
95
# In this table the example.com record is read from test2.txt and the
# gmail.com record carries only a p= tag (no k= tag).
96 _dns_responses = {
97 'example._domainkey.canonical.com.': sample_dns,
98 'test._domainkey.example.com.': read_test_data("test2.txt"),
99 '20120113._domainkey.gmail.com.': """\
100 p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\
101 +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\
102 s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\
103 hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\
104 MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\
105 Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
106 }
# The queried name is decoded as ASCII; a name that cannot be decoded
# yields None instead of an exception.
107 try:
108 domain = domain.decode('ascii')
109 except UnicodeDecodeError:
110 return None
# A name missing from the table fails the test, with the name as the message.
111 self.assertTrue(domain in _dns_responses,domain)
112 return _dns_responses[domain]
113
# Canned DNS lookup body; the enclosing def line is not part of this listing.
# NOTE(review): presumably the stub referenced elsewhere as self.dnsfunc3 -- confirm.
115 sample_dns = """\
116 k=rsa; \
117 p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
118 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
119
# In this table the example.com record is read from badversion.txt.
120 _dns_responses = {
121 'example._domainkey.canonical.com.': sample_dns,
122 'test._domainkey.example.com.': read_test_data("badversion.txt"),
123 '20120113._domainkey.gmail.com.': """\
124 p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\
125 +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\
126 s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\
127 hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\
128 MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\
129 Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
130 }
# The queried name is decoded as ASCII; a name that cannot be decoded
# yields None instead of an exception.
131 try:
132 domain = domain.decode('ascii')
133 except UnicodeDecodeError:
134 return None
# A name missing from the table fails the test, with the name as the message.
135 self.assertTrue(domain in _dns_responses,domain)
136 return _dns_responses[domain]
137
# Canned DNS lookup body; the enclosing def line is not part of this listing.
# NOTE(review): presumably the stub referenced elsewhere as self.dnsfunc4 -- confirm.
139 sample_dns = """\
140 k=rsa; \
141 p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
142 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
143
# In this table the example.com record is read from badk.txt.
144 _dns_responses = {
145 'example._domainkey.canonical.com.': sample_dns,
146 'test._domainkey.example.com.': read_test_data("badk.txt"),
147 '20120113._domainkey.gmail.com.': """\
148 p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\
149 +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\
150 s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\
151 hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\
152 MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\
153 Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
154 }
# The queried name is decoded as ASCII; a name that cannot be decoded
# yields None instead of an exception.
155 try:
156 domain = domain.decode('ascii')
157 except UnicodeDecodeError:
158 return None
# A name missing from the table fails the test, with the name as the message.
159 self.assertTrue(domain in _dns_responses,domain)
160 return _dns_responses[domain]
161
# Canned DNS lookup body; the enclosing def line is not part of this listing.
# NOTE(review): presumably the stub referenced elsewhere as self.dnsfunc5 -- confirm.
163 sample_dns = """\
164 k=rsa; \
165 p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
166 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
167
# This table serves football.example.com: the "test" selector record is read
# from test.txt and the "brisbane" selector is an inline ed25519 record.
168 _dns_responses = {
169 'example._domainkey.canonical.com.': sample_dns,
170 'test._domainkey.football.example.com.': read_test_data("test.txt"),
171 'brisbane._domainkey.football.example.com.': """v=DKIM1; k=ed25519; \
172 p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo="""
173 }
# The queried name is decoded as ASCII; a name that cannot be decoded
# yields None instead of an exception.
174 try:
175 domain = domain.decode('ascii')
176 except UnicodeDecodeError:
177 return None
# A name missing from the table fails the test, with the name as the message.
178 self.assertTrue(domain in _dns_responses,domain)
179 return _dns_responses[domain]
180
# Canned DNS lookup body; the enclosing def line is not part of this listing
# and no visible caller references this stub.
# NOTE(review): the p= value below differs from the RSA key in the other
# stubs by one character ("...IhvNAQ..." here, "...IhvcNAQ..." there).
# Presumably a deliberately corrupted key -- confirm before "fixing" it.
182 sample_dns = """\
183 k=rsa; \
184 p=MFwwDQYJKoZIhvNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
185 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
186
# Here the "test" selector at football.example.com maps to the inline RSA
# record above instead of a record read from a data file.
187 _dns_responses = {
188 'test._domainkey.football.example.com.': sample_dns,
189 'brisbane._domainkey.football.example.com.': """v=DKIM1; k=ed25519; \
190 p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo="""
191 }
# The queried name is decoded as ASCII; a name that cannot be decoded
# yields None instead of an exception.
192 try:
193 domain = domain.decode('ascii')
194 except UnicodeDecodeError:
195 return None
# A name missing from the table fails the test, with the name as the message.
196 self.assertTrue(domain in _dns_responses,domain)
197 return _dns_responses[domain]
198
200
# Sign self.message as selector b"test" / domain b"example.com" under each of
# the four simple/relaxed header/body canonicalization pairings, then verify
# signature + message through the self.dnsfunc stub; success is expected.
201 for header_algo in (b"simple", b"relaxed"):
202 for body_algo in (b"simple", b"relaxed"):
203 sig = dkim.sign(
204 self.message, b"test", b"example.com", self.key,
205 canonicalize=(header_algo, body_algo))
206 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
207 self.assertTrue(res)
208
210
# Same sign-then-verify round trip, but for self.message3 under domain
# b"football.example.com" with signature_algorithm=b'rsa-sha256' given
# explicitly; keys are resolved through self.dnsfunc5.
211 for header_algo in (b"simple", b"relaxed"):
212 for body_algo in (b"simple", b"relaxed"):
213 sig = dkim.sign(
214 self.message3, b"test", b"football.example.com", self.key,
215 canonicalize=(header_algo, body_algo), signature_algorithm=b'rsa-sha256')
216 res = dkim.verify(sig + self.message3, dnsfunc=self.dnsfunc5)
217 self.assertTrue(res)
218
220
# Verification here runs on self.message4 through a dkim.DKIM object, using
# self.dnsfunc5 for key lookup; success is expected.
# NOTE(review): `sig` is computed from self.message3 but is not used by the
# verification below -- confirm that signing here is still needed.
221 for header_algo in (b"simple", b"relaxed"):
222 for body_algo in (b"simple", b"relaxed"):
223 sig = dkim.sign(
224 self.message3, b"test", b"football.example.com", self.key,
225 canonicalize=(header_algo, body_algo), signature_algorithm=b'rsa-sha256')
226 d = dkim.DKIM(self.message4)
227 res = d.verify(dnsfunc=self.dnsfunc5)
228 self.assertTrue(res)
229
235
237
# Sign with linesep=b"\n": the produced signature must contain no CRLF
# sequence and must still verify.
238 for header_algo in (b"simple", b"relaxed"):
239 for body_algo in (b"simple", b"relaxed"):
240 sig = dkim.sign(
241 self.message, b"test", b"example.com", self.key,
242 canonicalize=(header_algo, body_algo), linesep=b"\n")
243 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
244 self.assertFalse(b'\r\n' in sig)
245 self.assertTrue(res)
246
248
# Round trip with key records served by self.dnsfunc2; verification is
# expected to succeed for every canonicalization pairing.
249 for header_algo in (b"simple", b"relaxed"):
250 for body_algo in (b"simple", b"relaxed"):
251 sig = dkim.sign(
252 self.message, b"test", b"example.com", self.key,
253 canonicalize=(header_algo, body_algo))
254 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc2)
255 self.assertTrue(res)
256
258
# Round trip with key records served by self.dnsfunc3; verification is
# expected to FAIL for every canonicalization pairing.
259 for header_algo in (b"simple", b"relaxed"):
260 for body_algo in (b"simple", b"relaxed"):
261 sig = dkim.sign(
262 self.message, b"test", b"example.com", self.key,
263 canonicalize=(header_algo, body_algo))
264 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc3)
265 self.assertFalse(res)
266
268
# Round trip with key records served by self.dnsfunc4; verification is
# expected to FAIL for every canonicalization pairing.
269 for header_algo in (b"simple", b"relaxed"):
270 for body_algo in (b"simple", b"relaxed"):
271 sig = dkim.sign(
272 self.message, b"test", b"example.com", self.key,
273 canonicalize=(header_algo, body_algo))
274 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc4)
275 self.assertFalse(res)
276
278
# Sign with an explicit header list: b'from' followed by the entries of
# dkim.DKIM.SHOULD. The result is expected to verify.
279 for header_algo in (b"simple", b"relaxed"):
280 for body_algo in (b"simple", b"relaxed"):
281 sig = dkim.sign(
282 self.message, b"test", b"example.com", self.key,
283 canonicalize=(header_algo, body_algo),
284 include_headers=(b'from',) + dkim.DKIM.SHOULD)
285 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
286 self.assertTrue(res)
287
289
# include_headers is given as a tuple of str ('from') here, whereas the other
# visible calls pass bytes; signing and verification are both expected to
# work with it.
290 for header_algo in (b"simple", b"relaxed"):
291 for body_algo in (b"simple", b"relaxed"):
292 sig = dkim.sign(
293 self.message, b"test", b"example.com", self.key,
294 canonicalize=(header_algo, body_algo),
295 include_headers=('from',) )
296 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
297 self.assertTrue(res)
298
# Sign with length=True; the signature is then expected to carry an l= tag.
300 sig = dkim.sign(
301 self.message, b"test", b"example.com", self.key, length=True)
# Parse the message with the email package to get at its payload.
302 msg = email.message_from_string(self.message.decode('utf-8'))
# The expected l= value is the payload length plus one '\n' character.
303 self.assertIn('; l=%s' % len(msg.get_payload() + '\n'), sig.decode('utf-8'))
304 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
305 self.assertTrue(res)
306
308
# Sign without a length limit, then append b"foo" to the message before
# verifying; verification is expected to fail.
# NOTE(review): header_algo and body_algo are never passed to dkim.sign in
# this fragment, so the loop variables have no effect on the call -- confirm
# whether canonicalize=(header_algo, body_algo) was intended.
309 for header_algo in (b"simple", b"relaxed"):
310 for body_algo in (b"simple", b"relaxed"):
311 sig = dkim.sign(
312 self.message, b"test", b"example.com", self.key)
313 res = dkim.verify(
314 sig + self.message + b"foo", dnsfunc=self.dnsfunc)
315 self.assertFalse(res)
316
318
# Sign with length=True, then append extra body text; verification is still
# expected to succeed.
# NOTE(review): `self.message += ...` rebinds the attribute, so any later
# loop iteration would sign the already-extended message -- confirm that
# this accumulation is intended (the original nesting is not visible here).
319 for header_algo in (b"simple", b"relaxed"):
320 for body_algo in (b"simple", b"relaxed"):
321 sig = dkim.sign(
322 self.message, b"test", b"example.com", self.key,
323 canonicalize=(header_algo, body_algo), length=True)
324 self.message += b'added more text\n'
325 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
326 self.assertTrue(res)
327
338
340
# The signing domain ends in the non-ASCII byte \xe9; verification of the
# resulting message is expected to fail.
341 sig = dkim.sign(self.message, b"test", b"example.com\xe9", self.key)
342 res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
343 self.assertFalse(res)
344
346
347
348
349
350
# Fixture message; LF line endings are converted to CRLF after the literal.
351 sample_msg = b"""\
352 From: mbp@canonical.com
353 To: scottk@example.com
354 Subject: this is my
355 test message
356 """.replace(b'\n', b'\r\n')
357
# RSA private key used to sign sample_msg.
358 sample_privkey = b"""\
359 -----BEGIN RSA PRIVATE KEY-----
360 MIIBOwIBAAJBANmBe10IgY+u7h3enWTukkqtUD5PR52Tb/mPfjC0QJTocVBq6Za/
361 PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQJAYFUKsD+uMlcFu1D3YNaR
362 EGYGXjJ6w32jYGJ/P072M3yWOq2S1dvDthI3nRT8MFjZ1wHDAYHrSpfDNJ3v2fvZ
363 cQIhAPgRPmVYn+TGd59asiqG1SZqh+p+CRYHW7B8BsicG5t3AiEA4HYNOohlgWan
364 8tKgqLJgUdPFbaHZO1nDyBgvV8hvWZUCIQDDdCq6hYKuKeYUy8w3j7cgJq3ih922
365 2qNWwdJCfCWQbwIgTY0cBvQnNe0067WQIpj2pG7pkHZR6qqZ9SE+AjNTHX0CIQCI
366 Mgq55Y9MCq5wqzy141rnxrJxTwK9ABo3IAFMWEov3g==
367 -----END RSA PRIVATE KEY-----
368 """
369
# NOTE(review): sample_pubkey is assigned but not referenced anywhere in the
# visible lines -- confirm whether it is still needed.
370 sample_pubkey = """\
371 -----BEGIN PUBLIC KEY-----
372 MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T
373 b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==
374 -----END PUBLIC KEY-----
375 """
376
# Header canonicalization alternates between dkim.Relaxed and dkim.Simple;
# body canonicalization is always dkim.Relaxed.
377 for header_mode in [dkim.Relaxed, dkim.Simple]:
378
379 dkim_header = dkim.sign(sample_msg, b'example', b'canonical.com',
380 sample_privkey, canonicalize=(header_mode, dkim.Relaxed))
381
382
383
384
385
386
# First check: the folded signature header followed by the message verifies
# (minkey=512 is passed to dkim.verify).
387 signed = dkim.fold(dkim_header) + sample_msg
388 result = dkim.verify(signed,dnsfunc=self.dnsfunc,
389 minkey=512)
390 self.assertTrue(result)
391 dkim_header = dkim.fold(dkim_header)
392
# Second check: the space after the last CRLF in the folded header is
# replaced by a tab; verification is still expected to succeed.
393 pos = dkim_header.rindex(b'\r\n ')
394 dkim_header = dkim_header[:pos]+b'\r\n\t'+dkim_header[pos+3:]
395 result = dkim.verify(dkim_header + sample_msg,
396 dnsfunc=self.dnsfunc, minkey=512)
397 self.assertTrue(result)
398
406
408
409
410
# Sign message.mbox with every header reported by all_sign_headers(), after
# b'received' has been removed from the signer's should_not_sign collection.
411 message = read_test_data("message.mbox")
412 for header_algo in (b"simple", b"relaxed"):
413 for body_algo in (b"simple", b"relaxed"):
414 d = dkim.DKIM(message)
415
416 d.should_not_sign.remove(b'received')
417 sig = d.sign(b"test", b"example.com", self.key,
418 include_headers=d.all_sign_headers(),
419 canonicalize=(header_algo, body_algo))
# Verify through a second DKIM object built from signature + message.
420 dv = dkim.DKIM(sig + message)
421 res = dv.verify(dnsfunc=self.dnsfunc)
# Signer and verifier must agree on include_headers and on the headers that
# dkim.select_headers picks for them.
422 self.assertEqual(d.include_headers,dv.include_headers)
423 s = dkim.select_headers(d.headers,d.include_headers)
424 sv = dkim.select_headers(dv.headers,dv.include_headers)
425 self.assertEqual(s,sv)
426 self.assertTrue(res)
427
429
430
# A second From header that gets added to the message after it was signed.
431 hfrom = b'From: "Resident Evil" <sales@spammer.com>\r\n'
# Split the header section from the body at the first b'\n\n'.
432 h,b = self.message.split(b'\n\n',1)
# NOTE(review): header_algo and body_algo are never passed to dkim.sign in
# this fragment, so the loop variables have no effect on the call -- confirm
# whether canonicalize=(header_algo, body_algo) was intended.
433 for header_algo in (b"simple", b"relaxed"):
434 for body_algo in (b"simple", b"relaxed"):
435 sig = dkim.sign(
436 self.message, b"test", b"example.com", self.key)
437
# Case 1: an X-Foo header appended after the existing headers -- the
# signature is expected to still verify.
438 h1 = h+b'\r\n'+b'X-Foo: bar'
439 message = b'\n\n'.join((h1,b))
440 res = dkim.verify(sig+message, dnsfunc=self.dnsfunc)
441 self.assertTrue(res)
442
# Case 2: the extra From header appended after the existing headers --
# verification is expected to fail.
443 h1 = h+b'\r\n'+hfrom.strip()
444 message = b'\n\n'.join((h1,b))
445 res = dkim.verify(sig+message, dnsfunc=self.dnsfunc)
446 self.assertFalse(res)
447
# Case 3: the extra From header placed before the existing headers --
# verification is expected to fail.
448 h1 = hfrom+h
449 message = b'\n\n'.join((h1,b))
450 res = dkim.verify(sig+message, dnsfunc=self.dnsfunc)
451 self.assertFalse(res)
452
454
# Signing the message in test_nofrom.message is expected to raise
# dkim.ParameterError; sigerror records whether that happened.
455 sigerror = False
456 sig = ''
457 message = read_test_data('test_nofrom.message')
# NOTE(review): selector and domain are str here, while the other visible
# dkim.sign calls pass bytes -- confirm this is intended.
458 selector = 'test'
459 domain = 'example.com'
460 identity = None
461 try:
462 sig = dkim.sign(message, selector, domain, read_test_data('test.private'), identity = identity)
463 except dkim.ParameterError as x:
464 sigerror = True
465 self.assertTrue(sigerror)
466
# Baseline set of signature fields; validating it is expected not to raise.
468 sig = {b'v': b'1',
469 b'a': b'rsa-sha256',
470 b'b': b'K/UUOt8lCtgjp3kSTogqBm9lY1Yax/NwZ+bKm39/WKzo5KYe3L/6RoIA/0oiDX4kO\n \t Qut49HCV6ZUe6dY9V5qWBwLanRs1sCnObaOGMpFfs8tU4TWpDSVXaNZAqn15XVW0WH\n \t EzOzUfVuatpa1kF4voIgSbmZHR1vN3WpRtcTBe/I=',
471 b'bh': b'n0HUwGCP28PkesXBPH82Kboy8LhNFWU9zUISIpAez7M=',
472 b'c': b'simple/simple',
473 b'd': b'kitterman.com',
474 b'i': b'scott@Kitterman.com',
475 b'h': b'From:To:Subject:Date:Cc:MIME-Version:Content-Type:\n \t Content-Transfer-Encoding:Message-Id',
476 b's': b'2007-00',
477 b't': b'1299525798'}
478 dkim.validate_signature_fields(sig)
479
# A copy whose v field is the integer 2 is expected to be rejected.
480 sigVer = sig.copy()
481 sigVer[b'v'] = 2
482 self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigVer)
483
# A copy with an x= field added.
# NOTE(review): the call below validates `sig`, not the new `sigX` -- confirm
# which was intended; x=1399525798 is a fixed timestamp, not relative to now.
484 sigX = sig.copy()
485 sigX[b'x'] = b'1399525798'
486 dkim.validate_signature_fields(sig)
487
# t= raised above the x= value: expected to be rejected.
488 sigX[b't'] = b'1400000000'
489 self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX)
490
# x= set 400000 seconds after the current time: expected to validate.
491 now = int(time.time())
492 sigX[b'x'] = str(now+400000).encode('ascii')
493 dkim.validate_signature_fields(sigX)
494
# x= set 24 hours before the current time: expected to be rejected.
495 sigX[b'x'] = str(now - 24*3600).encode('ascii')
496 self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX)
497
498
# Build and return a suite holding every test that TestLoader finds in this
# module, looked up by the module's own name.
500 from unittest import TestLoader
501 return TestLoader().loadTestsFromName(__name__)
502