HEX
Server: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.30
System: Linux iZj6c1151k3ad370bosnmsZ 3.10.0-1160.76.1.el7.x86_64 #1 SMP Wed Aug 10 16:21:17 UTC 2022 x86_64
User: root (0)
PHP: 7.4.30
Disabled: NONE
Upload Files
File: //usr/local/share/aliyun-assist/plugin/ecsgo-helper/0.48/src/main.py
# -*- coding: utf-8 -*-
# The module search path `sys.path` MUST be modified carefully so that the
# bundled package directories come right after the directory containing the
# entry script but before ANY other system-wide or even user-specified Python
# module search paths. This keeps the bundled packages ALWAYS imported
# correctly, for stable behavior across various GuestOS environments. See
# issue #34699130 for problem details.
import os
import os.path
import sys

def unsupported_python_report(version):
    """Render the JSON error report printed when the interpreter is unsupported.

    The message is serialized with ``json.dumps`` rather than interpolated
    verbatim: the version text may contain a newline or quotes, either of
    which would make the surrounding document invalid JSON.

    :param version: interpreter version text, normally ``sys.version``.
    :returns: the report as a JSON document string ending with a newline.
    """
    # Imported lazily: only the failure path needs it, and the process exits
    # right after printing the report.
    import json

    message = json.dumps("Unsupported Python version: %s" % version)
    return """{
    "version": "1.0",
    "status": 125,
    "errors": [{
        "code": "UnsupportedPythonVersion",
        "message": %s
    }],
    "metadata": {},
    "analysis": [],
    "data": {}
}
""" % message


# Index-based access works even where `sys.version_info` is a plain tuple
# without the `.major`/`.minor` attributes (interpreters older than 2.7), so
# such interpreters reach the error report below instead of crashing here.
_py_major, _py_minor = sys.version_info[0], sys.version_info[1]

if (_py_major == 2 and _py_minor == 7) \
    or (_py_major == 3 and 6 <= _py_minor <= 14):
    source_dir = os.path.dirname(__file__)
    bundle_dir = os.path.abspath(os.path.join(source_dir, "../lib"))
    triplet_dir = os.path.join(bundle_dir, "{0}-linux-gnu".format(os.uname()[4]))
    # Every insert targets index 1, so the LAST inserted directory ends up
    # FIRST among the bundled ones: generic bundle, then the per-architecture
    # major-version directory, then the major.minor directory.
    sys.path.insert(1, os.path.join(triplet_dir, "python{0}.{1}/site-packages".format(_py_major, _py_minor)))
    sys.path.insert(1, os.path.join(triplet_dir, "python{0}/site-packages".format(_py_major)))
    sys.path.insert(1, os.path.join(bundle_dir, "python/site-packages"))
else:
    print(unsupported_python_report(sys.version))
    sys.exit(125)


import argparse
import logging
import os
import re
import traceback

from ecsgo_utils import logger
from ecsgo_utils import oss
from ecsgo_utils.service import SystemdService

