Merge branch 'master' into cygwin
[re6stnet.git] / re6st / tunnel.py
1 import logging, platform, random, socket, subprocess, sys, time
2 from collections import deque
3 from itertools import chain
4 from . import plib, utils
5
# UDP port on which TunnelManager binds its socket and to which address
# requests are sent to other peers.
PORT = 326
# Route flag tested against the flags column of /proc/net/ipv6_route:
# entries having it set are skipped when listing routes.
RTF_CACHE = 0x01000000 # cache entry
8
# Be careful: the refresh interval must be long enough to let the routes be established.
10
11
class MultiGatewayManager(dict):
    """Reference-counted IPv4 host routes towards tunnel endpoints.

    Maps a destination IP to a ``[gateway, extra_references]`` list. The
    first add() for a destination creates the entry with a count of 0 and
    each further add() increments it; remove() decrements it and only drops
    the entry (and its route) when called on a count of 0.
    """

    def __init__(self, gateway):
        # Callable returning the gateway to use for a given destination.
        self._gw = gateway

    def _route(self, cmd, dest, gw):
        """Run ``ip -4 route <cmd> <dest>/32 via <gw>``; no-op if gw is falsy."""
        if gw:
            cmd = 'ip', '-4', 'route', cmd, '%s/32' % dest, 'via', gw
            logging.trace('%r', cmd)
            subprocess.call(cmd)

    def add(self, dest, route):
        """Take a reference on dest, adding a route on the first one.

        route -- if false, the entry is only counted and no gateway is
                 looked up (so no route is ever added or deleted for it).
        """
        try:
            self[dest][1] += 1
        except KeyError:
            gw = self._gw(dest) if route else None
            self[dest] = [gw, 0]
            self._route('add', dest, gw)

    def remove(self, dest):
        """Release a reference on dest, deleting the route with the last one.

        Raises KeyError if dest is unknown.
        """
        gw, count = self[dest]
        if count:
            self[dest][1] = count - 1
        else:
            del self[dest]
            # Deleting the route is best effort, but only ordinary errors
            # are ignored: KeyboardInterrupt/SystemExit must propagate.
            try:
                self._route('del', dest, gw)
            except Exception:
                logging.warning('Failed to delete route to %s via %s',
                                dest, gw, exc_info=True)
41
class Connection(object):
    """Outgoing OpenVPN client towards one peer.

    address_list -- sequence of (ip, port, proto) tuples, tried in order
    iface        -- name of the network interface given to the client
    prefix       -- prefix of the peer, as a string of binary digits
    """

    def __init__(self, address_list, iface, prefix):
        self.address_list = address_list
        self.iface = iface
        self.routes = 0
        self._prefix = prefix

    def __iter__(self):
        """Iterate over the distinct IPv4 addresses of address_list.

        Entries that are not valid IPv4 literals are skipped. The set is
        computed on first use and cached.
        """
        if not hasattr(self, '_remote_ip_set'):
            self._remote_ip_set = set()
            for ip, port, proto in self.address_list:
                try:
                    socket.inet_pton(socket.AF_INET, ip)
                except socket.error:
                    continue
                self._remote_ip_set.add(ip)
        return iter(self._remote_ip_set)

    def open(self, write_pipe, timeout, encrypt, ovpn_args, _retry=0):
        """Start the OpenVPN client using address number _retry.

        The arguments needed to retry with the next address are remembered
        in self._retry, which is False once the last address is in use.
        """
        self.process = plib.client(
            self.iface, (self.address_list[_retry],), encrypt,
            '--tls-remote', '%u/%u' % (int(self._prefix, 2), len(self._prefix)),
            '--resolv-retry', '0',
            '--connect-retry-max', '3', '--tls-exit',
            '--ping-exit', str(timeout),
            '--route-up', '%s %u' % (plib.ovpn_client, write_pipe),
            *ovpn_args)
        _retry += 1
        self._retry = _retry < len(self.address_list) and (
            write_pipe, timeout, encrypt, ovpn_args, _retry)

    def connected(self, db):
        """Tell the peer DB which address the tunnel came up with."""
        retry = self._retry
        if retry:
            # Last item is the index of the *next* address to try.
            i = retry[-1] - 1
            self._retry = None
        else:
            # No alternate address left (False) or already reset (None).
            i = len(self.address_list) - 1
        if i:
            # Not the first address: record the one that worked.
            db.addPeer(self._prefix, ','.join(self.address_list[i]), True)
        else:
            db.connecting(self._prefix, 0)

    def close(self):
        """Stop the OpenVPN process, if any."""
        try:
            self.process.stop()
        except (AttributeError, OSError):
            pass # we already polled an exited process

    def refresh(self):
        """Check the client process, falling back to the next address.

        Return False if the process exited and no address is left to try,
        True otherwise.
        """
        # Check that the connection is alive
        # (an exit code of 0 also means that the process is gone).
        if self.process.poll() is not None:
            logging.info('Connection with %s has failed with return code %s',
                         self._prefix, self.process.returncode)
            if not self._retry:
                return False
            logging.info('Retrying with alternate address')
            self.close()
            self.open(*self._retry)
        return True
