Fixed minor bug in db. Added a HOW TO section in re6stnet man page
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time, os
2 import utils
3
4 class PeerManager:
5
6 # internal ip = temp arg/attribute
7 def __init__(self, db_path, registry, key_path, refresh_time, address,
8 internal_ip, prefix, manual, pp , db_size):
9 self._refresh_time = refresh_time
10 self._address = address
11 self._internal_ip = internal_ip
12 self._prefix = prefix
13 self._db_size = db_size
14 self._registry = registry
15 self._key_path = key_path
16 self._pp = pp
17 self._manual = manual
18
19 logging.info('Connecting to peers database...')
20 self._db = sqlite3.connect(db_path, isolation_level=None)
21 logging.debug('Database opened')
22
23 logging.info('Preparing peers database...')
24 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
25 prefix TEXT PRIMARY KEY,
26 address TEXT NOT NULL,
27 used INTEGER NOT NULL DEFAULT 0,
28 date INTEGER DEFAULT (strftime('%s', 'now')))""")
29 self._db.execute("UPDATE peers SET used = 0")
30 self._db.execute("CREATE INDEX IF NOT EXISTS
31 _peers_used ON peers(used)")
32 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
33 name text primary key,
34 value text)""")
35 self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
36 self._db.execute("""CREATE TABLE blacklist.flag (
37 prefix TEXT PRIMARY KEY,
38 flag INTEGER NOT NULL)""")
39 self._db.execute("""CREATE INDEX blacklist.blacklist_flag
40 ON flag(flag)""")
41 self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
42 try:
43 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
44 except StopIteration:
45 proxy = xmlrpclib.ServerProxy(registry)
46 a = proxy.getPrivateAddress()
47 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
48 self._proxy = xmlrpclib.ServerProxy(a)
49 logging.debug('Database prepared')
50
51 self.next_refresh = time.time()
52
53 def clear_blacklist(self, flag):
54 logging.info('Clearing blacklist from flag %u' % flag)
55 self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
56 (flag,))
57 logging.info('Blacklist cleared')
58
59 def blacklist(self, prefix, flag):
60 logging.ninfo('Blacklisting %s' % prefix)
61 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
62 self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
63 (prefix, flag))
64 logging.debug('%s blacklisted' % prefix)
65
66 def whitelist(self, prefix):
67 logging.info('Unblacklisting %s' % prefix)
68 self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
69 logging.debug('%s whitelisted' % prefix)
70
71 def refresh(self):
72 logging.info('Refreshing the peers DB...')
73 try:
74 self._declare()
75 self._populate()
76 logging.info('DB refreshed')
77 self.next_refresh = time.time() + self._refresh_time
78 return True
79 except socket.error, e:
80 logging.debug('socket.error : %s' % e)
81 logging.info('Connection to server failed, retrying in 30s')
82 self.next_refresh = time.time() + 30
83 return False
84
85 def _declare(self):
86 if self._address != None:
87 logging.info('Sending connection info to server...')
88 self._proxy.declare((self._internal_ip,
89 utils.address_str(self._address)))
90 logging.debug('Info sent')
91 else:
92 logging.warning("Warning : couldn't send ip, unknown external config")
93
94 def _populate(self):
95 logging.info('Populating the peers DB...')
96 new_peer_list = self._proxy.getPeerList(self._db_size,
97 self._internal_ip)
98 with self._db:
99 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
100 RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
101 FROM peers WHERE used <= 0))""",
102 (str(len(new_peer_list) - self._db_size),))
103 self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
104 VALUES (?,?)""", new_peer_list)
105 self._db.execute("""DELETE FROM peers WHERE prefix IN
106 (SELECT prefix FROM blacklist.flag)""")
107 logging.info('DB populated')
108 logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))
109
110 def getUnusedPeers(self, peer_count):
111 for populate in self.refresh, self._bootstrap, bool:
112 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
113 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
114 (peer_count,)).fetchall()
115 if peer_list:
116 return peer_list
117 populate()
118 logging.warning('Cannot find any new peers')
119 return []
120
121 def _bootstrap(self):
122 logging.info('Getting Boot peer...')
123 proxy = xmlrpclib.ServerProxy(self._registry)
124 try:
125 bootpeer = proxy.getBootstrapPeer(self._prefix).data
126 logging.debug('Boot peer received from server')
127 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
128 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
129 bootpeer = p.communicate(bootpeer)[0].split()
130 if bootpeer[0] != self._prefix:
131 self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
132 logging.debug('Boot peer added')
133 return True
134 except socket.error:
135 pass
136 except sqlite3.IntegrityError, e:
137 if e.args[0] != 'column prefix is not unique':
138 raise
139 return False
140
141 def usePeer(self, prefix):
142 logging.trace('Updating peers database : using peer %s' % prefix)
143 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
144 (prefix,))
145 logging.debug('DB updated')
146
147 def unusePeer(self, prefix):
148 logging.trace('Updating peers database : unusing peer %s' % prefix)
149 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
150 (prefix,))
151 logging.debug('DB updated')
152
153 def flagPeer(self, prefix):
154 logging.trace('Updating peers database : flagging peer %s' % prefix)
155 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
156 (prefix,))
157 logging.debug('DB updated')
158
159 def handle_message(self, msg):
160 script_type, arg = msg.split()
161 if script_type == 'client-connect':
162 logging.info('Incomming connection from %s' % (arg,))
163 elif script_type == 'client-disconnect':
164 logging.info('%s has disconnected' % (arg,))
165 elif script_type == 'route-up':
166 if not self._manual:
167 external_ip = arg
168 new_address = list([external_ip, port, proto]
169 for port, proto, _ in self._pp)
170 if self._address != new_address:
171 self._address = new_address
172 logging.info('Received new external ip : %s'
173 % (external_ip,))
174 self._declare()
175 else:
176 logging.debug('Unknow message recieved from the openvpn pipe : %s'
177 % msg)
178