Bug solved : the used flag was overwrite
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 import utils
3
4
5 class PeerManager:
6
7 # internal ip = temp arg/attribute
8 def __init__(self, db_path, registry, key_path, refresh_time, address,
9 internal_ip, prefix, manual, pp, db_size):
10 self._refresh_time = refresh_time
11 self.address = address
12 self._internal_ip = internal_ip
13 self._prefix = prefix
14 self._db_size = db_size
15 self._registry = registry
16 self._key_path = key_path
17 self._pp = pp
18 self._manual = manual
19 self.tunnel_manager = None
20
21 logging.info('Connecting to peers database...')
22 self._db = sqlite3.connect(db_path, isolation_level=None)
23 logging.debug('Database opened')
24
25 logging.info('Preparing peers database...')
26 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
27 prefix TEXT PRIMARY KEY,
28 address TEXT NOT NULL,
29 used INTEGER NOT NULL DEFAULT 0,
30 date INTEGER DEFAULT (strftime('%s', 'now')))""")
31 self._db.execute("UPDATE peers SET used = 0")
32 self._db.execute("""CREATE INDEX IF NOT EXISTS
33 _peers_used ON peers(used)""")
34 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
35 name text primary key,
36 value text)""")
37 self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
38 self._db.execute("""CREATE TABLE blacklist.flag (
39 prefix TEXT PRIMARY KEY,
40 flag INTEGER NOT NULL)""")
41 self._db.execute("""CREATE INDEX blacklist.blacklist_flag
42 ON flag(flag)""")
43 self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
44 try:
45 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
46 except StopIteration:
47 proxy = xmlrpclib.ServerProxy(registry)
48 a = proxy.getPrivateAddress()
49 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
50 self._proxy = xmlrpclib.ServerProxy(a)
51 logging.debug('Database prepared')
52
53 self.next_refresh = time.time()
54
55 def clear_blacklist(self, flag):
56 logging.info('Clearing blacklist from flag %u' % flag)
57 self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
58 (flag,))
59 logging.info('Blacklist cleared')
60
61 def blacklist(self, prefix, flag):
62 logging.info('Blacklisting %s' % prefix)
63 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
64 self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
65 (prefix, flag))
66 logging.debug('%s blacklisted' % prefix)
67
68 def whitelist(self, prefix):
69 logging.info('Unblacklisting %s' % prefix)
70 self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
71 logging.debug('%s whitelisted' % prefix)
72
73 def refresh(self):
74 logging.info('Refreshing the peers DB...')
75 try:
76 self._declare()
77 self.next_refresh = time.time() + self._refresh_time
78 except socket.error, e:
79 logging.info('Connection to server failed, re-bootstraping')
80 try:
81 self._bootstrap()
82 self.next_refresh = time.time() + self._refresh_time
83 except socket.error, e:
84 logging.debug('socket.error : %s' % e)
85 logging.info('Connection to server failed, retrying in 30s')
86 self.next_refresh = time.time() + 30
87
88 def _declare(self):
89 if self.address != None:
90 logging.info('Sending connection info to server...')
91 self._proxy.declare(utils.address_str(self.address))
92 logging.debug('Info sent')
93 else:
94 logging.warning("Warning : couldn't send ip, unknown external config")
95
96 def getUnusedPeers(self, peer_count):
97 for populate in self._bootstrap, bool:
98 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
99 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
100 (peer_count,)).fetchall()
101 if peer_list:
102 return peer_list
103 populate()
104 logging.warning('Cannot find any new peers')
105 return []
106
107 def _bootstrap(self):
108 logging.info('Getting Boot peer...')
109 proxy = xmlrpclib.ServerProxy(self._registry)
110 try:
111 bootpeer = proxy.getBootstrapPeer(self._prefix).data
112 logging.debug('Boot peer received from server')
113 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
114 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
115 bootpeer = p.communicate(bootpeer)[0].split()
116 return self._addPeer(bootpeer)
117 except socket.error:
118 pass
119 except sqlite3.IntegrityError, e:
120 if e.args[0] != 'column prefix is not unique':
121 raise
122 #except Exception, e:
123 # logging.info('Unable to bootstrap : %s' % e)
124 return False
125
126 def usePeer(self, prefix):
127 logging.trace('Updating peers database : using peer %s' % prefix)
128 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
129 (prefix,))
130 logging.debug('DB updated')
131
132 def unusePeer(self, prefix):
133 logging.trace('Updating peers database : unusing peer %s' % prefix)
134 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
135 (prefix,))
136 logging.debug('DB updated')
137
138 def flagPeer(self, prefix):
139 logging.trace('Updating peers database : flagging peer %s' % prefix)
140 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
141 (prefix,))
142 logging.debug('DB updated')
143
144 def handle_message(self, msg):
145 script_type, arg = msg.split()
146 if script_type == 'client-connect':
147 logging.info('Incoming connection from %s' % (arg,))
148 prefix = utils.binFromSubnet(arg)
149 if self.tunnel_manager.checkIncomingTunnel(prefix):
150 self.blacklist(prefix, 2)
151 elif script_type == 'client-disconnect':
152 self.whitelist(utils.binFromSubnet(arg))
153 logging.info('%s has disconnected' % (arg,))
154 elif script_type == 'route-up':
155 if not self._manual:
156 external_ip = arg
157 new_address = list([external_ip, port, proto]
158 for port, proto, _ in self._pp)
159 if self.address != new_address:
160 self.address = new_address
161 logging.info('Received new external ip : %s'
162 % (external_ip,))
163 try:
164 self._declare()
165 except socket.error, e:
166 logging.debug('socket.error : %s' % e)
167 logging.info('''Connection to server failed while
168 declaring external infos''')
169 else:
170 logging.debug('Unknow message recieved from the openvpn pipe : %s'
171 % msg)
172
173 def readSocket(self, msg):
174 peer = msg.split(' ')
175 if len(peer) != 2:
176 logging.debug('Invalid package recieved : %s' % msg)
177 return
178 self._addPeer(peer)
179
180 def _addPeer(self, peer):
181 logging.debug('Adding peer %s' % peer)
182 if int(self._db.execute("""SELECT COUNT(*) FROM blacklist.flag WHERE prefix = ?""", (peer[0],)).next()[0]) > 0:
183 logging.info('Peer is blacklisted')
184 return False
185 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
186 RANDOM() LIMIT MAX(0, (SELECT COUNT(*) FROM peers
187 WHERE used <= 0) - ?)""", (str(self._db_size),))
188 self._db.execute("INSERT OR IGNORE INTO peers (prefix, address) VALUES (?,?)", peer)
189 logging.debug('Peer added')
190 return True