102
103
104 class TunnelManager(object):
105
def __init__(self, write_pipe, peer_db, openvpn_args, timeout,
             refresh, client_count, iface_list, network, prefix,
             address, ip_changed, encrypt, remote_gateway, disable_proto):
    """Store the configuration and bind the UDP socket on PORT.

    remote_gateway -- if true, a MultiGatewayManager is built from it;
                      otherwise no gateway manager is used.
    client_count   -- maximum number of outgoing tunnels; also the number
                      of 're6stnetN' interface names prepared.
    """
    # Settings and collaborators given by the caller.
    self._write_pipe = write_pipe
    self._peer_db = peer_db
    self._ovpn_args = openvpn_args
    self._timeout = timeout
    self._refresh_time = refresh
    self._client_count = client_count
    self._iface_list = iface_list
    self._network = network
    self._prefix = prefix
    self._address = utils.dump_address(address)
    self._ip_changed = ip_changed
    self._encrypt = encrypt
    self._disable_proto = disable_proto
    if remote_gateway:
        self._gateway_manager = MultiGatewayManager(remote_gateway)
    else:
        self._gateway_manager = None

    # Runtime state.
    self._connecting = set()
    self._connection_dict = {}
    self._disconnected = None
    self._distant_peers = []
    self._iface_to_prefix = {}
    self._served = set()
    self._refresh_count = 1
    self._free_iface_list = []
    self.new_iface_list = deque(
        're6stnet' + str(i) for i in xrange(1, self._client_count + 1))

    # See also http://stackoverflow.com/questions/597225/
    # about binding and anycast.
    self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    self.sock.bind(('::', PORT))

    self.next_refresh = time.time()
    self._next_tunnel_refresh = time.time()
143
def _tuntap(self, iface=None):
    """Create or delete a tap device with ``ip tuntap`` and return its name.

    With iface given, that device is deleted and its name put back at the
    front of new_iface_list. Otherwise the next name of new_iface_list is
    taken and the device is created.
    """
    if iface:
        action = 'del'
        self.new_iface_list.appendleft(iface)
    else:
        action = 'add'
        iface = self.new_iface_list.popleft()
    command = ('ip', 'tuntap', action, 'dev', iface, 'mode', 'tap')
    logging.debug('%r', command)
    subprocess.call(command)
    return iface
155
def delInterfaces(self):
    """Delete every tap device, whether free or assigned to a prefix."""
    pending = self._free_iface_list
    # Same list object as self._free_iface_list: it ends up empty.
    pending.extend(self._iface_to_prefix)
    self._iface_to_prefix.clear()
    while pending:
        iface = pending.pop()
        self._tuntap(iface)
