1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19  import email 
 20  import os.path 
 21  import unittest 
 22  import time 
 23   
 24  import dkim 
 25   
 26   
 28      """Get the content of the given test data file. 
 29   
 30      The files live in dkim/tests/data. 
 31      """ 
 32      path = os.path.join(os.path.dirname(__file__), 'data', filename) 
 33      with open(path, 'rb') as f: 
 34          return f.read() 
  35   
 36   
 38   
 40          self.assertEqual( 
 41              b"foo", dkim.fold(b"foo")) 
  42   
 44           
 45           
 46          self.assertEqual( 
 47              b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25)) 
  48   
 50          self.assertEqual( 
 51              b"foo" * 24 + b"\n foo", dkim.fold(b"foo" * 25, linesep=b"\n")) 
   52   
 53   
 54   
 56      """End-to-end signature and verification tests.""" 
 57   
 65   
 66 -    def dnsfunc(self, domain, timeout=5): 
  67          sample_dns = """\ 
 68  k=rsa; s=email;\ 
 69  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
 70  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
 71   
 72          _dns_responses = { 
 73            'example._domainkey.canonical.com.': sample_dns, 
 74            'test._domainkey.example.com.': read_test_data("test.txt"), 
 75            '20120113._domainkey.gmail.com.': """k=rsa; \ 
 76  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
 77  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
 78  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
 79  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
 80  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
 81  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
 82          } 
 83          try: 
 84              domain = domain.decode('ascii') 
 85          except UnicodeDecodeError: 
 86              return None 
 87          self.assertTrue(domain in _dns_responses,domain) 
 88          return _dns_responses[domain] 
  89   
 91          sample_dns = """\ 
 92  k=rsa; \ 
 93  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
 94  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
 95   
 96          _dns_responses = { 
 97            'example._domainkey.canonical.com.': sample_dns, 
 98            'test._domainkey.example.com.': read_test_data("test2.txt"), 
 99            '20120113._domainkey.gmail.com.': """\ 
100  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
101  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
102  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
103  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
104  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
105  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
106          } 
107          try: 
108              domain = domain.decode('ascii') 
109          except UnicodeDecodeError: 
110              return None 
111          self.assertTrue(domain in _dns_responses,domain) 
112          return _dns_responses[domain] 
 113   
115          sample_dns = """\ 
116  k=rsa; \ 
117  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
118  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
119   
120          _dns_responses = { 
121            'example._domainkey.canonical.com.': sample_dns, 
122            'test._domainkey.example.com.': read_test_data("badversion.txt"), 
123            '20120113._domainkey.gmail.com.': """\ 
124  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
125  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
126  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
127  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
128  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
129  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
130          } 
131          try: 
132              domain = domain.decode('ascii') 
133          except UnicodeDecodeError: 
134              return None 
135          self.assertTrue(domain in _dns_responses,domain) 
136          return _dns_responses[domain] 
 137   
139          sample_dns = """\ 
140  k=rsa; \ 
141  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
142  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
143   
144          _dns_responses = { 
145            'example._domainkey.canonical.com.': sample_dns, 
146            'test._domainkey.example.com.': read_test_data("badk.txt"), 
147            '20120113._domainkey.gmail.com.': """\ 
148  p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ 
149  +eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ 
150  s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ 
151  hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ 
152  MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ 
153  Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" 
154          } 
155          try: 
156              domain = domain.decode('ascii') 
157          except UnicodeDecodeError: 
158              return None 
159          self.assertTrue(domain in _dns_responses,domain) 
160          return _dns_responses[domain] 
 161   
