Fix bug preventing the registry to send its own address for bootstrap
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, xmlrpclib, time
2 from urllib import splittype, splithost, splitport
3 import utils
4
5
6 class PeerDB(object):
7
8 # internal ip = temp arg/attribute
9 def __init__(self, db_path, registry, key_path, prefix, db_size=200):
10 self._prefix = prefix
11 self._db_size = db_size
12 self._key_path = key_path
13 self._proxy = xmlrpclib.ServerProxy(registry)
14
15 logging.info('Initialize cache ...')
16 self._db = sqlite3.connect(db_path, isolation_level=None)
17 q = self._db.execute
18 q("PRAGMA synchronous = OFF")
19 q("PRAGMA journal_mode = MEMORY")
20 q("""CREATE TABLE IF NOT EXISTS peer (
21 prefix TEXT PRIMARY KEY,
22 address TEXT NOT NULL)""")
23 q("""CREATE TABLE IF NOT EXISTS config (
24 name text primary key,
25 value text)""")
26 q('ATTACH DATABASE ":memory:" AS volatile')
27 q("""CREATE TABLE volatile.stat (
28 peer TEXT PRIMARY KEY REFERENCES peer(prefix) ON DELETE CASCADE,
29 try INTEGER NOT NULL DEFAULT 0)""")
30 q("CREATE INDEX volatile.stat_try ON stat(try)")
31 q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer")
32 try:
33 a = q("SELECT value FROM config WHERE name='registry'").next()[0]
34 except StopIteration:
35 logging.info("Private IP of registry not in cache."
36 " Asking registry via its public IP ...")
37 retry = 1
38 while True:
39 try:
40 a = self._proxy.getPrivateAddress()
41 break
42 except socket.error, e:
43 logging.warning(e)
44 time.sleep(retry)
45 retry = min(60, retry * 2)
46 q("INSERT INTO config VALUES ('registry',?)", (a,))
47 self.registry_ip = utils.binFromIp(a)
48 logging.info("Cache initialized. Registry IP is %s", a)
49
50 def log(self):
51 if logging.getLogger().isEnabledFor(5):
52 logging.trace("Cache:")
53 for prefix, address, _try in self._db.execute(
54 "SELECT peer.*, try FROM peer, volatile.stat"
55 " WHERE prefix=peer ORDER BY prefix"):
56 logging.trace("- %s: %s%s", prefix, address,
57 ' (blacklisted)' if _try else '')
58
59 def connecting(self, prefix, connecting):
60 self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?",
61 (connecting, prefix))
62
63 def resetConnecting(self):
64 self._db.execute("UPDATE volatile.stat SET try=0")
65
66 def getAddress(self, prefix):
67 r = self._db.execute("SELECT address FROM peer, volatile.stat"
68 " WHERE prefix=? AND prefix=peer AND try=0",
69 (prefix,)).fetchone()
70 return r and r[0]
71
72 # Exclude our own address from results in case it is there, which may
73 # happen if a node change its certificate without clearing the cache.
74 # IOW, one should probably always put our own address there.
75 _get_peer_sql = "SELECT %s FROM peer, volatile.stat" \
76 " WHERE prefix=peer AND prefix!=? AND try=?"
77 def getPeerList(self, failed=0, __sql=_get_peer_sql % "prefix, address"
78 + " ORDER BY RANDOM()"):
79 return self._db.execute(__sql, (self._prefix, failed))
80 def getPeerCount(self, failed=0, __sql=_get_peer_sql % "COUNT(*)"):
81 return self._db.execute(__sql, (self._prefix, failed)).next()[0]
82
83 def getBootstrapPeer(self):
84 logging.info('Getting Boot peer...')
85 try:
86 bootpeer = self._proxy.getBootstrapPeer(self._prefix).data
87 except (socket.error, xmlrpclib.Fault), e:
88 logging.warning('Failed to bootstrap (%s)', e)
89 else:
90 p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
91 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
92 bootpeer = p.communicate(bootpeer)[0].split()
93 if bootpeer[0] != self._prefix:
94 self.addPeer(*bootpeer)
95 return bootpeer
96 logging.warning('Buggy registry sent us our own address')
97
98 def addPeer(self, prefix, address):
99 logging.debug('Adding peer %s: %s', prefix, address)
100 with self._db:
101 q = self._db.execute
102 try:
103 (a,), = q("SELECT address FROM peer WHERE prefix=?", (prefix,))
104 except ValueError:
105 q("DELETE FROM peer WHERE prefix IN (SELECT peer"
106 " FROM volatile.stat ORDER BY try, RANDOM() LIMIT ?,-1)",
107 (self._db_size,))
108 a = None
109 if a != address:
110 q("INSERT OR REPLACE INTO peer VALUES (?,?)", (prefix, address))
111 q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,))