Skip to content

Commit

Permalink
Move io-based components to io package
Browse files Browse the repository at this point in the history
Fix #125

Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Jun 25, 2015
1 parent a0dcd70 commit 89b67af
Show file tree
Hide file tree
Showing 10 changed files with 472 additions and 60 deletions.
5 changes: 3 additions & 2 deletions cmd/origins/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/chop-dbhi/origins"
"github.com/chop-dbhi/origins/chrono"
"github.com/chop-dbhi/origins/io"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -95,10 +96,10 @@ https://github.com/chop-dbhi/origins-generators`,

// Data written to the buffer are converted into facts, normalized, and
// copied to the CSV writer.
iterator = origins.NewCSVReader(stdout)
iterator = io.NewCSVReader(stdout)

// Facts are written to stdout.
writer = origins.NewCSVWriter(os.Stdout)
writer = io.NewCSVWriter(os.Stdout)

// Modify the fact before writing using the passed arguments.
// TODO: there is an overlap with the transactor.
Expand Down
3 changes: 2 additions & 1 deletion cmd/origins/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/chop-dbhi/origins"
"github.com/chop-dbhi/origins/chrono"
. "github.com/chop-dbhi/origins/io"
"github.com/chop-dbhi/origins/storage"
"github.com/chop-dbhi/origins/view"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -115,7 +116,7 @@ var logCmd = &cobra.Command{

switch format {
case "csv":
fw = origins.NewCSVWriter(w)
fw = NewCSVWriter(w)
default:
logrus.Fatal("unknown format", format)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/origins/transact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"

"github.com/chop-dbhi/origins"
. "github.com/chop-dbhi/origins/io"
"github.com/chop-dbhi/origins/transactor"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -22,20 +23,20 @@ func transactFile(tx *transactor.Transaction, r io.Reader, format, compression s
if compression != "" {
logrus.Debugf("transact: applying %s decompression", compression)

if r, err = origins.Decompressor(r, compression); err != nil {
if r, err = Decompressor(r, compression); err != nil {
logrus.Fatalf("transact: %s", err)
}
}

// Wrap in a reader to handle carriage returns before passing
// it into the format reader.
r = origins.NewUniversalReader(r)
r = NewUniversalReader(r)

var pub origins.Publisher

switch format {
case "csv":
pub = origins.NewCSVReader(r)
pub = NewCSVReader(r)
default:
logrus.Fatal("transact: unsupported file format", format)
}
Expand Down Expand Up @@ -78,11 +79,11 @@ var transactCmd = &cobra.Command{
compression = viper.GetString("transact_compression")

if format == "" {
format = origins.DetectFileFormat(fn)
format = DetectFileFormat(fn)
}

if compression == "" {
compression = origins.DetectFileCompression(fn)
compression = DetectFileCompression(fn)
}

file, err := os.Open(fn)
Expand Down
3 changes: 2 additions & 1 deletion http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/chop-dbhi/origins"
"github.com/chop-dbhi/origins/io"
"github.com/chop-dbhi/origins/storage"
"github.com/chop-dbhi/origins/view"
"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -99,7 +100,7 @@ func httpLog(w http.ResponseWriter, r *http.Request, p httprouter.Params) {

switch format {
case "text", "csv":
fw = origins.NewCSVWriter(w)
fw = io.NewCSVWriter(w)

if _, err := origins.Copy(iter, fw); err != nil {
w.WriteHeader(http.StatusInternalServerError)
Expand Down
65 changes: 17 additions & 48 deletions csv.go → io/csv.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,4 @@
// This module implements a CSV reader for reading facts from CSV-like formatted data.
//
// The following fields are supported:
//
// - Domain - The domain the fact will be asserted in. This is only required for bulk-formatted data, otherwise it is ignored.
// - Operation - The operation to apply to this fact. Optional, defaults to "assert".
// - Valid Time - The time this fact should be considered valid. This is separate from the "database time" which denotes when the fact was physically added. Optional, defaults to "now".
// - Entity Domain - The domain of the entity. Optional, defaults to the fact domain.
// - Entity - The local name of the entity.
// - Attribute Domain - The domain of the attribute. Optional, defaults to the fact domain.
// - Attribute - The local name of the attribute.
// - Value Domain - The domain of the value. Optional, defaults to the fact domain.
// - Value - The local name of the value.
//
// As noted, most of these fields are optional so they do not need to be included in the file. To do this, a header must be present using the above names to denote the field a column corresponds to. For example, this is a valid file:
//
// entity,attribute,value
// bill,likes,soccer
// bill,likes,fútbol
// soccer,is,fútbol
//
// Applying this to the domain "sports", this will expand out to:
//
// domain,operation,time,entity domain,entity,attribute domain,attribute,value domain,value
// sports,assert,now,sports,bill,sports,likes,sports,soccer
// sports,assert,now,sports,bill,sports,likes,sports,fútbol
// sports,assert,now,sports,soccer,sports,is,sports,fútbol
//
// The time "now" will be transaction time when it is committed to the database.
//
// At a minimum, the entity, attribute, and value fields must be present.

package origins
package io

import (
"encoding/csv"
Expand All @@ -39,6 +7,7 @@ import (
"io"
"strings"

"github.com/chop-dbhi/origins"
"github.com/chop-dbhi/origins/chrono"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -114,19 +83,19 @@ func isEmptyRecord(r []string) bool {
type CSVReader struct {
reader *csv.Reader
header map[string]int
fact *Fact
fact *origins.Fact
err error
}

func (r *CSVReader) parse(record []string) (*Fact, error) {
func (r *CSVReader) parse(record []string) (*origins.Fact, error) {
// Map row values to fact fields.
var (
ok bool
err error
idx, rlen int
val string
dom string
f = Fact{}
f = origins.Fact{}
)

rlen = len(record)
Expand All @@ -138,7 +107,7 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {

// Operation
if idx, ok = r.header["operation"]; ok && idx < rlen {
op, err := ParseOperation(record[idx])
op, err := origins.ParseOperation(record[idx])

if err != nil {
logrus.Error(err)
Expand All @@ -164,7 +133,7 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {
}
}

var ident *Ident
var ident *origins.Ident

// Entity
idx, _ = r.header["entity"]
Expand All @@ -176,7 +145,7 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {
dom = ""
}

if ident, err = NewIdent(dom, val); err != nil {
if ident, err = origins.NewIdent(dom, val); err != nil {
return nil, err
}

Expand All @@ -192,7 +161,7 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {
dom = ""
}

if ident, err = NewIdent(dom, val); err != nil {
if ident, err = origins.NewIdent(dom, val); err != nil {
return nil, err
}

Expand All @@ -208,7 +177,7 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {
dom = ""
}

if ident, err = NewIdent(dom, val); err != nil {
if ident, err = origins.NewIdent(dom, val); err != nil {
return nil, err
}

Expand All @@ -218,11 +187,11 @@ func (r *CSVReader) parse(record []string) (*Fact, error) {
}

// scan reads the CSV file for the next fact.
func (r *CSVReader) next() (*Fact, error) {
func (r *CSVReader) next() (*origins.Fact, error) {
var (
err error
record []string
fact *Fact
fact *origins.Fact
)

// Logic defined in a loop to skip comments.
Expand Down Expand Up @@ -268,7 +237,7 @@ func (r *CSVReader) next() (*Fact, error) {
}

// Next returns the next fact in the stream.
func (r *CSVReader) Next() *Fact {
func (r *CSVReader) Next() *origins.Fact {
if r.err != nil {
return nil
}
Expand Down Expand Up @@ -297,8 +266,8 @@ func (r *CSVReader) Err() error {

// Subscribe satisfies the Publisher interface. It returns a channel of facts that
// can be consumed by downstream consumers.
func (r *CSVReader) Subscribe(done <-chan struct{}) (<-chan *Fact, <-chan error) {
ch := make(chan *Fact)
func (r *CSVReader) Subscribe(done <-chan struct{}) (<-chan *origins.Fact, <-chan error) {
ch := make(chan *origins.Fact)
errch := make(chan error)

go func() {
Expand All @@ -308,7 +277,7 @@ func (r *CSVReader) Subscribe(done <-chan struct{}) (<-chan *Fact, <-chan error)
}()

var (
f *Fact
f *origins.Fact
err error
)

Expand Down Expand Up @@ -358,7 +327,7 @@ type CSVWriter struct {
started bool
}

func (w *CSVWriter) Write(f *Fact) error {
func (w *CSVWriter) Write(f *origins.Fact) error {
if !w.started {
w.writer.Write(csvHeader)
w.started = true
Expand Down
2 changes: 1 addition & 1 deletion csv_test.go → io/csv_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package origins
package io

import (
"bytes"
Expand Down
2 changes: 1 addition & 1 deletion file.go → io/file.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package origins
package io

import (
"compress/bzip2"
Expand Down
2 changes: 1 addition & 1 deletion file_test.go → io/file_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package origins
package io

import (
"bytes"
Expand Down
Loading

0 comments on commit 89b67af

Please sign in to comment.