import os
import time
import logging
import commands

from bodhi import buildsys, mail
from bodhi.util import synchronized
from threading import Thread, Lock
from turbogears import config
from os.path import exists, join, islink

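# Module-level state: the shared Masher instance and the lock used by
# @synchronized to serialize access to the mash queue.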
log = logging.getLogger(__name__)
masher = None
lock = Lock()


class Masher:
    """
    The Masher.  This is a TurboGears extension that runs alongside bodhi and
    is in charge of queueing and dispatching mash composes.
    """
    def __init__(self):
        log.info("Starting the Masher")
        self._queue = []
        self._threads = []
        self.thread_id = 0
        self.mashing = 0
        self.last_log = None

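    # Each queued item is a (thread_id, updates, repos) tuple.  Only one
    # MashTask runs at a time; new requests stay queued until the running
    # task calls done().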
    @synchronized(lock)
    def queue(self, updates, repos=set()):
        self._queue.append((self.thread_id, updates, repos))
        self.thread_id += 1
        if len(self._threads) == 0:
            if len(self._queue):
                self._mash(self._queue.pop())

    @synchronized(lock)
    def done(self, thread):
        """
        Called by each MashTask upon completion.  If there are more in the
        queue, then dispatch them.
        """
        log.info("MashTask %d done!" % thread.id)
        self.mashing = 0
        self.last_log = thread.log
        mail.send_releng('Bodhi Masher Report %s' %
                         time.strftime("%y%m%d.%H%M"), thread.report())
        self._threads.remove(thread)
        if len(self._threads) == 0:
            if len(self._queue):
                self._mash(self._queue.pop())

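    # task is a (thread_id, updates, repos) tuple popped off self._queue.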
72 """ Dispatch a given MashTask """
73 thread = MashTask(task[0], task[1], task[2])
74 self._threads.append(thread)
75 thread.start()
76 self.mashing = 1
77
79 """
80 Return the most recent mash (log_filename, log_data)
81 """
82 log = 'Previous mash log not available'
83 if self.last_log and exists(self.last_log):
84 logfile = file(self.last_log, 'r')
85 log = logfile.read()
86 logfile.close()
87 return (self.last_log, log)
88
    def __str__(self):
        """
        Return a string representation of the Masher, including the current
        queue and updates that are getting moved/mashed
        """
        val = 'Currently Mashing: %s\n\n' % (self.mashing and 'Yes' or 'No')
        if self.mashing:
            for thread in self._threads:
                val += str(thread)
            if len(self._queue):
                val += "\n[ Queue ]\n"
                for item in self._queue:
                    if len(item[1]):
                        val += " Move tags\n"
                        for update in item[1]:
                            val += " - %s (%s)" % (update.title,
                                                   update.request)
                    if len(item[2]):
                        val += " Mash repos\n"
                        for repo in item[2]:
                            val += " - %s" % repo

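        # Append a process tree for the current user so any running mash
        # processes are visible in the status output.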
        (status, output) = commands.getstatusoutput("ps -U %d --forest v" %
                                                    os.getuid())
        val += "\n" + output

        return val


class MashTask(Thread):
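    """
    A Thread that moves update builds between koji tags and then composes the
    affected repositories with mash.
    """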
    def __init__(self, id, updates, repos=set()):
        Thread.__init__(self)
        log.debug("MashTask(%d, %s)" % (id, updates))
        self.id = id
        self.tag = None
        self.updates = updates
        self.koji = buildsys.get_session()
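        # Repositories to compose for this task, as passed to Masher.queue().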
        self.repos = repos
        self.success = False
        self.cmd = 'mash -o %s -c ' + config.get('mash_conf') + ' -f %s '
        self.actions = []       # [(build, from_tag, to_tag), ...]
        self.mashing = False    # are we currently mashing?
        self.moving = False     # are we currently moving build tags?
        self.log = None         # file we wrote the mash output to
        self.mashed_repos = {}  # { repo: mashdir }

    def undo_move(self):
        """
        Move the builds back to their original tag
        """
        log.debug("Rolling back updates to their original tag")
        tasks = []
        for action in self.actions:
            log.debug("Moving %s from %s to %s" % (action[0], action[2],
                                                   action[1]))
            task_id = self.koji.moveBuild(action[2], action[1], action[0],
                                          force=True)
            tasks.append(task_id)
        # run() checks the return value to decide whether the rollback worked;
        # this assumes buildsys.wait_for_tasks() returns 0 when every koji
        # task succeeds.
        return buildsys.wait_for_tasks(tasks) == 0

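    # Refresh the comps checkout from CVS and rebuild it; mash() points each
    # repository at the resulting comps file.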
    def update_comps(self):
        log.debug("Updating comps...")
        olddir = os.getcwd()
        os.chdir(config.get('comps_dir'))
        (status, output) = commands.getstatusoutput("cvs update")
        log.debug("(%d, %s) from cvs update" % (status, output))
        (status, output) = commands.getstatusoutput("make")
        log.debug("(%d, %s) from make" % (status, output))
        os.chdir(olddir)

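    # Repoint the <mashed_dir>/<repo> symlink at the newly mashed tree.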
    def update_symlinks(self):
        mashed_dir = config.get('mashed_dir')
        for repo, mashdir in self.mashed_repos.items():
            link = join(mashed_dir, repo)
            if islink(link):
                os.unlink(link)
            os.symlink(join(mashdir, repo), link)
            log.debug("Created symlink: %s => %s" % (join(mashdir, repo), link))

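    # Compose each repository with mash.  Output goes to <mashdir>/mash.out on
    # success; on failure it is written to mash-failed-<timestamp> and the
    # remaining repositories are skipped.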
    def mash(self):
        self.mashing = True
        self.update_comps()
        for repo in self.repos:
            mashdir = join(config.get('mashed_dir'), repo + '-' +
                           time.strftime("%y%m%d.%H%M"))
            self.mashed_repos[repo] = mashdir
            comps = join(config.get('comps_dir'), 'comps-%s.xml' %
                         repo.split('-')[0])
            mashcmd = self.cmd % (mashdir, comps) + repo
            log.info("Running `%s`" % mashcmd)
            (status, output) = commands.getstatusoutput(mashcmd)
            log.info("status = %s" % status)
            if status == 0:
                self.success = True
                mash_output = '%s/mash.out' % mashdir
                out = file(mash_output, 'w')
                out.write(output)
                out.close()
                log.info("Wrote mash output to %s" % mash_output)
                self.log = mash_output
            else:
                self.success = False
                failed_output = join(config.get('mashed_dir'), 'mash-failed-%s'
                                     % time.strftime("%y%m%d.%H%M"))
                out = file(failed_output, 'w')
                out.write(output)
                out.close()
                log.info("Wrote failed mash output to %s" % failed_output)
                self.log = failed_output
                break
        self.mashing = False
        log.info("Mashing complete")

243 """
244 Move all of the builds to the appropriate tag, and then run mash. If
245 anything fails, undo any tag moves.
246 """
247 try:
248 t0 = time.time()
249 if self.move_builds():
250 log.debug("Moved builds in %s seconds" % (time.time() - t0))
251 self.success = True
252 t0 = time.time()
253 self.mash()
254 log.debug("Mashed for %s seconds" % (time.time() - t0))
255 if self.success:
256 log.debug("Running post-request actions on updates")
257 for update in self.updates:
258 update.request_complete()
259 self.generate_updateinfo()
260 self.update_symlinks()
261 else:
262 log.error("Error mashing.. skipping post-request actions")
263 if self.undo_move():
264 log.info("Tag rollback successful!")
265 else:
266 log.error("Tag rollback failed!")
267 else:
268 log.error("Error with build moves.. rolling back")
269 self.undo_move()
270 self.success = False
271 except Exception, e:
272 log.error("Exception thrown in MashTask %d" % self.id)
273 log.error(str(e))
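        # Always notify the Masher, even on failure, so it can dispatch the
        # next queued task.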
        masher.done(self)

277 """
278 Generate the updateinfo.xml.gz and insert it into the appropriate
279 repositories.
280 """
281 from bodhi.metadata import ExtendedMetadata
282 t0 = time.time()
283 for repo, mashdir in self.mashed_repos.items():
284 repo = join(mashdir, repo)
285 log.debug("Generating updateinfo.xml.gz for %s" % repo)
286 uinfo = ExtendedMetadata(repo)
287 uinfo.insert_updateinfo()
288 log.debug("Updateinfo generation took: %s secs" % (time.time()-t0))
289
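    # Human-readable status of this task, used by Masher.__str__ while the
    # task is queued or running.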
    def __str__(self):
        val = '[ Mash Task #%d ]\n' % self.id
        if self.moving:
            val += ' Moving Updates\n'
            for action in self.actions:
                val += ' %s :: %s => %s\n' % (action[0], action[1], action[2])
        elif self.mashing:
            val += ' Mashing Repos %s\n' % ([str(repo) for repo in self.repos])
            for update in self.updates:
                val += ' %s (%s)\n' % (update.title, update.request)
        else:
            val += ' Not doing anything?'
        return val

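    # Plain-text summary of the task, mailed to release engineering by
    # Masher.done().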
    def report(self):
        val = '[ Mash Task #%d ]\n' % self.id
        val += 'The following actions were %ssuccessful.' % (self.success and
                                                             [''] or
                                                             ['*NOT* '])[0]
        if len(self.actions):
            val += '\n Moved the following package tags:\n'
            for action in self.actions:
                val += ' %s :: %s => %s\n' % (action[0], action[1], action[2])
        val += '\n Mashed the following repositories:\n'
        for repo in self.repos:
            val += ' - %s\n' % repo
        if self.log:
            mashlog = file(self.log, 'r')
            val += '\nMash Output:\n\n%s' % mashlog.read()
            mashlog.close()
        return val