Fix code of message sending back the version
[re6stnet.git] / re6st / tunnel.py
1 import logging, os, random, socket, subprocess, time, weakref
2 from collections import defaultdict, deque
3 from . import ctl, plib, utils, version
4
# UDP port on which TunnelManager binds its IPv6 datagram socket and to which
# it sends messages addressed to other peers.
PORT = 326
6
# Be careful: the refresh interval should let the routes be established.
8
9
class MultiGatewayManager(dict):
    """Reference-counted IPv4 host routes towards remote peers.

    Each key is a destination IP. Each value is a ``[gateway, extra_refs]``
    list where ``gateway`` is the gateway used for the host route (or None if
    no route was installed) and ``extra_refs`` counts the users of the
    destination beyond the first one.
    """

    def __init__(self, gateway):
        """Remember *gateway*, a callable mapping a destination IP to the
        gateway to route it through (a false result means "no route")."""
        self._gw = gateway

    def _route(self, cmd, dest, gw):
        """Run ``ip -4 route <cmd> <dest>/32 via <gw>``.

        Does nothing when *gw* is false. Raises
        subprocess.CalledProcessError if the command exits with an error.
        """
        if gw:
            cmd = 'ip', '-4', 'route', cmd, '%s/32' % dest, 'via', gw
            # NOTE(review): logging.trace is not part of the standard logging
            # module; presumably it is installed elsewhere in the project.
            logging.trace('%r', cmd)
            subprocess.check_call(cmd)

    def add(self, dest, route):
        """Register one more user of *dest*.

        On first registration, the gateway is looked up (only if *route* is
        true) and the host route is added. Later registrations only increment
        the reference count.
        """
        try:
            self[dest][1] += 1
        except KeyError:
            gw = self._gw(dest) if route else None
            self[dest] = [gw, 0]
            self._route('add', dest, gw)

    def remove(self, dest):
        """Unregister one user of *dest*, deleting the route with the last one.

        Raises KeyError if *dest* is not registered. Failure to delete the
        route is logged and otherwise ignored (best effort).
        """
        gw, count = self[dest]
        if count:
            self[dest][1] = count - 1
        else:
            del self[dest]
            try:
                self._route('del', dest, gw)
            except Exception as e:
                # Best effort: the route may already be gone. Unlike a bare
                # 'except:', this lets KeyboardInterrupt/SystemExit through.
                logging.info('Failed to delete route to %s via %s (%s)',
                             dest, gw, e)
