File: //usr/local/share/aliyun-assist/plugin/ecsgo-helper/0.48/src/main.py
# -*- coding: utf-8 -*-
# The module search path `sys.path` MUST be modified carefully to put the
# bundled package directory just after the directory containing the entry script
# but before ANY other system-wide or even user-specified Python module search
# paths. Thus keep the bundled packages are ALWAYS imported correctly for stable
# behavior on various GuestOS environment. See issue #34699130 for problem
# details.
import os
import os.path
import sys
if (sys.version_info.major == 2 and sys.version_info.minor == 7) \
or (sys.version_info.major == 3 and 6 <= sys.version_info.minor <= 14):
source_dir = os.path.dirname(__file__)
bundle_dir = os.path.abspath(os.path.join(source_dir, "../lib"))
triplet_dir = os.path.join(bundle_dir, "{0}-linux-gnu".format(os.uname()[4]))
sys.path.insert(1, os.path.join(triplet_dir, "python{0}.{1}/site-packages".format(sys.version_info.major, sys.version_info.minor)))
sys.path.insert(1, os.path.join(triplet_dir, "python{0}/site-packages".format(sys.version_info.major)))
sys.path.insert(1, os.path.join(bundle_dir, "python/site-packages"))
else:
print("""{
"version": "1.0",
"status": 125,
"errors": [{
"code": "UnsupportedPythonVersion",
"message": "Unsupported Python version: %s"
}],
"metadata": {},
"analysis": [],
"data": {}
}
""" % sys.version)
sys.exit(125)
import argparse
import logging
import os
import re
import traceback
from ecsgo_utils import logger
from ecsgo_utils import oss
from ecsgo_utils.service import SystemdService
class BaselineDiagnosticSubcommand(object):
NAME = "baseline-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Baseline diagnostic suite")
self.subparser.add_argument("--since", nargs='?', const=None,
default=None, type=str, required=False,
help="Diagnose log messages not older than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"",
dest="since")
self.subparser.add_argument("--until", nargs='?', const=None,
default=None, type=str, required=False,
help="Diagnose log messages not newer than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"",
dest="until")
self.subparser.add_argument("--describe-analysis", action="store_true",
default=False, required=False, help="Describe analysis items with rendered description text",
dest="describe_analysis")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_baseline import BaselineDiagnostic
since = parsed_args.since
until = parsed_args.until
describe_analysis = parsed_args.describe_analysis
diagnostic = BaselineDiagnostic(since=since, until=until,
describe_analysis=describe_analysis)
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class EciBaselineDiagnosticSubcommand(object):
NAME = "eci-baseline-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="ECI baseline diagnostic suite")
self.subparser.add_argument("--since", nargs='?', const=None,
default=None, type=str, required=False,
help="Diagnose log messages not older than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"",
dest="since")
self.subparser.add_argument("--until", nargs='?', const=None,
default=None, type=str, required=False,
help="Diagnose log messages not newer than the specified UTC datetime in format \"yyyy-MM-dd HH:mm:ss\"",
dest="until")
self.subparser.add_argument("--no-analysis", action="store_true",
default=False, required=False, help="Do not perform analysis but just gather data",
dest="no_analysis")
self.subparser.add_argument("--describe-analysis", action="store_true",
default=False, required=False, help="Describe analysis items with rendered description text",
dest="describe_analysis")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_eci_baseline import EciBaselineDiagnostic
since = parsed_args.since
until = parsed_args.until
describe_analysis = parsed_args.describe_analysis
no_analysis = parsed_args.no_analysis
diagnostic = EciBaselineDiagnostic(since=since, until=until,
describe_analysis=describe_analysis, no_analysis=no_analysis)
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class OfflineDiagnosticSubcommand(object):
NAME = "offline-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Offline diagnostic suite")
self.subparser.add_argument("--disk-serial", type=str, required=True,
help="Serial number of mounted sysdisk from other instance",
dest="disk_serial")
self.subparser.add_argument("--describe-analysis", action="store_true",
default=False, required=False, help="Describe analysis items with rendered description text",
dest="describe_analysis")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_offline import OfflineDiagnostic
disk_serial = parsed_args.disk_serial
describe_analysis = parsed_args.describe_analysis
diagnostic = OfflineDiagnostic(disk_serial, describe_analysis=describe_analysis)
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class ImageOfflineDiagnosticSubcommand(object):
NAME = "image-offline-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Image Offline diagnostic suite")
self.subparser.add_argument("--disk-serial", type=str, required=True, help="Serial number of mounted sysdisk from other instance", dest="disk_serial")
self.subparser.add_argument("--image-id", nargs='?', type=str, required=False, help="The Imageid to detect", dest="image_id")
self.subparser.add_argument("--save-data-path", nargs='?', type=str, required=False, const=None, default=None, help="Specify a file path to save image diagnostic data report", dest="save_data_path")
def process_options(self, parsed_args, remained_args, root_parser):
import json
from diagnostic.diagnostic_image_offline import ImageOfflineDiagnostic
from exporter.oss import ImageOfflineDetectOSSExporter
from ecsgo_error.disk import CheckPartitionTableError,MountRootPartitionError,RootFilesystemMountEntryNotFoundError,WindowsNotSupportError,IsoNotSupportError,LvmNotSupportError
from ecsgohelper.kb.image import ISOImage, NoPartition, UnknownFileSystem, LvmNotSupport, UnsupportedOperatingSystem
from ecsgo_utils.exception import UnsupportedOperatingSystemError
disk_serial = parsed_args.disk_serial
image_id = parsed_args.image_id
uploadLogId = image_id if image_id is not None else disk_serial
#for the invaild system image, we collect the ecsgo.log analysis
ecsgoLogHandler = ImageOfflineDetectOSSExporter(uploadLogId)
# try:
# diagnostic = ImageOfflineDiagnostic(disk_serial)
# except Exception as err:
# print(err)
# sys.exit(1)
diagnostic = ImageOfflineDiagnostic(disk_serial)
try:
ret, report, analysis = diagnostic.run()
# print(report)
except (CheckPartitionTableError):
diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", NoPartition())
diagnostic.ImageCheckReport["status"] = 1
print(json.dumps(diagnostic.ImageCheckReport))
ecsgoLogHandler.export_ecsgo_log()
sys.exit(0)
except (MountRootPartitionError):
diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", UnknownFileSystem())
diagnostic.ImageCheckReport["status"] = 1
print(json.dumps(diagnostic.ImageCheckReport))
ecsgoLogHandler.export_ecsgo_log()
sys.exit(0)
except (IsoNotSupportError):
diagnostic.updateReport("GUESTOS.SystemImage", "Invalid", ISOImage())
diagnostic.ImageCheckReport["status"] = 1
print(json.dumps(diagnostic.ImageCheckReport))
sys.exit(0)
except (WindowsNotSupportError):
diagnostic.updateReport("GUESTOS.Platform","windows")
print(json.dumps(diagnostic.ImageCheckReport))
sys.exit(0)
except (LvmNotSupportError):
diagnostic.update_report_lvm_no_support(LvmNotSupport())
diagnostic.ImageCheckReport["status"] = 1
print(json.dumps(diagnostic.ImageCheckReport))
sys.exit(0)
except (UnsupportedOperatingSystemError) as err:
diagnostic.update_report_os_no_support(UnsupportedOperatingSystem(err.distro))
diagnostic.ImageCheckReport["status"] = 1
print(json.dumps(diagnostic.ImageCheckReport))
sys.exit(0)
if parsed_args.save_data_path:
_data_dir = os.path.dirname(parsed_args.save_data_path)
if not os.path.exists(_data_dir):
os.makedirs(_data_dir)
with open(parsed_args.save_data_path, "w") as outfile:
json.dump(report, outfile)
#for the vaild system image, when detect failed
#in addition ecsgo.log, we also collect report info to analysis when run diagnosis failed
reportHandler = ImageOfflineDetectOSSExporter(uploadLogId + "_report")
try:
diagnostic.run_analyzers_generate_human_json_report(report,analysis)
print(json.dumps(diagnostic.ImageCheckReport))
sys.exit(ret)
except Exception:
print("run image offline failed")
logging.info("run image offline failed")
logging.info(traceback.format_exc())
ecsgoLogHandler.export_ecsgo_log()
reportHandler.export_json_report(report)
sys.exit(1)
class ImageOnlineDiagnosticSubcommand(object):
NAME = "image-online-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Image Online diagnostic suite")
self.subparser.add_argument("--json-format", action="store_true", default=False, required=False, help="Report in jsonformat", dest="json_format")
self.subparser.add_argument("--save-data-path", nargs='?', type=str, required=False, const=None, default=None, help="Specify a file path to save image diagnostic data report", dest="save_data_path")
def process_options(self, parsed_args, remained_args, root_parser):
json_format = parsed_args.json_format
from diagnostic.diagnostic_image_online import ImageOnlineDiagnostic
#for the invaild system image, we collect the ecsgo.log analysis
#try:
diagnostic = ImageOnlineDiagnostic()
# except Exception as err:
# print(333,err)
try:
import json
ret, report, analysis = diagnostic.run()
if parsed_args.save_data_path:
_data_dir = os.path.dirname(parsed_args.save_data_path)
if not os.path.exists(_data_dir):
os.makedirs(_data_dir)
with open(parsed_args.save_data_path, "w") as outfile:
json.dump(report, outfile)
diagnostic.run_analyzers_generate_human_json_report(report,analysis)
if json_format:
print(json.dumps(diagnostic.ImageCheckReport))
else:
diagnostic.print_text_report()
except Exception as err:
print(err)
class NetworkOnlineDiagnosticSubcommand(object):
NAME = "network-online-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Network-online diagnostic suite")
self.subparser.add_argument("--direction", nargs='?', const=None,
default=None, type=str, choices=["incoming", "outgoing"],
required=False, help="Request direction relative to the machine performing diagnostic",
dest="direction")
self.subparser.add_argument("--source-ip", nargs='?', const=None,
default=None, type=str, required=False,
help="Source IPv4/6 address for network diagnostic",
dest="source_ip")
self.subparser.add_argument("--source-port", nargs='?', const=None,
default=None, type=str, required=False,
help="Source port for network diagnostic", dest="source_port")
self.subparser.add_argument("--target-ip", nargs='?', const=None,
default=None, type=str, required=False,
help="Target IPv4/6 address for network diagnostic",
dest="target_ip")
self.subparser.add_argument("--target", nargs='?', const=None,
default=None, type=str, required=False,
help="Special diagnostic target of network, i.e., port number, or network service name supported",
dest="target")
self.subparser.add_argument("--protocol", nargs='?', const=None,
default=None, type=str, required=False,
help="Protocol for network diagnostic, i.e., tcp/udp",
dest="protocol")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_network_online import NetworkOnlineDiagnostic
diagnostic = NetworkOnlineDiagnostic(parsed_args.direction,
parsed_args.source_ip, parsed_args.source_port,
parsed_args.target_ip, parsed_args.target, parsed_args.protocol)
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") \
and parsed_args.oss_report_url is not None \
and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class PerformanceDiagnosticSubcommand(object):
NAME = "performance-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Performance diagnostic suite")
self.subparser.add_argument("--perf-impact", action="store_true", default=False, required=False,
help="Also run cases might cause performance impact",
dest="perf-impact")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_performance import PerformanceDiagnostic
diagnostic = PerformanceDiagnostic()
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class ApplicationDiagnosticSubcommand(object):
NAME = "application-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Application diagnostic suite")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_application import ApplicationDiagnostic
diagnostic = ApplicationDiagnostic()
ret, report = diagnostic.run()
print(report)
# Handle general options
if hasattr(parsed_args,
"oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class PartialDiagnosticSubcommand(object):
NAME = "partial-diagnostic"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="partial diagnostic suite on demand")
self.subparser.add_argument("--config", required=True, help="Path of periodical checking configuration, - for reading from stdin")
self.subparser.add_argument("--describe-analysis", action="store_true",
default=False, required=False, help="Describe analysis items with rendered description text",
dest="describe_analysis")
self.subparser.add_argument("--associated-analysis", action="store_true",
default=False, required=False, help="Also diagnose the analysis items related to the analysis in the configuration",
dest="associated_analysis")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.diagnostic_partial import partial_diagnostic_main
describe_analysis = parsed_args.describe_analysis
associated_analysis = parsed_args.associated_analysis
ret, report = partial_diagnostic_main(parsed_args.config,
describe_analysis=describe_analysis,
associated_analysis=associated_analysis)
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") and parsed_args.oss_report_url is not None and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class DiskInfoSubcommand(object):
NAME = "disk-info"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Show information of disk with specified serial number")
self.subparser.add_argument("--disk-serial", nargs='?', const=None, default=None, type=str, required=False, help="Serial number of disk", dest="disk_serial")
def process_options(self, parsed_args, remained_args, root_parser):
from action.disk_info import DiskInfo
disk_info = DiskInfo(getattr(parsed_args, "disk_serial", None))
ret, report = disk_info.run()
print(report)
# Handle general options
if hasattr(parsed_args, "oss_report_url") \
and parsed_args.oss_report_url is not None \
and parsed_args.oss_report_url != "":
try:
oss.upload_report(parsed_args.oss_report_url, report)
except BaseException as ex:
logging.critical("Failed to upload diagnostic report to %s", parsed_args.oss_report_url, exc_info=True)
traceback.print_exc(None, sys.stderr)
sys.exit(115)
sys.exit(ret)
class PartitionDiskSubcommand(object):
NAME = "partition-disk"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Partition new empty disk")
self.subparser.add_argument("--precheck", action="store_true", default=False, help="Check preconditions without acutal operations", dest="precheck")
self.subparser.add_argument("--disk-serial", type=str, required=True, help="Serial number of new empty disk", dest="disk_serial")
self.subparser.add_argument("--partition-table-type", type=str, choices=("GPT",), required=True, help="Partition table type for disk: MBR or GPT", dest="partition_table_type")
self.subparser.add_argument("--partition-count", type=int, required=True, help="Partition count to be created", dest="partition_count")
def process_options(self, parsed_args, remained_args, root_parser):
from action.disk_partition import PartitionDisk
if parsed_args.partition_count > 0:
for i in range(parsed_args.partition_count):
self.subparser.add_argument("--partition.{0}.filesystem-label".format(i), default="", type=str, required=False, help="Filesystem label for partition {0}".format(i), dest="partition_{0}_filesystem_label".format(i))
self.subparser.add_argument("--partition.{0}.size".format(i), type=str, required=True, help="Size for partition {0}".format(i), dest="partition_{0}_size".format(i))
self.subparser.add_argument("--partition.{0}.filesystem-type".format(i), type=str, choices=("ext4",), required=False, help="Filesystem type for partition {0}".format(i), dest="partition_{0}_filesystem_type".format(i))
self.subparser.add_argument("--partition.{0}.mountpoint".format(i), type=str, required=False, help="(Optional) Mount point for partition {0}".format(i), dest="partition_{0}_mountpoint".format(i))
full_args = parser.parse_args()
partition_params = self._collect_partition_args(full_args, full_args.partition_count)
action = PartitionDisk(full_args.disk_serial, full_args.partition_table_type, partition_params, precheck=full_args.precheck)
ret, report = action.run()
print(report)
sys.exit(ret)
def _collect_partition_args(self, args, partition_count):
from action.disk_partition import PartitionParameter
SIZE_ARGUMENT_PATTERN = re.compile(r"^(\d+)(\w+)$")
partition_params = list()
for i in range(partition_count):
size_str = getattr(args, "partition_{0}_size".format(i))
filesystem_type = getattr(args, "partition_{0}_filesystem_type".format(i))
mountpoint = getattr(args, "partition_{0}_mountpoint".format(i))
filesystem_label = getattr(args, "partition_{0}_filesystem_label".format(i))
size_match = SIZE_ARGUMENT_PATTERN.match(size_str)
size = int(size_match.group(1))
size_unit = size_match.group(2)
size_unit = PartitionParameter.find_size_unit(size_unit)
if filesystem_type is not None:
filesystem_type = PartitionParameter.find_filesystem_type(filesystem_type)
partition_params.append(PartitionParameter(size, size_unit, filesystem_type, mountpoint, filesystem_label))
return partition_params
class MountSubcommand(object):
NAME = "mount"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Mount device to specified point")
self.subparser.add_argument("--precheck", action="store_true", default=False, required=False, help="Check preconditions without acutal operation", dest="precheck")
self.subparser.add_argument("--device", type=str, required=True, help="Path of specified device to mount")
self.subparser.add_argument("--mountpoint", type=str, required=True, help="Path of specified mountpoint for mounting")
def process_options(self, parsed_args, remained_args, root_parser):
from action.mount import Mount
mounter = Mount(parsed_args.device, parsed_args.mountpoint, precheck=parsed_args.precheck)
ret, report = mounter.run()
print(report)
sys.exit(ret)
class PeriodicCheckSubcommand(object):
NAME = "periodic-check"
def register_subparser(self, subparsers):
self.subparser = subparsers.add_parser(self.NAME, help="Check system state periodically")
self.subparser.add_argument("--immediate-report-url", nargs='?', const="", default="", type=str, required=False, help="Upload diagnostic report to OSS with specified URL", dest="immediate_report_url")
self.subparser.add_argument("--config", required=False, help="Path of periodical checking configuration, - for reading from stdin")
self.subparser.add_argument("--no-daemon", action="store_true", default=False, required=False, help="Run periodic checking directly at foreground", dest="no_daemon")
self.subparser.add_argument("--oneline-report", action="store_true", default=False, required=False, help="Upload diagnosetic report in oneline json format", dest="oneline_report")
def process_options(self, parsed_args, remained_args, root_parser):
from diagnostic.periodic_check import PeriodicCheck
exitcode = PeriodicCheck.main(getattr(parsed_args, "config", None),
getattr(parsed_args, "no_daemon", None),
getattr(parsed_args, "immediate_report_url", ""),
getattr(parsed_args, "oneline_report", None))
# Report printing is handled in periodic checker itself
sys.exit(exitcode)
class InstallAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(InstallAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
SystemdService.install()
sys.exit(0)
class UninstallAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(UninstallAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
SystemdService.uninstall()
sys.exit(0)
class StartAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(StartAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
SystemdService.start()
sys.exit(0)
class StopAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(StopAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
SystemdService.stop()
sys.exit(0)
class UpgradeAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(UpgradeAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
running_old_version = False
if SystemdService.is_service_installed():
if SystemdService.is_running():
running_old_version = True
SystemdService.stop()
SystemdService.uninstall()
SystemdService.install()
if running_old_version:
SystemdService.start()
sys.exit(0)
class StatusAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(StatusAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
if SystemdService.check_status() is True:
sys.exit(0)
else:
sys.exit(1)
class RestartAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=None, help=None, metavar=None):
# type: (Sequence[str], str, Union[int, str, None], Optional[Any], Union[Any, str, None], Union[Callable[[str], Any], FileType, None], Optional[Iterable[Any]], Optional[bool], Optional[str], Union[str, tuple[str, Any], None]) -> None
super(RestartAction, self).__init__(option_strings, dest, nargs, const, default, type, choices, required, help, metavar)
def __call__(self, parser, namespace, values, option_string=None):
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
SystemdService.restart()
sys.exit(0)
if __name__ == '__main__':
# Standard logging library must be configured before any codes with potential
# logging actions
logger.default_configure()
# Log the module search path `sys.path` for checking and debugging. See
# issue #34699130 for problem details
logging.info("Module search path: {0}".format(sys.path))
# Log the environmental variable XTABLES_LIBDIR for checking and debugging.
# See system_utils/iptables/iptcutil.py for problem details
env_xtables_libdir = os.getenv("XTABLES_LIBDIR")
if env_xtables_libdir is not None:
if os.getenv("_ECSGO_DETECTED_XTABLES_LIBDIR") is not None:
logging.info("Environment variable XTABLES_LIBDIR detected by ecsgo-helper: {0}".format(env_xtables_libdir))
else:
logging.info("Environment variable XTABLES_LIBDIR provided from user: {0}".format(env_xtables_libdir))
subcommands = {c.NAME: c() for c in [BaselineDiagnosticSubcommand,
EciBaselineDiagnosticSubcommand,
OfflineDiagnosticSubcommand,
ImageOfflineDiagnosticSubcommand,
ImageOnlineDiagnosticSubcommand,
NetworkOnlineDiagnosticSubcommand,
PerformanceDiagnosticSubcommand,
ApplicationDiagnosticSubcommand,
PartialDiagnosticSubcommand,
DiskInfoSubcommand,
PartitionDiskSubcommand,
MountSubcommand,
PeriodicCheckSubcommand]}
parser = argparse.ArgumentParser(description="ECSGO helper")
parser.add_argument("-v", "--version", action='version', version='ECSGO Helper v0.1', help="ECSGO Version")
parser.add_argument("--enable-console-logging", action="store_true", default=False, required=False, help="Also send logging messages to console", dest="enable_console_logging")
parser.add_argument("--oss-report-url", type=str, required=False, help="Upload diagnostic report to OSS with specified URL", dest="oss_report_url")
service_group = parser.add_mutually_exclusive_group()
service_group.add_argument("--install", action=InstallAction, nargs=0, default=False, required=False, help="Install ecsgo-helper service for watching system state")
service_group.add_argument("--uninstall", action=UninstallAction, nargs=0, default=False, required=False, help="Uninstall ecsgo-helper service for watching system state")
service_group.add_argument("--start", action=StartAction, nargs=0, default=False, required=False, help="Start ecsgo-helper service for watching system state")
service_group.add_argument("--stop", action=StopAction, nargs=0, default=False, required=False, help="Stop running ecsgo-helper service for watching system state")
service_group.add_argument("--upgrade", action=UpgradeAction, nargs=0, default=False, required=False, help="Upgrade ecsgo-helper service for watching system state")
service_group.add_argument("--status", action=StatusAction, nargs=0, default=False, required=False, help="Check status of ecsgo-helper service for watching system state")
service_group.add_argument("--restart", action=RestartAction, nargs=0, default=False, required=False, help="Restart ecsgo-helper service for watching system state")
subparsers = parser.add_subparsers(title="Subcommands", dest="subcommand")
for _, c in subcommands.items():
c.register_subparser(subparsers)
args, remained_args = parser.parse_known_args()
if hasattr(args, "enable_console_logging") and \
args.enable_console_logging is not None and args.enable_console_logging == True:
logger.configure_console_logging()
if args.subcommand in subcommands:
subcommands[args.subcommand].process_options(args, remained_args, parser)
else:
parser.print_usage()