ÿØÿàJFIFHHÿá .
BSA HACKER
Logo of a company Server : Apache
System : Linux nusantara.hosteko.com 4.18.0-553.16.1.lve.el8.x86_64 #1 SMP Tue Aug 13 17:45:03 UTC 2024 x86_64
User : koperas1 ( 1254)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/manager/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/manager/base.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains classes implementing X-Ray Manager behaviour
and helper functions
"""
import json
import logging
import os
import pwd
import shutil
import signal
import subprocess
import sys
import typing
from datetime import datetime, timedelta
from glob import glob
from typing import Iterator, Optional, Tuple

import psutil
from clcommon.const import Feature
from clcommon.cpapi import (
    is_panel_feature_supported,
    get_installed_php_versions
)
from clwpos.papi import php_get_vhost_versions, is_wpos_supported
from clcommon.utils import is_litespeed_running
from clcommon.clpwd import drop_privileges
from secureio import disable_quota

from xray import gettext as _
from ..adviser.clwpos_get import ClWposGetter
from ..apiclient import get_client
from ..continuous.manager import ContinuousManager
from ..internal.exceptions import (
    XRayManagerError,
    XRayAPIError,
    XRayMissingDomain, XRayManagerExit, XRayManagerExitPHPUnsupported
)
from ..internal.fpm_utils import FPMReloadController
from ..internal.local_counters import open_local_storage
from ..internal.nginx_utils import NginxUserCache
from ..internal.types import DomainInfo, Task, url_split
from ..internal.user_manager import ManageUserAgent
from ..internal.advanced_metrics import AdvancedMetrics
from ..internal.user_plugin_utils import (
    get_xray_exec_user,
    user_mode_restricted,
    nginx_user_cache
)
from ..internal.utils import (
    no_active_tasks,
    switch_schedstats,
    ssa_disabled,
    _cagefsctl_remount,
    set_privileges,
    is_file_recently_modified
)
from ..reconfiguration.global_ini import (
    create_ini_files,
    remove_ini_files,
    is_global_ini_mode,
    remove_global_ini_mode_marker,
    create_global_ini_mode_marker
)

if typing.TYPE_CHECKING:
    from ..apiclient.api_client import APIClient


class BaseManager:
    """
    Basic manager abstract class.
    Implements methods common for all managers.
    Requires managers to implement their custom methods.
    """
    DAEMON_PIDFILE = '/var/run/xray-agent.pid'

    VERSIONS = {
        'alt-php54': '/opt/alt/php54/link/conf',
        'alt-php55': '/opt/alt/php55/link/conf',
        'alt-php56': '/opt/alt/php56/link/conf',
        'alt-php70': '/opt/alt/php70/link/conf',
        'alt-php71': '/opt/alt/php71/link/conf',
        'alt-php72': '/opt/alt/php72/link/conf',
        'alt-php73': '/opt/alt/php73/link/conf',
        'alt-php74': '/opt/alt/php74/link/conf',
        'alt-php80': '/opt/alt/php80/link/conf',
        'alt-php81': '/opt/alt/php81/link/conf',
        'alt-php82': '/opt/alt/php82/link/conf',
        'alt-php83': '/opt/alt/php83/link/conf'
    }

    def __init__(self, system_id: str, phpinfo_mode: bool = False):
        # FIXME: there is no reason to keep class instead of instance
        #        refactor this once it causes issues
        self.api_client_class: type[APIClient] = get_client()
        self.sys_id = system_id
        self.logger = logging.getLogger('manager')
        self.continuous_monitoring = ContinuousManager()
        self.manage_user_agent = ManageUserAgent()
        self.phpinfo_mode = phpinfo_mode
        self.ui_api_client = get_client('lists')(system_id=self.sys_id)

    def _get_daemon_pid(self) -> Optional[int]:
        try:
            with open(self.DAEMON_PIDFILE) as f:
                return int(f.read())
        except (OSError, IOError):
            logging.warning('Unable to read daemon pid from pidfile.')

    def supported_versions(self) -> dict:
        """
        Get supported PHP versions
        :return: a dict with supported versions
        """
        return self.VERSIONS

    def is_version_supported(self, php_version: str) -> bool:
        """
        Check if given PHP version is supported
        :param php_version: PHP version, e.g. 'alt-phpXY'
        :return: True if version is supported, False otherwise
        """
        return php_version in self.supported_versions()

    def _ini_path(self, domain_info: DomainInfo) -> str:
        """
        Get a path to directory for additional ini file for based on
        panel-set version
        """
        return self.supported_versions().get(domain_info.panel_php_version)

    def get_ini_path(self, domain_info: DomainInfo) -> str:
        """
        Resolve a path to directory for additional ini file
        It depends on version set for domain and on selector
        :param domain_info: a DomainInfo object
        :return: path to directory for ini files
        """
        ini_path = self._ini_path(domain_info)
        try:
            if domain_info.php_ini_scan_dir:
                ini_path = domain_info.php_ini_scan_dir
            elif self.is_selector_enabled(domain_info):
                ini_path = domain_info.selector_ini_path
            else:
                ini_path = domain_info.phpd_location_ini_path or ini_path
        except ValueError:
            # failed to resolve CageFS prefix for user
            pass
        self.logger.info('Ini path resolved as %s', ini_path)
        return ini_path

    def get_php_version(self, domain_info: DomainInfo) -> str:
        """
        Get PHP version which serves given domain
        :param domain_info: a DomainInfo object, including user of domain
                            and PHP version set in control panel environment
        :return: real php version of domain (selector or panel one)
        """
        if not domain_info.is_selector_applied and \
                self.is_selector_enabled(domain_info):
            self.logger.info('Selector is enabled for user %s',
                             domain_info.user)
            current_version = domain_info.selector_php_version
        else:
            current_version = domain_info.panel_php_version
        self.logger.info('PHP version detected as %s', current_version)
        return current_version

    def is_selector_enabled(self, domain_info: DomainInfo) -> bool:
        """
        Is selector enabled for given domain
        :param domain_info: a DomainInfo object, including user of domain
                            and PHP version set in control panel
        :return: True if selector is enabled for domain, False otherwise
        """
        if not is_panel_feature_supported(Feature.PHP_SELECTOR):
            return False
        if domain_info.selector_php_version is None:
            return False
        return self.panel_specific_selector_enabled(domain_info)

    def php_procs_reload(self, domain_info: DomainInfo) -> None:
        """
        Reload FPM service or kill all *php* processes of user
        :param domain_info: a ready-to-use DomainInfo object
        """
        self.reset_criu_imgs(domain_info.name)

        if domain_info.panel_fpm:
            self.restart_fpm_service(domain_info)
        elif is_litespeed_running():
            self.gracefully_restart_litespeed(domain_info.user)
        else:
            self.kill_user_php(domain_info.user)

    def gracefully_restart_litespeed(self, username):
        """
        Litespeed's graceful restart.
        https://www.litespeedtech.com/support/wiki/doku.php/litespeed_wiki:php:detached_mode#for_a_user
        """
        pw = pwd.getpwnam(username)

        with drop_privileges(username), disable_quota():
            # just  change it's modification date
            try:
                open(os.path.join(pw.pw_dir, '.lsphp_restart.txt'), 'w').close()
            except (FileNotFoundError, PermissionError):
                self.logger.warning('Unable to restart lsws after changes', exc_info=True)

    def kill_user_php(self, username: str) -> list:
        """
        Kill all PHP processes, which belong to given username
        :param username: name of user
        :return: list of pids of successfully killed processes
        """
        killed_procs = list()
        for proc in self.user_procs(username):
            if 'php' in proc.info['name']:
                proc.send_signal(signal.SIGHUP)
                killed_procs.append(proc.info['pid'])
        return killed_procs

    @staticmethod
    def user_procs(user_name: str) -> Iterator[psutil.Process]:
        """
        Generator yields processes, which belong to given user_name.
        Processes are checked using UID, not user_name
        :param user_name: user name
        :return: a generator object
        """
        uid_by_name = pwd.getpwnam(user_name).pw_uid
        attrs = ['name', 'pid', 'uids']
        for p in psutil.process_iter(attrs):
            if uid_by_name in (p.info['uids'].real, p.info['uids'].effective):
                yield p

    def reset_criu_imgs(self, domain: str) -> None:
        """
        Reset criu images if any found for given domain in order to
        enable/disable X Ray correctly
        """
        criu_imgs_dir = glob(f'/var/run/mod_lsapi/*{domain}_criu_imgs')
        if criu_imgs_dir:
            shutil.rmtree(criu_imgs_dir[0])
            self.logger.info('criu images in %s dropped', criu_imgs_dir[0])

    def _domain_info_by_url(self, url: str) -> DomainInfo:
        """
        DomainInfo retrieving based on url
        """
        domain_name, _ = url_split(url)
        # get_domain_info includes validation of domain existence
        return self.get_domain_info(domain_name)

    def get_domain_info(self, domain_name: str) -> DomainInfo:
        """
        Retrieve information about given domain from control panel environment
        Required to be implemented by child classes
        :param domain_name: name of domain
        :return: a DomainInfo object
        """
        raise NotImplementedError(
            _('Manager should implement retrieving domain info!'))

    def panel_specific_selector_enabled(self, domain_info: DomainInfo) -> bool:
        """
        Check if selector is enabled specifically for panel
        Required to be implemented by child classes
        :param domain_info: a DomainInfo object
        :return: True if yes, False otherwise
        """
        raise NotImplementedError(
            _('Manager should implement specific panel check for selector enabled!'))

    def fpm_service_name(self, dom_info: DomainInfo) -> str:
        """
        Get FPM service name for particular panel
        :param dom_info: DomainInfo object
        :return: name of FPM service
        """
        raise NotImplementedError(
            _('Manager should implement FPM service name retrieving!'))

    def restart_fpm_service(self, dom_info: DomainInfo) -> None:
        """
        Restart FPM service for particular version
        :param dom_info: DomainInfo object
        """
        fpm_serv = self.fpm_service_name(dom_info)
        try:
            subprocess.run(
                ['/sbin/service', fpm_serv, 'reload'],
                capture_output=True,
                text=True,
                check=True)
            self.logger.info('Service %s reloaded', fpm_serv)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            self.logger.error('Failed to reload FPM service',
                              extra={'err': str(e),
                                     'info': dom_info})
        else:
            FPMReloadController(fpm_serv).save_latest_reload()

    @staticmethod
    def prepare_wpos_info_path(username: str) -> Tuple[str, int]:
        """Resolve path for user and prepare directory if needed"""
        _upwd = pwd.getpwnam(username)
        _path = f'/var/clwpos/uids/{_upwd.pw_uid}/info.json'
        if not os.path.isdir(os.path.dirname(_path)):
            os.makedirs(os.path.dirname(_path))
            _cagefsctl_remount(username)
        return _path, _upwd.pw_gid

    def write_wpos_info(self, user: str) -> None:
        """Prepare and write info.json file for WPOS (AWP) utilities"""
        if not is_wpos_supported():
            return

        dest, user_gid = self.prepare_wpos_info_path(user)
        if is_file_recently_modified(dest):
            self.logger.info('File %s modified recently, skip generating',
                             dest)
            return

        working_dest = dest + '.tmp'
        to_write = dict(
            vhost_versions=php_get_vhost_versions(user),
            installed_versions=get_installed_php_versions()
        )
        self.logger.debug('Going to write file %s with %s',
                          dest, to_write)
        with set_privileges(target_uid=0, target_gid=user_gid, mask=0o137):
            try:
                with open(working_dest, 'w') as info_json:
                    json.dump(to_write, info_json)
                shutil.move(working_dest, dest)
                self.logger.info('WPOS data written successfully')
            except OSError as e:
                self.logger.error('Unable to write info for WPOS due to %s',
                                  str(e))

    @staticmethod
    def response(**kwargs) -> str:
        """
        Create JSON response message with result field == success
        and given keyword arguments in other fields
        :return: json packed string
        """
        initial = {'result': 'success'}
        if kwargs:
            initial.update(kwargs)
        return json.dumps(dict(sorted(initial.items())))

    def response_continuous(self, **kwargs) -> 'json str':
        """
        Create JSON response message for continuous actions.
        Extends action definition with 'continuous' addition
        :return: json packed string
        """
        if kwargs.get('action'):
            kwargs['action'] = f"{kwargs['action']} continuous"
        return self.response(**kwargs)

    @user_mode_restricted
    def start(self, *, url: str, client_ip: str,
              tracing_by: str, tracing_count: int,
              auto_task: bool = False,
              autotracing: bool = False) -> 'json str':
        """
        Start monitoring of given URL.
        Arguments are only allowed by keyword
        :param url: URL to monitor
        :param client_ip: a client IP address
        :param tracing_by: time or qty
        :param tracing_count: a number of minutes|requests to monitor
        :param auto_task: if a task is an auto created one (x-ray 2.0)
        :param autotracing: task is an auto created with
                            user marked as *autotracing*
        :return: JSON encoded result of start action
        """
        # domain existence validation inside
        domain_info = self._domain_info_by_url(url)
        self.write_wpos_info(domain_info.user)
        real_php_version = self.get_php_version(domain_info)
        if self.is_version_supported(real_php_version):
            ini_files_location = self.get_ini_path(domain_info)
        else:
            self.logger.warning('PHP version is unsupported',
                                extra={'version': real_php_version})
            raise XRayManagerExitPHPUnsupported(url, real_php_version)

        # --- ask redis for tracing task id ---
        client = self.api_client_class(system_id=self.sys_id)
        user = '*autotracing*' if autotracing else get_xray_exec_user()
        tracing_task = Task(url=url, client_ip=client_ip,
                            tracing_by=tracing_by,
                            tracing_count=tracing_count,
                            ini_location=ini_files_location,
                            auto_task=auto_task, user=user,
                            domain_owner=domain_info.user)
        tracing_task.task_id = client.create(tracing_task)
        # ---
        tracing_task.add(php_version=real_php_version)
        # --- if time, add cron job
        tracing_task.set_cronjob(system_id=self.sys_id)
        # ---
        # --- update task status in redis db
        client.update(tracing_task.starttime)
        # ---
        self.php_procs_reload(domain_info)
        switch_schedstats(enabled=True)
        try:
            ClWposGetter().post_metadata(domain_info.user, domain_info.name)
        except XRayAPIError as e:
            logging.warning('Failed to send metadata to Smart Advice with: %s',
                            e.reason)
        NginxUserCache(domain_info.user).disable()

        # automatically create missing ini files
        # in cagefs and other places if we use global mode
        if is_global_ini_mode():
            create_ini_files()

        return self.response(action='start',
                             tracing_task_id=tracing_task.task_id)

    def start_auto(self, *, url: str) -> 'json str':
        """
        Start continuous monitoring of given URL (auto task).
        Arguments are only allowed by keyword
        :param url: URL to monitor
        :return: JSON encoded result of start action
        """
        return self.start(url=url, client_ip='*', tracing_by='time',
                          tracing_count=1430, auto_task=True)

    def start_autotracing(self, *, url: str,
                          tracing_count: int = 20) -> 'json str':
        """
        Start monitoring of given URL (autotracing task).
        Note, user is overridden as *autotracing*
        Arguments are only allowed by keyword
        :param url: URL to monitor
        :param tracing_count: count of requests to capture
        :return: JSON encoded result of start action
        """
        return self.start(url=url, client_ip='*', tracing_by='request_qty',
                          tracing_count=tracing_count, auto_task=True,
                          autotracing=True)

    def stop(self, tracing_task_id: str) -> 'json str':
        """
        Stop monitoring of given task ID
        :param tracing_task_id: an ID of task to stop
        :return: JSON encoded result of stop action
        """
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=tracing_task_id)
        tracing_task = client.get_task()

        self._request_daemon_storage_flush()
        with open_local_storage(tracing_task.fake_id) as storage:
            tracing_task.update_with_local_data(next_request_id=storage.next_request_id)

        try_to_complete_statuses = ['running', 'stopped', 'completed', 'hold']

        if tracing_task.status not in try_to_complete_statuses:
            raise XRayManagerExit(
                _("Cannot stop task with status '%s'") % str(tracing_task.status))

        # for FPM pre-reload check
        try:
            domain_info = self._domain_info_by_url(tracing_task.url)
            tracing_task.set_domain_owner(domain_info.user)
        except XRayMissingDomain:
            self.logger.info('Stopping task for an already deleted domain')
            domain_info = None

        tracing_task.remove()
        # --- if time, remove cron job
        tracing_task.drop_cronjob()
        # ---
        # --- recalculate remaining count
        remaining_count = tracing_task.recalculate_counts()
        # ---
        # --- stop or complete task
        if remaining_count > 0:
            client.stop(remaining_count)
        else:
            self._complete(tracing_task, client)
        # ---
        if domain_info is not None:
            NginxUserCache(domain_info.user).restore()
            self.php_procs_reload(domain_info)
        if no_active_tasks() and ssa_disabled():
            switch_schedstats(enabled=False)
        return self.response(action='stop',
                             tracing_task_id=tracing_task.task_id)

    def _request_daemon_storage_flush(self):
        """
        Request daemon to flush it's in-memory storage on disk using SIGUSR2
        and wait for getting SIGUSR2 back that daemon sends when it successfully
        flushed data on disk and we are ready to proceed.
        """
        # don't process SIGUSR2 with default handler
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGUSR2})
        daemon_pid = self._get_daemon_pid()

        if daemon_pid:
            try:
                os.kill(daemon_pid, signal.SIGUSR2)
            except OSError:
                self.logger.warning('Unable to send daemon signal. Some stats will not be flushed. '
                                    'Maybe daemon is stopped?')

        self.logger.info('Waiting for daemon to signal back about flush end.')
        siginfo = signal.sigtimedwait({signal.SIGUSR2}, 15.0)
        if siginfo is None:
            logging.warning('Daemon did not signal back in given timeout. '
                            'Some stats may not be flushed. Continue.')
        # and restore default signal handler
        signal.pthread_sigmask(signal.SIG_DFL, {signal.SIGUSR2})

    @user_mode_restricted
    def continue_(self, tracing_task_id: str) -> 'json str':
        """
        Continue monitoring of given task ID
        :param tracing_task_id: an ID of task to continue
        :return: JSON encoded result of continue action
        """
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=tracing_task_id)
        tracing_task = client.get_task()
        if tracing_task.status in ('running', 'completed'):
            raise XRayManagerExit(
                _("Cannot continue task with status '%s'") % str(tracing_task.status))

        # for FPM pre-reload check
        domain_info = self._domain_info_by_url(tracing_task.url)
        tracing_task.set_domain_owner(domain_info.user)

        if '54' in self.supported_versions():
            # for custom panels only
            real_php_version = self.get_php_version(domain_info)
        else:
            # no need in redundant call for non-custom panels
            real_php_version = None

        tracing_task.add(php_version=real_php_version)
        # --- if time, add cron job
        tracing_task.set_cronjob(system_id=self.sys_id)
        # ---
        # --- update task status in redis db
        client.update(tracing_task.starttime)
        # ---
        NginxUserCache(domain_info.user).disable()
        self.php_procs_reload(domain_info)
        switch_schedstats(enabled=True)
        return self.response(action='continue',
                             tracing_task_id=tracing_task.task_id)

    def complete(self, tracing_task_id: str) -> 'json str':
        """
        Complete given tak ID
        :param tracing_task_id: an ID of task to complete
        :return: JSON encoded result of complete action
        """
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=tracing_task_id)
        tracing_task = client.get_task()
        if tracing_task.status in ('running', 'completed'):
            raise XRayManagerExit(
                _("Cannot complete task with status '%s'") % str(tracing_task.status))
        self._complete(tracing_task, client)
        return self.response(action='complete',
                             tracing_task_id=tracing_task.task_id)

    @staticmethod
    def _is_to_complete(task: dict) -> bool:
        """
        Returns True if task should be completed.
        False otherwise.
        """
        now = datetime.now()
        tracing_by = task['tracing_by']
        inception = task['starttime'] or task['createtime']
        task_start_time = datetime.fromtimestamp(inception)
        if tracing_by == 'request_qty' and task_start_time + timedelta(days=2) < now:
            return True
        if tracing_by == 'time' and task_start_time + timedelta(minutes=task['tracing_count']) < now:
            return True
        return False

    def autocomplete_tasks(self) -> 'json str':
        """
        Gets all server tasks and complete those which match following criteria:
         - traced_by requests_qty: if task started 2 days ago - complete
         - traced_by time: if task runs longer than start time + tracing time - complete
        """
        all_server_tasks = self.ui_api_client.get_task_list().get('result', [])
        self.logger.info('Check tasks for autocompleting...%s', str(all_server_tasks))
        for task_item in all_server_tasks:
            if not self._is_to_complete(task_item):
                continue
            logging.info('Going to complete task with id: %s', str(task_item['tracing_task_id']))
            self.stop(task_item['tracing_task_id'])
        return self.response(action='autocomplete-tasks')

    @staticmethod
    def _complete(t_task: Task, client: 'APIClient'):
        """
        Common complete actions:
        - delete cron job,
        - erase request id file
        - send 'complete' status to mongo
        :param t_task: a tracing task object
        :param client: an APIClient object
        """
        t_task.drop_cronjob()
        t_task.erase_request_id_storage()
        client.complete()
        if t_task.auto_task:
            client.share()

    def delete(self, tracing_task_id: str) -> 'json str':
        """
        Delete given task ID
        :param tracing_task_id: an ID of task to delete
        :return: JSON encoded result of delete action
        """
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=tracing_task_id)
        tracing_task = client.get_task()
        if tracing_task.status == 'running':
            raise XRayManagerExit(
                _("Cannot delete task with status '%s'") % str(tracing_task.status))
        client.delete()
        return self.response(action='delete',
                             tracing_task_id=tracing_task.task_id)

    def enable_continuous(self, url: str, email: str) -> 'json str':
        """
        Enable continuous monitoring for given URL
        :param url: URL to monitor
        :param email: email to send reports to
        :return: JSON encoded result of enable action
        """
        # get_domain_info throws an exception in case of non-existent domain
        d_info = self._domain_info_by_url(url)

        self.continuous_monitoring.enable(d_info.name, url, email)
        return self.response_continuous(action='enable', url=url)

    def disable_continuous(self, url: str) -> 'json str':
        """
        Disable continuous monitoring for given URL
        :param url: URL to monitor
        :return: JSON encoded result of disable action
        """
        domain_name, _ = url_split(url)
        self.continuous_monitoring.disable(domain_name)
        return self.response_continuous(action='disable', url=url)

    def start_continuous(self, url: str) -> 'json str':
        """
        Start continuous monitoring for given URL
        :param url: URL to monitor
        :return: JSON encoded result of start action
        """
        domain_name, _ = url_split(url)
        self.continuous_monitoring.start(domain_name)
        return self.response_continuous(action='start', url=url)

    def stop_continuous(self, url: str) -> 'json str':
        """
        Stop continuous monitoring for given URL
        :param url: URL to monitor
        :return: JSON encoded result of stop action
        """
        domain_name, _ = url_split(url)
        self.continuous_monitoring.stop(domain_name)
        return self.response_continuous(action='stop', url=url)

    def continuous_tracing_list(self) -> 'json str':
        """
        Get list of continuous monitoring tasks
        :return: JSON encoded result of get list action
        """
        tracing_list = self.continuous_monitoring.get_tracing_list()
        return self.response(action='get continuous list',
                             data=tracing_list)

    def tasks_list(self) -> 'json str':
        """
        Get list of tasks
        """
        return self.response(action='tasks-list',
                             data=self.ui_api_client.get_task_list())

    def requests_list(self, task_id: str) -> 'json str':
        """
        Get list of requests for given tracing task id
        """
        # needed for user verification
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=task_id)
        _ = client.get_task()
        return self.response(action='requests-list',
                             data=self.ui_api_client.get_request_list(task_id))

    def request_data(self, task_id: str, request_id: int) -> 'json str':
        """
        Get collected statistics for request ID of given tracing task
        """
        # needed for user verification
        client = self.api_client_class(system_id=self.sys_id,
                                       tracing_task_id=task_id)
        _ = client.get_task()
        return self.response(action='request-data',
                             data=self.ui_api_client.get_request_data(
                                 task_id, request_id))

    def enable_user_agent(self) -> 'json str':
        """
        Enable X-Ray User Agent:
            start or restart service if it is accidentally already running
        For systemd systems -- start socket unit only
        For SysV -- start the entire service
        :return: JSON encoded result of enable action
        """
        self.manage_user_agent.enable()
        return self.response(action='enable-user-agent')

    def disable_user_agent(self) -> 'json str':
        """
        Disable X-Ray User Agent:
             stop the entire service
             or do nothing if it is accidentally not running
        For systemd systems -- also check if socket unit is running
                               and stop it too
        :return: JSON encoded result of disable action
        """
        self.manage_user_agent.disable()
        return self.response(action='disable-user-agent')

    def user_agent_status(self) -> 'json str':
        """
        Get status of X-Ray User Agent service
        :return: JSON encoded result of status action
        """
        agent_status = self.manage_user_agent.status()
        return self.response(action='user-agent-status', status=agent_status,
                             user_nginx_cache=nginx_user_cache())

    def advanced_metrics(self, args) -> 'json str':
        """
        Advanced metrics tool
        :return: JSON encoded result of status action
        """
        am = AdvancedMetrics()
        if args['enable'] is True:
            am.enable()
        elif args['disable'] is True:
            am.disable()
        elif args['status'] is True:
            status = am.status()
            return self.response(status=status)

        return self.response()

    def enable_serverwide_mode(self, args) -> 'json str':
        """
        Advanced metrics tool
        :return: JSON encoded result of status action
        """
        create_global_ini_mode_marker()
        create_ini_files()

        return self.response()

    def disable_serverwide_mode(self, args) -> 'json str':
        """
        Advanced metrics tool
        :return: JSON encoded result of status action
        """
        remove_ini_files()
        remove_global_ini_mode_marker()

        return self.response()