Commit a1b3e6dc authored by Jonathan Mace's avatar Jonathan Mace

Repackage / migrate the atom layer code

parent 65fde953
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer-baggagecontext</artifactId>
<packaging>jar</packaging>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer-baggagecontext</artifactId>
<packaging>jar</packaging>
<name>Atom Layer - BaggageContext Impl</name>
<name>Atom Layer - BaggageContext Impl</name>
<parent>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer</artifactId>
<version>1.0</version>
</parent>
<parent>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer</artifactId>
<version>1.0</version>
</parent>
<dependencies>
<dependency>
<groupId>brown.tracingplane</groupId>
<artifactId>baggagecontext-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>brown.tracingplane</groupId>
<artifactId>atomlayer-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
<build>
<plugins>
</plugins>
</build>
</project>
package brown.tracingplane.impl;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import brown.tracingplane.BaggageContext;
import brown.tracingplane.lexicographic.Lexicographic;
import brown.tracingplane.lexicographic.ProtobufVarint;
/**
* <p>
* An implementation of {@link BaggageContext} based on atoms and lexicographic merge. {@link AtomContext} represents
* the minimal logic necessary to propagate {@link BaggageContext}s and participate in the tracing plane.
* </p>
*/
public class AtomContext implements BaggageContext {
/**
* Simple implementation of ref counting
*/
static class RefCount<T> {
T object;
volatile int count = 0;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<RefCount> reffer =
AtomicIntegerFieldUpdater.newUpdater(RefCount.class, "count");
public RefCount(T object) {
this.object = object;
}
public void ref() {
reffer.incrementAndGet(this);
}
void deref() {
if (reffer.decrementAndGet(this) == 0) {
object = null;
}
}
boolean exclusive() {
return count == 1;
}
}
RefCount<List<ByteBuffer>> atoms;
AtomContext() {}
AtomContext(List<ByteBuffer> atoms) {
this(new RefCount<>(atoms));
}
AtomContext(RefCount<List<ByteBuffer>> atoms) {
this.atoms = atoms;
this.atoms.ref();
}
void discard() {
atoms.deref();
atoms = null;
}
AtomContext branch() {
return new AtomContext(atoms);
}
AtomContext merge(AtomContext other) {
if (other == null || other.atoms == null || other.atoms.object == null) {
return this;
}
if (atoms == null || atoms.object == null) {
return this;
}
if (atoms.exclusive()) {
atoms.object = Lexicographic.merge(atoms.object, other.atoms.object);
other.discard();
return this;
}
if (other.atoms.exclusive()) {
return other.merge(this);
}
List<ByteBuffer> merged = Lexicographic.merge(atoms.object, other.atoms.object);
discard();
other.discard();
atoms = new RefCount<>(merged);
atoms.ref();
return this;
}
int serializedSize() {
int size = 0;
for (ByteBuffer atom : atoms.object) {
size += atom.remaining() + ProtobufVarint.sizeOf(atom.remaining());
}
return size;
}
List<ByteBuffer> atoms() {
return atoms == null ? null : atoms.object;
}
}
package brown.tracingplane.impl;
import java.nio.ByteBuffer;
import java.util.List;
import brown.tracingplane.BaggageContext;
import brown.tracingplane.BaggageProvider;
import brown.tracingplane.atomlayer.AtomLayerSerialization;
/**
* <p>
* An implementation of {@link BaggageContext} based on atoms and lexicographic merge. {@link AtomContext} represents
* the minimal logic necessary to propagate {@link BaggageContext}s and participate in the tracing plane.
* </p>
*/
public class AtomContextProvider implements BaggageProvider<AtomContext> {
@Override
public boolean isValid(BaggageContext baggageContext) {
return baggageContext == null || baggageContext instanceof AtomContext;
}
@Override
public AtomContext newInstance() {
return null;
}
@Override
public void discard(AtomContext baggageContext) {
if (baggageContext != null) {
baggageContext.discard();
}
}
@Override
public AtomContext branch(AtomContext from) {
return from == null ? null : from.branch();
}
@Override
public AtomContext join(AtomContext left, AtomContext right) {
return left == null ? right : left.merge(right);
}
@Override
public AtomContext deserialize(byte[] serialized, int offset, int length) {
return wrap(AtomLayerSerialization.deserialize(serialized, offset, length));
}
@Override
public AtomContext deserialize(ByteBuffer buf) {
return wrap(AtomLayerSerialization.deserialize(buf));
}
@Override
public byte[] serialize(AtomContext baggageContext) {
return AtomLayerSerialization.serialize(atoms(baggageContext));
}
@Override
public byte[] serialize(AtomContext baggageContext, int maximumSerializedSize) {
return AtomLayerSerialization.serialize(atoms(baggageContext), maximumSerializedSize);
}
AtomContext wrap(List<ByteBuffer> atoms) {
if (atoms == null || atoms.size() == 0) {
return null;
} else {
return new AtomContext(atoms);
}
}
List<ByteBuffer> atoms(AtomContext baggageContext) {
return baggageContext == null ? null : baggageContext.atoms();
}
}
package brown.tracingplane.impl;
import brown.tracingplane.BaggageContext;
import brown.tracingplane.BaggageProvider;
import brown.tracingplane.BaggageProviderFactory;
/**
* {@link BaggageProviderFactory} for {@link AtomContextProvider}
*/
public class AtomContextProviderFactory implements BaggageProviderFactory {
@Override
public BaggageProvider<? extends BaggageContext> provider() {
return new AtomContextProvider();
}
}
package brown.tracingplane.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
import brown.tracingplane.BaggageContext;
import brown.tracingplane.BaggageProvider;
import brown.tracingplane.impl.AtomContext.RefCount;
public class TestAtomContext {
static AtomContextProvider provider = new AtomContextProvider();
@Test
public void testFactory() {
AtomContextProviderFactory factory = new AtomContextProviderFactory();
BaggageProvider<? extends BaggageContext> provider = factory.provider();
assertNotNull(provider);
assertTrue(provider instanceof AtomContextProvider);
}
@Test
public void testNullContext() {
assertNull(provider.newInstance());
assertNull(provider.branch(null));
assertNull(provider.join(null, null));
assertNotNull(provider.serialize(null));
assertEquals(0, provider.serialize(null).length);
assertNotNull(provider.serialize(null, 0));
assertEquals(0, provider.serialize(null, 0).length);
assertNull(provider.deserialize(null));
assertNull(provider.deserialize(null, 0, 0));
assertTrue(provider.isValid(null));
BaggageContext invalidContext = new BaggageContext() {};
assertFalse(provider.isValid(invalidContext));
}
private Random r;
@Before
public void initializeRandom() {
r = new Random(10);
}
private ArrayList<ByteBuffer> genAtoms(int numAtoms, int maxAtomSize) {
ArrayList<ByteBuffer> atoms = new ArrayList<>(numAtoms);
for (int i = 0; i < numAtoms; i++) {
int length = r.nextInt(maxAtomSize);
byte[] bytes = new byte[length];
r.nextBytes(bytes);
atoms.add(ByteBuffer.wrap(bytes));
}
return atoms;
}
@Test
public void testDiscard() {
// Create context
AtomContext ctx = new AtomContext(genAtoms(10, 10));
assertNotNull(ctx);
assertNotNull(ctx.atoms);
assertEquals(1, ctx.atoms.count);
assertNotNull(ctx.atoms.object);
// Discard, check
RefCount<List<ByteBuffer>> atoms = ctx.atoms;
provider.discard(ctx);
assertNull(ctx.atoms);
assertEquals(0, atoms.count);
assertNull(atoms.object);
}
@Test
public void testBranchDiscard() {
// Create context
AtomContext ctx = new AtomContext(genAtoms(10, 10));
AtomContext ctx_branched = provider.branch(ctx);
// Different contexts, same atoms
assertNotSame(ctx, ctx_branched);
assertSame(ctx.atoms, ctx_branched.atoms);
assertEquals(2, ctx.atoms.count);
// Discard branched copy, check refcount, and that atoms were discarded
provider.discard(ctx_branched);
assertEquals(1, ctx.atoms.count);
assertNull(ctx_branched.atoms);
// Discard original
RefCount<List<ByteBuffer>> atoms = ctx.atoms;
provider.discard(ctx);
assertNull(ctx.atoms);
assertEquals(0, atoms.count);
assertNull(atoms.object);
}
@Test
public void testJoinReuse() {
AtomContext ctx1 = new AtomContext(genAtoms(3, 10));
AtomContext ctx2 = new AtomContext(genAtoms(4, 10));
AtomContext ctx1_branched = provider.branch(ctx1);
AtomContext ctx2_branched = provider.branch(ctx2);
RefCount<List<ByteBuffer>> atoms1 = ctx1.atoms;
RefCount<List<ByteBuffer>> atoms2 = ctx2.atoms;
assertEquals(2, atoms1.count);
assertEquals(2, atoms2.count);
AtomContext ctx3 = provider.join(ctx1, ctx2);
assertNotNull(ctx3);
assertNotSame(atoms1, ctx3.atoms);
assertNotSame(atoms2, ctx3.atoms);
RefCount<List<ByteBuffer>> atoms3 = ctx3.atoms;
assertEquals(1, atoms1.count);
assertEquals(1, atoms2.count);
assertEquals(1, atoms3.count);
// Context 1 will have been reused, context 2 nulled out
assertNull(ctx2.atoms);
assertEquals(ctx1, ctx3);
}
@Test
public void testSerialize() {
AtomContext ctx1 = new AtomContext(genAtoms(3, 10));
byte[] serialized = provider.serialize(ctx1);
AtomContext ctx2 = provider.deserialize(ByteBuffer.wrap(serialized));
assertNotSame(ctx1, ctx2);
assertNotSame(ctx1.atoms, ctx2.atoms);
assertEquals(1, ctx1.atoms.count);
assertEquals(1, ctx2.atoms.count);
assertEquals(ctx1.atoms.object, ctx2.atoms.object);
}
}
......@@ -12,14 +12,33 @@
<artifactId>atomlayer</artifactId>
<version>1.0</version>
</parent>
<dependencies>
<dependency>
<groupId>brown.tracingplane</groupId>
<artifactId>baggagecontext-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>brown.tracingplane</groupId>
<artifactId>baggagecontext-staticapi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
......
package brown.tracingplane.atomlayer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import brown.tracingplane.lexicographic.Lexicographic;
/**
* Contains static methods for trimming baggage atoms to size extents.
*/
public class AtomLayerOverflow {
public static final ByteBuffer OVERFLOW_MARKER = ByteBuffer.allocate(0);
private AtomLayerOverflow() {}
static final class TrimExtent {
int atomCount = 0;
int serializedSize = 0;
boolean overflow = false;
}
static TrimExtent determineTrimExtent(List<ByteBuffer> atoms, int limit) {
TrimExtent extent = new TrimExtent();
if (limit <= 0) {
extent.atomCount = atoms.size();
extent.serializedSize = AtomLayerSerialization.serializedSize(atoms);
} else {
int overflowMarkerSize = AtomLayerSerialization.serializedSize(OVERFLOW_MARKER);
for (int i = 0; i < atoms.size(); i++) {
int nextSize = extent.serializedSize + AtomLayerSerialization.serializedSize(atoms.get(i));
if (nextSize <= limit - overflowMarkerSize) {
extent.atomCount++;
extent.serializedSize = nextSize;
} else if (i == atoms.size() - 1 && nextSize <= limit) {
extent.atomCount++;
extent.serializedSize = nextSize;
} else {
extent.serializedSize += overflowMarkerSize;
extent.overflow = true;
break;
}
}
}
return extent;
}
/**
* <p>
* If the serialized size of <code>atoms</code> is {@code <=limit} then this method returns <code>atoms</code> with
* no modifications. Otherwise, this method returns the maximum prefix of <code>atoms</code> such that its
* serialized size will be {@code <=limit}. The returned prefix will also have {@link #OVERFLOW_MARKER} appended to
* the end. {@link #OVERFLOW_MARKER} is taken into account when calculating serialized size, so the return value of
* this method is guaranteed to be serialized to {@code <=limit} bytes.
* </p>
*
* @param atoms a list of atoms, possibly null
* @param limit the maximum serialized size of <code>atoms</code>
* @return <code>atoms</code> if the serialized size of <code>atoms</code> is {@code <= limit}, otherwise a prefix
* of <code>atoms</code> {@link #OVERFLOW_MARKER}. Returns null if atoms is null
*/
public static List<ByteBuffer> trimToSize(List<ByteBuffer> atoms, int limit) {
if (atoms == null) {
return null;
}
TrimExtent extent = determineTrimExtent(atoms, limit);
if (extent.overflow) {
List<ByteBuffer> subList = new ArrayList<>(extent.atomCount + 1);
subList.addAll(atoms.subList(0, extent.atomCount));
subList.add(OVERFLOW_MARKER);
return subList;
} else {
return atoms;
}
}
public static List<ByteBuffer> trimToFirstOverflow(List<ByteBuffer> atoms) {
if (atoms == null) {
return null;
}
int overflowAt = atoms.indexOf(OVERFLOW_MARKER);
if (overflowAt < 0 || overflowAt >= atoms.size() - 1) {
return atoms;
} else {
return atoms.subList(0, overflowAt + 1);
}
}
/**
* Merge the provided atoms until an overflow marker is encountered. Include the encountered overflow marker then
* stop.
*/
public static List<ByteBuffer> mergeOverflowAtoms(List<ByteBuffer> a, List<ByteBuffer> b) {
if (a == null) {
return trimToFirstOverflow(b);
} else if (b == null) {
return trimToFirstOverflow(a);
}
int ia = 0, ib = 0, size_a = a.size(), size_b = b.size();
final List<ByteBuffer> merged = new ArrayList<>(size_a + size_b);
ByteBuffer previous = null;
boolean different = false;
while (ia < size_a && ib < size_b && !OVERFLOW_MARKER.equals(previous)) {
int comparison = Lexicographic.compare(a.get(ia), b.get(ib));
if (comparison == 0) {
merged.add(previous = a.get(ia));
ia++;
ib++;
} else if (comparison < 0) {
merged.add(previous = a.get(ia));
ia++;
different = true;
} else if (comparison > 0) {
merged.add(previous = b.get(ib));
ib++;
different = true;
}
}
while (ia < size_a && !OVERFLOW_MARKER.equals(previous)) {
merged.add(previous = a.get(ia++));
}