Merge branch 'master' of https://git.erp5.org/repos/vifibnet
[re6stnet.git] / tunnel.py
1 import os, random, traceback, time, struct, subprocess, operator, math
2 import plib, utils, db
3
4 log = None
5 smooth = 0.3 # this is used to smooth the traffic sampling. Lower value
6 # mean more smooth
7 protected = 0.2 # ratio of the tunnels protected against kill because they are
8 # used a lot
9
10 # Be carfull the refresh interval should let the routes be established
11
12
13 class Connection:
14
15 def __init__(self, address, write_pipe, hello, iface, prefix,
16 ovpn_args):
17 self.process = plib.client(address, write_pipe, hello, '--dev', iface,
18 *ovpn_args, stdout=os.open(os.path.join(log,
19 'vifibnet.client.%s.log' % (prefix,)),
20 os.O_WRONLY | os.O_CREAT | os.O_TRUNC),
21 stderr=subprocess.STDOUT)
22
23 self.iface = iface
24 self.routes = 0
25 self._prefix = prefix
26 self.bandwidth = None
27 self._last_trafic = None
28
29 # TODO : update the stats
30 def refresh(self):
31 # Check that the connection is alive
32 if self.process.poll() != None:
33 utils.log('Connection with %s has failed with return code %s'
34 % (self._prefix, self.process.returncode), 3)
35 return False
36
37 # self._updateBandwidth()
38 return True
39
40 # Unused for now. By killing tunnels with significantly lower trafic
41 # in comparison to other tunnels, we hope to connect to nodes with
42 # better bandwith, in order to improve connectivity with destinations
43 # we are really interested in.
44 def _updateBandwidth(self):
45 try:
46 f_rx = open('/sys/class/net/%s/statistics/rx_bytes' %
47 self.iface, 'r')
48 f_tx = open('/sys/class/net/%s/statistics/tx_bytes' %
49 self.iface, 'r')
50
51 trafic = int(f_rx.read()) + int(f_tx.read())
52 t = time.time()
53
54 if bool(self._last_trafic):
55 bw = (trafic - self._last_trafic) / (t -
56 self._last_trafic_update)
57 if bool(self.bandwidth):
58 self.bandwidth = ((1 - smooth) * self.bandwidth
59 + smooth * bw)
60 else:
61 self.bandwidth = bw
62
63 utils.log('New bandwidth calculated on iface %s : %s' %
64 (self.iface, self.bandwidth), 4)
65
66 self._last_trafic_update = t
67 self._last_trafic = trafic
68 except IOError: # This just means that the interface is downs
69 utils.log('Unable to calculate bandwidth on iface %s' %
70 self.iface, 4)
71
72
73 class TunnelManager:
74
75 def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
76 refresh, connection_count, refresh_rate, iface_list, network):
77 self._write_pipe = write_pipe
78 self._peer_db = peer_db
79 self._connection_dict = {}
80 self._iface_to_prefix = {}
81 self._ovpn_args = openvpn_args
82 self._hello = hello_interval
83 self._refresh_time = refresh
84 self._network = network
85 self._net_len = len(network)
86 self._iface_list = iface_list
87 self.free_interface_set = set(('client1', 'client2', 'client3',
88 'client4', 'client5', 'client6',
89 'client7', 'client8', 'client9',
90 'client10', 'client11', 'client12'))
91 self.next_refresh = time.time()
92
93 self._client_count = int(math.ceil(float(connection_count) / 2.0))
94 self._refresh_count = int(math.ceil(refresh_rate * self._client_count))
95
96 def refresh(self):
97 utils.log('Refreshing the tunnels...', 2)
98 self._cleanDeads()
99 self._countRoutes()
100 self._removeSomeTunnels()
101 self._makeNewTunnels()
102 utils.log('Tunnels refreshed', 2)
103 self.next_refresh = time.time() + self._refresh_time
104
105 def _cleanDeads(self):
106 for prefix in self._connection_dict.keys():
107 if not self._connection_dict[prefix].refresh():
108 self._kill(prefix)
109 self._peer_db.flagPeer(prefix)
110
111 def _removeSomeTunnels(self):
112 # Get the candidates to killing
113 candidates = sorted(self._connection_dict, key=lambda p:
114 self._connection_dict[p].bandwidth)
115 candidates = sorted(candidates[0: int(math.ceil((1 - protected)
116 * len(candidates)))], key=lambda p:
117 self._connection_dict[p].routes)
118 for prefix in candidates[0: max(0, len(self._connection_dict) -
119 self._client_count + self._refresh_count)]:
120 self._kill(prefix)
121
122 def _kill(self, prefix):
123 utils.log('Killing the connection with %s...' % (prefix,), 2)
124 connection = self._connection_dict.pop(prefix)
125 try:
126 connection.process.terminate()
127 except OSError:
128 # If the process is already exited
129 pass
130 self.free_interface_set.add(connection.iface)
131 self._peer_db.unusePeer(prefix)
132 del self._iface_to_prefix[connection.iface]
133 utils.log('Connection with %s killed' % (prefix,), 2)
134
135 def _makeNewTunnels(self):
136 i = 0
137 utils.log('Trying to make %i new tunnels...' %
138 (self._client_count - len(self._connection_dict)), 5)
139 try:
140 for prefix, address in self._peer_db.getUnusedPeers(
141 self._client_count - len(self._connection_dict)):
142 utils.log('Establishing a connection with %s' % prefix, 2)
143 iface = self.free_interface_set.pop()
144 self._connection_dict[prefix] = Connection(address,
145 self._write_pipe, self._hello, iface,
146 prefix, self._ovpn_args)
147 self._iface_to_prefix[iface] = prefix
148 self._peer_db.usePeer(prefix)
149 i += 1
150 utils.log('%u new tunnels established' % (i,), 3)
151 except KeyError:
152 utils.log("""Can't establish connection with %s
153 : no available interface""" % prefix, 2)
154 except Exception:
155 traceback.print_exc()
156
157 def _countRoutes(self):
158 utils.log('Starting to count the routes on each interface...', 3)
159 self._peer_db.clear_blacklist(0)
160 for iface in self._iface_to_prefix.keys():
161 self._connection_dict[self._iface_to_prefix[iface]].routes = 0
162 for line in open('/proc/net/ipv6_route'):
163 line = line.split()
164 ip = bin(int(line[0], 16))[2:].rjust(128, '0')
165
166 if ip.startswith(self._network):
167 iface = line[-1]
168 subnet_size = int(line[1], 16)
169 utils.log('Route on iface %s detected to %s/%s'
170 % (iface, ip, subnet_size), 8)
171 if iface in self._iface_to_prefix.keys():
172 self._connection_dict[self._iface_to_prefix[iface]].routes += 1
173 if iface in self._iface_list and self._net_len < subnet_size < 128:
174 prefix = ip[self._net_len:subnet_size]
175 utils.log('A route to %s has been discovered on the LAN'
176 % (prefix,), 3)
177 self._peer_db.blacklist(prefix, 0)
178
179 utils.log("Routes have been counted", 3)
180 for p in self._connection_dict.keys():
181 utils.log('Routes on iface %s : %s' % (
182 self._connection_dict[p].iface,
183 self._connection_dict[p].routes), 5)
184
185 def killAll(self):
186 for prefix in self._connection_dict.keys():
187 self._kill(prefix)
188