Resilience: add pidfiles in PBS.
[slapos.git] / slapos / recipe / pbs.py
1 ##############################################################################
2 #
3 # Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly advised to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
27
28 import json
29 import os
30 import signal
31 import subprocess
32 import sys
33 import textwrap
34 import urlparse
35
36 from slapos.recipe.librecipe import GenericSlapRecipe
37 from slapos.recipe.dropbear import KnownHostsFile
38 from slapos.recipe.notifier import Notify
39 from slapos.recipe.notifier import Callback
40 from slapos import slap as slapmodule
41
42
def promise(args):
    """Check that the backup peer can be reached through the SSH client.

    An SSH client process is started towards ``user@host/port``, the
    rdiff-backup protocol "quit" command is written to its standard input,
    and its exit status is used as the result of the check.  On a non-zero
    exit status, or when SIGTERM is received while the check is running,
    the failure is written to stderr and the computer partition is banged
    through the SlapOS master.

    ``args`` is a mapping read for the following keys:

    * ``server_url``, ``key_file`` (optional), ``cert_file`` (optional):
      connection to the SlapOS master;
    * ``computer_id``, ``partition_id``: partition to bang on failure;
    * ``ssh_client``: path of the SSH client executable;
    * ``user``, ``host``, ``port``: remote endpoint to test.

    Returns the exit status of the SSH client (0 means success).
    """
    # Connect to the master first: the failure reporter below needs `slap`,
    # so the SIGTERM handler must not be installed before it exists.
    slap = slapmodule.slap()
    slap.initializeConnection(args['server_url'],
                              key_file=args.get('key_file'),
                              cert_file=args.get('cert_file'))

    def failed_ssh():
        sys.stderr.write("SSH Connection failed\n")
        partition = slap.registerComputerPartition(args['computer_id'],
                                                   args['partition_id'])
        partition.bang("SSH Connection failed. rdiff-backup is unusable.")

    def sigterm_handler(signum, frame):
        failed_ssh()

    signal.signal(signal.SIGTERM, sigterm_handler)

    # Rdiff Backup protocol quit command
    quitcommand = 'q' + chr(255) + chr(0) * 7

    # The output of the client is irrelevant, only its exit status matters.
    # The context manager makes sure the devnull handle is closed again.
    with open(os.devnull, 'w') as devnull:
        ssh = subprocess.Popen(
            [args['ssh_client'], '%(user)s@%(host)s/%(port)s' % args],
            stdin=subprocess.PIPE,
            stdout=devnull,
            stderr=devnull)

        try:
            try:
                ssh.stdin.write(quitcommand)
                ssh.stdin.flush()
            finally:
                ssh.stdin.close()
        except IOError:
            # The client already exited (broken pipe), e.g. because the
            # connection was refused.  Its exit status, checked below, is
            # what reports the failure.
            pass

        returncode = ssh.wait()

    if returncode != 0:
        failed_ssh()
    return returncode