162
def getFreeInterface(self, prefix):
    """Return an interface for prefix, reusing a free one if possible.

    When no free interface is left, a new one is created with _tuntap().
    The interface is recorded as assigned to prefix.
    """
    free = self._free_iface_list
    iface = free.pop() if free else self._tuntap()
    self._iface_to_prefix[iface] = prefix
    return iface
170
def freeInterface(self, iface):
    """Mark iface as free again and forget the prefix it was assigned to.

    Raises KeyError if iface was not assigned (it is still added to the
    free list in that case).
    """
    self._free_iface_list.append(iface)
    self._iface_to_prefix.pop(iface)
174
def refresh(self):
    """Periodic maintenance of the tunnels.

    Dead connections are always cleaned up and new tunnels attempted.
    Once the tunnel refresh time has elapsed, routes are also recounted,
    some tunnels are dropped, the peer DB is logged and one free interface
    (if any) is deleted. The next call is scheduled 5 seconds later.
    """
    logging.debug('Checking tunnels...')
    self._cleanDeads()
    tunnel_refresh_due = self._next_tunnel_refresh < time.time()
    if tunnel_refresh_due:
        self._countRoutes()
        self._removeSomeTunnels()
        self._next_tunnel_refresh = time.time() + self._refresh_time
        self._peer_db.log()
    self._makeNewTunnels(tunnel_refresh_due)
    if tunnel_refresh_due and self._free_iface_list:
        unused_iface = self._free_iface_list.pop()
        self._tuntap(unused_iface)
    self.next_refresh = time.time() + 5
