New log system, read README
[re6stnet.git] / tunnel.py
1 import os, random, traceback, time, struct, subprocess, operator, math, logging
2 import plib, utils, db
3
# Smoothing factor used when sampling the traffic: weight given to the newest
# sample in the bandwidth estimate. A lower value means a smoother estimate.
smooth = 0.3
# Ratio of the tunnels that are protected against being killed because they
# are used a lot.
protected = 0.2
8
# Be careful: the refresh interval should be long enough to let the routes be
# established.
10
11
class Connection:
    """Client-side tunnel to one peer, backed by a child process.

    The child is started through ``plib.client`` and its stdout/stderr are
    redirected to a per-peer log file.

    Attributes:
        process: object returned by ``plib.client``; it is polled here and
            terminated by the tunnel manager.
        iface: name of the interface handed to the client with ``--dev``.
        routes: route counter, initialised to 0 (updated from outside).
        bandwidth: smoothed traffic rate in bytes per second, or None while
            no estimate has been computed yet.
    """

    def __init__(self, address, write_pipe, hello, iface, prefix,
            ovpn_args):
        """Start the client process for the peer identified by `prefix`.

        `address`, `write_pipe`, `hello` and `ovpn_args` are forwarded
        unchanged to ``plib.client``.
        """
        # NOTE(review): `log` is not defined in the visible part of this
        # module; it must be provided elsewhere or this raises NameError.
        # NOTE(review): the descriptor below is never closed in this process;
        # confirm whether plib.client takes ownership of it.
        log_fd = os.open(
            os.path.join(log, 'vifibnet.client.%s.log' % (prefix,)),
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        self.process = plib.client(address, write_pipe, hello, '--dev', iface,
                *ovpn_args, stdout=log_fd, stderr=subprocess.STDOUT)

        self.iface = iface
        self.routes = 0
        self._prefix = prefix
        self.bandwidth = None
        self._last_trafic = None
        # Timestamp of the sample stored in _last_trafic.
        self._last_trafic_update = None

    def refresh(self):
        """Check that the connection is alive.

        Returns:
            True while the client process is still running, False once it
            has exited (the return code is logged).
        """
        if self.process.poll() is not None:
            logging.info('Connection with %s has failed with return code %s'
                         % (self._prefix, self.process.returncode))
            return False

        # TODO : update the stats (would call self._updateBandwidth() here)
        return True

    # Unused for now. By killing tunnels with significantly lower trafic
    # in comparison to other tunnels, we hope to connect to nodes with
    # better bandwith, in order to improve connectivity with destinations
    # we are really interested in.
    def _updateBandwidth(self):
        """Sample the interface byte counters and update `bandwidth`.

        The first successful call only records a reference sample; later
        calls blend the newly measured rate into `bandwidth` using the
        module-level `smooth` factor. If the statistics files cannot be
        opened (the interface is down), nothing is changed.
        """
        try:
            with open('/sys/class/net/%s/statistics/rx_bytes' %
                      self.iface, 'r') as f_rx:
                received = int(f_rx.read())
            with open('/sys/class/net/%s/statistics/tx_bytes' %
                      self.iface, 'r') as f_tx:
                sent = int(f_tx.read())
        except IOError:  # This just means that the interface is down
            logging.debug('Unable to calculate bandwidth on iface %s' %
                          self.iface)
            return

        trafic = received + sent
        now = time.time()

        # Compare against None explicitly: a counter value of 0 is a valid
        # previous sample, and a bandwidth of 0 is a valid estimate.
        if self._last_trafic is not None:
            elapsed = now - self._last_trafic_update
            if elapsed > 0:  # avoid dividing by zero on back-to-back calls
                bw = (trafic - self._last_trafic) / elapsed
                if self.bandwidth is not None:
                    self.bandwidth = ((1 - smooth) * self.bandwidth
                                      + smooth * bw)
                else:
                    self.bandwidth = bw

                logging.debug('New bandwidth calculated on iface %s : %s' %
                              (self.iface, self.bandwidth))

        self._last_trafic_update = now
        self._last_trafic = trafic
70
71
class TunnelManager:
    """Keep a pool of client tunnels (Connection objects) to peers.

    Each call to refresh() drops dead tunnels, recounts the routes seen on
    every tunnel interface, kills some tunnels and then opens new ones.

    Attributes:
        free_interface_set: interface names not currently used by a tunnel.
        next_refresh: time (as returned by time.time()) of the next refresh.
    """

    def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
            refresh, connection_count, refresh_rate, iface_list, network):
        """Store the configuration and derive the tunnel counts.

        Args:
            write_pipe, hello_interval, openvpn_args: forwarded to every
                Connection that gets created.
            peer_db: peer database object; only used through getUnusedPeers,
                usePeer, unusePeer, flagPeer, blacklist and clear_blacklist.
            refresh: delay in seconds between two refreshes.
            connection_count: half of it (rounded up) is the number of
                client tunnels to maintain.
            refresh_rate: fraction of the client tunnels counted as
                renewable at each refresh (rounded up).
            iface_list: interface names whose routes are reported as
                discovered on the LAN.
            network: network prefix as a string of '0'/'1' characters.
        """
        self._write_pipe = write_pipe
        self._peer_db = peer_db
        self._connection_dict = {}
        self._iface_to_prefix = {}
        self._ovpn_args = openvpn_args
        self._hello = hello_interval
        self._refresh_time = refresh
        self._network = network
        self._net_len = len(network)
        self._iface_list = iface_list
        # 'client1' ... 'client12'
        self.free_interface_set = set('client%u' % n for n in range(1, 13))
        self.next_refresh = time.time()

        self._client_count = int(math.ceil(float(connection_count) / 2.0))
        self._refresh_count = int(math.ceil(refresh_rate * self._client_count))

    @staticmethod
    def _prefixToHex(prefix):
        """Return the bit string `prefix` as lowercase hex, without '0x'."""
        # '%x' (unlike hex()) never appends an 'L' suffix on Python 2 longs.
        return '%x' % int(prefix, 2)

    def refresh(self):
        """Run one maintenance cycle and schedule the next one."""
        logging.info('Refreshing the tunnels...')
        self._cleanDeads()
        self._countRoutes()
        self._removeSomeTunnels()
        self._makeNewTunnels()
        logging.debug('Tunnels refreshed')
        self.next_refresh = time.time() + self._refresh_time

    def _cleanDeads(self):
        """Kill and flag every connection whose refresh() reports failure."""
        # Iterate over a snapshot: _kill() removes entries from the dict.
        for prefix in list(self._connection_dict):
            if not self._connection_dict[prefix].refresh():
                self._kill(prefix)
                self._peer_db.flagPeer(prefix)

    def _removeSomeTunnels(self):
        """Kill the excess tunnels, preferring those with the fewest routes.

        The `protected` fraction of tunnels with the highest bandwidth is
        never considered for killing.
        """
        connections = self._connection_dict

        def bandwidth_key(prefix):
            # Unknown (None) bandwidth sorts first and is never compared
            # with a number.
            bandwidth = connections[prefix].bandwidth
            return (bandwidth is not None, bandwidth)

        # Get the candidates to killing
        by_bandwidth = sorted(connections, key=bandwidth_key)
        unprotected = int(math.ceil((1 - protected) * len(by_bandwidth)))
        candidates = sorted(by_bandwidth[:unprotected],
                            key=lambda p: connections[p].routes)
        excess = max(0, len(connections) - self._client_count
                        + self._refresh_count)
        for prefix in candidates[:excess]:
            self._kill(prefix)

    def _kill(self, prefix):
        """Terminate the tunnel to `prefix` and release its resources."""
        # NOTE: logging.trace is not part of the standard logging module; it
        # is expected to be provided by the project's log system.
        logging.info('Killing the connection with %s/%u...'
                % (self._prefixToHex(prefix), len(prefix)))
        connection = self._connection_dict.pop(prefix)
        try:
            connection.process.terminate()
        except OSError:
            # If the process is already exited
            pass
        self.free_interface_set.add(connection.iface)
        self._peer_db.unusePeer(prefix)
        del self._iface_to_prefix[connection.iface]
        logging.trace('Connection with %s/%u killed'
                % (self._prefixToHex(prefix), len(prefix)))

    def _makeNewTunnels(self):
        """Open tunnels to unused peers until the client count is reached.

        Stops early, with a warning, when no free interface is left. Any
        other error is printed with its traceback and ends the attempt.
        """
        established = 0
        wanted = self._client_count - len(self._connection_dict)
        logging.trace('Trying to make %i new tunnels...' % wanted)
        try:
            for prefix, address in self._peer_db.getUnusedPeers(wanted):
                logging.info('Establishing a connection with %s/%u' %
                        (self._prefixToHex(prefix), len(prefix)))
                try:
                    iface = self.free_interface_set.pop()
                except KeyError:
                    logging.warning("Can't establish connection with %s"
                            ": no available interface" % prefix)
                    break
                try:
                    connection = Connection(address, self._write_pipe,
                            self._hello, iface, prefix, self._ovpn_args)
                except Exception:
                    # Do not lose the interface name if the spawn failed.
                    self.free_interface_set.add(iface)
                    raise
                self._connection_dict[prefix] = connection
                self._iface_to_prefix[iface] = prefix
                self._peer_db.usePeer(prefix)
                established += 1
        except Exception:
            traceback.print_exc()
        logging.trace('%u new tunnels established' % (established,))

    def _countRoutes(self):
        """Recount the routes of every tunnel from /proc/net/ipv6_route.

        Only routes whose destination starts with the network prefix are
        considered. Such routes seen on an interface of `iface_list` with a
        subnet size strictly between the network length and 128 are passed
        to peer_db.blacklist(prefix, 0).
        """
        logging.debug('Starting to count the routes on each interface...')
        self._peer_db.clear_blacklist(0)
        for prefix in self._iface_to_prefix.values():
            self._connection_dict[prefix].routes = 0

        with open('/proc/net/ipv6_route') as route_file:
            for line in route_file:
                fields = line.split()
                if not fields:
                    continue
                # First field: destination as hex, expanded to 128 bits.
                ip = bin(int(fields[0], 16))[2:].rjust(128, '0')
                if not ip.startswith(self._network):
                    continue

                iface = fields[-1]
                subnet_size = int(fields[1], 16)
                logging.trace('Route on iface %s detected to %s/%s'
                        % (iface, ip, subnet_size))
                if iface in self._iface_to_prefix:
                    self._connection_dict[
                        self._iface_to_prefix[iface]].routes += 1
                if (iface in self._iface_list
                        and self._net_len < subnet_size < 128):
                    prefix = ip[self._net_len:subnet_size]
                    logging.debug(
                        'A route to %s (%s) has been discovered on the LAN'
                        % (self._prefixToHex(prefix), prefix))
                    self._peer_db.blacklist(prefix, 0)

        logging.debug("Routes have been counted")
        for connection in self._connection_dict.values():
            logging.trace('Routes on iface %s : %s' % (
                connection.iface, connection.routes))

    def killAll(self):
        """Kill every tunnel currently open."""
        # Iterate over a snapshot: _kill() removes entries from the dict.
        for prefix in list(self._connection_dict):
            self._kill(prefix)
190