ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins
/
opt
imunify360
venv
lib
python3.11
site-packages
clcommon
cpapi
plugins
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
backward_plugin.py
10.88 MB
chmod
View
DL
Edit
Rename
Delete
cpanel.py
39.04 MB
chmod
View
DL
Edit
Rename
Delete
directadmin.py
39.79 MB
chmod
View
DL
Edit
Rename
Delete
interworx.py
3.68 MB
chmod
View
DL
Edit
Rename
Delete
ispmanager.py
9.64 MB
chmod
View
DL
Edit
Rename
Delete
nopanel.py
7.73 MB
chmod
View
DL
Edit
Rename
Delete
plesk.py
34.78 MB
chmod
View
DL
Edit
Rename
Delete
universal.py
1.86 MB
chmod
View
DL
Edit
Rename
Delete
vendors.py
11.45 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
23 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins/plesk.py
# -*- coding: utf-8 -*-
"""Plesk control panel plugin for the clcommon cpapi layer.

All panel data is read from the Plesk ``psa`` MySQL database via
:func:`query_sql`; the module-level functions hold the logic and
:class:`PanelPlugin` exposes them through the general plugin interface.
"""

import os
import re
import time
import xml.etree.ElementTree as ETree
from collections import defaultdict
from functools import wraps
from traceback import format_exc
from typing import List, Any, Tuple, Dict, AnyStr, Optional, Union  # NOQA
from urllib.parse import urlparse

from clcommon import ClPwd, mysql_lib
from clcommon.features import Feature
from clcommon.cpapi.cpapiexceptions import (
    NotSupported,
    NoPanelUser,
    NoPackage,
    NoDomain,
    DuplicateData
)
from clcommon.clfunc import uid_max
from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1, PHPDescription, DomainDescription
from clcommon.cpapi.cpapicustombin import get_domains_via_custom_binary, _docroot_under_user_via_custom_bin
from clcommon.utils import run_command, find_module_param_in_config, ExternalProgramFailed

PSA_SHADOW_PATH = "/etc/psa/.psa.shadow"
SUPPORTED_CPINFO = {'cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'}
UID_MAX = uid_max()

__cpname__ = 'Plesk'


# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
    """Return True when the Plesk version file exists on this host."""
    return os.path.isfile('/usr/local/psa/version')


def db_access(_pass_path=PSA_SHADOW_PATH):
    """
    Build credentials for the Plesk database.

    :param str _pass_path: path of the file holding the admin password
    :return: dict with the keys 'login' (always 'admin') and 'pass'
    :rtype: dict
    """
    access = {'login': 'admin'}
    with open(_pass_path, 'r', encoding='utf-8') as f:
        access['pass'] = f.read().strip()
    return access


def query_sql(query, data=None, _access=None, _dbname='psa', as_dict=False):
    """
    Return the result of a Plesk database query

    :param query: SQL query string with possible parameters
    :param data: arguments for the SQL parameter insertion
    :param _access: database authentication data
    :param _dbname: the name of the database
    :param as_dict: controls the format of the output data
    :type query: str
    :type _access: dict
    :type as_dict: bool
    :return: Tuple of rows according to the query
             in the format specified by as_dict
    :rtype: tuple(tuple) or tuple(dict)
    """
    # Example of returned data:
    # >>> query_sql('SELECT login from sys_users')
    # ((u'cltest',), (u'cltest3',), (u'user2',), (u'user1tst',))
    # >>> query_sql('SELECT login from sys_users', as_dict=True)
    # ({'login': u'cltest'},
    #  {'login': u'cltest3'},
    #  {'login': u'user2'},
    #  {'login': u'user1tst'})
    access = _access or db_access()
    dbhost = access.get('host', 'localhost')
    dblogin = access['login']
    dbpass = access['pass']

    connector = mysql_lib.MySQLConnector(host=dbhost, user=dblogin, passwd=dbpass,
                                         db=_dbname, use_unicode=True,
                                         charset='utf8', as_dict=as_dict)
    with connector.connect() as db:
        return db.execute_query(query, args=data)


def cpusers(_access=None, _dbname='psa'):
    """
    Return the logins of all panel system users.

    :return: list of login strings (first column of ``cpinfo`` rows)
    :rtype: list[str]
    """
    return [fetched_one[0] for fetched_one in cpinfo(keyls=('cplogin', ))]


def resellers():
    """Return the logins of all clients whose type is 'reseller'."""
    sql = "SELECT clients.login FROM clients WHERE clients.type='reseller'"
    return [cplogin for (cplogin, ) in query_sql(sql)]


def admins():
    """Return the set of logins of all clients whose type is 'admin'."""
    sql = "SELECT clients.login FROM clients WHERE clients.type='admin'"
    return {cplogin for (cplogin, ) in query_sql(sql)}


