commit 545473f1df3ea7ff8d2a8e3de1fd815d7a640417 Author: alban Date: Sun Jul 16 20:03:11 2023 +0000 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8864d4a --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..bfd0d36 --- /dev/null +++ b/README.md @@ -0,0 +1,39 @@ +# Smart and wipe disks + +**Project target : ultimately provide a way to diagnose and clear old disks before use.** + +It should be : + +- based on classic GNU/LINUX tools (smartctl, dd) +- safe to use (never touch a mounted partition) +- automatized for standalone operations + +--- + +For now the project runs through a shell script that needs to be converted to python. 
+ +``` +$ pip install -r requirements.txt + +# sudo/root is required to operate smartctl + +$ sudo ./bin/main.sh + +``` + +## Roadmap + +It needs to improve based on the sources in the src dir : + +- [pySMART](https://pypi.org/project/pySMART/) +- [py_smartjson](https://github.com/kroy-the-rabbit/py_smartjson) +- [smart_status](https://github.com/ixs/smart_status) +- [pyWype](https://github.com/marshki/pyWype) + + +Known dependencies + +``` +- pip install pySMART +- pip install jsonpickle +``` diff --git a/bin/main.sh b/bin/main.sh new file mode 100755 index 0000000..72c6582 --- /dev/null +++ b/bin/main.sh @@ -0,0 +1,131 @@ +#! /bin/bash +#set -e +APP_PATH=$( cd $(dirname $0) && pwd) +SRC_PATH=$( cd $(dirname $0)/../src && pwd) +cd $APP_PATH +clear +echo -e "## DEVICES\n" +DEVICES=$(lsblk /dev/sd? --nodeps --output NAME,MODEL,VENDOR,SIZE,TYPE,STATE) +echo "$DEVICES" +MOUNT_ROOT=$(mount |grep " / " |awk '{print $1}' |sed -r 's=/dev/(...)[0-9]$=\1=') +declare -a NOT_ROOT +while read device others; do + [[ "$device" != $MOUNT_ROOT ]] && [[ "$device" != "NAME" ]] && NOT_ROOT+=( $device ) +done <<< "$DEVICES" + +NOT_ROOT_STR=$( echo ${NOT_ROOT[@]} ) +echo -e "\n## ROOT MOUNTED DEVICE\n$MOUNT_ROOT" +echo -e "\n## OTHER DEVICES\n${NOT_ROOT_STR}\n" + + +echo -e "\n## SMARTCTL TESTS" +read -e -p "Do you want to run tests? [y/N] " -n 1 +REPLY=${REPLY:-N} +if [[ "N" != "${REPLY^^}" ]] ; then + echo -e "\n## SMARTCTL DISKS SELECTION" + echo -e "Do you want to USE ALL non root devices (empty reply)? \nOr else please type specific DEVICES NAMES to include (ex: '${NOT_ROOT_STR}')?" + read -e -p "Type your answer: " + CMD="$SRC_PATH/smartjson.py long " + if [[ -z "$REPLY" ]] ; then + CMD+=" -e $MOUNT_ROOT" + else + + DEVICES=${REPLY} + for i in $DEVICES; do + CMD+=" -d $i" + done + fi + + echo -e "\n## READY" + read -e -i Y -n 1 -p "About to run command '$CMD'. OK [Y/n]? 
" + + # $CMD + START=$(date "+%s") + set +e + while true; do + clear + echo -e "\n## CHECKING STATUS" + + date + echo -e "\nRunning since: $(( $( date +%s ) - $START )) seconds.\n" + $SRC_PATH/smartjson.py status && break + echo -e "\n## MANUAL HALT" + read -e -i Y -p "Stop ?" -t 15 + [[ ${REPLY^^} == "Y" ]] && break + done + +fi + +echo -e "\n## SMARTCTL DISKS STATUS" +$SRC_PATH/smartjson.py list + + +echo -e "\n## DISKS BACKGROUND ERASE\nCaution: this might cause data loss." +read -e -p "Do you want to run a background erasure? [y/N] " -n 1 +REPLY=${REPLY:-N} +if [[ "N" != "${REPLY^^}" ]] ; then + ERASE_DEVICES="" + while [[ -z "$ERASE_DEVICES" ]] ; do + echo "Please provide device names. Ex: 'sda sdc'" + read -e -p "Devices to erase: " -i "${NOT_ROOT_STR}" ERASE_DEVICES + declare -A DD_CMD + ERROR="false" + for f in $ERASE_DEVICES; do + if [[ ! -b /dev/$f ]] || [[ "$MOUNT_ROOT" == $f ]] ; then + echo "ERROR. $f is not a valid device." + ERROR="true" + break + fi + DD_CMD["$f"]="dd if=/dev/zero of=/dev/$f bs=512K" + done + if [[ "$ERROR" == "true" ]]; then + ERASE_DEVICES="" + continue + fi + done + echo -e "\n## CONFIRMATION\nYou are about to run the following commands." + for i in ${!DD_CMD[@]}; do + echo "Disk $i: ${DD_CMD[$i]}" + done + read -e -p "Please type 'Yes' to validate: " VALIDATE + if [[ "YES" != "${VALIDATE^^}" ]]; then + echo "EXIT" + exit + else + echo -e "\n## RUNNING ERASURE\n" + declare -A DD_PID + for i in ${!DD_CMD[@]}; do + ${DD_CMD[$i]}& + DD_PID[$i]=$! + echo "$i PID ${DD_PID[$i]}..." + sleep 1 + done + sleep 3 + fi +fi + +[[ -n ${DD_PID[@]} ]] && while true; do + clear + echo -e "\n## CHECKING STATUS" + FINISHED_COUNT=O + + if [[ ${#DD_PID[@]} -ne 0 ]] ; then + for i in ${!DD_PID[@]}; do + PID=${DD_PID[$i]} + ps -f -p $PID &>/dev/null + R=$? 
class SmartDevice:
    """Flat snapshot of the fields this project reads from a SMART device.

    The constructor copies identification fields and a few SMART attribute
    raw values from ``device`` onto plain instance attributes.

    ``device`` must expose ``name``, ``model``, ``serial``, ``capacity``,
    ``firmware``, ``assessment``, ``is_ssd`` and an ``attributes`` sequence
    indexed by SMART attribute ID whose entries carry a ``raw`` value.
    """

    # SMART attribute IDs used as indexes into ``device.attributes``.
    c_TEMP = 194
    c_REALLOC_SECTORS = 5
    c_CURRENT_PENDING_SECTORS = 197
    c_HOURS = 9

    def __init__(self, device):
        """Copy the relevant fields of ``device``.

        Attributes the drive does not report are stored as ``None`` instead
        of raising, so one missing attribute (for example temperature) does
        not make the whole device unusable.
        """
        self.dev = device.name
        self.model = device.model
        self.serial = device.serial
        self.temp = self._raw(device.attributes, self.c_TEMP)
        self.reallocated_sectors = self._raw(device.attributes, self.c_REALLOC_SECTORS)
        self.capacity = device.capacity
        self.firmware = device.firmware
        self.smart_status = device.assessment
        self.ssd = device.is_ssd
        self.hours = self._raw(device.attributes, self.c_HOURS)
        self.full_attributes = device.attributes

    @staticmethod
    def _raw(attributes, attr_id):
        """Return the ``raw`` value of attribute ``attr_id``, or None if absent."""
        try:
            entry = attributes[attr_id]
        except (IndexError, KeyError, TypeError):
            return None
        # An unused slot holds None, which has no ``raw``.
        return getattr(entry, "raw", None)
