def consume_each_batch
      @consumer.each_batch(**@message_processing_opts) do |batch|
        batch_processor = Phobos::Actions::ProcessBatch.new(
          listener: self,
          batch: batch,