79
80
81
class Recipe(GenericSlapRecipe, Notify, Callback):
    """Recipe installing the rdiff-backup based PBS scripts.

    In client mode, one promise, one rdiff-backup wrapper and one notifier
    wrapper are generated per slave instance.  In server mode, a single
    wrapper running rdiff-backup as a restricted server is generated.
    """

    def add_slave(self, entry, known_hosts_file):
        """Generate every file needed to back up one slave instance.

        ``entry`` is a mapping describing the slave.  Keys read here:

        * ``url``: remote location; user name, host name, port and path
          are taken from it;
        * ``type``: either ``pull`` or ``push``;
        * ``notification-id``: used as identifier and file name;
        * ``server-key``: stored in ``known_hosts_file`` for the host;
        * ``name``: name of the local backup directory;
        * ``notify``: notification URL given to the notifier;
        * ``title`` (optional): defaults to the notification id;
        * ``on-notification`` (optional): when present a callback is
          registered, otherwise ``frequency`` is read and a cron entry is
          written.

        ``known_hosts_file`` is a mapping-like object associating a host
        name with its key.

        ``self.promise_base_dict`` must have been set before calling this
        method (``_install`` does it).

        Raises ValueError if the URL is missing or the type is unknown.

        Returns the list of paths of the generated files.
        """
        path_list = []

        url = entry.get('url')
        if not url:
            raise ValueError('Missing URL parameter for PBS recipe')
        parsed_url = urlparse.urlparse(url)

        slave_type = entry['type']
        if slave_type not in ('pull', 'push'):
            raise ValueError('type parameter must be either pull or push.')

        slave_id = entry['notification-id']

        print('Processing PBS slave %s with type %s' % (slave_id, slave_type))

        # Promise checking that the remote host accepts SSH connections.
        promise_path = os.path.join(self.options['promises-directory'],
                                    slave_id)
        promise_dict = self.promise_base_dict.copy()
        promise_dict.update(user=parsed_url.username,
                            host=parsed_url.hostname,
                            port=parsed_url.port)
        promise = self.createPythonScript(promise_path,
                                          __name__ + '.promise',
                                          promise_dict)
        path_list.append(promise)

        host = parsed_url.hostname
        known_hosts_file[host] = entry['server-key']

        notifier_wrapper_path = os.path.join(
            self.options['wrappers-directory'], slave_id)
        rdiff_wrapper_path = notifier_wrapper_path + '_raw'

        # Create the rdiff-backup wrapper
        # It is useful to separate it from the notifier so that we can run it
        # Manually.
        rdiffbackup_parameter_list = []

        # XXX use -y because the host might not yet be in the
        #     trusted hosts file until the next time slapgrid is run.
        # The port is left as a literal "%s" placeholder in the schema and is
        # instead put in front of "::" in the remote directory below.
        rdiffbackup_remote_schema = '%(ssh)s -y -p %%s %(user)s@%(host)s' % {
            'ssh': self.options['sshclient-binary'],
            'user': parsed_url.username,
            'host': parsed_url.hostname,
        }
        remote_directory = '%(port)s::%(path)s' % {'port': parsed_url.port,
                                                   'path': parsed_url.path}
        local_directory = self.createDirectory(self.options['directory'],
                                               entry['name'])

        if slave_type == 'push':
            # Create a simple rdiff-backup wrapper that will push
            rdiffbackup_parameter_list.extend(
                ['--remote-schema', rdiffbackup_remote_schema])
            rdiffbackup_parameter_list.extend(['--restore-as-of', 'now'])
            rdiffbackup_parameter_list.append('--force')
            rdiffbackup_parameter_list.append(local_directory)
            rdiffbackup_parameter_list.append(remote_directory)
            comments = ['', 'Push data to a PBS *-import instance.', '']
            rdiff_wrapper = self.createWrapper(
                name=rdiff_wrapper_path,
                command=self.options['rdiffbackup-binary'],
                parameters=rdiffbackup_parameter_list,
                comments=comments,
                pidfile=os.path.join(self.options['run-directory'],
                                     '%s_raw.pid' % slave_id),
            )
        elif slave_type == 'pull':
            # Wrap rdiff-backup call into a script that checks consistency of
            # backup.  We need to manually escape the remote schema
            rdiffbackup_parameter_list.extend(
                ['--remote-schema', '"%s"' % rdiffbackup_remote_schema])
            rdiffbackup_parameter_list.append(remote_directory)
            rdiffbackup_parameter_list.append(local_directory)
            comments = ['', 'Pull data from a PBS *-export instance.', '']
            rdiff_wrapper_template = textwrap.dedent("""\
                #!/bin/sh
                # %(comment)s
                RDIFF_BACKUP="%(rdiffbackup_binary)s"
                $RDIFF_BACKUP %(rdiffbackup_parameter)s
                if [ ! $? -eq 0 ]; then
                    # Check the backup, go to the last consistent backup, so that next
                    # run will be okay.
                    echo "Checking backup directory..."
                    $RDIFF_BACKUP --check-destination-dir %(local_directory)s
                    if [ ! $? -eq 0 ]; then
                        # Here, two possiblities:
                        # * The first backup failed. It is safe to remove it since there is nothing valuable there.
                        # * The backup has been complete, but is now in a really weird state. Not safe to remove it.
                        echo "Impossible to check backup: we move it to a safe place."
                        # XXX: bang
                        mv %(local_directory)s %(local_directory)s.$(date +%%s)
                    fi
                fi
                """)
            rdiff_wrapper_content = rdiff_wrapper_template % {
                # One shell comment line per entry; formatting the list
                # itself would write its Python repr into the script.
                'comment': '\n# '.join(comments),
                'rdiffbackup_binary': self.options['rdiffbackup-binary'],
                'local_directory': local_directory,
                'rdiffbackup_parameter':
                    ' \\\n '.join(rdiffbackup_parameter_list),
            }
            rdiff_wrapper = self.createFile(
                name=rdiff_wrapper_path,
                content=rdiff_wrapper_content,
                mode=0o700
            )

        path_list.append(rdiff_wrapper)

        # Create notifier wrapper
        notifier_wrapper = self.createNotifier(
            notifier_binary=self.options['notifier-binary'],
            wrapper=notifier_wrapper_path,
            executable=rdiff_wrapper,
            log=os.path.join(self.options['feeds'], entry['notification-id']),
            title=entry.get('title', slave_id),
            notification_url=entry['notify'],
            feed_url='%s/get/%s' % (self.options['notifier-url'],
                                    entry['notification-id']),
            pidfile=os.path.join(self.options['run-directory'],
                                 '%s.pid' % slave_id)
        )
        path_list.append(notifier_wrapper)

        # The backup is either triggered by a notification or run by cron.
        if 'on-notification' in entry:
            path_list.append(
                self.createCallback(str(entry['on-notification']),
                                    notifier_wrapper))
        else:
            cron_entry = os.path.join(self.options['cron-entries'], slave_id)
            with open(cron_entry, 'w') as cron_entry_file:
                cron_entry_file.write(
                    '%s %s' % (entry['frequency'], notifier_wrapper))
            path_list.append(cron_entry)

        return path_list

    def _install(self):
        """Install the recipe in client or server mode.

        Client mode (selected through the ``client`` option): the slave
        list is decoded from the JSON ``slave-instance-list`` option and
        ``add_slave`` is called for each slave, with the known hosts file
        open for the whole loop.

        Server mode: a wrapper running rdiff-backup with ``--restrict`` on
        the ``path`` option and ``--server`` is created.

        Returns the list of paths of the generated files.
        """
        path_list = []

        if self.optionIsTrue('client', True):
            self.logger.info("Client mode")

            # Values shared by the promises of all slaves; add_slave
            # completes a copy with the user, host and port of each slave.
            slap_connection = self.buildout['slap-connection']
            self.promise_base_dict = {
                'server_url': slap_connection['server-url'],
                'computer_id': slap_connection['computer-id'],
                'cert_file': slap_connection.get('cert-file'),
                'key_file': slap_connection.get('key-file'),
                'partition_id': slap_connection['partition-id'],
                'ssh_client': self.options['sshclient-binary'],
            }

            slaves = json.loads(self.options['slave-instance-list'])
            known_hosts = KnownHostsFile(self.options['known-hosts'])
            with known_hosts:
                for slave in slaves:
                    path_list.extend(self.add_slave(slave, known_hosts))
        else:
            self.logger.info("Server mode")

            wrapper = self.createWrapper(
                name=self.options['wrapper'],
                command=self.options['rdiffbackup-binary'],
                parameters=[
                    '--restrict', self.options['path'],
                    '--server'
                ])
            path_list.append(wrapper)

        return path_list