Commit 60824a34 authored by Jonathan Mace's avatar Jonathan Mace

Updates to baggage reader

parent 59c8d0a0
......@@ -7,6 +7,13 @@ import (
"fmt"
)
// Provides the base declaration of BaggageContext and Atoms. BaggageContext is just a slice of atoms.
// Also provides implementation of the five fundamental propagation primitives:
// * Branch -- duplicate a context because execution is branching
// * Merge -- merge two contexts from merging execution branches
// * Serialize / Deserialize -- convert a context to and from its byte representation
// * Trim -- impose size restrictions on context

// Atom is a single baggage atom, stored as a raw byte slice.
type Atom []byte

// BaggageContext is a slice of atoms.
type BaggageContext []Atom
......
......@@ -7,57 +7,89 @@ import (
)
type reader struct {
current atomlayer.Atom
remaining atomlayer.BaggageContext
level int
next atomlayer.Atom
currentPath []atomlayer.Atom
remaining atomlayer.BaggageContext
skipped atomlayer.BaggageContext
level int
overflowed bool
err error
err error
}
func ReadBaggageBytes(serializedBaggageContext []byte) (r reader) {
r.remaining, r.err = atomlayer.Deserialize(serializedBaggageContext)
func Read(atoms atomlayer.BaggageContext) (r reader) {
r.remaining = atoms
r.level = -1
r.advance()
return
}
func ReadBaggageAtoms(atoms atomlayer.BaggageContext) (r reader) {
r.remaining = atoms
r.level = -1
r.advance()
return
func Open(atoms atomlayer.BaggageContext, bagIndex uint64) (r reader) {
// TODO: this
return Read(atoms)
}
// Advances r.current zero or more atoms, until it's a header atom. If it's already a header atom, does nothing.
// Close finishes reading: every bag that is still open is exited, and whatever has not been consumed yet is
// recorded as skipped.
func (r *reader) Close() {
	// Unwind all bags we are still inside of
	for r.level >= 0 {
		r.Exit()
	}

	// Step past any trailing data atoms so that r.next is a header, or nil
	r.advanceToNextHeader()
	if r.next == nil {
		return
	}

	// The pending header and everything after it were never processed
	r.skipped = append(r.skipped, r.next)
	if len(r.remaining) > 0 {
		r.skipped = append(r.skipped, r.remaining...)
		r.remaining = nil
	}
	r.next = nil
}
// Advances r.next zero or more atoms, until it's a header atom. If it's already a header atom, does nothing.
// Returns the header atom and its level.
func (r *reader) advanceToNextHeader() (atomlayer.Atom, int) {
// Find a header atom if we're not already at one
for r.current != nil && !IsHeader(r.current) {
r.advance()
}
for {
switch {
case r.next == nil: goto noheader // End of baggage or error
case atomlayer.IsTrimMarker(r.next): r.overflowed = true; goto nextatom // Handle overflow marker
case IsHeader(r.next): goto foundheader // Found the next header atom
case IsData(r.next): goto nextatom // Skip any data atoms
}
if r.current == nil { return nil, -1 } // Reached end of baggage or error
foundheader:
switch level, err := HeaderLevel(r.next); {
case err != nil: r.seterror(err); goto noheader // Cannot interpret the header
default: return r.next, level // Valid header; return it
}
// Interpret the header level
level, err := HeaderLevel(r.current);
if err != nil {
r.seterror(err)
noheader:
return nil, -1
}
return r.current, level
nextatom:
r.advance()
}
}
// Advance into the next child bag of the current bag, if there is one; if there isn't, does nothing, and returns nil
// Advance into the next child bag of the current bag, if there is one; if there isn't, does nothing, and returns nil
func (r *reader) Enter() atomlayer.Atom {
switch header, level := r.advanceToNextHeader(); {
case header == nil: return nil // End of baggage or error was encountered
case level <= r.level: return nil // End of current bag
case level == r.level+1: r.level++; r.advance(); return header // Found child bag 1 level deeper
default: { // Unexpected grandchild 2 levels or deeper
r.seterror(fmt.Errorf("Child bag jumped more than one level from %v to %v", r.level, level))
return nil
}}
header, level := r.advanceToNextHeader()
switch {
case header == nil: goto exhausted // End of baggage/error
case level <= r.level: goto exhausted // Bag exhausted
case level == r.level+1: goto found // Found child bag
default: r.seterror(invalidGrandchild(r.level, level)); goto exhausted // Invalid jump >1 level
}
found:
r.level++
r.currentPath = append(r.currentPath, r.next)
r.advance()
return header
exhausted:
return nil
}
......@@ -74,44 +106,114 @@ func (r *reader) EnterKeyed(key []byte) bool {
// Advance to provided header atom, ignoring all preceding child bags, and stopping if we reach the end of bag
func (r *reader) enter(target []byte) bool {
for {
switch header, level := r.advanceToNextHeader(); {
case header == nil: return false // End of baggage or error was encountered
case level <= r.level: return false // End of current bag
case level == r.level+1: { // Compare to child bag
switch bytes.Compare(header, target) {
case -1: r.advance() // Found a preceding child; enter it and continue
case 0: r.level++; r.advance(); return true // Found the target bag; enter it and return
case 1: return false // We've advanced past where the header would have been
}
header, level := r.advanceToNextHeader()
// Check if parent/child
switch {
case header == nil: goto notfound // End of baggage or error was encountered
case level <= r.level: goto notfound // Reached end of current bag
case level > r.level+1: goto nextbag // A descendent bag that we want to ignore
}
case level > r.level+1: r.advance() // Ignore all descendent bags of current bag and continue
// Check sibling bag precedence
switch bytes.Compare(header, target) {
case -1: goto nextbag // A preceding sibling; bag can still appear later
case 0: goto found // Found the target bag; enter it and return
case 1: goto notfound // We've advanced past where the bag would have been
}
nextbag:
r.skipuntil(r.level+1);
continue
found:
r.level++
r.currentPath = append(r.currentPath, r.next)
r.advance()
return true
notfound:
return false
}
}
// Advance to the end of the current bag and pop back up to the parent
// Skips the bag whose header is at r.next together with everything nested inside it, recording the skipped atoms
// as unprocessed. Stops when r.next is a header at or below stopAtLevel, when the baggage is exhausted, or when a
// header cannot be interpreted.
func (r *reader) skipuntil(stopAtLevel int) {
	// Skipped atoms are prefixed with the headers of the enclosing bags, then the header being skipped
	unread := make([]atomlayer.Atom, 0, len(r.currentPath)+1)
	unread = append(unread, r.currentPath...)
	unread = append(unread, r.next)

scan:
	for r.advance(); r.next != nil; r.advance() {
		switch {
		case atomlayer.IsTrimMarker(r.next):
			if r.overflowed {
				continue // Ignore redundant trim marker
			}
			r.overflowed = true // First trim marker seen; it is kept with the skipped atoms
		case IsData(r.next):
			// Data atoms are part of the bag being skipped
		default:
			level, err := HeaderLevel(r.next)
			if err != nil {
				r.seterror(err) // Invalid header, abort
				break scan
			}
			if level <= stopAtLevel {
				break scan // End of the bag being skipped
			}
			// Otherwise a descendent bag; keep skipping
		}
		unread = append(unread, r.next)
	}

	r.skipped = atomlayer.Merge(r.skipped, unread)
}
// Advance to the end of the current bag and pop back up to the parent
func (r *reader) Exit() {
for {
switch header, level := r.advanceToNextHeader(); {
case header == nil: r.level -= 1; return // Reached end of baggage or error
case level <= r.level: r.level -= 1; return // Successfully reached end of current bag
case level > r.level: r.advance(); // Ignore all descendent bags of current bag and continue
case len(r.currentPath) == 0: r.seterror(invalidExit()); return // Called exit too many times
case header == nil: goto exit // End of baggage or error encountered
case level <= r.level: goto exit // Reached end of current bag
case level > r.level: goto skipbag // A descendent bag to ignore
}
exit:
r.level--
r.currentPath = r.currentPath[:len(r.currentPath)-1]
return
skipbag:
r.skipuntil(r.level)
}
}
// Reads the payload of the next data atom from the current bag. Returns nil if there are no data atoms remaining
// Reads the payload of the next data atom from the current bag. Returns nil if there are no data atoms remaining
func (r *reader) Next() []byte {
if IsData(r.current) {
payload, err := Payload(r.current)
if err != nil {
r.seterror(err)
return nil
for {
// Non-data atoms
switch {
case r.next == nil: goto nodata // End of baggage or an error
case atomlayer.IsTrimMarker(r.next): r.overflowed = true; goto nextatom // Trim marker, continue
case !IsData(r.next): goto nodata // Not a data atom
}
r.advance()
return payload
} else {
// Data atoms
switch payload, err := Payload(r.next); {
case err != nil: r.seterror(err); goto nodata // Invalid data atom
default: r.advance(); return payload // Valid data atom
}
nodata:
return nil
nextatom:
r.advance();
}
}
......@@ -120,27 +222,35 @@ func (r *reader) Error() error {
return r.err
}
// Records err on the reader if it is non-nil, clearing the pending atom. Returns the reader's error.
func (r *reader) seterror(err error) error {
if err != nil {
r.err = err
r.current = nil
r.next = nil
}
return r.err
}
func (r *reader) advance() {
r.current = nil
if r.err != nil { return } // We're done once we encounter an error
switch {
case r.err != nil: goto exhausted // Error occurred - stop
case len(r.remaining) == 0: goto exhausted // No atoms remaining
default: goto advance // Advance to next atom
}
for len(r.remaining) > 0 {
r.current = r.remaining[0]
r.remaining = r.remaining[1:]
advance:
r.next = r.remaining[0]
r.remaining = r.remaining[1:]
return
switch atomlayer.IsTrimMarker(r.current) {
case true: r.overflowed = true // We overflowed here -- advance to next atom
case false: return // At a valid atom, continue
}
}
exhausted:
r.next = nil
return
}
// invalidGrandchild builds the error reported when a header atom's level is more than one below the level of the
// bag currently being read.
func invalidGrandchild(currentLevel, childLevel int) error {
	const format = "Child bag jumped more than one level from %v to %v"
	return fmt.Errorf(format, currentLevel, childLevel)
}
// invalidExit builds the error reported when Exit is called while no bag is entered.
func invalidExit() error {
	err := fmt.Errorf("Exit called too many times without corresponding bag entries")
	return err
}
\ No newline at end of file
......@@ -19,6 +19,518 @@ func TestSliceEqual(t *testing.T) {
assert.NotEqual(t, arr[1:], nil)
}
// TestSimpleEnter1 checks that baggage deserialized from zero bytes yields no atoms, and that a reader over it has
// no bag to enter and no data to read.
func TestSimpleEnter1(t *testing.T) {
	ctx, err := atomlayer.Deserialize([]byte{})
	assert.Nil(t, err)
	assert.Equal(t, 0, len(ctx))

	r := Read(ctx)
	assert.Nil(t, r.Enter())
	assert.Nil(t, r.Next())
}
// TestSimpleEnter2 checks that a reader over a nil baggage context has no bag to enter and no data to read.
func TestSimpleEnter2(t *testing.T) {
	r := Read(nil)

	entered := r.Enter()
	assert.Nil(t, entered)

	payload := r.Next()
	assert.Nil(t, payload)
}
// TestSimpleReadData walks a baggage made of a single header atom: the bag can be entered once, offers no child
// bags or data, and after exiting the reader is back at level -1 with an empty path, even if Exit is called again.
func TestSimpleReadData(t *testing.T) {
	bag := atomlayer.Atom{248, 5}
	insideBag := []atomlayer.Atom{bag}

	r := Read(atomlayer.BaggageContext{bag})

	// Entering consumes the only atom and records it on the current path
	got := r.Enter()
	assert.NotNil(t, got)
	assert.Equal(t, bag, got)
	assert.Equal(t, 0, r.level)
	assert.Nil(t, r.next)
	assert.Empty(t, r.remaining)
	assert.Empty(t, r.skipped)
	assert.Equal(t, insideBag, r.currentPath)

	// Nothing further to enter or read, and probing does not move the reader
	assert.Nil(t, r.Enter())
	assert.Nil(t, r.Next())
	assert.Equal(t, 0, r.level)
	assert.Equal(t, insideBag, r.currentPath)

	// The first Exit leaves the bag; the second is called with no bag entered
	for i := 0; i < 2; i++ {
		r.Exit()
		assert.Equal(t, -1, r.level)
		assert.Empty(t, r.currentPath)
	}
}
func TestValidLevelJump(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 5},
[]byte{0, 185, 124, 187, 14, 103, 240, 88, 153},
[]byte{240, 0},
[]byte{0, 131, 154, 212, 173, 65, 53, 70, 55},
}
r := Read(baggage)
assert.Nil(t, r.err)
header := r.Enter()
assert.NotNil(t, header)
assert.Nil(t, r.err)
}
func TestInvalidLevelJump(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{240, 0},
[]byte{0, 131, 154, 212, 173, 65, 53, 70, 55},
[]byte{248, 5},
[]byte{0, 185, 124, 187, 14, 103, 240, 88, 153},
}
r := Read(baggage)
assert.Nil(t, r.err)
header := r.Enter()
assert.Nil(t, header)
assert.NotNil(t, r.err)
}
func TestInvalidHeaderAtom(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{240},
[]byte{0, 131, 154, 212, 173, 65, 53, 70, 55},
}
r := Read(baggage)
assert.Nil(t, r.err)
header := r.Enter()
assert.Nil(t, header)
assert.NotNil(t, r.err)
}
func TestMultipleDataAtoms(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{0, 0},
[]byte{0, 1},
[]byte{0, 2},
}
r := Read(baggage)
assert.Nil(t, r.err)
assert.NotNil(t, r.Next())
assert.NotNil(t, r.Next())
assert.NotNil(t, r.Next())
assert.Nil(t, r.Next())
}
func TestEnterSkipsBags(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 3},
[]byte{0, 6},
[]byte{248, 5},
[]byte{0, 7},
}
r := Read(baggage)
assert.True(t, r.EnterIndexed(5))
assert.Equal(t, []byte{7}, r.Next())
assert.Nil(t, r.Next())
assert.Nil(t, r.Enter())
assert.Equal(t, []atomlayer.Atom{[]byte{248, 5}}, r.currentPath)
r.Exit()
assert.Equal(t, []atomlayer.Atom{}, r.currentPath)
}
func TestSkippedAtomsSimple(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 3},
[]byte{0, 6},
[]byte{248, 5},
[]byte{0, 7},
}
r := Read(baggage)
assert.True(t, r.EnterIndexed(5))
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{0, 6}}, r.skipped)
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{0, 6}}, r.skipped)
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{0, 6}}, r.skipped)
}
func TestSkipNestedBags(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 3},
[]byte{0, 7},
[]byte{0, 100},
[]byte{240, 0},
[]byte{0, 6},
[]byte{248, 5},
[]byte{0, 7},
}
r := Read(baggage)
assert.True(t, r.EnterIndexed(5))
assert.Equal(t,
atomlayer.BaggageContext{[]byte{248, 3}, []byte{0, 7}, []byte{0, 100}, []byte{240, 0}, []byte{0, 6}},
r.skipped)
r.Exit()
r.Exit()
assert.Equal(t,
atomlayer.BaggageContext{[]byte{248, 3}, []byte{0, 7}, []byte{0, 100}, []byte{240, 0}, []byte{0, 6}},
r.skipped)
}
func TestSkippedAtomsPartial(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 3},
[]byte{240, 0},
[]byte{0, 6},
}
r := Read(baggage)
assert.True(t, r.EnterIndexed(3))
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{240, 0}, []byte{0, 6}}, r.skipped)
r.Exit()
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{240, 0}, []byte{0, 6}}, r.skipped)
}
func TestSkippedAtomsDropsInitialDataAtoms(t *testing.T) {
baggage := atomlayer.BaggageContext{
[]byte{248, 3},
[]byte{0, 7},
[]byte{0, 100},
[]byte{240, 0},
[]byte{0, 6},
}
r := Read(baggage)
assert.True(t, r.EnterIndexed(3))
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{240, 0}, []byte{0, 6}}, r.skipped)
r.Exit()
r.Exit()
assert.Equal(t, atomlayer.BaggageContext{[]byte{248, 3}, []byte{240, 0}, []byte{0, 6}}, r.skipped)
}
// header is test shorthand for MakeIndexedHeader.
func header(level int, index uint64) atomlayer.Atom {
	indexed := MakeIndexedHeader(level, index)
	return indexed
}
// keyed is test shorthand for MakeKeyedHeader, taking the key as a string.
func keyed(level int, key string) atomlayer.Atom {
	rawKey := []byte(key)
	return MakeKeyedHeader(level, rawKey)
}
// data builds a test atom consisting of a single 0 byte followed by the given bytes.
func data(bytes ...byte) atomlayer.Atom {
	atom := make([]byte, 0, len(bytes)+1)
	atom = append(atom, 0)
	atom = append(atom, bytes...)
	return atom
}
// atoms is test shorthand that collects the given atoms into a BaggageContext.
func atoms(atoms ...atomlayer.Atom) atomlayer.BaggageContext {
	var ctx atomlayer.BaggageContext = atoms
	return ctx
}
// TestMultipleSkippedAtoms reads a baggage with four top-level bags while visiting only some of them, and checks
// after each step that r.skipped has grown by exactly the atoms that were passed over.
func TestMultipleSkippedAtoms(t *testing.T) {
	baggage := atomlayer.BaggageContext{
		// Bag 3: partially read below
		header(0, 3),
		data(7),
		data(100),
		header(1, 0),
		data(6),
		header(1, 3),
		data(15),
		// Bag 4: never entered
		header(0, 4),
		data(2),
		header(1, 0),
		data(20),
		// Bag 5: entered, then exited without reading
		header(0, 5),
		data(2),
		data(11),
		header(1, 1000000),
		data(15),
		// Bag 10000001: still unread when the reader is closed
		header(0, 10000001),
		data(5, 5, 5, 5, 5),
	}
	r := Read(baggage)

	// Enter bag 3, read one payload, visit its child 0, then leave both
	assert.True(t, r.EnterIndexed(3))
	assert.NotNil(t, r.Next())
	assert.True(t, r.EnterIndexed(0))
	r.Exit()
	r.Exit()
	want := atoms(
		header(0, 3),
		header(1, 3),
		data(15),
	)
	assert.Equal(t, want, r.skipped)

	// Entering bag 5 passes over all of bag 4
	assert.True(t, r.EnterIndexed(5))
	want = append(want,
		header(0, 4),
		data(2),
		header(1, 0),
		data(20),
	)
	assert.Equal(t, want, r.skipped)

	// Leaving bag 5 straight away passes over its child bag
	r.Exit()
	want = append(want,
		header(0, 5),
		header(1, 1000000),
		data(15),
	)
	assert.Equal(t, want, r.skipped)

	// Closing the reader marks everything that is left as skipped
	r.Close()
	want = append(want,
		header(0, 10000001),
		data(5, 5, 5, 5, 5),
	)
	assert.Equal(t, want, r.skipped)
}