163          sample_dns = """\ 
164  k=rsa; \ 
165  p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
166  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
167   
168          _dns_responses = { 
169            'example._domainkey.canonical.com.': sample_dns, 
170            'test._domainkey.football.example.com.': read_test_data("test.txt"), 
171            'brisbane._domainkey.football.example.com.': """v=DKIM1; k=ed25519; \ 
172  p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo=""" 
173          } 
174          try: 
175              domain = domain.decode('ascii') 
176          except UnicodeDecodeError: 
177              return None 
178          self.assertTrue(domain in _dns_responses,domain) 
179          return _dns_responses[domain] 
 180   
182          sample_dns = """\ 
183  k=rsa; \ 
184  p=MFwwDQYJKoZIhvNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ 
185  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" 
186   
187          _dns_responses = { 
188            'test._domainkey.football.example.com.': sample_dns, 
189            'brisbane._domainkey.football.example.com.': """v=DKIM1; k=ed25519; \ 
190  p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo=""" 
191          } 
192          try: 
193              domain = domain.decode('ascii') 
194          except UnicodeDecodeError: 
195              return None 
196          self.assertTrue(domain in _dns_responses,domain) 
197          return _dns_responses[domain] 
 198   
200           
201          for header_algo in (b"simple", b"relaxed"): 
202              for body_algo in (b"simple", b"relaxed"): 
203                  sig = dkim.sign( 
204                      self.message, b"test", b"example.com", self.key, 
205                      canonicalize=(header_algo, body_algo)) 
206                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
207                  self.assertTrue(res) 
 208   
210           
211          for header_algo in (b"simple", b"relaxed"): 
212              for body_algo in (b"simple", b"relaxed"): 
213                  sig = dkim.sign( 
214                      self.message3, b"test", b"football.example.com", self.key, 
215                      canonicalize=(header_algo, body_algo), signature_algorithm=b'rsa-sha256') 
216                  res = dkim.verify(sig + self.message3, dnsfunc=self.dnsfunc5) 
217                  self.assertTrue(res) 
 218   
220           
221          for header_algo in (b"simple", b"relaxed"): 
222              for body_algo in (b"simple", b"relaxed"): 
223                  sig = dkim.sign( 
224                      self.message3, b"test", b"football.example.com", self.key, 
225                      canonicalize=(header_algo, body_algo), signature_algorithm=b'rsa-sha256') 
226                  d = dkim.DKIM(self.message4) 
227                  res = d.verify(dnsfunc=self.dnsfunc5) 
228                  self.assertTrue(res) 
 229   
235   
237           
238          for header_algo in (b"simple", b"relaxed"): 
239              for body_algo in (b"simple", b"relaxed"): 
240                  sig = dkim.sign( 
241                      self.message, b"test", b"example.com", self.key, 
242                      canonicalize=(header_algo, body_algo), linesep=b"\n") 
243                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
244                  self.assertFalse(b'\r\n' in sig) 
245                  self.assertTrue(res) 
 246   
248           
249          for header_algo in (b"simple", b"relaxed"): 
250              for body_algo in (b"simple", b"relaxed"): 
251                  sig = dkim.sign( 
252                      self.message, b"test", b"example.com", self.key, 
253                      canonicalize=(header_algo, body_algo)) 
254                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc2) 
255                  self.assertTrue(res) 
 256   
258           
259          for header_algo in (b"simple", b"relaxed"): 
260              for body_algo in (b"simple", b"relaxed"): 
261                  sig = dkim.sign( 
262                      self.message, b"test", b"example.com", self.key, 
263                      canonicalize=(header_algo, body_algo)) 
264                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc3) 
265                  self.assertFalse(res) 
 266   
268           
269          for header_algo in (b"simple", b"relaxed"): 
270              for body_algo in (b"simple", b"relaxed"): 
271                  sig = dkim.sign( 
272                      self.message, b"test", b"example.com", self.key, 
273                      canonicalize=(header_algo, body_algo)) 
274                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc4) 
275                  self.assertFalse(res) 
 276   
278           
279          for header_algo in (b"simple", b"relaxed"): 
280               for body_algo in (b"simple", b"relaxed"): 
281                  sig = dkim.sign( 
282                      self.message, b"test", b"example.com", self.key, 
283                      canonicalize=(header_algo, body_algo), 
284                      include_headers=(b'from',) + dkim.DKIM.SHOULD) 
285                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
286                  self.assertTrue(res) 
 287   
289           
290          for header_algo in (b"simple", b"relaxed"): 
291               for body_algo in (b"simple", b"relaxed"): 
292                  sig = dkim.sign( 
293                      self.message, b"test", b"example.com", self.key, 
294                      canonicalize=(header_algo, body_algo), 
295                      include_headers=('from',) ) 
296                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
297                  self.assertTrue(res) 
 298   
