#!/usr/bin/python -tt # A tool to help monitor & manage SELinux using func # # Copyright (C) 2009 Red Hat, Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Authors: Luke Macken import time from pprint import pprint from optparse import OptionParser from func import jobthing from func.overlord.client import Overlord status, stdout, stderr = range(3) class SELinuxOverlord(Overlord): selinux_status = {'Enforcing': [], 'Permissive': [], 'Disabled': []} selinux_minions = {} def __init__(self, minions): super(SELinuxOverlord, self).__init__(minions) self.minion_glob = minions def get_selinux_status(self): results = self.command.run('/usr/sbin/getenforce') for minion, result in results.iteritems(): if result[status]: print "[%s] Error: %s" % (minion, result) else: self.selinux_status[result[stdout].strip()].append(minion) self.selinux_minions[minion] = {} for key in self.selinux_status: self.selinux_status[key].sort() return self.selinux_status def get_selinux_denials(self, minion): overlord = Overlord(minion) return overlord.command.run('ausearch -m AVC -ts this-week --input-logs')[minion] def dump_selinux_denials(self): """ Write out all SELinux denials for all minions """ for minion in self.selinux_minions: result = self.get_selinux_denials(minion) if not result[status]: out = file(minion, 'w') out.write(result[stdout]) out.close() print "[%s] Successfully collected this weeks AVCs" % minion else: if '\n' in result: print "[%s] No AVCs Found" % minion out = file(minion, 'w') out.close() else: print "[%s] Problem running ausearch: %r" % (minion, result) def get_enforced_denials(self): """ Get a quick list of SELinux denials on enforced hosts """ for minion in self.selinux_status['Enforcing']: overlord = Overlord(minion) audit2allow = overlord.command.run('audit2allow -la') for m, r in audit2allow.iteritems(): if r[stdout].strip(): print "[%s]\n%s\n" % (m, r[stdout]) audit2allow = overlord.command.run('audit2allow -l -i /var/log/messages') for m, r in audit2allow.iteritems(): if r[stdout].strip(): print "[%s]\n%s\n" % (m, r[stdout]) def upgrade_policy(self): """ Update the SELinux policy across the given minions """ print "Cleaning yum metadata..." results = self.command.run('yum clean metadata') for minion, result in results.items(): if result[0]: print "[%s] Problem cleaning yum cache: %s" % (minion, result[1]) async_client = Overlord(self.minion_glob, nforks=10, async=True) print "Upgrading SELinux policy..." job_id = async_client.command.run('yum -y update selinux*') running = True while running: time.sleep(20) return_code, results = async_client.job_status(job_id) if return_code == jobthing.JOB_ID_RUNNING: continue elif return_code in (jobthing.JOB_ID_FINISHED, jobthing.JOB_ID_PARTIAL): for minion, result in results.items(): if result[0]: print '[%s] Problem upgrading policy: %s' % (minion, result[1]) if 'Updated: selinux-policy' in result[1]: ver = result[1].split('Updated: ')[-1].split()[1].split(':')[1] print "[%s] selinux-policy successfully upgraded to %s" % (minion, ver) else: print "selinux-policy *not* upgraded on %s: %s" % (minion, result[1]) if return_code == jobthing.JOB_ID_FINISHED: running = False elif return_code == jobthing.JOB_ID_LOST_IN_SPACE: print "Job %s lost in space: %s" % (job_id, results) else: print "Unknown return code %s: %s" % (return_code, results) print "SELinux policy upgrade complete!" if __name__ == '__main__': parser = OptionParser('usage: %prog [options] [minion1[;minion2]]') parser.add_option('-s', '--status', action='store_true', dest='status', help='Display the SELinux status of all minions') parser.add_option('-e', '--enforced-denials', action='store_true', dest='enforced_denials', help='Display enforced denials') parser.add_option('-d', '--dump-avcs', action='store_true', dest='dump_avcs', help='Dump AVCs to disk') parser.add_option('-u', '--upgrade-policy', action='store_true', dest='upgrade', help='Upgrade SELinux policy') opts, args = parser.parse_args() minions = len(args) > 0 and ';'.join(args) or '*' overlord = SELinuxOverlord(minions) #if opts.status or opts.enforced_denials: print "Determining SELinux status on minions: %s" % minions pprint(overlord.get_selinux_status()) if opts.enforced_denials: print "Finding enforced SELinux denials..." overlord.get_enforced_denials() if opts.dump_avcs: print "Dumping SELinux denials to disk..." overlord.dump_selinux_denials() if opts.upgrade: overlord.upgrade_policy() # vim: ts=4 sw=4 expandtab ai