the babeld configuration now allows /128 subnets
[re6stnet.git] / re6st / tunnel.py
1 import os, traceback, time, subprocess, logging
2 import socket
3 import random
4 import plib
5 import utils
6
# Be careful: the refresh interval should be long enough to let the routes be established
8
# Directory in which Connection creates its per-peer client log files
# (it is passed as the first argument of os.path.join). It is None here, so it
# has to be assigned before a Connection is created -- presumably by the
# program that imports this module; verify against the caller.
log = None
10
11
class Connection:
    """One outgoing tunnel, backed by a client process started by plib.client.

    Attributes:
        process: object returned by plib.client(); this class uses its
            poll(), returncode and (from TunnelManager) terminate() members.
        iface: name of the network interface given to the client ('--dev').
        routes: route counter, initialised to 0 and updated from outside.
    """

    def __init__(self, address, write_pipe, hello, iface, prefix, encrypt,
                 ovpn_args):
        """Start the client process and redirect its output to a log file.

        The log file is '<log>/re6stnet.client.<prefix>.log', where `log` is
        the module-level log directory; it is truncated if it already exists.
        stderr of the process is merged into stdout.
        """
        log_path = os.path.join(log, 're6stnet.client.%s.log' % (prefix,))
        log_fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        try:
            self.process = plib.client(address, write_pipe, hello, encrypt,
                                       '--dev', iface, *ovpn_args,
                                       stdout=log_fd,
                                       stderr=subprocess.STDOUT)
        finally:
            # The child keeps its own copy of the descriptor; closing ours
            # avoids leaking one descriptor per tunnel in this process.
            # NOTE(review): assumes plib.client() has spawned the process by
            # the time it returns (e.g. it returns a subprocess.Popen).
            os.close(log_fd)

        self.iface = iface
        self.routes = 0
        self._prefix = prefix

    def refresh(self):
        """Return True while the client process is running, False otherwise.

        When the process has exited, its return code is logged.
        """
        if self.process.poll() is not None:
            logging.info('Connection with %s has failed with return code %s'
                         % (self._prefix, self.process.returncode))
            return False
        return True
