The simulator can now check the resilience
[re6stnet.git] / registry.py
1 #!/usr/bin/env python
import argparse
import errno
import logging
import math
import os
import random
import select
import smtplib
import socket
import sqlite3
import string
import subprocess
import threading
import time
import traceback
import xmlrpclib
from email.mime.text import MIMEText
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler

from OpenSSL import crypto

import utils
8
9 # To generate server ca and key with serial for 2001:db8:42::/48
10 # openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 365 -out ca.crt
11
# Socket level / option used to make the IPv6 listening socket IPv6-only.
# Prefer the values exported by the socket module for the running platform;
# the literals (the Linux values) are only a fallback for builds of the
# socket module that do not export these names.
SOL_IPV6 = getattr(socket, 'IPPROTO_IPV6', 41)
IPV6_V6ONLY = getattr(socket, 'IPV6_V6ONLY', 26)
14
15
16 class RequestHandler(SimpleXMLRPCRequestHandler):
17
18 def _dispatch(self, method, params):
19 return self.server._dispatch(method, (self,) + params)
20
21
22 class SimpleXMLRPCServer4(SimpleXMLRPCServer):
23
24 allow_reuse_address = True
25
26
class SimpleXMLRPCServer6(SimpleXMLRPCServer4):
    """IPv6 flavour of SimpleXMLRPCServer4."""

    address_family = socket.AF_INET6

    def server_bind(self):
        """Set the IPV6_V6ONLY option on the socket, then bind as the parent does."""
        listening_socket = self.socket
        listening_socket.setsockopt(SOL_IPV6, IPV6_V6ONLY, 1)
        # Explicit base-class call kept as in the original code.
        SimpleXMLRPCServer4.server_bind(self)