300          sig = dkim.sign( 
301              self.message, b"test", b"example.com", self.key, length=True) 
302          msg = email.message_from_string(self.message.decode('utf-8')) 
303          self.assertIn('; l=%s' % len(msg.get_payload() + '\n'), sig.decode('utf-8')) 
304          res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
305          self.assertTrue(res) 
 306   
308           
309          for header_algo in (b"simple", b"relaxed"): 
310              for body_algo in (b"simple", b"relaxed"): 
311                  sig = dkim.sign( 
312                      self.message, b"test", b"example.com", self.key) 
313                  res = dkim.verify( 
314                      sig + self.message + b"foo", dnsfunc=self.dnsfunc) 
315                  self.assertFalse(res) 
 316   
318           
319          for header_algo in (b"simple", b"relaxed"): 
320              for body_algo in (b"simple", b"relaxed"): 
321                  sig = dkim.sign( 
322                      self.message, b"test", b"example.com", self.key, 
323                      canonicalize=(header_algo, body_algo), length=True) 
324                  self.message += b'added more text\n' 
325                  res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
326                  self.assertTrue(res) 
 327   
338   
340           
341          sig = dkim.sign(self.message, b"test", b"example.com\xe9", self.key) 
342          res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) 
343          self.assertFalse(res) 
 344   
346         
347         
348         
349         
350         
351        sample_msg = b"""\ 
352  From: mbp@canonical.com 
353  To: scottk@example.com 
354  Subject: this is my 
355      test message 
356  """.replace(b'\n', b'\r\n') 
357   
358        sample_privkey = b"""\ 
359  -----BEGIN RSA PRIVATE KEY----- 
360  MIIBOwIBAAJBANmBe10IgY+u7h3enWTukkqtUD5PR52Tb/mPfjC0QJTocVBq6Za/ 
361  PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQJAYFUKsD+uMlcFu1D3YNaR 
362  EGYGXjJ6w32jYGJ/P072M3yWOq2S1dvDthI3nRT8MFjZ1wHDAYHrSpfDNJ3v2fvZ 
363  cQIhAPgRPmVYn+TGd59asiqG1SZqh+p+CRYHW7B8BsicG5t3AiEA4HYNOohlgWan 
364  8tKgqLJgUdPFbaHZO1nDyBgvV8hvWZUCIQDDdCq6hYKuKeYUy8w3j7cgJq3ih922 
365  2qNWwdJCfCWQbwIgTY0cBvQnNe0067WQIpj2pG7pkHZR6qqZ9SE+AjNTHX0CIQCI 
366  Mgq55Y9MCq5wqzy141rnxrJxTwK9ABo3IAFMWEov3g== 
367  -----END RSA PRIVATE KEY----- 
368  """ 
369   
370        sample_pubkey = """\ 
371  -----BEGIN PUBLIC KEY----- 
372  MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T 
373  b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ== 
374  -----END PUBLIC KEY----- 
375  """ 
376   
377        for header_mode in [dkim.Relaxed, dkim.Simple]: 
378   
379          dkim_header = dkim.sign(sample_msg, b'example', b'canonical.com', 
380              sample_privkey, canonicalize=(header_mode, dkim.Relaxed)) 
381           
382           
383           
384           
385           
386           
387          signed = dkim.fold(dkim_header) + sample_msg 
388          result = dkim.verify(signed,dnsfunc=self.dnsfunc, 
389                  minkey=512) 
390          self.assertTrue(result) 
391          dkim_header = dkim.fold(dkim_header) 
392           
393          pos = dkim_header.rindex(b'\r\n ') 
394          dkim_header = dkim_header[:pos]+b'\r\n\t'+dkim_header[pos+3:] 
395          result = dkim.verify(dkim_header + sample_msg, 
396                  dnsfunc=self.dnsfunc, minkey=512) 
397          self.assertTrue(result) 
 398   