33
34
class TunnelManager:
    """Keep a bounded set of outgoing tunnels (Connection objects) alive.

    Peers are obtained from `peer_db`; at most (connection_count + 1) // 2
    outgoing connections exist at a time, each bound to one interface taken
    from `free_interface_set`. Prefixes are handled as strings of '0'/'1'
    characters (they are parsed with int(prefix, 2)).
    """

    def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
                 refresh, connection_count, iface_list, network, prefix, nSend,
                 encrypt):
        """Store the configuration and build the pool of free interfaces.

        Arguments, as used by this class:
            write_pipe, hello_interval, encrypt, openvpn_args: forwarded
                unchanged to every Connection that is created.
            peer_db: object providing getUnusedPeers, usePeer, unusePeer,
                blacklist, clear_blacklist, db_size, address and sock.
            refresh: delay in seconds between two tunnel refreshes.
            connection_count: total connection budget; half of it (rounded
                up) is used for outgoing tunnels.
            iface_list: interfaces on which a discovered route is treated as
                a peer on the LAN.
            network: binary-string prefix of the whole network.
            prefix: binary-string prefix of this node inside the network.
            nSend: stored, but not read anywhere in this class.
        """
        self._write_pipe = write_pipe
        self._peer_db = peer_db
        self._connection_dict = {}   # peer prefix -> Connection
        self._iface_to_prefix = {}   # interface name -> peer prefix
        self._ovpn_args = openvpn_args
        self._hello = hello_interval
        self._refresh_time = refresh
        self._network = network
        self._net_len = len(network)
        self._iface_list = iface_list
        self._prefix = prefix
        self._nSend = nSend
        self._encrypt = encrypt
        # Set once the first large batch of notifications has been sent.
        self._fast_start_done = False

        # Time after which refresh() is expected to be called again;
        # presumably polled by the caller's main loop -- verify there.
        self.next_refresh = time.time()
        self._next_tunnel_refresh = time.time()

        self._client_count = (connection_count + 1) // 2
        # Number of slots freed at each tunnel refresh when all are in use.
        self._refresh_count = 1
        self.free_interface_set = set(
            're6stnet' + str(i) for i in range(1, self._client_count + 1))

    @staticmethod
    def _hexPrefix(prefix):
        """Return the binary-string `prefix` as lowercase hexadecimal text."""
        # '%x' never appends the 'L' suffix that hex() adds to Python 2 longs.
        return '%x' % int(prefix, 2)

    def refresh(self):
        """Drop dead tunnels, periodically rotate tunnels, then refill.

        Every call removes connections whose process has exited and tries to
        open new ones. Once per `refresh` interval it also recounts routes
        and kills the least useful connections. Schedules the next call 5
        seconds later through `next_refresh`.
        """
        logging.info('Checking the tunnels...')
        self._cleanDeads()
        if self._next_tunnel_refresh < time.time():
            logging.info('Refreshing the tunnels...')
            self._countRoutes()
            self._removeSomeTunnels()
            self._next_tunnel_refresh = time.time() + self._refresh_time
        self._makeNewTunnels()
        self.next_refresh = time.time() + 5

    def _cleanDeads(self):
        """Kill every connection whose refresh() reports a dead process."""
        # Iterate over a copy: _kill() removes entries from the dict.
        for prefix, connection in list(self._connection_dict.items()):
            if not connection.refresh():
                self._kill(prefix)

    def _removeSomeTunnels(self):
        """Kill the connections carrying the fewest routes.

        Enough connections are removed to leave `_refresh_count` slots free
        out of `_client_count`; nothing is removed if that many slots are
        already free.
        """
        # Candidates for killing, least routes first
        candidates = sorted(self._connection_dict,
                            key=lambda p: self._connection_dict[p].routes)
        excess = max(0, len(self._connection_dict)
                     - self._client_count + self._refresh_count)
        for prefix in candidates[:excess]:
            self._kill(prefix)

    def _kill(self, prefix):
        """Terminate the connection to `prefix` and release its resources.

        Raises KeyError if there is no connection for `prefix`.
        """
        logging.info('Killing the connection with %s/%u...'
                     % (self._hexPrefix(prefix), len(prefix)))
        connection = self._connection_dict.pop(prefix)
        try:
            connection.process.terminate()
        except OSError:
            # If the process is already exited
            pass
        self.free_interface_set.add(connection.iface)
        self._peer_db.unusePeer(prefix)
        del self._iface_to_prefix[connection.iface]
        logging.trace('Connection with %s/%u killed'
                      % (self._hexPrefix(prefix), len(prefix)))

    def _makeNewTunnels(self):
        """Open connections to unused peers until `_client_count` is reached.

        Stops early, with a warning, when no interface is free. Any other
        error is printed with its traceback and ends the attempt; it is not
        propagated to the caller.
        """
        tunnel_to_make = self._client_count - len(self._connection_dict)
        if tunnel_to_make <= 0:
            return
        established = 0
        logging.trace('Trying to make %i new tunnels...' % tunnel_to_make)
        try:
            for prefix, address in self._peer_db.getUnusedPeers(tunnel_to_make):
                logging.info('Establishing a connection with %s/%u'
                             % (self._hexPrefix(prefix), len(prefix)))
                # Only the pop() is guarded so that an unrelated KeyError is
                # not reported as a missing interface.
                try:
                    iface = self.free_interface_set.pop()
                except KeyError:
                    logging.warning("Can't establish connection with %s"
                                    ": no available interface" % prefix)
                    break
                try:
                    connection = Connection(address, self._write_pipe,
                                            self._hello, iface, prefix,
                                            self._encrypt, self._ovpn_args)
                except Exception:
                    # Give the interface back, otherwise it would be lost
                    # from the pool for the lifetime of the manager.
                    self.free_interface_set.add(iface)
                    raise
                self._connection_dict[prefix] = connection
                self._iface_to_prefix[iface] = prefix
                self._peer_db.usePeer(prefix)
                established += 1
        except Exception:
            traceback.print_exc()
        logging.trace('%u new tunnels established' % (established,))

    def _countRoutes(self):
        """Recount routes per tunnel and notify peers discovered on the LAN.

        Reads /proc/net/ipv6_route. For every route inside the network but
        outside this node's own subnet, the route counter of the connection
        owning the interface is incremented. Routes seen on an interface of
        `iface_list` with a subnet size strictly between the network length
        and 128 are blacklisted in the peer database and their address is
        kept as a peer to notify. A random sample of those is then notified.
        """
        logging.debug('Starting to count the routes on each interface...')
        self._peer_db.clear_blacklist(0)
        possiblePeers = set()
        for prefix in self._iface_to_prefix.values():
            self._connection_dict[prefix].routes = 0

        own_subnet = self._network + self._prefix
        with open('/proc/net/ipv6_route') as route_file:
            for line in route_file:
                fields = line.split()
                # Destination address as a 128-character binary string
                ip = bin(int(fields[0], 16))[2:].rjust(128, '0')
                if (not ip.startswith(self._network)
                        or ip.startswith(own_subnet)):
                    continue

                iface = fields[-1]
                subnet_size = int(fields[1], 16)
                logging.trace('Route on iface %s detected to %s/%s'
                              % (iface, fields[0], subnet_size))
                if iface in self._iface_to_prefix:
                    peer = self._iface_to_prefix[iface]
                    self._connection_dict[peer].routes += 1
                if (iface in self._iface_list
                        and self._net_len < subnet_size < 128):
                    prefix = ip[self._net_len:subnet_size]
                    logging.debug('A route to %s has been discovered on the LAN'
                                  % self._hexPrefix(prefix))
                    self._peer_db.blacklist(prefix, 0)
                    possiblePeers.add(fields[0])

        if not self._fast_start_done and len(possiblePeers) > 4:
            nSend = min(self._peer_db.db_size, len(possiblePeers))
            # The large initial batch is meant to be sent only once.
            self._fast_start_done = True
        else:
            nSend = min(2, len(possiblePeers))
        for ip in random.sample(list(possiblePeers), nSend):
            self._notifyPeer(ip)

        logging.debug("Routes have been counted")
        for connection in self._connection_dict.values():
            logging.trace('Routes on iface %s : %s'
                          % (connection.iface, connection.routes))

    def killAll(self):
        """Kill every outgoing connection."""
        # Iterate over a copy: _kill() removes entries from the dict.
        for prefix in list(self._connection_dict):
            self._kill(prefix)

    def checkIncomingTunnel(self, prefix):
        """Tell whether a tunnel coming from `prefix` may be kept.

        If there is no outgoing connection to `prefix`, return True. If
        there is one, the string comparison of the two prefixes decides:
        return False when `prefix` is smaller than this node's prefix,
        otherwise kill the outgoing connection and return True.
        """
        if prefix in self._connection_dict:
            if prefix < self._prefix:
                return False
            self._kill(prefix)
        return True

    def _notifyPeer(self, peerIp):
        """Send this node's prefix and address to `peerIp` on port 326.

        `peerIp` is sliced as 32 hexadecimal digits (as found in
        /proc/net/ipv6_route) into colon-separated groups of 4. Nothing is
        sent when the peer database has no address. Socket errors are logged
        at debug level and otherwise ignored.
        """
        if not self._peer_db.address:
            return
        ip = ':'.join(peerIp[i:i + 4] for i in range(0, 32, 4))
        try:
            logging.trace('Notifying peer %s' % ip)
            self._peer_db.sock.sendto(
                '%s %s\n' % (self._prefix,
                             utils.address_str(self._peer_db.address)),
                (ip, 326))
        except socket.error as e:
            logging.debug('Unable to notify %s' % ip)
            logging.debug('socket.error : %s' % e)