Skip to content

RawRecord PR follow-up - address DGollings' comment #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ situations.
- Golang 1.14

## Recent Major Feature Additions/Changes
- Added `Transform.CurrentRawRecord()` for caller of omniparser to access the raw ingested record.
- Added `Transform.RawRecord()` for caller of omniparser to access the raw ingested record.
- Deprecated `custom_parse` in favor of `custom_func` (`custom_parse` is still usable for
back-compatibility, it is just removed from all public docs and samples).
- Added `NonValidatingReader` EDI segment reader.
Expand Down
5 changes: 4 additions & 1 deletion doc/gettingstarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,10 @@ for {
break
}
if err != nil { ... }
// output contains a []byte of the ingested and transformed record.
// output contains a []byte of the ingested and transformed record.

// Also transform.RawRecord() gives you access to the raw record.
fmt.Println(transform.RawRecord().Checksum())
}
```

Expand Down
8 changes: 3 additions & 5 deletions doc/programmability.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ for {
}
if err != nil { ... }
// output contains a []byte of the ingested and transformed record.

raw, err := transform.CurrentRawRecord()
if err != nil { ... }
rawRecord := raw.(*omniv21.RawRecord) // assuming the schema is of `omni.2.1` version.
fmt.Println(rawRecord.UUIDv3()) // rawRecord.UUIDv3() returns a stable hash of the current raw record.

// Also transform.RawRecord() gives you access to the raw record.
fmt.Println(transform.RawRecord().Checksum())
}
```
Note this out-of-box omniparser setup contains only the `omni.2.1` schema handler, meaning only schemas
Expand Down
30 changes: 16 additions & 14 deletions extensions/omniv21/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
"github.com/jf-tech/omniparser/extensions/omniv21/transform"
"github.com/jf-tech/omniparser/idr"
"github.com/jf-tech/omniparser/schemahandler"
"github.com/jf-tech/omniparser/transformctx"
)

// RawRecord contains the raw data ingested in from the input stream in the form of an IDR tree.
// Note callers outside this package should absolutely make **NO** modifications to the content of
// RawRecord. Treat it like read-only.
type RawRecord struct {
Node *idr.Node
type rawRecord struct {
node *idr.Node
}

// UUIDv3 returns a stable MD5(v3) hash of the RawRecord.
func (rr *RawRecord) UUIDv3() string {
hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.Node))
func (rr *rawRecord) Raw() interface{} {
return rr.node
}

// Checksum returns a stable MD5(v3) hash of the rawRecord.
func (rr *rawRecord) Checksum() string {
hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.node))
return hash
}

Expand All @@ -31,19 +33,19 @@ type ingester struct {
customParseFuncs transform.CustomParseFuncs // Deprecated.
ctx *transformctx.Ctx
reader fileformat.FormatReader
rawRecord RawRecord
rawRecord rawRecord
}

