ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
api
-
chmod
Open
Rename
Delete
application
-
chmod
Open
Rename
Delete
contracts
-
chmod
Open
Rename
Delete
feature_management
-
chmod
Open
Rename
Delete
files
-
chmod
Open
Rename
Delete
hooks
-
chmod
Open
Rename
Delete
internals
-
chmod
Open
Rename
Delete
migrations
-
chmod
Open
Rename
Delete
model
-
chmod
Open
Rename
Delete
mr_proper
-
chmod
Open
Rename
Delete
myimunify
-
chmod
Open
Rename
Delete
plugins
-
chmod
Open
Rename
Delete
rpc_tools
-
chmod
Open
Rename
Delete
simple_rpc
-
chmod
Open
Rename
Delete
subsys
-
chmod
Open
Rename
Delete
utils
-
chmod
Open
Rename
Delete
wordpress
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
defence360.py
3.73 MB
chmod
View
DL
Edit
Rename
Delete
migrate.py
6.01 MB
chmod
View
DL
Edit
Rename
Delete
router.py
1.65 MB
chmod
View
DL
Edit
Rename
Delete
run.py
109 B
chmod
View
DL
Edit
Rename
Delete
sentry.py
5.01 MB
chmod
View
DL
Edit
Rename
Delete
_version.py
82 B
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
__main__.py
43 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/migrate.py
#!/opt/imunify360/venv/bin/python3 """This module import peewee_migrate and apply migrations, for Imunify-AV it's entrypoint for service""" import contextlib import os import sys import signal import threading import time from collections.abc import Iterable from logging import getLogger from peewee_migrate import migrator from playhouse.sqlite_ext import SqliteExtDatabase import defence360agent.internals.logger from defence360agent.application import app from defence360agent.application.settings import configure from defence360agent.contracts.config import Core from defence360agent.contracts.config import Model from defence360agent.router import Router from defence360agent.subsys import systemd_notifier from defence360agent.model.instance import db as db_instance from defence360agent.model import tls_check from defence360agent.utils import ( write_pid_file, IM360_RESIDENT_PID_PATH, cleanup_pid_file, ) from defence360agent.utils.check_db import ( recreate_schema_models, ) logger = getLogger(__name__) GO_SERVICE_NAME = "/usr/bin/imunify-resident" @contextlib.contextmanager def exc_handler(log_msg: str, reraise: bool): """ Logs error in case of exception. Depending on `reraise`: - re-raise exception and don't include exception info in the log operation - do not re-raise exception and include exception info in the log operation """ try: yield except Exception: logger.error(log_msg, exc_info=not reraise) if reraise: raise def apply_migrations(db: SqliteExtDatabase, migrations_dirs: Iterable[str]): """Apply migrations: restructure db, config files, etc.""" router = Router( db, migrations_dirs=migrations_dirs, logger=logger, ) # HACK: Migrator uses global unconfigurable LOGGER, # overrride it, to use our logging settings migrator.LOGGER = logger router.run() def prepare_databases( migrations_dirs: Iterable[str], attached_dbs: tuple[tuple[str, str], ...] = tuple(), ): """ Apply migrations and recreate attached databases. The workflow: 1. Apply migrations 2. Regardless whether the migrations were applied - recreate attached databases 3. If the recreation of the attached databases was successful - apply migrations again - this is done to verify that migrations will successfully apply in future for the recreated databases - the recreation + the migrations in this step are within the same transaction, so databases will only be recreated if the migrations can applied after the recreation. """ # prepare database to operate in WAL journal_mode and run migrations tls_check.reset() db_instance.init(Model.PATH) attached_schemas = [] for db_path, schema_name in attached_dbs: db_instance.execute_sql("ATTACH ? AS ?", (db_path, schema_name)) attached_schemas.append(schema_name) try: logger.info("Applying database migrations...") systemd_notifier.notify(systemd_notifier.AgentState.MIGRATING) with db_instance.atomic("EXCLUSIVE"), exc_handler( "Error applying migrations", reraise=False ): apply_migrations(db_instance, migrations_dirs) logger.info("Recreating attached databases...") with db_instance.atomic("EXCLUSIVE"), exc_handler( "Error recreating attached databases", reraise=True ): # Migration history is stored in main db, so to automatically recreate # attached dbs it is required to recreate schema for them from models recreate_schema_models(db_instance, attached_schemas) # verify migrations can be applied after the attached dbs recreation with exc_handler( "Error applying migrations after recreating attached" " databases", reraise=True, ): apply_migrations(db_instance, migrations_dirs) finally: # close connection immediately since later this process # will be replaced by execv db_instance.close() # required in case package manager or user sends signals while migrations are still running def signal_handler(sig, _): logger.warning("Received signal %s in signal_handler", sig) logger.warning( "waiting %d seconds so that migrations can finish", Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS, ) time.sleep(Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS) logger.info("Exiting") sys.exit(0) def run(*, start_pkg="defence360agent", configure=configure): """Entry point for Imunify-AV service. Apply migrations, and then replace process with {start_pkg}.run module.""" for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP): signal.signal(sig, signal_handler) try: if start_pkg == "im360.run_resident": write_pid_file(IM360_RESIDENT_PID_PATH) os.umask(Core.FILE_UMASK) configure() defence360agent.internals.logger.reconfigure() migration_thread = threading.Thread( target=prepare_databases, args=(app.MIGRATIONS_DIRS, app.MIGRATIONS_ATTACHED_DBS), ) migration_thread.start() migration_thread.join() systemd_notifier.notify(systemd_notifier.AgentState.READY) logger.info("Starting main process...") systemd_notifier.notify(systemd_notifier.AgentState.STARTING) if start_pkg == "im360.run_resident": Core.GO_FLAG_FILE.touch(exist_ok=True) logger.info("Run imunify-resident service") os.execv( GO_SERVICE_NAME, [ GO_SERVICE_NAME, ] + sys.argv[1:], ) else: os.execv( sys.executable, [sys.executable, "-m", "{}".format(start_pkg)] + sys.argv[1:], ) except Exception: if start_pkg == "im360.run_resident": cleanup_pid_file(IM360_RESIDENT_PID_PATH) if __name__ == "__main__": run()
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply