Commit 2bd7f8ce authored by Jonathan Mace's avatar Jonathan Mace

Initial commit of skeleton implementation of atom layer and baggage protocol. ...

Initial commit of skeleton implementation of atom layer and baggage protocol.  Minimally tested in baggage protocol
parent ccefe7cf
package atomlayer
import (
"bytes"
"github.com/golang/protobuf/proto"
"encoding/base64"
"fmt"
)
type Atom []byte
type BaggageContext []Atom
// Merges two BaggageContexts by lexicographically comparing their atoms
func Merge(a, b BaggageContext) BaggageContext {
if a == nil && b == nil { return nil }
merged := BaggageContext(make([]Atom, 0, len(a)+len(b)))
i, j := 0, 0
for i < len(a) && j < len(b) {
switch bytes.Compare(a[i], b[j]) {
case -1: merged = append(merged, a[i]); i++;
case 0: merged = append(merged, a[i]); i++; j++;
case 1: merged = append(merged, b[j]); j++;
}
}
merged = append(merged, a[i:]...)
merged = append(merged, b[j:]...)
return merged
}
// Duplicates a BaggageContext
func Branch(a BaggageContext) BaggageContext {
return append(BaggageContext(nil), a...)
}
// Returns the serialized size in bytes of this atom array.
func (atoms BaggageContext) SerializedSize() (size int) {
for _, atom := range atoms { size += atom.serializedSize() }
return
}
// Calculate the serialized size in bytes of this atom
func (atom Atom) serializedSize() int {
return proto.SizeVarint(uint64(len(atom))) + len(atom)
}
// Serializes the baggage context by varint-prefixing each atom.]
func Serialize(atoms BaggageContext) []byte {
if atoms == nil { return nil }
length := atoms.SerializedSize()
serializedAtoms := make([]byte, 0, length)
for _, atom := range atoms {
serializedAtoms = append(serializedAtoms, proto.EncodeVarint(uint64(len(atom)))...)
serializedAtoms = append(serializedAtoms, atom...)
}
return serializedAtoms
}
// Deserializes a baggage context from bytes
func Deserialize(bytes []byte) (atoms BaggageContext, err error) {
pos := 0
for len(bytes) > 0 {
x, n := proto.DecodeVarint(bytes)
switch {
case n == 0 && len(bytes) > 10: bytes = bytes[:10]; fallthrough
case n == 0: err = fmt.Errorf("Encountered at position %v invalid varint %v", pos, bytes); return
case n + int(x) > len(bytes): err = fmt.Errorf("Insufficient bytes remaining in buffer for %v-length atom at position %v", x, pos); return
default: {
bytes = bytes[n:]
atoms = append(atoms, Atom(bytes[:int(x)]))
bytes = bytes[int(x):]
pos += n + int(x)
}}
}
return
}
// Serializes the provided BaggageContext then base64 encodes it into a string
func EncodeBase64(ctx BaggageContext) string {
return base64.StdEncoding.EncodeToString(Serialize(ctx))
}
// Decodes and deserializes a BaggageContext from the provided base64-encoded string
func DecodeBase64(encoded string) (BaggageContext, error) {
bytes, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return nil, err
}
return Deserialize(bytes)
}
var trimMarker = Atom(make([]byte, 0, 0)) // Special zero-length atom used to indicate trim
func IsTrimMarker(a Atom) bool {
return bytes.Equal(trimMarker, a)
}
// Drop atoms from the BaggageContext so that it fits into the specified number of bytes
func Trim(atoms BaggageContext, maxSize int) BaggageContext {
switch trimAt := atoms.indexForTrim(maxSize); {
case trimAt == len(atoms): return atoms
default: return append(atoms[:trimAt], trimMarker)
}
}
// Calculates the index at which to trim the baggage to fit in the specified size
func (baggage BaggageContext) indexForTrim(size int) int {
for i, atom := range baggage {
switch atomSize := atom.serializedSize(); {
case atomSize < size: size -= atomSize;
case atomSize > size: return i
case i == len(baggage)-1: size -= atomSize;
default: return i
}
}
return len(baggage)
}
package atomlayer
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestLexicographicMerge(t *testing.T) {
a := BaggageContext{Atom{0,1,1,1}, Atom{1}}
b := BaggageContext{Atom{0,1,1,1}, Atom{2}}
assert.Equal(t, BaggageContext{Atom{0,1,1,1}, Atom{1}, Atom{2}}, Merge(a, b))
c := BaggageContext{Atom{1}, Atom{0,1,1,1}}
assert.Equal(t, BaggageContext{Atom{0,1,1,1}, Atom{1}, Atom{0,1,1,1}}, Merge(a, c))
assert.Equal(t, BaggageContext{Atom{0,1,1,1}, Atom{1}, Atom{0,1,1,1}, Atom{2}}, Merge(b, c))
}
func TestLexicographicMerge2(t *testing.T) {
a := BaggageContext{Atom{1}, Atom{0,3,1,6}, Atom{3,1,1,1}, }
b := BaggageContext{Atom{1}, Atom{2,10}}
assert.Equal(t, BaggageContext{Atom{1}, Atom{0,3,1,6}, Atom{2,10}, Atom{3,1,1,1}}, Merge(a, b))
}
func TestLexicographicMergeNils(t *testing.T) {
assert.Equal(t, BaggageContext(nil), Merge(nil, nil))
assert.Equal(t, BaggageContext{Atom{1}}, Merge(BaggageContext{Atom{1}}, nil))
assert.Equal(t, BaggageContext{Atom{1}}, Merge(nil, BaggageContext{Atom{1}}))
assert.Equal(t, BaggageContext{}, Merge(nil, BaggageContext{}))
}
func TestSerializeEmptyAtoms(t *testing.T) {
atomContext := BaggageContext{Atom{}}
assert.Equal(t, 1, len(atomContext))
assert.Equal(t, 0, len(atomContext[0]))
serialized := Serialize(atomContext)
assert.NotNil(t, serialized)
assert.Equal(t, 1, len(serialized))
assert.Equal(t, byte(0), serialized[0])
atomContext = BaggageContext{Atom{}, Atom{}, Atom{}}
assert.Equal(t, 3, len(atomContext))
assert.Equal(t, 0, len(atomContext[0]))
assert.Equal(t, 0, len(atomContext[1]))
assert.Equal(t, 0, len(atomContext[2]))
serialized = Serialize(atomContext)
assert.NotNil(t, serialized)
assert.Equal(t, 3, len(serialized))
assert.Equal(t, byte(0), serialized[0])
assert.Equal(t, byte(0), serialized[1])
assert.Equal(t, byte(0), serialized[2])
}
func TestSerializeOneAtom(t *testing.T) {
atomContext := BaggageContext{Atom{5, 10, 20}}
assert.Equal(t, 1, len(atomContext))
assert.Equal(t, 3, len(atomContext[0]))
serialized := Serialize(atomContext)
assert.NotNil(t, serialized)
assert.Equal(t, 4, len(serialized))
assert.Equal(t, byte(3), serialized[0])
assert.Equal(t, byte(5), serialized[1])
assert.Equal(t, byte(10), serialized[2])
assert.Equal(t, byte(20), serialized[3])
}
func TestDeserializeEmpty(t *testing.T) {
emptyBytes := Atom{}
atoms, err := Deserialize(emptyBytes)
assert.Equal(t, 0, len(atoms))
assert.Nil(t, err)
}
func TestDeserializeEmptyAtoms(t *testing.T) {
emptyBytes := Atom{0,0,0,0,0}
atoms, err := Deserialize(emptyBytes)
assert.Equal(t, 5, len(atoms))
for i := 0; i < 5; i++ {
assert.Equal(t, 0, len(atoms[i]))
}
assert.Nil(t, err)
}
func TestInsufficientAtomBytesRemaining(t *testing.T) {
badBytes := Atom{1}
atoms, err := Deserialize(badBytes)
assert.Equal(t, 0, len(atoms))
assert.NotNil(t, err)
}
func TestBadVarintPrefix(t *testing.T) {
badBytes := Atom{255}
atoms, err := Deserialize(badBytes)
assert.Equal(t, 0, len(atoms))
assert.NotNil(t, err)
}
func TestSerializeDeserialize(t *testing.T) {
atomContext := BaggageContext{Atom{1,2,3,4,5}, Atom{7,3,7}, Atom{}, Atom{1}, Atom{1}}
bytes := Serialize(atomContext)
assert.Equal(t, 15, len(bytes))
deserializedContext, err := Deserialize(bytes)
assert.Nil(t, err)
assert.Equal(t, atomContext, deserializedContext)
deserializedContext, err = Deserialize(bytes[:14])
assert.NotNil(t, err)
assert.Equal(t, 4, len(deserializedContext))
assert.Equal(t, atomContext[:4], deserializedContext[:4])
}
func TestBase64EncodeDecodeNilContext(t *testing.T) {
atomContext, err := DecodeBase64(EncodeBase64(nil))
assert.Nil(t, err)
assert.Equal(t, 0, len(atomContext))
}
func TestBase64EncodeDecode(t *testing.T) {
atomContext := BaggageContext{Atom{1,2,3,4,5}, Atom{7,3,7}, Atom{}, Atom{1}, Atom{1}}
decodedContext, err := DecodeBase64(EncodeBase64(atomContext))
assert.Nil(t, err)
assert.Equal(t, atomContext, decodedContext)
}
func TestBase64Decode(t *testing.T) {
decodedContext, err := DecodeBase64("")
assert.Nil(t, err)
assert.Equal(t, 0, len(decodedContext))
}
func TestTrim(t *testing.T) {
assert.Equal(t, BaggageContext{Atom{1,2,3,4,5}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 6))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 5))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 4))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 3))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 2))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 1))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}}, 0))
assert.Equal(t, BaggageContext(nil), Trim(nil, 3))
assert.Equal(t, BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 10))
assert.Equal(t, BaggageContext{Atom{1,2,3,4,5}, Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 9))
assert.Equal(t, BaggageContext{Atom{1,2,3,4,5}, Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 8))
assert.Equal(t, BaggageContext{Atom{1,2,3,4,5}, Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 7))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 6))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 5))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 4))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 3))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 2))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 1))
assert.Equal(t, BaggageContext{Atom{}}, Trim(BaggageContext{Atom{1,2,3,4,5}, Atom{3, 2, 1}}, 0))
}
\ No newline at end of file
package baggageprotocol
import (
"github.com/tracingplane/tracingplane-go/atomlayer"
"fmt"
)
const header_prefix_byte = 0x80
const data_prefix_byte = 0x00
func IsHeader(atom atomlayer.Atom) bool {
return len(atom) != 0 && (atom[0] & 0x80) == 0x80
}
func IsData(atom atomlayer.Atom) bool {
return len(atom) != 0 && (atom[0] & 0x80) == 0x00
}
func IsIndexedHeader(atom atomlayer.Atom) bool {
return len(atom) != 0 && (atom[0] & 0x03) == 0x00
}
func IsKeyedHeader(atom atomlayer.Atom) bool {
return len(atom) != 0 && (atom[0] & 0x03) == 0x02
}
func HeaderLevel(atom atomlayer.Atom) (int, error) {
if len(atom) == 0 { return 0, fmt.Errorf("Invalid zero-length header atom") }
return 15 - int((atom[0] & 0x78) >> 3), nil
}
func HeaderIndex(atom atomlayer.Atom) (uint64, error) {
if len(atom) == 0 { return 0, fmt.Errorf("Invalid zero-length header atom") }
index, length := DecodeUnsignedLexVarint(atom[1:])
if length == 0 { return 0, fmt.Errorf("Malformed indexed header atom -- cannot decode varint %v", atom[1:]) }
return uint64(index), nil
}
func HeaderKey(atom atomlayer.Atom) ([]byte, error) {
if len(atom) == 0 { return nil, fmt.Errorf("Invalid zero-length header atom") }
return atom[1:], nil
}
func Payload(atom atomlayer.Atom) ([]byte, error) {
if len(atom) == 0 { return nil, fmt.Errorf("Invalid zero-length data atom") }
return atom[1:], nil
}
func MakeIndexedHeader(level int, index uint64) []byte {
prefix := 0x80 | ((uint8(15 - level) << 3) & 0x78) | 0x00
payload := EncodeUnsignedLexVarint(index)
return append(append(make([]byte, 0, len(payload)+1), prefix), payload...)
}
func MakeKeyedHeader(level int, key []byte) []byte {
prefix := 0x80 | ((uint8(level) << 3) & 0x78) | 0x02
return append(append(make([]byte, 0, len(key)+1), prefix), key...)
}
func MakeDataAtom(payload []byte) []byte {
return append(append(make([]byte, 0, len(payload)+1), 0x00), payload...)
}
\ No newline at end of file
package baggageprotocol
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tracingplane/tracingplane-go/atomlayer"
)
func TestInterpretAtoms(t *testing.T) {
assert.True(t, atomlayer.IsTrimMarker(atomlayer.Atom(nil)))
assert.False(t, IsHeader(atomlayer.Atom(nil)))
assert.False(t, IsData(atomlayer.Atom(nil)))
assert.False(t, atomlayer.IsTrimMarker(atomlayer.Atom([]byte{0})))
assert.True(t, IsData(atomlayer.Atom([]byte{0})))
assert.False(t, IsHeader(atomlayer.Atom([]byte{0})))
assert.False(t, atomlayer.IsTrimMarker(atomlayer.Atom([]byte{0, 128, 5, 3})))
assert.True(t, IsData(atomlayer.Atom([]byte{0, 128, 5, 3})))
assert.False(t, IsHeader(atomlayer.Atom([]byte{0, 128, 5, 3})))
assert.False(t, atomlayer.IsTrimMarker(atomlayer.Atom([]byte{128})))
assert.False(t, IsData(atomlayer.Atom([]byte{128})))
assert.True(t, IsHeader(atomlayer.Atom([]byte{128})))
}
func TestInterpretDataAtoms(t *testing.T) {
for i:=0; i<128; i++ {
atom := atomlayer.Atom([]byte{byte(i)})
assert.True(t, IsData(atom))
assert.False(t, IsHeader(atom))
assert.False(t, atomlayer.IsTrimMarker(atom))
}
for i:=128; i<256; i++ {
atom := atomlayer.Atom([]byte{byte(i)})
assert.False(t, IsData(atom))
assert.True(t, IsHeader(atom))
assert.False(t, atomlayer.IsTrimMarker(atom))
}
}
func checkHeaderAtomLevel(t *testing.T, expectedLevel int, bytes []byte) {
actualLevel, err := HeaderLevel(atomlayer.Atom(bytes))
assert.Nil(t, err)
assert.Equal(t, expectedLevel, actualLevel)
}
func TestInterpretHeaderAtoms(t *testing.T) {
checkHeaderAtomLevel(t, 0, []byte{31 << 3})
checkHeaderAtomLevel(t, 1, []byte{30 << 3})
checkHeaderAtomLevel(t, 2, []byte{29 << 3})
checkHeaderAtomLevel(t, 3, []byte{28 << 3})
checkHeaderAtomLevel(t, 4, []byte{27 << 3})
checkHeaderAtomLevel(t, 5, []byte{26 << 3})
checkHeaderAtomLevel(t, 6, []byte{25 << 3})
checkHeaderAtomLevel(t, 7, []byte{24 << 3})
checkHeaderAtomLevel(t, 8, []byte{23 << 3})
checkHeaderAtomLevel(t, 9, []byte{22 << 3})
checkHeaderAtomLevel(t, 10, []byte{21 << 3})
checkHeaderAtomLevel(t, 11, []byte{20 << 3})
checkHeaderAtomLevel(t, 12, []byte{19 << 3})
checkHeaderAtomLevel(t, 13, []byte{18 << 3})
checkHeaderAtomLevel(t, 14, []byte{17 << 3})
checkHeaderAtomLevel(t, 15, []byte{16 << 3})
}
\ No newline at end of file
package baggageprotocol
import (
"github.com/tracingplane/tracingplane-go/atomlayer"
"fmt"
"bytes"
)
type reader struct {
current atomlayer.Atom
remaining atomlayer.BaggageContext
level int
overflowed bool
err error
}
func ReadBaggageBytes(serializedBaggageContext []byte) (r reader) {
r.remaining, r.err = atomlayer.Deserialize(serializedBaggageContext)
r.level = -1
r.advance()
return
}
func ReadBaggageAtoms(atoms atomlayer.BaggageContext) (r reader) {
r.remaining = atoms
r.level = -1
r.advance()
return
}
// Advances r.current zero or more atoms, until it's a header atom. If it's already a header atom, does nothing.
// Returns the headeratom and its level.
func (r *reader) advanceToNextHeader() (atomlayer.Atom, int) {
// Find a header atom if we're not already at one
for r.current != nil && !IsHeader(r.current) {
r.advance()
}
if r.current == nil { return nil, -1 } // Reached end of baggage or error
// Interpret the header level
level, err := HeaderLevel(r.current);
if err != nil {
r.seterror(err)
return nil, -1
}
return r.current, level
}
// Advance into the next child bag of the current bag, if there is one; if there isn't, does nothing, and returns nil
func (r *reader) Enter() atomlayer.Atom {
switch header, level := r.advanceToNextHeader(); {
case header == nil: return nil // End of baggage or error was encountered
case level <= r.level: return nil // End of current bag
case level == r.level+1: r.level++; r.advance(); return header // Found child bag 1 level deeper
default: { // Unexpected grandchild 2 levels or deeper
r.seterror(fmt.Errorf("Child bag jumped more than one level from %v to %v", r.level, level))
return nil
}}
return nil
}
// Advance to the specified child bag, ignoring all preceding child bags, and stopping if we reach the end of bag
func (r *reader) EnterIndexed(index uint64) bool {
return r.enter(MakeIndexedHeader(r.level + 1, index))
}
// Advance to the specified child bag, ignoring all preceding child bags, and stopping if we reach the end of bag
func (r *reader) EnterKeyed(key []byte) bool {
return r.enter(MakeKeyedHeader(r.level + 1, key))
}
// Advance to provided header atom, ignoring all preceding child bags, and stopping if we reach the end of bag
func (r *reader) enter(target []byte) bool {
for {
switch header, level := r.advanceToNextHeader(); {
case header == nil: return false // End of baggage or error was encountered
case level <= r.level: return false // End of current bag
case level == r.level+1: { // Compare to child bag
switch bytes.Compare(header, target) {
case -1: r.advance() // Found a preceding child; enter it and continue
case 0: r.level++; r.advance(); return true // Found the target bag; enter it and return
case 1: return false // We've advanced past where the header would have been
}
}
case level > r.level+1: r.advance() // Ignore all descendent bags of current bag and continue
}
}
}
// Advance to the end of the current bag and pop back up to the parent
func (r *reader) Exit() {
for {
switch header, level := r.advanceToNextHeader(); {
case header == nil: r.level -= 1; return // Reached end of baggage or error
case level <= r.level: r.level -= 1; return // Successfully reached end of current bag
case level > r.level: r.advance(); // Ignore all descendent bags of current bag and continue
}
}
}
// Reads the payload of the next data atom from the current bag. Returns nil if there are no data atoms remaining
func (r *reader) Next() []byte {
if IsData(r.current) {
payload, err := Payload(r.current)
if err != nil {
r.seterror(err)
return nil
}
r.advance()
return payload
} else {
return nil
}
}
// Returns the error if one occurred. All operations stop after an error occurs
func (r *reader) Error() error {
return r.err
}
// Returns true or false if the
func (r *reader) seterror(err error) error {
if err != nil {
r.err = err
r.current = nil
}
return r.err
}
func (r *reader) advance() {
r.current = nil
if r.err != nil { return } // We're done once we encounter an error
for len(r.remaining) > 0 {
r.current = r.remaining[0]
r.remaining = r.remaining[1:]
switch atomlayer.IsTrimMarker(r.current) {
case true: r.overflowed = true // We overflowed here -- advance to next atom
case false: return // At a valid atom, continue
}
}
}
\ No newline at end of file
package baggageprotocol
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tracingplane/tracingplane-go/atomlayer"
"encoding/binary"
"bytes"
"github.com/gogo/protobuf/proto"
)
func TestSliceEqual(t *testing.T) {
assert.NotEqual(t, make([]byte, 0), nil)
assert.NotEqual(t, nil, make([]byte, 0))
assert.False(t, make([]byte, 0) == nil)
arr := []byte{0}
assert.NotEqual(t, arr[1:], nil)
}
func TestSimpleXTraceBaggage(t *testing.T) {
baggage := []byte{ 2, 248, 5,
2, 240, 0,
9, 0, 131, 154, 212, 173, 65, 53, 70, 55,
2, 240, 1,
9, 0, 185, 124, 187, 14, 103, 240, 88, 153 }
// bag is registered to index 5
//
// bag XTraceBaggage {
// fixed64 task_id = 0;
// set<fixed64> parent_ids = 1;
// fixed64 discovery_id = 2;
// int32 logging_level = 3;
//}
xtraceBagIndex := uint64(5)
taskIdIndex := uint64(0)
parentEventIdIndex := uint64(1)
atoms, err := atomlayer.Deserialize(baggage);
assert.Nil(t, err)
assert.Equal(t, 5, len(atoms))
reader := ReadBaggageAtoms(atoms)
assert.Nil(t, reader.Next())
assert.True(t, reader.EnterIndexed(xtraceBagIndex))
assert.True(t, reader.EnterIndexed(taskIdIndex))
var taskId int64
err = binary.Read(bytes.NewReader(reader.Next()), binary.BigEndian, &taskId)
assert.Nil(t, err)
assert.Equal(t, int64(-8963618267739109833), taskId)