RobertDober/lab42_streams

View on GitHub
lib/lab42/stream/enumerable.rb

Summary

Maintainability
B
6 hrs
Test Coverage
require_relative 'core/enumerable'
require_relative 'behavior'

module Lab42
  class Stream
    IllegalState = Class.new RuntimeError

    module Enumerable

      def drop_until *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __drop_while__ bhv.not
      end

      def drop_while *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __drop_while__ bhv
      end


      def __drop_while__ bhv
        s = self
        loop do
          return s unless bhv.(s.head)
          s = s.tail
        end
        empty_stream
      end

      def each
        t = self
        loop do
          return if t.empty?
          yield t.head
          t = t.tail
        end
      end

      def each_without_loops
        visited = {}
        t = self
        loop do
          h = t.head
          yield h
          visited[ t.object_id ] = true
          t = t.tail
          return if visited[t.object_id]
        end
      end

      def force_all cache={}
        x = []
        each_without_loops do | ele |
          if self.class === ele
            if ! cache[ele.object_id]
              cache[ele.object_id] = true
              x << ele.force_all( cache )
            end
          else
            x << ele
          end
        end
        x
      end

      def lazy_take n=1
        raise ArgumentError, "need a non negative Integer" if !(Integer === n) || n < 0
        __lazy_take__ n
      end

      def __lazy_take__ n
        return empty_stream if n.zero?
        cons_stream( head ){ tail.__lazy_take__ n.pred }
      end

      def lazy_take_until *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __lazy_take_until__ bhv
      end

      def __lazy_take_until__ bhv
        return empty_stream if bhv.(head)
        cons_stream( head ){
          tail.__lazy_take_until__ bhv
        }
      end

      def lazy_take_while *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __lazy_take_while__ bhv
      end

      def __lazy_take_while__ bhv
        return empty_stream unless bhv.(head)
        cons_stream( head ){
          tail.__lazy_take_while__ bhv
        }
      end

      def reduce red=nil, &reducer
        red = Behavior.make( red, &reducer)
        tail.__inject__ head, red
      end

      def inject agg, *red, &reducer
        __inject__ agg, Behavior.make( *red, &reducer )
      end

      def filter *args, &blk
        __filter__ self, Behavior.make( *args, &blk )
      end

      def reject *args, &blk
        __filter__ self, Behavior.make( *args, &blk ).not
      end

      def flatmap *args, &blk
        __flatmap__ Behavior.make( *args, &blk )
      end

      def __flatmap__ a_proc
        hh = a_proc.( head )
        raise ArgumentError, "flatmap can only map on streams, use flatmap_with_each to map over streams and enumerables" unless
          Lab42::Stream === hh
        if hh.empty?
          tail.__flatmap__ a_proc
        else
          cons_stream( hh.head ){ hh.tail + tail.__flatmap__( a_proc ) }
        end
      end

      def flatmap_with_each *args, &blk
        __flatmap_with_each__ Behavior.make( *args, &blk )
      end

      def __flatmap_with_each__ a_proc, rest_of_enum = []
        # Process expanded values
        return cons_stream( rest_of_enum.first ){ __flatmap_with_each__ a_proc, rest_of_enum.drop( 1 ) } unless
        rest_of_enum.empty?

        # Map a scalar value
        hh = a_proc.( head )
        return cons_stream( hh ){ tail.__flatmap_with_each__ a_proc } unless
        hh.respond_to? :each

        # Start a new expansion...
        # ... consider an empty expansion
        return tail.__flatmap__ a_proc if hh.empty?
        # ... expand values
        cons_stream( hh.first ){ tail.__flatmap_with_each__( a_proc, hh.drop( 1 ) ) }
      end


      def scan initial, *args, &blk
        cons_stream initial do
          __scan__ initial, Behavior.make( *args, &blk )
        end.tap{ |r|
        }
      end

      def scan1 *args, &blk
        tail.scan( head, *args, &blk )
      end

      def __scan__ initial, beh
        h = beh.(initial, head)
        cons_stream( h ){ tail.__scan__ h, beh }
      end

      def take_until *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __take_while__ bhv.not
      end

      def take_while *bhv, &blk
        bhv = Behavior.make( *bhv, &blk )
        __take_while__ bhv
      end

      def __take_while__ bhv
        x = []
        each do | ele |
          return x unless bhv.( ele )
          x << ele
        end
        x
      end

      def to_a
        take_while Behavior.const( true ) 
      end

      alias_method :entries, :to_a

      def make_cyclic
        cons_stream( head ){
          tail.append( make_cyclic )
        }
      end

      def map *args, &blk
        raise ArgumentError, "use either a block or arguments" if args.empty? && !blk || !args.empty? && blk
        __map__ Behavior.make( *args, &blk )
      end

      def __map__ prc
        cons_stream( prc.(head) ){ tail.__map__ prc }
      end


      def reduce_while cond, red=nil, &reducer
        red ||= reducer
        tail.__inject_while__ head, cond, red
      end

      def take n=1
        raise ArgumentError, "need a non negative Integer" if !(Integer === n) || n < 0
        x = []
        each do | ele |
          return x if n.zero?
          n -= 1
          x << ele
        end
        x
      end

      def zip *other_streamables
        streams = other_streamables.map{ |s|
          self.class === s ? s : s.to_stream
        }
        __zip__ streams
      end

      def zip_as_ary *other_streamables
        zip( *other_streamables )
        .map( &:entries )
      end

      def __zip__ streams
        cons_stream( [head] + streams.map(&:head) ){
          tail.__zip__ streams.map(&:tail)
        }
      end

      def __filter__ stream, a_proc
        loop do
          return stream if stream.empty?
          return cons_stream( stream.head ){ __filter__ stream.tail, a_proc } if a_proc.( stream.head )
          stream = stream.tail
        end
      end

      def __inject__ agg, a_proc
        new_agg = a_proc.(agg, head)
        tail.__inject__ new_agg, a_proc
      end

      def __inject_while__ ival, cond, red
        raise ConstraintError unless cond.(ival)
        s = self
        loop do
          new_val = red.(ival, s.head)
          return ival unless cond.(new_val)
          ival = new_val
          s = s.tail
          return ival if s.empty?
        end
      end
    end # module Enumerable
    include Enumerable
  end # class Stream
end # module Lab42