def is_reseller(username):
    """
    Check whether the client with the given login is a reseller.

    :param str username: client login
    :return: True if the client type is 'reseller'; False otherwise,
             including when no such client exists
    :rtype: bool
    """
    sql = "SELECT clients.type FROM clients WHERE clients.login=%s"
    try:
        return query_sql(sql, (username,))[0][0] == 'reseller'
    except IndexError:
        # no row for this login
        return False


def _sys_users_info(sys_login, keyls):
    # type: (Any[str, None], Tuple[str]) -> List[Tuple]
    """
    Select the requested columns for system users.

    :param sys_login: list/tuple of logins to restrict to; any other value
                      (e.g. None) selects all users
    :param keyls: keys from SUPPORTED_CPINFO, in the order to return them
    :return: rows returned by the database
    """
    # Templates.name can be None and it is ok
    mapping = {
        'cplogin': 'sys_users.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'reseller.login AS reseller',
        'dns': 'domains.name AS dns',
        'locale': 'clients.locale AS local',
        'package': 'Templates.name AS package'
    }
    select_query = ', '.join([mapping[key] for key in keyls])
    sql = rf"""SELECT {select_query}
        FROM sys_users
        JOIN hosting ON hosting.sys_user_id=sys_users.id
        JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
        JOIN clients ON clients.id=domains.cl_id
        JOIN clients reseller ON reseller.id=domains.vendor_id
        LEFT JOIN Subscriptions ON Subscriptions.object_type = "domain"
            AND domains.id = Subscriptions.object_id
        LEFT JOIN PlansSubscriptions ON PlansSubscriptions.subscription_id = Subscriptions.id
        LEFT JOIN Templates AS Templates ON Templates.id = PlansSubscriptions.plan_id
            AND "domain" = Templates.type """

    # make query like "where x in (%s, %s, %s, ...)"
    if isinstance(sys_login, (list, tuple)):
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += rf" WHERE sys_users.login IN ({placeholders})"
    users = query_sql(sql, data=sys_login)
    return users


def _resellers_info(sys_login, keyls):
    # type: (Any[str, None], Tuple[str]) -> List[Tuple]
    """
    Select the requested columns for resellers and admins.

    :param sys_login: list/tuple of logins to restrict to; any other value
                      (e.g. None) selects all resellers and admins
    :param keyls: keys from SUPPORTED_CPINFO, in the order to return them
    :return: rows returned by the database
    """
    # items with 'NULL' are not available for this panel
    mapping = {
        'cplogin': 'clients.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'NULL as reseller',
        'dns': 'NULL as dns',
        'locale': 'clients.locale AS local',
        'package': 'NULL as package'
    }
    select_query = ', '.join([mapping[key] for key in keyls])
    sql = f"SELECT {select_query} FROM clients WHERE clients.type IN (\"reseller\", \"admin\")"

    # make query like "where x in (%s, %s, %s, ...)"
    if isinstance(sys_login, (list, tuple)):
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += rf" AND clients.login IN ({placeholders})"
    users = query_sql(sql, data=sys_login)
    return users


def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
           search_sys_users=True):
    """
    Get info about user[s] or about reseller[s].

    :param str|None cpuser: get info about specified login, None for all
    :param list|tuple keyls: keys to return
    :param bool search_sys_users: work with sys users or with resellers
    :rtype: tuple[tuple]
    :raises NotSupported: a key in keyls is not in SUPPORTED_CPINFO
    """
    if isinstance(cpuser, str):
        cpuser = [cpuser]

    # just for developers
    for key in keyls:
        if key not in SUPPORTED_CPINFO:
            raise NotSupported(f'Key {key} is not supported for this control panel. '
                               f'Available keys: {SUPPORTED_CPINFO}')
    if search_sys_users:
        return _sys_users_info(cpuser, keyls)
    return _resellers_info(cpuser, keyls)


def get_admin_email(*args, **kwargs):
    """Return the 'admin_email' value of the misc table, or None if absent."""
    try:
        return query_sql(r"SELECT val FROM misc WHERE param='admin_email';")[0][0]
    except IndexError:
        return None


def docroot_basic(domain):
    # type: (str) -> Any[None, Tuple[str, str]]
    """
    Look up the document root of a domain directly in the database.

    :param str domain: domain name
    :return: first matching row, (www_root, system user login)
    :raises NoDomain: no hosting record matches the domain
    """
    sql = r"""
        SELECT hosting.www_root, sys_users.login
        FROM hosting
        JOIN domains ON hosting.dom_id=domains.id
        JOIN sys_users ON hosting.sys_user_id=sys_users.id
        WHERE domains.name=%s
    """
    try:
        return query_sql(sql, data=(domain,))[0]
    except IndexError as e:
        raise NoDomain(f'Cannot obtain document root for {domain}') from e


