Review some re6stnet options and update demo
[re6stnet.git] / re6st / tunnel.py
1 import logging, random, socket, time
2 from itertools import chain
3 from . import plib, utils
4
# UDP port of the peer-query socket: TunnelManager binds it on '::' and
# sends its address requests to this port on other nodes.
PORT = 326
# Route flag tested against the flags column of /proc/net/ipv6_route;
# routes carrying it are ignored when counting routes.
RTF_CACHE = 0x01000000 # cache entry

# Be careful: the refresh interval must leave enough time for the routes to
# be established.
9
class Connection(object):
    """One outgoing tunnel: an OpenVPN client process towards a peer.

    The process is created by plib.client() (project helper; presumably it
    spawns an OpenVPN client -- confirm in plib). It must expose poll() and
    returncode, which are used by refresh().
    """

    def __init__(self, address, write_pipe, hello, iface, prefix, encrypt,
                 ovpn_args):
        """Start the client process for the peer owning `prefix`.

        address -- remote address(es) handed to plib.client
        write_pipe -- pipe handed to plib.client (presumably used by the
                      OpenVPN hooks to report events; verify in plib)
        hello -- hello interval handed to plib.client
        iface -- name of the local tunnel interface to use
        prefix -- peer prefix as a string of binary digits
        encrypt -- encryption setting handed to plib.client
        ovpn_args -- extra OpenVPN command-line arguments
        """
        self.process = plib.client(iface, address, write_pipe, hello, encrypt,
            # Only accept a remote whose certificate name is the peer prefix,
            # written as '<prefix as integer>/<prefix length>'.
            '--tls-remote', '%u/%u' % (int(prefix, 2), len(prefix)),
            '--connect-retry-max', '3', '--tls-exit', *ovpn_args)
        self.iface = iface
        # Number of routes through this tunnel, filled by route counting.
        self.routes = 0
        self._prefix = prefix

    def refresh(self):
        """Return True while the client process is still running.

        When the process has exited, log its return code and return False.
        """
        if self.process.poll() is not None:
            logging.info('Connection with %s has failed with return code %s',
                         self._prefix, self.process.returncode)
            return False
        return True
