simpler, more robust PBS promise
[slapos.git] / slapos / recipe / pbs.py
1 ##############################################################################
2 #
3 # Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly adviced to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
27
28 import hashlib
29 import json
30 import os
31 import signal
32 import subprocess
33 import sys
34 import urlparse
35
36 from slapos.recipe.librecipe import GenericSlapRecipe
37 from slapos.recipe.dropbear import KnownHostsFile
38 from slapos.recipe.notifier import Notify
39 from slapos.recipe.notifier import Callback
40 from slapos import slap as slapmodule
41
42
def promise(args):
    """Check that the remote rdiff-backup endpoint answers over SSH.

    Starts ``args['ssh_client']`` against ``user@host/port``, writes the
    rdiff-backup protocol quit command to its standard input and waits for
    the client to exit.  When the client exits with a non-zero status, or
    when this process receives SIGTERM, the failure is written to stderr
    and reported by calling ``bang()`` on the computer partition obtained
    through the slap connection.

    ``args`` is a dict providing the keys ``server_url``, ``computer_id``,
    ``partition_id``, ``ssh_client``, ``user``, ``host`` and ``port``;
    ``key_file`` and ``cert_file`` are optional.

    Returns the exit status of the SSH client (0 on success).
    """
    slap = slapmodule.slap()
    slap.initializeConnection(args['server_url'],
                              key_file=args.get('key_file'),
                              cert_file=args.get('cert_file'))

    def failed_ssh():
        sys.stderr.write("SSH Connection failed\n")
        partition = slap.registerComputerPartition(args['computer_id'],
                                                   args['partition_id'])
        partition.bang("SSH Connection failed. rdiff-backup is unusable.")

    def sigterm_handler(signum, frame):
        failed_ssh()

    # Installed only once `slap` is bound, so the handler can never run
    # before the connection object it relies on exists.
    signal.signal(signal.SIGTERM, sigterm_handler)

    # Rdiff Backup protocol quit command
    quitcommand = b'q\xff' + b'\x00' * 7

    # The context manager closes the /dev/null handle once the client is done.
    with open(os.devnull, 'w') as devnull:
        ssh = subprocess.Popen(
            [args['ssh_client'], '%(user)s@%(host)s/%(port)s' % args],
            stdin=subprocess.PIPE,
            stdout=devnull,
            stderr=devnull)
        try:
            try:
                ssh.stdin.write(quitcommand)
                ssh.stdin.flush()
            finally:
                ssh.stdin.close()
        except (IOError, OSError):
            # The client went away before reading its input (broken pipe).
            # Its exit status, checked below, reports the failure.
            pass
        returncode = ssh.wait()

    # wait() has already reaped the client: there is nothing left to
    # terminate, and signalling a reaped pid would fail or hit another
    # process.
    if returncode != 0:
        failed_ssh()
    return returncode
