rdfio/rdf2smw

View on GitHub
components/filereader.go

Summary

Maintainability
A
0 mins
Test Coverage
package components

import (
    "bufio"
    "log"

    "github.com/flowbase/flowbase"
    "github.com/spf13/afero"
)

// --------------------------------------------------------------------------------
// FileReader
// --------------------------------------------------------------------------------

// FileReader is a process that reads files, based on file names it receives on the
// FileReader.InFileName port / channel, and writes out the output line by line
// as strings on the FileReader.OutLine port / channel.
//
// Files are opened through the afero file system stored in the struct, which
// is supplied at construction time.
type FileReader struct {
    // InFileName is the in-port on which names of files to read are received.
    InFileName chan string
    // OutLine is the out-port on which the lines of each file are sent, one
    // string per line.
    OutLine    chan string
    // fs is the file system abstraction used to open the files.
    fs         afero.Fs
}

// NewOsFileReader creates a FileReader that is backed by the regular operating
// system file system, as provided by afero.NewOsFs.
func NewOsFileReader() *FileReader {
    osFileSystem := afero.NewOsFs()
    return NewFileReader(osFileSystem)
}

// NewFileReader creates a FileReader that opens its files through the given
// afero file system. Both the InFileName and OutLine channels are created
// as buffered channels with a capacity of BUFSIZE.
func NewFileReader(fileSystem afero.Fs) *FileReader {
    reader := new(FileReader)
    reader.InFileName = make(chan string, BUFSIZE)
    reader.OutLine = make(chan string, BUFSIZE)
    reader.fs = fileSystem
    return reader
}

// Run runs the FileReader process. It does not spawn a separate go-routine, so
// you have to prepend the go keyword when calling it, in order to have it run
// in a separate go-routine.
//
// Run reads file names from InFileName until that channel is closed, sends
// every line of each file on OutLine, and closes OutLine when it returns.
// If a file cannot be opened, or reading it fails, the program is terminated
// via log.Fatal.
func (p *FileReader) Run() {
    defer close(p.OutLine)

    flowbase.Debug.Println("Starting loop")
    for fileName := range p.InFileName {
        flowbase.Debug.Printf("Starting processing file %s\n", fileName)
        p.sendFileLines(fileName)
    }
}

// sendFileLines opens the named file and sends its content line by line on
// OutLine. It is a separate method so that the deferred Close runs as soon as
// each file is finished, rather than piling up until Run returns.
func (p *FileReader) sendFileLines(fileName string) {
    fh, err := p.fs.Open(fileName)
    if err != nil {
        log.Fatal(err)
    }
    defer fh.Close()

    sc := bufio.NewScanner(fh)
    for sc.Scan() {
        p.OutLine <- sc.Text()
    }
    // Scan returns false on both EOF and error, so the error has to be checked
    // after the loop; inside the loop body it is always nil.
    if err := sc.Err(); err != nil {
        log.Fatal(err)
    }
}