188
189 def _cleanDeads(self):
190 for prefix in self._connection_dict.keys():
191 if not self._connection_dict[prefix].refresh():
192 self._kill(prefix)
193
194 def _removeSomeTunnels(self):
195 # Get the candidates to killing
196 candidates = sorted(self._connection_dict, key=lambda p:
197 self._connection_dict[p].routes)
198 for prefix in candidates[0: max(0, len(self._connection_dict) -
199 self._client_count + self._refresh_count)]:
200 self._kill(prefix)
201
202 def _kill(self, prefix):
203 logging.info('Killing the connection with %u/%u...',
204 int(prefix, 2), len(prefix))
205 connection = self._connection_dict.pop(prefix)
206 self.freeInterface(connection.iface)
207 connection.close()
208 if self._gateway_manager is not None:
209 for ip in connection:
210 self._gateway_manager.remove(ip)
211 logging.trace('Connection with %u/%u killed',
212 int(prefix, 2), len(prefix))
213
214 def _makeTunnel(self, prefix, address):
215 assert len(self._connection_dict) < self._client_count, (prefix, self.__dict__)
216 if prefix in self._served or prefix in self._connection_dict:
217 return False
218 assert prefix != self._prefix, self.__dict__
219 address = [x for x in utils.parse_address(address)
220 if x[2] not in self._disable_proto]
221 self._peer_db.connecting(prefix, 1)
222 if not address:
223 return False
224 logging.info('Establishing a connection with %u/%u',
225 int(prefix, 2), len(prefix))
226 iface = self.getFreeInterface(prefix)
227 self._connection_dict[prefix] = c = Connection(address, iface, prefix)
228 if self._gateway_manager is not None:
229 for ip in c:
230 self._gateway_manager.add(ip, True)
231 c.open(self._write_pipe, self._timeout, self._encrypt, self._ovpn_args)
232 return True
233
def _makeNewTunnels(self, route_counted):
    """Try to fill the free outgoing tunnel slots.

    route_counted -- true if the caller has just run _countRoutes(), in
                     which case routes are not counted again here.

    Depending on whether _countRoutes() flagged the node as disconnected,
    peers are picked either from the routing table (_distant_peers) or
    from the peer DB cache / bootstrap peer.
    """
    count = self._client_count - len(self._connection_dict)
    if not count:
        return
    assert count >= 0
    # CAVEAT: Forget any peer that didn't reply to our previous address
    #         request, either because latency is too high or some packet
    #         was lost. However, this means that some time should pass
    #         before calling _makeNewTunnels again.
    self._connecting.clear()
    # Same list object as self._distant_peers: _countRoutes() refills it
    # in place, so this alias stays valid after the call below.
    distant_peers = self._distant_peers
    if len(distant_peers) < count and not route_counted:
        self._countRoutes()
    disconnected = self._disconnected
    if disconnected is not None:
        # We aren't the registry node and we have no tunnel to or from it,
        # so it looks like we are not connected to the network, and our
        # neighbours are in the same situation.
        self._disconnected = None
        disconnected = set(disconnected).union(distant_peers)
        if disconnected:
            # We do have neighbours that are probably also disconnected,
            # so force rebootstrapping.
            peer = self._peer_db.getBootstrapPeer()
            if not peer:
                # Registry dead ? Assume we're connected after all.
                disconnected = None
            elif peer[0] not in disconnected:
                # Got a node that will probably help us rejoining the
                # network, so connect to it.
                # (_makeTunnel returns a bool: True counts as 1.)
                count -= self._makeTunnel(*peer)
    if disconnected is None:
        # Normal operation. Choose peers to connect to by looking at the
        # routing table.
        while count and distant_peers:
            # Pick a random peer and remove it in O(1) by swapping it
            # with the last item.
            i = random.randrange(0, len(distant_peers))
            peer = distant_peers[i]
            distant_peers[i] = distant_peers[-1]
            del distant_peers[-1]
            address = self._peer_db.getAddress(peer)
            if address:
                count -= self._makeTunnel(peer, address)
            else:
                # Address unknown: ask the peer itself. The tunnel is
                # made by handlePeerEvent when the answer comes.
                ip = utils.ipFromBin(self._network + peer)
                # TODO: Send at least 1 address. This helps the registry
                #       node filling its cache when building a new network.
                try:
                    self.sock.sendto('\2', (ip, PORT))
                except socket.error, e:
                    logging.info('Failed to query %s (%s)', ip, e)
                self._connecting.add(peer)
                count -= 1
    elif count:
        # No route/tunnel to registry, which usually happens when starting
        # up. Select peers from cache for which we have no route.
        new = 0
        bootstrap = True
        for peer, address in self._peer_db.getPeerList():
            if peer not in disconnected:
                bootstrap = False
                if self._makeTunnel(peer, address):
                    new += 1
                    if new == count:
                        return
        if not (new or disconnected):
            if bootstrap:
                # Startup without any good address in the cache.
                peer = self._peer_db.getBootstrapPeer()
                if peer and self._makeTunnel(*peer):
                    return
            # Failed to bootstrap ! Last chance to connect is to
            # retry an address that already failed :(
            for peer in self._peer_db.getPeerList(1):
                if self._makeTunnel(*peer):
                    break
309
if sys.platform == 'cygwin':
    def _iterRoutes(self):
        """Yield (interface, destination, prefix length) for IPv6 routes.

        Routes are read from the output of ``netsh``. The destination is
        converted with utils.binFromIp(). Lines that do not have the
        expected shape are skipped.
        """
        interface_dict = {}
        interfaces = subprocess.check_output(
            ('netsh', 'interface', 'ipv6', 'show', 'interface'),
            stderr=subprocess.STDOUT)
        for line in interfaces.splitlines():
            fs = line.split(None, 4)
            # Keep rows starting with a number and having a 5th column;
            # blank, header and truncated lines are ignored.
            if len(fs) == 5 and fs[0].isdigit():
                interface_dict[fs[0].strip()] = fs[4].strip()
        routing_table = subprocess.check_output(
            ('netsh', 'interface', 'ipv6', 'show', 'route', 'verbose'),
            stderr=subprocess.STDOUT)
        # Before Vista
        if platform.system()[10:11] == '5':
            pname = 'Prefix'
            ifname = lambda a : a
        # From Vista later
        else:
            pname = 'Destination Prefix'
            ifname = interface_dict.get
        prefix = prefix_len = None
        for line in routing_table.splitlines():
            fs = line.split(':', 1)
            if len(fs) != 2:
                continue
            test = fs[0].startswith
            if test(pname):
                prefix, prefix_len = fs[1].split('/', 1)
            elif test('Interface'):
                if prefix is None:
                    # No destination seen yet: nothing to report.
                    continue
                yield (ifname(fs[1].strip()),
                       utils.binFromIp(prefix.strip()),
                       int(prefix_len))

