INCOMPATIBLE: change registry protocol
[re6stnet.git] / re6st / db.py
1 import logging, sqlite3, socket, subprocess, time
2 from . import utils
3
4
class PeerDB(object):
    """SQLite-backed cache of known peers (prefix -> address string).

    Two stores are used on a single connection:

    * the on-disk database at ``db_path`` holds the ``peer`` table
      (prefix, address) and a ``config`` key/value table;
    * an attached in-memory database ``volatile`` holds the ``stat`` table
      (peer, try), which is rebuilt from ``peer`` on every start.

    A non-zero ``try`` value hides a peer from `getAddress` and from the
    default `getPeerList` / `getPeerCount` queries.
    """

    # internal ip = temp arg/attribute
    def __init__(self, db_path, registry, key_path, prefix, db_size=200):
        """Open (or create) the cache and determine the registry address.

        db_path  -- path given to ``sqlite3.connect``.
        registry -- object providing ``getPrivateAddress(prefix)`` and
                    ``getBootstrapPeer(prefix)``.
        key_path -- passed to ``utils.decrypt`` by `getBootstrapPeer`.
        prefix   -- our own prefix; excluded from peer list queries.
        db_size  -- offset used by `addPeer` when pruning: rows ranked past
                    this position are deleted before a new peer is added.

        If the registry address is not stored in ``config``, this blocks and
        retries ``registry.getPrivateAddress`` on ``socket.error`` with an
        exponential back-off capped at 60 seconds, then stores the answer.
        """
        self._prefix = prefix
        self._db_size = db_size
        self._key_path = key_path
        self._registry = registry

        logging.info('Initialize cache ...')
        # isolation_level=None: the sqlite3 module does not open
        # transactions implicitly.
        self._db = sqlite3.connect(db_path, isolation_level=None)
        q = self._db.execute
        q("PRAGMA synchronous = OFF")
        q("PRAGMA journal_mode = MEMORY")
        q("""CREATE TABLE IF NOT EXISTS peer (
            prefix TEXT PRIMARY KEY,
            address TEXT NOT NULL)""")
        q("""CREATE TABLE IF NOT EXISTS config (
            name text primary key,
            value text)""")
        q('ATTACH DATABASE ":memory:" AS volatile')
        q("""CREATE TABLE volatile.stat (
            peer TEXT PRIMARY KEY,
            try INTEGER NOT NULL DEFAULT 0)""")
        q("CREATE INDEX volatile.stat_try ON stat(try)")
        # Every cached peer starts with try=0.
        q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer")
        row = q("SELECT value FROM config WHERE name='registry'").fetchone()
        if row is None:
            logging.info("Private IP of registry not in cache."
                         " Asking registry via its public IP ...")
            retry = 1
            while True:
                try:
                    a = self._registry.getPrivateAddress(self._prefix)
                    break
                except socket.error as e:
                    logging.warning(e)
                    time.sleep(retry)
                    retry = min(60, retry * 2)
            q("INSERT INTO config VALUES ('registry',?)", (a,))
        else:
            a = row[0]
        self.registry_ip = utils.binFromIp(a)
        logging.info("Cache initialized. Registry IP is %s", a)

    def log(self):
        """Dump the cache contents through ``logging.trace``.

        Nothing is done unless the root logger is enabled for level 5.
        NOTE(review): ``logging.trace`` is not part of the standard library;
        presumably it is installed elsewhere in the project -- confirm.
        """
        if logging.getLogger().isEnabledFor(5):
            logging.trace("Cache:")
            for prefix, address, _try in self._db.execute(
                    "SELECT peer.*, try FROM peer, volatile.stat"
                    " WHERE prefix=peer ORDER BY prefix"):
                logging.trace("- %s: %s%s", prefix, address,
                              ' (blacklisted)' if _try else '')

    def connecting(self, prefix, connecting):
        """Store `connecting` as the ``try`` value of peer `prefix`."""
        self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?",
                         (connecting, prefix))

    def resetConnecting(self):
        """Reset the ``try`` value of every peer to 0."""
        self._db.execute("UPDATE volatile.stat SET try=0")

    def getAddress(self, prefix):
        """Return the cached address of `prefix`.

        Returns None when the peer is unknown or its ``try`` value is
        not 0.
        """
        r = self._db.execute("SELECT address FROM peer, volatile.stat"
                             " WHERE prefix=? AND prefix=peer AND try=0",
                             (prefix,)).fetchone()
        return r and r[0]

    # Exclude our own address from results in case it is there, which may
    # happen if a node changes its certificate without clearing the cache.
    # IOW, one should probably always put our own address there.
    _get_peer_sql = "SELECT %s FROM peer, volatile.stat" \
                    " WHERE prefix=peer AND prefix!=? AND try=?"

    def getPeerList(self, failed=0, __sql=_get_peer_sql % "prefix, address"
                                          + " ORDER BY RANDOM()"):
        """Return a cursor over (prefix, address) rows in random order.

        Only peers whose ``try`` value equals `failed` are returned, and our
        own prefix is excluded. `__sql` is a precomputed query bound at
        class-definition time, not an argument for callers.
        """
        return self._db.execute(__sql, (self._prefix, failed))

    def getPeerCount(self, failed=0, __sql=_get_peer_sql % "COUNT(*)"):
        """Return how many peers `getPeerList` would yield for `failed`."""
        return self._db.execute(__sql, (self._prefix, failed)).fetchone()[0]

    def getBootstrapPeer(self):
        """Ask the registry for a bootstrap peer and cache it.

        The registry reply is decrypted with ``utils.decrypt`` and split
        into a prefix and an address.

        Returns a ``(prefix, address)`` tuple on success. Returns None (after
        logging a warning) if the request or decryption fails, if the reply
        does not split into exactly two fields, or if the registry returned
        our own prefix.
        """
        logging.info('Getting Boot peer...')
        try:
            bootpeer = self._registry.getBootstrapPeer(self._prefix)
            prefix, address = utils.decrypt(self._key_path, bootpeer).split()
        except (socket.error, subprocess.CalledProcessError, ValueError) as e:
            logging.warning('Failed to bootstrap (%s)', e)
        else:
            if prefix != self._prefix:
                self.addPeer(prefix, address)
                return prefix, address
            logging.warning('Buggy registry sent us our own address')

    def addPeer(self, prefix, address, set_preferred=False):
        """Insert or update a peer and reset its ``try`` value to 0.

        `address` is a ';'-separated list of addresses.

        Known peer, ``set_preferred`` false: `address` replaces the stored
        value, with its items reordered so that those already stored come
        first, in their stored order.

        Known peer, ``set_preferred`` true: the stored items are kept and
        only reordered, those listed in `address` first, in that order.
        Items of `address` that are not stored are not added.

        Unknown peer: rows of ``volatile.stat`` ranked past offset
        ``db_size`` (ordered by ``try``, then randomly) are deleted from both
        tables before the new peer is inserted.
        """
        logging.debug('Adding peer %s: %s', prefix, address)
        with self._db:
            q = self._db.execute
            row = q("SELECT address FROM peer WHERE prefix=?",
                    (prefix,)).fetchone()
            if row is None:
                # New peer: drop whatever exceeds the configured size. Rows
                # with the highest 'try' sort last and so go first.
                excess = q("SELECT peer FROM volatile.stat"
                           " ORDER BY try, RANDOM()"
                           " LIMIT ?,-1", (self._db_size,)).fetchall()
                if excess:
                    qq = self._db.executemany
                    qq("DELETE FROM peer WHERE prefix IN (?)", excess)
                    qq("DELETE FROM volatile.stat WHERE peer IN (?)", excess)
                address_changed = True
            else:
                known = row[0]
                if set_preferred:
                    preferred = address.split(';')
                    address = known
                else:
                    preferred = known.split(';')

                def key(a):
                    # Items absent from 'preferred' all share the last rank;
                    # sorted() is stable so they keep their relative order.
                    try:
                        return preferred.index(a)
                    except ValueError:
                        return len(preferred)
                address = ';'.join(sorted(address.split(';'), key=key))
                address_changed = known != address
            if address_changed:
                q("INSERT OR REPLACE INTO peer VALUES (?,?)",
                  (prefix, address))
            q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,))