Merge branch 'master' of https://git.erp5.org/repos/vifibnet
[re6stnet.git] / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time, os
2 import utils
3
4 class PeerManager:
5
6 # internal ip = temp arg/attribute
7 def __init__(self, db_path, registry, key_path, refresh_time, address,
8 internal_ip, prefix, manual, pp , db_size):
9 self._refresh_time = refresh_time
10 self._address = address
11 self._internal_ip = internal_ip
12 self._prefix = prefix
13 self._db_size = db_size
14 self._registry = registry
15 self._key_path = key_path
16 self._pp = pp
17 self._manual = manual
18
19 logging.info('Connecting to peers database...')
20 self._db = sqlite3.connect(db_path, isolation_level=None)
21 logging.debug('Database opened')
22
23 logging.info('Preparing peers database...')
24 self._db.execute("""CREATE TABLE IF NOT EXISTS peers (
25 prefix TEXT PRIMARY KEY,
26 address TEXT NOT NULL,
27 used INTEGER NOT NULL DEFAULT 0,
28 date INTEGER DEFAULT (strftime('%s', 'now')))""")
29 self._db.execute("UPDATE peers SET used = 0")
30 self._db.execute("CREATE INDEX IF NOT EXISTS _peers_used ON peers(used)")
31 self._db.execute("""CREATE TABLE IF NOT EXISTS blacklist (
32 prefix TEXT PRIMARY KEY,
33 flag INTEGER NOT NULL)""")
34 self._db.execute("""CREATE INDEX IF NOT EXISTS
35 blacklist_flag ON blacklist(flag)""")
36 self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
37 (prefix, 1))
38 self._db.execute("""CREATE TABLE IF NOT EXISTS config (
39 name text primary key,
40 value text)""")
41 try:
42 a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next()
43 except StopIteration:
44 proxy = xmlrpclib.ServerProxy(registry)
45 a = proxy.getPrivateAddress()
46 self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,))
47 self._proxy = xmlrpclib.ServerProxy(a)
48 logging.debug('Database prepared')
49
50 self.next_refresh = time.time()
51
52 def clear_blacklist(self, flag):
53 logging.info('Clearing blacklist from flag %u' % flag)
54 self._db.execute("DELETE FROM blacklist WHERE flag = ?",
55 (flag,))
56 logging.info('Blacklist cleared')
57
58 def blacklist(self, prefix, flag):
59 logging.ninfo('Blacklisting %s' % prefix)
60 self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,))
61 self._db.execute("INSERT OR REPLACE INTO blacklist VALUES (?,?)",
62 (prefix, flag))
63 logging.debug('%s blacklisted' % prefix)
64
65 def whitelist(self, prefix):
66 logging.info('Unblacklisting %s' % prefix)
67 self._db.execute("DELETE FROM blacklist WHERE prefix = ?", (prefix,))
68 logging.debug('%s whitelisted' % prefix)
69
70 def refresh(self):
71 logging.info('Refreshing the peers DB...')
72 try:
73 self._declare()
74 self._populate()
75 logging.info('DB refreshed')
76 self.next_refresh = time.time() + self._refresh_time
77 return True
78 except socket.error, e:
79 logging.debug('socket.error : %s' % e)
80 logging.info('Connection to server failed, retrying in 30s')
81 self.next_refresh = time.time() + 30
82 return False
83
84 def _declare(self):
85 if self._address != None:
86 logging.info('Sending connection info to server...')
87 self._proxy.declare((self._internal_ip,
88 utils.address_str(self._address)))
89 logging.debug('Info sent')
90 else:
91 logging.warning("Warning : couldn't send ip, unknown external config")
92
93 def _populate(self):
94 logging.info('Populating the peers DB...')
95 new_peer_list = self._proxy.getPeerList(self._db_size,
96 self._internal_ip)
97 with self._db:
98 self._db.execute("""DELETE FROM peers WHERE used <= 0 ORDER BY used,
99 RANDOM() LIMIT MAX(0, ? + (SELECT COUNT(*)
100 FROM peers WHERE used <= 0))""",
101 (str(len(new_peer_list) - self._db_size),))
102 self._db.executemany("""INSERT OR IGNORE INTO peers (prefix, address)
103 VALUES (?,?)""", new_peer_list)
104 self._db.execute("""DELETE FROM peers WHERE prefix IN
105 (SELECT prefix FROM blacklist)""")
106 logging.info('DB populated')
107 logging.trace('New peers : %s' % (', '.join(map(str, new_peer_list)),))
108
109 def getUnusedPeers(self, peer_count):
110 for populate in self.refresh, self._bootstrap, bool:
111 peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
112 <= 0 ORDER BY used DESC,RANDOM() LIMIT ?""",
113 (peer_count,)).fetchall()
114 if peer_list or populate():
115 return peer_list
116
117 def _bootstrap(self):
118 logging.info('Getting Boot peer...')
119 proxy = xmlrpclib.ServerProxy(self._registry)
120 try:
121 bootpeer = proxy.getBootstrapPeer(self._prefix).data
122 logging.debug('Boot peer received from server')
123 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
124 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
125 bootpeer = p.communicate(bootpeer)[0].split()
126 self._db.execute("INSERT INTO peers (prefix, address) VALUES (?,?)", bootpeer)
127 logging.debug('Boot peer added')
128 return True
129 except socket.error:
130 pass
131 except sqlite3.IntegrityError, e:
132 import pdb; pdb.set_trace()
133 if e.args[0] != '':
134 raise
135 return False
136
137 def usePeer(self, prefix):
138 logging.trace('Updating peers database : using peer %s' % prefix)
139 self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?",
140 (prefix,))
141 logging.debug('DB updated')
142
143 def unusePeer(self, prefix):
144 logging.trace('Updating peers database : unusing peer %s' % prefix)
145 self._db.execute("UPDATE peers SET used = 0 WHERE prefix = ?",
146 (prefix,))
147 logging.debug('DB updated')
148
149 def flagPeer(self, prefix):
150 logging.trace('Updating peers database : flagging peer %s' % prefix)
151 self._db.execute("UPDATE peers SET used = -1 WHERE prefix = ?",
152 (prefix,))
153 logging.debug('DB updated')
154
155 def handle_message(self, msg):
156 script_type, arg = msg.split()
157 if script_type == 'client-connect':
158 logging.info('Incomming connection from %s' % (arg,))
159 elif script_type == 'client-disconnect':
160 logging.info('%s has disconnected' % (arg,))
161 elif script_type == 'route-up':
162 if not self._manual:
163 external_ip = arg
164 new_address = list([external_ip, port, proto]
165 for port, proto, _ in self._pp)
166 if self._address != new_address:
167 self._address = new_address
168 logging.info('Received new external ip : %s'
169 % (external_ip,))
170 self._declare()
171 else:
172 logging.debug('Unknow message recieved from the openvpn pipe : %s'
173 % msg)
174