39
class Connection(object):
    """A client tunnel towards one peer, trying its addresses in turn.

    Iterating over a Connection yields the distinct IPv4 addresses found in
    its address list.
    """

    # Index of the next address to try; set to None once connected.
    _retry = 0
    # Time of the last open(); infinite until open() is called.
    time = float('inf')

    def __init__(self, tunnel_manager, address_list, iface, prefix):
        """Store the parameters; nothing is started until open().

        *address_list* is a sequence of (ip, port, proto) string tuples and
        *prefix* is the peer prefix as a string of binary digits.
        """
        self.tunnel_manager = tunnel_manager
        self.address_list = address_list
        self.iface = iface
        self._prefix = prefix

    def __iter__(self):
        """Iterate over the IPv4 addresses of the peer (computed once)."""
        if not hasattr(self, '_remote_ip_set'):
            self._remote_ip_set = set()
            for ip, port, proto in self.address_list:
                try:
                    socket.inet_pton(socket.AF_INET, ip)
                except socket.error:
                    # Not a literal IPv4 address (host name, IPv6, ...).
                    continue
                self._remote_ip_set.add(ip)
        return iter(self._remote_ip_set)

    def open(self):
        """Start an OpenVPN client towards the next address to try.

        Also records the start time, resets the tunnel refresh timer of the
        tunnel manager and advances the retry index.
        """
        tm = self.tunnel_manager
        self.time = time.time()
        self.process = plib.client(
            self.iface, (self.address_list[self._retry],), tm.encrypt,
            '--tls-remote', '%u/%u' % (int(self._prefix, 2), len(self._prefix)),
            '--resolv-retry', '0',
            '--connect-retry-max', '3', '--tls-exit',
            '--remap-usr1', 'SIGTERM',
            '--ping-exit', str(tm.timeout),
            '--route-up', '%s %u' % (plib.ovpn_client, tm.write_pipe),
            *tm.ovpn_args)
        tm.resetTunnelRefresh()
        self._retry += 1

    def connected(self):
        """Record in the peer DB that the tunnel is established.

        If an alternate address (not the first of the list) succeeded, it is
        passed to peer_db.addPeer(); otherwise peer_db.connecting(prefix, 0)
        is called. No further address is retried afterwards.

        Raises TypeError if called again after a first successful call.
        """
        i = self._retry - 1
        self._retry = None
        db = self.tunnel_manager.peer_db
        if i:
            db.addPeer(self._prefix, ','.join(self.address_list[i]), True)
        else:
            db.connecting(self._prefix, 0)

    def close(self):
        """Stop the client process, if any."""
        try:
            self.process.stop()
        except (AttributeError, OSError):
            pass # we already polled an exited process

    def refresh(self):
        """Check the tunnel and retry with the next address if it died.

        Returns False when the process has exited and there is nothing left
        to try (already connected once, or address list exhausted), True
        otherwise.
        """
        # Check that the connection is alive
        if self.process.poll() is not None:
            logging.info('Connection with %s has failed with return code %s',
                         self._prefix, self.process.returncode)
            if self._retry is None or len(self.address_list) <= self._retry:
                return False
            logging.info('Retrying with alternate address')
            self.close()
            self.open()
        return True
103
class TunnelKiller(object):
    """Step-by-step shutdown of the tunnel with one peer.

    The instance is callable: each call runs the handler named by ``state``.
    States are entered in this order: None (nothing done yet), 'softLocking',
    'hardLocking' and 'locked'; abort() switches to 'unlocking'. ``timeout``
    is the time after which the owner is expected to give up.
    """

    state = None

    def __init__(self, peer, tunnel_manager, client=False):
        """Bind to *peer* and immediately attempt the first step.

        Only a weak proxy to *tunnel_manager* is kept.
        """
        self.peer = peer
        self.client = client
        self.tm = weakref.proxy(tunnel_manager)
        self.timeout = time.time() + 2 * tunnel_manager.timeout
        self()

    def __call__(self):
        """Run the handler of the current state, or start soft locking.

        When no state is set yet and the peer is a known neighbour, its cost
        multiplier is set to 4096 and the neighbour's address, interface
        index and previous cost multiplier are remembered.
        """
        current = self.state
        if current:
            return getattr(self, current)()
        babel = self.tm.ctl
        try:
            neighbour = babel.neighbours[self.peer][0]
        except KeyError:
            # Peer not (yet) known as a neighbour: try again on next call.
            return None
        self.state = 'softLocking'
        address, ifindex = neighbour.address, neighbour.ifindex
        babel.send(ctl.SetCostMultiplier(address, ifindex, 4096))
        self.address = address
        self.ifindex = ifindex
        self.cost_multiplier = neighbour.cost_multiplier

    def softLocking(self):
        """Go to 'hardLocking' (cost multiplier 0) once neither the peer nor
        None is a key of the neighbours table."""
        babel = self.tm.ctl
        must_wait = self.peer in babel.neighbours or None in babel.neighbours
        if not must_wait:
            babel.send(ctl.SetCostMultiplier(self.address, self.ifindex, 0))
            self.state = "hardLocking"

    def hardLocking(self):
        """Check whether the lock was acknowledged.

        If (address, ifindex) is reported as locked, enter 'locked', extend
        the timeout and send the kill message to the peer ('\\4' when acting
        as client, '\\5' otherwise, followed by our prefix). Otherwise expire
        immediately by zeroing the timeout.
        """
        manager = self.tm
        if (self.address, self.ifindex) not in manager.ctl.locked:
            self.timeout = 0
            return
        self.state = 'locked'
        self.timeout = time.time() + 2 * manager.timeout
        code = '\4' if self.client else '\5'
        manager.sendto(self.peer, code + manager._prefix)

    def unlock(self):
        """Restore the remembered cost multiplier, if any step was done."""
        if not self.state:
            return
        restore = ctl.SetCostMultiplier(self.address, self.ifindex,
                                        self.cost_multiplier)
        self.tm.ctl.send(restore)

    def abort(self):
        """Switch to 'unlocking' and restart the timeout (only once)."""
        if self.state == 'unlocking':
            return
        self.state = 'unlocking'
        self.timeout = time.time() + 2 * self.tm.timeout

    def locked(self):
        """Nothing to do in this state: just wait."""
        return None

    # Same no-op handler for the 'unlocking' state.
    unlocking = locked
