Peers can now discover themselves without a central registry.
[re6stnet.git] / re6st / tunnel.py
1 import os, traceback, time, subprocess, logging
2 import socket
3 import plib
4
5 # Be carfull the refresh interval should let the routes be established
6
7 log = None
8
9
10 class Connection:
11
12 def __init__(self, address, write_pipe, hello, iface, prefix, encrypt,
13 ovpn_args):
14 self.process = plib.client(address, write_pipe, hello, encrypt, '--dev', iface,
15 *ovpn_args, stdout=os.open(os.path.join(log,
16 're6stnet.client.%s.log' % (prefix,)),
17 os.O_WRONLY | os.O_CREAT | os.O_TRUNC),
18 stderr=subprocess.STDOUT)
19
20 self.iface = iface
21 self.routes = 0
22 self._prefix = prefix
23
24 def refresh(self):
25 # Check that the connection is alive
26 if self.process.poll() != None:
27 logging.info('Connection with %s has failed with return code %s'
28 % (self._prefix, self.process.returncode))
29 return False
30 return True
31
32
33 class TunnelManager:
34
35 def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
36 refresh, connection_count, iface_list, network, prefix, nSend,
37 encrypt):
38 self._write_pipe = write_pipe
39 self._peer_db = peer_db
40 self._connection_dict = {}
41 self._iface_to_prefix = {}
42 self._ovpn_args = openvpn_args
43 self._hello = hello_interval
44 self._refresh_time = refresh
45 self._network = network
46 self._net_len = len(network)
47 self._iface_list = iface_list
48 self._prefix = prefix
49 self._nSend = nSend
50 self._encrypt = encrypt
51
52 self.next_refresh = time.time()
53 self._next_tunnel_refresh = time.time()
54
55 self._client_count = (connection_count + 1) // 2
56 self._refresh_count = 1
57 self.free_interface_set = set('re6stnet' + str(i)
58 for i in xrange(1, self._client_count + 1))
59
60 def refresh(self):
61 logging.info('Checking the tunnels...')
62 self._cleanDeads()
63 if self._next_tunnel_refresh < time.time():
64 logging.info('Refreshing the tunnels...')
65 self._countRoutes()
66 self._removeSomeTunnels()
67 self._next_tunnel_refresh = time.time() + self._refresh_time
68 self._makeNewTunnels()
69 self.next_refresh = time.time() + 5
70
71 def _cleanDeads(self):
72 for prefix in self._connection_dict.keys():
73 if not self._connection_dict[prefix].refresh():
74 self._kill(prefix)
75 self._peer_db.flagPeer(prefix)
76
77 def _removeSomeTunnels(self):
78 # Get the candidates to killing
79 candidates = sorted(self._connection_dict, key=lambda p:
80 self._connection_dict[p].routes)
81 for prefix in candidates[0: max(0, len(self._connection_dict) -
82 self._client_count + self._refresh_count)]:
83 self._kill(prefix)
84
85 def _kill(self, prefix):
86 logging.info('Killing the connection with %s/%u...'
87 % (hex(int(prefix, 2))[2:], len(prefix)))
88 connection = self._connection_dict.pop(prefix)
89 try:
90 connection.process.terminate()
91 except OSError:
92 # If the process is already exited
93 pass
94 self.free_interface_set.add(connection.iface)
95 self._peer_db.unusePeer(prefix)
96 del self._iface_to_prefix[connection.iface]
97 logging.trace('Connection with %s/%u killed'
98 % (hex(int(prefix, 2))[2:], len(prefix)))
99
100 def _makeNewTunnels(self):
101 tunnel_to_make = self._client_count - len(self._connection_dict)
102 if tunnel_to_make <= 0:
103 return
104 i = 0
105 logging.trace('Trying to make %i new tunnels...' % tunnel_to_make)
106 try:
107 for prefix, address in self._peer_db.getUnusedPeers(tunnel_to_make):
108 logging.info('Establishing a connection with %s/%u' %
109 (hex(int(prefix, 2))[2:], len(prefix)))
110 iface = self.free_interface_set.pop()
111 self._connection_dict[prefix] = Connection(address,
112 self._write_pipe, self._hello, iface,
113 prefix, self._encrypt, self._ovpn_args)
114 self._iface_to_prefix[iface] = prefix
115 self._peer_db.usePeer(prefix)
116 i += 1
117 logging.trace('%u new tunnels established' % (i,))
118 except KeyError:
119 logging.warning("""Can't establish connection with %s
120 : no available interface""" % prefix)
121 except Exception:
122 traceback.print_exc()
123
124 def _countRoutes(self):
125 logging.debug('Starting to count the routes on each interface...')
126 self._peer_db.clear_blacklist(0)
127 for iface in self._iface_to_prefix.keys():
128 self._connection_dict[self._iface_to_prefix[iface]].routes = 0
129 for line in open('/proc/net/ipv6_route'):
130 line = line.split()
131 ip = bin(int(line[0], 16))[2:].rjust(128, '0')
132
133 if ip.startswith(self._network):
134 iface = line[-1]
135 subnet_size = int(line[1], 16)
136 logging.trace('Route on iface %s detected to %s/%s'
137 % (iface, ip, subnet_size))
138 if iface in self._iface_to_prefix.keys():
139 self._connection_dict[self._iface_to_prefix[iface]].routes += 1
140 if iface in self._iface_list and self._net_len < subnet_size < 128:
141 prefix = ip[self._net_len:subnet_size]
142 logging.debug('A route to %s has been discovered on the LAN'
143 % (hex(int(prefix), 2)[2:]))
144 self._peer_db.blacklist(prefix, 0)
145 self._notifyPeer(line[0])
146
147 logging.debug("Routes have been counted")
148 for p in self._connection_dict.keys():
149 logging.trace('Routes on iface %s : %s' % (
150 self._connection_dict[p].iface,
151 self._connection_dict[p].routes))
152
153 def killAll(self):
154 for prefix in self._connection_dict.keys():
155 self._kill(prefix)
156
157 def checkIncomingTunnel(self, prefix):
158 if prefix in self._connection_dict:
159 if prefix < self._prefix:
160 return False
161 else:
162 self._kill(prefix)
163 return True
164
165 def _notifyPeer(self, peerIp):
166 try:
167 if self._peer_db.address:
168 ip = '%s:%s:%s:%s:%s:%s:%s:%s' % (peerIp[0:4], peerIp[4:8], peerIp[8:12],
169 peerIp[12:16], peerIp[16:20], peerIp[20:24], peerIp[24:28], peerIp[28:32])
170 logging.debug('Notifying peer %s' % ip)
171 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
172 sock.sendto('%s %s\n' % (self._prefix, self._peer_db.address), (ip, 326))
173 except socket.error, e:
174 logging.debug('Unable to notify %s' % ip)
175 logging.debug('socket.error : %s' % e)