406   
408           
409           
410           
411          message = read_test_data("message.mbox") 
412          for header_algo in (b"simple", b"relaxed"): 
413              for body_algo in (b"simple", b"relaxed"): 
414                  d = dkim.DKIM(message) 
415                   
416                  d.should_not_sign.remove(b'received') 
417                  sig = d.sign(b"test", b"example.com", self.key, 
418                      include_headers=d.all_sign_headers(), 
419                      canonicalize=(header_algo, body_algo)) 
420                  dv = dkim.DKIM(sig + message) 
421                  res = dv.verify(dnsfunc=self.dnsfunc) 
422                  self.assertEqual(d.include_headers,dv.include_headers) 
423                  s = dkim.select_headers(d.headers,d.include_headers) 
424                  sv = dkim.select_headers(dv.headers,dv.include_headers) 
425                  self.assertEqual(s,sv) 
426                  self.assertTrue(res) 
 427   
429           
430           
431          hfrom = b'From: "Resident Evil" <sales@spammer.com>\r\n' 
432          h,b = self.message.split(b'\n\n',1) 
433          for header_algo in (b"simple", b"relaxed"): 
434              for body_algo in (b"simple", b"relaxed"): 
435                  sig = dkim.sign( 
436                      self.message, b"test", b"example.com", self.key) 
437                   
438                  h1 = h+b'\r\n'+b'X-Foo: bar' 
439                  message = b'\n\n'.join((h1,b)) 
440                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
441                  self.assertTrue(res) 
442                   
443                  h1 = h+b'\r\n'+hfrom.strip() 
444                  message = b'\n\n'.join((h1,b)) 
445                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
446                  self.assertFalse(res) 
447                   
448                  h1 = hfrom+h 
449                  message = b'\n\n'.join((h1,b)) 
450                  res = dkim.verify(sig+message, dnsfunc=self.dnsfunc) 
451                  self.assertFalse(res) 
 452   
454           
455          sigerror = False 
456          sig = '' 
457          message = read_test_data('test_nofrom.message') 
458          selector = 'test' 
459          domain = 'example.com' 
460          identity = None 
461          try: 
462              sig = dkim.sign(message, selector, domain, read_test_data('test.private'), identity = identity) 
463          except dkim.ParameterError as x: 
464              sigerror = True 
465          self.assertTrue(sigerror) 
 466   
468        sig = {b'v': b'1', 
469        b'a': b'rsa-sha256', 
470        b'b': b'K/UUOt8lCtgjp3kSTogqBm9lY1Yax/NwZ+bKm39/WKzo5KYe3L/6RoIA/0oiDX4kO\n \t Qut49HCV6ZUe6dY9V5qWBwLanRs1sCnObaOGMpFfs8tU4TWpDSVXaNZAqn15XVW0WH\n \t EzOzUfVuatpa1kF4voIgSbmZHR1vN3WpRtcTBe/I=', 
471        b'bh': b'n0HUwGCP28PkesXBPH82Kboy8LhNFWU9zUISIpAez7M=', 
472        b'c': b'simple/simple', 
473        b'd': b'kitterman.com', 
474        b'i': b'scott@Kitterman.com', 
475        b'h': b'From:To:Subject:Date:Cc:MIME-Version:Content-Type:\n \t Content-Transfer-Encoding:Message-Id', 
476        b's': b'2007-00', 
477        b't': b'1299525798'} 
478        dkim.validate_signature_fields(sig) 
479         
480        sigVer = sig.copy() 
481        sigVer[b'v'] = 2 
482        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigVer) 
483         
484        sigX = sig.copy() 
485        sigX[b'x'] = b'1399525798' 
486        dkim.validate_signature_fields(sig) 
487         
488        sigX[b't'] = b'1400000000' 
489        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX) 
490         
491        now = int(time.time()) 
492        sigX[b'x'] = str(now+400000).encode('ascii') 
493        dkim.validate_signature_fields(sigX) 
494         
495        sigX[b'x'] = str(now - 24*3600).encode('ascii') 
496        self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX) 
  497   
498   
500      from unittest import TestLoader 
501      return TestLoader().loadTestsFromName(__name__) 
 502