else:
    def _iterRoutes(self):
        """Yield (interface, destination, prefix length) for IPv6 routes.

        Routes are read from /proc/net/ipv6_route. The destination is a
        string of 128 binary digits. Routes on 'lo' and routes flagged
        RTF_CACHE are skipped.
        """
        with open('/proc/net/ipv6_route') as f:
            routing_table = f.read()
        for line in routing_table.splitlines():
            fields = line.split()
            if not fields:
                continue
            # Columns used: destination (hex), prefix length (hex), ...,
            # flags (hex), interface name.
            iface = fields[-1]
            if iface != 'lo' and not (int(fields[-2], 16) & RTF_CACHE):
                yield (iface, bin(int(fields[0], 16))[2:].rjust(128, '0'),
                       int(fields[1], 16))
352
353 def _countRoutes(self):
354 logging.debug('Starting to count the routes on each interface...')
355 del self._distant_peers[:]
356 for conn in self._connection_dict.itervalues():
357 conn.routes = 0
358 a = len(self._network)
359 b = a + len(self._prefix)
360 other = []
361 for iface, ip, prefix_len in self._iterRoutes():
362 if ip[:a] == self._network and ip[a:b] != self._prefix:
363 prefix = ip[a:prefix_len]
364 logging.trace('Route on iface %s detected to %s/%u',
365 iface, utils.ipFromBin(ip), prefix_len)
366 nexthop = self._iface_to_prefix.get(iface)
367 if nexthop:
368 self._connection_dict[nexthop].routes += 1
369 if prefix in self._served or prefix in self._connection_dict:
370 continue
371 if iface in self._iface_list:
372 other.append(prefix)
373 else:
374 self._distant_peers.append(prefix)
375 is_registry = self._peer_db.registry_ip[a:].startswith
376 if is_registry(self._prefix) or any(is_registry(peer)
377 for peer in chain(self._distant_peers, other,
378 self._served, self._connection_dict)):
379 self._disconnected = None
380 # XXX: When there is no new peer to connect when looking at routes
381 # coming from tunnels, we'd like to consider those discovered
382 # from the LAN. However, we don't want to create tunnels to
383 # nodes of the LAN so do nothing until we find a way to get
384 # some information from Babel.
385 #if not self._distant_peers:
386 # self._distant_peers = other
387 else:
388 self._disconnected = other
389 logging.debug("Routes counted: %u distant peers",
390 len(self._distant_peers))
391 for c in self._connection_dict.itervalues():
392 logging.trace('- %s: %s', c.iface, c.routes)
393
def killAll(self):
    """Kill every outgoing connection."""
    # Iterate over a snapshot: _kill() removes entries from the dict.
    for prefix in list(self._connection_dict):
        self._kill(prefix)
397
def handleTunnelEvent(self, msg):
    """Dispatch a message received from an OpenVPN hook script.

    The first word selects the handler: '-' is replaced by '_' and the
    result is prefixed with '_ovpn_'. Remaining words are passed as
    positional arguments. Unknown or empty messages are logged and
    ignored.
    """
    try:
        msg = msg.rstrip()
        args = msg.split()
        m = getattr(self, '_ovpn_' + args.pop(0).replace('-', '_'))
    except (AttributeError, IndexError, ValueError):
        # IndexError: empty or whitespace-only message (nothing to pop).
        logging.warning("Unknown message received from OpenVPN: %s", msg)
    else:
        logging.debug(msg)
        m(*args)