def define_device_to_wipe(reader=None):
    """Prompt until the user names a device or partition, then return it.

    The accepted answer is the part that follows ``/dev/sd``: one lowercase
    letter optionally followed by a partition number of any length
    (``b``, ``b1``, ``b12``).

    Args:
        reader: Optional callable that receives the prompt text and returns
            the answer. When omitted, the module's ``input`` is used, so
            existing callers are unaffected.

    Returns:
        The validated answer string.
    """
    ask = input if reader is None else reader
    prompt = ("Enter letter [number] of device/partition to wipe,"
              "\ne.g. to wipe '/dev/sdb1' enter 'b1': ")

    while True:
        device = ask(prompt)
        # [0-9]* rather than [0-9]? so partitions numbered 10 and above pass.
        if re.match(r"^[a-z][0-9]*$", device):
            return device
        print("Sorry, that's not a valid device or partition. Try again.")
+ """ + + append = append_device_to_wipe() + num = number_of_wipes() + confirm_wipe() + + for i in range(num): + print("Processing pass count {} of {} ... ".format(i + 1, num)) + os.system(('dd if=/dev/urandom |pv --progress --time --rate --bytes|' + 'dd of={} bs=1024'.format(append))) + +def menu(): + """Menu prompt for use to select program option. + """ + + list_mounted_devices() + + while True: + try: + print(30 * "-", "MENU", 30 * "-") + print("1. Overwrite device or partition with 0's \n(faster, less secure).") + print("2. Overwrite device or partition with random 0\'s & 1\'s" + "\n(slower, more secure).") + print("3. Quit.") + + choice = input("Select an option (1, 2 or 3): ") + + if choice not in ('1', '2', '3'): + raise ValueError() + return choice + + except ValueError: + print("Sorry, that's not a valid number. Try again: ") + +def interactive_mode(): + """Display menu-driven options and run function based on selection. + """ + + while True: + choice = menu() + + if choice == '3': + sys.exit() + elif choice == '1': + write_zeros_to_device() + elif choice == '2': + write_random_to_device() + +def wipe_device(): + """Program to wipe drive. + """ + + is_linux() + root_user_check() + interactive_mode() + +if __name__ == '__main__': + print(28 * '-', " pyWype ", 28 * '-') + print("PYTHON DISK & PARTITION WIPING UTILITY FOR GNU/LINUX." + "\nTHIS UTILITY WILL IRRECOVERABLY WIPE DATA FROM DRIVE.\nPROCEED WITH CAUTION.") + + wipe_device() diff --git a/src/smart_status.py b/src/smart_status.py new file mode 100755 index 0000000..99dd1f9 --- /dev/null +++ b/src/smart_status.py @@ -0,0 +1,549 @@ +#! /usr/bin/env python3 +# Source : https://github.com/ixs/smart_status +# smartmontools disk status +# +# Copyright (c) 2015 Andreas Thienemann +# +# Use all available SMART data to ascertain whether a disk is probably okay or not. 
+# As customer available SMART attributes are basically unusable to predict failure, +# the script will schedule selftests in order to discover disk (hopefully) before +# they result in loss of data. +# +# Licensed under the GPL v3.0 or any later version +# + +import sys +import subprocess +import os +import time +import re +import pprint +import traceback +import stat +import argparse + +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + +class smart_status: + def __init__(self): + # The errorcode decoder map for smartctl taken from the manpage + self.error_map = ( + 'Command line did not parse.', + 'Device open failed, device did not return an IDENTIFY DEVICE structure, or device is in a low-power mode', + 'Some SMART or other ATA command to the disk failed, or there was a checksum error in a SMART data structure', + 'SMART status check returned "DISK FAILING"', + 'We found prefail Attributes <= threshold.', + 'SMART status check returned "DISK OK" but we found that some (usage or prefail) Attributes have been <= threshold at some time in the past.', + 'The device error log contains records of errors.', + 'The device self-test log contains records of errors. [ATA only] Failed self-tests outdated by a newer successful extended self-test are ignored.' 
def find_disks(self):
    """Return the ``/dev/<name>`` paths of the disks listed under /sys/block.

    An entry is kept when its ``/sys/block/<name>/device/type`` file
    contains ``0``. Entries without a readable ``device/type`` file are
    skipped.

    Returns:
        A list of device paths in sorted name order. The list is empty when
        ``/sys/block`` itself cannot be listed.
    """
    try:
        names = sorted(os.listdir('/sys/block'))
    except OSError:
        # No sysfs: nothing to autodetect.
        return []

    disks = []
    for dev in names:
        try:
            with open('/sys/block/{}/device/type'.format(dev)) as f:
                dev_type = f.read().strip()
        except OSError:
            # Entry has no device/type file or it is unreadable.
            continue
        # NOTE(review): '0' is presumably the SCSI "direct-access" peripheral
        # type, i.e. a real disk; confirm against the kernel sysfs docs.
        if dev_type == '0':
            disks.append('/dev/{}'.format(dev))

    return disks
def judge_attributes(self, dev, smart_attr, report = False):
    """Judge disk health from the Current_Pending_Sector attribute.

    Args:
        dev: Device name, used only in the report message.
        smart_attr: Mapping of attribute name to a mapping that holds the
            attribute's ``raw_value``.
        report: When true, print a message if the attribute indicates a
            failing disk.

    Returns:
        True when the raw value is 0, False when it is greater than 0, and
        None when the attribute is missing or its raw value is not an
        integer.
    """
    try:
        pending = int(smart_attr['Current_Pending_Sector']['raw_value'])
    except (KeyError, TypeError, ValueError):
        # Attribute not reported, or reported in a form we cannot read.
        return None

    if pending > 0:
        if report:
            print( "{col}{dev} SMART Attribute Current_Pending_Sector indicates failing disk.{cls}".format(col=bcolors.FAIL, dev = dev, cls=bcolors.ENDC) )
        return False
    return True
def judge_selftest_log(self, dev, smart_log, smart_attr, report = False):
    """Judge the most recent meaningful entry of the SMART self-test log.

    Entries whose status is "in progress", "interrupted by host reset" or
    "aborted by host" say nothing about disk health and are skipped. The
    first remaining entry, in sorted key order, is the one judged.

    Args:
        dev: Device name, used only in report messages.
        smart_log: Mapping of log entry id to a mapping with the keys
            ``Status``, ``LifeTime(hours)`` and ``Test_Description``.
        smart_attr: Attribute mapping; ``Power_On_Hours`` supplies the
            current uptime used to compute the age of the test.
        report: When true, print a one-line summary.

    Returns:
        ``(healthy, current)``. ``healthy`` is False when the judged test
        did not complete without error, None when a test is still running,
        and True otherwise. ``current`` is True when the judged test is
        younger than ``smartctl_test_frequency`` days (or still running),
        False when it is older or failed, and None when no usable entry or
        uptime is available.
    """
    ignored_states = ('Self-test routine in progress', 'Interrupted (host reset)', 'Aborted by host')
    healthy = True
    current = None

    try:
        uptime = int(smart_attr['Power_On_Hours']['raw_value'])
        entry = next((smart_log[key] for key in sorted(smart_log)
                      if smart_log[key]['Status'] not in ignored_states), None)
        if entry is not None:
            last_test = int(entry['LifeTime(hours)'])
            test_type = entry['Test_Description']
            test_state = entry['Status']
    except (KeyError, TypeError, ValueError):
        # Missing uptime or malformed log: treat as "nothing to judge".
        entry = None

    if entry is None:
        if report:
            print( "{col}{dev} never finished a SMART selftest.{cls}".format(col = bcolors.WARNING, dev = dev, cls = bcolors.ENDC) )
        return (healthy, current)

    frequency = self.cfg['smartctl_test_frequency']
    window = frequency * 24          # maximum accepted test age in hours
    test_diff = uptime - last_test   # age of the judged test in hours

    if test_state == 'Completed without error':
        current = test_diff < window
    elif test_state.startswith('Self-test routine in'):
        healthy = None
        current = True
    else:
        healthy = False
        current = False

    if report:
        # Colours are resolved only when something is printed.
        if test_state == 'Completed without error':
            if frequency == 0:
                col = bcolors.HEADER
            elif test_diff < window:
                col = bcolors.OKGREEN
            elif test_diff >= window * 2:
                col = bcolors.FAIL
            else:
                col = bcolors.WARNING
        elif test_state.startswith('Self-test routine in'):
            col = ''
        else:
            col = bcolors.FAIL

        hrs = test_diff
        if hrs < 1:
            tspec = '1 hour'
        elif hrs <= 24:
            tspec = '{} hours'.format(hrs)
        else:
            # Integer arithmetic: '/' would print float days on Python 3.
            days, rem_hours = divmod(hrs, 24)
            if hrs < 24 * 2:
                tspec = '{} day {} hours'.format(days, rem_hours)
            elif hrs < 24 * 14:
                tspec = '{} days {} hours'.format(days, rem_hours)
            else:
                weeks, rem_days = divmod(days, 7)
                tspec = '{} weeks {} days {} hours'.format(weeks, rem_days, rem_hours)

        print( "{col}{dev} last {type} selftest {state} and finished {tspec} ago.{cls}".format(col = col, dev = dev, tspec = tspec, type = test_type.lower(), state = test_state.lower(), cls = bcolors.ENDC) )

    return (healthy, current)
def fetch_smart(self, dev, report = False):
    """Run smartctl against ``dev`` and parse the parts of its output we use.

    Invokes ``<smartctl_bin> -H -c -A -l selftest <dev>``. A non-zero exit
    status is not fatal: the captured output is still parsed. With
    ``report`` set, each bit set in the exit status is translated through
    ``self.error_map`` and printed.

    Args:
        dev: Device path handed to smartctl.
        report: When true, print diagnostics about the exit status and
            about missing attribute or self-test support.

    Returns:
        A ``(health, selftest, logs, attrs)`` tuple:

        * ``health``: text after the colon on the overall-health line, or
          None when that line is absent.
        * ``selftest``: ``(status_number, status_text)`` taken from the
          "Self-test execution status" capability, or None when it cannot
          be parsed.
        * ``logs``: self-test log rows keyed by their first column. Each
          row is a dict keyed by the log header's column names.
        * ``attrs``: attribute rows keyed by attribute name. Each row is a
          dict with the keys id, name, flag, value, worst, thresh, type,
          updated, when_failed and raw_value.
    """
    try:
        output = subprocess.check_output([self.cfg['smartctl_bin'], '-H', '-c', '-A', '-l', 'selftest', dev], universal_newlines=True)
    except subprocess.CalledProcessError as e:
        output = e.output
        if report:
            # The exit status is a bitmask; bit i maps to error_map[i].
            for bit, text in enumerate(self.error_map):
                if not e.returncode & (1 << bit):
                    continue
                # A failed SMART command (bit 2) is only a warning unless strict.
                if bit == 2 and not self.cfg['strict']:
                    col = bcolors.WARNING
                else:
                    col = bcolors.FAIL
                print( "{col}{dev} smartctl output: {msg}{cls}".format(col=col, dev=dev, msg=text, cls=bcolors.ENDC) )

    if report:
        if 'SMART Attributes Data Structure revision number' not in output:
            print( "{col}{dev} does not support SMART attributes.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC) )
        if 'SMART Self-test log structure revision number' not in output:
            print( "{col}{dev} does not support SMART selftest.{cls}".format(col=bcolors.WARNING, dev=dev, cls=bcolors.ENDC) )

    # Simple smartctl output parser.
    # Attribute rows split cleanly on whitespace. Log rows are fixed-width
    # and are sliced using the header line as a column template.
    attr_columns = ('id', 'name', 'flag', 'value', 'worst', 'thresh', 'type', 'updated', 'when_failed', 'raw_value')
    section = None
    attrs = dict()
    logs = dict()
    health = None
    selftest = list()
    in_selftest = False      # are we inside the multi-line self-test status?
    log_columns = None       # column names of the log header, once seen
    log_item_pos = None      # start offset of each log column in the header

    for l in output.split("\n"):
        # A blank line ends the current section.
        if section is not None and l == "":
            section = None
            continue

        # Overall health
        if l.startswith("SMART overall-health self-assessment test result"):
            health = l.split(':')[1].strip()

        # Capabilities: only the self-test status is of interest.
        if l.startswith("General SMART Values"):
            section = 'cap'
            continue
        if section == 'cap':
            if l.startswith('Self-test execution status'):
                selftest.append(l)
                in_selftest = True
                continue
            if in_selftest and l.startswith("\t"):
                # Tab-indented lines continue the previous capability.
                selftest.append(l)
                continue
            in_selftest = False

        # Attributes
        if l.startswith("Vendor Specific SMART Attributes with Thresholds"):
            section = 'attr'
            continue
        if section == 'attr':
            if l.startswith("ID#"):
                continue
            attr = dict(zip(attr_columns, l.split(None, 9)))
            if 'name' in attr:
                # Rows too short to carry a name are not attribute rows.
                attrs[attr['name']] = attr

        # Self-test log
        if l.startswith("SMART Self-test log structure revision number"):
            section = 'log'
            continue
        if section == 'log':
            if l.startswith("Num"):
                log_columns = l.split()
                log_item_pos = [l.find(column) for column in log_columns]
                continue
            if l.startswith('No self-tests have been logged.'):
                section = None
                continue
            if log_item_pos is None:
                # Log rows without a header cannot be sliced into columns.
                print(f"Device {dev} doesn't offer logs capacity")
                continue

            log = list()
            last = len(log_item_pos) - 1
            for i, start in enumerate(log_item_pos):
                # The Status data runs 5 columns past the next header, so
                # the Status end and the Remaining start are both shifted.
                s = start + 5 if i == 3 else start
                if i < last:
                    e = log_item_pos[i + 1] + (5 if i == 2 else 0)
                else:
                    e = len(l)
                log.append(l[s:e].strip())
            logs[log[0]] = dict(zip(log_columns, log))

    # Turn the collected status lines into (number, text).
    try:
        m = re.search(r'\([ ]*(?P<num_status>\d+)\)\s(?P<text_status>.*)', selftest[0])
        num = int(m.group('num_status'))
        txt = [m.group('text_status')]
        txt.extend(map(str.strip, selftest[1:]))
        selftest = (num, " ".join(txt))
    except (IndexError, AttributeError):
        # No status line was found, or it did not have the expected shape.
        selftest = None

    return health, selftest, logs, attrs
+Reporting on actual disk failures however generally works.""".format(prog=os.path.basename(sys.argv[0]))) + group_op_sel = parser.add_mutually_exclusive_group(required=True) + group_op_sel.add_argument("-a", "--autodetect", "--all", action='store_true', help="Autodetect disks and scan.") + group_op_sel.add_argument("-d", "--disks", action='append', nargs=1, help="Only handle specific disk device.") + group_op_sel.add_argument("-b", "--smartctl", help="Overide smartctl binary location if not in path.", default = 'smartctl') + group_nag = parser.add_argument_group('Nagios', description="Format output to be usable as a Nagios compatible plugin.") + group_nag.add_argument("-n", "--nagios", action='store_true', help="Return data in a form usable as a nagios check.") + group_nag.add_argument("-u", "--unknown", choices=['warning', 'critical'], help="Change alert level of unknown smart status.") + group_nag.add_argument("-w", "--warning", choices=['unknown', 'critical'], help="Change alert level of warning smart status.") + parser.add_argument("-i", "--ignore", action='append', nargs="+", help="Ignore specific disk devices. Helpful when scanning for all disks.", default = []) + parser.add_argument("-s", "--schedule", type=int, help="Frequency in days after which a selftest is considered out of date and will be rescheduled.") + parser.add_argument("-t", "--threshold", type=int, help="Frequency in days after which a selftest is considered out of date and will be warned about but not rescheduled.") + parser.add_argument("-v", "--verbose", action='store_true', help="Print more status information.") + parser.add_argument("-x", "--strict", action='store_true', help="Strict checking. 
if __name__ == '__main__':
    # Command-line entry point: build the configuration from the parsed
    # options, check every selected disk and report either in a
    # human-readable form or as a single Nagios status line.
    #
    # ``smart`` must stay a module-level name: check_single_dev() reads it.
    smart = smart_status()
    args = parse_opts()

    smart.cfg['smartctl_bin'] = args.smartctl
    smart.cfg['strict'] = bool(args.strict)
    smart.cfg['verbose'] = bool(args.verbose)
    smart.cfg['smartctl_test_frequency'] = 0 if args.schedule is None else args.schedule
    # --threshold has its own option; it used to be overwritten by --schedule.
    smart.cfg['smartctl_test_threshold'] = 0 if args.threshold is None else args.threshold

    if not args.color:
        smart.colorize(False)

    # --ignore is collected as a list of groups (append + nargs="+").
    # Flatten it and apply it to explicit and autodetected disks alike.
    ignored = {name for group in args.ignore for name in group}
    if args.disks:
        requested = [entry[0] for entry in args.disks]
    elif args.autodetect:
        requested = smart.find_disks()
    else:
        requested = []
    smart.cfg['disks'] = sorted(set(requested) - ignored)

    verbose = smart.cfg['verbose']
    reschedule = smart.cfg['smartctl_test_frequency'] > 0

    if smart.cfg['disks'] and not args.nagios:
        # One (disk, colour, message, schedule note, exit code) row per disk
        # keeps the report columns aligned.
        rows = list()
        for disk in smart.cfg['disks']:
            try:
                is_block_device = stat.S_ISBLK(os.stat(disk).st_mode)
            except OSError:
                is_block_device = False
            if not is_block_device:
                # Nothing to check on a path that is not a block device.
                rows.append((disk, bcolors.FAIL, "Invalid device", '', 255))
                continue

            if verbose:
                print( "Checking {}:".format(disk) )
            res = check_single_dev(disk, report = verbose)

            if res is True:
                col, msg, code = bcolors.OKGREEN, "Disk healthy", 0
            elif res is None:
                col, msg, code = bcolors.WARNING, "Insufficient SMART support", 2
            else:
                col, msg, code = bcolors.FAIL, "Disk failing", 1

            sched = ''
            if reschedule:
                if verbose:
                    print( "Scheduling selftest {}:".format(disk) )
                if smart.schedule_selftest(disk, report = verbose):
                    sched = 'New selftest scheduled.'

            rows.append((disk, col, msg, sched, code))

        for disk, col, msg, sched, code in rows:
            print( "{disk}: {col}{msg}{cls} {sched}".format(col=col, msg=msg, disk=disk, cls=bcolors.ENDC, sched = sched) )
        sys.exit(max(row[4] for row in rows))

    elif args.nagios:
        res = dict()
        for disk in smart.cfg['disks']:
            res[disk] = check_single_dev(disk, report = verbose)
            if reschedule:
                smart.schedule_selftest(disk, report = verbose)

        # Format nagios line
        labels = {True: 'Ok', None: 'Unkn', False: 'Err'}
        line = ", ".join("{}: {}".format(disk, labels[res[disk]]) for disk in sorted(res))

        # dict views have no count() on Python 3; count over a list.
        outcomes = list(res.values())
        if False in outcomes:
            print( 'CRITICAL: smart_status reports {} disk(s) as having errors. {}'.format(outcomes.count(False), line) )
            sys.exit(2)
        else:
            print( 'OK: smart_status reports {} disk(s) as okay. {}'.format(outcomes.count(True), line) )
            sys.exit(0)
from SmartDevice import SmartDevice
from pySMART import Device, DeviceList

import jsonpickle
import argparse
import sys

# Every action the dispatch at the bottom of this script implements.
actionList = ["list", "status", "short", "long", "abort", "json"]

parser = argparse.ArgumentParser(description='Handle smart disks.')
parser.add_argument('action', choices=actionList, help="What to do: {}".format(actionList))
# Both options may be repeated; bin/main.sh passes one -d per selected disk.
parser.add_argument('-d', '--disk', action='append', default=[], required=False,
                    help="Use a specific disk ex: 'sdb' (may be repeated)")
parser.add_argument('-e', '--exclude', action='append', default=[], required=False,
                    help="Exclude a disk ex: 'sda' (may be repeated)")

args = parser.parse_args()
disk = args.disk
exclude = args.exclude
action = args.action


def _summary(smart_device):
    """Return the fields of a SmartDevice that are printed by list/json."""
    return {
        "model": smart_device.model,
        "smart_status": smart_device.smart_status,
        "hours": smart_device.hours,
    }


# Selected pySMART devices mapped to their summary, and the SmartDevice
# wrapper built for each of them.
devices = {}
wrappers = {}

if disk:
    # Explicitly requested disks: a failure here is reported, not skipped.
    for name in disk:
        dev = Device(name)
        d = SmartDevice(dev)
        devices[dev] = _summary(d)
        wrappers[dev] = d
else:
    devlist = DeviceList()
    for dev in devlist.devices:
        try:
            d = SmartDevice(dev)
        except Exception as e:
            print("Failed to convert to device", e)
            continue
        if d.dev in exclude:
            continue
        devices[dev] = _summary(d)
        wrappers[dev] = d

if action in ("list", "json"):
    print(jsonpickle.encode(devices, indent=2))

elif action in ("long", "short"):
    for dev in devices:
        dev.run_selftest(action)

elif action == "abort":
    # NOTE(review): relies on pySMART's Device.abort_selftest(); confirm it
    # exists in the pinned pySMART version.
    for dev in devices:
        dev.abort_selftest()

elif action == "status":
    # Exit status 1 tells bin/main.sh to keep polling.
    code = 0
    msgList = []
    for dev in devices:
        r = dev.get_selftest_result()
        d = wrappers[dev]
        msg = "{} / {} : {}".format(d.dev, d.serial, r[1])
        if r[0] == 1 or r[0] == 3:
            code = 1
        if r[2]:
            msg += " {}% done".format(r[2])
        msgList.append(msg)
    print("\n".join(msgList))
    sys.exit(code)