def docroot(domain):
    # type: (str) -> Any[None, Tuple[str, str]]
    """
    Return the document root of a domain.

    Queries the database directly when both the real and effective uid
    are 0, otherwise goes through ``_docroot_under_user_via_custom_bin``.

    :param str domain: domain name (surrounding whitespace is stripped)
    :return: (doc_root, domain_user)
    :raises NoDomain: the document root could not be obtained
    """
    domain = domain.strip()
    uid = os.getuid()
    euid = os.geteuid()
    if euid == 0 and uid == 0:
        res = docroot_basic(domain)
    else:
        res = _docroot_under_user_via_custom_bin(domain)

    # If there was successful result, res object will have
    # (doc_root, domain_user) format. If there wasn't found any correct
    # doc_roots, res will be None.
    if res is not None:
        return res
    raise NoDomain(f"Can't obtain document root for domain '{domain}'")


def reseller_users(resellername):
    """
    Return reseller users

    :param resellername: reseller name; return empty list if None
    :return list[str]: user names list
    """
    if resellername is None:
        return []
    sql = """
        SELECT sys_users.login
        FROM clients as reseller
        JOIN domains ON domains.vendor_id=reseller.id
        JOIN hosting ON hosting.dom_id=domains.id
        JOIN sys_users ON hosting.sys_user_id=sys_users.id
        WHERE domains.webspace_id=0 AND reseller.login=%s;
    """
    return [sys_login for (sys_login,) in query_sql(sql, data=(resellername,))]


def memoize(f):
    """
    Cache the whole user -> domains map produced by ``f``.

    ``f`` must return a mapping keyed by user name. The map is stored as a
    whole and ``f`` is called again only when the requested user is not in
    the stored map; the wrapper returns the entry for the requested user.
    """
    cache = {'userdomains_map': {}}

    @wraps(f)
    def wrapper(cpuser, *args, **kwargs):
        if cpuser not in cache['userdomains_map']:
            cache['userdomains_map'] = f(cpuser, *args, **kwargs)
        return cache['userdomains_map'][cpuser]
    return wrapper


@memoize
def userdomains_basic(cpuser, _access=None, _dbname='psa'):
    """
    Return domains of given user

    :param str cpuser: Username
    :param str _dbname: Database name where is located data
    :return: List of (domain_name, document_root) pairs, ordered by
             webspace_id so that the main domain comes first.
             (The undecorated function returns the map for all users;
             the ``memoize`` wrapper picks the entry for ``cpuser``.)
    :rtype: list of tuples
    :raises NoPanelUser: User is not found in Plesk database.
    """
    # WARN: ORDER BY columns must be present in SELECT columns for newer Mysql
    # webspace_id == 0 is main domain
    sql = r"""
        SELECT DISTINCT su.login, d.name, h.www_root, d.webspace_id
        FROM domains as d, hosting as h, sys_users as su
        WHERE h.sys_user_id = su.id
            AND h.dom_id = d.id
        ORDER BY d.webspace_id ASC;
    """
    # data:
    # (
    #     (u'customer1', u'customer1.org', 10L),
    #     (u'customer1', u'mk.customer1.org.customer1.org', 10L)
    # )
    data = query_sql(sql, as_dict=True, _access=_access)

    # _user_to_domains_map:
    # { 'user1': [('user1.org', '/var/www/vhosts/user1.com/httpdocs'),
    #             ('mk.user1.org', '/var/www/vhosts/user1.com/mk.user1.org')] }
    _user_to_domains_map = defaultdict(list)
    for data_dict in data:
        _user_to_domains_map[data_dict['login']].append(
            (data_dict['name'], data_dict['www_root']))
    if cpuser not in _user_to_domains_map:
        raise NoPanelUser(
            f'User {cpuser} not found in the database')
    return _user_to_domains_map


def userdomains(cpuser, _access=None, _dbname='psa', as_root=False):
    """
    Return domains of given user

    :param str cpuser: Username
    :param str _dbname: Database name where is located data
    :param bool as_root: query the database directly even if euid is not 0
    :return: List of (domain_name, document_root) pairs,
             starting from a main domain.
    :rtype: list of tuples
    :raises NoPanelUser: User is not found in Plesk database.
    :raises ExternalProgramFailed: the helper binary returned an
            unexpected return code
    """
    euid = os.geteuid()
    if euid == 0 or _access or as_root:
        return userdomains_basic(cpuser, _access, _dbname)

    # this case works the same as above but through the rights escalation binary wrapper
    # call path: here -> binary -> python(diradmin euid) -> userdomains(as_root=True) -> print json result to stdout
    rc, res = get_domains_via_custom_binary()
    if rc == 0:
        return res
    elif rc == 11:
        raise NoPanelUser(f'User {cpuser} not found in the database')
    else:
        raise ExternalProgramFailed(f'Failed to get userdomains: {res}')


