Fix duplicate bootpeer bug
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time, os
2 import utils
3
4 class PeerManager:
5
6 # internal ip = temp arg/attribute
7 def __init__(self, db_path, registry, key_path, refresh_time, address,
8 internal_ip, prefix, manual, pp , db_size):
9 self._refresh_time = refresh_time
10 self._address = address
11 self._internal_ip = internal_ip
12 self._prefix = prefix
13 self._db_size = db_size
14 self._registry = registry
15 self._key_path = key_path
16 self._pp = pp
17 self._manual = manual
18
19 logging.info('Connecting to peers database...')
20 self._db = sqlite3.connect(db_path, isolation_level=None)
21 logging.debug('Database opened')
22
23 logging.info('Preparing peers database...')
24 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
25 prefix TEXT PRIMARY KEY,
26 address TEXT NOT NULL,
27 used INTEGER NOT NULL DEFAULT 0,
28 date INTEGER DEFAULT (strftime('%s', 'now')))""")
29 self._db.execute("UPDATE peers SET used = 0")
30 self._db.execute("CREATE INDEX IF NOT EXISTS _peers_used ON peers(used)")
31 self._db.execute("""CREATE TABLE IF NOT EXISTS blacklist (
32 prefix TEXT PRIMARY KEY,
33 flag INTEGER NOT NULL)""")
34 self._db.execute("""CREATE INDEX IF NOT EXISTS
35 blacklist_flag ON blacklist(flag)""")
36 self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
37 (prefix, 1))
38 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
39 name text primary key,
40 value text)""")
41 try:
42 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
43 except StopIteration:
44 proxy = xmlrpclib.ServerProxy(registry)
45 a = proxy.getPrivateAddress()
46 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
47 self._proxy = xmlrpclib.ServerProxy(a)
48 logging.debug('Database prepared')
49
50 self.next_refresh = time.time()
51
52 def clear_blacklist(self, flag):
53 logging.info('Clearing blacklist from flag %u' % flag)
54 self._db.execute("DELETE FROM blacklist WHERE flag = ?",
55 (flag,))
56 logging.info('Blacklist cleared')
57
58 def blacklist(self, prefix, flag):
59 logging.ninfo('Blacklisting %s' % prefix)
60 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
61 self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
62 (prefix, flag))
63 logging.debug('%s blacklisted' % prefix)
64
65 def whitelist(self, prefix):
66 logging.info('Unblacklisting %s' % prefix)
67 self._db.execute("DELETE FROM blacklist WHERE prefix = ?", (prefix,))
68 logging.debug('%s whitelisted' % prefix)
69
70 def refresh(self):
71 logging.info('Refreshing the peers DB...')
72 try:
73 self._declare()
74 self._populate()
75 logging.info('DB refreshed')
76 self.next_refresh = time.time() + self._refresh_time
77 return True
78 except socket.error, e:
79 logging.debug('socket.error : %s' % e)
80 logging.info('Connection to server failed, retrying in 30s')
81 self.next_refresh = time.time() + 30
82 return False
83
84 def _declare(self):
85 if self._address != None:
86 logging.info('Sending connection info to server...')
87 self._proxy.declare((self._internal_ip,
88 utils.address_str(self._address)))
89 logging.debug('Info sent')
90 else:
91 logging.warning("Warning : couldn't send ip, unknown external config")
92
93 def _populate(self):
94 logging.info('Populating the peers DB...')
95 new_peer_list = self._proxy.getPeerList(self._db_size,
96 self._internal_ip)
97 with self._db:
98 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
99 RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
100 FROM peers WHERE used <= 0))""",
101 (str(len(new_peer_list) - self._db_size),))
102 self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
103 VALUES (?,?)""", new_peer_list)
104 self._db.execute("""DELETE FROM peers WHERE prefix IN
105 (SELECT prefix FROM blacklist)""")
106 logging.info('DB populated')
107 logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))
108
109 def getUnusedPeers(self, peer_count):
110 for populate in self.refresh, self._bootstrap, bool:
111 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
112 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
113 (peer_count,)).fetchall()
114 if peer_list or populate():
115 return peer_list
116
117 def _bootstrap(self):
118 logging.info('Getting Boot peer...')
119 proxy = xmlrpclib.ServerProxy(self._registry)
120 try:
121 bootpeer = proxy.getBootstrapPeer(self._prefix).data
122 logging.debug('Boot peer received from server')
123 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
124 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
125 bootpeer = p.communicate(bootpeer)[0].split()
126 self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
127 logging.debug('Boot peer added')
128 return True
129 except socket.error:
130 pass
131 except sqlite3.IntegrityError, e:
132 if e.args[0] != 'column prefix is not unique':
133 raise
134 return False
135
136 def usePeer(self, prefix):
137 logging.trace('Updating peers database : using peer %s' % prefix)
138 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
139 (prefix,))
140 logging.debug('DB updated')
141
142 def unusePeer(self, prefix):
143 logging.trace('Updating peers database : unusing peer %s' % prefix)
144 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
145 (prefix,))
146 logging.debug('DB updated')
147
148 def flagPeer(self, prefix):
149 logging.trace('Updating peers database : flagging peer %s' % prefix)
150 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
151 (prefix,))
152 logging.debug('DB updated')
153
154 def handle_message(self, msg):
155 script_type, arg = msg.split()
156 if script_type == 'client-connect':
157 logging.info('Incomming connection from %s' % (arg,))
158 elif script_type == 'client-disconnect':
159 logging.info('%s has disconnected' % (arg,))
160 elif script_type == 'route-up':
161 if not self._manual:
162 external_ip = arg
163 new_address = list([external_ip, port, proto]
164 for port, proto, _ in self._pp)
165 if self._address != new_address:
166 self._address = new_address
167 logging.info('Received new external ip : %s'
168 % (external_ip,))
169 self._declare()
170 else:
171 logging.debug('Unknow message recieved from the openvpn pipe : %s'
172 % msg)
173