registry: remove incomplete migration code
[re6stnet.git] / re6st / registry.py
1 """
2 Authenticated communication:
3
4 handshake (hello):
5 C->S: CN
6 S->C: X = Encrypt(CN)(secret), Sign(CA)(X)
7
8 call:
9 C->S: CN, ..., HMAC(secret+1)(path_info?query_string)
10 S->C: result, HMAC(secret+2)(result)
11
12 secret+1 = SHA1(secret) to protect from replay attacks
13
14 HMAC in custom header, base64-encoded
15
16 To prevent anyone from breaking an existing session,
17 keep 2 secrets for each client:
18 - the last one that was really used by the client (!hello)
19 - the one of the last handshake (hello)
20 """
21 import base64, hmac, hashlib, httplib, inspect, json, logging
22 import mailbox, os, random, select, smtplib, socket, sqlite3
23 import string, struct, sys, threading, time, weakref, zlib
24 from collections import defaultdict, deque
25 from datetime import datetime
26 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
27 from email.mime.text import MIMEText
28 from operator import itemgetter
29 from OpenSSL import crypto
30 from urllib import splittype, splithost, unquote, urlencode
31 from . import ctl, tunnel, utils, version, x509
32
33 HMAC_HEADER = "Re6stHMAC"
34 RENEW_PERIOD = 30 * 86400
35 GRACE_PERIOD = 100 * 86400
36
37 def rpc(f):
38 args, varargs, varkw, defaults = inspect.getargspec(f)
39 assert not (varargs or varkw or defaults), f
40 f.getcallargs = eval("lambda %s: locals()" % ','.join(args[1:]))
41 return f
42
43
44 class RegistryServer(object):
45
46 peers = 0, ()
47 cert_duration = 365 * 86400
48
49 def __init__(self, config):
50 self.config = config
51 self.lock = threading.Lock()
52 self.sessions = {}
53 self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
54
55 # Database initializing
56 utils.makedirs(os.path.dirname(self.config.db))
57 self.db = sqlite3.connect(self.config.db, isolation_level=None,
58 check_same_thread=False)
59 self.db.text_factory = str
60 utils.sqliteCreateTable(self.db, "config",
61 "name TEXT PRIMARY KEY NOT NULL",
62 "value")
63 self.prefix = self.getConfig("prefix", None)
64 self.version = str(self.getConfig("version", "\0")) # BBB: blob
65 utils.sqliteCreateTable(self.db, "token",
66 "token TEXT PRIMARY KEY NOT NULL",
67 "email TEXT NOT NULL",
68 "prefix_len INTEGER NOT NULL",
69 "date INTEGER NOT NULL")
70 if utils.sqliteCreateTable(self.db, "cert",
71 "prefix TEXT PRIMARY KEY NOT NULL",
72 "email TEXT",
73 "cert TEXT"):
74 self.db.execute("INSERT INTO cert VALUES ('',null,null)")
75 utils.sqliteCreateTable(self.db, "crl",
76 "serial INTEGER PRIMARY KEY NOT NULL",
77 # Expiration date of revoked certificate.
78 # TODO: purge rows with dates in the past.
79 "date INTEGER NOT NULL")
80
81 self.cert = x509.Cert(self.config.ca, self.config.key)
82 # Get vpn network prefix
83 self.network = self.cert.network
84 logging.info("Network: %s/%u", utils.ipFromBin(self.network),
85 len(self.network))
86 self.email = self.cert.ca.get_subject().emailAddress
87
88 self.peers_lock = threading.Lock()
89 self.ctl = ctl.Babel(os.path.join(config.run, 'babeld.sock'),
90 weakref.proxy(self), self.network)
91
92 self.onTimeout()
93 if self.prefix:
94 with self.db:
95 self.updateNetworkConfig()
96
97 def getConfig(self, name, *default):
98 r, = next(self.db.execute(
99 "SELECT value FROM config WHERE name=?", (name,)), default)
100 return r
101
102 def setConfig(self, *name_value):
103 self.db.execute("INSERT OR REPLACE INTO config VALUES (?, ?)",
104 name_value)
105
106 def updateNetworkConfig(self, _it0=itemgetter(0)):
107 kw = {
108 'babel_default': 'max-rtt-penalty 5000 rtt-max 500 rtt-decay 125',
109 'crl': map(_it0, self.db.execute(
110 "SELECT serial FROM crl ORDER BY serial")),
111 'protocol': version.protocol,
112 'registry_prefix': self.prefix,
113 }
114 if self.config.ipv4:
115 kw['ipv4'], kw['ipv4_sublen'] = self.config.ipv4
116 for x in ('client_count', 'encrypt', 'hello',
117 'max_clients', 'min_protocol', 'tunnel_refresh'):
118 kw[x] = getattr(self.config, x)
119 config = json.dumps(kw, sort_keys=True)
120 if config != self.getConfig('last_config', None):
121 self.version = self.encodeVersion(
122 1 + self.decodeVersion(self.version))
123 # BBB: Use buffer because of http://bugs.python.org/issue13676
124 # on Python 2.6
125 self.setConfig('version', buffer(self.version))
126 self.setConfig('last_config', config)
127 self.sendto(self.prefix, 0)
128 kw[''] = 'version',
129 # Example to avoid all nodes to restart at the same time:
130 # kw['delay_restart'] = 600 * random.random()
131 kw['version'] = self.version.encode('base64')
132 self.network_config = zlib.compress(json.dumps(kw))
133
134 # The 3 first bits code the number of bytes.
135 def encodeVersion(self, version):
136 for n in xrange(8):
137 x = 32 << 8 * n
138 if version < x:
139 x = struct.pack("!Q", version + n * x)[7-n:]
140 return x + self.cert.sign(x)
141 version -= x
142
143 def decodeVersion(self, version):
144 n = ord(version[0]) >> 5
145 version, = struct.unpack("!Q", '\0' * (7 - n) + version[:n+1])
146 return sum((32 << 8 * n for n in xrange(n)),
147 version - (n * 32 << 8 * n))
148
149 def sendto(self, prefix, code):
150 self.sock.sendto("%s\0%c" % (prefix, code), ('::1', tunnel.PORT))
151
152 def recv(self, code):
153 try:
154 prefix, msg = self.sock.recv(1<<16).split('\0', 1)
155 int(prefix, 2)
156 except ValueError:
157 pass
158 else:
159 if msg and ord(msg[0]) == code:
160 return prefix, msg[1:]
161 return None, None
162
163 def select(self, r, w, t):
164 if self.timeout:
165 t.append((self.timeout, self.onTimeout))
166
167 def request_dump(self):
168 assert self.peers_lock.locked()
169 def abort():
170 raise ctl.BabelException
171 self._wait_dump = True
172 for _ in 0, 1:
173 self.ctl.request_dump()
174 try:
175 while self._wait_dump:
176 args = {}, {}, ((time.time() + 5, abort),)
177 self.ctl.select(*args)
178 utils.select(*args)
179 break
180 except ctl.BabelException:
181 self.ctl.reset()
182
183 def babel_dump(self):
184 self._wait_dump = False
185
186 def iterCert(self):
187 for prefix, email, cert in self.db.execute(
188 "SELECT * FROM cert WHERE cert IS NOT NULL"):
189 try:
190 yield (crypto.load_certificate(crypto.FILETYPE_PEM, cert),
191 prefix, email)
192 except crypto.Error:
193 pass
194
195 def onTimeout(self):
196 # XXX: Because we use threads to process requests, the statements
197 # 'self.timeout = 1' below have no effect as long as the
198 # 'select' call does not return. Ideally, we should interrupt it.
199 logging.info("Checking if there's any old entry in the database ...")
200 not_after = None
201 old = time.time() - GRACE_PERIOD
202 q = self.db.execute
203 with self.lock:
204 with self.db:
205 q("BEGIN")
206 for token, x in q("SELECT token, date FROM token"):
207 if x <= old:
208 q("DELETE FROM token WHERE token=?", (token,))
209 elif not_after is None or x < not_after:
210 not_after = x
211 for cert, prefix, email in self.iterCert():
212 x = x509.notAfter(cert)
213 if x <= old:
214 if prefix == self.prefix:
215 logging.critical("Refuse to delete certificate"
216 " of main node: wrong clock ?")
217 sys.exit(1)
218 logging.info("Delete %s: %s (invalid since %s)",
219 "certificate requested by '%s'" % email
220 if email else "anonymous certificate",
221 ", ".join("%s=%s" % x for x in
222 cert.get_subject().get_components()),
223 datetime.utcfromtimestamp(x).isoformat())
224 q("UPDATE cert SET email=null, cert=null WHERE prefix=?",
225 (prefix,))
226 elif not_after is None or x < not_after:
227 not_after = x
228 # TODO: reduce 'cert' table by merging free slots
229 # (IOW, do the contrary of newPrefix)
230 self.timeout = not_after and not_after + GRACE_PERIOD
231
232 def handle_request(self, request, method, kw,
233 _localhost=('127.0.0.1', '::1')):
234 m = getattr(self, method)
235 if method in ('revoke', 'versions', 'topology'):
236 x_forwarded_for = request.headers.get('X-Forwarded-For')
237 if request.client_address[0] not in _localhost or \
238 x_forwarded_for and x_forwarded_for not in _localhost:
239 return request.send_error(httplib.FORBIDDEN)
240 key = m.getcallargs(**kw).get('cn')
241 if key:
242 h = base64.b64decode(request.headers[HMAC_HEADER])
243 with self.lock:
244 session = self.sessions[key]
245 for key in session:
246 if h == hmac.HMAC(key, request.path, hashlib.sha1).digest():
247 break
248 else:
249 raise Exception("Wrong HMAC")
250 key = hashlib.sha1(key).digest()
251 session[:] = hashlib.sha1(key).digest(),
252 try:
253 result = m(**kw)
254 except:
255 logging.warning(request.requestline, exc_info=1)
256 return request.send_error(httplib.INTERNAL_SERVER_ERROR)
257 if result:
258 request.send_response(httplib.OK)
259 request.send_header("Content-Length", str(len(result)))
260 else:
261 request.send_response(httplib.NO_CONTENT)
262 if key:
263 request.send_header(HMAC_HEADER, base64.b64encode(
264 hmac.HMAC(key, result, hashlib.sha1).digest()))
265 request.end_headers()
266 if result:
267 request.wfile.write(result)
268
269 @rpc
270 def hello(self, client_prefix):
271 with self.lock:
272 cert = self.getCert(client_prefix)
273 key = utils.newHmacSecret()
274 self.sessions.setdefault(client_prefix, [])[1:] = key,
275 key = x509.encrypt(cert, key)
276 sign = self.cert.sign(key)
277 assert len(key) == len(sign)
278 return key + sign
279
280 def getCert(self, client_prefix):
281 assert self.lock.locked()
282 return self.db.execute("SELECT cert FROM cert"
283 " WHERE prefix=? AND cert IS NOT NULL",
284 (client_prefix,)).next()[0]
285
286 @rpc
287 def requestToken(self, email):
288 with self.lock:
289 while True:
290 # Generating token
291 token = ''.join(random.sample(string.ascii_lowercase, 8))
292 args = token, email, self.config.prefix_length, int(time.time())
293 # Updating database
294 try:
295 self.db.execute("INSERT INTO token VALUES (?,?,?,?)", args)
296 break
297 except sqlite3.IntegrityError:
298 pass
299 self.timeout = 1
300
301 # Creating and sending email
302 msg = MIMEText('Hello, your token to join re6st network is: %s\n'
303 % token)
304 msg['Subject'] = '[re6stnet] Token Request'
305 if self.email:
306 msg['From'] = self.email
307 msg['To'] = email
308 if os.path.isabs(self.config.mailhost) or \
309 os.path.isfile(self.config.mailhost):
310 with self.lock:
311 m = mailbox.mbox(self.config.mailhost)
312 try:
313 m.add(msg)
314 finally:
315 m.close()
316 else:
317 s = smtplib.SMTP(self.config.mailhost)
318 s.sendmail(self.email, email, msg.as_string())
319 s.quit()
320
321 def newPrefix(self, prefix_len):
322 max_len = 128 - len(self.network)
323 assert 0 < prefix_len <= max_len
324 try:
325 prefix, = self.db.execute("""SELECT prefix FROM cert WHERE length(prefix) <= ? AND cert is null
326 ORDER BY length(prefix) DESC""", (prefix_len,)).next()
327 except StopIteration:
328 logging.error('No more free /%u prefix available', prefix_len)
329 raise
330 while len(prefix) < prefix_len:
331 self.db.execute("UPDATE cert SET prefix = ? WHERE prefix = ?", (prefix + '1', prefix))
332 prefix += '0'
333 self.db.execute("INSERT INTO cert VALUES (?,null,null)", (prefix,))
334 if len(prefix) < max_len or '1' in prefix:
335 return prefix
336 self.db.execute("UPDATE cert SET cert = 'reserved' WHERE prefix = ?", (prefix,))
337 return self.newPrefix(prefix_len)
338
339 @rpc
340 def requestCertificate(self, token, req):
341 req = crypto.load_certificate_request(crypto.FILETYPE_PEM, req)
342 with self.lock:
343 with self.db:
344 if token:
345 try:
346 token, email, prefix_len, _ = self.db.execute(
347 "SELECT * FROM token WHERE token = ?",
348 (token,)).next()
349 except StopIteration:
350 return
351 self.db.execute("DELETE FROM token WHERE token = ?",
352 (token,))
353 else:
354 prefix_len = self.config.anonymous_prefix_length
355 if not prefix_len:
356 return
357 email = None
358 prefix = self.newPrefix(prefix_len)
359 self.db.execute("UPDATE cert SET email = ? WHERE prefix = ?",
360 (email, prefix))
361 if self.prefix is None:
362 self.prefix = prefix
363 self.setConfig('prefix', prefix)
364 self.updateNetworkConfig()
365 subject = req.get_subject()
366 subject.serialNumber = str(self.getSubjectSerial())
367 return self.createCertificate(prefix, subject, req.get_pubkey())
368
369 def getSubjectSerial(self):
370 # Smallest unique number, for IPv4 support.
371 serials = []
372 for x in self.iterCert():
373 serial = x[0].get_subject().serialNumber
374 if serial:
375 serials.append(int(serial))
376 serials.sort()
377 for serial, x in enumerate(serials):
378 if serial != x:
379 return serial
380 return len(serials)
381
382 def createCertificate(self, client_prefix, subject, pubkey, not_after=None):
383 cert = crypto.X509()
384 cert.gmtime_adj_notBefore(0)
385 if not_after:
386 cert.set_notAfter(not_after)
387 else:
388 cert.gmtime_adj_notAfter(self.cert_duration)
389 cert.set_issuer(self.cert.ca.get_subject())
390 subject.CN = "%u/%u" % (int(client_prefix, 2), len(client_prefix))
391 cert.set_subject(subject)
392 cert.set_pubkey(pubkey)
393 # Certificate serial, for revocation support. Contrary to
394 # subject serial, it does not need to be as small as possible.
395 serial = 1 + self.getConfig('serial', 0)
396 self.setConfig('serial', serial)
397 cert.set_serial_number(serial)
398 cert.sign(self.cert.key, 'sha512')
399 cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
400 self.db.execute("UPDATE cert SET cert = ? WHERE prefix = ?",
401 (cert, client_prefix))
402 self.timeout = 1
403 return cert
404
405 @rpc
406 def renewCertificate(self, cn):
407 with self.lock:
408 with self.db as db:
409 pem = self.getCert(cn)
410 cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
411 if x509.notAfter(cert) - RENEW_PERIOD < time.time():
412 not_after = None
413 elif db.execute("SELECT count(*) FROM crl WHERE serial=?",
414 (cert.get_serial_number(),)).fetchone()[0]:
415 not_after = cert.get_notAfter()
416 else:
417 return pem
418 return self.createCertificate(cn,
419 cert.get_subject(), cert.get_pubkey(), not_after)
420
421 @rpc
422 def getCa(self):
423 return crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert.ca)
424
425 @rpc
426 def getDh(self, cn):
427 with open(self.config.dh) as f:
428 return f.read()
429
430 @rpc
431 def getNetworkConfig(self, cn):
432 return self.network_config
433
434 @rpc
435 def getBootstrapPeer(self, cn):
436 with self.peers_lock:
437 age, peers = self.peers
438 if age < time.time() or not peers:
439 self.request_dump()
440 peers = [prefix
441 for neigh_routes in self.ctl.neighbours.itervalues()
442 for prefix in neigh_routes[1]
443 if prefix]
444 peers.append(self.prefix)
445 random.shuffle(peers)
446 self.peers = time.time() + 60, peers
447 peer = peers.pop()
448 if peer == cn:
449 # Very unlikely (e.g. peer restarted with empty cache),
450 # so don't bother looping over above code
451 # (in case 'peers' is empty).
452 peer = self.prefix
453 with self.lock:
454 self.sendto(peer, 1)
455 s = self.sock,
456 timeout = 3
457 end = timeout + time.time()
458 # Loop because there may be answers from previous requests.
459 while select.select(s, (), (), timeout)[0]:
460 prefix, msg = self.recv(1)
461 if prefix == peer:
462 break
463 timeout = max(0, end - time.time())
464 else:
465 logging.info("Timeout while querying address for %s/%s",
466 int(peer, 2), len(peer))
467 return
468 cert = self.getCert(cn)
469 msg = "%s %s" % (peer, msg)
470 logging.info("Sending bootstrap peer: %s", msg)
471 return x509.encrypt(cert, msg)
472
473 @rpc
474 def revoke(self, cn_or_serial):
475 with self.lock:
476 with self.db:
477 q = self.db.execute
478 try:
479 serial = int(cn_or_serial)
480 except ValueError:
481 prefix = utils.binFromSubnet(cn_or_serial)
482 cert = self.getCert(prefix)
483 q("UPDATE cert SET email=null, cert=null WHERE prefix=?",
484 (prefix,))
485 cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
486 serial = cert.get_serial_number()
487 self.sessions.pop(prefix, None)
488 else:
489 cert, = (cert for cert, prefix, email in self.iterCert()
490 if cert.get_serial_number() == serial)
491 not_after = x509.notAfter(cert)
492 if time.time() < not_after:
493 q("INSERT INTO crl VALUES (?,?)", (serial, not_after))
494 self.updateNetworkConfig()
495
496 @rpc
497 def versions(self):
498 with self.peers_lock:
499 self.request_dump()
500 peers = set(prefix
501 for neigh_routes in self.ctl.neighbours.itervalues()
502 for prefix in neigh_routes[1]
503 if prefix)
504 peers.add(self.prefix)
505 peer_dict = {}
506 s = self.sock,
507 with self.lock:
508 while True:
509 r, w, _ = select.select(s, s if peers else (), (), 3)
510 if r:
511 prefix, ver = self.recv(4)
512 if prefix:
513 peer_dict[prefix] = ver
514 if w:
515 prefix = peers.pop()
516 peer_dict[prefix] = None
517 self.sendto(prefix, 4)
518 elif not r:
519 break
520 return json.dumps(peer_dict)
521
522 @rpc
523 def topology(self):
524 p = lambda p: '%s/%s' % (int(p, 2), len(p))
525 peers = deque((p(self.prefix),))
526 graph = defaultdict(set)
527 s = self.sock,
528 with self.lock:
529 while True:
530 r, w, _ = select.select(s, s if peers else (), (), 3)
531 if r:
532 prefix, x = self.recv(5)
533 if prefix and x:
534 prefix = p(prefix)
535 x = x.split()
536 try:
537 n = int(x.pop(0))
538 except ValueError:
539 continue
540 if n <= len(x) and prefix not in x:
541 graph[prefix].update(x[:n])
542 peers += set(x).difference(graph)
543 for x in x[n:]:
544 graph[x].add(prefix)
545 graph[''].add(prefix)
546 if w:
547 self.sendto(utils.binFromSubnet(peers.popleft()), 5)
548 elif not r:
549 break
550 return json.dumps(dict((k, list(v)) for k, v in graph.iteritems()))
551
552
553 class RegistryClient(object):
554
555 _hmac = None
556 user_agent = "re6stnet/" + version.version
557
558 def __init__(self, url, cert=None, auto_close=True):
559 self.cert = cert
560 self.auto_close = auto_close
561 scheme, host = splittype(url)
562 host, path = splithost(host)
563 self._conn = dict(http=httplib.HTTPConnection,
564 https=httplib.HTTPSConnection,
565 )[scheme](unquote(host), timeout=60)
566 self._path = path.rstrip('/')
567
568 def __getattr__(self, name):
569 getcallargs = getattr(RegistryServer, name).getcallargs
570 def rpc(*args, **kw):
571 kw = getcallargs(*args, **kw)
572 query = '/' + name
573 if kw:
574 if any(type(v) is not str for v in kw.itervalues()):
575 raise TypeError
576 query += '?' + urlencode(kw)
577 url = self._path + query
578 client_prefix = kw.get('cn')
579 retry = True
580 try:
581 while retry:
582 if client_prefix:
583 key = self._hmac
584 if not key:
585 retry = False
586 h = self.hello(client_prefix)
587 n = len(h) // 2
588 self.cert.verify(h[n:], h[:n])
589 key = self.cert.decrypt(h[:n])
590 h = hmac.HMAC(key, query, hashlib.sha1).digest()
591 key = hashlib.sha1(key).digest()
592 self._hmac = hashlib.sha1(key).digest()
593 else:
594 retry = False
595 self._conn.putrequest('GET', url, skip_accept_encoding=1)
596 self._conn.putheader('User-Agent', self.user_agent)
597 if client_prefix:
598 self._conn.putheader(HMAC_HEADER, base64.b64encode(h))
599 self._conn.endheaders()
600 response = self._conn.getresponse()
601 body = response.read()
602 if response.status in (httplib.OK, httplib.NO_CONTENT) and (
603 not client_prefix or
604 hmac.HMAC(key, body, hashlib.sha1).digest() ==
605 base64.b64decode(response.msg[HMAC_HEADER])):
606 if self.auto_close and name != 'hello':
607 self._conn.close()
608 return body
609 if client_prefix:
610 self._hmac = None
611 except Exception:
612 logging.info(url, exc_info=1)
613 else:
614 logging.info('%s\nUnexpected response %s %s',
615 url, response.status, response.reason)
616 self._conn.close()
617 setattr(self, name, rpc)
618 return rpc