// Read ingests a raw record from the input stream, transforms it according the given schema and return
// the raw record, transformed JSON bytes.
func (g *ingester) Read() (interface{}, []byte, error) {
if g.rawRecord.Node != nil {
g.reader.Release(g.rawRecord.Node)
g.rawRecord.Node = nil
func (g *ingester) Read() (schemahandler.RawRecord, []byte, error) {
if g.rawRecord.node != nil {
g.reader.Release(g.rawRecord.node)
g.rawRecord.node = nil
}
n, err := g.reader.Read()
if n != nil {
g.rawRecord.Node = n
g.rawRecord.node = n
}
if err != nil {
// Read() supposed to have already done CtxAwareErr error wrapping. So directly return.
Expand Down
3 changes: 2 additions & 1 deletion extensions/omniv21/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func TestIngester_Read_Success(t *testing.T) {
}
raw, b, err := g.Read()
assert.NoError(t, err)
assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.(*RawRecord).UUIDv3())
assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.Checksum())
assert.Equal(t, "{}", idr.JSONify2(raw.Raw().(*idr.Node)))
assert.Equal(t, "123", string(b))
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
raw, b, err = g.Read()
Expand Down
8 changes: 3 additions & 5 deletions extensions/omniv21/samples/testCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/extensions/omniv21"
"github.com/jf-tech/omniparser/idr"
"github.com/jf-tech/omniparser/transformctx"
)
Expand Down Expand Up @@ -49,12 +48,11 @@ func SampleTestCommon(t *testing.T, schemaFile, inputFile string) string {
err = json.Unmarshal(recordBytes, &transformed)
assert.NoError(t, err)

raw, err := transform.CurrentRawRecord()
raw, err := transform.RawRecord()
assert.NoError(t, err)
rawRecord := raw.(*omniv21.RawRecord)
records = append(records, record{
RawRecord: idr.JSONify2(rawRecord.Node),
RawRecordHash: rawRecord.UUIDv3(),
RawRecord: idr.JSONify2(raw.Raw().(*idr.Node)),
RawRecordHash: raw.Checksum(),
TransformedRecord: transformed,
})
}
Expand Down
10 changes: 9 additions & 1 deletion schemahandler/schemaHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type SchemaHandler interface {
NewIngester(ctx *transformctx.Ctx, input io.Reader) (Ingester, error)
}

// RawRecord represents a raw record ingested from the input.
type RawRecord interface {
// Raw returns the actual raw record that is version specific to each of the schema handler.
Raw() interface{}
// Checksum returns a UUIDv3 (MD5) stable hash of the raw record.
Checksum() string
}

// Ingester is an interface of ingestion and transformation for a given input stream.
type Ingester interface {
// Read is called repeatedly during the processing of an input stream. Each call it should return
Expand All @@ -46,7 +54,7 @@ type Ingester interface {
// one record at a time, OR, processes and returns one record for each call. However, the overall
// design principle of omniparser is to have streaming processing capability so memory won't be a
// constraint when dealing with large input file. All built-in ingesters are implemented this way.
Read() (interface{}, []byte, error)
Read() (RawRecord, []byte, error)

// IsContinuableError is called to determine if the error returned by Read is fatal or not. After Read
// is called, the result record or error will be returned to caller. After caller consumes record or
Expand Down
16 changes: 7 additions & 9 deletions transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ type Transform interface {
// return the same error.
// Note if returned error isn't nil, then returned []byte will be nil.
Read() ([]byte, error)
// CurrentRawRecord returns the current raw record ingested from the input stream. If
// the last Read call failed, or Read hasn't been called yet, it will return an error.
// Each schema handler and extension has its own definition of what a raw record is
// so please check their corresponding doc.
CurrentRawRecord() (interface{}, error)
// RawRecord returns the current raw record ingested from the input stream. If the last
// Read call failed, or Read hasn't been called yet, it will return an error.
RawRecord() (schemahandler.RawRecord, error)
}

type transform struct {
ingester schemahandler.Ingester
lastRawRecord interface{}
lastRawRecord schemahandler.RawRecord
lastErr error
}

Expand Down Expand Up @@ -70,9 +68,9 @@ func (o *transform) Read() ([]byte, error) {
return transformed, err
}

// CurrentRawRecord returns the current raw record ingested from the input stream. If
// the last Read call failed, or Read hasn't been called yet, it will return an error.
func (o *transform) CurrentRawRecord() (interface{}, error) {
// RawRecord returns the current raw record ingested from the input stream. If the last
// Read call failed, or Read hasn't been called yet, it will return an error.
func (o *transform) RawRecord() (schemahandler.RawRecord, error) {
if o.lastErr != nil {
return nil, o.lastErr
}
Expand Down
56 changes: 37 additions & 19 deletions transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,41 @@ import (
"github.com/stretchr/testify/assert"

"github.com/jf-tech/omniparser/errs"
"github.com/jf-tech/omniparser/schemahandler"
)

type testReadCall struct {
record []byte
result []byte
err error
}

func (trc testReadCall) Checksum() string {
if trc.err != nil {
panic("Checksum() called when err != nil")
}
return fmt.Sprintf("checksum of raw record of '%s'", string(trc.result))
}

func (trc testReadCall) Raw() interface{} {
if trc.err != nil {
panic("Raw() called when err != nil")
}
return fmt.Sprintf("raw record of '%s'", string(trc.result))
}

type testIngester struct {
readCalled int
readCalls []testReadCall
continuableErrs map[error]bool
}

func (g *testIngester) Read() (interface{}, []byte, error) {
func (g *testIngester) Read() (schemahandler.RawRecord, []byte, error) {
if g.readCalled >= len(g.readCalls) {
panic(fmt.Sprintf("Read() called %d time(s), but not enough mock entries setup", g.readCalled))
}
r := g.readCalls[g.readCalled]
g.readCalled++
return fmt.Sprintf("raw record %d", g.readCalled-1), r.record, r.err
return r, r.result, r.err
}

func (g *testIngester) IsContinuableError(err error) bool {
Expand All @@ -45,9 +60,9 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
tfm := &transform{
ingester: &testIngester{
readCalls: []testReadCall{
{record: []byte("1st good read")},
{result: []byte("1st good read")},
{err: continuableErr1},
{record: []byte("2nd good read")},
{result: []byte("2nd good read")},
{err: io.EOF},
},
continuableErrs: map[error]bool{continuableErr1: true},
Expand All @@ -56,32 +71,34 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
record, err := tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "1st good read", string(record))
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 0", raw.(string))
assert.Equal(t, "raw record of '1st good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.True(t, errs.IsErrTransformFailed(err))
assert.Equal(t, continuableErr1.Error(), err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.True(t, errs.IsErrTransformFailed(err))
assert.Nil(t, raw)

record, err = tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "2nd good read", string(record))
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 2", raw.(string))
assert.Equal(t, "raw record of '2nd good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '2nd good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, raw)
Expand All @@ -91,7 +108,7 @@ func TestTransform_Read_EndWithEOF(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, io.EOF, err)
assert.Nil(t, raw)
Expand All @@ -101,24 +118,25 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) {
tfm := &transform{
ingester: &testIngester{
readCalls: []testReadCall{
{record: []byte("1st good read")},
{result: []byte("1st good read")},
{err: errors.New("fatal error")},
},
},
}
record, err := tfm.Read()
assert.NoError(t, err)
assert.Equal(t, "1st good read", string(record))
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.NoError(t, err)
assert.Equal(t, "raw record 0", raw.(string))
assert.Equal(t, "raw record of '1st good read'", raw.Raw())
assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum())

record, err = tfm.Read()
assert.Error(t, err)
assert.False(t, errs.IsErrTransformFailed(err))
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.False(t, errs.IsErrTransformFailed(err))
assert.Equal(t, "fatal error", err.Error())
Expand All @@ -129,15 +147,15 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, record)
raw, err = tfm.CurrentRawRecord()
raw, err = tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, "fatal error", err.Error())
assert.Nil(t, raw)
}

func TestTransform_CurrentRawRecord_CalledBeforeRead(t *testing.T) {
func TestTransform_RawRecord_CalledBeforeRead(t *testing.T) {
tfm := &transform{ingester: &testIngester{readCalls: []testReadCall{}}}
raw, err := tfm.CurrentRawRecord()
raw, err := tfm.RawRecord()
assert.Error(t, err)
assert.Equal(t, "must call Read first", err.Error())
assert.Nil(t, raw)
Expand Down