408
def _ovpn_client_connect(self, common_name, trusted_ip):
    """Handle a peer connecting to our OpenVPN server.

    The peer is recorded as served and, when a gateway manager is used,
    a reference is taken on its IP without adding a route. If we also
    have an outgoing connection to this peer and our prefix sorts before
    its prefix, our own connection is killed.
    """
    prefix = utils.binFromSubnet(common_name)
    self._served.add(prefix)
    gateway_manager = self._gateway_manager
    if gateway_manager is not None:
        gateway_manager.add(trusted_ip, False)
    duplicate = prefix in self._connection_dict
    if duplicate and self._prefix < prefix:
        self._kill(prefix)
        self._peer_db.connecting(prefix, 0)
417
def _ovpn_client_disconnect(self, common_name, trusted_ip):
    """Handle a peer disconnecting from our OpenVPN server.

    Nothing is done if the peer was not recorded as served. Otherwise it
    is forgotten and, when a gateway manager is used, the reference on its
    IP is released.
    """
    prefix = utils.binFromSubnet(common_name)
    if prefix not in self._served:
        return
    self._served.remove(prefix)
    gateway_manager = self._gateway_manager
    if gateway_manager is not None:
        gateway_manager.remove(trusted_ip)
426
def _ovpn_route_up(self, common_name, ip):
    """Handle the route-up event of one of our OpenVPN clients.

    The matching connection, if any, is told that it is connected. When
    an ip_changed callback was given, it is called with ip and its result
    becomes our new advertised address.
    """
    prefix = utils.binFromSubnet(common_name)
    try:
        connection = self._connection_dict[prefix]
        connection.connected(self._peer_db)
    except KeyError:
        pass
    ip_changed = self._ip_changed
    if ip_changed:
        self._address = utils.dump_address(ip_changed(ip))
435
def handlePeerEvent(self):
    """Read one datagram from the peer socket and process it.

    Empty datagrams and datagrams whose sender is outside of our network
    are dropped. The first byte is the message code:
      1   -- answer: lines of '<prefix> <address>' to store in the peer
             DB; a tunnel is made if we were waiting for that peer
      2   -- request: reply with our own address, if known, and possibly
             one random peer from the peer DB
      255 -- topology query, answered only if sent by the registry
    """
    msg, address = self.sock.recvfrom(1<<16)
    # Both conditions are required: an empty message has no code to read
    # and peers outside of the network must not be trusted.
    if not msg:
        return
    sender = utils.binFromIp(address[0])
    if not sender.startswith(self._network):
        return
    code = ord(msg[0])
    if code == 1: # answer
        # We parse the message in a way to discard a truncated line.
        for peer in msg[1:].split('\n')[:-1]:
            try:
                prefix, peer_address = peer.split()
                int(prefix, 2)
            except ValueError:
                break
            if prefix != self._prefix:
                self._peer_db.addPeer(prefix, peer_address)
                try:
                    self._connecting.remove(prefix)
                except KeyError:
                    # Not a peer we asked about: only cache its address.
                    continue
                self._makeTunnel(prefix, peer_address)
    elif code == 2: # request
        encode = '%s %s\n'.__mod__
        if self._address:
            msg = [encode((self._prefix, self._address))]
        else: # I don't know my IP yet!
            msg = []
        # Add an extra random peer, mainly for the registry.
        if random.randint(0, self._peer_db.getPeerCount()):
            msg.append(encode(next(self._peer_db.getPeerList())))
        if msg:
            try:
                self.sock.sendto('\1' + ''.join(msg), address)
            except socket.error as e:
                logging.info('Failed to reply to %s (%s)', address, e)
    elif code == 255:
        # the registry wants to know the topology for debugging purpose
        if sender == self._peer_db.registry_ip:
            msg = ['\xfe%s%u/%u\n%u\n' % (msg[1:],
                int(self._prefix, 2), len(self._prefix),
                len(self._connection_dict))]
            msg.extend('%u/%u\n' % (int(x, 2), len(x))
                       for peers in (self._connection_dict, self._served)
                       for x in peers)
            try:
                self.sock.sendto(''.join(msg), address)
            except socket.error as e:
                logging.info('Failed to reply to %s (%s)', address, e)