removed call to ssh.terminate()
[slapos.git] / slapos / recipe / pbs.py
1 ##############################################################################
2 #
3 # Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs.
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly advised to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
27
28 import hashlib
29 import json
30 import os
31 import signal
32 import subprocess
33 import sys
34 import urlparse
35
36 from slapos.recipe.librecipe import GenericSlapRecipe
37 from slapos.recipe.dropbear import KnownHostsFile
38 from slapos.recipe.notifier import Notify
39 from slapos.recipe.notifier import Callback
40 from slapos import slap as slapmodule
41
42
def promise(args):
    """Check that the backup peer can be reached over SSH.

    Starts the SSH client ``args['ssh_client']`` against the target
    ``user@host/port`` (built from ``args``), sends it the rdiff-backup
    protocol quit command on stdin and waits for the client to exit.

    When the check fails, or when this process receives SIGTERM, a message
    is written to stderr and the computer partition identified by
    ``args['computer_id']`` / ``args['partition_id']`` is banged through
    the SLAP connection opened on ``args['server_url']`` (optionally
    authenticated with ``args['key_file']`` / ``args['cert_file']``).

    Returns the exit status of the SSH client: 0 on success, non-zero on
    failure.  If the quit command could not be delivered but the client
    still reported 0, 1 is returned instead so the failure is not masked.
    """
    # Open the SLAP connection *before* installing the signal handler: the
    # handler uses ``slap``, so it must never run while that name is still
    # unbound.
    slap = slapmodule.slap()
    slap.initializeConnection(args['server_url'],
                              key_file=args.get('key_file'),
                              cert_file=args.get('cert_file'))

    def failed_ssh():
        sys.stderr.write("SSH Connection failed\n")
        partition = slap.registerComputerPartition(args['computer_id'],
                                                   args['partition_id'])
        partition.bang("SSH Connection failed. rdiff-backup is unusable.")

    def sigterm_handler(signum, frame):
        failed_ssh()

    signal.signal(signal.SIGTERM, sigterm_handler)

    # Rdiff Backup protocol quit command.  A bytes literal is byte-for-byte
    # what 'q' + chr(255) + chr(0) * 7 yields on Python 2, and is also
    # accepted by a binary pipe on Python 3.
    quitcommand = b'q\xff' + b'\x00' * 7

    # One /dev/null handle serves both output streams and is closed as
    # soon as the child has exited.
    with open(os.devnull, 'w') as devnull:
        ssh = subprocess.Popen(
            [args['ssh_client'], '%(user)s@%(host)s/%(port)s' % args],
            stdin=subprocess.PIPE,
            stdout=devnull,
            stderr=devnull)

        # The client may already have exited (e.g. connection refused); a
        # broken pipe here is a failed check, not a reason to crash before
        # the failure has been reported.
        quit_sent = True
        try:
            ssh.stdin.write(quitcommand)
            ssh.stdin.flush()
        except (IOError, OSError):
            quit_sent = False
        try:
            ssh.stdin.close()
        except (IOError, OSError):
            pass

        # wait() only returns once the child has terminated, so returncode
        # is always set afterwards.
        ssh.wait()

    status = ssh.returncode
    if status == 0 and not quit_sent:
        status = 1
    if status != 0:
        failed_ssh()
    return status
