Added some comments
[re6stnet.git] / db.py
1 import sqlite3, socket, xmlrpclib, time, os
2 import utils
3
4
class PeerManager:
    """Keep a local SQLite table of peers in sync with a registry server.

    The manager opens ``peers.db``, talks to the registry over XML-RPC to
    declare this node and to download peers, and tracks a per-peer ``used``
    value: 1 for a peer in use, 0 for an unused peer and -1 for a flagged
    peer.  It also maintains a ``blacklist`` table of prefixes that must not
    appear in ``peers``.

    ``next_refresh`` is the timestamp (as returned by ``time.time()``) after
    which ``refresh()`` is expected to be called again.
    """

    # internal ip = temp arg/attribute
    def __init__(self, db_dir_path, server, server_port, refresh_time, address,
                 internal_ip, prefix, manual, pp, db_size):
        """Open the peers database and prepare it for use.

        db_dir_path  -- directory containing ``peers.db``
        server       -- host of the XML-RPC registry
        server_port  -- port of the XML-RPC registry
        refresh_time -- delay in seconds between two successful refreshes
        address      -- external address description handed to
                        ``utils.address_list`` when declaring, or None when
                        it is not known yet
        internal_ip  -- identifier sent to the registry with every request
        prefix       -- prefix of this node
        manual       -- when true, 'route-up' messages never replace
                        ``address``
        pp           -- iterable of (port, proto) pairs used to rebuild
                        ``address`` from a newly received external ip
        db_size      -- number of peers requested from the registry; also
                        the size the unused part of the table is pruned to

        Raises RuntimeError when the database has no ``peers`` table.
        """
        self._refresh_time = refresh_time
        self._address = address
        self._internal_ip = internal_ip
        self._prefix = prefix
        self._server = server
        self._server_port = server_port
        self._db_size = db_size
        self._pp = pp
        self._blacklist = [(prefix,)]
        self._manual = manual

        self._proxy = xmlrpclib.ServerProxy('http://%s:%u'
                                            % (server, server_port))

        utils.log('Connectiong to peers database...', 4)
        # isolation_level=None puts the connection in autocommit mode, so
        # every statement below is committed as soon as it is executed.
        self._db = sqlite3.connect(os.path.join(db_dir_path, 'peers.db'),
                                   isolation_level=None)
        utils.log('Database opened', 5)
        utils.log('Preparing peers database...', 4)
        try:
            # No peer is in use when the manager starts.
            self._db.execute("UPDATE peers SET used = 0")
            self._db.execute("""CREATE TABLE IF NOT EXISTS blacklist (
                prefix TEXT PRIMARY KEY,
                flag INTEGER NOT NULL)""")
            self._db.execute("""CREATE INDEX IF NOT EXISTS
                blacklist_flag ON blacklist(flag)""")
        except sqlite3.OperationalError as e:
            if str(e) == 'no such table: peers':
                raise RuntimeError('no such table: peers')
            # Any other operational error used to be dropped silently; keep
            # going as before, but leave a trace of what went wrong.
            utils.log(str(e), 1)
        utils.log('Database prepared', 5)

        self.next_refresh = time.time()

    def clear_blacklist(self, flag):
        """Remove every blacklist entry recorded with the given flag."""
        utils.log('Clearing blacklist from flag %u' % (flag,), 3)
        self._db.execute("DELETE FROM blacklist WHERE flag = ?", (flag,))
        utils.log('Blacklist cleared', 5)

    def blacklist(self, prefix, flag):
        """Drop ``prefix`` from the peers and record it in the blacklist.

        An existing blacklist entry for the same prefix is replaced, so its
        flag is overwritten by ``flag``.
        """
        utils.log('Blacklisting %s' % (prefix,), 4)
        self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
        self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
                         (prefix, flag))
        utils.log('%s blacklisted' % (prefix,), 5)

    def whitelist(self, prefix):
        """Remove ``prefix`` from the blacklist."""
        utils.log('Unblacklisting %s' % (prefix,), 4)
        self._db.execute("DELETE FROM blacklist WHERE prefix = ?", (prefix,))
        utils.log('%s whitelisted' % (prefix,), 5)

    def refresh(self):
        """Declare this node to the registry and download a new peer list.

        On success ``next_refresh`` is pushed ``refresh_time`` seconds into
        the future.  On a socket error the failure is logged and
        ``next_refresh`` is set 30 seconds ahead instead.
        """
        utils.log('Refreshing the peers DB...', 2)
        try:
            self._declare()
            self._populate()
            utils.log('DB refreshed', 3)
            self.next_refresh = time.time() + self._refresh_time
        except socket.error as e:
            utils.log(str(e), 4)
            utils.log('Connection to server failed, retrying in 30s', 2)
            self.next_refresh = time.time() + 30

    def _declare(self):
        """Send our connection info to the registry, if an address is known."""
        if self._address is not None:
            utils.log('Sending connection info to server...', 3)
            self._proxy.declare((self._internal_ip,
                                 utils.address_list(self._address)))
            utils.log('Info sent', 5)
        else:
            utils.log("Warning : couldn't send ip, unknown external config", 4)

    def _populate(self):
        """Fetch peers from the registry and merge them into the table."""
        utils.log('Populating the peers DB...', 2)
        new_peer_list = self._proxy.getPeerList(self._db_size,
                                                self._internal_ip)
        # Make room for the new peers: delete just enough unused rows
        # (used <= 0) for "unused + new" to fit in db_size, flagged rows
        # (lowest 'used') first and then at random.  MAX(0, ...) keeps the
        # limit from going negative when nothing needs to be removed.
        # NOTE: DELETE ... ORDER BY ... LIMIT is only accepted by SQLite
        # builds compiled with SQLITE_ENABLE_UPDATE_DELETE_LIMIT.
        self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
                            RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
                            FROM peers WHERE used <= 0))""",
                         (len(new_peer_list) - self._db_size,))
        # Peers that are already known keep their current row.
        self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
                                VALUES (?,?)""", new_peer_list)
        # The registry may have returned blacklisted peers: remove them.
        self._db.execute("""DELETE FROM peers WHERE prefix IN
                            (SELECT prefix FROM blacklist)""")
        utils.log('DB populated', 3)
        utils.log('New peers : %s' % ', '.join(map(str, new_peer_list)), 5)

    def getUnusedPeers(self, peer_count):
        """Return a cursor over at most ``peer_count`` peers not in use.

        Rows are (prefix, address).  Unused peers (used = 0) come before
        flagged ones (used = -1); the order is random within each group.
        """
        return self._db.execute("""SELECT prefix, address FROM peers WHERE used
                                   <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
                                (peer_count,))

    def usePeer(self, prefix):
        """Mark the peer ``prefix`` as in use (used = 1)."""
        utils.log('Updating peers database : using peer ' + str(prefix), 5)
        self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
                         (prefix,))
        utils.log('DB updated', 5)

    def unusePeer(self, prefix):
        """Mark the peer ``prefix`` as unused (used = 0)."""
        utils.log('Updating peers database : unusing peer ' + str(prefix), 5)
        self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
                         (prefix,))
        utils.log('DB updated', 5)

    def flagPeer(self, prefix):
        """Flag the peer ``prefix`` (used = -1).

        Flagged peers are the first ones pruned by ``_populate`` and the
        last ones returned by ``getUnusedPeers``.
        """
        utils.log('Updating peers database : flagging peer ' + str(prefix), 5)
        self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
                         (prefix,))
        utils.log('DB updated', 5)

    def handle_message(self, msg):
        """Process one ``"<script_type> <arg>"`` message from the openvpn pipe.

        'client-connect' and 'client-disconnect' are only logged.  'route-up'
        carries our external ip: unless the address is managed manually, the
        address list is rebuilt from it and declared again when it changed.
        Anything else, including a message that is not made of exactly two
        whitespace-separated fields, is logged as unknown.
        """
        fields = msg.split()
        if len(fields) == 2:
            script_type, arg = fields
        else:
            # Malformed message: let it fall through to the 'unknown' branch
            # rather than fail on tuple unpacking.
            script_type = arg = None

        if script_type == 'client-connect':
            utils.log('Incomming connection from %s' % (arg,), 3)
        elif script_type == 'client-disconnect':
            utils.log('%s has disconnected' % (arg,), 3)
        elif script_type == 'route-up':
            if not self._manual:
                external_ip = arg
                new_address = [[external_ip, port, proto]
                               for port, proto in self._pp]
                if self._address != new_address:
                    self._address = new_address
                    utils.log('Received new external ip : %s'
                              % (external_ip,), 3)
                    self._declare()
        else:
            utils.log('Unknow message recieved from the openvpn pipe : '
                      + msg, 1)
138