class BaselineDiagnosticSubcommand(object):
    """``baseline-diagnostic`` subcommand: run the baseline suite and print its report."""

    NAME = "baseline-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="Baseline diagnostic suite")
        self.subparser = sub

        # Time-window bounds; both accept an optional value and default to None
        sub.add_argument(
            "--since", dest="since", nargs='?', const=None, default=None,
            type=str, required=False,
            help="Diagnose log messages not older than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"")
        sub.add_argument(
            "--until", dest="until", nargs='?', const=None, default=None,
            type=str, required=False,
            help="Diagnose log messages not newer than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"")
        sub.add_argument(
            "--describe-analysis", dest="describe_analysis",
            action="store_true", default=False, required=False,
            help="Describe analysis items with rendered description text")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_baseline import BaselineDiagnostic

        ret, report = BaselineDiagnostic(
            since=parsed_args.since,
            until=parsed_args.until,
            describe_analysis=parsed_args.describe_analysis).run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)

class EciBaselineDiagnosticSubcommand(object):
    """``eci-baseline-diagnostic`` subcommand: run the ECI baseline suite and print its report."""

    NAME = "eci-baseline-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="ECI baseline diagnostic suite")
        self.subparser = sub

        # Time-window bounds; both accept an optional value and default to None
        sub.add_argument(
            "--since", dest="since", nargs='?', const=None, default=None,
            type=str, required=False,
            help="Diagnose log messages not older than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"")
        sub.add_argument(
            "--until", dest="until", nargs='?', const=None, default=None,
            type=str, required=False,
            help="Diagnose log messages not newer than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"")

        # Boolean switches, all off by default
        switches = (
            ("--no-analysis", "no_analysis",
             "Do not perform analysis but just gather data"),
            ("--describe-analysis", "describe_analysis",
             "Describe analysis items with rendered description text"),
        )
        for flag, dest, help_text in switches:
            sub.add_argument(flag, action="store_true", default=False,
                             required=False, help=help_text, dest=dest)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_eci_baseline import EciBaselineDiagnostic

        ret, report = EciBaselineDiagnostic(
            since=parsed_args.since,
            until=parsed_args.until,
            describe_analysis=parsed_args.describe_analysis,
            no_analysis=parsed_args.no_analysis).run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)

class OfflineDiagnosticSubcommand(object):
    """``offline-diagnostic`` subcommand: diagnose a system disk identified by serial number."""

    NAME = "offline-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="Offline diagnostic suite")
        self.subparser = sub
        sub.add_argument(
            "--disk-serial", dest="disk_serial", type=str, required=True,
            help="Serial number of mounted sysdisk from other instance")
        sub.add_argument(
            "--describe-analysis", dest="describe_analysis",
            action="store_true", default=False, required=False,
            help="Describe analysis items with rendered description text")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_offline import OfflineDiagnostic

        ret, report = OfflineDiagnostic(
            parsed_args.disk_serial,
            describe_analysis=parsed_args.describe_analysis).run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class ImageOfflineDiagnosticSubcommand(object):
    """``image-offline-diagnostic`` subcommand: check the system image on a disk given by serial number."""

    NAME = "image-offline-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        self.subparser = subparsers.add_parser(self.NAME, help="Image Offline diagnostic suite")
        self.subparser.add_argument("--disk-serial", type=str, required=True, help="Serial number of mounted sysdisk from other instance", dest="disk_serial")
        self.subparser.add_argument("--image-id", nargs='?', type=str, required=False, help="The Imageid to detect", dest="image_id")
        self.subparser.add_argument("--save-data-path", nargs='?', type=str, required=False, const=None, default=None,  help="Specify a file path to save image diagnostic data report", dest="save_data_path")

    @staticmethod
    def _save_data_report(path, report):
        """Write ``report`` as JSON to ``path``, creating the parent directory if needed.

        :param path: destination file path; may be a bare file name.
        :param report: JSON-serializable report data.
        """
        import json

        data_dir = os.path.dirname(path)
        # A bare file name has an empty dirname and os.makedirs("") raises, so
        # only create the directory when there actually is one to create.
        if data_dir and not os.path.exists(data_dir):
            os.makedirs(data_dir)
        with open(path, "w") as outfile:
            json.dump(report, outfile)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the image check, print the JSON image report and exit.

        Exit status:
          * 0 after printing the report when the run fails with one of the
            handled disk/OS errors below;
          * the diagnostic's return code when the analyzers succeed;
          * 1 when the analyzers raise, after exporting the ecsgo log and the
            raw report through the OSS exporters.
        """
        import json
        from diagnostic.diagnostic_image_offline import ImageOfflineDiagnostic
        from exporter.oss import ImageOfflineDetectOSSExporter
        from ecsgo_error.disk import CheckPartitionTableError,MountRootPartitionError,RootFilesystemMountEntryNotFoundError,WindowsNotSupportError,IsoNotSupportError,LvmNotSupportError
        from ecsgohelper.kb.image import ISOImage, NoPartition, UnknownFileSystem, LvmNotSupport, UnsupportedOperatingSystem
        from ecsgo_utils.exception import UnsupportedOperatingSystemError

        disk_serial = parsed_args.disk_serial
        image_id = parsed_args.image_id
        # Exported logs are keyed by the image id when given, else the disk serial
        upload_log_id = image_id if image_id is not None else disk_serial

        # For an invalid system image, the ecsgo.log is exported for analysis
        ecsgo_log_handler = ImageOfflineDetectOSSExporter(upload_log_id)
        diagnostic = ImageOfflineDiagnostic(disk_serial)
        try:
            ret, report, analysis = diagnostic.run()
        except CheckPartitionTableError:
            diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", NoPartition())
            diagnostic.ImageCheckReport["status"] = 1
            print(json.dumps(diagnostic.ImageCheckReport))
            ecsgo_log_handler.export_ecsgo_log()
            sys.exit(0)
        except MountRootPartitionError:
            diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", UnknownFileSystem())
            diagnostic.ImageCheckReport["status"] = 1
            print(json.dumps(diagnostic.ImageCheckReport))
            ecsgo_log_handler.export_ecsgo_log()
            sys.exit(0)
        except IsoNotSupportError:
            diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", ISOImage())
            diagnostic.ImageCheckReport["status"] = 1
            print(json.dumps(diagnostic.ImageCheckReport))
            sys.exit(0)
        except WindowsNotSupportError:
            # Only the platform is recorded here; the report status is left as is
            diagnostic.updateReport("GUESTOS.Platform","windows")
            print(json.dumps(diagnostic.ImageCheckReport))
            sys.exit(0)
        except LvmNotSupportError:
            diagnostic.update_report_lvm_no_support(LvmNotSupport())
            diagnostic.ImageCheckReport["status"] = 1
            print(json.dumps(diagnostic.ImageCheckReport))
            sys.exit(0)
        except UnsupportedOperatingSystemError as err:
            diagnostic.update_report_os_no_support(UnsupportedOperatingSystem(err.distro))
            diagnostic.ImageCheckReport["status"] = 1
            print(json.dumps(diagnostic.ImageCheckReport))
            sys.exit(0)

        if parsed_args.save_data_path:
            self._save_data_report(parsed_args.save_data_path, report)

        # For a valid system image whose analysis fails, the raw report is
        # exported in addition to ecsgo.log.
        report_handler = ImageOfflineDetectOSSExporter(upload_log_id + "_report")
        try:
            diagnostic.run_analyzers_generate_human_json_report(report,analysis)
            print(json.dumps(diagnostic.ImageCheckReport))
            # SystemExit is not an Exception subclass, so it passes through
            # the handler below.
            sys.exit(ret)
        except Exception:
            print("run image offline failed")
            logging.info("run image offline failed")
            logging.info(traceback.format_exc())
            ecsgo_log_handler.export_ecsgo_log()
            report_handler.export_json_report(report)
            sys.exit(1)

class ImageOnlineDiagnosticSubcommand(object):
    """``image-online-diagnostic`` subcommand: check the image of the running system."""

    NAME = "image-online-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        self.subparser = subparsers.add_parser(self.NAME, help="Image Online diagnostic suite")
        self.subparser.add_argument("--json-format", action="store_true", default=False, required=False, help="Report in jsonformat", dest="json_format")
        self.subparser.add_argument("--save-data-path", nargs='?', type=str, required=False, const=None, default=None,  help="Specify a file path to save image diagnostic data report", dest="save_data_path")

    @staticmethod
    def _save_data_report(path, report):
        """Write ``report`` as JSON to ``path``, creating the parent directory if needed.

        :param path: destination file path; may be a bare file name.
        :param report: JSON-serializable report data.
        """
        import json

        data_dir = os.path.dirname(path)
        # A bare file name has an empty dirname and os.makedirs("") raises, so
        # only create the directory when there actually is one to create.
        if data_dir and not os.path.exists(data_dir):
            os.makedirs(data_dir)
        with open(path, "w") as outfile:
            json.dump(report, outfile)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the image check and print its report as JSON or as text.

        Any ``Exception`` raised while running, saving or reporting is printed
        and swallowed, so this method returns normally instead of exiting.
        """
        json_format = parsed_args.json_format
        from diagnostic.diagnostic_image_online import ImageOnlineDiagnostic

        diagnostic = ImageOnlineDiagnostic()

        try:
            import json
            # The return code is not used: this subcommand does not exit itself
            _ret, report, analysis = diagnostic.run()
            if parsed_args.save_data_path:
                self._save_data_report(parsed_args.save_data_path, report)
            diagnostic.run_analyzers_generate_human_json_report(report,analysis)
            if json_format:
                print(json.dumps(diagnostic.ImageCheckReport))
            else:
                diagnostic.print_text_report()
        except Exception as err:
            print(err)

class NetworkOnlineDiagnosticSubcommand(object):
    """``network-online-diagnostic`` subcommand: run the network diagnostic and print its report."""

    NAME = "network-online-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``. Every option takes
        an optional string value and defaults to None.
        """
        sub = subparsers.add_parser(self.NAME, help="Network-online diagnostic suite")
        self.subparser = sub

        # The only option restricted to a fixed set of values
        sub.add_argument(
            "--direction", dest="direction", nargs='?', const=None,
            default=None, type=str, choices=["incoming", "outgoing"],
            required=False,
            help="Request direction relative to the machine performing diagnostic")

        # Free-form options sharing the same shape: (flag, dest, help)
        free_form_options = (
            ("--source-ip", "source_ip",
             "Source IPv4/6 address for network diagnostic"),
            ("--source-port", "source_port",
             "Source port for network diagnostic"),
            ("--target-ip", "target_ip",
             "Target IPv4/6 address for network diagnostic"),
            ("--target", "target",
             "Special diagnostic target of network, i.e., port number, or network service name supported"),
            ("--protocol", "protocol",
             "Protocol for network diagnostic, i.e., tcp/udp"),
        )
        for flag, dest, help_text in free_form_options:
            sub.add_argument(flag, nargs='?', const=None, default=None,
                             type=str, required=False, help=help_text,
                             dest=dest)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_network_online import NetworkOnlineDiagnostic

        ret, report = NetworkOnlineDiagnostic(
            parsed_args.direction,
            parsed_args.source_ip,
            parsed_args.source_port,
            parsed_args.target_ip,
            parsed_args.target,
            parsed_args.protocol).run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class PerformanceDiagnosticSubcommand(object):
    """``performance-diagnostic`` subcommand: run the performance suite and print its report."""

    NAME = "performance-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="Performance diagnostic suite")
        self.subparser = sub
        # NOTE(review): the dest contains a hyphen, so the parsed value is only
        # reachable via getattr(parsed_args, "perf-impact"); nothing in this
        # class reads it.
        sub.add_argument(
            "--perf-impact", dest="perf-impact", action="store_true",
            default=False, required=False,
            help="Also run cases might cause performance impact")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_performance import PerformanceDiagnostic

        ret, report = PerformanceDiagnostic().run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class ApplicationDiagnosticSubcommand(object):
    """``application-diagnostic`` subcommand: run the application suite and print its report."""

    NAME = "application-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers``; it declares no options of its own.

        The created parser is kept on ``self.subparser``.
        """
        self.subparser = subparsers.add_parser(
            self.NAME, help="Application diagnostic suite")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_application import ApplicationDiagnostic

        ret, report = ApplicationDiagnostic().run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class PartialDiagnosticSubcommand(object):
    """``partial-diagnostic`` subcommand: run the diagnostics selected by a configuration."""

    NAME = "partial-diagnostic"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="partial diagnostic suite on demand")
        self.subparser = sub
        sub.add_argument(
            "--config", required=True,
            help="Path of periodical checking configuration, - for reading from stdin")

        # Boolean switches, all off by default
        switches = (
            ("--describe-analysis", "describe_analysis",
             "Describe analysis items with rendered description text"),
            ("--associated-analysis", "associated_analysis",
             "Also diagnose the analysis items related to the analysis in the configuration"),
        )
        for flag, dest, help_text in switches:
            sub.add_argument(flag, action="store_true", default=False,
                             required=False, help=help_text, dest=dest)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the partial diagnostic, print its report, optionally upload it, then exit.

        Exits with the diagnostic's return code, or with 115 when uploading
        the report fails.
        """
        from diagnostic.diagnostic_partial import partial_diagnostic_main

        ret, report = partial_diagnostic_main(
            parsed_args.config,
            describe_analysis=parsed_args.describe_analysis,
            associated_analysis=parsed_args.associated_analysis)
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class DiskInfoSubcommand(object):
    """``disk-info`` subcommand: print the information report for a disk."""

    NAME = "disk-info"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(
            self.NAME,
            help="Show information of disk with specified serial number")
        self.subparser = sub
        sub.add_argument(
            "--disk-serial", dest="disk_serial", nargs='?', const=None,
            default=None, type=str, required=False,
            help="Serial number of disk")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Collect the disk information, print the report, optionally upload it, then exit.

        Exits with the action's return code, or with 115 when uploading the
        report fails.
        """
        from action.disk_info import DiskInfo

        serial = getattr(parsed_args, "disk_serial", None)
        ret, report = DiskInfo(serial).run()
        print(report)

        # General option: it is not registered by this class, so the lookup
        # tolerates its absence.
        upload_url = getattr(parsed_args, "oss_report_url", None)
        if upload_url is not None and upload_url != "":
            try:
                oss.upload_report(upload_url, report)
            except BaseException:
                logging.critical("Failed to upload diagnostic report to %s", upload_url, exc_info=True)
                traceback.print_exc(None, sys.stderr)
                sys.exit(115)

        sys.exit(ret)


class PartitionDiskSubcommand(object):
    """``partition-disk`` subcommand: partition a disk according to per-partition options."""

    NAME = "partition-disk"

    # Size argument: leading digits followed by a unit token
    _SIZE_ARGUMENT_PATTERN = re.compile(r"^(\d+)(\w+)$")

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its fixed options.

        The created parser is kept on ``self.subparser``; the per-partition
        options are added to it later by ``process_options``.
        """
        self.subparser = subparsers.add_parser(self.NAME, help="Partition new empty disk")
        self.subparser.add_argument("--precheck", action="store_true", default=False, help="Check preconditions without acutal operations", dest="precheck")
        self.subparser.add_argument("--disk-serial", type=str, required=True, help="Serial number of new empty disk", dest="disk_serial")
        self.subparser.add_argument("--partition-table-type", type=str, choices=("GPT",), required=True, help="Partition table type for disk: MBR or GPT", dest="partition_table_type")
        self.subparser.add_argument("--partition-count", type=int, required=True, help="Partition count to be created", dest="partition_count")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Register the per-partition options, re-parse the command line and run the action.

        Exits with the action's return code. Does nothing (and returns) when
        ``--partition-count`` is not positive.
        """
        from action.disk_partition import PartitionDisk

        if parsed_args.partition_count > 0:
            # The option names depend on the partition count, so they can only
            # be registered once the count itself has been parsed.
            for i in range(parsed_args.partition_count):
                self.subparser.add_argument("--partition.{0}.filesystem-label".format(i), default="", type=str, required=False, help="Filesystem label for partition {0}".format(i), dest="partition_{0}_filesystem_label".format(i))
                self.subparser.add_argument("--partition.{0}.size".format(i), type=str, required=True, help="Size for partition {0}".format(i), dest="partition_{0}_size".format(i))
                self.subparser.add_argument("--partition.{0}.filesystem-type".format(i), type=str, choices=("ext4",), required=False, help="Filesystem type for partition {0}".format(i), dest="partition_{0}_filesystem_type".format(i))
                self.subparser.add_argument("--partition.{0}.mountpoint".format(i), type=str, required=False, help="(Optional) Mount point for partition {0}".format(i), dest="partition_{0}_mountpoint".format(i))
            # NOTE(review): `parser` is not defined in this class; it is
            # resolved as a module-level name at call time - confirm it is the
            # same parser object as `root_parser`.
            full_args = parser.parse_args()

            partition_params = self._collect_partition_args(full_args, full_args.partition_count)
            action = PartitionDisk(full_args.disk_serial, full_args.partition_table_type, partition_params, precheck=full_args.precheck)
            ret, report = action.run()
            print(report)
            sys.exit(ret)

    @classmethod
    def _parse_size(cls, size_str):
        """Split a size argument into its number and unit token.

        :param size_str: size text such as ``"20GiB"``.
        :returns: ``(size, unit)`` where ``size`` is an int and ``unit`` the
            remaining text.
        :raises ValueError: when ``size_str`` is not digits followed by a unit.
        """
        size_match = cls._SIZE_ARGUMENT_PATTERN.match(size_str)
        if size_match is None:
            # Without this check a malformed size fails later with an opaque
            # AttributeError on None.
            raise ValueError(
                "Invalid partition size {0!r}: expected digits followed by a unit".format(size_str))
        return int(size_match.group(1)), size_match.group(2)

    def _collect_partition_args(self, args, partition_count):
        """Build one ``PartitionParameter`` per partition from the parsed arguments.

        :param args: namespace holding the ``partition_<i>_*`` attributes.
        :param partition_count: number of partitions to collect.
        :returns: list of ``PartitionParameter`` in partition order.
        :raises ValueError: when a partition size is malformed.
        """
        from action.disk_partition import PartitionParameter

        partition_params = list()
        for i in range(partition_count):
            size_str = getattr(args, "partition_{0}_size".format(i))
            filesystem_type = getattr(args, "partition_{0}_filesystem_type".format(i))
            mountpoint = getattr(args, "partition_{0}_mountpoint".format(i))
            filesystem_label = getattr(args, "partition_{0}_filesystem_label".format(i))

            size, size_unit = self._parse_size(size_str)

            size_unit = PartitionParameter.find_size_unit(size_unit)
            # The filesystem type is optional; it is only resolved when given
            if filesystem_type is not None:
                filesystem_type = PartitionParameter.find_filesystem_type(filesystem_type)
            partition_params.append(PartitionParameter(size, size_unit, filesystem_type, mountpoint, filesystem_label))
        return partition_params


class MountSubcommand(object):
    """``mount`` subcommand: mount a device at a mount point and print the report."""

    NAME = "mount"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="Mount device to specified point")
        self.subparser = sub
        sub.add_argument(
            "--precheck", dest="precheck", action="store_true", default=False,
            required=False,
            help="Check preconditions without acutal operation")
        # Both paths are mandatory
        sub.add_argument(
            "--device", type=str, required=True,
            help="Path of specified device to mount")
        sub.add_argument(
            "--mountpoint", type=str, required=True,
            help="Path of specified mountpoint for mounting")

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the mount action, print its report and exit with its return code."""
        from action.mount import Mount

        ret, report = Mount(
            parsed_args.device,
            parsed_args.mountpoint,
            precheck=parsed_args.precheck).run()

        print(report)
        sys.exit(ret)


class PeriodicCheckSubcommand(object):
    """``periodic-check`` subcommand: hand the parsed options over to ``PeriodicCheck.main``."""

    NAME = "periodic-check"

    def register_subparser(self, subparsers):
        """Create this subcommand's parser on ``subparsers`` and declare its options.

        The created parser is kept on ``self.subparser``.
        """
        sub = subparsers.add_parser(self.NAME, help="Check system state periodically")
        self.subparser = sub
        sub.add_argument(
            "--immediate-report-url", dest="immediate_report_url", nargs='?',
            const="", default="", type=str, required=False,
            help="Upload diagnostic report to OSS with specified URL")
        sub.add_argument(
            "--config", required=False,
            help="Path of periodical checking configuration, - for reading from stdin")

        # Boolean switches, all off by default
        switches = (
            ("--no-daemon", "no_daemon",
             "Run periodic checking directly at foreground"),
            ("--oneline-report", "oneline_report",
             "Upload diagnosetic report in oneline json format"),
        )
        for flag, dest, help_text in switches:
            sub.add_argument(flag, action="store_true", default=False,
                             required=False, help=help_text, dest=dest)

    def process_options(self, parsed_args, remained_args, root_parser):
        """Run the periodic checker and exit with the code it returns.

        Printing of the report is left to the periodic checker itself.
        """
        from diagnostic.periodic_check import PeriodicCheck

        config = getattr(parsed_args, "config", None)
        no_daemon = getattr(parsed_args, "no_daemon", None)
        immediate_report_url = getattr(parsed_args, "immediate_report_url", "")
        oneline_report = getattr(parsed_args, "oneline_report", None)

        sys.exit(PeriodicCheck.main(config, no_daemon, immediate_report_url, oneline_report))


class InstallAction(argparse.Action):
    """Argparse action that calls ``SystemdService.install()`` and exits with status 0."""

    def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Forward every argument unchanged to ``argparse.Action``."""
        super(InstallAction, self).__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Run when the option is encountered; none of the arguments are used."""
        SystemdService.install()
        sys.exit(0)


class UninstallAction(argparse.Action):
    """Argparse action that calls ``SystemdService.uninstall()`` and exits with status 0."""

    def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Forward every argument unchanged to ``argparse.Action``."""
        super(UninstallAction, self).__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Run when the option is encountered; none of the arguments are used."""
        SystemdService.uninstall()
        sys.exit(0)


class StartAction(argparse.Action):
    """Argparse action that calls ``SystemdService.start()`` and exits with status 0."""

    def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Forward every argument unchanged to ``argparse.Action``."""
        super(StartAction, self).__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Run when the option is encountered; none of the arguments are used."""
        SystemdService.start()
        sys.exit(0)


class StopAction(argparse.Action):
    """Argparse action that calls ``SystemdService.stop()`` and exits with status 0."""

    def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Forward every argument unchanged to ``argparse.Action``."""
        super(StopAction, self).__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Run when the option is encountered; none of the arguments are used."""
        SystemdService.stop()
        sys.exit(0)


class UpgradeAction(argparse.Action):
    """Argparse action that reinstalls the service via ``SystemdService`` and exits with status 0."""

    def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Forward every argument unchanged to ``argparse.Action``."""
        super(UpgradeAction, self).__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Uninstall any installed service, install again, and restart it if it was running.

        None of the arguments are used.
        """
        restart_afterwards = False
        if SystemdService.is_service_installed():
            # A running service is stopped before removal and remembered so
            # that it is started again once the new installation is in place.
            restart_afterwards = True if SystemdService.is_running() else False
            if restart_afterwards:
                SystemdService.stop()
            SystemdService.uninstall()

        SystemdService.install()
        if restart_afterwards:
            SystemdService.start()
        sys.exit(0)


class StatusAction(argparse.Action):
    """Argparse action that reports the ecsgo-helper service status via the exit code."""

    def __init__(
        self,
        option_strings,
        dest,
        nargs=None,
        const=None,
        default=None,
        type=None,
        choices=None,
        required=None,
        help=None,
        metavar=None
    ):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Hand every option over to ``argparse.Action`` without modification."""
        super(StatusAction, self).__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Exit with 0 when ``SystemdService.check_status()`` is ``True``, else 1.

        None of the arguments supplied by argparse are used.
        """
        # Identity comparison on purpose: only the literal True maps to 0, any
        # other return value (including other truthy objects) maps to 1.
        exit_code = 0 if SystemdService.check_status() is True else 1
        sys.exit(exit_code)

class RestartAction(argparse.Action):
    """Argparse action that restarts the ecsgo-helper service and exits."""

    def __init__(
        self,
        option_strings,
        dest,
        nargs=None,
        const=None,
        default=None,
        type=None,
        choices=None,
        required=None,
        help=None,
        metavar=None
    ):
        # type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
        """Hand every option over to ``argparse.Action`` without modification."""
        super(RestartAction, self).__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        # type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
        """Call ``SystemdService.restart()`` and leave the process with status 0.

        None of the arguments supplied by argparse are used.
        """
        SystemdService.restart()
        sys.exit(0)

if __name__ == '__main__':
    # Script entry point: configure logging, build the command-line parser
    # (global options, mutually exclusive service-management options and one
    # subparser per diagnostic subcommand), then dispatch to the selected
    # subcommand or print the usage line when none was given.

    # The standard logging library must be configured before any code that
    # may emit log records runs.
    logger.default_configure()
    # Log the module search path `sys.path` for checking and debugging. See
    # issue #34699130 for problem details.
    logging.info("Module search path: {0}".format(sys.path))
    # Log the environment variable XTABLES_LIBDIR for checking and debugging.
    # See system_utils/iptables/iptcutil.py for problem details.
    env_xtables_libdir = os.getenv("XTABLES_LIBDIR")
    if env_xtables_libdir is not None:
        # The presence of _ECSGO_DETECTED_XTABLES_LIBDIR distinguishes a value
        # reported as detected by ecsgo-helper from one reported as
        # user-provided.
        if os.getenv("_ECSGO_DETECTED_XTABLES_LIBDIR") is not None:
            logging.info("Environment variable XTABLES_LIBDIR detected by ecsgo-helper: {0}".format(env_xtables_libdir))
        else:
            logging.info("Environment variable XTABLES_LIBDIR provided from user: {0}".format(env_xtables_libdir))

    # One instance per subcommand class, keyed by the subcommand name that is
    # also used for dispatching after parsing.
    subcommands = {c.NAME: c() for c in [BaselineDiagnosticSubcommand,
                                         EciBaselineDiagnosticSubcommand,
                                         OfflineDiagnosticSubcommand,
                                         ImageOfflineDiagnosticSubcommand,
                                         ImageOnlineDiagnosticSubcommand,
                                         NetworkOnlineDiagnosticSubcommand,
                                         PerformanceDiagnosticSubcommand,
                                         ApplicationDiagnosticSubcommand,
                                         PartialDiagnosticSubcommand,
                                         DiskInfoSubcommand,
                                         PartitionDiskSubcommand,
                                         MountSubcommand,
                                         PeriodicCheckSubcommand]}

    parser = argparse.ArgumentParser(description="ECSGO helper")
    parser.add_argument("-v", "--version", action='version', version='ECSGO Helper v0.1', help="ECSGO Version")
    parser.add_argument("--enable-console-logging", action="store_true", default=False, required=False, help="Also send logging messages to console", dest="enable_console_logging")
    parser.add_argument("--oss-report-url", type=str, required=False, help="Upload diagnostic report to OSS with specified URL", dest="oss_report_url")
    # Service-management options are mutually exclusive. The actions visible
    # in this file call sys.exit() from inside their __call__, so parsing does
    # not return when one of those options is given.
    service_group = parser.add_mutually_exclusive_group()
    service_group.add_argument("--install", action=InstallAction, nargs=0, default=False, required=False, help="Install ecsgo-helper service for watching system state")
    service_group.add_argument("--uninstall", action=UninstallAction, nargs=0, default=False, required=False, help="Uninstall ecsgo-helper service for watching system state")
    service_group.add_argument("--start", action=StartAction, nargs=0, default=False, required=False, help="Start ecsgo-helper service for watching system state")
    service_group.add_argument("--stop", action=StopAction, nargs=0, default=False, required=False, help="Stop running ecsgo-helper service for watching system state")
    service_group.add_argument("--upgrade", action=UpgradeAction, nargs=0, default=False, required=False, help="Upgrade ecsgo-helper service for watching system state")
    service_group.add_argument("--status", action=StatusAction, nargs=0, default=False, required=False, help="Check status of ecsgo-helper service for watching system state")
    service_group.add_argument("--restart", action=RestartAction, nargs=0, default=False, required=False, help="Restart ecsgo-helper service for watching system state")

    subparsers = parser.add_subparsers(title="Subcommands", dest="subcommand")
    # Only the instances are needed here, so iterate the values directly.
    for c in subcommands.values():
        c.register_subparser(subparsers)

    # Unrecognized arguments are not an error at this level; they are handed
    # to the selected subcommand as `remained_args`.
    args, remained_args = parser.parse_known_args()
    # `enable_console_logging` is a store_true option with default False, so
    # the attribute always exists and is a bool; a plain truth test suffices.
    if args.enable_console_logging:
        logger.configure_console_logging()

    if args.subcommand in subcommands:
        subcommands[args.subcommand].process_options(args, remained_args, parser)
    else:
        parser.print_usage()