Peer discovery through server added
[re6stnet.git] / vifibnet.py
1 #!/usr/bin/env python
2 import argparse, errno, os, select, sqlite3, subprocess, sys, time, xmlrpclib
3 import traceback
4 import upnpigd
5 import openvpn
6 import random
7 import log
8
9 VIFIB_NET = "2001:db8:42::/48"
10 connection_dict = {} # to remember current connections we made
11 free_interface_set = set(('client1', 'client2', 'client3', 'client4', 'client5',
12 'client6', 'client7', 'client8', 'client9', 'client10'))
13
14 # TODO : flag in some way the peers that are connected to us so we don't connect to them
15 # Or maybe we just don't care,
16 class PeersDB:
17 def __init__(self, dbPath):
18 self.proxy = xmlrpclib.ServerProxy('http://%s:%u' % (config.server, config.server_port))
19
20 log.log('Connectiong to peers database', 4)
21 self.db = sqlite3.connect(dbPath, isolation_level=None)
22 log.log('Initializing peers database', 4)
23 try:
24 self.db.execute("""CREATE TABLE peers (
25 id INTEGER PRIMARY KEY AUTOINCREMENT,
26 ip TEXT NOT NULL,
27 port INTEGER NOT NULL,
28 proto TEXT NOT NULL,
29 used INTEGER NOT NULL default 0)""")
30 self.db.execute("CREATE INDEX _peers_used ON peers(used)")
31 self.db.execute("UPDATE peers SET used = 0")
32 except sqlite3.OperationalError, e:
33 if e.args[0] != 'table peers already exists':
34 raise RuntimeError
35 else:
36 self.populateDB(100)
37
38 def populateDB(self, n):
39 self.db.executemany("INSERT INTO peers (ip, port, proto) VALUES ?", self.proxy.getPeerList(n))
40
41 def getUnusedPeers(self, nPeers):
42 return self.db.execute("SELECT id, ip, port, proto FROM peers WHERE used = 0 "
43 "ORDER BY RANDOM() LIMIT ?", (nPeers,))
44
45 def usePeer(self, id):
46 log.log('Updating peers database : using peer ' + str(id), 5)
47 self.db.execute("UPDATE peers SET used = 1 WHERE id = ?", (id,))
48
49 def unusePeer(self, id):
50 log.log('Updating peers database : unusing peer ' + str(id), 5)
51 self.db.execute("UPDATE peers SET used = 0 WHERE id = ?", (id,))
52
53 def ipFromPrefix(prefix, prefix_len):
54 tmp = hew(int(prefix, 2))[2::]
55 ip = VIFIB_NET
56 for i in xrange(0, len(ip), 4):
57 ip += tmp[i:i+4] + ':'
58 ip += ':'
59
60 def startBabel(**kw):
61 args = ['babeld',
62 '-C', 'redistribute local ip %s' % (config.ip),
63 '-C', 'redistribute local deny',
64 # Route VIFIB ip adresses
65 '-C', 'in ip %s' % VIFIB_NET,
66 # Route only addresse in the 'local' network,
67 # or other entire networks
68 #'-C', 'in ip %s' % (config.ip),
69 #'-C', 'in ip ::/0 le %s' % network_mask,
70 # Don't route other addresses
71 '-C', 'in deny',
72 '-d', str(config.verbose),
73 '-s',
74 ]
75 if config.babel_state:
76 args += '-S', config.babel_state
77 return subprocess.Popen(args + ['vifibnet'] + list(free_interface_set), **kw)
78
79 def getConfig():
80 global config
81 parser = argparse.ArgumentParser(
82 description='Resilient virtual private network application')
83 _ = parser.add_argument
84 _('--server', required=True,
85 help='Address for peer discovery server')
86 _('--server-port', required=True,
87 help='Peer discovery server port')
88 _('--log-directory', default='/var/log',
89 help='Path to vifibnet logs directory')
90 _('--client-count', default=2, type=int,
91 help='Number of client connections')
92 # TODO : use maxpeer
93 _('--max-clients', default=10, type=int,
94 help='the number of peers that can connect to the server')
95 _('--refresh-time', default=60, type=int,
96 help='the time (seconds) to wait before changing the connections')
97 _('--refresh-count', default=1, type=int,
98 help='The number of connections to drop when refreshing the connections')
99 _('--db', default='/var/lib/vifibnet/peers.db',
100 help='Path to peers database')
101 _('--dh', required=True,
102 help='Path to dh file')
103 _('--babel-state', default='/var/lib/vifibnet/babel_state',
104 help='Path to babeld state-file')
105 _('--verbose', '-v', default=0, type=int,
106 help='Defines the verbose level')
107 _('--cert', required=True,
108 help='Path to the certificate file')
109 # Temporary args - to be removed
110 _('--ip', required=True,
111 help='IPv6 of the server')
112 # Openvpn options
113 _('openvpn_args', nargs=argparse.REMAINDER,
114 help="Common OpenVPN options (e.g. certificates)")
115 openvpn.config = config = parser.parse_args()
116 with open(config.cert, 'r') as f:
117 cert = crypto.load_certificate(crypto.FILETYPE_PEM, f)
118 subject = cert.get_subject()
119 prefix_txt, prefix_len_txt = subject.serialNumber.split('/')
120 prefix = int(prefix_txt)
121 prefix_len = int(prefix_len_txt)
122 ip = ipFromPrefix(prefix)
123 print ip
124 if config.openvpn_args[0] == "--":
125 del config.openvpn_args[0]
126 config.openvpn_args.append('--cert')
127 config.openvpn_args.append(config.cert)
128
129 def startNewConnection(n):
130 try:
131 for id, ip, port, proto in peers_db.getUnusedPeers(n):
132 log.log('Establishing a connection with id %s (%s:%s)' % (id,ip,port), 2)
133 iface = free_interface_set.pop()
134 connection_dict[id] = ( openvpn.client( ip, '--dev', iface, '--proto', proto, '--rport', str(port),
135 stdout=os.open(os.path.join(config.log_directory, 'vifibnet.client.%s.log' % (id,)),
136 os.O_WRONLY|os.O_CREAT|os.O_TRUNC) ),
137 iface)
138 peers_db.usePeer(id)
139 except KeyError:
140 log.log("Can't establish connection with %s : no available interface" % ip, 2)
141 pass
142 except Exception:
143 traceback.print_exc()
144
145 def killConnection(id):
146 try:
147 log.log('Killing the connection with id ' + str(id), 2)
148 p, iface = connection_dict.pop(id)
149 p.kill()
150 free_interface_set.add(iface)
151 peers_db.unusePeer(id)
152 except KeyError:
153 log.log("Can't kill connection to " + peer + ": no existing connection", 1)
154 pass
155 except Exception:
156 log.log("Can't kill connection to " + peer + ": uncaught error", 1)
157 pass
158
159 def checkConnections():
160 for id in connection_dict.keys():
161 p, iface = connection_dict[id]
162 if p.poll() != None:
163 log.log('Connection with %s has failed with return code %s' % (id, p.returncode), 3)
164 free_interface_set.add(iface)
165 peers_db.unusePeer(id)
166 del connection_dict[id]
167
168 def refreshConnections():
169 checkConnections()
170 # Kill some random connections
171 try:
172 for i in range(0, max(0, len(connection_dict) - config.client_count + config.refresh_count)):
173 id = random.choice(connection_dict.keys())
174 killConnection(id)
175 except Exception:
176 pass
177 # Establish new connections
178 startNewConnection(config.client_count - len(connection_dict))
179
180 def handle_message(msg):
181 script_type, common_name = msg.split()
182 if script_type == 'client-connect':
183 log.log('Incomming connection from %s' % (common_name,), 3)
184 # TODO : check if we are not already connected to it
185 elif script_type == 'client-disconnect':
186 log.log('%s has disconnected' % (common_name,), 3)
187 else:
188 log.log('Unknow message recieved from the openvpn pipe : ' + msg, 1)
189
190 def main():
191 # Get arguments
192 getConfig()
193 log.verbose = config.verbose
194 (externalIp, externalPort) = upnpigd.GetExternalInfo(1194)
195
196 # Setup database
197 global peers_db # stop using global variables for everything ?
198 peers_db = PeersDB(config.db)
199
200 # Launch babel on all interfaces
201 log.log('Starting babel', 3)
202 babel = startBabel(stdout=os.open('%s/babeld.log' % (config.log_directory,), os.O_WRONLY | os.O_CREAT | os.O_TRUNC),
203 stderr=subprocess.STDOUT)
204
205 # Create and open read_only pipe to get connect/disconnect events from openvpn
206 log.log('Creating pipe for openvpn events', 3)
207 r_pipe, write_pipe = os.pipe()
208 read_pipe = os.fdopen(r_pipe)
209
210 # Establish connections
211 log.log('Starting openvpn server', 3)
212 serverProcess = openvpn.server(config.ip, write_pipe, '--dev', 'vifibnet',
213 stdout=os.open(os.path.join(config.log_directory, 'vifibnet.server.log'), os.O_WRONLY | os.O_CREAT | os.O_TRUNC))
214 startNewConnection(config.client_count)
215
216 # Timed refresh initializing
217 next_refresh = time.time() + config.refresh_time
218
219 # main loop
220 try:
221 while True:
222 ready, tmp1, tmp2 = select.select([read_pipe], [], [],
223 max(0, next_refresh - time.time()))
224 if ready:
225 handle_message(read_pipe.readline())
226 if time.time() >= next_refresh:
227 refreshConnections()
228 next_refresh = time.time() + config.refresh_time
229 except KeyboardInterrupt:
230 return 0
231
232 if __name__ == "__main__":
233 main()
234