fa0b245d279e96724d5610f93bc3b3c8c22ca032
[subu] /
1 """
2 Low-level helpers for the SecureTransport bindings.
3
4 These are Python functions that are not directly related to the high-level APIs
5 but are necessary to get them to work. They include a whole bunch of low-level
6 CoreFoundation messing about and memory management. The concerns in this module
7 are almost entirely about trying to avoid memory leaks and providing
8 appropriate and useful assistance to the higher-level code.
9 """
10 import base64
11 import ctypes
12 import itertools
13 import os
14 import re
15 import ssl
16 import struct
17 import tempfile
18
19 from .bindings import CFConst, CoreFoundation, Security
20
21 # This regular expression is used to grab PEM data out of a PEM bundle.
22 _PEM_CERTS_RE = re.compile(
23     b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
24 )
25
26
27 def _cf_data_from_bytes(bytestring):
28     """
29     Given a bytestring, create a CFData object from it. This CFData object must
30     be CFReleased by the caller.
31     """
32     return CoreFoundation.CFDataCreate(
33         CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
34     )
35
36
37 def _cf_dictionary_from_tuples(tuples):
38     """
39     Given a list of Python tuples, create an associated CFDictionary.
40     """
41     dictionary_size = len(tuples)
42
43     # We need to get the dictionary keys and values out in the same order.
44     keys = (t[0] for t in tuples)
45     values = (t[1] for t in tuples)
46     cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
47     cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)
48
49     return CoreFoundation.CFDictionaryCreate(
50         CoreFoundation.kCFAllocatorDefault,
51         cf_keys,
52         cf_values,
53         dictionary_size,
54         CoreFoundation.kCFTypeDictionaryKeyCallBacks,
55         CoreFoundation.kCFTypeDictionaryValueCallBacks,
56     )
57
58
59 def _cfstr(py_bstr):
60     """
61     Given a Python binary data, create a CFString.
62     The string must be CFReleased by the caller.
63     """
64     c_str = ctypes.c_char_p(py_bstr)
65     cf_str = CoreFoundation.CFStringCreateWithCString(
66         CoreFoundation.kCFAllocatorDefault,
67         c_str,
68         CFConst.kCFStringEncodingUTF8,
69     )
70     return cf_str
71
72
73 def _create_cfstring_array(lst):
74     """
75     Given a list of Python binary data, create an associated CFMutableArray.
76     The array must be CFReleased by the caller.
77
78     Raises an ssl.SSLError on failure.
79     """
80     cf_arr = None
81     try:
82         cf_arr = CoreFoundation.CFArrayCreateMutable(
83             CoreFoundation.kCFAllocatorDefault,
84             0,
85             ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
86         )
87         if not cf_arr:
88             raise MemoryError("Unable to allocate memory!")
89         for item in lst:
90             cf_str = _cfstr(item)
91             if not cf_str:
92                 raise MemoryError("Unable to allocate memory!")
93             try:
94                 CoreFoundation.CFArrayAppendValue(cf_arr, cf_str)
95             finally:
96                 CoreFoundation.CFRelease(cf_str)
97     except BaseException as e:
98         if cf_arr:
99             CoreFoundation.CFRelease(cf_arr)
100         raise ssl.SSLError("Unable to allocate array: %s" % (e,))
101     return cf_arr
102
103
104 def _cf_string_to_unicode(value):
105     """
106     Creates a Unicode string from a CFString object. Used entirely for error
107     reporting.
108
109     Yes, it annoys me quite a lot that this function is this complex.
110     """
111     value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
112
113     string = CoreFoundation.CFStringGetCStringPtr(
114         value_as_void_p, CFConst.kCFStringEncodingUTF8
115     )
116     if string is None:
117         buffer = ctypes.create_string_buffer(1024)
118         result = CoreFoundation.CFStringGetCString(
119             value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8
120         )
121         if not result:
122             raise OSError("Error copying C string from CFStringRef")
123         string = buffer.value
124     if string is not None:
125         string = string.decode("utf-8")
126     return string
127
128
129 def _assert_no_error(error, exception_class=None):
130     """
131     Checks the return code and throws an exception if there is an error to
132     report
133     """
134     if error == 0:
135         return
136
137     cf_error_string = Security.SecCopyErrorMessageString(error, None)
138     output = _cf_string_to_unicode(cf_error_string)
139     CoreFoundation.CFRelease(cf_error_string)
140
141     if output is None or output == u"":
142         output = u"OSStatus %s" % error
143
144     if exception_class is None:
145         exception_class = ssl.SSLError
146
147     raise exception_class(output)
148
149
150 def _cert_array_from_pem(pem_bundle):
151     """
152     Given a bundle of certs in PEM format, turns them into a CFArray of certs
153     that can be used to validate a cert chain.
154     """
155     # Normalize the PEM bundle's line endings.
156     pem_bundle = pem_bundle.replace(b"\r\n", b"\n")
157
158     der_certs = [
159         base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle)
160     ]
161     if not der_certs:
162         raise ssl.SSLError("No root certificates specified")
163
164     cert_array = CoreFoundation.CFArrayCreateMutable(
165         CoreFoundation.kCFAllocatorDefault,
166         0,
167         ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
168     )
169     if not cert_array:
170         raise ssl.SSLError("Unable to allocate memory!")
171
172     try:
173         for der_bytes in der_certs:
174             certdata = _cf_data_from_bytes(der_bytes)
175             if not certdata:
176                 raise ssl.SSLError("Unable to allocate memory!")
177             cert = Security.SecCertificateCreateWithData(
178                 CoreFoundation.kCFAllocatorDefault, certdata
179             )
180             CoreFoundation.CFRelease(certdata)
181             if not cert:
182                 raise ssl.SSLError("Unable to build cert object!")
183
184             CoreFoundation.CFArrayAppendValue(cert_array, cert)
185             CoreFoundation.CFRelease(cert)
186     except Exception:
187         # We need to free the array before the exception bubbles further.
188         # We only want to do that if an error occurs: otherwise, the caller
189         # should free.
190         CoreFoundation.CFRelease(cert_array)
191         raise
192
193     return cert_array
194
195
196 def _is_cert(item):
197     """
198     Returns True if a given CFTypeRef is a certificate.
199     """
200     expected = Security.SecCertificateGetTypeID()
201     return CoreFoundation.CFGetTypeID(item) == expected
202
203
204 def _is_identity(item):
205     """
206     Returns True if a given CFTypeRef is an identity.
207     """
208     expected = Security.SecIdentityGetTypeID()
209     return CoreFoundation.CFGetTypeID(item) == expected
210
211
212 def _temporary_keychain():
213     """
214     This function creates a temporary Mac keychain that we can use to work with
215     credentials. This keychain uses a one-time password and a temporary file to
216     store the data. We expect to have one keychain per socket. The returned
217     SecKeychainRef must be freed by the caller, including calling
218     SecKeychainDelete.
219
220     Returns a tuple of the SecKeychainRef and the path to the temporary
221     directory that contains it.
222     """
223     # Unfortunately, SecKeychainCreate requires a path to a keychain. This
224     # means we cannot use mkstemp to use a generic temporary file. Instead,
225     # we're going to create a temporary directory and a filename to use there.
226     # This filename will be 8 random bytes expanded into base64. We also need
227     # some random bytes to password-protect the keychain we're creating, so we
228     # ask for 40 random bytes.
229     random_bytes = os.urandom(40)
230     filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
231     password = base64.b16encode(random_bytes[8:])  # Must be valid UTF-8
232     tempdirectory = tempfile.mkdtemp()
233
234     keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")
235
236     # We now want to create the keychain itself.
237     keychain = Security.SecKeychainRef()
238     status = Security.SecKeychainCreate(
239         keychain_path, len(password), password, False, None, ctypes.byref(keychain)
240     )
241     _assert_no_error(status)
242
243     # Having created the keychain, we want to pass it off to the caller.
244     return keychain, tempdirectory
245
246
247 def _load_items_from_file(keychain, path):
248     """
249     Given a single file, loads all the trust objects from it into arrays and
250     the keychain.
251     Returns a tuple of lists: the first list is a list of identities, the
252     second a list of certs.
253     """
254     certificates = []
255     identities = []
256     result_array = None
257
258     with open(path, "rb") as f:
259         raw_filedata = f.read()
260
261     try:
262         filedata = CoreFoundation.CFDataCreate(
263             CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
264         )
265         result_array = CoreFoundation.CFArrayRef()
266         result = Security.SecItemImport(
267             filedata,  # cert data
268             None,  # Filename, leaving it out for now
269             None,  # What the type of the file is, we don't care
270             None,  # what's in the file, we don't care
271             0,  # import flags
272             None,  # key params, can include passphrase in the future
273             keychain,  # The keychain to insert into
274             ctypes.byref(result_array),  # Results
275         )
276         _assert_no_error(result)
277
278         # A CFArray is not very useful to us as an intermediary
279         # representation, so we are going to extract the objects we want
280         # and then free the array. We don't need to keep hold of keys: the
281         # keychain already has them!
282         result_count = CoreFoundation.CFArrayGetCount(result_array)
283         for index in range(result_count):
284             item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
285             item = ctypes.cast(item, CoreFoundation.CFTypeRef)
286
287             if _is_cert(item):
288                 CoreFoundation.CFRetain(item)
289                 certificates.append(item)
290             elif _is_identity(item):
291                 CoreFoundation.CFRetain(item)
292                 identities.append(item)
293     finally:
294         if result_array:
295             CoreFoundation.CFRelease(result_array)
296
297         CoreFoundation.CFRelease(filedata)
298
299     return (identities, certificates)
300
301
302 def _load_client_cert_chain(keychain, *paths):
303     """
304     Load certificates and maybe keys from a number of files. Has the end goal
305     of returning a CFArray containing one SecIdentityRef, and then zero or more
306     SecCertificateRef objects, suitable for use as a client certificate trust
307     chain.
308     """
309     # Ok, the strategy.
310     #
311     # This relies on knowing that macOS will not give you a SecIdentityRef
312     # unless you have imported a key into a keychain. This is a somewhat
313     # artificial limitation of macOS (for example, it doesn't necessarily
314     # affect iOS), but there is nothing inside Security.framework that lets you
315     # get a SecIdentityRef without having a key in a keychain.
316     #
317     # So the policy here is we take all the files and iterate them in order.
318     # Each one will use SecItemImport to have one or more objects loaded from
319     # it. We will also point at a keychain that macOS can use to work with the
320     # private key.
321     #
322     # Once we have all the objects, we'll check what we actually have. If we
323     # already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
324     # we'll take the first certificate (which we assume to be our leaf) and
325     # ask the keychain to give us a SecIdentityRef with that cert's associated
326     # key.
327     #
328     # We'll then return a CFArray containing the trust chain: one
329     # SecIdentityRef and then zero-or-more SecCertificateRef objects. The
330     # responsibility for freeing this CFArray will be with the caller. This
331     # CFArray must remain alive for the entire connection, so in practice it
332     # will be stored with a single SSLSocket, along with the reference to the
333     # keychain.
334     certificates = []
335     identities = []
336
337     # Filter out bad paths.
338     paths = (path for path in paths if path)
339
340     try:
341         for file_path in paths:
342             new_identities, new_certs = _load_items_from_file(keychain, file_path)
343             identities.extend(new_identities)
344             certificates.extend(new_certs)
345
346         # Ok, we have everything. The question is: do we have an identity? If
347         # not, we want to grab one from the first cert we have.
348         if not identities:
349             new_identity = Security.SecIdentityRef()
350             status = Security.SecIdentityCreateWithCertificate(
351                 keychain, certificates[0], ctypes.byref(new_identity)
352             )
353             _assert_no_error(status)
354             identities.append(new_identity)
355
356             # We now want to release the original certificate, as we no longer
357             # need it.
358             CoreFoundation.CFRelease(certificates.pop(0))
359
360         # We now need to build a new CFArray that holds the trust chain.
361         trust_chain = CoreFoundation.CFArrayCreateMutable(
362             CoreFoundation.kCFAllocatorDefault,
363             0,
364             ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
365         )
366         for item in itertools.chain(identities, certificates):
367             # ArrayAppendValue does a CFRetain on the item. That's fine,
368             # because the finally block will release our other refs to them.
369             CoreFoundation.CFArrayAppendValue(trust_chain, item)
370
371         return trust_chain
372     finally:
373         for obj in itertools.chain(identities, certificates):
374             CoreFoundation.CFRelease(obj)
375
376
377 TLS_PROTOCOL_VERSIONS = {
378     "SSLv2": (0, 2),
379     "SSLv3": (3, 0),
380     "TLSv1": (3, 1),
381     "TLSv1.1": (3, 2),
382     "TLSv1.2": (3, 3),
383 }
384
385
386 def _build_tls_unknown_ca_alert(version):
387     """
388     Builds a TLS alert record for an unknown CA.
389     """
390     ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version]
391     severity_fatal = 0x02
392     description_unknown_ca = 0x30
393     msg = struct.pack(">BB", severity_fatal, description_unknown_ca)
394     msg_len = len(msg)
395     record_type_alert = 0x15
396     record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg
397     return record