slapos.recipe.re6stmaster: More verbosity and update ips if they changed.
[slapos.git] / slapos / recipe / re6stnet / re6stnet.py
1 # -*- coding: utf-8 -*-
2 import logging
3 import json
4 import os
5 import time
6 import sqlite3
7 import slapos
8 import traceback
9 import logging
10 import socket
11 import select
12 from re6st import tunnel, ctl, registry, utils, x509
13 from OpenSSL import crypto
14
15
16 log = logging.getLogger('SLAPOS-RE6STNET')
17 logging.basicConfig(level=logging.INFO)
18
19 logging.trace = logging.debug
20
21 class iterRoutes(object):
22
23 _waiting = True
24
25 def __new__(cls, control_socket, network):
26 self = object.__new__(cls)
27 c = ctl.Babel(control_socket, self, network)
28 c.request_dump()
29 while self._waiting:
30 args = {}, {}, ()
31 c.select(*args)
32 utils.select(*args)
33 return (prefix
34 for neigh_routes in c.neighbours.itervalues()
35 for prefix in neigh_routes[1]
36 if prefix)
37
38 def babel_dump(self):
39 self._waiting = False
40
41 def loadJsonFile(path):
42 if os.path.exists(path):
43 with open(path, 'r') as f:
44 content = f.read()
45 return json.loads(content)
46 else:
47 return {}
48
49 def writeFile(path, data):
50 with open(path, 'w') as f:
51 f.write(data)
52
53 def readFile(path):
54 if os.path.exists(path):
55 with open(path, 'r') as f:
56 content = f.read()
57 return content
58 return ''
59
60 def getDb(db_path):
61 db = sqlite3.connect(db_path, isolation_level=None,
62 check_same_thread=False)
63 db.text_factory = str
64
65 return db.cursor()
66
67 def bang(args):
68 computer_guid = args['computer_id']
69 partition_id = args['partition_id']
70 slap = slapos.slap.slap()
71
72 # Redeploy instance to update published information
73 slap.initializeConnection(args['server_url'], args['key_file'],
74 args['cert_file'])
75 partition = slap.registerComputerPartition(computer_guid=computer_guid,
76 partition_id=partition_id)
77 partition.bang(message='Published parameters changed!')
78 log.info("Bang with message 'parameters changed'...")
79
80
81 def requestAddToken(args, can_bang=True):
82
83 time.sleep(3)
84 registry_url = args['registry_url']
85 base_token_path = args['token_base_path']
86 path_list = [x for x in os.listdir(base_token_path) if x.endswith('.add')]
87
88 if not path_list:
89 log.info("No new token to add. Exiting...")
90 return
91
92 client = registry.RegistryClient(registry_url)
93 call_bang = False
94
95 for reference_key in path_list:
96 request_file = os.path.join(base_token_path, reference_key)
97 token = readFile(request_file)
98 if token :
99 reference = reference_key.split('.')[0]
100 # email is unique as reference is also unique
101 email = '%s@slapos' % reference.lower()
102 try:
103 result = client.requestAddToken(token, email)
104 except Exception:
105 log.debug('Request add token fail for %s... \n %s' % (request_file,
106 traceback.format_exc()))
107 continue
108 if result and result == token:
109 # update information
110 log.info("New token added for slave instance %s. Updating file status..." %
111 reference)
112 writeFile(os.path.join(base_token_path, '%s.status' % reference),
113 'TOKEN_ADDED')
114 os.unlink(request_file)
115 call_bang = True
116 else:
117 log.debug('Bad token. Request add token fail for %s...' % request_file)
118
119 if can_bang and call_bang:
120 bang(args)
121
122 def requestRemoveToken(args):
123 base_token_path = args['token_base_path']
124 path_list = [x for x in os.listdir(base_token_path) if x.endswith('.remove')]
125
126 if not path_list:
127 log.info("No token to delete. Exiting...")
128 return
129
130 client = registry.RegistryClient(args['registry_url'])
131 for reference_key in path_list:
132 request_file = os.path.join(base_token_path, reference_key)
133 token = readFile(request_file)
134 if token :
135 reference = reference_key.split('.')[0]
136 try:
137 result = client.requestDeleteToken(token)
138 except Exception:
139 log.debug('Request delete token fail for %s... \n %s' % (request_file,
140 traceback.format_exc()))
141 continue
142 else:
143 # certificate is invalidated, it will be revoked
144 writeFile(os.path.join(base_token_path, '%s.revoke' % reference), '')
145 if result == 'True':
146 # update information
147 log.info("Token deleted for slave instance %s. Clean up file status..." %
148 reference)
149 if result in ['True', 'False']:
150 os.unlink(request_file)
151 status_file = os.path.join(base_token_path, '%s.status' % reference)
152 if os.path.exists(status_file):
153 os.unlink(status_file)
154 ipv6_file = os.path.join(base_token_path, '%s.ipv6' % reference)
155 if os.path.exists(ipv6_file):
156 os.unlink(ipv6_file)
157
158 else:
159 log.debug('Bad token. Request add token fail for %s...' % request_file)
160
161 def requestRevoqueCertificate(args):
162
163 base_token_path = args['token_base_path']
164 db = getDb(args['db'])
165 path_list = [x for x in os.listdir(base_token_path) if x.endswith('.revoke')]
166 client = registry.RegistryClient(args['registry_url'])
167
168 for reference_key in path_list:
169 reference = reference_key.split('.')[0]
170 # XXX - email is always unique
171 email = '%s@slapos' % reference.lower()
172 cert_string = ''
173 try:
174 cert_string, = db.execute("SELECT cert FROM cert WHERE email = ?",
175 (email,)).next()
176 except StopIteration:
177 # Certificate was not generated yet !!!
178 pass
179
180 try:
181 if cert_string:
182 cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
183 cn = x509.subnetFromCert(cert)
184 result = client.revoke(str(cn))
185 time.sleep(2)
186 except Exception:
187 log.debug('Request revoke certificate fail for %s... \n %s' % (reference,
188 traceback.format_exc()))
189 continue
190 else:
191 os.unlink(os.path.join(base_token_path, reference_key))
192 log.info("Certificate revoked for slave instance %s." % reference)
193
194
195 def dumpIPv6Network(slave_reference, db, network, ipv6_file):
196 email = '%s@slapos' % slave_reference.lower()
197
198 try:
199 cert_string, = db.execute("SELECT cert FROM cert WHERE email = ?",
200 (email,)).next()
201 except StopIteration:
202 # Certificate was not generated yet !!!
203 pass
204
205 try:
206 if cert_string:
207 cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
208 cn = x509.subnetFromCert(cert)
209 subnet = network + utils.binFromSubnet(cn)
210 ipv6 = utils.ipFromBin(subnet)
211 changed = readFile(ipv6_file) != ipv6
212 writeFile(ipv6_file, ipv6)
213 return ipv6, utils.binFromSubnet(cn), changed
214 except Exception:
215 log.debug('XXX for %s... \n %s' % (slave_reference,
216 traceback.format_exc()))
217
218 def sendto(sock, prefix, code):
219 return sock.sendto("%s\0%c" % (prefix, code), ('::1', tunnel.PORT))
220
221 def recv(sock, code):
222 try:
223 prefix, msg = sock.recv(1<<16).split('\0', 1)
224 int(prefix, 2)
225 except ValueError:
226 pass
227 else:
228 if msg and ord(msg[0]) == code:
229 return prefix, msg[1:]
230 return None, None
231
232 def dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list):
233 try:
234
235 if ipv6_prefix == "00000000000000000000000000000000":
236 # workarround to ignore the first node
237 ipv4 = "0.0.0.0"
238 changed = readFile(ipv4_file) != ipv4
239 writeFile(ipv4_file, ipv4)
240 return ipv4, changed
241
242 peers = []
243
244 peer_list = [prefix for prefix in peer_prefix_list if prefix == ipv6_prefix ]
245
246 if len(peer_list) == 0:
247 raise ValueError("Unable to find such prefix on database")
248
249 peer = peer_list[0]
250
251 sendto(sock, peer, 1)
252 s = sock,
253 timeout = 15
254 end = timeout + time.time()
255
256 while select.select(s, (), (), timeout)[0]:
257 prefix, msg = recv(sock, 1)
258 if prefix == peer:
259 break
260
261 timeout = max(0, end - time.time())
262 else:
263 logging.info("Timeout while querying address for %s/%s", int(peer, 2), len(peer))
264 msg = ""
265
266 if "," in msg:
267 ipv4 = msg.split(',')[0]
268 else:
269 ipv4 = "0.0.0.0"
270 changed = readFile(ipv4_file) != ipv4
271 writeFile(ipv4_file, ipv4)
272 return ipv4, changed
273 except Exception:
274 log.info('XXX for %s... \n %s' % (ipv6_prefix,
275 traceback.format_exc()))
276 return "0.0.0.0", False
277
278 def checkService(args, can_bang=True):
279 base_token_path = args['token_base_path']
280 token_dict = loadJsonFile(args['token_json'])
281
282 if not token_dict:
283 return
284
285 db = getDb(args['db'])
286 call_bang = False
287
288 computer_guid = args['computer_id']
289 partition_id = args['partition_id']
290 slap = slapos.slap.slap()
291 client = registry.RegistryClient(args['registry_url'])
292 ca = client.getCa()
293 network = x509.networkFromCa(crypto.load_certificate(crypto.FILETYPE_PEM, ca))
294
295 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
296
297 peer_prefix_list = [prefix for prefix in
298 iterRoutes("/var/run/re6stnet/babeld.sock", network)]
299
300
301 # Check token status
302 for slave_reference, token in token_dict.iteritems():
303 status_file = os.path.join(base_token_path, '%s.status' % slave_reference)
304 ipv6_file = os.path.join(base_token_path, '%s.ipv6' % slave_reference)
305 ipv4_file = os.path.join(base_token_path, '%s.ipv4' % slave_reference)
306 if not os.path.exists(status_file):
307 # This token is not added yet!
308 log.info("Token %s dont exist yet." % status_file)
309 continue
310
311 msg = readFile(status_file)
312 log.info("Token %s has %s State." % (status_file, msg))
313 if msg == 'TOKEN_USED':
314 log.info("Dumping ipv6...")
315 ipv6, ipv6_prefix, ipv6_changed = dumpIPv6Network(slave_reference, db, network, ipv6_file)
316 log.info("%s, IPV6 = %s, IPV6_PREFIX = %s" % (slave_reference, ipv6, ipv6_prefix))
317 _, ipv4_changed = dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list)
318 if ipv4_changed or ipv6_changed:
319 call_bang = True
320 continue
321
322 # Check if token is not in the database
323 status = False
324 try:
325 token_found, = db.execute("SELECT token FROM token WHERE token = ?",
326 (token,)).next()
327 if token_found == token:
328 status = True
329 except StopIteration:
330 pass
331 if not status:
332 # Token is used to register client
333 call_bang = True
334 try:
335 writeFile(status_file, 'TOKEN_USED')
336 dumpIPv6Network(slave_reference, db, network, ipv6_file)
337 dumpIPv4Network(ipv6_prefix, network, ipv4_file, sock, peer_prefix_list)
338 log.info("Token status of %s updated to 'used'." % slave_reference)
339 except IOError:
340 # XXX- this file should always exists
341 log.debug('Error when writing in file %s. Clould not update status of %s...' %
342 (status_file, slave_reference))
343
344 if call_bang and can_bang:
345 bang(args)
346
347 def manage(args):
348 # Request Add new tokens
349 requestAddToken(args)
350
351 # Request delete removed token
352 requestRemoveToken(args)
353
354 # check status of all token
355 checkService(args)
356