Blacklist tunnel connected to us
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 import utils
3
4
5 class PeerManager:
6
7 # internal ip = temp arg/attribute
8 def __init__(self, db_path, registry, key_path, refresh_time, address,
9 internal_ip, prefix, manual, pp, db_size):
10 self._refresh_time = refresh_time
11 self._address = address
12 self._internal_ip = internal_ip
13 self._prefix = prefix
14 self._db_size = db_size
15 self._registry = registry
16 self._key_path = key_path
17 self._pp = pp
18 self._manual = manual
19
20 logging.info('Connecting to peers database...')
21 self._db = sqlite3.connect(db_path, isolation_level=None)
22 logging.debug('Database opened')
23
24 logging.info('Preparing peers database...')
25 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
26 prefix TEXT PRIMARY KEY,
27 address TEXT NOT NULL,
28 used INTEGER NOT NULL DEFAULT 0,
29 date INTEGER DEFAULT (strftime('%s', 'now')))""")
30 self._db.execute("UPDATE peers SET used = 0")
31 self._db.execute("""CREATE INDEX IF NOT EXISTS
32 _peers_used ON peers(used)""")
33 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
34 name text primary key,
35 value text)""")
36 self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
37 self._db.execute("""CREATE TABLE blacklist.flag (
38 prefix TEXT PRIMARY KEY,
39 flag INTEGER NOT NULL)""")
40 self._db.execute("""CREATE INDEX blacklist.blacklist_flag
41 ON flag(flag)""")
42 self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
43 try:
44 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
45 except StopIteration:
46 proxy = xmlrpclib.ServerProxy(registry)
47 a = proxy.getPrivateAddress()
48 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
49 self._proxy = xmlrpclib.ServerProxy(a)
50 logging.debug('Database prepared')
51
52 self.next_refresh = time.time()
53
54 def clear_blacklist(self, flag):
55 logging.info('Clearing blacklist from flag %u' % flag)
56 self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
57 (flag,))
58 logging.info('Blacklist cleared')
59
60 def blacklist(self, prefix, flag):
61 logging.ninfo('Blacklisting %s' % prefix)
62 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
63 self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
64 (prefix, flag))
65 logging.debug('%s blacklisted' % prefix)
66
67 def whitelist(self, prefix):
68 logging.info('Unblacklisting %s' % prefix)
69 self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
70 logging.debug('%s whitelisted' % prefix)
71
72 def refresh(self):
73 logging.info('Refreshing the peers DB...')
74 try:
75 self._declare()
76 self._populate()
77 logging.info('DB refreshed')
78 self.next_refresh = time.time() + self._refresh_time
79 return True
80 except socket.error, e:
81 logging.debug('socket.error : %s' % e)
82 logging.info('Connection to server failed, retrying in 30s')
83 self.next_refresh = time.time() + 30
84 return False
85
86 def _declare(self):
87 if self._address != None:
88 logging.info('Sending connection info to server...')
89 self._proxy.declare((self._internal_ip,
90 utils.address_str(self._address)))
91 logging.debug('Info sent')
92 else:
93 logging.warning("Warning : couldn't send ip, unknown external config")
94
95 def _populate(self):
96 logging.info('Populating the peers DB...')
97 new_peer_list = self._proxy.getPeerList(self._db_size,
98 self._internal_ip)
99 with self._db:
100 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
101 RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
102 FROM peers WHERE used <= 0))""",
103 (str(len(new_peer_list) - self._db_size),))
104 self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
105 VALUES (?,?)""", new_peer_list)
106 self._db.execute("""DELETE FROM peers WHERE prefix IN
107 (SELECT prefix FROM blacklist.flag)""")
108 logging.info('DB populated')
109 logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))
110
111 def getUnusedPeers(self, peer_count):
112 for populate in self.refresh, self._bootstrap, bool:
113 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
114 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
115 (peer_count,)).fetchall()
116 if peer_list:
117 return peer_list
118 populate()
119 logging.warning('Cannot find any new peers')
120 return []
121
122 def _bootstrap(self):
123 logging.info('Getting Boot peer...')
124 proxy = xmlrpclib.ServerProxy(self._registry)
125 try:
126 bootpeer = proxy.getBootstrapPeer(self._prefix).data
127 logging.debug('Boot peer received from server')
128 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
129 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
130 bootpeer = p.communicate(bootpeer)[0].split()
131 if bootpeer[0] != self._prefix:
132 self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
133 logging.debug('Boot peer added')
134 return True
135 except socket.error:
136 pass
137 except sqlite3.IntegrityError, e:
138 if e.args[0] != 'column prefix is not unique':
139 raise
140 return False
141
142 def usePeer(self, prefix):
143 logging.trace('Updating peers database : using peer %s' % prefix)
144 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
145 (prefix,))
146 logging.debug('DB updated')
147
148 def unusePeer(self, prefix):
149 logging.trace('Updating peers database : unusing peer %s' % prefix)
150 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
151 (prefix,))
152 logging.debug('DB updated')
153
154 def flagPeer(self, prefix):
155 logging.trace('Updating peers database : flagging peer %s' % prefix)
156 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
157 (prefix,))
158 logging.debug('DB updated')
159
160 def handle_message(self, msg):
161 script_type, arg = msg.split()
162 if script_type == 'client-connect':
163 self.blacklist(arg, 2)
164 logging.info('Incomming connection from %s' % (arg,))
165 elif script_type == 'client-disconnect':
166 self.whitelist(arg)
167 logging.info('%s has disconnected' % (arg,))
168 elif script_type == 'route-up':
169 if not self._manual:
170 external_ip = arg
171 new_address = list([external_ip, port, proto]
172 for port, proto, _ in self._pp)
173 if self._address != new_address:
174 self._address = new_address
175 logging.info('Received new external ip : %s'
176 % (external_ip,))
177 self._declare()
178 else:
179 logging.debug('Unknow message recieved from the openvpn pipe : %s'
180 % msg)