79
80
81
class Recipe(GenericSlapRecipe, Notify, Callback):
    """Recipe installing rdiff-backup wrappers in client or server mode."""

    def add_slave(self, entry, known_hosts_file):
        """Install everything needed to back up one slave instance.

        ``entry`` is the slave's parameter mapping.  This method reads its
        ``name``, ``server-key``, ``type`` and (optionally) ``url``,
        ``notify``, ``notification-id``, ``title``, ``on-notification``
        and ``frequency`` keys.  ``known_hosts_file`` is a mapping that
        receives the slave's host name -> server key association.

        In order, this creates: a promise script bound to this module's
        ``promise`` function, the known-hosts entry, the local backup
        directory, an rdiff-backup wrapper, an optional notifier wrapper,
        and finally either a callback (``on-notification``) or a cron
        entry file.

        Returns the list of values produced by those helper calls (plus
        the cron entry path when one is written).
        """
        slave_url = entry.get('url')
        if slave_url is None:
            slave_url = ''

        # File names are SHA-512 digests; collisions are assumed not to
        # happen.
        url_digest = hashlib.sha512(slave_url).hexdigest()
        name_digest = hashlib.sha512(entry['name']).hexdigest()

        promise_path = os.path.join(self.options['promises-directory'],
                                    url_digest)
        location = urlparse.urlparse(slave_url)
        # NOTE(review): location.port is None when the URL has no explicit
        # port, and is rendered as the text "None" further down -- confirm
        # that slave URLs always carry a port.
        promise_args = self.promise_base_dict.copy()
        promise_args.update(user=location.username,
                            host=location.hostname,
                            port=location.port)
        created = [self.createPythonScript(promise_path,
                                           __name__ + '.promise',
                                           promise_args)]

        peer_host = location.hostname
        known_hosts_file[peer_host] = entry['server-key']

        # XXX -y is passed because the host might not be in the trusted
        # hosts file yet, until the next time slapgrid is run.
        schema_fields = {
            'ssh': self.options['sshclient-binary'],
            'user': location.username,
            'host': peer_host,
        }
        # The doubled %% leaves a literal %s in the schema string.
        remote_schema = '%(ssh)s -y -p %%s %(user)s@%(host)s' % schema_fields
        rdiff_args = ['--remote-schema', remote_schema]

        # The port is placed where a "host::path" argument would carry
        # the host.
        remote_directory = '%(port)s::%(path)s' % {'port': location.port,
                                                   'path': location.path}
        local_directory = self.createDirectory(self.options['directory'],
                                               name_digest)

        if entry['type'] == 'push':
            rdiff_args += ['--restore-as-of', 'now', '--force',
                           local_directory, remote_directory]
            comments = ['', 'Push data to a PBS *-import instance.', '']
        else:
            rdiff_args += [remote_directory, local_directory]
            comments = ['', 'Pull data from a PBS *-export instance.', '']

        wrapper_basepath = os.path.join(self.options['wrappers-directory'],
                                        url_digest)
        # With notification, the plain wrapper gets a "_raw" suffix and
        # the notifier wrapper takes the base path.
        notify_requested = 'notify' in entry
        if notify_requested:
            wrapper_path = wrapper_basepath + '_raw'
        else:
            wrapper_path = wrapper_basepath

        wrapper = self.createWrapper(
            name=wrapper_path,
            command=self.options['rdiffbackup-binary'],
            parameters=rdiff_args,
            comments=comments)
        created.append(wrapper)

        if notify_requested:
            feed_url = '%s/get/%s' % (self.options['notifier-url'],
                                      entry['notification-id'])
            # From here on ``wrapper`` designates the notifier, so that is
            # what gets scheduled below.
            wrapper = self.createNotifier(
                notifier_binary=self.options['notifier-binary'],
                wrapper=wrapper_basepath,
                executable=wrapper_path,
                log=os.path.join(self.options['feeds'],
                                 entry['notification-id']),
                title=entry.get('title', 'Untitled'),
                notification_url=entry['notify'],
                feed_url=feed_url,
            )
            created.append(wrapper)
            # Publishing feed_url back to the slave (setConnectionDict with
            # entry['slave_reference']) is currently disabled.

        if 'on-notification' in entry:
            # Triggered by a notification rather than on a schedule.
            created.append(
                self.createCallback(str(entry['on-notification']), wrapper))
        else:
            cron_entry = os.path.join(self.options['cron-entries'],
                                      url_digest)
            with open(cron_entry, 'w') as cron_file:
                cron_file.write('%s %s' % (entry['frequency'], wrapper))
            created.append(cron_entry)

        return created

    def _install(self):
        """Install the recipe and return the list of created paths.

        When the ``client`` option is true (it defaults to true), every
        slave listed in the JSON option ``slave-instance-list`` is set up
        through ``add_slave``.  Otherwise a single wrapper is created
        that runs rdiff-backup with ``--restrict <path> --server``.
        """
        if not self.optionIsTrue('client', True):
            self.logger.info("Server mode")
            server_wrapper = self.createWrapper(
                name=self.options['wrapper'],
                command=self.options['rdiffbackup-binary'],
                parameters=['--restrict', self.options['path'], '--server'])
            return [server_wrapper]

        self.logger.info("Client mode")

        connection = self.buildout['slap-connection']
        # Arguments shared by every promise script; add_slave() completes
        # a copy with the per-slave user/host/port.
        self.promise_base_dict = {
            'server_url': connection['server-url'],
            'computer_id': connection['computer-id'],
            'cert_file': connection.get('cert-file'),
            'key_file': connection.get('key-file'),
            'partition_id': connection['partition-id'],
            'ssh_client': self.options['sshclient-binary'],
        }

        slave_list = json.loads(self.options['slave-instance-list'])
        known_hosts = KnownHostsFile(self.options['known-hosts'])

        installed = []
        # XXX this API could be cleaner
        with known_hosts:
            for slave in slave_list:
                installed.extend(self.add_slave(slave, known_hosts))
        return installed
212