rapid7/metasploit-framework

View on GitHub
lib/msf/core/exploit/remote/kerberos/service_authenticator/base.rb

Summary

Maintainability
F
6 days
Test Coverage
# -*- coding: binary -*-

#
# This class acts as standalone authenticator for Kerberos
#
class Msf::Exploit::Remote::Kerberos::ServiceAuthenticator::Base
  extend Forwardable
  include Msf::Exploit::Remote::Kerberos::Client
  include Msf::Auxiliary::Report
  include Rex::Proto::Gss::Asn1

  # @!attribute [r] realm
  #   @return [String] the realm to use
  attr_reader :realm

  # @!attribute [r] username
  #   @return [String] the username to use
  attr_reader :username

  # @!attribute [r] password
  #   @return [String] the password to use
  attr_reader :password

  # @!attribute [r] pfx
  #   @return [OpenSSL::PKCS12] the pfx certificate to use with pkinit
  attr_reader :pfx

  # @!attribute [r] hostname
  #   @return [String] the unresolved name of the host that the ticket will be used against
  attr_reader :hostname

  # @!attribute [r] host
  #   @return [String] the kerberos host to request a ticket from
  attr_reader :host

  # @!attribute [r] host
  #   @return [Integer] the kerberos port to request a ticket from
  attr_reader :port

  # @!attribute [r] host
  #   @return [String,nil] The proxy directive to use for the socket
  attr_reader :proxies

  # @!attribute [r] timeout
  #   @return [Integer] the kerberos timeout
  attr_reader :timeout

  # @!attribute [r] framework
  #   @return [Msf::Framework] the Metasploit framework instance
  attr_reader :framework

  # @!attribute [r] framework_module
  #   @return [Msf::Module] the Metasploit framework module that is associated with the authentication instance
  attr_reader :framework_module

  # @!attribute [r] mutual_auth
  #   @return [Boolean] whether to use mutual authentication
  attr_reader :mutual_auth

  # @!attribute [r] use_gss_checksum
  #   @return [Boolean] whether to use an RFC4121-compliant checksum
  attr_reader :use_gss_checksum

  # @!attribute [r] mechanism
  #   @return [String] the GSS mechanism being used (from Rex::Proto::Gss::Mechanism)
  attr_reader :mechanism

  # @!attribute [r] send_delegated_creds
  #   @return [String] whether to send delegated creds (from the set Msf::Exploit::Remote::Kerberos::ServiceAuthenticator::Base::Delegation)
  attr_reader :send_delegated_creds

  # @!attribute [r] dce_style
  #   @return [Boolean] Whether this encryptor will be used for DCERPC purposes (since the behaviour is subtly different)
  attr_reader :dce_style

  # @!attribute [r] ticket_storage
  #   @return [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] the ticket storage driver
  attr_reader :ticket_storage

  # @!attribute [r] key
  #   @return [String] the encryption key for authentication
  attr_reader :key

  # @!attribute [r] offered_etypes
  #   @return [Array[Integer],nil] the offered etypes, if not present the default values will be used
  # @see Rex::Proto::Kerberos::Crypto::Encryption::DefaultOfferedEtypes
  attr_reader :offered_etypes

  def_delegators :@framework_module,
                :print_status,
                :print_good,
                :vprint_error,
                :vprint_status,
                :workspace

  # Flags - https://datatracker.ietf.org/doc/html/rfc4121#section-4.1.1.1
  GSS_DELEGATE      = 0x01
  GSS_MUTUAL        = 0x02
  GSS_REPLAY_DETECT = 0x04
  GSS_SEQUENCE      = 0x08
  GSS_CONFIDENTIAL  = 0x10
  GSS_INTEGRITY     = 0x20
  GSS_DCE_STYLE     = 0x1000

  module Delegation
    ALWAYS = 'always' # Always send delegated creds
    NEVER = 'never' # Never send delegated creds
    WHEN_UNCONSTRAINED = 'when_unconstrained' # Send delegated creds when service is unconstrained delegation account
  end

  def initialize(
      realm: nil,
      hostname: nil,
      username: nil,
      password: nil,
      host: nil,
      proxies: nil,
      port: 88,
      timeout: 25,
      framework: nil,
      framework_module: nil,
      mutual_auth: false,
      use_gss_checksum: false,
      mechanism: Rex::Proto::Gss::Mechanism::SPNEGO,
      send_delegated_creds: Delegation::ALWAYS,
      dce_style: false,
      cache_file: nil,
      ticket_storage: nil,
      key: nil,
      offered_etypes: nil,
      pfx: nil
  )
    @realm = realm
    @hostname = hostname
    @host = host
    @proxies = proxies
    @port = port
    @timeout = timeout
    @username = username
    @password = password
    @pfx = pfx
    @framework = framework
    @framework_module = framework_module
    @mutual_auth = mutual_auth
    @use_gss_checksum = use_gss_checksum
    @mechanism = mechanism
    @send_delegated_creds = send_delegated_creds
    @dce_style = dce_style
    @ticket_storage = ticket_storage || Msf::Exploit::Remote::Kerberos::Ticket::Storage::ReadWrite.new(
      framework: framework,
      framework_module: framework_module
    )
    @key = key
    @offered_etypes = offered_etypes

    credential = nil
    if cache_file.present?
      # the cache file is only used for loading credentials, it is *not* written to
      credential = load_credential_from_file(cache_file, sname: nil, sname_hostname: @hostname)
      serviceclass = build_spn.name_string.first
      if credential && credential.server.components[0] != serviceclass
        old_sname = credential.server.components.snapshot.join('/')
        credential.server.components[0] = serviceclass
        new_sname = credential.server.components.snapshot.join('/')
        print_status("Patching sname from #{old_sname} to #{new_sname}")
        ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.value)
        ticket.sname.name_string[0] = serviceclass
        credential.ticket = ticket.encode
      elsif credential.nil? && hostname.present?
        credential = load_credential_from_file(cache_file, sname: "krbtgt/#{hostname.split('.', 2).last}")
      end
      if credential.nil?
        raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new("Failed to load a usable credential from ticket file: #{cache_file}")
      end
      print_status("Loaded a credential from ticket file: #{cache_file}")
    end
    @credential = credential
  end

  # Returns the target host
  #
  # @return [String]
  def rhost
    host
  end

  # Returns the remote port
  #
  # @return [Integer]
  def rport
    port
  end

  def connect(options = {})
    unless options[:rhost]
      unless (host = @host)
        vprint_status("Using DNS to lookup the KDC for #{realm}...")
        host = ::Rex::Socket.getresources("_kerberos._tcp.#{realm}", :SRV)&.sample
        if host.nil?
          raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new("Failed to lookup the KDC")
        end
        print_status("Using KDC #{host} for realm #{realm}")
        @host = host
      end
      options[:rhost] = host
    end

    super(options)
  end

  # @param [Hash] options
  # @option options [String] :credential An explicit credential object to use for authentication.
  # @option options [Rex::Proto::Kerberos::Model::PrincipalName] :sname The target service principal name.
  # @option options [String] :mechanism The authentication mechanism. One of the Rex::Proto::Gss::Mechanism constants.
  # @return [Hash] The security_blob SPNEGO GSS and TGS session key
  def authenticate(options = {})
    options[:sname] = options.fetch(:sname) { build_spn(options) }

    unless options[:credential]
      if @credential
        # use an explicit credential
        options[:credential] = @credential
      else
        # load a cached TGS
        options[:credential] = get_cached_credential(options)
        tgt_sname = Rex::Proto::Kerberos::Model::PrincipalName.new(
          name_type: Rex::Proto::Kerberos::Model::NameType::NT_SRV_INST,
          name_string: [
            "krbtgt",
            realm
          ]
        )
        unless options[:credential]
          # load a cached TGT (specific host)
          options[:credential] = get_cached_credential(
            options.merge(sname: tgt_sname)
          )
        end
        unless options[:credential]
          # load a cached TGT (any host)
          options[:credential] = get_cached_credential(
            options.merge(sname: tgt_sname, host: nil)
          )
        end
        if options[:credential]
          print_status("Using cached credential for #{options[:credential].server} #{options[:credential].client}")
        end
      end
    end

    if options[:credential] && options[:credential].server.to_s.start_with?('krbtgt/')
      auth_context = authenticate_via_krb5_ccache_credential_tgt(options[:credential], options)
    elsif options[:credential]
      auth_context = authenticate_via_krb5_ccache_credential_tgs(options[:credential], options)
    else
      auth_context = authenticate_via_kdc(options)
      auth_context = authenticate_via_krb5_ccache_credential_tgt(auth_context[:credential], options)
    end

    ap_request_asn1 = auth_context.delete(:service_ap_request).to_asn1

    mechanism = options.fetch(:mechanism) { self.mechanism }
    if mechanism == Rex::Proto::Gss::Mechanism::SPNEGO
      security_blob = encode_gss_spnego_ap_request(ap_request_asn1)
    elsif mechanism == Rex::Proto::Gss::Mechanism::KERBEROS
      security_blob = encode_gss_kerberos_ap_request(ap_request_asn1)
    else
      raise RuntimeError, "Unknown GSS mechanism: #{mechanism}"
    end

    auth_context[:security_blob] = security_blob
    auth_context
  end

  def get_message_encryptor(key, client_sequence_number, server_sequence_number)
    Rex::Proto::Gss::Kerberos::MessageEncryptor.new(key,
                                            client_sequence_number,
                                            server_sequence_number,
                                            is_initiator: true,
                                            use_acceptor_subkey: true,
                                            dce_style: @dce_style)
  end

  def parse_gss_init_response(token, session_key, mechanism: 'kerberos')
    mech_id, encapsulated_token = unwrap_pseudo_asn1(token)

    if mech_id.value == Rex::Proto::Gss::OID_KERBEROS_5.value
      tok_id = encapsulated_token[0,2]
      data = encapsulated_token[2, encapsulated_token.length]
      case tok_id
      when TOK_ID_KRB_AP_REP
        ap_rep = Rex::Proto::Kerberos::Model::ApRep.decode(data)
        print_good("#{peer} - Received AP-REQ. Extracting session key...")

        raise ::Rex::Proto::Kerberos::Model::Error::KerberosError, 'Mismatching etypes' if session_key.type != ap_rep.enc_part.etype

        decrypted = ap_rep.decrypt_enc_part(session_key.value)

        result = {
          ap_rep_subkey: decrypted.subkey,
          server_sequence_number: decrypted.sequence_number,
          etype: ap_rep.enc_part.etype
        }
      when TOK_ID_KRB_ERROR
        krb_err = Rex::Proto::Kerberos::Model::KrbError.decode(data)
        print_error("#{peer} - Received KRB-ERR.")

        raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new(res: krb_err)
      else
        raise ::Rex::Proto::Kerberos::Model::Error::KerberosError, "Unknown token id: #{tok_id.inspect}"
      end
    else
      raise ::NotImplementedError, "Parsing mechtype #{mech_id.value} not supported"
    end

  end

  # @param security_blob [String] SPNEGO GSS Blob
  # @param accept_incomplete [Boolean] Whether an Incomplete value is an acceptable response
  # @raise [Rex::Proto::Kerberos::Model::Error::KerberosError] if the response was not successful
  # @raise [Rex::Proto::Kerberos::Model::Error::KerberosDecodingError] if the response was invalid per the Kerberos/GSS protocol
  def validate_response!(security_blob, accept_incomplete: false)
    begin
      gss_api = OpenSSL::ASN1.decode(security_blob)
      neg_result = ::RubySMB::Gss.asn1dig(gss_api, 0, 0, 0)&.value.to_i
      supported_neg = ::RubySMB::Gss.asn1dig(gss_api, 0, 1, 0)&.value
    rescue OpenSSL::ASN1::ASN1Error
      raise ::Rex::Proto::Kerberos::Model::Error::KerberosDecodingError.new('Invalid GSS Response')
    end

    is_success = (neg_result == NEG_TOKEN_ACCEPT_COMPLETED || (accept_incomplete && neg_result == NEG_TOKEN_ACCEPT_INCOMPLETE)) &&
      supported_neg == ::Rex::Proto::Gss::OID_MICROSOFT_KERBEROS_5.value

    raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new('Failed to negotiate Kerberos GSS') unless is_success

    is_success
  end

  def build_spn(options = {})
    nil
  end

  # @param [Hash] options
  # @option options [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] :ticket_storage Override the @ticket_storage attribute
  # @see #authenticate_via_kdc Options documentation
  # @see #get_cached_credential Other options documentation
  # @return [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] The ccache credential
  def request_tgt_only(options = {})
    if options[:cache_file]
      credential = load_credential_from_file(options[:cache_file])
    else
      credential = get_cached_credential(
        options.merge(
          sname: Rex::Proto::Kerberos::Model::PrincipalName.new(
            name_type: Rex::Proto::Kerberos::Model::NameType::NT_SRV_INST,
            name_string: [
              "krbtgt",
              realm
            ]
          )
        )
      )
    end

    if credential
      print_status("Using cached credential for #{credential.server} #{credential.client}")
      return credential
    end

    auth_context = authenticate_via_kdc(options)
    return auth_context[:credential]
  end

  # @param [Hash] options
  # @option options [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] :ticket_storage Override the @ticket_storage attribute
  # @param [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] :credential
  #   The ccache credential from the TGT
  # @see #authenticate_via_krb5_ccache_credential_tgt Options dcoumentation
  # @see #get_cached_credential Other options dcoumentation
  # @return [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] The ccache credential
  def request_tgs_only(credential, options = {})
    # load a cached TGS
    if (ccache = get_cached_credential(options))
      print_status("Using cached credential for #{ccache.server} #{ccache.client}")
      return ccache
    end

    auth_context = authenticate_via_krb5_ccache_credential_tgt(credential, options)
    auth_context[:credential]
  end

  # Request a service ticket to itself on behalf of a user
  #
  # @param [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] :credential
  #   The ccache credential from the TGT
  # @param [Hash] options
  # @option options [Rex::Proto::Kerberos::Model::PrincipalName] :sname The target service principal name.
  # @option options [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] :ticket_storage Override the @ticket_storage attribute
  # @option options [String] :impersonate The name of the user to request a ticket on behalf of
  # @return [Array] The TGS ticket and the decrypted TGS credentials as a MIT Cache Credential
  def s4u2self(credential, options = {})
    realm = self.realm.upcase
    sname = options.fetch(:sname)
    client_name = username

    now = Time.now.utc
    expiry_time = now + 1.day

    ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.value)
    session_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.value,
      value: credential.keyblock.data.value
    )

    etypes = Set.new([credential.keyblock.enctype.value])
    etypes << Rex::Proto::Kerberos::Crypto::Encryption::RC4_HMAC

    tgs_options = {
      ticket_storage: options.fetch(:ticket_storage, @ticket_storage),
      credential_cache_username: options[:impersonate],
      pa_data: build_pa_for_user(
        {
          username: options[:impersonate],
          session_key: session_key,
          realm: realm
        }
      )
    }

    request_service_ticket(
      session_key,
      ticket,
      realm,
      client_name,
      etypes,
      expiry_time,
      now,
      sname,
      tgs_options
    )
  end

  # Request a service ticket to another service on behalf of a user
  #
  # @param [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] credential The ccache credential from the TGT
  # @param [Hash] options
  # @option options [Rex::Proto::Kerberos::Model::PrincipalName] :sname The target service principal name.
  # @option options [Rex::Proto::Kerberos::Model::Ticket] :tgs_ticket The service ticket to the first service.
  #   It must have the forwardable flag set. This ticket can be obtained with #s4u2self.
  # @option options [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] :ticket_storage Override the @ticket_storage attribute
  # @option options [String] :impersonate The name of the user to request a ticket on behalf of
  # @return [Array] The new TGS ticket and the decrypted TGS credentials as a MIT Cache Credential
  def s4u2proxy(credential, options = {})
    realm = self.realm.upcase
    sname = options.fetch(:sname)
    client_name = username

    now = Time.now.utc
    expiry_time = now + 1.day

    ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.value)
    session_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.value,
      value: credential.keyblock.data.value
    )

    pa_pac_options_flags = Rex::Proto::Kerberos::Model::PreAuthPacOptionsFlags.from_flags(
      [
        Rex::Proto::Kerberos::Model::PreAuthPacOptionsFlags::RESOURCE_BASED_CONSTRAINED_DELEGATION
      ]
    )
    pa_pac_options = Rex::Proto::Kerberos::Model::PreAuthPacOptions.new(
      flags: pa_pac_options_flags
    )
    pa_data_entry = Rex::Proto::Kerberos::Model::PreAuthDataEntry.new(
      type: Rex::Proto::Kerberos::Model::PreAuthType::PA_PAC_OPTIONS,
      value: pa_pac_options.encode
    )

    etypes = Set.new([credential.keyblock.enctype.value])
    etypes << Rex::Proto::Kerberos::Crypto::Encryption::RC4_HMAC
    etypes << Rex::Proto::Kerberos::Crypto::Encryption::DES_CBC_MD5
    etypes << Rex::Proto::Kerberos::Crypto::Encryption::DES3_CBC_SHA1

    tgs_options = {
      pa_data: pa_data_entry,
      additional_flags: [Rex::Proto::Kerberos::Model::KdcOptionFlags::CNAME_IN_ADDL_TKT],
      additional_tickets: [options[:tgs_ticket]],
      ticket_storage: options.fetch(:ticket_storage, @ticket_storage),
      credential_cache_username: options[:impersonate]
    }

    request_service_ticket(
      session_key,
      ticket,
      realm,
      client_name,
      etypes,
      expiry_time,
      now,
      sname,
      tgs_options
    )
  end

  # Request a service ticket to a user on behalf of themselves
  # This is mostly useful for PKINIT to recover the NT hash
  # Can combine this with S4U2Self by providing an :impersonate option
  # to retrieve a PAC for any account, i.e. Sapphire Ticket attack
  #
  # @see https://learn.microsoft.com/en-us/archive/blogs/openspecification/how-kerberos-user-to-user-authentication-works
  #
  # @param credential [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] The ccache credential from the TGT
  # @param [Hash] options
  def u2uself(credential, options = {})
    realm = self.realm.upcase
    client_name = options.fetch(:username) { self.username }
    sname = options.fetch(:sname) {
      Rex::Proto::Kerberos::Model::PrincipalName.new(
        name_type: Rex::Proto::Kerberos::Model::NameType::NT_UNKNOWN,
        name_string: [ client_name ]
      )
    }

    now = Time.now.utc
    expiry_time = now + 1.day

    ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.value)
    session_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.value,
      value: credential.keyblock.data.value
    )

    etypes = Set.new([ticket.enc_part.etype])
    etypes << Rex::Proto::Kerberos::Crypto::Encryption::RC4_HMAC

    tgs_options = {
      ticket_storage: options.fetch(:ticket_storage, @ticket_storage),
      credential_cache_username: client_name,
      additional_flags: [
        Rex::Proto::Kerberos::Model::KdcOptionFlags::ENC_TKT_IN_SKEY,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE_OK
      ],
      additional_tickets: [ticket]
    }

    if options[:impersonate]
      tgs_options[:pa_data] = build_pa_for_user(
        {
          username: options[:impersonate],
          session_key: session_key,
          realm: self.realm
        }
      )
    end

    request_service_ticket(
      session_key,
      ticket,
      realm,
      client_name,
      etypes,
      expiry_time,
      now,
      sname,
      tgs_options
    )
  end

  # Authenticate with credentials to the key distribution center (KDC). This will request a TGT only.
  #
  # @param [Hash] options
  def authenticate_via_kdc(options = {})
    realm = self.realm.upcase
    client_name = username
    server_name = "krbtgt/#{realm}"

    ticket_options = Rex::Proto::Kerberos::Model::KdcOptionFlags.from_flags(
      [
        Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDABLE,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::CANONICALIZE,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE_OK
      ]
    )

    if (pfx = options.fetch(:pfx) { self.pfx })
      offered_etypes = options.fetch(:offered_etypes) do
        self.offered_etypes || Rex::Proto::Kerberos::Crypto::Encryption::PkinitEtypes
      end

      tgt_result = send_request_tgt_pkinit(
        server_name: server_name,
        client_name: client_name,
        pfx: pfx,
        realm: realm,
        options: ticket_options,
        offered_etypes: offered_etypes
      )
    else
      offered_etypes = options.fetch(:offered_etypes) do
        self.offered_etypes || Rex::Proto::Kerberos::Crypto::Encryption::DefaultOfferedEtypes
      end

      tgt_result = send_request_tgt(
        server_name: server_name,
        client_name: client_name,
        password: password,
        key: key,
        realm: realm,
        options: ticket_options,
        offered_etypes: offered_etypes
      )
    end

    if tgt_result.decrypted_part.nil? && !tgt_result.preauth_required
      raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new(
        'Kerberos ticket does not require preauthentication. It is not possible to decrypt the encrypted message to request further TGS tickets. Try cracking the password via AS-REP Roasting techniques.',
      )
    end

    print_good("#{peer} - Received a valid TGT-Response")

    ccache = Rex::Proto::Kerberos::CredentialCache::Krb5Ccache.from_responses(tgt_result.as_rep, tgt_result.decrypted_part)
    options.fetch(:ticket_storage, @ticket_storage).store_ccache(ccache, host: rhost)

    credential = ccache.credentials.first
    session_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.value,
      value: credential.keyblock.data.value
    )

    { credential: credential, session_key: session_key, krb_enc_key: tgt_result.krb_enc_key }
  end

  private

  # Authenticate with a ticket-granting-service (TGS). This method will not contact the KDC and can not request a
  # delegation ticket.
  #
  # @param [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] credential
  # @param [Hash] _options
  def authenticate_via_krb5_ccache_credential_tgs(credential, _options = {})
    unless credential.is_a?(Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential)
      raise TypeError, 'credential must be a Krb5CcacheCredential instance'
    end

    tgs_auth_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.to_i,
      value: credential.keyblock.data.to_s
    )
    tgs_ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.to_s)

    case send_delegated_creds
    when Delegation::ALWAYS
      do_delegation = true
    when Delegation::WHEN_UNCONSTRAINED
      do_delegation = credential.ticket_flags.include?(Rex::Proto::Kerberos::Model::KdcOptionFlags::OK_AS_DELEGATE)
    end

    if do_delegation
      # the cache is currently backed by a looted ccache file (see #authenticate_via_kdc) and the MIT ccache file format
      # does not have a documented means to store a delegation ticket which is a Microsoft-specific extension
      wlog('Can not process delegation when using a cached credential at this time')
    end

    ## Service Authentication
    checksum = nil
    checksum = build_gss_ap_req_checksum_value(mutual_auth, dce_style, nil, nil, nil, nil, nil) if use_gss_checksum

    sequence_number = rand(1 << 32)
    service_ap_request = build_service_ap_request(
      session_key: tgs_auth_key,
      checksum: checksum,
      ticket: tgs_ticket,
      realm: self.realm.upcase,
      client_name: username,
      options: Rex::Proto::Kerberos::Model::KdcOptionFlags.from_flags(
        [
          Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDABLE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::CANONICALIZE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE_OK
        ]
      ),
      sequence_number: sequence_number
    )

    {
      service_ap_request: service_ap_request,
      session_key: tgs_auth_key,
      client_sequence_number: sequence_number
    }
  end

  # Authenticate with a ticket-granting-ticket (TGT). This method will contact the KDC and can request a delegation
  # ticket.
  #
  # @param [Rex::Proto::Kerberos::CredentialCache::Krb5CcacheCredential] credential
  # @param [Hash] options
  def authenticate_via_krb5_ccache_credential_tgt(credential, options = {})
    realm = self.realm.upcase
    sname = options.fetch(:sname)
    client_name = username

    now = Time.now.utc
    expiry_time = now + 1.day

    ticket = Rex::Proto::Kerberos::Model::Ticket.decode(credential.ticket.value)
    session_key = Rex::Proto::Kerberos::Model::EncryptionKey.new(
      type: credential.keyblock.enctype.value,
      value: credential.keyblock.data.value
    )

    etypes = Set.new([credential.keyblock.enctype.value])
    tgs_options = {
      pa_data: [],
      ticket_storage: ticket_storage
    }

    tgs_ticket, tgs_auth = request_service_ticket(
      session_key,
      ticket,
      realm,
      client_name,
      etypes,
      expiry_time,
      now,
      sname,
      tgs_options
    )

    case send_delegated_creds
    when Delegation::ALWAYS
      do_delegation = true
    when Delegation::NEVER
      do_delegation = false
    when Delegation::WHEN_UNCONSTRAINED
      do_delegation = tgs_auth.flags.include?(Rex::Proto::Kerberos::Model::KdcOptionFlags::OK_AS_DELEGATE)
    end

    if do_delegation
      delegated_tgs_ticket, delegated_tgs_auth = request_delegation_ticket(
        session_key,
        ticket,
        realm,
        client_name,
        ticket.enc_part.etype,
        expiry_time,
        now
      )
    end

    ## Service Authentication
    checksum = nil
    if use_gss_checksum
      checksum = build_gss_ap_req_checksum_value(
        mutual_auth,
        dce_style,
        delegated_tgs_ticket,
        delegated_tgs_auth,
        tgs_auth.key,
        realm,
        client_name
      )
    end

    sequence_number = rand(1 << 32)
    service_ap_request = build_service_ap_request(
      session_key: tgs_auth.key,
      checksum: checksum,
      ticket: tgs_ticket,
      realm: realm,
      client_name: client_name,
      options: Rex::Proto::Kerberos::Model::KdcOptionFlags.from_flags(
        [
          Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDABLE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::CANONICALIZE,
          Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE_OK
        ]
      ),
      sequence_number: sequence_number,
      subkey_type: ticket.enc_part.etype # The AP-REP will come back with this same type of subkey
    )

    {
      service_ap_request: service_ap_request,
      session_key: tgs_auth.key,
      client_sequence_number: sequence_number
    }
  end

  def build_gss_ap_req_checksum_value(mutual_auth, dce_style, ticket, decrypted_part, session_key, realm, client_name)
    # @see https://datatracker.ietf.org/doc/html/rfc4121#section-4.1.1
    # No channel binding
    channel_binding_info = "\x00" * 16
    channel_binding_info_len = [channel_binding_info.length].pack('V')

    flags = GSS_REPLAY_DETECT | GSS_SEQUENCE | GSS_CONFIDENTIAL | GSS_INTEGRITY
    flags |= GSS_MUTUAL if mutual_auth
    flags |= GSS_DCE_STYLE if dce_style
    flags |= GSS_DELEGATE if ticket

    flags = [flags].pack('V')

    checksum_val = channel_binding_info_len + channel_binding_info + flags

    if ticket
      krb_cred = Rex::Proto::Kerberos::Model::KrbCred.new
      krb_cred.pvno = 5
      krb_cred.msg_type = 0x16
      krb_cred.tickets = [ticket]
      ticket_info = Rex::Proto::Kerberos::Model::KrbCredInfo.new
      ticket_info.key = decrypted_part.key
      ticket_info.prealm = realm
      ticket_info.pname = build_client_name(client_name: client_name)
      ticket_info.flags = decrypted_part.flags
      ticket_info.auth_time = decrypted_part.auth_time
      ticket_info.start_time = decrypted_part.start_time
      ticket_info.end_time = decrypted_part.end_time
      ticket_info.renew_till = decrypted_part.renew_till
      ticket_info.sname = decrypted_part.sname
      ticket_info.srealm = decrypted_part.srealm

      enc_part = Rex::Proto::Kerberos::Model::EncKrbCredPart.new
      enc_part.ticket_info = [ticket_info]

      krb_cred.enc_part = enc_part.encrypt(session_key)

      dlg_opt = [1].pack('v')
      dlg_val = krb_cred.encode
      dlg_length = [dlg_val.length].pack('v')
      checksum_val += dlg_opt + dlg_length + dlg_val
    end

    checksum = Rex::Proto::Kerberos::Model::Checksum.new(
      type: Rex::Proto::Gss::KRB_AP_REQ_CHKSUM_TYPE,
      checksum: checksum_val)
  end

  # @param [Rex::Proto::Kerberos::Model::EncryptionKey] session_key
  # @param [Rex::Proto::Kerberos::Model::Ticket] tgt_ticket
  # @param [String] realm
  # @param [String] client_name
  # @param [Integer] tgt_etype
  # @param [Time] expiry_time
  # @param [Time] now
  def request_delegation_ticket(session_key, tgt_ticket, realm, client_name, tgt_etype, expiry_time, now)
    ticket_options = Rex::Proto::Kerberos::Model::KdcOptionFlags.from_flags(
      [
        Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDABLE,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDED,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE,
        Rex::Proto::Kerberos::Model::KdcOptionFlags::CANONICALIZE,
      ]
    )

    krbtgt_sname = Rex::Proto::Kerberos::Model::PrincipalName.new(
      name_type: Rex::Proto::Kerberos::Model::NameType::NT_SRV_INST,
      name_string: [
        "krbtgt",
        realm
      ]
    )
    delegated_tgs_res = send_request_tgs(
      req: build_tgs_request(
        {
          session_key: session_key,
          subkey: nil,
          checksum: nil,
          ticket: tgt_ticket,
          realm: realm,
          client_name: client_name,
          options: ticket_options,

          body: build_tgs_request_body(
            cname: nil,
            sname: krbtgt_sname,
            realm: realm,
            etype: [tgt_etype],
            options: ticket_options,

            # Specify nil to ensure the KDC uses the current time for the desired starttime of the requested ticket
            from: nil,
            till: expiry_time,
            rtime: nil,

            # certificate time
            ctime: now
          )
        }
      )
    )

    # Verify error codes
    if delegated_tgs_res.msg_type != Rex::Proto::Kerberos::Model::KRB_ERROR
      print_good("#{peer} - Received a valid delegation TGS-Response")
    end

    delegated_tgs_ticket = delegated_tgs_res.ticket
    delegated_tgs_auth = decrypt_kdc_tgs_rep_enc_part(
      delegated_tgs_res,
      session_key.value,
      msg_type: Rex::Proto::Kerberos::Crypto::KeyUsage::TGS_REP_ENCPART_SESSION_KEY
    )

    [delegated_tgs_ticket, delegated_tgs_auth]
  end

  # @param [Rex::Proto::Kerberos::Model::EncryptionKey] session_key
  # @param [Rex::Proto::Kerberos::Model::Ticket] tgt_ticket
  # @param [String] realm
  # @param [String] client_name
  # @param [Integer] etypes
  # @param [Time] expiry_time
  # @param [Time] now
  # @param [Rex::Proto::Kerberos::Model::PrincipalName] sname
  # @param [Hash] options
  # @option options [Array<Rex::Proto::Kerberos::Model::KdcOptionFlags>] :additional_flags
  #   Any additional flags to add to the TGS request option flags. The
  #   FORWARDABLE, RENEWABLE and CANONICALIZE flags are set by default.
  # @option options [Array<Rex::Proto::Kerberos::Model::Ticket>] :additional_tickets
  #   Any additional tickets to add to the request
  # @option options [Array<Rex::Proto::Kerberos::Model::PreAuthDataEntry>] :pa_data
  #   Any additional pre-auth data entries to add to the request
  # @option options [Msf::Exploit::Remote::Kerberos::Ticket::Storage::Base] :ticket_storage Override the @ticket_storage attribute
  # @option options [String] :credential_cache_username The name of user
  #   corresponding to the requested TGS ticket. This name will be used in
  #   the info field when the tickets is stored in the database. This can be used
  #   to override the original username in case of impersonation.
  # @raise [Rex::Proto::Kerberos::Model::Error::KerberosError]
  # @return [Array] The TGS ticket and the decrypted TGS credentials as a MIT Cache Credential
  def request_service_ticket(session_key, tgt_ticket, realm, client_name, etypes, expiry_time, now, sname, options = {})
    etypes = etypes.is_a?(::Enumerable) ? etypes : [etypes]

    flags = Set.new([
      Rex::Proto::Kerberos::Model::KdcOptionFlags::FORWARDABLE,
      Rex::Proto::Kerberos::Model::KdcOptionFlags::RENEWABLE,
      Rex::Proto::Kerberos::Model::KdcOptionFlags::CANONICALIZE,
    ])
    if options[:additional_flags].present?
      additional_flags = options[:additional_flags]
      additional_flags = [additional_flags] unless additional_flags.is_a?(::Enumerable)
      flags.merge(additional_flags)
    end
    ticket_options = Rex::Proto::Kerberos::Model::KdcOptionFlags.from_flags(flags)

    tgs_body_options = {
      cname: nil,
      sname: sname,
      realm: realm,
      etype: etypes,
      options: ticket_options,

      # Specify nil to ensure the KDC uses the current time for the desired starttime of the requested ticket
      from: nil,
      till: expiry_time,
      rtime: nil,

      # certificate time
      ctime: now
    }
    if options[:additional_tickets].present?
      additional_tickets = options[:additional_tickets]
      additional_tickets = [additional_tickets] unless additional_tickets.is_a?(::Enumerable)
      tgs_body_options[:additional_tickets] = additional_tickets
    end

    tgs_options = {
      session_key: session_key,
      subkey: nil,
      checksum: nil,
      ticket: tgt_ticket,
      realm: realm,
      client_name: client_name,
      options: ticket_options,

      body: build_tgs_request_body(**tgs_body_options)
    }
    if options[:pa_data].present?
      pa_data = [options[:pa_data]] unless options[:pa_data].is_a?(::Enumerable)
      tgs_options[:pa_data] = pa_data
    end

    tgs_res = send_request_tgs(
        req: build_tgs_request(tgs_options)
    )

    # Verify error codes
    if tgs_res.msg_type == Rex::Proto::Kerberos::Model::KRB_ERROR
      raise ::Rex::Proto::Kerberos::Model::Error::KerberosError.new(res: tgs_res)
    end

    print_good("#{peer} - Received a valid TGS-Response")

    ccache = extract_kerb_creds(
      tgs_res,
      session_key.value,
      msg_type: Rex::Proto::Kerberos::Crypto::KeyUsage::TGS_REP_ENCPART_SESSION_KEY
    )
    if options[:credential_cache_username].present?
      client = options[:credential_cache_username]
    else
      client = self.username
    end
    options.fetch(:ticket_storage, @ticket_storage).store_ccache(
      ccache,
      host: rhost,
      client: client,
      server: sname
    )

    tgs_ticket = tgs_res.ticket
    tgs_auth = decrypt_kdc_tgs_rep_enc_part(
      tgs_res,
      session_key.value,
      msg_type: Rex::Proto::Kerberos::Crypto::KeyUsage::TGS_REP_ENCPART_SESSION_KEY
    )

    [tgs_ticket, tgs_auth]
  end

  # Search the database for a credential object that can be used for authentication.
  #
  # @param [Hash] options
  # @return [Rex::Proto::Kerberos::CredentialCache::Krb5CacheCredential] the credential object for authentication
  # @return [nil] returned if the database is not connected or no usable credentials are found
  def get_cached_credential(options = {})
    driver = options.fetch(:ticket_storage, @ticket_storage)
    driver.load_credential(
      host: options.fetch(:host) { rhost },
      client: options.fetch(:username) { self.username },
      server: options.fetch(:sname, nil),
      realm: options.fetch(:realm) { self.realm }
    )
  end

  # Load a credential object from a file for authentication. Credentials in the file will be filtered by multiple
  # attributes including their timestamps to ensure that the returned credential appears usable.
  #
  # @param [String] file_path The file path to load a credential object from
  # @return [Rex::Proto::Kerberos::CredentialCache::Krb5CacheCredential] the credential object for authentication
  def load_credential_from_file(file_path, options = {})
    unless File.readable?(file_path.to_s)
      wlog("Failed to load ticket file '#{file_path}' (file not readable)")
      return nil
    end

    begin
      cache = Rex::Proto::Kerberos::CredentialCache::Krb5Ccache.read(File.binread(file_path))
    rescue StandardError => e
      elog("Failed to load ticket file '#{file_path}' (parsing failed)", error: e)
      return nil
    end

    sname = options.fetch(:sname) { build_spn&.to_s }
    sname_hostname = options.fetch(:sname_hostname, nil)
    now = Time.now.utc

    cache.credentials.to_ary.each.with_index(1) do |credential, index|
      tkt_start = credential.starttime == Time.at(0).utc ? credential.authtime : credential.starttime
      tkt_end = credential.endtime

      unless tkt_start < now
        wlog("Filtered credential #{file_path} ##{index} reason: Ticket start time is before now (start: #{tkt_start})")
        next
      end

      unless now < tkt_end
        wlog("Filtered credential #{file_path} ##{index} reason: Ticket is expired (expiration: #{tkt_end})")
        next
      end

      unless !@realm || @realm.casecmp?(credential.server.realm.to_s)
        wlog("Filtered credential #{file_path} ##{index} reason: Realm (#{@realm}) does not match (realm: #{credential.server.realm})")
        next
      end

      unless !sname || sname.to_s.casecmp?(credential.server.components.snapshot.join('/'))
        wlog("Filtered credential #{file_path} ##{index} reason: SPN (#{sname}) does not match (spn: #{credential.server.components.snapshot.join('/')})")
        next
      end

      unless !sname_hostname ||
        sname_hostname.to_s.downcase == credential.server.components[1] ||
        sname_hostname.to_s.downcase.ends_with?('.' + credential.server.components[1])
        wlog("Filtered credential #{file_path} ##{index} reason: SPN (#{sname_hostname}) hostname does not match (spn: #{credential.server.components.snapshot.join('/')})")
        next
      end

      unless !@username || @username.casecmp?(credential.client.components.last.to_s)
        wlog("Filtered credential #{file_path} ##{index} reason: Username (#{@username}) does not match (username: #{credential.client.components.last})")
        next
      end

      return credential
    end

    nil
  end
end