156
157
class TunnelManager(object):
    """Create, monitor and destroy the tunnels of this node.

    It owns the client connections (``_connection_dict``), tracks the peers
    connected to our server (``_served``), talks to Babel through ``ctl`` and
    exchanges small UDP messages with other peers on PORT.
    """

    def __init__(self, control_socket, peer_db, openvpn_args, timeout,
                 refresh, client_count, iface_list, network, prefix,
                 address, ip_changed, encrypt, remote_gateway, disable_proto,
                 neighbour_list=()):
        """Initialize state, the Babel control object and the UDP socket.

        *address* is an iterable of (family, address_list) pairs; lists of
        the same family are concatenated. *remote_gateway*, if true, is the
        callable given to MultiGatewayManager. *neighbour_list* contains the
        subnets of peers to prefer when choosing tunnels.
        """
        self.ctl = ctl.Babel(control_socket, weakref.proxy(self), network)
        self.encrypt = encrypt
        self.ovpn_args = openvpn_args
        self.peer_db = peer_db
        self.timeout = timeout
        # Create and open read_only pipe to get server events
        r, self.write_pipe = os.pipe()
        self._read_pipe = os.fdopen(r)
        self._connecting = set()
        self._connection_dict = {}
        self._disconnected = 0
        self._distant_peers = []
        self._iface_to_prefix = {}
        self._refresh_time = refresh
        self._network = network
        self._iface_list = iface_list
        self._prefix = prefix
        address_dict = defaultdict(list)
        for family, family_addresses in address:
            address_dict[family] += family_addresses
        self._address = dict((family, utils.dump_address(family_addresses))
                             for family, family_addresses
                             in address_dict.iteritems()
                             if family_addresses)
        self._ip_changed = ip_changed
        self._gateway_manager = MultiGatewayManager(remote_gateway) \
                                if remote_gateway else None
        self._disable_proto = disable_proto
        self._neighbour_set = set(map(utils.binFromSubnet, neighbour_list))
        self._served = set()
        self._killing = {}

        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        # See also http://stackoverflow.com/questions/597225/
        # about binding and anycast.
        self.sock.bind(('::', PORT))

        self._next_refresh = time.time()
        self.resetTunnelRefresh()

        self._client_count = client_count
        self.new_iface_list = deque('re6stnet' + str(i)
            for i in xrange(1, self._client_count + 1))
        self._free_iface_list = []

    def resetTunnelRefresh(self):
        """Schedule the next tunnel renewal one refresh interval from now."""
        self._next_tunnel_refresh = time.time() + self._refresh_time

    def _tuntap(self, iface=None):
        """Create or delete a tap interface with the openvpn command.

        Without argument, take the next unused interface name, create it and
        return its name. With *iface*, delete that interface and make its
        name available again. Raises subprocess.CalledProcessError on failure.
        """
        if iface:
            self.new_iface_list.appendleft(iface)
            action = '--rmtun'
        else:
            iface = self.new_iface_list.popleft()
            action = '--mktun'
        # BBB: do not use 'ip tuntap' which is not available on old dists
        args = ('openvpn', action, '--verb', '0',
                '--dev', iface, '--dev-type', 'tap')
        logging.debug('%r', args)
        subprocess.check_call(args)
        return iface

    def delInterfaces(self):
        """Delete every tap interface created so far (free or in use)."""
        iface_list = self._free_iface_list
        iface_list += self._iface_to_prefix
        self._iface_to_prefix.clear()
        while iface_list:
            self._tuntap(iface_list.pop())

    def _getFreeInterface(self, prefix):
        """Return an interface for *prefix*, creating one if none is free."""
        try:
            iface = self._free_iface_list.pop()
        except IndexError:
            iface = self._tuntap()
        self._iface_to_prefix[iface] = prefix
        return iface

    def freeInterface(self, iface):
        """Mark *iface* as reusable; raise KeyError if it was not in use."""
        self._free_iface_list.append(iface)
        del self._iface_to_prefix[iface]

    def select(self, r, w, t):
        """Register our read handlers in *r* and our timer in *t*.

        *r* maps readable objects to callbacks and *t* is a list of
        (time, callback) pairs. *w* is only forwarded to the Babel control
        object.
        """
        r[self._read_pipe] = self.handleTunnelEvent
        r[self.sock] = self.handlePeerEvent
        if self._next_refresh:
            t.append((self._next_refresh, self.refresh))
        self.ctl.select(r, w, t)

    def refresh(self):
        """Periodic check: drop dead tunnels and decide what to do next.

        A Babel dump is requested (babel_dump() then continues the work) when
        tunnels must be renewed, when tunnels are being killed or when
        routes are needed to choose new peers. Otherwise, check again in
        5 seconds.
        """
        logging.debug('Checking tunnels...')
        self._cleanDeads()
        if self._next_tunnel_refresh < time.time() or \
           self._killing or \
           self._makeNewTunnels(False):
            self._next_refresh = None
            self.ctl.request_dump() # calls babel_dump immediately at startup
        else:
            self._next_refresh = time.time() + 5

    def babel_dump(self):
        """Continue refresh() with up-to-date routing information.

        Advance or expire tunnel killers, renew some tunnels if it is time
        to, open new tunnels and schedule the next refresh().
        """
        t = time.time()
        if self._killing:
            # Iterate over a snapshot because entries are deleted on the way.
            for prefix, tunnel_killer in list(self._killing.items()):
                if tunnel_killer.timeout < t:
                    tunnel_killer.unlock()
                    del self._killing[prefix]
                else:
                    tunnel_killer()
        remove = self._next_tunnel_refresh < t
        if remove:
            self._removeSomeTunnels()
            self.resetTunnelRefresh()
            self.peer_db.log()
        self._makeNewTunnels(True)
        # XXX: Commented code is an attempt to clean up unused interfaces
        #      but babeld does not leave ipv6 membership for deleted taps,
        #      causing a memory leak in the kernel (capped by sysctl
        #      net.core.optmem_max), and after some time, new neighbours fail
        #      to see each other.
        #if remove and len(self._connecting) < len(self._free_iface_list):
        #    self._tuntap(self._free_iface_list.pop())
        self._next_refresh = time.time() + 5

    def _cleanDeads(self):
        """Kill every client connection whose refresh() reports failure."""
        # Iterate over a snapshot because _kill() removes entries.
        for prefix in list(self._connection_dict):
            if not self._connection_dict[prefix].refresh():
                self._kill(prefix)

    def _tunnelScore(self, prefix):
        """Return the sort key used to choose which tunnels to drop first.

        The key is () when no non-empty route is known through *prefix*, else
        (is_preferred_neighbour, number_of_non_empty_routes). Lower keys are
        dropped first.
        """
        n = 0
        try:
            for x in self.ctl.neighbours[prefix][1]:
                if x:
                    n += 1
        except KeyError:
            pass
        return (prefix in self._neighbour_set, n) if n else ()

    def _removeSomeTunnels(self):
        """Start killing the lowest-scored client tunnels.

        Enough tunnels are selected to get below the maximum number of
        client connections.
        """
        # Get the candidates to killing
        peer_set = set(self._connection_dict)
        peer_set.difference_update(self._killing)
        count = len(peer_set) - self._client_count + 1
        if count > 0:
            for prefix in sorted(peer_set, key=self._tunnelScore)[:count]:
                self._killing[prefix] = TunnelKiller(prefix, self, True)

    def _abortTunnelKiller(self, prefix):
        """Cancel the pending kill of *prefix*, if any.

        A killer that already did something is switched to its unlocking
        state; one that did nothing yet is simply forgotten.
        """
        tunnel_killer = self._killing.get(prefix)
        if tunnel_killer:
            if tunnel_killer.state:
                tunnel_killer.abort()
            else:
                del self._killing[prefix]

    def _kill(self, prefix):
        """Close the client connection with *prefix* and release resources.

        Raises KeyError if there is no such connection.
        """
        logging.info('Killing the connection with %u/%u...',
                     int(prefix, 2), len(prefix))
        self._abortTunnelKiller(prefix)
        connection = self._connection_dict.pop(prefix)
        self.freeInterface(connection.iface)
        connection.close()
        if self._gateway_manager is not None:
            for ip in connection:
                self._gateway_manager.remove(ip)
        # NOTE(review): logging.trace is not part of the standard logging
        # module; presumably it is installed elsewhere in the project.
        logging.trace('Connection with %u/%u killed',
                      int(prefix, 2), len(prefix))

    def _makeTunnel(self, prefix, address):
        """Open a client tunnel to *prefix*; return whether one was started.

        Nothing is done if the peer is already connected to us (as client or
        server) or if no address remains after removing those using a
        disabled protocol.
        """
        if prefix in self._served or prefix in self._connection_dict:
            return False
        assert prefix != self._prefix, self.__dict__
        address = [x for x in utils.parse_address(address)
                     if x[2] not in self._disable_proto]
        self.peer_db.connecting(prefix, 1)
        if not address:
            return False
        logging.info('Establishing a connection with %u/%u',
                     int(prefix, 2), len(prefix))
        with utils.exit:
            iface = self._getFreeInterface(prefix)
            self._connection_dict[prefix] = c = Connection(self, address,
                                                           iface, prefix)
        if self._gateway_manager is not None:
            for ip in c:
                self._gateway_manager.add(ip, True)
        c.open()
        return True

    def _makeNewTunnels(self, route_dumped):
        """Try to open as many client tunnels as there are free slots.

        Returns True when up-to-date routes are needed first (the caller is
        then expected to request a dump and call again with a true
        *route_dumped*), None otherwise.
        """
        count = self._client_count - len(self._connection_dict)
        if not count:
            return
        # CAVEAT: Forget any peer that didn't reply to our previous address
        #         request, either because latency is too high or some packet
        #         was lost. However, this means that some time should pass
        #         before calling _makeNewTunnels again.
        self._connecting.clear()
        distant_peers = self._distant_peers
        if len(distant_peers) < count or 0 < self._disconnected < time.time():
            if not route_dumped:
                return True
            logging.debug('Analyze routes ...')
            neighbours = self.ctl.neighbours
            # Collect all nodes known by Babel
            peers = set(prefix
                for neigh_routes in neighbours.itervalues()
                for prefix in neigh_routes[1]
                if prefix)
            # Keep only distant peers.
            distant_peers[:] = peers.difference(neighbours)
            # Check whether we're connected to the network.
            registry = self.peer_db.registry_prefix
            if (registry == self._prefix or registry in peers
                  or registry in self._connection_dict
                  or registry in self._served):
                self._disconnected = 0
            # Do not bootstrap too often, especially if we are several
            # nodes to try.
            elif self._disconnected < time.time():
                logging.info("No route to registry (%u peers, %u distant)",
                             len(peers), len(distant_peers))
                self._disconnected = time.time() + self.timeout * (
                    1 + random.randint(0, len(peers)))
                distant_peers = None
                if peers:
                    # We aren't the only disconnected node
                    # so force rebootstrapping.
                    peer = self.peer_db.getBootstrapPeer()
                    if not peer:
                        # Registry dead ? Assume we're connected after all.
                        distant_peers = self._distant_peers
                    elif peer[0] not in peers:
                        # Got a node that will probably help us rejoining
                        # the network, so connect to it.
                        count -= self._makeTunnel(*peer)
                        if not count:
                            return
        if distant_peers:
            # Normal operation. Choose peers to connect to by looking at the
            # routing table.
            neighbour_set = self._neighbour_set.intersection(distant_peers)
            while count and distant_peers:
                if neighbour_set:
                    peer = neighbour_set.pop()
                    i = distant_peers.index(peer)
                else:
                    i = random.randrange(len(distant_peers))
                    peer = distant_peers[i]
                # Remove the chosen peer in O(1) by moving the last one
                # into its place.
                distant_peers[i] = distant_peers[-1]
                del distant_peers[-1]
                address = self.peer_db.getAddress(peer)
                if address:
                    count -= self._makeTunnel(peer, address)
                elif self.sendto(peer, '\2'):
                    # Address unknown: ask the peer and wait for its answer.
                    self._connecting.add(peer)
                    count -= 1
        elif distant_peers is None:
            # No route/tunnel to registry, which usually happens when starting
            # up. Select peers from cache for which we have no route.
            new = 0
            bootstrap = True
            for peer, address in self.peer_db.getPeerList():
                if peer not in peers:
                    bootstrap = False
                    if self._makeTunnel(peer, address):
                        new += 1
                        if new == count:
                            return
            # The following condition on 'peers' is the same as above,
            # when we asked the registry for a node to bootstrap.
            if not (new or peers):
                if bootstrap:
                    # Startup without any good address in the cache.
                    peer = self.peer_db.getBootstrapPeer()
                    if peer and self._makeTunnel(*peer):
                        return
                # Failed to bootstrap ! Last chance to connect is to
                # retry an address that already failed :(
                for peer in self.peer_db.getPeerList(1):
                    if self._makeTunnel(*peer):
                        break

    def killAll(self):
        """Close all client connections."""
        # Iterate over a snapshot because _kill() removes entries.
        for prefix in list(self._connection_dict):
            self._kill(prefix)

    def handleTunnelEvent(self):
        """Read one event line from the pipe and dispatch it.

        The first word selects the ``_ovpn_<event>`` method (dashes replaced
        by underscores); remaining words are its arguments. Blank lines and
        unknown events are logged and ignored.
        """
        msg = self._read_pipe.readline().rstrip()
        args = msg.split()
        try:
            # IndexError: blank line, so there is no event name to pop.
            m = getattr(self, '_ovpn_' + args.pop(0).replace('-', '_'))
        except (AttributeError, IndexError, ValueError):
            logging.warning("Unknown message received from OpenVPN: %s", msg)
        else:
            logging.debug(msg)
            m(*args)

    def _ovpn_client_connect(self, common_name, trusted_ip):
        """A peer connected to our server.

        If we also have a client tunnel to this peer and our prefix sorts
        before the peer's, our client tunnel is dropped.
        """
        prefix = utils.binFromSubnet(common_name)
        self._served.add(prefix)
        if self._gateway_manager is not None:
            self._gateway_manager.add(trusted_ip, False)
        if prefix in self._connection_dict and self._prefix < prefix:
            self._kill(prefix)
            self.peer_db.connecting(prefix, 0)

    def _ovpn_client_disconnect(self, common_name, trusted_ip):
        """A peer disconnected from our server (ignored if it is unknown)."""
        prefix = utils.binFromSubnet(common_name)
        try:
            self._served.remove(prefix)
        except KeyError:
            return
        self._abortTunnelKiller(prefix)
        if self._gateway_manager is not None:
            self._gateway_manager.remove(trusted_ip)

    def _ovpn_route_up(self, common_name, time, ip):
        """One of our client tunnels is up.

        The notification is ignored if it is older than the last open() of
        the connection. If an ``ip_changed`` callback was given, it is called
        with *ip* and may update our own address for its family.

        NOTE: the *time* parameter shadows the ``time`` module here.
        """
        prefix = utils.binFromSubnet(common_name)
        c = self._connection_dict.get(prefix)
        if c and c.time < float(time):
            try:
                c.connected()
            except (KeyError, TypeError) as e:
                logging.error("%s (route_up %s)", e, common_name)
        else:
            logging.info("ignore route_up notification for %s %r",
                         common_name, tuple(self._connection_dict))
        if self._ip_changed:
            family, address = self._ip_changed(ip)
            if address:
                self._address[family] = utils.dump_address(address)

    def sendto(self, peer, msg):
        """Send *msg* to the peer with the given prefix.

        Returns the result of socket.sendto(), or None (after logging) if
        sending failed.
        """
        ip = utils.ipFromBin(self._network + peer)
        try:
            return self.sock.sendto(msg, (ip, PORT))
        except socket.error as e:
            logging.info('Failed to send message to %s/%s (%s)',
                         int(peer, 2), len(peer), e)

    def _sendto(self, to, msg):
        """Send *msg* to the socket address *to* (only host and port are
        used). Returns None (after logging) if sending failed."""
        try:
            return self.sock.sendto(msg, to[:2])
        except socket.error as e:
            logging.info('Failed to send message to %s (%s)', to, e)

    def handlePeerEvent(self):
        """Receive and process one UDP message.

        Messages from outside our network are dropped, except those from
        '::1'. The first byte is the message code:

        - 1: answer to an address request
        - 2: address request
        - 3: version request (only if the message has no payload)
        - 4, 5: tunnel kill request / response
        - 255: topology request, for debugging
        """
        msg, address = self.sock.recvfrom(1<<16)
        if address[0] == '::1':
            sender = None
        else:
            try:
                sender = utils.binFromIp(address[0])
            except socket.error as e:
                # inet_pton does not parse '<ipv6>%<iface>'
                logging.warning('ignored message from %r (%s)', address, e)
                return
            if not sender.startswith(self._network):
                return
        if not msg:
            return
        code = ord(msg[0])
        if code == 1: # answer
            # Old versions may send additional and obsolete addresses.
            # Ignore them, as well as truncated lines.
            try:
                prefix, address = msg[1:msg.index('\n')].split()
                int(prefix, 2)
            except ValueError:
                pass
            else:
                if prefix != self._prefix:
                    self.peer_db.addPeer(prefix, address)
                    try:
                        self._connecting.remove(prefix)
                    except KeyError:
                        # We did not ask (or gave up): only cache the address.
                        pass
                    else:
                        self._makeTunnel(prefix, address)
        elif code == 2: # request
            if self._address:
                self._sendto(address, '\1%s %s\n' % (self._prefix,
                    ';'.join(self._address.itervalues())))
            #else: # I don't know my IP yet!
        elif code == 3:
            if len(msg) == 1:
                self._sendto(address, '\3' + version.version)
        elif code in (4, 5): # kill
            prefix = msg[1:]
            # The prefix in the payload must match the sender address.
            if sender and sender.startswith(prefix, len(self._network)):
                try:
                    tunnel_killer = self._killing[prefix]
                except KeyError:
                    if code == 4 and prefix in self._served: # request
                        self._killing[prefix] = TunnelKiller(prefix, self)
                else:
                    if code == 5 and tunnel_killer.state == 'locked': # response
                        self._kill(prefix)
        elif code == 255:
            # the registry wants to know the topology for debugging purpose
            if not sender or sender[len(self._network):].startswith(
                  self.peer_db.registry_prefix):
                reply = ['\xfe%s%u/%u\n%u\n' % (msg[1:],
                    int(self._prefix, 2), len(self._prefix),
                    len(self._connection_dict))]
                reply.extend('%u/%u\n' % (int(x, 2), len(x))
                             for peer_group in (self._connection_dict,
                                                self._served)
                             for x in peer_group)
                # Best effort: _sendto() logs and ignores socket errors.
                self._sendto(address, ''.join(reply))