def domain_owner(domain, _access=None, _dbname='psa'):
    """
    Return domain owner

    :param str domain: Domain/sub-domain/add-domain name
    :param str _dbname: Database name where is located data
    :return: user name or None if domain not found
    :rtype: str
    :raises DuplicateData: more than one user owns the domain
    """
    sql = r"""
        SELECT DISTINCT `su`.`login`
        FROM `sys_users` `su`, `hosting` `h`, `domains` `d`, `domains` `sd`
        WHERE `h`.`sys_user_id`=`su`.`id`
            AND `h`.`dom_id`=`d`.`id`
            AND (`d`.`name`=%s OR `d`.`id`=`sd`.`webspace_id` AND `sd`.`name`=%s)"""
    users_list = [u[0] for u in query_sql(sql, (domain, domain))]
    # FIXME: how this possible?
    if len(users_list) > 1:
        raise DuplicateData(
            f"domain {domain} belongs to few users: [{','.join(users_list)}]"
        )
    if not users_list:
        return None
    return users_list[0]


def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
    """Not supported on Plesk; always raises NotSupported."""
    raise NotSupported('Getting binding credentials in the database to the user name in the system is not currently '
                       'supported.')


def homedirs(_sysusers=None, _cpusers=None):
    """
    Detects and returns list of folders contained the home dirs of users of the Plesk

    :param str|None _sysusers: for testing
    :param str|None _cpusers: for testing
    :return: list of folders, which are parent of home dirs of users of the panel
    """
    parent_dirs = []
    if _cpusers is None:
        try:
            results = cpusers()
        except NoPackage:
            results = None
    else:
        results = _cpusers

    users = []
    if results is not None:
        # cpusers() returns plain login strings; indexing those with [0]
        # would keep only the first character of each login. Row tuples
        # (login, ...) are still accepted for backward compatibility.
        users = [line if isinstance(line, str) else line[0] for line in results]

    # Plesk assumes MIN_UID as 10000
    clpwd = ClPwd(10000)
    users_dict = clpwd.get_user_dict()

    # for testing only
    if isinstance(_sysusers, (list, tuple)):
        class pw:
            def __init__(self, name, home):
                self.pw_name = name
                self.pw_dir = home

        users_dict = {}
        for (name, home) in _sysusers:
            users_dict[name] = pw(name, home)

    for user_name, user_data in users_dict.items():
        # an empty panel user list means "do not filter"
        if users and user_name not in users:
            continue
        homedir = os.path.dirname(user_data.pw_dir)
        if homedir not in parent_dirs:
            parent_dirs.append(homedir)
    return parent_dirs


def get_user_login_url(domain):
    """Return the panel login URL for the domain (https on port 8443)."""
    return f'https://{domain}:8443'


def get_reseller_id_pairs():
    """
    Plesk has no user associated with reseller, but we need some id for
    out internal purposes. Let's take it from database.

    :return: mapping of reseller login to (clients.id + UID_MAX)
    :rtype: dict
    """
    sql = """SELECT clients.login, clients.id + %s FROM clients WHERE clients.type='reseller'"""
    return dict(query_sql(sql, data=[UID_MAX]))


def reseller_domains(resellername):
    # type: (str) -> Dict[str, str]
    """
    Return the main domain of every user that belongs to the reseller.

    :param str resellername: reseller login; a falsy value yields {}
    :return: mapping of system user login to main domain name
    """
    if not resellername:
        return {}
    sql = r"""SELECT sys_users.login AS cplogin, domains.name AS dns
        FROM sys_users
        JOIN hosting ON hosting.sys_user_id=sys_users.id
        JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
        JOIN clients reseller ON reseller.id=domains.vendor_id
        WHERE reseller.login=%s """
    users = query_sql(sql, data=[resellername])
    return dict(users)


def _extract_xml_value(xml_string, key):
    """
    Plesk stores some information in simple xml formatted strings.

    :param xml_string: xml document as a string
    :param key: path of the element to find under the root
    :return: text of the element, or None when the element is missing
             or the xml cannot be parsed
    """
    try:
        elem = ETree.fromstring(xml_string).find(key)
    except ETree.ParseError:
        return None
    else:
        return elem.text if elem is not None else None


def _cast_php_handler(handler_name: str, version_id: str) -> str:
    """Map a Plesk handler type and handler id to the handler name we report."""
    if handler_name == 'fpm':
        return 'php-fpm'
    elif 'x-httpd-lsphp' in version_id:
        return 'lsapi'
    return handler_name


