Solved a bun in the incomming connections blacklisting process
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 import utils
3
4
5 class PeerManager:
6
7 # internal ip = temp arg/attribute
8 def __init__(self, db_path, registry, key_path, refresh_time, address,
9 internal_ip, prefix, manual, pp, db_size):
10 self._refresh_time = refresh_time
11 self._address = address
12 self._internal_ip = internal_ip
13 self._prefix = prefix
14 self._db_size = db_size
15 self._registry = registry
16 self._key_path = key_path
17 self._pp = pp
18 self._manual = manual
19 self.tunnel_manager = None
20
21 logging.info('Connecting to peers database...')
22 self._db = sqlite3.connect(db_path, isolation_level=None)
23 logging.debug('Database opened')
24
25 logging.info('Preparing peers database...')
26 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
27 prefix TEXT PRIMARY KEY,
28 address TEXT NOT NULL,
29 used INTEGER NOT NULL DEFAULT 0,
30 date INTEGER DEFAULT (strftime('%s', 'now')))""")
31 self._db.execute("UPDATE peers SET used = 0")
32 self._db.execute("""CREATE INDEX IF NOT EXISTS
33 _peers_used ON peers(used)""")
34 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
35 name text primary key,
36 value text)""")
37 self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
38 self._db.execute("""CREATE TABLE blacklist.flag (
39 prefix TEXT PRIMARY KEY,
40 flag INTEGER NOT NULL)""")
41 self._db.execute("""CREATE INDEX blacklist.blacklist_flag
42 ON flag(flag)""")
43 self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
44 try:
45 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
46 except StopIteration:
47 proxy = xmlrpclib.ServerProxy(registry)
48 a = proxy.getPrivateAddress()
49 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
50 self._proxy = xmlrpclib.ServerProxy(a)
51 logging.debug('Database prepared')
52
53 self.next_refresh = time.time()
54
55 def clear_blacklist(self, flag):
56 logging.info('Clearing blacklist from flag %u' % flag)
57 self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
58 (flag,))
59 logging.info('Blacklist cleared')
60
61 def blacklist(self, prefix, flag):
62 logging.info('Blacklisting %s' % prefix)
63 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
64 self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
65 (prefix, flag))
66 logging.debug('%s blacklisted' % prefix)
67
68 def whitelist(self, prefix):
69 logging.info('Unblacklisting %s' % prefix)
70 self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
71 logging.debug('%s whitelisted' % prefix)
72
73 def refresh(self):
74 logging.info('Refreshing the peers DB...')
75 try:
76 self._declare()
77 self._populate()
78 logging.info('DB refreshed')
79 self.next_refresh = time.time() + self._refresh_time
80 return True
81 except socket.error, e:
82 logging.debug('socket.error : %s' % e)
83 logging.info('Connection to server failed, retrying in 30s')
84 self.next_refresh = time.time() + 30
85 return False
86
87 def _declare(self):
88 if self._address != None:
89 print self._internal_ip
90 print self._address
91 print utils.address_str(self._address)
92 logging.info('Sending connection info to server...')
93 self._proxy.declare(utils.address_str(self._address))
94 logging.debug('Info sent')
95 else:
96 logging.warning("Warning : couldn't send ip, unknown external config")
97
98 def _populate(self):
99 logging.info('Populating the peers DB...')
100 new_peer_list = self._proxy.getPeerList(self._db_size)
101 with self._db:
102 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
103 RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
104 FROM peers WHERE used <= 0))""",
105 (str(len(new_peer_list) - self._db_size),))
106 self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
107 VALUES (?,?)""", new_peer_list)
108 self._db.execute("""DELETE FROM peers WHERE prefix IN
109 (SELECT prefix FROM blacklist.flag)""")
110 logging.info('DB populated')
111 logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))
112
113 def getUnusedPeers(self, peer_count):
114 for populate in self.refresh, self._bootstrap, bool:
115 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
116 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
117 (peer_count,)).fetchall()
118 if peer_list:
119 return peer_list
120 populate()
121 logging.warning('Cannot find any new peers')
122 return []
123
124 def _bootstrap(self):
125 logging.info('Getting Boot peer...')
126 proxy = xmlrpclib.ServerProxy(self._registry)
127 try:
128 bootpeer = proxy.getBootstrapPeer(self._prefix).data
129 logging.debug('Boot peer received from server')
130 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
131 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
132 bootpeer = p.communicate(bootpeer)[0].split()
133 if bootpeer[0] != self._prefix:
134 self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
135 logging.debug('Boot peer added')
136 return True
137 except socket.error:
138 pass
139 except sqlite3.IntegrityError, e:
140 if e.args[0] != 'column prefix is not unique':
141 raise
142 except StopIteration:
143 logging.info('No peer available for bootstrap')
144 return False
145
146 def usePeer(self, prefix):
147 logging.trace('Updating peers database : using peer %s' % prefix)
148 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
149 (prefix,))
150 logging.debug('DB updated')
151
152 def unusePeer(self, prefix):
153 logging.trace('Updating peers database : unusing peer %s' % prefix)
154 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
155 (prefix,))
156 logging.debug('DB updated')
157
158 def flagPeer(self, prefix):
159 logging.trace('Updating peers database : flagging peer %s' % prefix)
160 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
161 (prefix,))
162 logging.debug('DB updated')
163
164 def handle_message(self, msg):
165 script_type, arg = msg.split()
166 if script_type == 'client-connect':
167 logging.info('Incomming connection from %s' % (arg,))
168 prefix = utils.binFromSubnet(arg)
169 if self.tunnel_manager.checkIncommingTunnel(prefix):
170 self.blacklist(prefix, 2)
171 elif script_type == 'client-disconnect':
172 self.whitelist(utils.binFromSubnet(arg))
173 logging.info('%s has disconnected' % (arg,))
174 elif script_type == 'route-up':
175 if not self._manual:
176 external_ip = arg
177 new_address = list([external_ip, port, proto]
178 for port, proto, _ in self._pp)
179 if self._address != new_address:
180 self._address = new_address
181 logging.info('Received new external ip : %s'
182 % (external_ip,))
183 try:
184 self._declare()
185 except socket.error, e:
186 logging.debug('socket.error : %s' % e)
187 logging.info('''Connection to server failed while
188 declaring external infos''')
189 else:
190 logging.debug('Unknow message recieved from the openvpn pipe : %s'
191 % msg)