# localstack/services/s3/v3/provider.py (from the localstack/localstack repository)

import base64
import copy
import datetime
import json
import logging
from collections import defaultdict
from io import BytesIO
from operator import itemgetter
from secrets import token_urlsafe
from typing import IO, Optional, Union
from urllib import parse as urlparse

from localstack import config
from localstack.aws.api import CommonServiceException, RequestContext, handler
from localstack.aws.api.s3 import (
    MFA,
    AbortMultipartUploadOutput,
    AccelerateConfiguration,
    AccessControlPolicy,
    AccessDenied,
    AccountId,
    AnalyticsConfiguration,
    AnalyticsId,
    Body,
    Bucket,
    BucketAlreadyExists,
    BucketAlreadyOwnedByYou,
    BucketCannedACL,
    BucketLifecycleConfiguration,
    BucketLoggingStatus,
    BucketName,
    BucketNotEmpty,
    BucketVersioningStatus,
    BypassGovernanceRetention,
    ChecksumAlgorithm,
    ChecksumCRC32,
    ChecksumCRC32C,
    ChecksumSHA1,
    ChecksumSHA256,
    CommonPrefix,
    CompletedMultipartUpload,
    CompleteMultipartUploadOutput,
    ConfirmRemoveSelfBucketAccess,
    ContentMD5,
    CopyObjectOutput,
    CopyObjectRequest,
    CopyObjectResult,
    CopyPartResult,
    CORSConfiguration,
    CreateBucketOutput,
    CreateBucketRequest,
    CreateMultipartUploadOutput,
    CreateMultipartUploadRequest,
    CrossLocationLoggingProhibitted,
    Delete,
    DeletedObject,
    DeleteMarkerEntry,
    DeleteObjectOutput,
    DeleteObjectsOutput,
    DeleteObjectTaggingOutput,
    Delimiter,
    EncodingType,
    Error,
    Expiration,
    FetchOwner,
    GetBucketAccelerateConfigurationOutput,
    GetBucketAclOutput,
    GetBucketAnalyticsConfigurationOutput,
    GetBucketCorsOutput,
    GetBucketEncryptionOutput,
    GetBucketIntelligentTieringConfigurationOutput,
    GetBucketInventoryConfigurationOutput,
    GetBucketLifecycleConfigurationOutput,
    GetBucketLocationOutput,
    GetBucketLoggingOutput,
    GetBucketOwnershipControlsOutput,
    GetBucketPolicyOutput,
    GetBucketPolicyStatusOutput,
    GetBucketReplicationOutput,
    GetBucketRequestPaymentOutput,
    GetBucketTaggingOutput,
    GetBucketVersioningOutput,
    GetBucketWebsiteOutput,
    GetObjectAclOutput,
    GetObjectAttributesOutput,
    GetObjectAttributesParts,
    GetObjectAttributesRequest,
    GetObjectLegalHoldOutput,
    GetObjectLockConfigurationOutput,
    GetObjectOutput,
    GetObjectRequest,
    GetObjectRetentionOutput,
    GetObjectTaggingOutput,
    GetObjectTorrentOutput,
    GetPublicAccessBlockOutput,
    HeadBucketOutput,
    HeadObjectOutput,
    HeadObjectRequest,
    IntelligentTieringConfiguration,
    IntelligentTieringId,
    InvalidArgument,
    InvalidBucketName,
    InvalidDigest,
    InvalidLocationConstraint,
    InvalidObjectState,
    InvalidPartNumber,
    InvalidPartOrder,
    InvalidStorageClass,
    InvalidTargetBucketForLogging,
    InventoryConfiguration,
    InventoryId,
    KeyMarker,
    LifecycleRules,
    ListBucketAnalyticsConfigurationsOutput,
    ListBucketIntelligentTieringConfigurationsOutput,
    ListBucketInventoryConfigurationsOutput,
    ListBucketsOutput,
    ListMultipartUploadsOutput,
    ListObjectsOutput,
    ListObjectsV2Output,
    ListObjectVersionsOutput,
    ListPartsOutput,
    Marker,
    MaxKeys,
    MaxParts,
    MaxUploads,
    MethodNotAllowed,
    MissingSecurityHeader,
    MultipartUpload,
    MultipartUploadId,
    NoSuchBucket,
    NoSuchBucketPolicy,
    NoSuchCORSConfiguration,
    NoSuchKey,
    NoSuchLifecycleConfiguration,
    NoSuchPublicAccessBlockConfiguration,
    NoSuchTagSet,
    NoSuchUpload,
    NoSuchWebsiteConfiguration,
    NotificationConfiguration,
    Object,
    ObjectIdentifier,
    ObjectKey,
    ObjectLockConfiguration,
    ObjectLockConfigurationNotFoundError,
    ObjectLockEnabled,
    ObjectLockLegalHold,
    ObjectLockMode,
    ObjectLockRetention,
    ObjectLockToken,
    ObjectOwnership,
    ObjectVersion,
    ObjectVersionId,
    ObjectVersionStorageClass,
    OptionalObjectAttributesList,
    Owner,
    OwnershipControls,
    OwnershipControlsNotFoundError,
    Part,
    PartNumber,
    PartNumberMarker,
    Policy,
    PostResponse,
    PreconditionFailed,
    Prefix,
    PublicAccessBlockConfiguration,
    PutBucketAclRequest,
    PutObjectAclOutput,
    PutObjectAclRequest,
    PutObjectLegalHoldOutput,
    PutObjectLockConfigurationOutput,
    PutObjectOutput,
    PutObjectRequest,
    PutObjectRetentionOutput,
    PutObjectTaggingOutput,
    ReplicationConfiguration,
    ReplicationConfigurationNotFoundError,
    RequestPayer,
    RequestPaymentConfiguration,
    RestoreObjectOutput,
    RestoreRequest,
    S3Api,
    ServerSideEncryption,
    ServerSideEncryptionConfiguration,
    SkipValidation,
    SSECustomerAlgorithm,
    SSECustomerKey,
    SSECustomerKeyMD5,
    StartAfter,
    StorageClass,
    Tagging,
    Token,
    UploadIdMarker,
    UploadPartCopyOutput,
    UploadPartCopyRequest,
    UploadPartOutput,
    UploadPartRequest,
    VersionIdMarker,
    VersioningConfiguration,
    WebsiteConfiguration,
)
from localstack.aws.handlers import (
    modify_service_response,
    preprocess_request,
    serve_custom_service_request_handlers,
)
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.edge import ROUTER
from localstack.services.plugins import ServiceLifecycleHook
from localstack.services.s3.codec import AwsChunkedDecoder
from localstack.services.s3.constants import (
    ALLOWED_HEADER_OVERRIDES,
    ARCHIVES_STORAGE_CLASSES,
    DEFAULT_BUCKET_ENCRYPTION,
)
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
from localstack.services.s3.exceptions import (
    InvalidBucketState,
    InvalidRequest,
    MalformedPolicy,
    MalformedXML,
    NoSuchConfiguration,
    NoSuchObjectLockConfiguration,
    UnexpectedContent,
)
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
from localstack.services.s3.presigned_url import validate_post_policy
from localstack.services.s3.utils import (
    ObjectRange,
    add_expiration_days_to_datetime,
    create_redirect_for_post_request,
    create_s3_kms_managed_key_for_region,
    etag_to_base_64_content_md5,
    extract_bucket_key_version_id_from_copy_source,
    get_canned_acl,
    get_class_attrs_from_spec_class,
    get_failed_precondition_copy_source,
    get_full_default_bucket_location,
    get_kms_key_arn,
    get_lifecycle_rule_from_object,
    get_owner_for_account_id,
    get_permission_from_header,
    get_retention_from_now,
    get_system_metadata_from_request,
    get_unique_key_id,
    is_bucket_name_valid,
    parse_copy_source_range_header,
    parse_post_object_tagging_xml,
    parse_range_header,
    parse_tagging_header,
    s3_response_handler,
    serialize_expiration_header,
    str_to_rfc_1123_datetime,
    validate_dict_fields,
    validate_failed_precondition,
    validate_kms_key_id,
    validate_tag_set,
)
from localstack.services.s3.v3.models import (
    BucketCorsIndex,
    EncryptionParameters,
    ObjectLockParameters,
    S3Bucket,
    S3DeleteMarker,
    S3Multipart,
    S3Object,
    S3Part,
    S3Store,
    VersionedKeyStore,
    s3_stores,
)
from localstack.services.s3.v3.storage.core import LimitedIterableStream, S3ObjectStore
from localstack.services.s3.v3.storage.ephemeral import EphemeralS3ObjectStore
from localstack.services.s3.validation import (
    parse_grants_in_headers,
    validate_acl_acp,
    validate_bucket_analytics_configuration,
    validate_bucket_intelligent_tiering_configuration,
    validate_canned_acl,
    validate_cors_configuration,
    validate_inventory_configuration,
    validate_lifecycle_configuration,
    validate_object_key,
    validate_website_configuration,
)
from localstack.services.s3.website_hosting import register_website_hosting_routes
from localstack.state import AssetDirectory, StateVisitor
from localstack.utils.aws.arns import s3_bucket_name
from localstack.utils.strings import short_uid, to_bytes, to_str

LOG = logging.getLogger(__name__)

STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)

DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"


class S3Provider(S3Api, ServiceLifecycleHook):
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        super().__init__()
        self._storage_backend = storage_backend or EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._notification_dispatcher = NotificationDispatcher()
        self._cors_handler = S3CorsHandler(BucketCorsIndex())

        # runtime cache of Lifecycle Expiration headers, as they need to be recalculated every time we fetch an
        # object, in case the rules have changed
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)

    def on_after_init(self):
        preprocess_request.append(self._cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)

    def accept_state_visitor(self, visitor: StateVisitor):
        visitor.visit(s3_stores)
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))

    def on_before_state_save(self):
        self._storage_backend.flush()

    def on_after_state_reset(self):
        self._cors_handler.invalidate_cache()

    def on_after_state_load(self):
        self._cors_handler.invalidate_cache()

    def on_before_stop(self):
        self._notification_dispatcher.shutdown()
        self._storage_backend.close()

    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """
        :param context: the RequestContext, to retrieve more information about the incoming notification
        :param s3_bucket: the S3Bucket object
        :param s3_object: the S3Object or S3DeleteMarker, required if no S3EventNotificationContext is given
        :param s3_notif_ctx: an S3EventNotificationContext, in case we need specific data only available in the API call
        """
        if s3_bucket.notification_configuration:
            if not s3_notif_ctx:
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=s3_object,
                )

            self._notification_dispatcher.send_notifications(
                s3_notif_ctx, s3_bucket.notification_configuration
            )

    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        self._notification_dispatcher.verify_configuration(
            notification_configuration, skip_destination_validation, context, bucket_name
        )

    def _get_expiration_header(
        self,
        lifecycle_rules: LifecycleRules,
        bucket: BucketName,
        s3_object: S3Object,
        object_tags: dict[str, str],
    ) -> Expiration:
        """
        This method will check if the key matches a Lifecycle filter, and return the serializer header if that's
        the case. We're caching it because it can change depending on the set rules on the bucket.
        We can't use `lru_cache` as the parameters needs to be hashable
        :param lifecycle_rules: the bucket LifecycleRules
        :param s3_object: S3Object
        :param object_tags: the object tags
        :return: the Expiration header if there's a rule matching
        """
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
            return cached_exp

        if lifecycle_rule := get_lifecycle_rule_from_object(
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
        ):
            expiration_header = serialize_expiration_header(
                lifecycle_rule["ID"],
                lifecycle_rule["Expiration"],
                s3_object.last_modified,
            )
            self._expiration_cache[bucket][s3_object.key] = expiration_header
            return expiration_header

    def _get_cross_account_bucket(
        self, context: RequestContext, bucket_name: BucketName
    ) -> tuple[S3Store, S3Bucket]:
        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket_name)):
            if not (account_id := store.global_bucket_map.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

        return store, s3_bucket

    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        # Use default account id for external access? would need an anonymous one
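        # s3_stores is keyed by account ID and then region, e.g. get_store("000000000000", "us-east-1")
        # returns the S3Store holding the buckets created by the default LocalStack account in us-east-1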
        return s3_stores[account_id][region_name]

    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        bucket_name = request["Bucket"]

        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        # the XML parser returns an empty dict if the body contains the following:
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
        # but it also returns an empty dict if the body is fully empty. We need to differentiate the 2 cases by checking
        # if the body is empty or not
        if context.request.data and (
            (create_bucket_configuration := request.get("CreateBucketConfiguration")) is not None
        ):
            if not (bucket_region := create_bucket_configuration.get("LocationConstraint")):
                raise MalformedXML()

            if bucket_region == "us-east-1":
                raise InvalidLocationConstraint(
                    "The specified location-constraint is not valid",
                    LocationConstraint=bucket_region,
                )
        else:
            bucket_region = "us-east-1"
            if context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
                )

        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            existing_bucket_owner = store.global_bucket_map[bucket_name]
            if existing_bucket_owner != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region
            if bucket_region != "us-east-1":
                raise BucketAlreadyOwnedByYou(
                    "Your previous request to create the named bucket succeeded and you already own it.",
                    BucketName=bucket_name,
                )
            else:
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(Location=f"/{bucket_name}")

        if (
            object_ownership := request.get("ObjectOwnership")
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=request.get("ObjectOwnership"),
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in the response -> full URL when the LocationConstraint is outside us-east-1
        location = (
            f"/{bucket_name}"
            if bucket_region == "us-east-1"
            else get_full_default_bucket_location(bucket_name)
        )
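        # e.g. "/my-bucket" for us-east-1; otherwise a full URL pointing at the bucket's default endpoint
        # (the exact shape is an assumption here, it depends on get_full_default_bucket_location and the configured host)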
        response = CreateBucketOutput(Location=location)
        return response

    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # the bucket still contains objects
        if not s3_bucket.objects.is_empty():
            message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                message,
                BucketName=bucket,
            )

        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        self._cors_handler.invalidate_cache()
        self._expiration_cache.pop(bucket, None)
        # clean up the storage backend
        self._storage_backend.delete_bucket(bucket)

    def list_buckets(self, context: RequestContext, **kwargs) -> ListBucketsOutput:
        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)
        buckets = [
            Bucket(Name=bucket.name, CreationDate=bucket.creation_date)
            for bucket in store.buckets.values()
        ]
        return ListBucketsOutput(Owner=owner, Buckets=buckets)

    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket)):
            if not (account_id := store.global_bucket_map.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)

    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """
        When implementing the ASF provider, this operation is implemented because:
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
        - We circumvent the root level element here by patching the spec such that this operation returns a
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
          the payload, which is why we need to manually do this here by manipulating the string.
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        location_constraint = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{location}}</LocationConstraint>'
        )

        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
        location_constraint = location_constraint.replace("{{location}}", location)
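        # for illustration, a bucket in eu-west-1 produces a raw payload of
        #   <LocationConstraint xmlns="...">eu-west-1</LocationConstraint>
        # while us-east-1 leaves the element empty, matching AWS behaviour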

        response = GetBucketLocationOutput(LocationConstraint=location_constraint)
        return response

    @handler("PutObject", expand=False)
    def put_object(
        self,
        context: RequestContext,
        request: PutObjectRequest,
    ) -> PutObjectOutput:
        # TODO: validate order of validation
        # TODO: still need to handle following parameters
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        key = request["Key"]

        validate_object_key(key)

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        version_id = generate_version_id(s3_bucket.versioning_status)

        checksum_algorithm = request.get("ChecksumAlgorithm")
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )
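        # e.g. with ChecksumAlgorithm=SHA256, the client sends the value in the ChecksumSHA256 member
        # (the x-amz-checksum-sha256 header), which is the request field looked up above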

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )

        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)
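            # the Tagging member comes from the x-amz-tagging header and is URL-query encoded,
            # e.g. "key1=value1&key2=value2"; parse_tagging_header turns it into a plain dict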

        s3_object = S3Object(
            key=key,
            version_id=version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            acl=acl,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on the bucket settings
        )

        body = request.get("Body")
        # check if chunked request
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        if is_aws_chunked:
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)

        with self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(body)

            if checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                self._storage_backend.remove(bucket_name, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
            #  streaming body. We can use the specs to verify which operations need to have the checksum validated
            if content_md5 := request.get("ContentMD5"):
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
                if calculated_md5 != content_md5:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise InvalidDigest(
                        "The Content-MD5 you specified was invalid.",
                        Content_MD5=content_md5,
                    )

            s3_bucket.objects.set(key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket_name, key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        # RequestCharged: Optional[RequestCharged]  # TODO
        response = PutObjectOutput(
            ETag=s3_object.quoted_etag,
        )
        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )
        if s3_object.expires and s3_object.expires < datetime.datetime.now(
            tz=s3_object.expires.tzinfo
        ):
            # TODO: the old behaviour was to delete the key instantly once expired. AWS generally cleans up only once
            #  a day; you can still HeadObject on it and get the expiry time until the scheduled deletion
            kwargs = {"Key": object_key}
            if version_id:
                kwargs["VersionId"] = version_id
            raise NoSuchKey("The specified key does not exist.", **kwargs)

        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        # TODO: remove this with 3.3, this is here for persistence reasons
        if not hasattr(s3_object, "internal_last_modified"):
            s3_object.internal_last_modified = s3_stored_object.last_modified
        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
        # the object again
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
            s3_object = s3_bucket.get_object(
                key=object_key,
                version_id=version_id,
                http_method="GET",
            )

        response = GetObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)
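        # for illustration, a header like "Range: bytes=0-99" yields an ObjectRange covering the first
        # 100 bytes, and the object is then served below as a 206 Partial Content response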

        if range_data:
            s3_stored_object.seek(range_data.begin)
            response["Body"] = LimitedIterableStream(
                s3_stored_object, max_length=range_data.content_length
            )
            response["ContentRange"] = range_data.content_range
            response["ContentLength"] = range_data.content_length
            response["StatusCode"] = 206
        else:
            response["Body"] = s3_stored_object

        add_encryption_to_response(response, s3_object=s3_object)

        if object_tags := store.TAGS.tags.get(
            get_unique_key_id(bucket_name, object_key, version_id)
        ):
            response["TagCount"] = len(object_tags)

        if s3_object.is_current and s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        # TODO: missing returned fields
        #     RequestCharged: Optional[RequestCharged]
        #     ReplicationStatus: Optional[ReplicationStatus]

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
            if request_param_value := request.get(request_param):
                response[response_param] = request_param_value

        return response

    @handler("HeadObject", expand=False)
    def head_object(
        self,
        context: RequestContext,
        request: HeadObjectRequest,
    ) -> HeadObjectOutput:
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="HEAD",
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        response = HeadObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value

        if s3_object.parts:
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        if range_data:
            response["ContentLength"] = range_data.content_length
            response["StatusCode"] = 206

        add_encryption_to_response(response, s3_object=s3_object)

        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
        if not version_id and s3_bucket.lifecycle_rules:
            object_tags = store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            )
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        # TODO: missing return fields:
        #  ArchiveStatus: Optional[ArchiveStatus]
        #  RequestCharged: Optional[RequestCharged]
        #  ReplicationStatus: Optional[ReplicationStatus]

        return response

    def delete_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        mfa: MFA = None,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> DeleteObjectOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        if s3_bucket.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            found_object = s3_bucket.objects.pop(key, None)
            # TODO: RequestCharged
            if found_object:
                self._storage_backend.remove(bucket, found_object)
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)

            return DeleteObjectOutput()

        if not version_id:
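            # on a versioned (or suspended) bucket, a DELETE without a versionId does not remove any data:
            # it only inserts a delete marker, which becomes the current version of the key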
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
            s3_bucket.objects.set(key, delete_marker)
            # TODO: properly distinguish between DeleteMarker and S3Object, not done yet
            #  should emit s3:ObjectRemoved:DeleteMarkerCreated
            self._notify(context, s3_bucket=s3_bucket, s3_object=delete_marker)

            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)

        if key not in s3_bucket.objects:
            return DeleteObjectOutput()

        if not (s3_object := s3_bucket.objects.get(key, version_id)):
            raise InvalidArgument(
                "Invalid version id specified",
                ArgumentName="versionId",
                ArgumentValue=version_id,
            )

        if s3_object.is_locked(bypass_governance_retention):
            raise AccessDenied("Access Denied")

        s3_bucket.objects.pop(object_key=key, version_id=version_id)
        response = DeleteObjectOutput(VersionId=s3_object.version_id)

        if isinstance(s3_object, S3DeleteMarker):
            response["DeleteMarker"] = True
        else:
            self._storage_backend.remove(bucket, s3_object)
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)

        return response

    def delete_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delete: Delete,
        mfa: MFA = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> DeleteObjectsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        objects: list[ObjectIdentifier] = delete.get("Objects")
        if not objects:
            raise MalformedXML()

        # TODO: does AWS cap DeleteObjects at 1000 keys per request? test against AWS

        quiet = delete.get("Quiet", False)
        deleted = []
        errors = []

        to_remove = []
        for to_delete_object in objects:
            object_key = to_delete_object.get("Key")
            version_id = to_delete_object.get("VersionId")
            if s3_bucket.versioning_status is None:
                if version_id and version_id != "null":
                    errors.append(
                        Error(
                            Code="NoSuchVersion",
                            Key=object_key,
                            Message="The specified version does not exist.",
                            VersionId=version_id,
                        )
                    )
                    continue

                found_object = s3_bucket.objects.pop(object_key, None)
                if found_object:
                    to_remove.append(found_object)
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
                # small hack to avoid creating a fake object for nothing
                elif s3_bucket.notification_configuration:
                    # DeleteObjects is a bit peculiar: even if the object didn't exist, S3 will trigger a
                    # notification for the non-existing object being deleted
                    self._notify(
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
                    )

                if not quiet:
                    deleted.append(DeletedObject(Key=object_key))

                continue

            if not version_id:
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
                s3_bucket.objects.set(object_key, delete_marker)
                # TODO: distinguish between DeleteMarker and S3Object
                self._notify(context, s3_bucket=s3_bucket, s3_object=delete_marker)
                if not quiet:
                    deleted.append(
                        DeletedObject(
                            DeleteMarker=True,
                            DeleteMarkerVersionId=delete_marker_id,
                            Key=object_key,
                        )
                    )
                continue

            if not (
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
            ):
                errors.append(
                    Error(
                        Code="NoSuchVersion",
                        Key=object_key,
                        Message="The specified version does not exist.",
                        VersionId=version_id,
                    )
                )
                continue

            if found_object.is_locked(bypass_governance_retention):
                errors.append(
                    Error(
                        Code="AccessDenied",
                        Key=object_key,
                        Message="Access Denied",
                        VersionId=version_id,
                    )
                )
                continue

            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
            if not quiet:
                deleted_object = DeletedObject(
                    Key=object_key,
                    VersionId=version_id,
                )
                if isinstance(found_object, S3DeleteMarker):
                    deleted_object["DeleteMarker"] = True
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id

                deleted.append(deleted_object)

            if isinstance(found_object, S3Object):
                to_remove.append(found_object)

            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)

        # TODO: request charged
        self._storage_backend.remove(bucket, to_remove)
        response: DeleteObjectsOutput = {}
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
        if errors:
            response["Errors"] = errors
        if not quiet:
            response["Deleted"] = deleted

        return response

    @handler("CopyObject", expand=False)
    def copy_object(
        self,
        context: RequestContext,
        request: CopyObjectRequest,
    ) -> CopyObjectOutput:
        # request_payer: RequestPayer = None,  # TODO:
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        validate_object_key(dest_key)
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        if failed_condition := get_failed_precondition_copy_source(
            request, src_s3_object.last_modified, src_s3_object.etag
        ):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_condition,
            )

        # TODO validate order of validation
        storage_class = request.get("StorageClass")
        server_side_encryption = request.get("ServerSideEncryption")
        metadata_directive = request.get("MetadataDirective")
        website_redirect_location = request.get("WebsiteRedirectLocation")
        # we need an identity check (`is`) on the bucket encryption rule, to detect whether the default rule has been changed
        is_default_encryption = (
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
            and src_s3_object.encryption == "AES256"
        )
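        # e.g. copying a key onto itself is only allowed when at least one attribute changes (storage class,
        # SSE settings, a REPLACE metadata directive, website redirect location, ...); otherwise S3 rejects it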
        if (
            src_bucket == dest_bucket
            and src_key == dest_key
            and not any(
                (
                    storage_class,
                    server_side_encryption,
                    metadata_directive == "REPLACE",
                    website_redirect_location,
                    dest_s3_bucket.encryption_rule
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
                    src_s3_object.restore,
                )
            )
        ):
            raise InvalidRequest(
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
                "object's metadata, storage class, website redirect location or encryption attributes."
            )

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        if metadata_directive == "REPLACE":
            user_metadata = request.get("Metadata")
            system_metadata = get_system_metadata_from_request(request)
            if not system_metadata.get("ContentType"):
                system_metadata["ContentType"] = "binary/octet-stream"
        else:
            user_metadata = src_s3_object.user_metadata
            system_metadata = src_s3_object.system_metadata

        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            dest_s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
            request, dest_s3_bucket
        )

        acl = get_access_control_policy_for_new_resource_request(
            request, owner=dest_s3_bucket.owner
        )

        s3_object = S3Object(
            key=dest_key,
            size=src_s3_object.size,
            version_id=dest_version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=request.get("ChecksumAlgorithm") or src_s3_object.checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=request.get(
                "BucketKeyEnabled"
            ),  # CopyObject does not inherit from the bucket here
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=website_redirect_location,
            expiration=None,  # TODO, from lifecycle
            acl=acl,
            owner=dest_s3_bucket.owner,
        )

        with self._storage_backend.copy(
            src_bucket=src_bucket,
            src_object=src_s3_object,
            dest_bucket=dest_bucket,
            dest_object=s3_object,
        ) as s3_stored_object:
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag

            dest_s3_bucket.objects.set(dest_key, s3_object)

        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)
        if (request.get("TaggingDirective")) == "REPLACE":
            store.TAGS.tags[dest_key_id] = tagging
        else:
            src_key_id = get_unique_key_id(src_bucket, src_key, src_version_id)
            store.TAGS.tags[dest_key_id] = copy.copy(store.TAGS.tags.get(src_key_id, {}))

        copy_object_result = CopyObjectResult(
            ETag=s3_object.quoted_etag,
            LastModified=s3_object.last_modified,
        )
        if s3_object.checksum_algorithm:
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
                s3_object.checksum_value
            )

        response = CopyObjectOutput(
            CopyObjectResult=copy_object_result,
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)

        if src_s3_bucket.versioning_status and src_s3_object.version_id:
            response["CopySourceVersionId"] = src_s3_object.version_id

        # RequestCharged: Optional[RequestCharged] # TODO
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)

        return response

    def list_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        marker: Marker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        s3_objects: list[Object] = []

        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
        last_key = all_keys[-1] if all_keys else None

        # iterate over the keys in sorted (lexicographical) order
        for s3_object in all_keys:
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
            # skip all keys that alphabetically come before the marker
            if marker:
                if key <= marker:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see list_objects_v2 for detailed comments on this shared logic
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    marker and marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify against AWS
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]

                s3_objects.append(object_data)

            # we just added a CommonPrefix or an Object, increase the counter
            count += 1
            if count >= max_keys and last_key.key != s3_object.key:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                elif s3_objects:
                    next_key_marker = s3_objects[-1]["Key"]
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            Marker=marker or "",
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if delimiter and next_key_marker:
            response["NextMarker"] = next_key_marker
        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

    def list_objects_v2(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        continuation_token: Token = None,
        fetch_owner: FetchOwner = None,
        start_after: StartAfter = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsV2Output:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if continuation_token == "":
            raise InvalidArgument(
                "The continuation token provided is incorrect",
                ArgumentName="continuation-token",
            )

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_continuation_token = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
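            # e.g. with EncodingType=url, a prefix like "my photos/" is matched and returned as "my%20photos/"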
        decoded_continuation_token = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )
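        # the continuation token is opaque to clients; here it is simply the base64url-encoded key from which the
        # next page resumes (see NextContinuationToken below), e.g. "Zm9vL2Jhci50eHQ=" -> "foo/bar.txt"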

        s3_objects: list[Object] = []

        # sort by key
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key

            # skip all keys that alphabetically come before continuation_token
            if continuation_token:
                if key < decoded_continuation_token:
                    continue

            elif start_after:
                if key <= start_after:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
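                # e.g. with prefix="photos/" and delimiter="/", the key "photos/2023/january/pic.jpg" is rolled up
                # into the CommonPrefix "photos/2023/"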

                # if the CommonPrefix is already in CommonPrefixes, it does not count towards MaxKeys and we can skip
                # the entry without increasing the counter. We still need to iterate over all of these entries before
                # returning the next continuation token, to properly start at the next key after this CommonPrefix
                if prefix_including_delimiter in common_prefixes:
                    continue

            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
            if count >= max_keys:
                is_truncated = True
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
                break

            # if we found a new CommonPrefix, add it to the CommonPrefixes
            # else, it means it's a new Object, add it to the Contents
            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if fetch_owner:
                    object_data["Owner"] = s3_bucket.owner

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]

                s3_objects.append(object_data)

            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
            count += 1

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectsV2Output(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            KeyCount=count,
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_continuation_token:
            response["NextContinuationToken"] = next_continuation_token

        if continuation_token:
            response["ContinuationToken"] = continuation_token
        elif start_after:
            response["StartAfter"] = start_after

        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

    def list_object_versions(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        version_id_marker: VersionIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectVersionsOutput:
        if version_id_marker and not key_marker:
            raise InvalidArgument(
                "A version-id marker cannot be specified without a key marker.",
                ArgumentName="version-id-marker",
                ArgumentValue=version_id_marker,
            )

        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        next_version_id_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        version_key_marker_found = False

        object_versions: list[ObjectVersion] = []
        delete_markers: list[DeleteMarkerEntry] = []

        all_versions = s3_bucket.objects.values(with_versions=True)
        # sort by key and last-modified date, to get the latest version of each key first
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
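        # negating the timestamp sorts versions of the same key newest-first while keys remain in ascending order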
        last_version = all_versions[-1] if all_versions else None

        for version in all_versions:
            key = urlparse.quote(version.key) if encoding_type else version.key
            # skip all keys that alphabetically come before key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not version_id_marker:
                        continue
                    # as versions of the same key are ordered by time, once we have found the version_id_marker,
                    # we can return the following ones
                    if version.version_id == version_id_marker:
                        version_key_marker_found = True
                        continue
                    elif not version_key_marker_found:
                        # as long as we have not passed the version_id_marker, skip the versions
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)

            elif isinstance(version, S3DeleteMarker):
                delete_marker = DeleteMarkerEntry(
                    Key=key,
                    Owner=s3_bucket.owner,
                    VersionId=version.version_id,
                    IsLatest=version.is_current,
                    LastModified=version.last_modified,
                )
                delete_markers.append(delete_marker)
            else:
                # TODO: add RestoreStatus if present
                object_version = ObjectVersion(
                    Key=key,
                    ETag=version.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify against AWS
                    Size=version.size,
                    VersionId=version.version_id or "null",
                    LastModified=version.last_modified,
                    IsLatest=version.is_current,
                    # TODO: verify this, are other classes possible?
                    # StorageClass=version.storage_class,
                    StorageClass=ObjectVersionStorageClass.STANDARD,
                )

                if version.checksum_algorithm:
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]

                object_versions.append(object_version)

            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
            count += 1
            if count >= max_keys and last_version.version_id != version.version_id:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                else:
                    next_key_marker = version.key
                    next_version_id_marker = version.version_id
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectVersionsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix,
            KeyMarker=key_marker or "",
            VersionIdMarker=version_id_marker or "",
        )
        if object_versions:
            response["Versions"] = object_versions
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delete_markers:
            response["DeleteMarkers"] = delete_markers
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_key_marker:
            response["NextKeyMarker"] = next_key_marker
        if next_version_id_marker:
            response["NextVersionIdMarker"] = next_version_id_marker

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

    @handler("GetObjectAttributes", expand=False)
    def get_object_attributes(
        self,
        context: RequestContext,
        request: GetObjectAttributesRequest,
    ) -> GetObjectAttributesOutput:
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=request.get("VersionId"),
            http_method="GET",
        )

        object_attrs = request.get("ObjectAttributes", [])
        response = GetObjectAttributesOutput()
        if "ETag" in object_attrs:
            response["ETag"] = s3_object.etag
        if "StorageClass" in object_attrs:
            response["StorageClass"] = s3_object.storage_class
        if "ObjectSize" in object_attrs:
            response["ObjectSize"] = s3_object.size
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
            response["Checksum"] = {
                f"Checksum{checksum_algorithm.upper()}": s3_object.checksum_value
            }

        response["LastModified"] = s3_object.last_modified

        if s3_bucket.versioning_status:
            response["VersionId"] = s3_object.version_id

        if s3_object.parts:
            # TODO: implement ObjectParts; this is basically a simplified `ListParts` call on the object, and we might
            #  need to store more data about the Parts once we implement checksums for them
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=len(s3_object.parts))

        return response

    def restore_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        restore_request: RestoreRequest = None,
        request_payer: RequestPayer = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> RestoreObjectOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",  # TODO: verify http method
        )
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
            raise InvalidObjectState(StorageClass=s3_object.storage_class)

        # TODO: moto only supported the "Days" parameter from RestoreRequest and ignored the others;
        #  we only implement the same functionality for now

        # if a restore request was already made and the object is available, this request updates it: set the status code to 200
        status_code = 200 if s3_object.restore else 202
        restore_days = restore_request.get("Days")
        if not restore_days:
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
            return RestoreObjectOutput()

        restore_expiration_date = add_expiration_days_to_datetime(
            datetime.datetime.utcnow(), restore_days
        )
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
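        # the value stored here is what later shows up as the object's `Restore` field / x-amz-restore header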
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'

        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context_native(
            context,
            s3_bucket=s3_bucket,
            s3_object=s3_object,
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
        # We just need to copy the context so that we don't mutate the first context while it could be sent
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
            "Post", "Completed"
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)

        # TODO: request charged
        return RestoreObjectOutput(StatusCode=status_code)

    @handler("CreateMultipartUpload", expand=False)
    def create_multipart_upload(
        self,
        context: RequestContext,
        request: CreateMultipartUploadRequest,
    ) -> CreateMultipartUploadOutput:
        # TODO: handle missing parameters:
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging_header=tagging)

        key = request["Key"]

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        # TODO: validate the algorithm?
        checksum_algorithm = request.get("ChecksumAlgorithm")

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        # validate encryption values
        s3_multipart = S3Multipart(
            key=key,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            expiration=None,  # TODO: from lifecycle, or should it be updated with the config?
            acl=acl,
            initiator=get_owner_for_account_id(context.account_id),
            tagging=tagging,
            owner=s3_bucket.owner,
        )

        s3_bucket.multiparts[s3_multipart.id] = s3_multipart

        response = CreateMultipartUploadOutput(
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
        )

        if checksum_algorithm:
            response["ChecksumAlgorithm"] = checksum_algorithm

        add_encryption_to_response(response, s3_object=s3_multipart.object)

        # TODO: missing response fields we're not currently supporting
        # - AbortDate: lifecycle related, not currently supported, todo
        # - AbortRuleId: lifecycle related, not currently supported, todo
        # - RequestCharged: todo

        return response

    @handler("UploadPart", expand=False)
    def upload_part(
        self,
        context: RequestContext,
        request: UploadPartRequest,
    ) -> UploadPartOutput:
        # TODO: missing following parameters:
        #  content_length: ContentLength = None -> validate?
        #  content_md5: ContentMD5 = None -> validate?
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        upload_id = request.get("UploadId")
        if not (
            s3_multipart := s3_bucket.multiparts.get(upload_id)
        ) or s3_multipart.object.key != request.get("Key"):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        checksum_algorithm = request.get("ChecksumAlgorithm")
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        s3_part = S3Part(
            part_number=part_number,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
        )
        body = request.get("Body")
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        # check if chunked request
        if is_aws_chunked:
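            # aws-chunked bodies are split into length-prefixed chunks (optionally signed, optionally followed by
            # trailing headers such as checksums); x-amz-decoded-content-length is the size of the raw payload, and
            # the decoder strips the chunk framing (attaching trailing checksum data, if any, to s3_part) so that
            # only the actual data is written to storage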
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)

        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
            try:
                stored_s3_part.write(body)
            except Exception:
                stored_multipart.remove_part(s3_part)
                raise

            if checksum_algorithm and s3_part.checksum_value != stored_s3_part.checksum:
                stored_multipart.remove_part(s3_part)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_multipart.parts[part_number] = s3_part

        response = UploadPartOutput(
            ETag=s3_part.quoted_etag,
        )

        add_encryption_to_response(response, s3_object=s3_multipart.object)

        if s3_part.checksum_algorithm:
            response[f"Checksum{checksum_algorithm.upper()}"] = s3_part.checksum_value

        # TODO: RequestCharged: Optional[RequestCharged]
        return response

    @handler("UploadPartCopy", expand=False)
    def upload_part_copy(
        self,
        context: RequestContext,
        request: UploadPartCopyRequest,
    ) -> UploadPartCopyOutput:
        # TODO: handle following parameters:
        #  copy_source_if_match: CopySourceIfMatch = None,
        #  copy_source_if_modified_since: CopySourceIfModifiedSince = None,
        #  copy_source_if_none_match: CopySourceIfNoneMatch = None,
        #  copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,
        #  request_payer: RequestPayer = None,
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        store = self.get_store(context.account_id, context.region)
        # TODO: validate cross-account UploadPartCopy
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )

        if not (src_s3_bucket := store.buckets.get(src_bucket)):
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)

        # if the object is a delete marker, get_object will raise NotFound when no versionId is given, like AWS does
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        upload_id = request.get("UploadId")
        if (
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != dest_key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        source_range = request.get("CopySourceRange")
        # TODO: implement the CopySourceIf* conditions (done in the ASF provider)

        range_data: Optional[ObjectRange] = None
        if source_range:
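            # CopySourceRange is expected in the form "bytes=first-last" (zero-indexed, inclusive) and is
            # validated against the source object's size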
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)

        s3_part = S3Part(part_number=part_number)

        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)

        s3_multipart.parts[part_number] = s3_part

        # TODO: return these fields (checksums are not currently handled in moto for parts)
        # ChecksumCRC32: Optional[ChecksumCRC32]
        # ChecksumCRC32C: Optional[ChecksumCRC32C]
        # ChecksumSHA1: Optional[ChecksumSHA1]
        # ChecksumSHA256: Optional[ChecksumSHA256]
        #     RequestCharged: Optional[RequestCharged]

        result = CopyPartResult(
            ETag=s3_part.quoted_etag,
            LastModified=s3_part.last_modified,
        )

        response = UploadPartCopyOutput(
            CopyPartResult=result,
        )

        if src_s3_bucket.versioning_status and src_s3_object.version_id:
            response["CopySourceVersionId"] = src_s3_object.version_id

        add_encryption_to_response(response, s3_object=s3_multipart.object)

        return response

    def complete_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        multipart_upload: CompletedMultipartUpload = None,
        checksum_crc32: ChecksumCRC32 = None,
        checksum_crc32_c: ChecksumCRC32C = None,
        checksum_sha1: ChecksumSHA1 = None,
        checksum_sha256: ChecksumSHA256 = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> CompleteMultipartUploadOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if (
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        parts = multipart_upload.get("Parts", [])
        if not parts:
            raise InvalidRequest("You must specify at least one part")

        parts_numbers = [part.get("PartNumber") for part in parts]
        # sorted() is at its fastest when the list is already sorted, which should be the case here
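        # e.g. part numbers [1, 2, 3] are accepted, while [1, 3, 2] raises InvalidPartOrder below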
        if sorted(parts_numbers) != parts_numbers:
            raise InvalidPartOrder(
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
                UploadId=upload_id,
            )

        # generate the versionId before completing, in case the bucket versioning status has changed between
        # creation and completion (this behavior is validated against AWS)
        version_id = generate_version_id(s3_bucket.versioning_status)
        s3_multipart.object.version_id = version_id
        s3_multipart.complete_multipart(parts)

        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
        stored_multipart.complete_multipart(
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
        )

        s3_object = s3_multipart.object

        s3_bucket.objects.set(key, s3_object)

        # remove the multipart now that it's complete
        self._storage_backend.remove_multipart(bucket, s3_multipart)
        s3_bucket.multiparts.pop(s3_multipart.id, None)

        key_id = get_unique_key_id(bucket, key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if s3_multipart.tagging:
            store.TAGS.tags[key_id] = s3_multipart.tagging

        # TODO: validate what happens if the provided checksum does not match the given algorithm; should we
        #  calculate it anyway when completing? Not sure how that works yet.

        #     ChecksumCRC32: Optional[ChecksumCRC32] ??
        #     ChecksumCRC32C: Optional[ChecksumCRC32C] ??
        #     ChecksumSHA1: Optional[ChecksumSHA1] ??
        #     ChecksumSHA256: Optional[ChecksumSHA256] ??
        #     RequestCharged: Optional[RequestCharged] TODO

        response = CompleteMultipartUploadOutput(
            Bucket=bucket,
            Key=key,
            ETag=s3_object.quoted_etag,
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        # TODO: check this?
        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    def abort_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> AbortMultipartUploadOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if (
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )
        s3_bucket.multiparts.pop(upload_id, None)

        self._storage_backend.remove_multipart(bucket, s3_multipart)
        response = AbortMultipartUploadOutput()
        # TODO: requestCharged
        return response

    def list_parts(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        max_parts: MaxParts = None,
        part_number_marker: PartNumberMarker = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> ListPartsOutput:
        # TODO: implement MaxParts
        # TODO: implement PartNumberMarker
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if (
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        #     AbortDate: Optional[AbortDate] TODO: lifecycle
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
        #     RequestCharged: Optional[RequestCharged]

        count = 0
        is_truncated = False
        part_number_marker = part_number_marker or 0
        max_parts = max_parts or 1000

        parts = []
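        # sorting the dict items orders parts by part number (the dict key), which is the order ListParts returns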
        all_parts = sorted(s3_multipart.parts.items())
        last_part_number = all_parts[-1][0] if all_parts else None
        for part_number, part in all_parts:
            if part_number <= part_number_marker:
                continue
            part_item = Part(
                ETag=part.quoted_etag,
                LastModified=part.last_modified,
                PartNumber=part_number,
                Size=part.size,
            )
            parts.append(part_item)
            count += 1

            if count >= max_parts and part.part_number != last_part_number:
                is_truncated = True
                break

        response = ListPartsOutput(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            Initiator=s3_multipart.initiator,
            Owner=s3_multipart.initiator,
            StorageClass=s3_multipart.object.storage_class,
            IsTruncated=is_truncated,
            MaxParts=max_parts,
            PartNumberMarker=0,
            NextPartNumberMarker=0,
        )
        if parts:
            response["Parts"] = parts
            last_part = parts[-1]["PartNumber"]
            response["NextPartNumberMarker"] = last_part

        if part_number_marker:
            response["PartNumberMarker"] = part_number_marker
        if s3_multipart.object.checksum_algorithm:
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm

        return response

    def list_multipart_uploads(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_uploads: MaxUploads = None,
        prefix: Prefix = None,
        upload_id_marker: UploadIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> ListMultipartUploadsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        common_prefixes = set()
        count = 0
        is_truncated = False
        max_uploads = max_uploads or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        upload_id_marker_found = False

        if key_marker and upload_id_marker:
            multipart = s3_bucket.multiparts.get(upload_id_marker)
            if multipart:
                key = (
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
                )
            else:
                # set key to None so the check below fails if the multipart was not found
                key = None

            if key_marker != key:
                raise InvalidArgument(
                    "Invalid uploadId marker",
                    ArgumentName="upload-id-marker",
                    ArgumentValue=upload_id_marker,
                )

        uploads = []
        # sort by key and initiated
        all_multiparts = sorted(
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
        )
        last_multipart = all_multiparts[-1] if all_multiparts else None

        for multipart in all_multiparts:
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
            # skip all keys that alphabetically come before key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not upload_id_marker:
                        continue
                    # as uploads of the same key are ordered by time, once we have found the upload_id_marker,
                    # we can return the following ones
                    if multipart.id == upload_id_marker:
                        upload_id_marker_found = True
                        continue
                    elif not upload_id_marker_found:
                        # as long as we have not passed the upload_id_marker, skip the uploads
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                multipart_upload = MultipartUpload(
                    UploadId=multipart.id,
                    Key=multipart.object.key,
                    Initiated=multipart.initiated,
                    StorageClass=multipart.object.storage_class,
                    Owner=multipart.initiator,  # TODO: check the difference
                    Initiator=multipart.initiator,
                )
                uploads.append(multipart_upload)

            count += 1
            if count >= max_uploads and last_multipart.id != multipart.id:
                is_truncated = True
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListMultipartUploadsOutput(
            Bucket=bucket,
            IsTruncated=is_truncated,
            MaxUploads=max_uploads or 1000,
            KeyMarker=key_marker or "",
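            # UploadIdMarker is only echoed back in the response when a KeyMarker was also provided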
            UploadIdMarker=(upload_id_marker or "") if key_marker else "",
            NextKeyMarker="",
            NextUploadIdMarker="",
        )
        if uploads:
            response["Uploads"] = uploads
            last_upload = uploads[-1]
            response["NextKeyMarker"] = last_upload["Key"]
            response["NextUploadIdMarker"] = last_upload["UploadId"]
        if delimiter:
            response["Delimiter"] = delimiter
        if prefix:
            response["Prefix"] = prefix
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes

        return response

    def put_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        versioning_configuration: VersioningConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        mfa: MFA = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not (versioning_status := versioning_configuration.get("Status")):
            raise CommonServiceException(
                code="IllegalVersioningConfigurationException",
                message="The Versioning element must be specified",
            )

        if s3_bucket.object_lock_enabled:
            raise InvalidBucketState(
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
            )

        if versioning_status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        if not s3_bucket.versioning_status:
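            # converting the plain key store to a versioned one keeps existing objects, which are then exposed
            # as the "null" version (see ListObjectVersions)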
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)

        s3_bucket.versioning_status = versioning_status

    def get_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketVersioningOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.versioning_status:
            return GetBucketVersioningOutput()

        return GetBucketVersioningOutput(Status=s3_bucket.versioning_status)

    def get_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketEncryptionOutput:
        # AWS now encrypts buckets by default with AES256, see:
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.encryption_rule:
            return GetBucketEncryptionOutput()

        return GetBucketEncryptionOutput(
            ServerSideEncryptionConfiguration={"Rules": [s3_bucket.encryption_rule]}
        )

    def put_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := server_side_encryption_configuration.get("Rules")):
            raise MalformedXML()

        if len(rules) != 1 or not (
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
        ):
            raise MalformedXML()

        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
            raise MalformedXML()

        if sse_algorithm not in SSE_ALGORITHMS:
            raise MalformedXML()

        if sse_algorithm != ServerSideEncryption.aws_kms and "KMSMasterKeyID" in encryption:
            raise InvalidArgument(
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms",
                ArgumentName="ApplyServerSideEncryptionByDefault",
            )
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
        # TODO: validate KMS key? not currently done in moto
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
        # It's always saved as the ARN in the bucket configuration.
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
        # encryption["KMSMasterKeyID"] = master_kms_key

        s3_bucket.encryption_rule = rules[0]

    def delete_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.encryption_rule = None

    def put_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        notification_configuration: NotificationConfiguration,
        expected_bucket_owner: AccountId = None,
        skip_destination_validation: SkipValidation = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        self._verify_notification_configuration(
            notification_configuration, skip_destination_validation, context, bucket
        )
        s3_bucket.notification_configuration = notification_configuration

    def get_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> NotificationConfiguration:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return s3_bucket.notification_configuration or NotificationConfiguration()

    def put_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        tagging: Tagging,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "TagSet" not in tagging:
            raise MalformedXML()

        validate_tag_set(tagging["TagSet"], type_set="bucket")

        # remove the previous tags before setting the new ones, as this overwrites the whole TagSet
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
        store.TAGS.tag_resource(s3_bucket.bucket_arn, tags=tagging["TagSet"])

    def get_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketTaggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        tag_set = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")["Tags"]
        if not tag_set:
            raise NoSuchTagSet(
                "The TagSet does not exist",
                BucketName=bucket,
            )

        return GetBucketTaggingOutput(TagSet=tag_set)

    def delete_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)

    def put_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        tagging: Tagging,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> PutObjectTaggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            raise_for_delete_marker=False,  # We can tag DeleteMarker
        )

        if "TagSet" not in tagging:
            raise MalformedXML()

        validate_tag_set(tagging["TagSet"], type_set="object")

        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
        # remove the previous tags before setting the new ones, as this overwrites the whole TagSet
        store.TAGS.tags.pop(key_id, None)
        store.TAGS.tag_resource(key_id, tags=tagging["TagSet"])
        response = PutObjectTaggingOutput()
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    def get_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetObjectTaggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        try:
            s3_object = s3_bucket.get_object(
                key=key,
                version_id=version_id,
                raise_for_delete_marker=False,  # We can tag DeleteMarker
            )
        except NoSuchKey as e:
            # There is a weird (AWS-validated) bug in S3: the returned key contains the bucket name as well;
            # we follow AWS on this one
            e.Key = f"{bucket}/{key}"
            raise e

        tag_set = store.TAGS.list_tags_for_resource(
            get_unique_key_id(bucket, key, s3_object.version_id)
        )["Tags"]
        response = GetObjectTaggingOutput(TagSet=tag_set)
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        return response

    def delete_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> DeleteObjectTaggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            raise_for_delete_marker=False,
        )

        store.TAGS.tags.pop(get_unique_key_id(bucket, key, s3_object.version_id), None)
        response = DeleteObjectTaggingOutput()
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    def put_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        cors_configuration: CORSConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        validate_cors_configuration(cors_configuration)
        s3_bucket.cors_rules = cors_configuration
        self._cors_handler.invalidate_cache()

    def get_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketCorsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.cors_rules:
            raise NoSuchCORSConfiguration(
                "The CORS configuration does not exist",
                BucketName=bucket,
            )
        return GetBucketCorsOutput(CORSRules=s3_bucket.cors_rules["CORSRules"])

    def delete_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if s3_bucket.cors_rules:
            self._cors_handler.invalidate_cache()
            s3_bucket.cors_rules = None

    def get_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLifecycleConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.lifecycle_rules:
            raise NoSuchLifecycleConfiguration(
                "The lifecycle configuration does not exist",
                BucketName=bucket,
            )

        return GetBucketLifecycleConfigurationOutput(Rules=s3_bucket.lifecycle_rules)

    def put_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        checksum_algorithm: ChecksumAlgorithm = None,
        lifecycle_configuration: BucketLifecycleConfiguration = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_lifecycle_configuration(lifecycle_configuration)
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
        #  every time we get/head an object
        # for now, we keep a cache and evaluate it every time we fetch an object
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
        self._expiration_cache[bucket].clear()

    def delete_bucket_lifecycle(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.lifecycle_rules = None
        self._expiration_cache[bucket].clear()

    def put_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        analytics_configuration: AnalyticsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_analytics_configuration(
            id=id, analytics_configuration=analytics_configuration
        )

        s3_bucket.analytics_configurations[id] = analytics_configuration

    def get_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAnalyticsConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (analytic_config := s3_bucket.analytics_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytic_config)

    def list_bucket_analytics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketAnalyticsConfigurationsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return ListBucketAnalyticsConfigurationsOutput(
            IsTruncated=False,
            AnalyticsConfigurationList=sorted(
                s3_bucket.analytics_configurations.values(),
                key=itemgetter("Id"),
            ),
        )

    def delete_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.analytics_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")

    def put_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)

        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration

    def get_bucket_intelligent_tiering_configuration(
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
    ) -> GetBucketIntelligentTieringConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketIntelligentTieringConfigurationOutput(
            IntelligentTieringConfiguration=itier_config
        )

    def delete_bucket_intelligent_tiering_configuration(
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")

    def list_bucket_intelligent_tiering_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        **kwargs,
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return ListBucketIntelligentTieringConfigurationsOutput(
            IsTruncated=False,
            IntelligentTieringConfigurationList=sorted(
                s3_bucket.intelligent_tiering_configurations.values(),
                key=itemgetter("Id"),
            ),
        )

    def put_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        inventory_configuration: InventoryConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_inventory_configuration(
            config_id=id, inventory_configuration=inventory_configuration
        )
        s3_bucket.inventory_configurations[id] = inventory_configuration

    def get_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketInventoryConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (inv_config := s3_bucket.inventory_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inv_config)

    def list_bucket_inventory_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketInventoryConfigurationsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return ListBucketInventoryConfigurationsOutput(
            IsTruncated=False,
            InventoryConfigurationList=sorted(
                s3_bucket.inventory_configurations.values(), key=itemgetter("Id")
            ),
        )

    def delete_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.inventory_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")

    def get_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketWebsiteOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.website_configuration:
            raise NoSuchWebsiteConfiguration(
                "The specified bucket does not have a website configuration",
                BucketName=bucket,
            )
        return s3_bucket.website_configuration

    def put_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        website_configuration: WebsiteConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_website_configuration(website_configuration)
        s3_bucket.website_configuration = website_configuration

    def delete_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        # does not raise an error if the bucket has no website configuration; it simply returns
        s3_bucket.website_configuration = None
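
    # Illustrative sketch (comment only, not executed): a minimal website configuration that AWS
    # accepts and that validate_website_configuration is expected to accept, assuming a boto3
    # client and a placeholder bucket name:
    #
    #   s3.put_bucket_website(
    #       Bucket="my-bucket",
    #       WebsiteConfiguration={
    #           "IndexDocument": {"Suffix": "index.html"},
    #           "ErrorDocument": {"Key": "error.html"},
    #       },
    #   )
    #   # get_bucket_website now returns the stored configuration; after delete_bucket_website
    #   # it raises NoSuchWebsiteConfiguration again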

    def get_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLockConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError(
                "Object Lock configuration does not exist for this bucket",
                BucketName=bucket,
            )

        response = GetObjectLockConfigurationOutput(
            ObjectLockConfiguration=ObjectLockConfiguration(
                ObjectLockEnabled=ObjectLockEnabled.Enabled
            )
        )
        if s3_bucket.object_lock_default_retention:
            response["ObjectLockConfiguration"]["Rule"] = {
                "DefaultRetention": s3_bucket.object_lock_default_retention
            }

        return response

    def put_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        object_lock_configuration: ObjectLockConfiguration = None,
        request_payer: RequestPayer = None,
        token: ObjectLockToken = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLockConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if s3_bucket.versioning_status != "Enabled":
            raise InvalidBucketState(
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
            )

        if (
            not object_lock_configuration
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
        ):
            raise MalformedXML()

        if "Rule" not in object_lock_configuration:
            s3_bucket.object_lock_default_retention = None
            if not s3_bucket.object_lock_enabled:
                s3_bucket.object_lock_enabled = True

            return PutObjectLockConfigurationOutput()
        elif not (rule := object_lock_configuration["Rule"]) or not (
            default_retention := rule.get("DefaultRetention")
        ):
            raise MalformedXML()

        if "Mode" not in default_retention or (
            ("Days" in default_retention and "Years" in default_retention)
            or ("Days" not in default_retention and "Years" not in default_retention)
        ):
            raise MalformedXML()

        s3_bucket.object_lock_default_retention = default_retention
        if not s3_bucket.object_lock_enabled:
            s3_bucket.object_lock_enabled = True

        return PutObjectLockConfigurationOutput()
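
    # Illustrative sketch (comment only, not executed): the validation above accepts either no Rule
    # at all, or a Rule whose DefaultRetention carries Mode plus exactly one of Days/Years.
    # Assuming a bucket with versioning enabled:
    #
    #   s3.put_object_lock_configuration(
    #       Bucket="locked-bucket",
    #       ObjectLockConfiguration={
    #           "ObjectLockEnabled": "Enabled",
    #           "Rule": {"DefaultRetention": {"Mode": "GOVERNANCE", "Days": 1}},
    #       },
    #   )
    #   # {"Mode": "GOVERNANCE", "Days": 1, "Years": 1} -> MalformedXML (Days and Years are exclusive)
    #   # {"Days": 1} without Mode                      -> MalformedXML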

    def get_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLegalHoldOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",
        )
        if not s3_object.lock_legal_status:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectLegalHoldOutput(
            LegalHold=ObjectLockLegalHold(Status=s3_object.lock_legal_status)
        )

    def put_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        legal_hold: ObjectLockLegalHold = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLegalHoldOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not legal_hold:
            raise MalformedXML()

        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )
        # TODO: check casing
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
            raise MalformedXML()

        s3_object.lock_legal_status = status

        # TODO: return RequestCharged
        return PutObjectLegalHoldOutput()
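
    # Illustrative sketch (comment only, not executed): legal hold requires Object Lock on the
    # bucket and only accepts the literal statuses "ON" and "OFF":
    #
    #   s3.put_object_legal_hold(Bucket="locked-bucket", Key="key", LegalHold={"Status": "ON"})
    #   s3.get_object_legal_hold(Bucket="locked-bucket", Key="key")
    #   # -> {"LegalHold": {"Status": "ON"}}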

    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",
        )
        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectRetentionOutput(
            Retention=ObjectLockRetention(
                Mode=s3_object.lock_mode,
                RetainUntilDate=s3_object.lock_until,
            )
        )

    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )

        if retention and not validate_dict_fields(
            retention, required_fields={"Mode", "RetainUntilDate"}
        ):
            raise MalformedXML()

        if (
            not retention
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
        ) and not (
            bypass_governance_retention and s3_object.lock_mode == ObjectLockMode.GOVERNANCE
        ):
            raise AccessDenied("Access Denied")

        s3_object.lock_mode = retention["Mode"] if retention else None
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()
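
    # Illustrative sketch (comment only, not executed): clearing a retention or moving
    # RetainUntilDate earlier is only allowed when BypassGovernanceRetention is set and the current
    # lock mode is GOVERNANCE; otherwise the handler above raises AccessDenied:
    #
    #   s3.put_object_retention(
    #       Bucket="locked-bucket",
    #       Key="key",
    #       Retention={},                    # empty retention clears Mode and RetainUntilDate
    #       BypassGovernanceRetention=True,  # required when a GOVERNANCE lock is in place
    #   )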

    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation; its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        payer = request_payment_configuration.get("Payer")
        if payer not in ["Requester", "BucketOwner"]:
            raise MalformedXML()

        s3_bucket.payer = payer

    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        # TODO: this currently only mocks the operation; its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)

    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        return GetBucketOwnershipControlsOutput(
            OwnershipControls={"Rules": [{"ObjectOwnership": s3_bucket.object_ownership}]}
        )

    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation; its actual effect is not emulated
        #  for example, when set to BucketOwnerEnforced it should almost entirely forbid ACL usage
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := ownership_controls.get("Rules")) or len(rules) > 1:
            raise MalformedXML()

        rule = rules[0]
        if (object_ownership := rule.get("ObjectOwnership")) not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        s3_bucket.object_ownership = object_ownership

    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.object_ownership = None

    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.public_access_block:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        return GetPublicAccessBlockOutput(
            PublicAccessBlockConfiguration=s3_bucket.public_access_block
        )

    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation; its actual effect is not emulated,
        #  as we do not enforce ACLs directly. Also, this should take the most restrictive setting
        #  between the S3Control and bucket configurations. See s3control
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        public_access_block_fields = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        if not validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=public_access_block_fields,
        ):
            raise MalformedXML()

        for field in public_access_block_fields:
            if public_access_block_configuration.get(field) is None:
                public_access_block_configuration[field] = False

        s3_bucket.public_access_block = public_access_block_configuration
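
    # Illustrative sketch (comment only, not executed): any of the four flags omitted from the
    # request is stored as False, so a partial configuration such as
    #
    #   s3.put_public_access_block(
    #       Bucket="my-bucket",
    #       PublicAccessBlockConfiguration={"BlockPublicAcls": True},
    #   )
    #
    # is read back with the three missing flags set to False.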

    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.public_access_block = None

    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.policy:
            raise NoSuchBucketPolicy(
                "The bucket policy does not exist",
                BucketName=bucket,
            )
        return GetBucketPolicyOutput(Policy=s3_bucket.policy)

    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not policy or policy[0] != "{":
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
        try:
            json_policy = json.loads(policy)
            if not json_policy:
                # TODO: add more validation around the policy?
                raise MalformedPolicy("Missing required field Statement")
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        s3_bucket.policy = policy
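
    # Illustrative sketch (comment only, not executed): the policy must be a JSON document whose
    # first byte is '{' and that parses to a non-empty object; anything else raises MalformedPolicy.
    # A document this handler accepts (deeper validation is still a TODO):
    #
    #   policy = json.dumps(
    #       {
    #           "Version": "2012-10-17",
    #           "Statement": [
    #               {
    #                   "Effect": "Allow",
    #                   "Principal": "*",
    #                   "Action": "s3:GetObject",
    #                   "Resource": "arn:aws:s3:::my-bucket/*",
    #               }
    #           ],
    #       }
    #   )
    #   s3.put_bucket_policy(Bucket="my-bucket", Policy=policy)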

    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.policy = None

    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        response = GetBucketAccelerateConfigurationOutput()
        if s3_bucket.accelerate_status:
            response["Status"] = s3_bucket.accelerate_status

        return response

    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        if not (status := accelerate_configuration.get("Status")) or status not in (
            "Enabled",
            "Suspended",
        ):
            raise MalformedXML()

        s3_bucket.accelerate_status = status
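
    # Illustrative sketch (comment only, not executed): acceleration is rejected for bucket names
    # containing dots and only accepts the statuses "Enabled" and "Suspended":
    #
    #   s3.put_bucket_accelerate_configuration(
    #       Bucket="my-bucket", AccelerateConfiguration={"Status": "Enabled"}
    #   )
    #   # Bucket="my.dotted.bucket" -> InvalidRequest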

    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
            s3_bucket.logging = {}
            return

        # the target bucket must be in the same account
        if not (target_bucket_name := logging_config.get("TargetBucket")):
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_bucket_region = s3_bucket.bucket_region
        if target_s3_bucket.bucket_region != source_bucket_region:
            if source_bucket_region == AWS_REGION_US_EAST_1:
                raise CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
            raise CrossLocationLoggingProhibitted(
                "Cross S3 location logging not allowed. ",
                SourceBucketLocation=source_bucket_region,
                TargetBucketLocation=target_s3_bucket.bucket_region,
            )

        s3_bucket.logging = logging_config
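
    # Illustrative sketch (comment only, not executed): the target bucket must exist in the same
    # account and in the same region as the source bucket, and TargetPrefix defaults to an empty
    # string when omitted:
    #
    #   s3.put_bucket_logging(
    #       Bucket="source-bucket",
    #       BucketLoggingStatus={
    #           "LoggingEnabled": {"TargetBucket": "log-bucket", "TargetPrefix": "logs/"}
    #       },
    #   )
    #   # an empty BucketLoggingStatus ({}) disables logging on the bucket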

    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.logging:
            return GetBucketLoggingOutput()

        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)

    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if s3_bucket.versioning_status != BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        if not (rules := replication_configuration.get("Rules")):
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
            if (
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
                or dest_s3_bucket.versioning_status != BucketVersioningStatus.Enabled
            ):
                # according to AWS testing, the same exception is raised whether the bucket does not exist
                # or versioning is disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO more validation on input
        s3_bucket.replication = replication_configuration
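
    # Illustrative sketch (comment only, not executed): both the source and the destination bucket
    # must have versioning enabled, and rules without an ID get a generated one. A minimal
    # configuration (ARNs and names are placeholders):
    #
    #   s3.put_bucket_replication(
    #       Bucket="source-bucket",
    #       ReplicationConfiguration={
    #           "Role": "arn:aws:iam::000000000000:role/replication-role",
    #           "Rules": [
    #               {"Status": "Enabled", "Destination": {"Bucket": "arn:aws:s3:::dest-bucket"}}
    #           ],
    #       },
    #   )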

    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)

    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.replication = None

    @handler("PutBucketAcl", expand=False)
    def put_bucket_acl(
        self,
        context: RequestContext,
        request: PutBucketAclRequest,
    ) -> None:
        bucket = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        acp = get_access_control_policy_from_acl_request(
            request=request, owner=s3_bucket.owner, request_body=context.request.data
        )
        s3_bucket.acl = acp

    def get_bucket_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAclOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])

    @handler("PutObjectAcl", expand=False)
    def put_object_acl(
        self,
        context: RequestContext,
        request: PutObjectAclRequest,
    ) -> PutObjectAclOutput:
        bucket = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=request["Key"],
            version_id=request.get("VersionId"),
            http_method="PUT",
        )
        acp = get_access_control_policy_from_acl_request(
            request=request, owner=s3_object.owner, request_body=context.request.data
        )
        previous_acl = s3_object.acl
        s3_object.acl = acp

        if previous_acl != acp:
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        # TODO: RequestCharged
        return PutObjectAclOutput()

    def get_object_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectAclOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
        )
        # TODO: RequestCharged
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])

    def get_bucket_policy_status(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyStatusOutput:
        raise NotImplementedError

    def get_object_torrent(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectTorrentOutput:
        raise NotImplementedError

    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one that uses a form to carry the request data. We have to do some manual
        # parsing here, as there are no specs for it and no client implements this operation directly.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = context.request.form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            file_data = to_bytes(form["file"])
            object_content_length = len(file_data)
            stream = BytesIO(file_data)
        else:
            # this is the default behaviour
            fileobj = context.request.files["file"]
            stream = fileobj.stream
            # stream is a SpooledTemporaryFile, so we can seek to its end to determine its length,
            # which is necessary for policy validation
            original_pos = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(original_pos, 0)

            if "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", fileobj.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        additional_policy_metadata = {
            "bucket": bucket,
            "content_length": object_content_length,
        }
        validate_post_policy(form, additional_policy_metadata)

        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
        else:
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)

        post_system_settable_headers = [
            "Cache-Control",
            "Content-Type",
            "Content-Disposition",
            "Content-Encoding",
        ]
        system_metadata = {}
        for system_metadata_field in post_system_settable_headers:
            if field_value := form.get(system_metadata_field):
                system_metadata[system_metadata_field.replace("-", "")] = field_value

        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
            for field in form
            if field.startswith("x-amz-meta-")
        }

        if tagging := form.get("tagging"):
            # this is raw XML embedded directly in the form field, so we need to parse it ourselves
            tagging = parse_post_object_tagging_xml(tagging)

        if (storage_class := form.get("x-amz-storage-class")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_request = {
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
        }

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            encryption_request,
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = (
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
        )
        expires = (
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
        )

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overwriting an existing object, delete its tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        if redirect := form.get("success_action_redirect"):
            # we need to build the redirect ourselves, as the parser cannot return the moto-calculated one
            try:
                redirect = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
                response["LocationHeader"] = redirect
                response["StatusCode"] = 303
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                response["StatusCode"] = form.get("success_action_status", 204)

        elif status_code := form.get("success_action_status"):
            response["StatusCode"] = status_code
        else:
            response["StatusCode"] = 204

        response["LocationHeader"] = response.get(
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            # (success_action_status comes from the form, so it is compared as a string)
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response
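
    # Illustrative sketch (comment only, not executed): post_object consumes the multipart/form-data
    # body a browser form or pre-signed POST would send. Assuming the `requests` library, the
    # default LocalStack edge endpoint and a placeholder bucket:
    #
    #   requests.post(
    #       "http://localhost:4566/my-bucket",
    #       data={
    #           "key": "uploads/${filename}",
    #           "acl": "public-read",
    #           "success_action_status": "201",
    #       },
    #       files={"file": ("report.txt", b"hello world")},
    #   )
    #   # -> 201 with an XML body containing ETag, Bucket, Key and Location;
    #   #    "${filename}" is replaced with "report.txt"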


def generate_version_id(bucket_versioning_status: str) -> str | None:
    if not bucket_versioning_status:
        return None
    # TODO: check VersionID format, could it be base64 urlsafe encoded?
    return token_urlsafe(16) if bucket_versioning_status.lower() == "enabled" else "null"
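
# Illustrative sketch (comment only, not executed): the returned value depends on the bucket's
# versioning status:
#
#   generate_version_id(None)        -> None    (bucket never had versioning configured)
#   generate_version_id("Suspended") -> "null"
#   generate_version_id("Enabled")   -> a random URL-safe token from token_urlsafe(16)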


def add_encryption_to_response(response: dict, s3_object: S3Object):
    if encryption := s3_object.encryption:
        response["ServerSideEncryption"] = encryption
        if encryption == ServerSideEncryption.aws_kms:
            response["SSEKMSKeyId"] = s3_object.kms_key_id
            if s3_object.bucket_key_enabled:
                response["BucketKeyEnabled"] = s3_object.bucket_key_enabled


def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")
    if s3_bucket.encryption_rule:
        bucket_key_enabled = bucket_key_enabled or s3_bucket.encryption_rule.get("BucketKeyEnabled")
        encryption = (
            encryption
            or s3_bucket.encryption_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
        )
        if encryption == ServerSideEncryption.aws_kms:
            key_id = kms_key_id or s3_bucket.encryption_rule[
                "ApplyServerSideEncryptionByDefault"
            ].get("KMSMasterKeyID")
            kms_key_id = get_kms_key_arn(
                key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
            )
            if not kms_key_id:
                # if no key is provided, AWS will use an AWS managed KMS key
                # create it if it doesn't already exist, and save it in the store per region
                if not store.aws_managed_kms_key_id:
                    managed_kms_key_id = create_s3_kms_managed_key_for_region(
                        s3_bucket.bucket_account_id, s3_bucket.bucket_region
                    )
                    store.aws_managed_kms_key_id = managed_kms_key_id

                kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
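
# Illustrative sketch (comment only, not executed): request values take precedence over the
# bucket's default encryption rule, and aws:kms without an explicit key falls back to the
# per-region AWS-managed key cached on the S3Store. With a bucket default of
# {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "aws:kms"}} and an empty request,
# the call roughly resolves to:
#
#   get_encryption_parameters_from_request_and_bucket({}, s3_bucket, store)
#   # -> EncryptionParameters("aws:kms", <ARN of store.aws_managed_kms_key_id>, None)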


def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    # TODO: also validate here?
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    if default_retention := s3_bucket.object_lock_default_retention:
        lock_mode = lock_mode or default_retention.get("Mode")
        if lock_mode and not lock_until:
            lock_until = get_retention_from_now(
                days=default_retention.get("Days"),
                years=default_retention.get("Years"),
            )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)


def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Calculate the byte range covered by a part number of an S3 object
    :param s3_object: the S3Object to slice
    :param part_number: the requested part number of the S3Object
    :return: an ObjectRange describing the slice of the object covered by that part
    """
    if not s3_object.parts:
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        return ObjectRange(
            begin=0,
            end=s3_object.size - 1,
            content_length=s3_object.size,
            content_range=f"bytes 0-{s3_object.size - 1}/{s3_object.size}",
        )
    elif not (part_data := s3_object.parts.get(part_number)):
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(s3_object.parts),
        )

    begin, part_length = part_data
    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )
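
# Illustrative sketch (comment only, not executed): for a multipart object whose `parts` dict maps
# part numbers to (begin, length) tuples, e.g. {1: (0, 5_242_880), 2: (5_242_880, 1_024)}:
#
#   get_part_range(s3_object, 2)
#   # -> ObjectRange(begin=5242880, end=5243903, content_length=1024,
#   #                content_range="bytes 5242880-5243903/<object size>")
#   get_part_range(s3_object, 3)   # not in `parts` -> InvalidPartNumber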


def get_acl_headers_from_request(
    request: Union[
        PutObjectRequest,
        CreateMultipartUploadRequest,
        CopyObjectRequest,
        CreateBucketRequest,
        PutBucketAclRequest,
        PutObjectAclRequest,
    ],
) -> list[tuple[str, str]]:
    permission_keys = [
        "GrantFullControl",
        "GrantRead",
        "GrantReadACP",
        "GrantWrite",
        "GrantWriteACP",
    ]
    acl_headers = [
        (permission, grant_header)
        for permission in permission_keys
        if (grant_header := request.get(permission))
    ]
    return acl_headers


def get_access_control_policy_from_acl_request(
    request: Union[PutBucketAclRequest, PutObjectAclRequest],
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node.
    # The errors differ depending on that data, so we need to access the raw request body from the context.
    # Modifying the parser for this use case seems dangerous.
    is_acp_in_body = request_body

    if not (canned_acl or acl_headers or is_acp_in_body):
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    elif (canned_acl or acl_headers) and is_acp_in_body:
        raise UnexpectedContent("This request does not support content")

    if canned_acl:
        validate_canned_acl(canned_acl)
        acp = get_canned_acl(canned_acl, owner=owner)

    elif acl_headers:
        grants = []
        for permission, grantees_values in acl_headers:
            permission = get_permission_from_header(permission)
            partial_grants = parse_grants_in_headers(permission, grantees_values)
            grants.extend(partial_grants)

        acp = AccessControlPolicy(Owner=owner, Grants=grants)
    else:
        acp = request.get("AccessControlPolicy")
        validate_acl_acp(acp)
        if (
            owner.get("DisplayName")
            and acp["Grants"]
            and "DisplayName" not in acp["Grants"][0]["Grantee"]
        ):
            acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]

    return acp


def get_access_control_policy_for_new_resource_request(
    request: Union[
        PutObjectRequest, CreateMultipartUploadRequest, CopyObjectRequest, CreateBucketRequest
    ],
    owner: Owner,
) -> AccessControlPolicy:
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    if not (canned_acl or acl_headers):
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    grants = []
    for permission, grantees_values in acl_headers:
        permission = get_permission_from_header(permission)
        partial_grants = parse_grants_in_headers(permission, grantees_values)
        grants.extend(partial_grants)

    return AccessControlPolicy(Owner=owner, Grants=grants)
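
# Illustrative sketch (comment only, not executed): for new resources, sending no ACL information
# resolves to the "private" canned ACL owned by the resource owner, while grant headers such as
#
#   x-amz-grant-read: uri="http://acs.amazonaws.com/groups/global/AllUsers"
#
# are parsed into individual Grant entries; sending a canned ACL and grant headers together
# raises InvalidRequest.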