def consume_each_batch_inline
      @consumer.each_batch(**@message_processing_opts) do |batch|
        batch_processor = Phobos::Actions::ProcessBatchInline.new(
          listener: self,
          batch: batch,