#! /usr/bin/env python3
# Source : https://github.com/ixs/smart_status
# smartmontools disk status
#
# Copyright (c) 2015 Andreas Thienemann
#
# Use all available SMART data to ascertain whether a disk is probably okay or not.
# As customer available SMART attributes are basically unusable to predict failure,
# the script will schedule selftests in order to discover disk problems (hopefully)
# before they result in loss of data.
#
# Licensed under the GPL v3.0 or any later version
#

import sys
import subprocess
import os
import time
import re
import pprint
import traceback
import stat
import argparse


class bcolors:
    """ANSI escape sequences used to colorize terminal output."""
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'


class smart_status:
    """Run smartctl against a disk, parse its output and judge the disk state."""

    # Self-test log states that say nothing about the disk and are skipped
    # when looking for the most recent meaningful self-test.
    IGNORED_LOG_STATES = (
        'Self-test routine in progress',
        'Interrupted (host reset)',
        'Aborted by host',
    )

    # Column names of one row of the smartctl attribute table.
    ATTR_COLUMNS = ('id', 'name', 'flag', 'value', 'worst', 'thresh', 'type',
                    'updated', 'when_failed', 'raw_value')

    # Matches e.g. "Self-test execution status:      (   0)\tThe previous ..."
    SELFTEST_STATUS_RE = re.compile(r'\([ ]*(?P<num_status>\d+)\)\s(?P<text_status>.*)')

    def __init__(self):
        """Set up the smartctl exit-code map and the default configuration."""
        # The errorcode decoder map for smartctl taken from the manpage.
        # Index i describes bit i of the smartctl exit status.
        self.error_map = (
            'Command line did not parse.',
            'Device open failed, device did not return an IDENTIFY DEVICE structure, or device is in a low-power mode',
            'Some SMART or other ATA command to the disk failed, or there was a checksum error in a SMART data structure',
            'SMART status check returned "DISK FAILING"',
            'We found prefail Attributes <= threshold.',
            'SMART status check returned "DISK OK" but we found that some (usage or prefail) Attributes have been <= threshold at some time in the past.',
            'The device error log contains records of errors.',
            'The device self-test log contains records of errors. [ATA only] Failed self-tests outdated by a newer successful extended self-test are ignored.'
        )
        self.cfg = dict()
        self.cfg['smartctl_bin'] = 'smartctl'
        self.cfg['strict'] = False
        self.cfg['smartctl_test_threshold'] = 0
        self.cfg['smartctl_test_frequency'] = 0
        self.cfg['verbose'] = False
        self.cfg['color'] = False
        self.cfg['disks'] = list()

    def colorize(self, mode):
        """Disable colored output when *mode* is false.

        The escape sequences are blanked on the shared ``bcolors`` class, so
        the change affects all output of the process. A true *mode* is a no-op.
        """
        if not mode:
            for name in ('HEADER', 'WARNING', 'OKGREEN', 'OKBLUE', 'FAIL', 'ENDC'):
                setattr(bcolors, name, '')

    def find_disks(self):
        """Return a sorted list of ``/dev/<name>`` paths for disks found in /sys/block.

        A block device is included when its ``device/type`` file contains ``0``.
        Devices without a readable type file are skipped. An unreadable
        ``/sys/block`` yields an empty list.
        """
        disks = list()
        try:
            candidates = sorted(os.listdir('/sys/block'))
        except OSError:
            return disks
        for dev in candidates:
            try:
                with open('/sys/block/{}/device/type'.format(dev)) as f:
                    if f.read().strip() == '0':
                        disks.append('/dev/{}'.format(dev))
            except OSError:
                # e.g. loop or ram devices have no device/type entry
                continue
        return disks

    def schedule_selftest(self, dev, report=False):
        """Start a long SMART selftest on *dev* unless one is running or recent.

        Returns True when smartctl confirmed that testing has begun, False in
        every other case (test running, recent test present, smartctl failure).
        With *report* set, progress is printed.
        """
        (smart_health, smart_selftest, smart_log, smart_attr) = self.fetch_smart(dev, report)
        if not self.judge_selftest(dev, smart_selftest, report=report):
            if report:
                print("{col}{dev} Cannot schedule SMART selftest.{cls}".format(col=bcolors.FAIL, dev=dev, cls=bcolors.ENDC))
            return False

        if self.judge_selftest_log(dev, smart_log, smart_attr, report=report)[1]:
            if report:
                print("{col}{dev} SMART selftest ran recently. Not scheduling a new one.{cls}".format(col=bcolors.OKBLUE, dev=dev, cls=bcolors.ENDC))
            return False

        if report:
            print("{col}{dev} Scheduling SMART selftest.{cls}".format(col=bcolors.HEADER, dev=dev, cls=bcolors.ENDC))
        try:
            output = subprocess.check_output([self.cfg['smartctl_bin'], '-t', 'long', dev], universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # A non-zero exit status must not abort the whole run; judge by
            # whatever smartctl printed.
            output = e.output or ''

        if 'Drive command "Execute SMART Extended self-test routine immediately in off-line mode" successful.' not in output:
            if report:
                print("{col}{dev} Scheduling SMART selftest failed.{cls}".format(col=bcolors.FAIL, dev=dev, cls=bcolors.ENDC))
            return False

        if 'Testing has begun.' in output:
            # Defaults guard against smartctl output lacking either line.
            duration = 'unknown'
            eta = 'unknown'
            for l in output.split("\n"):
                if l.startswith("Please wait "):
                    duration = l.split()[2]
                elif l.startswith("Test will complete after "):
                    eta = l[len("Test will complete after "):]
            if report:
                print("{col}{dev} Scheduling SMART selftest successful. Expected duration {duration} min, ETA: {eta}.{cls}".format(col=bcolors.OKBLUE, dev=dev, duration=duration, eta=eta, cls=bcolors.ENDC))
            return True
        return False

    def judge_health(self, dev, smart_health, report=False):
        """Judge the overall-health self-assessment string.

        Returns True only when *smart_health* equals ``"PASSED"``; anything
        else, including ``None`` when no health line was found, yields False.
        """
        healthy = smart_health == "PASSED"
        col = bcolors.HEADER if healthy else bcolors.FAIL
        if report:
            print("{col}{dev} SMART Health status is {health}. (This value cannot necessarily be trusted){cls}".format(col=col, dev=dev, health=smart_health, cls=bcolors.ENDC))
        return healthy

    def judge_attributes(self, dev, smart_attr, report=False):
        """Judge the disk by its Current_Pending_Sector attribute.

        Returns False when the raw value is greater than zero, True when it
        is zero or less, and None when the attribute is missing or not an
        integer.
        """
        try:
            pending = int(smart_attr['Current_Pending_Sector']['raw_value'])
        except (KeyError, TypeError, ValueError):
            return None
        if pending > 0:
            if report:
                print("{col}{dev} SMART Attribute Current_Pending_Sector indicates failing disk.{cls}".format(col=bcolors.FAIL, dev=dev, cls=bcolors.ENDC))
            return False
        return True

    def judge_selftest(self, dev, smart_selftest, report=False):
        """Judge whether we can schedule a selftest.

        *smart_selftest* is the ``(status number, status text)`` tuple from
        ``fetch_smart`` or None. Returns False while a selftest is running
        (status 240..250) or when the status is unknown, True otherwise.
        """
        try:
            (selftest_num, selftest_txt) = smart_selftest
            running = 240 <= selftest_num <= 250
        except (TypeError, ValueError):
            if report:
                print("{col}{dev} cannot determine selftest status.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC))
            return False

        if selftest_num == 0:
            message, col, schedulable = "{col}{dev} No SMART selftest is currently running.{cls}", bcolors.OKBLUE, True
        elif running:
            message, col, schedulable = "{col}{dev} SMART selftest is currently running: {txt}.{cls}", bcolors.OKBLUE, False
        elif selftest_num == 25:
            message, col, schedulable = "{col}{dev} Last SMART selftest had a problem: {txt}.{cls}", bcolors.WARNING, True
        else:
            message, col, schedulable = "{col}{dev} SMART selftest had a problem: {txt}.{cls}", bcolors.FAIL, True
        if report:
            print(message.format(col=col, dev=dev, txt=selftest_txt, cls=bcolors.ENDC))
        return schedulable

    @staticmethod
    def _format_age(hrs):
        """Render a number of hours as a human readable age."""
        if hrs < 1:
            return '1 hour'
        if hrs <= 24:
            return '{} hours'.format(hrs)
        if hrs < 24 * 2:
            return '{} day {} hours'.format(hrs // 24, hrs % 24)
        if hrs < 24 * 14:
            return '{} days {} hours'.format(hrs // 24, hrs % 24)
        return '{} weeks {} days {} hours'.format(hrs // (24 * 7), (hrs // 24) % 7, hrs % 24)

    def judge_selftest_log(self, dev, smart_log, smart_attr, report=False):
        """Judge the newest meaningful entry of the selftest log.

        Returns ``(selftest ok, selftest current)``:

        * ``selftest ok`` is True for a test that completed without error
          (and when no usable entry exists), None while a test is in
          progress, False for any other state.
        * ``selftest current`` is True when the last good test is younger
          than ``smartctl_test_frequency`` days or a test is in progress,
          False when it is older or failed, None when no usable entry or no
          Power_On_Hours attribute is available.
        """
        healthy = True
        current = None

        try:
            uptime = int(smart_attr['Power_On_Hours']['raw_value'])
        except (KeyError, TypeError, ValueError):
            if report:
                print("{col}{dev} cannot determine power on hours.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC))
            return (healthy, current)

        try:
            # First log entry (in key order) that is not a useless/invalid one.
            entry = next(
                smart_log[key] for key in sorted(smart_log)
                if smart_log[key]['Status'] not in self.IGNORED_LOG_STATES
            )
            last_test = int(entry['LifeTime(hours)'])
            test_type = entry['Test_Description']
            test_state = entry['Status']
        except (StopIteration, KeyError, TypeError, ValueError):
            if report:
                print("{col}{dev} never finished a SMART selftest.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC))
            return (healthy, current)

        frequency = self.cfg['smartctl_test_frequency']
        max_age = frequency * 24
        test_diff = uptime - last_test
        completed = test_state == 'Completed without error'

        if completed and test_diff < max_age:
            col = bcolors.OKGREEN
            current = True
        elif completed and test_diff >= max_age * 2:
            col = bcolors.FAIL
            current = False
        elif completed:
            col = bcolors.WARNING
            current = False
        elif test_state.startswith('Self-test routine in'):
            col = ''
            healthy = None
            current = True
        else:
            col = bcolors.FAIL
            healthy = False
            current = False
        if completed and frequency == 0:
            # No schedule configured: the age of the test is not judged.
            col = bcolors.HEADER

        if report:
            print("{col}{dev} last {type} selftest {state} and finished {tspec} ago.{cls}".format(col=col, dev=dev, tspec=self._format_age(test_diff), type=test_type.lower(), state=test_state.lower(), cls=bcolors.ENDC))
        return (healthy, current)

    def verify_smart(self, dev, report=False):
        """Verify the SMART status of a disk.

        Returns True when nothing indicates a problem, False when any check
        failed, and None when a check was inconclusive and strict mode is
        enabled. This is a guesstimate as SMART is basically unreliable.
        Exceptions from running smartctl propagate to the caller.
        """
        (smart_health, smart_selftest, smart_log, smart_attr) = self.fetch_smart(dev, report)
        health = [
            # Overall health
            self.judge_health(dev, smart_health, report=report),
            # Attribute health
            self.judge_attributes(dev, smart_attr, report=report),
        ]
        # Smart Selftest capability (called for its report output only)
        self.judge_selftest(dev, smart_selftest, report=report)
        # Selftest log
        health.append(self.judge_selftest_log(dev, smart_log, smart_attr, report=report)[0])

        if None in health and self.cfg['strict']:
            return None
        if False in health:
            return False
        return True

    def _report_exit_status(self, dev, ret, report):
        """Print the meaning of every bit set in the smartctl exit status *ret*."""
        if not report:
            return
        for bit, text in enumerate(self.error_map):
            if not ret & (1 << bit):
                continue
            # A failed SMART command (bit 2) is only a warning unless strict.
            if bit == 2 and not self.cfg['strict']:
                col = bcolors.WARNING
            else:
                col = bcolors.FAIL
            print("{col}{dev} smartctl output: {msg}{cls}".format(col=col, dev=dev, msg=text, cls=bcolors.ENDC))

    def parse_smartctl_output(self, output, dev=''):
        """Parse the text printed by ``smartctl -H -c -A -l selftest``.

        Returns ``(health, selftest, logs, attrs)``:

        * ``health``: text after the overall-health result, or None.
        * ``selftest``: ``(status number, status text)`` of the self-test
          execution status, or None when it could not be parsed.
        * ``logs``: dict keyed by the ``Num`` column of the selftest log,
          each value a dict mapping the header names to the row fields.
        * ``attrs``: dict keyed by attribute name, each value a dict with
          the keys of ``ATTR_COLUMNS``.

        Attributes are split by whitespace. Log rows are cut at the column
        positions of the header line, because their fields contain blanks.
        """
        section = None
        attrs = dict()
        logs = dict()
        health = None
        selftest = list()
        in_selftest = False  # Is the current line a continuation of the selftest status?
        log_columns = None
        log_item_pos = None

        for l in output.split("\n"):
            # section end
            if section is not None and l == "":
                section = None
                continue

            # Overall health
            if l.startswith("SMART overall-health self-assessment test result"):
                health = l.split(':')[1].strip()

            # Capabilities, we're only caring for the selftest status
            if l.startswith("General SMART Values"):
                section = 'cap'
                continue
            if section == 'cap':
                if l.startswith('Self-test execution status'):
                    selftest.append(l)
                    in_selftest = True
                    continue
                if in_selftest and l.startswith("\t"):
                    selftest.append(l)
                    continue
                # Any other capability line ends the selftest status text.
                in_selftest = False

            # Attr
            if l.startswith("Vendor Specific SMART Attributes with Thresholds"):
                section = 'attr'
                continue
            if section == 'attr':
                if l.startswith("ID#"):
                    continue
                attr = dict(zip(self.ATTR_COLUMNS, l.split(None, 9)))
                if 'name' in attr:
                    attrs[attr['name']] = attr

            # Log
            if l.startswith("SMART Self-test log structure revision number"):
                section = 'log'
                continue
            if section == 'log':
                if l.startswith("Num"):
                    log_columns = l.split()
                    log_item_pos = [l.find(column) for column in log_columns]
                    continue
                if l.startswith('No self-tests have been logged.'):
                    section = None
                    continue
                if log_item_pos is None:
                    # A log row without a preceding header cannot be cut up.
                    print(f"Device {dev} doesn't offer logs capacity")
                    continue
                log = list()
                last = len(log_item_pos) - 1
                for i, start in enumerate(log_item_pos):
                    # Special handling for the status where the table header
                    # doesn't line up with the table data: the status field
                    # ends, and the following field starts, 5 columns late.
                    s = start + 5 if i == 3 else start
                    if i == last:
                        e = len(l)
                    elif i == 2:
                        e = log_item_pos[i + 1] + 5
                    else:
                        e = log_item_pos[i + 1]
                    log.append(l[s:e].strip())
                logs[log[0]] = dict(zip(log_columns, log))

        # Fixup the selftest status
        selftest_status = None
        if selftest:
            m = self.SELFTEST_STATUS_RE.search(selftest[0])
            if m:
                txt = [m.group('text_status')]
                txt.extend(part.strip() for part in selftest[1:])
                selftest_status = (int(m.group('num_status')), " ".join(txt))

        return health, selftest_status, logs, attrs

    def fetch_smart(self, dev, report=False):
        """Run smartctl on *dev* and return the parsed result.

        Used to verify the disk is still safe to use according to smartctl
        output. Yes, this is only a best effort... SMART is not trustworthy.

        Returns the ``(health, selftest, logs, attrs)`` tuple described in
        ``parse_smartctl_output``. A non-zero smartctl exit status is decoded
        and reported, and the output is parsed anyway. Failing to start
        smartctl raises ``OSError``.
        """
        try:
            output = subprocess.check_output([self.cfg['smartctl_bin'], '-H', '-c', '-A', '-l', 'selftest', dev], universal_newlines=True)
        except subprocess.CalledProcessError as e:
            output = e.output or ''
            # Decode bitmasked return code
            self._report_exit_status(dev, e.returncode, report)

        if report:
            if 'SMART Attributes Data Structure revision number' not in output:
                print("{col}{dev} does not support SMART attributes.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC))
            if 'SMART Self-test log structure revision number' not in output:
                print("{col}{dev} does not support SMART selftest.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC))

        return self.parse_smartctl_output(output, dev)


def check_single_dev(dev, report=True):
    """Verify *dev* with the module-level ``smart`` instance.

    Returns the result of ``smart.verify_smart``. When that raises, the
    error and its traceback are printed and None is returned.
    """
    try:
        return smart.verify_smart(dev, report)
    except Exception:
        # The traceback must be formatted while the exception is being handled.
        print("{0} Error getting SMART data".format(dev))
        print(traceback.format_exc())
        return None


def parse_opts():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser(description=(
        "Hard drives use Self-Monitoring, Analysis and Reporting Technology (SMART) "
        "to export data about the health of a disk device. "
        "{prog} is a tool to parse this data and tries to detect pending or post disk "
        "failures and report on disk status. "
        "Unfortunately SMART failure prediction is rarely reliable. "
        "Reporting on actual disk failures however generally works."
    ).format(prog=os.path.basename(sys.argv[0])))

    group_op_sel = parser.add_mutually_exclusive_group(required=True)
    group_op_sel.add_argument("-a", "--autodetect", "--all", action='store_true', help="Autodetect disks and scan.")
    group_op_sel.add_argument("-d", "--disks", action='append', nargs=1, help="Only handle specific disk device.")

    # Not part of the exclusive group: the binary location has to be
    # combinable with either way of selecting disks.
    parser.add_argument("-b", "--smartctl", help="Overide smartctl binary location if not in path.", default='smartctl')

    group_nag = parser.add_argument_group('Nagios', description="Format output to be usable as a Nagios compatible plugin.")
    group_nag.add_argument("-n", "--nagios", action='store_true', help="Return data in a form usable as a nagios check.")
    group_nag.add_argument("-u", "--unknown", choices=['warning', 'critical'], help="Change alert level of unknown smart status.")
    group_nag.add_argument("-w", "--warning", choices=['unknown', 'critical'], help="Change alert level of warning smart status.")

    parser.add_argument("-i", "--ignore", action='append', nargs="+", help="Ignore specific disk devices. Helpful when scanning for all disks.", default=[])
    parser.add_argument("-s", "--schedule", type=int, help="Frequency in days after which a selftest is considered out of date and will be rescheduled.")
    parser.add_argument("-t", "--threshold", type=int, help="Frequency in days after which a selftest is considered out of date and will be warned about but not rescheduled.")
    parser.add_argument("-v", "--verbose", action='store_true', help="Print more status information.")
    parser.add_argument("-x", "--strict", action='store_true', help="Strict checking. Report a device not supporting SMART attributes or selftest as unknown/error instead of relying on the unreliable general SMART health feedback.", default=False)
    parser.add_argument("-c", "--color", "--colour", action='store_true', help="Colorize output.", default=False)
    return parser.parse_args()


if __name__ == '__main__':
    smart = smart_status()
    args = parse_opts()

    smart.cfg['smartctl_bin'] = args.smartctl
    smart.cfg['strict'] = bool(args.strict)
    smart.cfg['verbose'] = bool(args.verbose)
    smart.cfg['smartctl_test_frequency'] = 0 if args.schedule is None else args.schedule
    smart.cfg['smartctl_test_threshold'] = 0 if args.threshold is None else args.threshold
    if not args.color:
        smart.colorize(False)

    # -d yields one single-element list per occurrence, -i one list per
    # occurrence; flatten both. Ignored devices are dropped for autodetected
    # disks as well.
    if args.autodetect:
        selected = smart.find_disks()
    else:
        selected = [disk for group in (args.disks or []) for disk in group]
    ignored = {disk for group in args.ignore for disk in group}
    smart.cfg['disks'] = sorted(set(selected) - ignored)

    if smart.cfg['disks'] and not args.nagios:
        results = list()  # (disk, color, message, schedule note)
        ret = list()
        for disk in smart.cfg['disks']:
            try:
                is_block_device = stat.S_ISBLK(os.stat(disk).st_mode)
            except OSError:
                is_block_device = False
            if not is_block_device:
                results.append((disk, bcolors.FAIL, "Invalid device", ''))
                ret.append(255)
                continue

            if smart.cfg['verbose']:
                print("Checking {}:".format(disk))
            res = check_single_dev(disk, report=smart.cfg['verbose'])
            if res is True:
                col, msg, code = bcolors.OKGREEN, "Disk healthy", 0
            elif res is None:
                col, msg, code = bcolors.WARNING, "Insufficient SMART support", 2
            else:
                col, msg, code = bcolors.FAIL, "Disk failing", 1
            ret.append(code)

            sched = ''
            if smart.cfg['smartctl_test_frequency'] > 0:
                if smart.cfg['verbose']:
                    print("Scheduling selftest {}:".format(disk))
                if smart.schedule_selftest(disk, report=smart.cfg['verbose']):
                    sched = 'New selftest scheduled.'
            results.append((disk, col, msg, sched))

        for disk, col, msg, sched in results:
            print("{disk}: {col}{msg}{cls} {sched}".format(col=col, msg=msg, disk=disk, cls=bcolors.ENDC, sched=sched))
        sys.exit(max(ret))

    elif args.nagios:
        res = dict()
        for disk in smart.cfg['disks']:
            res[disk] = check_single_dev(disk, report=smart.cfg['verbose'])
            if smart.cfg['smartctl_test_frequency'] > 0:
                smart.schedule_selftest(disk, report=smart.cfg['verbose'])

        # Format nagios line
        labels = {True: 'Ok', None: 'Unkn', False: 'Err'}
        line = ", ".join("{}: {}".format(disk, labels[res[disk]]) for disk in sorted(res))
        outcomes = list(res.values())
        if False in outcomes:
            print('CRITICAL: smart_status reports {} disk(s) as having errors. {}'.format(outcomes.count(False), line))
            sys.exit(2)
        else:
            print('OK: smart_status reports {} disk(s) as okay. {}'.format(outcomes.count(True), line))
            sys.exit(0)