80
81
82
class Recipe(GenericSlapRecipe, Notify, Callback):
    """Recipe generating rdiff-backup wrappers, in client or server mode."""

    def add_slave(self, entry, known_hosts_file):
        """Generate every file needed to back up one slave entry.

        Creates the SSH promise script, records the server key of the
        remote host in ``known_hosts_file``, creates the rdiff-backup
        wrapper (optionally wrapped by a notifier) and finally either a
        notification callback or a cron entry.

        ``entry`` is a dict read for the keys ``name``, ``server-key`` and
        ``type``, and optionally ``url``, ``notify``, ``notification-id``,
        ``title``, ``on-notification`` and ``frequency``.

        Returns the list of paths that were created.
        """
        slave_url = entry.get('url')
        if slave_url is None:
            slave_url = ''

        # Digests are used as file names; sha512 collisions are assumed
        # not to happen.
        url_hash = hashlib.sha512(slave_url).hexdigest()
        name_hash = hashlib.sha512(entry['name']).hexdigest()

        # Promise checking that the remote host can be reached over SSH.
        promise_path = os.path.join(self.options['promises-directory'],
                                    url_hash)
        parsed = urlparse.urlparse(slave_url)
        user, host, port = parsed.username, parsed.hostname, parsed.port
        promise_args = self.promise_base_dict.copy()
        promise_args.update(user=user, host=host, port=port)
        created = [
            self.createPythonScript(promise_path,
                                    __name__ + '.promise',
                                    promise_args),
        ]

        known_hosts_file[host] = entry['server-key']

        # XXX -y is used because the host might not yet be in the trusted
        # hosts file until the next time slapgrid is run.
        remote_schema = '%(ssh)s -y -p %%s %(user)s@%(host)s' % dict(
            ssh=self.options['sshclient-binary'],
            user=user,
            host=host,
        )
        remote_directory = '%(port)s::%(path)s' % dict(port=port,
                                                       path=parsed.path)
        local_directory = self.createDirectory(self.options['directory'],
                                               name_hash)

        rdiff_args = ['--remote-schema', remote_schema]
        if entry['type'] == 'push':
            rdiff_args += ['--restore-as-of', 'now',
                           '--force',
                           local_directory, remote_directory]
            comments = ['', 'Push data to a PBS *-import instance.', '']
        else:
            rdiff_args += [remote_directory, local_directory]
            comments = ['', 'Pull data from a PBS *-export instance.', '']

        wrapper_basepath = os.path.join(self.options['wrappers-directory'],
                                        url_hash)
        notified = 'notify' in entry
        # With a notifier, the plain rdiff-backup wrapper gets the "_raw"
        # suffix and the notifier takes the base path.
        wrapper_path = wrapper_basepath + '_raw' if notified \
            else wrapper_basepath

        wrapper = self.createWrapper(
            name=wrapper_path,
            command=self.options['rdiffbackup-binary'],
            parameters=rdiff_args,
            comments=comments)
        created.append(wrapper)

        if notified:
            notification_id = entry['notification-id']
            feed_url = '%s/get/%s' % (self.options['notifier-url'],
                                      notification_id)
            wrapper = self.createNotifier(
                notifier_binary=self.options['notifier-binary'],
                wrapper=wrapper_basepath,
                executable=wrapper_path,
                log=os.path.join(self.options['feeds'], notification_id),
                title=entry.get('title', 'Untitled'),
                notification_url=entry['notify'],
                feed_url=feed_url,
            )
            created.append(wrapper)

        if 'on-notification' in entry:
            # Run the backup when the given notification is received.
            callback = self.createCallback(str(entry['on-notification']),
                                           wrapper)
            created.append(callback)
        else:
            # Otherwise run the backup from cron at the given frequency.
            cron_entry = os.path.join(self.options['cron-entries'], url_hash)
            with open(cron_entry, 'w') as cron_entry_file:
                cron_entry_file.write('%s %s' % (entry['frequency'], wrapper))
            created.append(cron_entry)

        return created

    def _install(self):
        """Install the server wrapper or, in client mode, every slave.

        Client mode is selected by the ``client`` option (true by default):
        the base promise arguments are taken from the ``slap-connection``
        buildout section and ``add_slave`` is called for every entry of the
        JSON ``slave-instance-list`` option.  In server mode a single
        wrapper running rdiff-backup with ``--restrict <path> --server`` is
        created.

        Returns the list of paths that were created.
        """
        if not self.optionIsTrue('client', True):
            self.logger.info("Server mode")
            server_wrapper = self.createWrapper(
                name=self.options['wrapper'],
                command=self.options['rdiffbackup-binary'],
                parameters=[
                    '--restrict', self.options['path'],
                    '--server'
                ])
            return [server_wrapper]

        self.logger.info("Client mode")

        slap_connection = self.buildout['slap-connection']
        self.promise_base_dict = {
            'server_url': slap_connection['server-url'],
            'computer_id': slap_connection['computer-id'],
            'cert_file': slap_connection.get('cert-file'),
            'key_file': slap_connection.get('key-file'),
            'partition_id': slap_connection['partition-id'],
            'ssh_client': self.options['sshclient-binary'],
        }

        slaves = json.loads(self.options['slave-instance-list'])
        known_hosts = KnownHostsFile(self.options['known-hosts'])

        path_list = []
        # XXX this API could be cleaner
        with known_hosts:
            for slave in slaves:
                path_list += self.add_slave(slave, known_hosts)
        return path_list
213