def get_domains_php_info():
    """
    Plesk stores the information about the handler in xml format.
    Return the php version info for each domain.
    Example output:
        {'cltest.com': {'handler_type': 'fpm',
                        'php_version_id': 'plesk-php71-fpm',
                        'username': 'cltest'},
         'cltest2.com': {'handler_type': 'fastcgi',
                         'php_version_id': 'x-httpd-lsphp-custom',
                         'username': 'kek_2'},
         'cltest3.com': {'handler_type': 'fastcgi',
                         'php_version_id': 'plesk-php56-fastcgi',
                         'username': 'cltest3'},
         'omg.kek': {'handler_type': 'fastcgi',
                     'php_version_id': 'plesk-php71-fastcgi',
                     'username': 'cltest'}}
    :rtype: dict[str, dict]
    """
    sql = r"""
        SELECT sys_users.login, d.name, h.php_handler_id, handlers.value
        FROM domains AS d
        JOIN hosting AS h ON h.dom_id=d.id
        JOIN sys_users ON h.sys_user_id=sys_users.id
        JOIN (SELECT ServiceNodeEnvironment.*
              FROM ServiceNodeEnvironment
              WHERE (serviceNodeId = '1' AND section = 'phphandlers')) AS handlers
            ON handlers.name=h.php_handler_id
        WHERE h.php='true'
    """
    # Php hanlder info xml example:
    #
    # <?xml version="1.0" encoding="UTF-8"?>
    # <handler>
    #     <id>plesk-php71-fpm</id>
    #     <type>fpm</type>
    #     <typeName>FPM application</typeName>
    #     <version>7.1</version>
    #     <fullVersion>7.1.22</fullVersion>
    #     <displayname>7.1.22</displayname>
    #     <path>/opt/plesk/php/7.1/sbin/php-fpm</path>
    #     <clipath>/opt/plesk/php/7.1/bin/php</clipath>
    #     <phpini>/opt/plesk/php/7.1/etc/php.ini</phpini>
    #     <custom>true</custom>
    #     <registered>true</registered>
    #     <service>plesk-php71-fpm</service>
    #     <poold>/opt/plesk/php/7.1/etc/php-fpm.d</poold>
    #     <outdated />
    # </handler>
    domains_php_info = query_sql(sql)

    # yep, vendor php_handler_id has only "fpm/cgi/fastcgi" w/o version, so additional bicycle needed
    vendor_version_ids = ['cgi', 'fastcgi', 'fpm', 'x-httpd-lsphp-custom']

    php_versions = {}
    for username, domain, php_handler_id, handler_xml in domains_php_info:
        if php_handler_id not in vendor_version_ids:
            display_version = php_handler_id
        else:
            display_version = f'vendor-php{_extract_xml_value(handler_xml, "version")}'.replace('.', '')

        handler = _extract_xml_value(handler_xml, key='type') or 'unknown'
        handler = _cast_php_handler(handler, php_handler_id)

        # transform different php variations into some normal form
        display_version = display_version\
            .replace('-dedicated', '')\
            .replace('-fpm', '')\
            .replace('-fastcgi', '')\
            .replace('x-httpd-lsphp-', 'alt-php')

        php_versions[domain] = DomainDescription(
            username=username,
            php_version_id=display_version,  # not a typo
            handler_type=handler,
            display_version=display_version
        )
    return php_versions


def get_main_username_by_uid(uid: int) -> str:
    """
    Get "main" panel username by uid.

    :param uid: uid
    :return Username or 'N/A' if user not found
    """
    if uid == 0:
        return 'root'
    try:
        _clpwd = ClPwd()
        pwd_list = _clpwd.get_pw_by_uid(uid)
        if os.geteuid() == 0:
            # return the first system user with this uid that the panel knows
            for user_pwd in pwd_list:
                username = user_pwd.pw_name
                try:
                    userdomains(username)
                    return username
                except NoPanelUser:
                    pass
        else:
            # Under user cycle implemented in suid binary, see scripts/plesk_suid_caller.py
            username = pwd_list[0].pw_name
            userdomains(username)
            return username
    except (NoPanelUser, ClPwd.NoSuchUserException):
        pass
    return 'N/A'


class PanelPlugin(GeneralPanelPluginV1):
    """Plesk implementation of the general control panel plugin interface."""

    def __init__(self):
        super().__init__()
        self.HTTPD_MPM_CONFIG = '/etc/httpd/conf.modules.d/01-cgi.conf'
        # Defaults of MaxRequestWorkers for all possible mpm modules
        self.MPM_MODULES = {
            "prefork": 256,
            "worker": 400,
            "event": 400
        }
        # Vars for httpd modules caching
        self.httpd_modules_ts = 0
        self.httpd_modules = ""

    def getCPName(self):
        """
        Return panel name
        :return:
        """
        return __cpname__

    def get_cp_description(self):
        """
        Retrieve panel name and it's version
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
            or None if can't get info
        """
        try:
            with open("/usr/local/psa/version", "r", encoding="utf-8") as f:
                out = f.read()
            return {'name': __cpname__, 'version': out.split()[0], 'additional_info': None}
        except Exception:
            # best effort: any failure to read or parse the file means "unknown"
            return None

    def db_access(self):
        """
        Getting root access to mysql database.
        For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}

        :return: root access to mysql database
        :rtype: dict
        :raises: NoDBAccessData
        """
        return db_access()

    def cpusers(self):
        """
        Generates a list of cpusers registered in the control panel

        :return: list of cpusers registered in the control panel
        :rtype: tuple
        """
        return cpusers()

    def resellers(self):
        """
        Generates a list of resellers in the control panel

        :return: list of cpusers registered in the control panel
        :rtype: tuple
        """
        return resellers()

    def is_reseller(self, username):
        """
        Check if given user is reseller;
        :type username: str
        :rtype: bool
        """
        return is_reseller(username)

    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Get mapping between system and DB users
        @param cplogin_lst :list: list with usernames for generate mapping
        @param with_system_users :bool: add system users to result list or no.
                                        default: False
        """
        return dblogin_cplogin_pairs(cplogin_lst, with_system_users)

    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
               search_sys_users=True):
        """
        Retrieves info about panel user(s)
        :param str|unicode|list|tuple|None cpuser: user login
        :param keyls: list of data which is necessary to obtain the user,
                      the valuescan be:
           cplogin - name/login user control panel
           mail - Email users
           reseller - name reseller/owner users
           locale - localization of the user account
           package - User name of the package
           dns - domain of the user
        :param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk)
        :return: returns a tuple of tuples of data in the same sequence as specified keys in keylst
        :rtype: tuple
        """
        return cpinfo(cpuser, keyls, search_sys_users=search_sys_users)

    def get_admin_email(self):
        """
        Retrieve admin email address
        :return: Host admin's email
        """
        return get_admin_email()

    def docroot(self, domain):
        """
        Return document root for domain
        :param str|unicode domain:
        :return str: document root for domain
        """
        return docroot(domain)

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Return aliases from user domain
        :param str|unicode cpuser: user login
        :param str|unicode domain:
        :return list of aliases
        """
        sql = """
            SELECT a.name, d.name
            FROM domains AS d
            INNER JOIN domain_aliases AS a ON a.dom_id = d.id
            INNER JOIN hosting AS h ON h.dom_id = d.id
            INNER JOIN sys_users AS su ON h.sys_user_id = su.id
            WHERE su.login = %s AND d.name = %s
        """
        return [item[0] for item in query_sql(sql, (cpuser, domain))]

    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for control panel user
        first domain is main domain
        :param str|unicode cpuser: user login
        :return list of tuples (domain_name, documen_root)
        """
        return userdomains(cpuser)

    def homedirs(self):
        """
        Detects and returns list of folders contained the home dirs of users of the Plesk

        :return: list of folders, which are parent of home dirs of users of the panel
        """
        return homedirs()

    def reseller_users(self, resellername=None):
        """
        Return reseller users
        :param resellername: reseller name; an empty list is returned if None
        :return list[str]: user names list
        """
        return reseller_users(resellername)

    def reseller_domains(self, resellername=None):
        """
        Get dict[user, domain]
        :param resellername: reseller's name
        :rtype: dict[str, str|None]
        :raises DomainException: if cannot obtain domains
        """
        return reseller_domains(resellername)

    def get_user_login_url(self, domain):
        """
        Get login url for current panel;
        :type domain: str
        :rtype: str
        """
        return get_user_login_url(domain)

    def admins(self):
        """
        List all admins names in given control panel
        :return: set of strings
        """
        return admins()

    def get_reseller_id_pairs(self):
        """
        Plesk has no user associated with reseller, but we need some id for
        out internal purposes. Let's take it from database.
        """
        return get_reseller_id_pairs()

    def domain_owner(self, domain):
        """
        Return domain's owner
        :param domain: Domain/sub-domain/add-domain name
        :rtype: str
        :return: user name or None if domain not found
        """
        return domain_owner(domain)

    def get_domains_php_info(self):
        """
        Return php version information for each domain
        :return: domain to php info mapping
        :rtype: dict[str, dict]
        """
        return get_domains_php_info()

    def get_installed_php_versions(self):
        """
        Get the list of PHP version installed in panel
        in the form of 'versionXY', for example alt-php56 or plesk-php80

        "Versions by OS vendor" in Plesk DB have names:
         - module
         - synced
        They are FILTERED from the list
        :return: list
        """
        sql = """
            SELECT ServiceNodeEnvironment.name, ServiceNodeEnvironment.value
            FROM ServiceNodeEnvironment
            WHERE (serviceNodeId = '1' AND section = 'phphandlers')
        """
        # handler list example:
        # ['alt-php-internal-cgi', 'alt-php44-cgi', 'alt-php44-fastcgi',
        #  'alt-php51-cgi', 'alt-php51-fastcgi', 'fpm', 'cgi',
        #  'fastcgi', 'x-httpd-lsphp-custom']
        query_result = query_sql(sql)
        ver_name_pattern = re.compile(r'^(alt-|plesk-)php+\d+', re.IGNORECASE)
        named_php_handlers = [item[0] for item in query_result
                              if ver_name_pattern.match(item[0])]

        vendor_handler_names = ['cgi', 'fastcgi', 'fpm', 'x-httpd-lsphp-custom']
        named_php_handlers.extend([self._cast_to_vendor_name(name, xmlconfig)
                                   for name, xmlconfig in query_result
                                   if name in vendor_handler_names])

        # keep only the "<vendor>-phpXY" prefix, dropping the handler suffix
        versions_set = set('-'.join(item.split('-')[:2]) for item in named_php_handlers)

        php_description = []
        for php_name in versions_set:
            if php_name.startswith("alt-") or php_name.startswith("x-httpd-lsphp-"):
                php_root_dir = f'/opt/{php_name.replace("-", "/")}/'
                php_description.append(PHPDescription(
                    identifier=php_name,
                    version=f'{php_name[-2]}.{php_name[-1]}',
                    dir=os.path.join(php_root_dir),
                    modules_dir=os.path.join(php_root_dir, 'usr/lib64/php/modules/'),
                    bin=os.path.join(php_root_dir, 'usr/bin/php'),
                    ini=os.path.join(php_root_dir, 'link/conf/default.ini'),
                ))
            elif php_name.startswith("plesk-"):
                php_root_dir = f'/opt/plesk/php/{php_name[-2]}.{php_name[-1]}/'
                php_description.append(PHPDescription(
                    identifier=php_name,
                    version=f'{php_name[-2]}.{php_name[-1]}',
                    modules_dir=os.path.join(php_root_dir, 'lib64/php/modules/'),
                    dir=os.path.join(php_root_dir),
                    bin=os.path.join(php_root_dir, 'bin/php'),
                    ini=os.path.join(php_root_dir, 'etc/php.ini'),
                ))
            elif php_name.startswith("vendor-"):
                php_root_dir = '/'
                php_description.append(PHPDescription(
                    identifier=php_name,
                    version=f'{php_name[-2]}.{php_name[-1]}',
                    modules_dir=os.path.join(php_root_dir, 'usr/lib64/php/modules/'),
                    dir=os.path.join(php_root_dir),
                    bin=os.path.join(php_root_dir, 'bin/php'),
                    ini=os.path.join(php_root_dir, 'etc/php.ini'),
                ))
            else:
                # unknown php, skip
                continue
        return php_description

    def _cast_to_vendor_name(self, name, value):
        """Build a 'vendor-phpXY-<name>' id from the handler xml version."""
        return f'vendor-php{_extract_xml_value(value, "version")}-{name}'.replace('.', '')

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """Return the features this plugin reports as unsupported."""
        return (
            Feature.RUBY_SELECTOR,
            Feature.PYTHON_SELECTOR,
            Feature.NODEJS_SELECTOR,
        )

    def _get_active_apache_mpm_module(self) -> Optional[AnyStr]:
        """
        Determines active MPM module for Apache Web Server
        :return: apache_active_module_name
            apache_active_module_name: 'prefork', 'event', 'worker',
            or None if none of them is found in the httpd output
        """
        try:
            # Caching httpd output and refresh it only one time in hour
            if time.time() - self.httpd_modules_ts > 3600:
                self.httpd_modules = run_command(["httpd", "-M"])
                self.httpd_modules_ts = time.time()
        except (OSError, IOError, ExternalProgramFailed):
            # remember the failure too, so httpd is not re-run on every call
            self.httpd_modules = ""
            self.httpd_modules_ts = time.time()

        for mpm_module in self.MPM_MODULES:
            if f"mpm_{mpm_module}_module" in self.httpd_modules:
                return mpm_module
        return None

    def _get_max_request_workers_for_module(self, apache_module_name: str) \
            -> Tuple[int, str]:
        """
        Determine MaxRequestWorkers directive value for specified apache module
        Reads config file /etc/httpd/conf.modules.d/01-cgi.conf
        :param apache_module_name: Current apache's module name: 'prefork', 'event', 'worker'
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        try:
            return find_module_param_in_config(self.HTTPD_MPM_CONFIG,
                                               apache_module_name,
                                               'MaxRequestWorkers',
                                               self.MPM_MODULES[apache_module_name])
        except (OSError, IOError, IndexError, ValueError):
            return 0, format_exc()

    def get_apache_max_request_workers(self) -> Tuple[int, str]:
        """
        Get current maximum request apache workers from httpd's config
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        apache_active_module = self._get_active_apache_mpm_module()
        if apache_active_module is None:
            return 0, "httpd service doesn't work or mpm modules are absent"
        return self._get_max_request_workers_for_module(apache_active_module)

    @staticmethod
    def get_main_username_by_uid(uid: int) -> str:
        """
        Get "main" panel username by uid.
        :param uid: uid
        :return Username
        """
        return get_main_username_by_uid(uid)

    @staticmethod
    def get_user_emails_list(username: str, domain: str):
        """
        Return the email(s) of the client that owns the domain.

        :param username: not used by the query
        :param domain: domain name
        :return: comma-separated emails from the matching client rows
        :rtype: str
        """
        # The domain is passed as a query parameter instead of being
        # interpolated into the SQL text, so it cannot alter the statement.
        sql = """
            SELECT clients.email
            FROM clients
            WHERE clients.id = (
                SELECT domains.cl_id FROM domains WHERE domains.name = %s)
        """
        query_result = query_sql(sql, data=(domain,))
        return ','.join(item[0] for item in query_result)

    @staticmethod
    def panel_login_link(username):
        """
        Return the base URL of the panel, taken from `plesk login` output.

        :param username: not used
        :return: '<scheme>://<netloc>/' or '' when the command prints nothing
        """
        link = run_command(['/usr/sbin/plesk', 'login'])
        if not link:
            return ''
        # https://10.51.32.129/login?secret=RZ3NqTqneO0ZQgkIb-QKxyMZkvOgdAS0SGaNnAgN-nKyAYgc -> https://10.51.32.129/
        parsed = urlparse(link)
        return f'{parsed.scheme}://{parsed.netloc}/'

    @staticmethod
    def panel_awp_link(username):
        """
        Return the URL of the AWP page of the lvemanager Plesk module.

        :return: the link, or '' when the panel login link is unavailable
        """
        link = PanelPlugin.panel_login_link(username).rstrip("/")
        if len(link) == 0:
            return ''
        return f'{link}/modules/plesk-lvemanager/index.php/awp/index#/'

    def get_customer_login(self, username):
        """
        In some rare situations we need customer login instead of
        system user name. E.g. when communicating with WHMCS.
        This method resolves customer login by his system user name.

        :raises NoPanelUser: no client matches the system user
        """
        sql = r"""SELECT clients.login
            FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
            JOIN clients ON clients.id=domains.cl_id
            WHERE sys_users.login = %s"""
        customers = query_sql(sql, data=[username])
        try:
            return customers[0][0]
        except IndexError as e:
            raise NoPanelUser(f'Unknown user {username}') from e

    def get_domain_login(self, username, domain):
        """
        In some rare situations we need subscription login instead of
        client login. E.g. when communicating with WHMCS.
        This method resolves sys_users login by domain.
        One client can create several subscriptions
        Each subscription creates a new login in the sys_users table
        The user can create several domains for one subscription
        upgrade_url requires subscription login from sys_users.

        :raises NoPanelUser: no system user matches the domain
        """
        sql = r"""SELECT sys_users.login
            FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
            AND domains.name = %s"""
        logins = query_sql(sql, data=[domain])
        try:
            return logins[0][0]
        except IndexError as e:
            raise NoPanelUser(f'Unknown user for domain {domain}') from e

    def get_server_ip(self):
        """
        Return the address marked as main in the IP_Addresses table.

        :raises NotSupported: no main address is recorded
        """
        sql = r"""
            SELECT ip_address
            FROM IP_Addresses
            WHERE main = 'true'
        """
        ip_addresses = query_sql(sql)
        try:
            return ip_addresses[0][0]
        except IndexError as e:
            raise NotSupported(
                'Unable to detect main ip for this server. '
                'Contact CloudLinux support and report the issue.'
            ) from e

    def suspended_users_list(self):
        """
        Returns list of suspended system users
        suspended means domain status == 2
        """
        sql = r"""
            SELECT su.login
            FROM sys_users su
            JOIN hosting h ON su.id = h.sys_user_id
            JOIN domains d ON h.dom_id = d.id
            WHERE d.status = 2
        """
        suspended = query_sql(sql)
        return [item[0] for item in suspended]
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply