Peers can now discover each other without a central registry.
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 import utils
3
4
class PeerManager:
    """Maintain the local SQLite database of known peers.

    The ``peers`` table stores one row per peer prefix together with its
    address and a ``used`` marker (1 = set by usePeer, 0 = set by unusePeer,
    -1 = set by flagPeer).  An in-memory ``blacklist.flag`` table, attached to
    the same connection, lists prefixes that must never be stored in
    ``peers``; it always contains our own prefix with flag 1.

    Public attributes:
        address        -- our external address list, or None when unknown
        tunnel_manager -- set by the owner after construction; used by
                          handle_message() to validate incoming tunnels
        next_refresh   -- timestamp (time.time() based) of the next refresh
    """

    # internal ip = temp arg/attribute
    def __init__(self, db_path, registry, key_path, refresh_time, address,
                 internal_ip, prefix, manual, pp, db_size):
        """Open (and create if needed) the peers database.

        The private address of the registry is read from the ``config``
        table; when it is not cached there yet, it is requested from
        ``registry`` over XML-RPC and stored for the next run.
        """
        self._refresh_time = refresh_time
        self.address = address
        self._internal_ip = internal_ip
        self._prefix = prefix
        self._db_size = db_size
        self._registry = registry
        self._key_path = key_path
        self._pp = pp
        self._manual = manual
        self.tunnel_manager = None

        logging.info('Connecting to peers database...')
        self._db = sqlite3.connect(db_path, isolation_level=None)
        logging.debug('Database opened')

        logging.info('Preparing peers database...')
        self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
            prefix TEXT PRIMARY KEY,
            address TEXT NOT NULL,
            used INTEGER NOT NULL DEFAULT 0,
            date INTEGER DEFAULT (strftime('%s', 'now')))""")
        # No peer is in use when we start: reset markers left by a past run.
        self._db.execute("UPDATE peers SET used = 0")
        self._db.execute("""CREATE INDEX IF NOT EXISTS
            _peers_used ON peers(used)""")
        self._db.execute("""CREATE TABLE IF NOT EXISTS config (
            name text primary key,
            value text)""")
        # The blacklist lives in memory only, so it is rebuilt on each start.
        self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
        self._db.execute("""CREATE TABLE blacklist.flag (
            prefix TEXT PRIMARY KEY,
            flag INTEGER NOT NULL)""")
        self._db.execute("""CREATE INDEX blacklist.blacklist_flag
            ON flag(flag)""")
        # Blacklist ourselves so that we never try to connect to our own node.
        self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)",
                         (prefix, 1))
        row = self._db.execute(
            "SELECT value FROM config WHERE name='registry'").fetchone()
        if row is None:
            proxy = xmlrpclib.ServerProxy(registry)
            a = proxy.getPrivateAddress()
            self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
        else:
            a, = row
        self._proxy = xmlrpclib.ServerProxy(a)
        logging.debug('Database prepared')

        self.next_refresh = time.time()

    def clear_blacklist(self, flag):
        """Remove every blacklist entry recorded with the given flag."""
        logging.info('Clearing blacklist from flag %u', flag)
        self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
                         (flag,))
        logging.info('Blacklist cleared')

    def blacklist(self, prefix, flag):
        """Drop ``prefix`` from the peers table and blacklist it with ``flag``.

        An existing blacklist entry for the same prefix is replaced.
        """
        logging.info('Blacklisting %s', prefix)
        self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
        self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
                         (prefix, flag))
        logging.debug('%s blacklisted', prefix)

    def whitelist(self, prefix):
        """Remove ``prefix`` from the blacklist, whatever its flag."""
        logging.info('Unblacklisting %s', prefix)
        self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?",
                         (prefix,))
        logging.debug('%s whitelisted', prefix)

    def refresh(self):
        """Declare our address to the registry and fetch a new peer list.

        Returns True on success.  On socket.error, schedules a retry in 30
        seconds (through ``next_refresh``) and returns False.
        """
        logging.info('Refreshing the peers DB...')
        try:
            self._declare()
            self._populate()
            logging.info('DB refreshed')
            self.next_refresh = time.time() + self._refresh_time
            return True
        except socket.error as e:
            logging.debug('socket.error : %s', e)
            logging.info('Connection to server failed, retrying in 30s')
            self.next_refresh = time.time() + 30
            return False

    def _declare(self):
        """Send our external address to the registry, if it is known."""
        if self.address is not None:
            logging.info('Sending connection info to server...')
            self._proxy.declare(utils.address_str(self.address))
            logging.debug('Info sent')
        else:
            logging.warning(
                "Warning : couldn't send ip, unknown external config")

    def _populate(self):
        """Replace unused peers by a fresh list obtained from the registry."""
        logging.info('Populating the peers DB...')
        new_peer_list = self._proxy.getPeerList(self._db_size)
        with self._db:
            # Make room for the new peers: delete just enough rows that are
            # not in use (flagged ones, used = -1, first) so that the unused
            # rows plus the new list stay within db_size.
            # NOTE(review): DELETE ... ORDER BY ... LIMIT presumably needs an
            # SQLite built with SQLITE_ENABLE_UPDATE_DELETE_LIMIT - confirm.
            self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
                RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
                FROM peers WHERE used <= 0))""",
                (str(len(new_peer_list) - self._db_size),))
            self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
                VALUES (?,?)""", new_peer_list)
            # The registry may have sent blacklisted peers (including us).
            self._db.execute("""DELETE FROM peers WHERE prefix IN
                (SELECT prefix FROM blacklist.flag)""")
        logging.info('DB populated')
        logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))

    def getUnusedPeers(self, peer_count):
        """Return up to ``peer_count`` (prefix, address) rows not in use.

        Unflagged peers (used = 0) come before flagged ones (used = -1).
        When the table has no candidate, the database is refreshed from the
        registry, then a bootstrap peer is requested, before giving up and
        returning an empty list.
        """
        # 'bool' is a no-op last step: it lets the query run one more time
        # after the bootstrap attempt.
        for populate in self.refresh, self._bootstrap, bool:
            peer_list = self._db.execute("""SELECT prefix, address FROM peers
                WHERE used <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
                (peer_count,)).fetchall()
            if peer_list:
                return peer_list
            populate()
        logging.warning('Cannot find any new peers')
        return []

    def _bootstrap(self):
        """Ask the main registry for a boot peer and store it.

        The registry answer is decrypted with our private key by piping it
        through ``openssl rsautl -decrypt``.  Returns the result of
        _addPeer() on success, False on any failure or if the peer is
        already known.
        """
        logging.info('Getting Boot peer...')
        proxy = xmlrpclib.ServerProxy(self._registry)
        try:
            bootpeer = proxy.getBootstrapPeer(self._prefix).data
            logging.debug('Boot peer received from server')
            p = subprocess.Popen(
                ('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            bootpeer = p.communicate(bootpeer)[0].split()
            return self._addPeer(bootpeer)
        except socket.error as e:
            logging.debug('socket.error : %s', e)
        except sqlite3.IntegrityError as e:
            # Only a duplicate prefix is expected here.  The wording depends
            # on the SQLite version ("column prefix is not unique" vs
            # "UNIQUE constraint failed: peers.prefix"), so match loosely.
            if 'unique' not in str(e.args[0]).lower():
                raise
        except Exception as e:
            # Best effort: do not swallow KeyboardInterrupt / SystemExit.
            logging.debug('Bootstrap failure : %s', e)
            logging.info('Unable to bootstrap')
        return False

    def usePeer(self, prefix):
        """Mark ``prefix`` as being in use (used = 1)."""
        logging.trace('Updating peers database : using peer %s' % prefix)
        self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
                         (prefix,))
        logging.debug('DB updated')

    def unusePeer(self, prefix):
        """Mark ``prefix`` as no longer in use (used = 0)."""
        logging.trace('Updating peers database : unusing peer %s' % prefix)
        self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
                         (prefix,))
        logging.debug('DB updated')

    def flagPeer(self, prefix):
        """Flag ``prefix`` (used = -1): it is selected last and purged first."""
        logging.trace('Updating peers database : flagging peer %s' % prefix)
        self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
                         (prefix,))
        logging.debug('DB updated')

    def handle_message(self, msg):
        """Process a '<script_type> <arg>' message from the openvpn pipe.

        Handled script types are 'client-connect', 'client-disconnect' and
        'route-up'.  Anything else, including a message that does not have
        exactly two whitespace-separated fields, is logged and ignored.
        """
        fields = msg.split()
        if len(fields) != 2:
            # Malformed input must not raise: treat it as an unknown message.
            logging.debug('Unknow message recieved from the openvpn pipe : %s',
                          msg)
            return
        script_type, arg = fields
        if script_type == 'client-connect':
            logging.info('Incoming connection from %s', arg)
            prefix = utils.binFromSubnet(arg)
            if self.tunnel_manager.checkIncomingTunnel(prefix):
                self.blacklist(prefix, 2)
        elif script_type == 'client-disconnect':
            self.whitelist(utils.binFromSubnet(arg))
            logging.info('%s has disconnected', arg)
        elif script_type == 'route-up':
            if not self._manual:
                external_ip = arg
                new_address = [[external_ip, port, proto]
                               for port, proto, _ in self._pp]
                if self.address != new_address:
                    self.address = new_address
                    logging.info('Received new external ip : %s', external_ip)
                    try:
                        self._declare()
                    except socket.error as e:
                        logging.debug('socket.error : %s', e)
                        logging.info('Connection to server failed while '
                                     'declaring external infos')
        else:
            logging.debug('Unknow message recieved from the openvpn pipe : %s',
                          msg)

    def readSocket(self, msg):
        """Store a peer announced as a '<prefix> <address>' message.

        Messages that do not split into exactly two space-separated fields
        are logged and dropped.
        """
        peer = msg.split(' ')
        if len(peer) != 2:
            logging.debug('Invalid package recieved : %s', msg)
            return
        # NOTE(review): _addPeer() raises sqlite3.IntegrityError when the
        # prefix is already stored - confirm that the caller handles it.
        self._addPeer(peer)

    def _addPeer(self, peer):
        """Insert ``peer`` (a [prefix, address] pair) unless blacklisted.

        Returns True when the row was inserted, False when the prefix is
        blacklisted.  Raises sqlite3.IntegrityError if the prefix is already
        in the peers table.
        """
        logging.debug('Adding peer %s', peer)
        blacklisted, = self._db.execute(
            "SELECT COUNT(*) FROM blacklist.flag WHERE prefix = ?",
            (peer[0],)).fetchone()
        if int(blacklisted) > 0:
            logging.info('Peer is blacklisted')
            return False
        self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)",
                         peer)
        logging.debug('Peer added')
        return True