34
35
class main(object):
    """Peer discovery / certificate registry served over XML-RPC.

    Instantiating the class parses the command line, opens the database,
    loads the CA certificate and key, starts an IPv4 and an IPv6 XML-RPC
    server on the configured port and then serves requests forever: the
    constructor only returns by raising.

    Public methods are the XML-RPC entry points. Because of RequestHandler,
    each of them receives the request handler as its first argument.
    """

    def __init__(self):
        self.cert_duration = 365 * 86400    # passed to gmtime_adj_notAfter (seconds)
        self.time_out = 86400               # peers older than this are purged (seconds)
        self.refresh_interval = 600         # minimum delay between two purges (seconds)
        self.last_refresh = time.time()

        # Command line parsing
        parser = argparse.ArgumentParser(
            description='Peer discovery http server for vifibnet')
        _ = parser.add_argument
        _('port', type=int, help='Port of the host server')
        _('--db', required=True,
            help='Path to database file')
        _('--ca', required=True,
            help='Path to ca.crt file')
        _('--key', required=True,
            help='Path to certificate key')
        _('--mailhost', required=True,
            help='SMTP server mail host')
        _('--bootstrap', action="append",
            help='''VPN prefix of the peers to send as bootstrap peer,
                    instead of random ones''')
        _('--private',
            help='VPN IP of the node on which runs the registry')
        self.config = parser.parse_args()

        # Database initializing
        self.db = sqlite3.connect(self.config.db, isolation_level=None)
        self.db.execute("""CREATE TABLE IF NOT EXISTS peers (
                        prefix text primary key not null,
                        address text not null,
                        date integer default (strftime('%s','now')))""")
        self.db.execute("CREATE INDEX IF NOT EXISTS peers_ping ON peers(date)")
        self.db.execute("""CREATE TABLE IF NOT EXISTS tokens (
                        token text primary key not null,
                        email text not null,
                        prefix_len integer not null,
                        date integer not null)""")
        try:
            self.db.execute("""CREATE TABLE vpn (
                            prefix text primary key not null,
                            email text,
                            cert text)""")
        except sqlite3.OperationalError as e:
            if e.args[0] != 'table vpn already exists':
                # Same exception type as before, but keep the sqlite message.
                raise RuntimeError(e.args[0])
        else:
            # Freshly created table: the empty prefix stands for the whole
            # (still unallocated) address space.
            self.db.execute("INSERT INTO vpn VALUES ('',null,null)")

        # Loading certificates
        with open(self.config.ca) as f:
            self.ca = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
        with open(self.config.key) as f:
            self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
        # Get vpn network prefix: bin() yields '0b1...', so [3:] drops the
        # '0b' marker and the leading 1 bit of the serial number.
        self.network = bin(self.ca.get_serial_number())[3:]
        print("Network prefix : %s/%u" % (self.network, len(self.network)))

        # Starting server
        server4 = SimpleXMLRPCServer4(('0.0.0.0', self.config.port),
                                      requestHandler=RequestHandler,
                                      allow_none=True)
        server4.register_instance(self)
        server6 = SimpleXMLRPCServer6(('::', self.config.port),
                                      requestHandler=RequestHandler,
                                      allow_none=True)
        server6.register_instance(self)

        # Main loop
        servers = [server4, server6]
        while True:
            try:
                readable = select.select(servers, [], [])[0]
            except (OSError, select.error) as e:
                # An interrupted select() is simply retried.
                if e.args[0] != errno.EINTR:
                    raise
            else:
                for server in readable:
                    server._handle_request_noblock()

    def requestToken(self, handler, email):
        """Create a new token for `email`, store it and mail it to that address.

        The token is 8 lowercase ASCII letters; generation is retried until
        the insert does not violate the primary key of the tokens table.
        """
        # Tokens are secrets, so draw them from the OS entropy source rather
        # than from the default (predictable) generator.
        rng = random.SystemRandom()
        while True:
            # Generating token
            token = ''.join(rng.choice(string.ascii_lowercase)
                            for _ in range(8))
            # Updating database
            try:
                self.db.execute("INSERT INTO tokens VALUES (?,?,?,?)",
                                (token, email, 16, int(time.time())))
                break
            except sqlite3.IntegrityError:
                # Token already in use: draw another one.
                pass

        # Creating and sending email
        me = 'postmaster@vifibnet.com'
        msg = MIMEText('Hello world !\nYour token : %s' % (token,))  # XXX
        msg['Subject'] = '[Vifibnet] Token Request'
        msg['From'] = me
        msg['To'] = email
        s = smtplib.SMTP(self.config.mailhost)
        try:
            s.sendmail(me, email, msg.as_string())
            s.quit()
        finally:
            # Release the connection even when sending failed.
            s.close()

    def _getPrefix(self, prefix_len):
        """Return a free prefix (string of '0'/'1') of exactly `prefix_len` bits.

        The longest free entry not longer than `prefix_len` is taken from the
        vpn table. While it is too short it is split in two: the existing row
        is renamed to `prefix + '1'` and a new row `prefix + '0'` is inserted,
        the latter being the one split further / returned.

        Raises RuntimeError when no free entry is short enough.
        """
        # Internal invariant: prefix_len comes from the tokens table.
        assert 0 < prefix_len <= 128 - len(self.network)
        row = self.db.execute(
            """SELECT prefix FROM vpn WHERE length(prefix) <= ? AND cert is null
               ORDER BY length(prefix) DESC""", (prefix_len,)).fetchone()
        if row is None:
            logging.error('There are no more free /%s prefix available' % (prefix_len,))
            raise RuntimeError
        prefix, = row
        while len(prefix) < prefix_len:
            self.db.execute("UPDATE vpn SET prefix = ? WHERE prefix = ?",
                            (prefix + '1', prefix))
            prefix += '0'
            self.db.execute("INSERT INTO vpn VALUES (?,null,null)", (prefix,))
        return prefix

    def requestCertificate(self, handler, token, cert_req):
        """Consume `token` and return a PEM certificate for the request `cert_req`.

        The token row is deleted, a prefix of the token's length is allocated,
        and a certificate whose CN is "<prefix as integer>/<prefix length>" is
        signed with the registry key and stored in the vpn table.

        Any exception is printed with its traceback and re-raised; an unknown
        token surfaces as StopIteration.
        """
        try:
            req = crypto.load_certificate_request(crypto.FILETYPE_PEM, cert_req)
            # NOTE(review): the connection is opened with isolation_level=None;
            # confirm that this `with` really rolls back the token deletion
            # when certificate creation fails.
            with self.db:
                try:
                    token, email, prefix_len, _ = next(self.db.execute(
                        "SELECT * FROM tokens WHERE token = ?", (token,)))
                except StopIteration:
                    logging.exception('Bad token (%s) in request' % (token,))
                    raise
                self.db.execute("DELETE FROM tokens WHERE token = ?", (token,))

                # Get a new prefix
                prefix = self._getPrefix(prefix_len)

                # Create certificate
                cert = crypto.X509()
                # TODO: no serial number is set on issued certificates.
                cert.gmtime_adj_notBefore(0)
                cert.gmtime_adj_notAfter(self.cert_duration)
                cert.set_issuer(self.ca.get_subject())
                subject = req.get_subject()
                subject.CN = "%u/%u" % (int(prefix, 2), prefix_len)
                cert.set_subject(subject)
                cert.set_pubkey(req.get_pubkey())
                # NOTE(review): SHA-1 signatures are considered weak; kept
                # unchanged so issued certificates stay as before.
                cert.sign(self.key, 'sha1')
                cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)

                # Insert certificate into db
                self.db.execute("UPDATE vpn SET email = ?, cert = ? WHERE prefix = ?",
                                (email, cert, prefix))

            return cert
        except Exception:
            traceback.print_exc()
            raise

    def getCa(self, handler):
        """Return the CA certificate in PEM format."""
        return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca)

    def getPrivateAddress(self, handler):
        """Return the registry URL built from the --private IP and the port."""
        return 'http://[%s]:%u' % (self.config.private, self.config.port)

    def _randomPeer(self):
        """Return one random (prefix, address) row of the peers table.

        Raises StopIteration when the table is empty.
        """
        return next(self.db.execute("""SELECT prefix, address
                    FROM peers ORDER BY random() LIMIT 1"""))

    def getBootstrapPeer(self, handler, client_prefix):
        """Return "<prefix> <address>" of a bootstrap peer, encrypted for the client.

        The peer is one of the --bootstrap prefixes when given and present in
        the peers table, a random peer otherwise. The text is encrypted by
        `openssl rsautl` with the certificate stored for `client_prefix` and
        returned as an xmlrpclib.Binary.

        Raises StopIteration when `client_prefix` (or any peer) is unknown and
        RuntimeError when openssl exits with a non-zero status.
        """
        # Fetch the certificate itself, not the cursor.
        cert, = next(self.db.execute(
            "SELECT cert FROM vpn WHERE prefix = ?", (client_prefix,)))
        if self.config.bootstrap:
            bootpeer = random.choice(self.config.bootstrap)
            try:
                prefix, address = next(self.db.execute(
                    """SELECT prefix, address
                       FROM peers WHERE prefix = ?""", (bootpeer,)))
            except StopIteration:
                # Configured bootstrap peer has not declared itself.
                prefix, address = self._randomPeer()
        else:
            prefix, address = self._randomPeer()
        # The certificate is handed to openssl through a pipe that the child
        # opens via /proc/self/fd; a thread feeds it so we do not block here.
        r, w = os.pipe()
        try:
            threading.Thread(target=os.write, args=(w, cert)).start()
            p = subprocess.Popen(
                ('openssl', 'rsautl', '-encrypt', '-certin', '-inkey',
                 '/proc/self/fd/%u' % r),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            print("Sending bootstrap peer (%s, %s)" % (prefix, address))
            # communicate() returns (stdout, stderr); only stdout is the payload.
            encrypted, _ = p.communicate('%s %s' % (prefix, address))
            if p.returncode:
                logging.error('openssl exited with status %s' % (p.returncode,))
                raise RuntimeError
            return xmlrpclib.Binary(encrypted)
        finally:
            os.close(r)
            os.close(w)

    def declare(self, handler, address):
        """Register a peer; `address` is a (client_address, address) pair.

        Returns True when `client_address` belongs to the VPN network and the
        peer was stored under the vpn prefix selected for it, False otherwise.
        """
        print("declaring new node")
        client_address, address = address
        # NOTE(review): the client address is the one announced in the call,
        # not handler.client_address; confirm this is intended.
        client_ip = utils.binFromIp(client_address)
        if not client_ip.startswith(self.network):
            logging.warning("Unauthorized connection from %s which does not start with %s"
                    % (utils.ipFromBin(client_ip), utils.ipFromBin(self.network.ljust(128, '0'))))
            return False
        prefix = client_ip[len(self.network):]
        # Greatest registered prefix not greater than the address suffix.
        prefix, = next(self.db.execute(
            "SELECT prefix FROM vpn WHERE prefix <= ? ORDER BY prefix DESC LIMIT 1",
            (prefix,)))
        self.db.execute("INSERT OR REPLACE INTO peers (prefix, address) VALUES (?,?)",
                        (prefix, address))
        return True

    def _purgeDeadPeers(self):
        """Delete peers older than `time_out`, at most once per `refresh_interval`."""
        if time.time() > self.last_refresh + self.refresh_interval:
            print("refreshing peers for dead ones")
            self.db.execute("DELETE FROM peers WHERE ( date + ? ) <= CAST (strftime('%s', 'now') AS INTEGER)", (self.time_out,))
            self.last_refresh = time.time()

    def getPeerList(self, handler, n, client_address):
        """Return up to `n` random (prefix, address) rows of the peers table.

        Raises ValueError unless 0 < n < 1000 and RuntimeError when
        `client_address` is outside the VPN network.
        """
        # n comes from the client: validate it explicitly (asserts vanish
        # under -O).
        if not 0 < n < 1000:
            raise ValueError('n must satisfy 0 < n < 1000')
        client_ip = utils.binFromIp(client_address)
        if not client_ip.startswith(self.network):
            logging.warning("Unauthorized connection from %s which does not start with %s"
                    % (utils.ipFromBin(client_ip), utils.ipFromBin(self.network.ljust(128, '0'))))
            raise RuntimeError
        self._purgeDeadPeers()
        print("sending peers")
        return self.db.execute("SELECT prefix, address FROM peers ORDER BY random() LIMIT ?", (n,)).fetchall()
242
# Script entry point: constructing `main` parses the arguments and serves forever.
if __name__ == "__main__":
    main()