28
29
class TunnelManager(object):
    """Maintain the set of outgoing tunnels (OpenVPN client connections).

    Peers to connect to are chosen from the kernel IPv6 routing table
    (/proc/net/ipv6_route), from the peer database, or from a bootstrap peer
    given by the peer database. A UDP socket bound to PORT is used both to
    ask peers for their address and to answer such requests.

    Prefixes are strings of binary digits (they are parsed with
    int(prefix, 2)), relative to `network`.
    """

    def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
                 refresh, client_count, iface_list, network, prefix,
                 address, ip_changed, encrypt):
        """Create the manager and bind the peer-query socket.

        write_pipe, hello_interval, encrypt, openvpn_args -- forwarded to
            every Connection
        peer_db -- peer database (log, connecting, getBootstrapPeer,
            getAddress, getPeerList, addPeer, registry_ip are used)
        refresh -- seconds between two rounds of tunnel replacement
        client_count -- maximum number of outgoing tunnels; interfaces
            re6stnet1..re6stnet<client_count> are reserved for them
        iface_list -- interfaces whose routes are not used to choose new
            peers (presumably the LAN interfaces; see _countRoutes)
        network -- binary prefix of the whole network
        prefix -- our own binary prefix inside `network`
        address -- our address(es), formatted with utils.address_str
        ip_changed -- optional callback, called with the IP reported by the
            OpenVPN route-up event; its result becomes our address
        """
        self._write_pipe = write_pipe
        self._peer_db = peer_db
        # Prefixes we sent an address request to and await an answer from.
        self._connecting = set()
        # prefix -> Connection, for every outgoing tunnel.
        self._connection_dict = {}
        # None, or the prefixes routed via iface_list when the registry did
        # not look reachable at the last route count.
        self._disconnected = None
        # Candidate peers found in the routing table (not iface_list routes).
        self._distant_peers = []
        # Tunnel interface name -> prefix of the peer behind it.
        self._iface_to_prefix = {}
        self._ovpn_args = openvpn_args
        self._hello = hello_interval
        self._refresh_time = refresh
        self._network = network
        self._iface_list = iface_list
        self._prefix = prefix
        self._address = utils.address_str(address)
        self._ip_changed = ip_changed
        self._encrypt = encrypt
        # Prefixes of the clients currently connected to our OpenVPN server.
        self._served = set()

        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        # See also http://stackoverflow.com/questions/597225/
        # about binding and anycast.
        self.sock.bind(('::', PORT))

        self.next_refresh = time.time()
        self._next_tunnel_refresh = time.time()

        self._client_count = client_count
        # Number of tunnels freed at each tunnel replacement round.
        self._refresh_count = 1
        self.free_interface_set = set('re6stnet' + str(i)
            for i in xrange(1, client_count + 1))

    def refresh(self):
        """Periodic maintenance; schedules the next call 5 seconds later.

        Dead tunnels are dropped. Every `refresh` seconds, routes are
        counted and the least used tunnels are killed. Then free slots are
        filled with new tunnels.
        """
        logging.debug('Checking tunnels...')
        self._cleanDeads()
        remove = self._next_tunnel_refresh < time.time()
        if remove:
            self._countRoutes()
            self._removeSomeTunnels()
            self._next_tunnel_refresh = time.time() + self._refresh_time
            self._peer_db.log()
        self._makeNewTunnels(remove)
        self.next_refresh = time.time() + 5

    def _cleanDeads(self):
        """Forget tunnels whose client process has exited."""
        # keys() returns a copy (Python 2), so _kill may mutate the dict.
        for prefix in self._connection_dict.keys():
            if not self._connection_dict[prefix].refresh():
                self._kill(prefix)

    def _removeSomeTunnels(self):
        """Kill the tunnels carrying the fewest routes.

        Afterwards at most client_count - refresh_count tunnels remain, so
        that new ones can be tried.
        """
        # Get the candidates to killing
        candidates = sorted(self._connection_dict,
                            key=lambda p: self._connection_dict[p].routes)
        # The slice bound is computed once, before _kill shrinks the dict.
        for prefix in candidates[0: max(0, len(self._connection_dict) -
                self._client_count + self._refresh_count)]:
            self._kill(prefix)

    def _kill(self, prefix, kill=False):
        """Stop the tunnel to `prefix` and release its interface.

        The process is kill()ed if `kill` is true, terminate()d otherwise.
        """
        logging.info('Killing the connection with %u/%u...',
                     int(prefix, 2), len(prefix))
        connection = self._connection_dict.pop(prefix)
        try:
            getattr(connection.process, 'kill' if kill else 'terminate')()
        except OSError:
            # If the process is already exited
            pass
        self.free_interface_set.add(connection.iface)
        del self._iface_to_prefix[connection.iface]
        logging.trace('Connection with %u/%u killed',
                      int(prefix, 2), len(prefix))

    def _makeTunnel(self, prefix, address):
        """Open a tunnel to `prefix` at `address`.

        Return True if a tunnel was started, or False if that peer is
        already connected to us in either direction.
        """
        assert len(self._connection_dict) < self._client_count, (prefix, self.__dict__)
        if prefix in self._served or prefix in self._connection_dict:
            return False
        assert prefix != self._prefix, self.__dict__
        logging.info('Establishing a connection with %u/%u',
                     int(prefix, 2), len(prefix))
        iface = self.free_interface_set.pop()
        self._connection_dict[prefix] = Connection(address, self._write_pipe,
            self._hello, iface, prefix, self._encrypt, self._ovpn_args)
        self._iface_to_prefix[iface] = prefix
        self._peer_db.connecting(prefix, 1)
        return True

    def _makeNewTunnels(self, route_counted):
        """Try to fill every free tunnel slot.

        route_counted -- true if the caller just ran _countRoutes(), in
            which case routes are not counted again here.
        For peers whose address is unknown, an address request is sent and
        the tunnel is opened when the answer arrives (handlePeerEvent).
        """
        count = self._client_count - len(self._connection_dict)
        if not count:
            return
        assert count >= 0
        # CAVEAT: Forget any peer that didn't reply to our previous address
        #         request, either because latency is too high or some packet
        #         was lost. However, this means that some time should pass
        #         before calling _makeNewTunnels again.
        self._connecting.clear()
        # Alias: _countRoutes refills this same list object in place.
        distant_peers = self._distant_peers
        if len(distant_peers) < count and not route_counted:
            self._countRoutes()
        disconnected = self._disconnected
        if disconnected is not None:
            # We aren't the registry node and we have no tunnel to or from it,
            # so it looks like we are not connected to the network, and our
            # neighbours are in the same situation.
            self._disconnected = None
            disconnected = set(disconnected).union(distant_peers)
            if disconnected:
                # We do have neighbours that are probably also disconnected,
                # so force rebootstrapping.
                peer = self._peer_db.getBootstrapPeer()
                if not peer:
                    # Registry dead ? Assume we're connected after all.
                    disconnected = None
                elif peer[0] not in disconnected:
                    # Got a node that will probably help us rejoining the
                    # network, so connect to it.
                    count -= self._makeTunnel(*peer)
        if disconnected is None:
            # Normal operation. Choose peers to connect to by looking at the
            # routing table.
            while count and distant_peers:
                # Pick a random candidate and remove it in O(1).
                i = random.randrange(0, len(distant_peers))
                peer = distant_peers[i]
                distant_peers[i] = distant_peers[-1]
                del distant_peers[-1]
                address = self._peer_db.getAddress(peer)
                if address:
                    count -= self._makeTunnel(peer, address)
                else:
                    # Unknown address: ask the peer itself (opcode 2).
                    ip = utils.ipFromBin(self._network + peer)
                    try:
                        self.sock.sendto('\2', (ip, PORT))
                    except socket.error as e:
                        logging.info('Failed to query %s (%s)', ip, e)
                    self._connecting.add(peer)
                    count -= 1
        elif count:
            # No route/tunnel to registry, which usually happens when starting
            # up. Select peers from cache for which we have no route.
            for peer, address in self._peer_db.getPeerList():
                if peer not in disconnected and self._makeTunnel(peer, address):
                    count -= 1
                    if not count:
                        break
            else:
                if not (disconnected or self._served or self._connection_dict):
                    # Startup without any good address in the cache.
                    peer = self._peer_db.getBootstrapPeer()
                    if not (peer and self._makeTunnel(*peer)):
                        # Failed to bootstrap ! Last change to connect is to
                        # retry an address that already failed :(
                        for peer in self._peer_db.getPeerList(1):
                            if self._makeTunnel(*peer):
                                break

    def _countRoutes(self):
        """Scan /proc/net/ipv6_route and update the routing-derived state.

        - Connection.routes: number of routes through each tunnel.
        - self._distant_peers: prefixes we have no tunnel with, routed via
          interfaces not in iface_list.
        - self._disconnected: None if the registry prefix (or our own prefix)
          is among known prefixes, otherwise the prefixes routed via
          iface_list.
        """
        logging.debug('Starting to count the routes on each interface...')
        del self._distant_peers[:]
        for conn in self._connection_dict.itervalues():
            conn.routes = 0
        a = len(self._network)
        b = a + len(self._prefix)
        other = []
        with open('/proc/net/ipv6_route') as f:
            self._last_routing_table = f.read()
        for line in self._last_routing_table.splitlines():
            line = line.split()
            # Columns: destination, prefix length, ..., flags, interface.
            iface = line[-1]
            if iface == 'lo' or int(line[-2], 16) & RTF_CACHE:
                continue
            ip = bin(int(line[0], 16))[2:].rjust(128, '0')
            # Only routes inside our network, and not to our own prefix.
            if ip[:a] != self._network or ip[a:b] == self._prefix:
                continue
            prefix_len = int(line[1], 16)
            # NOTE(review): a route covering the whole network
            # (prefix_len <= len(network)) can give an empty prefix here,
            # which would then be recorded below and match is_registry()
            # ('...'.startswith('') is True) -- confirm this is intended.
            prefix = ip[a:prefix_len]
            logging.trace('Route on iface %s detected to %s/%u',
                          iface, utils.ipFromBin(ip), prefix_len)
            nexthop = self._iface_to_prefix.get(iface)
            if nexthop:
                self._connection_dict[nexthop].routes += 1
            if prefix in self._served or prefix in self._connection_dict:
                continue
            if iface in self._iface_list:
                other.append(prefix)
            else:
                self._distant_peers.append(prefix)
        is_registry = self._peer_db.registry_ip[a:].startswith
        if is_registry(self._prefix) or any(is_registry(peer)
              for peer in chain(self._distant_peers, other,
                                self._served, self._connection_dict)):
            self._disconnected = None
            # XXX: When there is no new peer to connect when looking at routes
            #      coming from tunnels, we'd like to consider those discovered
            #      from the LAN. However, we don't want to create tunnels to
            #      nodes of the LAN so do nothing until we find a way to get
            #      some information from Babel.
            #if not self._distant_peers:
            #    self._distant_peers = other
        else:
            self._disconnected = other
        logging.debug("Routes counted: %u distant peers",
                      len(self._distant_peers))
        for c in self._connection_dict.itervalues():
            logging.trace('- %s: %s', c.iface, c.routes)

    def killAll(self):
        """Kill (not just terminate) every outgoing tunnel."""
        for prefix in self._connection_dict.keys():
            self._kill(prefix, True)

    def handleTunnelEvent(self, msg):
        """Dispatch one message received from OpenVPN.

        The first word names the event ('-' replaced by '_'); it is handled
        by the matching _ovpn_* method, with the remaining words as
        positional arguments. Empty or unknown messages are logged and
        ignored.
        """
        try:
            msg = msg.rstrip()
            args = msg.split()
            m = getattr(self, '_ovpn_' + args.pop(0).replace('-', '_'))
        except (AttributeError, IndexError, ValueError):
            # IndexError: empty message, there is no event name to pop.
            logging.warning("Unknown message received from OpenVPN: %s", msg)
        else:
            logging.debug(msg)
            m(*args)

    def _ovpn_client_connect(self, common_name):
        """A peer connected to our server.

        If we also have an outgoing tunnel to that peer and its prefix is
        greater than ours, our outgoing tunnel is dropped.
        """
        prefix = utils.binFromSubnet(common_name)
        self._served.add(prefix)
        if prefix in self._connection_dict and self._prefix < prefix:
            self._kill(prefix)
            self._peer_db.connecting(prefix, 0)

    def _ovpn_client_disconnect(self, common_name):
        """A peer disconnected from our server."""
        prefix = utils.binFromSubnet(common_name)
        self._served.remove(prefix)

    def _ovpn_route_up(self, common_name, ip):
        """One of our outgoing tunnels is up; `ip` may update our address."""
        self._peer_db.connecting(utils.binFromSubnet(common_name), 0)
        if self._ip_changed:
            self._address = utils.address_str(self._ip_changed(ip))

    def handlePeerEvent(self):
        """Process one datagram received on the peer-query socket.

        Empty datagrams, and datagrams whose source is outside our network,
        are ignored. The first byte is an opcode:
          1   answer: lines '<binary prefix> <address>\\n'; peers are added
              to the database and tunnels are opened to those we queried.
          2   request: reply with opcode 1, our own prefix and address (if
              known) and one peer taken from the database.
          255 topology request from the registry (debugging): reply with
              0xfe, the request payload, our prefix, the number of outgoing
              tunnels, then the outgoing and served peers.
        """
        msg, address = self.sock.recvfrom(1<<16)
        # Require BOTH a non-empty payload and a source inside the network:
        # msg[0] is read below, and foreign senders must not feed peer_db.
        if not (msg and utils.binFromIp(address[0]).startswith(self._network)):
            return
        code = ord(msg[0])
        if code == 1: # answer
            # We parse the message in a way to discard a truncated line.
            for peer in msg[1:].split('\n')[:-1]:
                try:
                    prefix, peer_address = peer.split()
                except ValueError:
                    # Garbage line (not exactly two fields): ignore it.
                    continue
                if prefix.strip('01'):
                    # Not a binary prefix: ignore it.
                    continue
                if prefix != self._prefix:
                    self._peer_db.addPeer(prefix, peer_address)
                    try:
                        self._connecting.remove(prefix)
                    except KeyError:
                        continue
                    self._makeTunnel(prefix, peer_address)
        elif code == 2: # request
            encode = '%s %s\n'.__mod__
            if self._address:
                msg = [encode((self._prefix, self._address))]
            else: # I don't know my IP yet!
                msg = []
            # Add an extra random peer, mainly for the registry.
            for peer in self._peer_db.getPeerList():
                msg.append(encode(peer))
                break
            if msg:
                try:
                    self.sock.sendto('\1' + ''.join(msg), address)
                except socket.error as e:
                    logging.info('Failed to reply to %s (%s)', address, e)
        elif code == 255:
            # the registry wants to know the topology for debugging purpose
            if utils.binFromIp(address[0]) == self._peer_db.registry_ip:
                msg = ['\xfe%s%u/%u\n%u\n' % (msg[1:],
                    int(self._prefix, 2), len(self._prefix),
                    len(self._connection_dict))]
                msg.extend('%u/%u\n' % (int(x, 2), len(x))
                           for group in (self._connection_dict, self._served)
                           for x in group)
                try:
                    self.sock.sendto(''.join(msg), address)
                except socket.error:
                    # Best effort: this is only a debugging aid.
                    pass