ACIL FM
Dark
Refresh
Current DIR:
/usr/lib/python3.9/site-packages/cockpit
/
usr
lib
python3.9
site-packages
cockpit
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
channels
-
chmod
Open
Rename
Delete
data
-
chmod
Open
Rename
Delete
_vendor
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
beiboot.py
22.15 MB
chmod
View
DL
Edit
Rename
Delete
beipack.py
2.99 MB
chmod
View
DL
Edit
Rename
Delete
bridge.py
12 MB
chmod
View
DL
Edit
Rename
Delete
channel.py
22.32 MB
chmod
View
DL
Edit
Rename
Delete
config.py
3.37 MB
chmod
View
DL
Edit
Rename
Delete
internal_endpoints.py
5.95 MB
chmod
View
DL
Edit
Rename
Delete
jsonutil.py
7.42 MB
chmod
View
DL
Edit
Rename
Delete
osinfo.py
929 B
chmod
View
DL
Edit
Rename
Delete
packages.py
21.25 MB
chmod
View
DL
Edit
Rename
Delete
peer.py
12.45 MB
chmod
View
DL
Edit
Rename
Delete
polkit.py
7.4 MB
chmod
View
DL
Edit
Rename
Delete
polyfills.py
2.27 MB
chmod
View
DL
Edit
Rename
Delete
protocol.py
9.52 MB
chmod
View
DL
Edit
Rename
Delete
remote.py
8.93 MB
chmod
View
DL
Edit
Rename
Delete
router.py
10.08 MB
chmod
View
DL
Edit
Rename
Delete
samples.py
17.02 MB
chmod
View
DL
Edit
Rename
Delete
superuser.py
9.74 MB
chmod
View
DL
Edit
Rename
Delete
transports.py
17.92 MB
chmod
View
DL
Edit
Rename
Delete
_version.py
20 B
chmod
View
DL
Edit
Rename
Delete
__init__.py
68 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/lib/python3.9/site-packages/cockpit/superuser.py
# This file is part of Cockpit. # # Copyright (C) 2022 Red Hat, Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. import array import asyncio import contextlib import getpass import logging import os import socket from tempfile import TemporaryDirectory from typing import List, Optional, Sequence, Tuple from cockpit._vendor import ferny from cockpit._vendor.bei.bootloader import make_bootloader from cockpit._vendor.systemd_ctypes import Variant, bus from .beipack import BridgeBeibootHelper from .jsonutil import JsonObject, get_str from .packages import BridgeConfig from .peer import ConfiguredPeer, Peer, PeerError from .polkit import PolkitAgent from .router import Router, RoutingError, RoutingRule logger = logging.getLogger(__name__) class SuperuserPeer(ConfiguredPeer): responder: ferny.AskpassHandler def __init__(self, router: Router, config: BridgeConfig, responder: ferny.AskpassHandler): super().__init__(router, config) self.responder = responder async def do_connect_transport(self) -> None: async with contextlib.AsyncExitStack() as context: if 'pkexec' in self.args: logger.debug('connecting polkit superuser peer transport %r', self.args) await context.enter_async_context(PolkitAgent(self.responder)) else: logger.debug('connecting non-polkit superuser peer transport %r', self.args) responders: 'list[ferny.InteractionHandler]' = [self.responder] if '# cockpit-bridge' in self.args: logger.debug('going to beiboot superuser bridge %r', self.args) helper = BridgeBeibootHelper(self, ['--privileged']) responders.append(helper) stage1 = make_bootloader(helper.steps, gadgets=ferny.BEIBOOT_GADGETS).encode() else: stage1 = None agent = ferny.InteractionAgent(responders) if 'SUDO_ASKPASS=ferny-askpass' in self.env: tmpdir = context.enter_context(TemporaryDirectory()) ferny_askpass = ferny.write_askpass_to_tmpdir(tmpdir) env: Sequence[str] = [f'SUDO_ASKPASS={ferny_askpass}'] else: env = self.env transport = await self.spawn(self.args, env, stderr=agent, start_new_session=True) if stage1 is not None: transport.write(stage1) try: await agent.communicate() except ferny.InteractionError as exc: raise PeerError('authentication-failed', message=str(exc)) from exc class CockpitResponder(ferny.AskpassHandler): commands = ('ferny.askpass', 'cockpit.send-stderr') async def do_custom_command(self, command: str, args: Tuple, fds: List[int], stderr: str) -> None: if command == 'cockpit.send-stderr': with socket.socket(fileno=fds[0]) as sock: fds.pop(0) # socket.send_fds(sock, [b'\0'], [2]) # New in Python 3.9 sock.sendmsg([b'\0'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [2]))]) class AuthorizeResponder(CockpitResponder): def __init__(self, router: Router): self.router = router self.authorize_attempted = False async def do_askpass(self, messages: str, prompt: str, hint: str) -> 'str | None': if self.authorize_attempted: logger.info("noninteractive authorize during init already attempted, rejecting") return None self.authorize_attempted = True hexuser = ''.join(f'{c:02x}' for c in getpass.getuser().encode('ascii')) password = await self.router.request_authorization(f'plain1:{hexuser}') # translate "no password" from authorize protocol (empty string) to ferny protocol (None) return None if password == '' else password class SuperuserRoutingRule(RoutingRule, CockpitResponder, bus.Object, interface='cockpit.Superuser'): superuser_configs: Sequence[BridgeConfig] = () pending_prompt: Optional[asyncio.Future] peer: Optional[SuperuserPeer] # D-Bus signals prompt = bus.Interface.Signal('s', 's', 's', 'b', 's') # message, prompt, default, echo, error # D-Bus properties bridges = bus.Interface.Property('as', value=[]) current = bus.Interface.Property('s', value='none') methods = bus.Interface.Property('a{sv}', value={}) # RoutingRule def apply_rule(self, options: JsonObject) -> Optional[Peer]: superuser = options.get('superuser') if not superuser or self.current == 'root': # superuser not requested, or already superuser? Next rule. return None elif self.peer or superuser == 'try': # superuser requested and active? Return it. # 'try' requested? Either return the peer, or None. return self.peer else: # superuser requested, but not active? That's an error. raise RoutingError('access-denied') # ferny.AskpassHandler async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: assert self.pending_prompt is None echo = hint == "confirm" self.pending_prompt = asyncio.get_running_loop().create_future() try: logger.debug('prompting for %s', prompt) # with sudo, all stderr messages are treated as warning/errors by the UI # (such as the lecture or "wrong password"), so pass them in the "error" field self.prompt('', prompt, '', echo, messages) return await self.pending_prompt finally: self.pending_prompt = None def __init__(self, router: Router, *, privileged: bool = False): super().__init__(router) self.pending_prompt = None self.peer = None self.startup = None if privileged or os.getuid() == 0: self.current = 'root' def peer_done(self) -> None: self.current = 'none' self.peer = None async def go(self, name: str, responder: ferny.AskpassHandler) -> None: if self.current != 'none': raise bus.BusError('cockpit.Superuser.Error', 'Superuser bridge already running') assert self.peer is None assert self.startup is None for config in self.superuser_configs: if name in (config.name, 'any'): break else: raise bus.BusError('cockpit.Superuser.Error', f'Unknown superuser bridge type "{name}"') self.current = 'init' self.peer = SuperuserPeer(self.router, config, responder) self.peer.add_done_callback(self.peer_done) try: await self.peer.start(init_host=self.router.init_host) except asyncio.CancelledError: raise bus.BusError('cockpit.Superuser.Error.Cancelled', 'Operation aborted') from None except (OSError, PeerError) as exc: raise bus.BusError('cockpit.Superuser.Error', str(exc)) from exc self.current = self.peer.config.name def set_configs(self, configs: Sequence[BridgeConfig]): logger.debug("set_configs() with %d items", len(configs)) configs = [config for config in configs if config.privileged] self.superuser_configs = tuple(configs) self.bridges = [config.name for config in self.superuser_configs] self.methods = {c.label: Variant({'label': Variant(c.label)}, 'a{sv}') for c in configs if c.label} logger.debug(" bridges are now %s", self.bridges) # If the currently active bridge config is not in the new set of configs, stop it if self.peer is not None: if self.peer.config not in self.superuser_configs: logger.debug(" stopping superuser bridge '%s': it disappeared from configs", self.peer.config.name) self.stop() def cancel_prompt(self) -> None: if self.pending_prompt is not None: self.pending_prompt.cancel() self.pending_prompt = None def shutdown(self) -> None: self.cancel_prompt() if self.peer is not None: self.peer.close() # close() should have disconnected the peer immediately assert self.peer is None # Connect-on-startup functionality def init(self, params: JsonObject) -> None: name = get_str(params, 'id', 'any') responder = AuthorizeResponder(self.router) self._init_task = asyncio.create_task(self.go(name, responder)) self._init_task.add_done_callback(self._init_done) def _init_done(self, task: 'asyncio.Task[None]') -> None: logger.debug('superuser init done! %s', task.exception()) self.router.write_control(command='superuser-init-done') del self._init_task # D-Bus methods @bus.Interface.Method(in_types=['s']) async def start(self, name: str) -> None: await self.go(name, self) @bus.Interface.Method() def stop(self) -> None: self.shutdown() @bus.Interface.Method(in_types=['s']) def answer(self, reply: str) -> None: if self.pending_prompt is not None: logger.debug('responding to pending prompt') self.pending_prompt.set_result(reply) else: logger.debug('got Answer, but no prompt pending')
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply