ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
rpc_tools
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
exceptions.py
403 B
chmod
View
DL
Edit
Rename
Delete
lookup.py
4.39 MB
chmod
View
DL
Edit
Rename
Delete
middleware.py
5.8 MB
chmod
View
DL
Edit
Rename
Delete
utils.py
4.25 MB
chmod
View
DL
Edit
Rename
Delete
validate.py
6.54 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
694 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/validate.py
import datetime
import logging
import math
import os
import re
from collections import namedtuple
from functools import wraps

from cerberus.validator import Validator

from defence360agent.contracts.config import (
    ANTIVIRUS_MODE,
    BackupRestore,
    Malware,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.backup_systems import BackupSystem, get_backend

logger = logging.getLogger(__name__)

SHA256_REGEXP = re.compile("^[A-Fa-f0-9]{64}$")

# Compiled once at import time instead of on every OrderBy.fromstring() call.
# NOTE(review): the character class `[+|-]` also accepts a literal `|` as the
# sign (it is then treated as ascending) -- confirm whether that is intended.
_ORDER_BY_REGEXP = re.compile("^(.+)([+|-])")


class ValidationError(Exception):
    """Raised when RPC parameters do not pass schema validation.

    :param errors: a single error message or a collection of errors;
        a bare string is wrapped into a one-element list
    :param extra_data: optional additional data attached to the error;
        stored as an empty dict when omitted or falsy
    """

    def __init__(self, errors, extra_data=None):
        if isinstance(errors, str):
            self.errors = [errors]
        else:
            self.errors = errors

        self.extra_data = extra_data or {}


OrderByBase = namedtuple("OrderByBase", ["column_name", "desc"])


class OrderBy(OrderByBase):
    """Sort specification: a column name plus a descending flag."""

    def __new__(cls, column_name, desc):
        return super().__new__(cls, column_name, desc)

    @classmethod
    def fromstring(cls, ob_string):
        """
        Build an OrderBy from its string form.

        :param ob_string: for example: 'user+', 'id-'
        :return: OrderBy whose ``desc`` is True only for a '-' sign
        :raises ValueError: if the string does not match the expected form
        """
        try:
            # split() yields ['', <column>, <sign>, <rest>] on a match and
            # [ob_string] otherwise; the slice keeps only the two groups, so
            # a non-matching string fails the unpacking with ValueError.
            col_name, sign = _ORDER_BY_REGEXP.split(ob_string)[1:-1]
            return cls(col_name, sign == "-")
        except ValueError as e:
            raise ValueError(
                "Incorrect order_by: ({}): {}".format(str(e), ob_string)
            ) from e


class SchemaValidator(Validator):
    """Cerberus validator extended with the agent's custom rules.

    Cerberus reads the docstrings of ``_validate_<rule>`` methods as the
    schema for the rule's arguments, so those docstrings are functional and
    must stay literal-evaluable.
    """

    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.extra_data = {}

    def _normalize_coerce_order_by(self, value):
        """Coerce a string such as 'user+' to OrderBy; pass OrderBy through."""
        if isinstance(value, OrderBy):
            return value
        return OrderBy.fromstring(value)

    def _normalize_coerce_sha256hash(self, value):
        """Return the value as a stripped, lower-cased string."""
        return str(value).strip().lower()

    def _normalize_coerce_scan_db(self, value):
        """Resolve the scan_db flag.

        Always False when ANTIVIRUS_MODE is set; otherwise None falls back
        to Malware.DATABASE_SCAN_ENABLED and any other value is kept.
        """
        if ANTIVIRUS_MODE:
            return False
        if value is None:
            return Malware.DATABASE_SCAN_ENABLED
        return value

    # Type check: the value must already be an OrderBy instance.
    def _validate_type_order_by(self, value):
        return isinstance(value, OrderBy)

    # Type check: truthy (a match object) when the stripped value consists of
    # exactly 64 hexadecimal characters, None otherwise.
    def _validate_type_sha256hash(self, value: str):
        return SHA256_REGEXP.match(str(value).strip())

    def _validate_is_absolute_path(self, is_absolute_path, field, value):
        """{'type': 'boolean', 'empty': False}"""
        if is_absolute_path and not os.path.isabs(value):
            self._error(field, "Path {} should be absolute".format(value))

    def _validate_isascii(self, isascii, field, value):
        """{'type': 'boolean'}"""
        if isascii:
            try:
                value.encode("ascii")
            except UnicodeEncodeError:
                self._error(field, "Must only contain ascii symbols")

    def _normalize_coerce_int(self, value):
        """Coerce the value with int()."""
        return int(value)

    def _normalize_default_setter_now(self, document) -> int:
        """Default value: the current timestamp rounded up to whole seconds."""
        return math.ceil(datetime.datetime.now().timestamp())

    # for argparser support
    def _validate_cli(self, *args, **kwargs):
        """{'type': 'dict', 'empty': False, 'schema': {
        'users': {'type': 'list', 'allowed': ['non-root', 'root'],
                  'empty': False},
        'require_rpc': {'type': 'string', 'empty': True,
                        'default': 'running',
                        'allowed': ['running', 'stopped', 'any', 'direct']}
        }}
        """

    # for argparser support
    def _validate_help(self, *args, **kwargs):
        """{'type': 'string', 'empty': False}"""

    # for argparser support
    def _validate_positional(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': True, 'default': False}"""

    # metadata for response validation
    def _validate_return_type(self, *args, **kwargs):
        """{'type': 'string', 'empty': True}"""

    def _validate_cli_only(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': False, 'default': False}"""

    def _validate_envvar(self, *args, **kwargs):
        """
        Parameter can be passed via the specified environment variable.
        The value specified via a CLI argument takes precedence.

        The rule's arguments are validated against this schema:
        {'type': 'string', 'empty': False}
        """

    def _validate_envvar_only(self, *args, **kwargs):
        """
        Parameter will only be accepted if provided via environment variable
        specified by `envvar`. It will be rejected if passed as a CLI argument.

        The rule's arguments are validated against this schema:
        {'type': 'boolean', 'default': False}
        """

    def _normalize_coerce_path(self, value: str):
        """Make a non-empty path absolute; falsy values are returned as is."""
        if value:
            return os.path.abspath(value)
        return value

    def _normalize_coerce_backup_system(self, value):
        """Resolve a backend via get_backend(); pass BackupSystem through."""
        if isinstance(value, BackupSystem):
            return value
        return get_backend(value)

    def _validator_backup_is_enabled(self, field, value):
        """Report an error unless backup is enabled and a system is set."""
        if not (BackupRestore.ENABLED and BackupRestore.backup_system()):
            self._error(field, "Backup is not enabled!")

    def _validator_timestamp(self, field: str, value: int) -> None:
        """Report an error if the value is not a convertible timestamp."""
        try:
            datetime.datetime.fromtimestamp(value)
        except (ValueError, OverflowError, OSError) as e:
            # fromtimestamp() raises OverflowError/OSError (not only
            # ValueError) for values outside the platform's supported range;
            # all of them mean "bad timestamp", not an internal failure.
            self._error(field, f"Incorrect timestamp: {value} ({str(e)})")


def validate(validator, hashable, params):
    """Normalize and validate *params* against the schema for *hashable*.

    :param validator: validator whose schema is keyed by command
    :param hashable: hashable command key (a tuple when called from
        validate_middleware)
    :param params: raw parameters of the command
    :return: the normalized, validated parameters
    :raises ValidationError: if validation fails
    """
    values = validator.normalized(
        {hashable: params}, always_return_document=True
    )
    if not validator.validate({hashable: values[hashable]}):
        logger.warning(
            "Validation error with command %s, params %s, errors %s",
            hashable,
            params,
            validator.errors,
        )
        raise ValidationError(validator.errors, validator.extra_data)

    return validator.document[hashable]


def validate_middleware(validator):
    """Build a middleware that validates request["params"] in place.

    The wrapped coroutine receives the request with "params" replaced by
    the normalized, validated document for request["command"].
    """

    def wrapped(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            hashable = tuple(request["command"])
            request["params"] = validate(
                validator, hashable, request["params"]
            )
            result = await f(request, *args, **kwargs)
            return result

        return wrapper

    return wrapped


def validate_av_plus_license(func):
    """
    Decorator for CLI commands methods that ensures that the AV+ license
    is valid. The check is applied only when ANTIVIRUS_MODE is set;
    otherwise *func* is returned unchanged.

    :raises ValidationError:
    """

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        if LicenseCLN.is_valid_av_plus():
            return await func(*args, **kwargs)
        # Create a fresh exception per failure: re-raising one shared
        # instance keeps extending that instance's traceback.
        raise ValidationError("ImunifyAV+ license required")

    if ANTIVIRUS_MODE:
        return async_wrapper
    return func
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply