solving a bug
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 import utils
3
4 # used = 0 : fresh node
5 # used = 1 : previously used peer
6 # used = 2 : curently in use
7
8
9 class PeerManager:
10
11 # internal ip = temp arg/attribute
12 def __init__(self, db_path, registry, key_path, refresh_time, address,
13 internal_ip, prefix, manual, pp, db_size):
14 self._refresh_time = refresh_time
15 self.address = address
16 self._internal_ip = internal_ip
17 self._prefix = prefix
18 self.db_size = db_size
19 self._registry = registry
20 self._key_path = key_path
21 self._pp = pp
22 self._manual = manual
23 self.tunnel_manager = None
24
25 self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
26 self.sock.bind(('::', 326))
27 self.socket_file = self.sock.makefile()
28
29 logging.info('Connecting to peers database...')
30 self._db = sqlite3.connect(db_path, isolation_level=None)
31 logging.debug('Database opened')
32
33 logging.info('Preparing peers database...')
34 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
35 prefix TEXT PRIMARY KEY,
36 address TEXT NOT NULL,
37 used INTEGER NOT NULL DEFAULT 0,
38 date INTEGER DEFAULT (strftime('%s', 'now')))""")
39 self._db.execute("UPDATE peers SET used = 1 WHERE used = 2")
40 self._db.execute("""CREATE INDEX IF NOT EXISTS
41 _peers_used ON peers(used)""")
42 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
43 name text primary key,
44 value text)""")
45 self._db.execute('ATTACH DATABASE ":memory:" AS blacklist')
46 self._db.execute("""CREATE TABLE blacklist.flag (
47 prefix TEXT PRIMARY KEY,
48 flag INTEGER NOT NULL)""")
49 self._db.execute("""CREATE INDEX blacklist.blacklist_flag
50 ON flag(flag)""")
51 self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
52 try:
53 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
54 except StopIteration:
55 proxy = xmlrpclib.ServerProxy(registry)
56 a = proxy.getPrivateAddress()
57 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
58 self._proxy = xmlrpclib.ServerProxy(a)
59 logging.debug('Database prepared')
60
61 self.next_refresh = time.time()
62
63 def clear_blacklist(self, flag):
64 logging.info('Clearing blacklist from flag %u' % flag)
65 self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?",
66 (flag,))
67 logging.info('Blacklist cleared')
68
69 def blacklist(self, prefix, flag):
70 logging.info('Blacklisting %s' % prefix)
71 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
72 self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)",
73 (prefix, flag))
74 logging.debug('%s blacklisted' % prefix)
75
76 def whitelist(self, prefix):
77 logging.info('Unblacklisting %s' % prefix)
78 self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,))
79 logging.debug('%s whitelisted' % prefix)
80
81 def refresh(self):
82 logging.info('Refreshing the peers DB...')
83 try:
84 self.next_refresh = time.time() + 30
85 self._declare()
86 except socket.error, e:
87 logging.info('Connection to server failed, re-bootstraping and retrying in 30s')
88 try:
89 self._bootstrap()
90 except socket.error, e:
91 logging.debug('socket.error : %s' % e)
92
93 def _declare(self):
94 if self.address != None:
95 logging.info('Sending connection info to server...')
96 self._proxy.declare(utils.address_str(self.address))
97 self.next_refresh = time.time() + self._refresh_time
98 logging.debug('Info sent')
99 else:
100 logging.warning("Warning : couldn't send ip, unknown external config. retrying in 30s")
101
102 def getUnusedPeers(self, peer_count):
103 for populate in self._bootstrap, bool:
104 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
105 <> 2 ORDER BY used ASC, RANDOM() LIMIT ?""",
106 (peer_count,)).fetchall()
107 if peer_list:
108 return peer_list
109 populate()
110 logging.warning('Cannot find any new peers')
111 return []
112
113 def _bootstrap(self):
114 logging.info('Getting Boot peer...')
115 proxy = xmlrpclib.ServerProxy(self._registry)
116 try:
117 bootpeer = proxy.getBootstrapPeer(self._prefix).data
118 logging.debug('Boot peer received from server')
119 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
120 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
121 bootpeer = p.communicate(bootpeer)[0].split()
122 return self._addPeer(bootpeer)
123 except socket.error:
124 pass
125 except sqlite3.IntegrityError, e:
126 if e.args[0] != 'column prefix is not unique':
127 raise
128 except Exception, e:
129 logging.info('Unable to bootstrap : %s' % e)
130 return False
131
132 def usePeer(self, prefix):
133 logging.trace('Updating peers database : using peer %s' % prefix)
134 self._db.execute("UPDATE peers SET used = 2 WHERE prefix = ?",
135 (prefix,))
136 logging.debug('DB updated')
137
138 def unusePeer(self, prefix):
139 logging.trace('Updating peers database : unusing peer %s' % prefix)
140 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
141 (prefix,))
142 logging.debug('DB updated')
143
144 def handle_message(self, msg):
145 script_type, arg = msg.split()
146 if script_type == 'client-connect':
147 logging.info('Incoming connection from %s' % (arg,))
148 prefix = utils.binFromSubnet(arg)
149 if self.tunnel_manager.checkIncomingTunnel(prefix):
150 self.blacklist(prefix, 2)
151 elif script_type == 'client-disconnect':
152 self.whitelist(utils.binFromSubnet(arg))
153 logging.info('%s has disconnected' % (arg,))
154 elif script_type == 'route-up':
155 if not self._manual:
156 external_ip = arg
157 new_address = list([external_ip, port, proto]
158 for port, proto, _ in self._pp)
159 if self.address != new_address:
160 self.address = new_address
161 logging.info('Received new external ip : %s'
162 % (external_ip,))
163 try:
164 self._declare()
165 except socket.error, e:
166 logging.debug('socket.error : %s' % e)
167 logging.info("""Connection to server failed while declaring external infos""")
168 else:
169 logging.debug('Unknow message recieved from the openvpn pipe : %s'
170 % msg)
171
172 def readSocket(self):
173 msg = self.socket_file.readline()
174 peer = msg.replace('\n', '').split(' ')
175 if len(peer) != 2:
176 logging.debug('Invalid package recieved : %s' % msg)
177 return
178 self._addPeer(peer)
179
180 def _addPeer(self, peer):
181 logging.debug('Adding peer %s' % peer)
182 if int(self._db.execute("""SELECT COUNT(*) FROM blacklist.flag WHERE prefix = ?""", (peer[0],)).next()[0]) > 0:
183 logging.info('Peer is blacklisted')
184 return False
185 self._db.execute("""DELETE FROM peers WHERE used <> 2 ORDER BY used DESC, date DESC
186 LIMIT MAX(0, (SELECT COUNT(*) FROM peers
187 WHERE used <> 2) - ?)""", (str(self.db_size),))
188 self._db.execute("UPDATE peers SET address = ?, used = 0, date = strftime('%s','now') WHERE used = 1 and prefix = ?", (peer[1], peer[0],))
189 self._db.execute("INSERT OR IGNORE INTO peers (prefix, address) VALUES (?,?)", peer)
190 logging.debug('Peer added')
191 return True