1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22  import re 
 23   
 24  __all__ = [ 
 25      'CanonicalizationPolicy', 
 26      'InvalidCanonicalizationPolicyError', 
 27      ] 
 31      """The c= value could not be parsed.""" 
 32      pass 
  33   
 36      return re.sub(b"[\t ]+\r\n", b"\r\n", content) 
  37   
 40      return re.sub(b"[\t ]+", b" ", content) 
  41   
 44      end = None 
 45      while content.endswith(b"\r\n", 0, end): 
 46          if end is None: 
 47              end = -2 
 48          else: 
 49              end -= 2 
 50   
 51      if end is None: 
 52          return content + b"\r\n" 
 53   
 54      end += 2 
 55      if end == 0: 
 56          return content 
 57   
 58      return content[:end] 
  59   
 61      return re.sub(b"\r\n", b"", content) 
  62   
 63   
 64 -def correct_empty_body(content): 
  65      if content == b"\r\n": 
 66          return b"" 
 67      else: 
 68          return content 
  69   
 72      """Class that represents the "simple" canonicalization algorithm.""" 
 73   
 74      name = b"simple" 
 75   
 76      @staticmethod 
 80   
 81      @staticmethod 
  85   
 88      """Class that represents the "relaxed" canonicalization algorithm.""" 
 89   
 90      name = b"relaxed" 
 91   
 92      @staticmethod 
102   
103      @staticmethod 
 110   
113   
114 -    def __init__(self, header_algorithm, body_algorithm): 
 115          self.header_algorithm = header_algorithm 
116          self.body_algorithm = body_algorithm 
 117   
118      @classmethod 
120          """Construct the canonicalization policy described by a c= value. 
121   
122          May raise an C{InvalidCanonicalizationPolicyError} if the given 
123          value is invalid 
124   
125          @param c: c= value from a DKIM-Signature header field 
126          @return: a C{CanonicalizationPolicy} 
127          """ 
128          if c is None: 
129              c = b'simple/simple' 
130          m = c.split(b'/') 
131          if len(m) not in (1, 2): 
132              raise InvalidCanonicalizationPolicyError(c) 
133          if len(m) == 1: 
134              m.append(b'simple') 
135          can_headers, can_body = m 
136          try: 
137              header_algorithm = ALGORITHMS[can_headers] 
138              body_algorithm = ALGORITHMS[can_body] 
139          except KeyError as e: 
140              raise InvalidCanonicalizationPolicyError(e.args[0]) 
141          return cls(header_algorithm, body_algorithm) 
 142   
144          return b'/'.join( 
145              (self.header_algorithm.name, self.body_algorithm.name)) 
 146   
149   
150 -    def canonicalize_body(self, body): 
 151          return self.body_algorithm.canonicalize_body(body) 
  152   
153   
154  ALGORITHMS = dict((c.name, c) for c in (Simple, Relaxed)) 
155