Commit 05c6a781 authored by DEPRECATED (Jonathan Mace) (Use @JonathanMace instead)'s avatar DEPRECATED (Jonathan Mace) (Use @JonathanMace instead)
Browse files

Initial commit of minimal implementation.

parents
cmake_minimum_required(VERSION 3.5.1)
project(libxtrace)
include_directories( include )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 ")
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
find_package(Boost COMPONENTS system filesystem REQUIRED)
find_package(Protobuf REQUIRED)
add_library (libxtrace SHARED
src/atomlayer.cpp
src/thread_local_baggage.cpp
src/pubsub.cpp
src/xtrace.cpp
src/xtrace.pb.cc
src/xtrace_baggage.cpp
src/lexvarint.cpp
)
set_target_properties(libxtrace
PROPERTIES
OUTPUT_NAME "xtrace"
PREFIX "lib"
)
target_link_libraries(libxtrace Threads::Threads ${Protobuf_LIBRARIES} ${Boost_LIBRARIES} )
# Introduce variables:
# * CMAKE_INSTALL_LIBDIR
# * CMAKE_INSTALL_BINDIR
# * CMAKE_INSTALL_INCLUDEDIR
include(GNUInstallDirs)
install(DIRECTORY "include"
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/xtrace"
FILES_MATCHING
PATTERN "*.h")
install(TARGETS libxtrace DESTINATION "${CMAKE_INSTALL_LIBDIR}")
#ifndef _ATOM_LAYER_H_
#define _ATOM_LAYER_H_
#include <iostream>
#include <cstdint>
#include <cstddef>
#include <vector>
struct Atom {
std::vector<uint8_t> bytes;
Atom() : bytes(0) {}
Atom(int x) : bytes(4) {
for (unsigned i = 0; i < 4; i++) {
bytes[3-i] = (x >> (i * 8));
}
}
Atom(std::vector<uint8_t> bytes) : bytes(bytes) {}
const int compare(const Atom& other);
friend std::ostream& operator<< ( std::ostream& os, const Atom& atom );
};
struct Baggage {
std::vector<Atom> atoms;
Baggage() : atoms(0) {}
Baggage branch();
static Baggage merge(Baggage& a, Baggage& b);
std::vector<uint8_t> serialize();
static Baggage deserialize(std::vector<uint8_t> bytes);
friend std::ostream& operator<< ( std::ostream& os, const Baggage& baggage );
};
#endif
\ No newline at end of file
#ifndef _BAGGAGE_PROTOCOL_H_
#define _BAGGAGE_PROTOCOL_H_
#include "atomlayer.h"
#include "lexvarint.h"
#define ATOM_TYPE_MASK 0x80
#define ATOM_TYPE_HEADER 0x80
#define ATOM_TYPE_DATA 0x00
#define HEADER_TYPE_MASK 0x03
#define HEADER_TYPE_INDEXED 0x00
#define HEADER_TYPE_KEYED 0x02
namespace BaggageProtocol {
inline bool IsHeader(Atom &atom){
return atom.bytes.size() != 0 && (atom.bytes[0] & ATOM_TYPE_MASK) == ATOM_TYPE_HEADER;
}
inline bool IsData(Atom &atom) {
return atom.bytes.size() != 0 && (atom.bytes[0] & ATOM_TYPE_MASK) == ATOM_TYPE_DATA;
}
inline bool IsIndexedHeader(Atom &atom) {
return atom.bytes.size() != 0 && (atom.bytes[0] & HEADER_TYPE_MASK) == HEADER_TYPE_INDEXED;
}
inline bool IsKeyedHeader(Atom &atom) {
return atom.bytes.size() != 0 && (atom.bytes[0] & HEADER_TYPE_MASK) == HEADER_TYPE_KEYED;
}
inline int HeaderLevel(Atom &atom) {
if (atom.bytes.size() == 0) {
return -1;
} else {
return 15 - int((atom.bytes[0] & 0x78) >> 3);
}
}
inline uint64_t HeaderIndex(Atom &atom) {
if (atom.bytes.size() == 0) {
return -1;
}
int n = 1;
uint64_t header_index = DecodeUnsignedLexVarint(atom.bytes, n);
return n == 1 ? -1 : header_index;
}
inline std::vector<uint8_t> Payload(Atom &atom) {
if (atom.bytes.size() > 0) {
return std::vector<uint8_t>(atom.bytes.begin()+1, atom.bytes.end());
}
}
inline std::vector<uint8_t> HeaderKey(Atom &atom) {
return Payload(atom);
}
inline Atom MakeIndexedHeader(int level, uint64_t index) {
uint8_t prefix = 0x80 | ((((uint8_t) (15 - level)) << 3) & 0x78) | 0x00;
std::vector<uint8_t> payload = EncodeUnsignedLexVarint(index);
payload.insert(payload.begin(), prefix);
return Atom(payload);
}
inline Atom MakeKeyedHeader(int level, std::vector<uint8_t> key) {
uint8_t prefix = 0x80 | ((((uint8_t) (15 - level)) << 3) & 0x78) | 0x04;
key.insert(key.begin(), prefix);
return Atom(key);
}
inline Atom MakeDataAtom(std::vector<uint8_t> payload) {
payload.insert(payload.begin(), 0x00);
return Atom(payload);
}
inline bool findBag(std::vector<Atom> &atoms, Atom bagKey, int &startIndex, int &endIndex) {
bool foundStart = false;
while (startIndex < atoms.size() && startIndex < endIndex) {
int comparison = atoms[startIndex].compare(bagKey);
if (comparison == 0) {
foundStart = true;
break;
} else if (comparison > 0) {
endIndex = startIndex;
break;
}
startIndex++;
}
if (!foundStart) {
return false;
}
// Find the end of the bag
int end = startIndex+1;
while (end < atoms.size() && end < endIndex) {
int comparison = atoms[end].compare(bagKey);
if (comparison > 0) {
break;
}
end++;
}
endIndex = end;
return true;
}
inline std::vector<Atom> getBag(std::vector<Atom> &atoms, Atom key) {
int startIndex = 0;
int endIndex = atoms.size();
findBag(atoms, key, startIndex, endIndex);
if (endIndex > startIndex) {
return std::vector<Atom>(atoms.begin()+startIndex+1, atoms.begin()+endIndex);
} else {
return std::vector<Atom>();
}
}
inline std::vector<Atom> removeBag(std::vector<Atom> &atoms, Atom key) {
int startIndex = 0;
int endIndex = atoms.size();
if (findBag(atoms, key, startIndex, endIndex)) {
std::vector<Atom> bag(atoms.begin()+startIndex+1, atoms.begin()+endIndex);
atoms.erase(atoms.begin()+startIndex, atoms.begin()+endIndex);
return bag;
} else {
return std::vector<Atom>();
}
}
}
#endif
\ No newline at end of file
#ifndef _LEXVARINT_H_
#define _LEXVARINT_H_
#include <cstdint>
#include <vector>
int SizeUnsignedLexVarint(uint64_t v);
int SizeSignedLexVarint(int64_t v);
std::vector<uint8_t> EncodeUnsignedLexVarint(uint64_t v);
std::vector<uint8_t> EncodeSignedLexVarint(int64_t v);
uint64_t DecodeUnsignedLexVarint(std::vector<uint8_t> &bytes, int &n);
int64_t DecodeSignedLexVarint(std::vector<uint8_t> &bytes, int &n);
#endif
\ No newline at end of file
#ifndef _PUBSUB_H_
#define _PUBSUB_H_
#include <boost/asio.hpp>
#include <string>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <unistd.h>
#include <thread>
// Statically define these variables for now
#define QUEUE_MAX_MESSAGES 1000
#define PUBLISHER_CONNECT_RETRY_TIMEOUT_MICROS 1000000
#define DEFAULT_SERVER_ADDRESS "localhost"
#define DEFAULT_SERVER_PORT "5563"
typedef std::pair<std::string, std::string> PubSubMessage;
class PubSubClient {
public:
PubSubClient(std::string server_address, std::string server_port);
// Starts the publisher thread. Not necessary to call explicitly; publisher thread is automatically created when publishing.
void start();
void shutdown();
void join();
void publish(std::string topic, std::string message);
private:
std::string server_hostname, server_port;
std::queue<PubSubMessage> queue;
std::mutex lock;
std::condition_variable condition;
std::thread* publisher_thread = nullptr;
volatile bool is_shutdown = false;
void publisher_main();
void publish_loop(boost::asio::ip::tcp::socket &socket);
};
class PubSub {
public:
static void publish(std::string topic, std::string message);
static void shutdown();
static void join();
private:
PubSub() {}
static PubSubClient* client;
};
#endif
\ No newline at end of file
#ifndef _THREAD_LOCAL_BAGGAGE_H_
#define _THREAD_LOCAL_BAGGAGE_H_
#include "atomlayer.h"
namespace ThreadLocalBaggage {
Baggage& Get(); // Get the current thread's baggage
Baggage Take(); // Get the current thread's baggage, and clear the thread-local storage
Baggage Branch(); // Get a copy of the current thread's baggage
void Delete(); // Delete the current thread's baggage
void Set(Baggage new_baggage);
void Join(Baggage &otherBaggage);
}
#endif
\ No newline at end of file
#ifndef _XTRACE_H_
#define _XTRACE_H_
#include <string>
#include <vector>
#define XTRACE_REPORT_PROTOBUF_TOPIC "xtpb"
#define __XTRACE_FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
// Macro overloading fuckery
#define XTRACE1(msg) XTrace::Logger(__XTRACE_FILENAME__).log(msg, __FILE__, __LINE__)
#define XTRACE2(logger, msg) logger.log(msg, __FILE__, __LINE__)
#define GET_XTRACE_MACRO(_1,_2,NAME,...) NAME
#define XTRACE(...) GET_XTRACE_MACRO(__VA_ARGS__,XTRACE2,XTRACE1)(__VA_ARGS__)
namespace XTrace {
class Logger {
public:
const std::string agent;
Logger(std::string agent) : agent(agent) {}
void log(std::string message);
void log(std::string message, std::string file, int line);
};
// Start a new trace, with a randomly generated taskID
void StartTrace();
void StartTrace(std::string tag);
void StartTrace(std::vector<std::string> tags);
Logger GetLogger(std::string agent);
void log(std::string message);
void setProcessName(std::string processName);
}
#endif
\ No newline at end of file
This diff is collapsed.
#ifndef _XTRACE_BAGGAGE_H_
#define _XTRACE_BAGGAGE_H_
#include "thread_local_baggage.h"
namespace XTraceBaggage {
void Clear();
bool HasTaskID();
uint64_t GetTaskID();
void SetTaskID(uint64_t taskId);
std::vector<uint64_t> GetParentEventIDs();
void SetParentEventID(uint64_t eventId);
}
#endif
\ No newline at end of file
//package pubsub;
option java_package = "edu.brown.cs.systems.xtrace";
option java_outer_classname = "Reporting";
// The report specification for X-Trace version 4.
message XTraceReportv4 {
// X-Trace related fields
optional sfixed64 task_id = 1; // ID of the execution this report belongs to
optional sfixed64 event_id = 2; // ID of this report
repeated sfixed64 parent_event_id = 3; // ID of causally-preceding reports
// Timestamp fields
optional int64 timestamp = 4; // Epoch time in milliseconds
optional int64 hrt = 5; // Process-dependent high-resolution timer in nanoseconds
optional int64 cycles = 6; // Thread-dependent CPU cycle timer
// Report source fields
optional string host = 7; // Hostname
optional int32 process_id = 8; // Numeric process ID
optional string process_name = 9; // Process name
optional int32 thread_id = 10; // Numeric thread ID
optional string threadName = 11; // Thread name
optional string agent = 12; // An arbitrary string, usually a java class, where this report was generated
optional string source = 13; // The source code location where this report was generated
// Messages
optional string label = 14; // String text message
repeated string key = 15; // keys for custom fields; only add a key if you will also add a value
repeated string value = 16; // keys for custom values; only add a value if you also added a key
repeated string tags = 17; // Database tags
// X-Trace Tenant field
optional int32 tenantClass = 18; // Tenant ID
}
#include "atomlayer.h"
#include <iostream>
#include <cstdint>
#include <cstddef>
#include <vector>
std::vector<uint8_t> encodeVarint(uint64_t x) {
std::vector<uint8_t> s;
while (x >= 128) {
uint8_t c = x | 0x80;
s.push_back(c);
x >>= 7;
}
s.push_back(x);
return s;
}
uint64_t decodeVarint(std::vector<uint8_t> s, unsigned &n) {
uint64_t x = 0;
for (unsigned shift = 0; shift < 64; shift += 7) {
if (n > s.size()) {
return 0;
}
uint8_t b = s[n];
n++;
x |= ((uint64_t) (b & 0x7F)) << shift;
if ((b & 0x80) == 0) {
return x;
}
}
}
const int Atom::compare(const Atom& other) {
for (unsigned i = 0; i < bytes.size() && i < other.bytes.size(); i++) {
if (bytes[i] < other.bytes[i]) {
return -1;
} else if (bytes[i] > other.bytes[i]) {
return 1;
}
}
if (bytes.size() < other.bytes.size()) {
return -1;
} else if (bytes.size() > other.bytes.size()) {
return 1;
} else {
return 0;
}
}
std::ostream& operator<< ( std::ostream& os, const Atom& atom ) {
os << "[";
bool first = true;
for (unsigned i = 0; i < atom.bytes.size(); i++) {
if (!first) os << " ";
first = false;
os << unsigned(atom.bytes[i]);
}
if (first) os << "-";
os << "]";
}
Baggage Baggage::branch() {
Baggage duplicate;
duplicate.atoms = atoms;
return duplicate;
}
Baggage Baggage::merge(Baggage& a, Baggage& b) {
Baggage merged;
unsigned i = 0, j = 0;
while (i < a.atoms.size() && j < b.atoms.size()) {
switch(a.atoms[i].compare(b.atoms[j])) {
case -1: merged.atoms.push_back(a.atoms[i]); i++; break;
case 0: merged.atoms.push_back(a.atoms[i]); i++; j++; break;
case 1: merged.atoms.push_back(b.atoms[j]); j++; break;
}
}
for (; i < a.atoms.size(); i++) {
merged.atoms.push_back(a.atoms[i]);
}
for (; j < b.atoms.size(); j++) {
merged.atoms.push_back(b.atoms[j]);
}
return merged;
}
std::vector<uint8_t> Baggage::serialize() {
std::vector<uint8_t> bytes;
for (unsigned i = 0; i < atoms.size(); i++) {
std::vector<uint8_t> size_prefix = encodeVarint(atoms[i].bytes.size());
bytes.insert(bytes.end(), size_prefix.begin(), size_prefix.end());
bytes.insert(bytes.end(), atoms[i].bytes.begin(), atoms[i].bytes.end());
}
return bytes;
}
Baggage Baggage::deserialize(std::vector<uint8_t> bytes) {
Baggage baggage;
unsigned n = 0;
while (n < bytes.size()) {
uint64_t next_atom_size = decodeVarint(bytes, n);
Atom a(std::vector<uint8_t>(bytes.begin() + n, bytes.begin() + n + next_atom_size));
baggage.atoms.push_back(a);
n += next_atom_size;
}
return baggage;
}
std::ostream& operator<< ( std::ostream& os, const Baggage& baggage ) {
os << "[";
bool first = true;
for (unsigned i = 0; i < baggage.atoms.size(); i++) {
if (!first) os << " ";
first = false;
os << baggage.atoms[i];
}
os << "]";
}
\ No newline at end of file
#include "baggageprotocol.h"
#include "lexvarint.h"
#include <functional>
#include <iostream>
int SizeUnsignedLexVarint(uint64_t v) {
if ((v & 0xFFFFFFFFFFFFFF80) == 0) return 1;
if ((v & 0xFFFFFFFFFFFFC000) == 0) return 2;
if ((v & 0xFFFFFFFFFFE00000) == 0) return 3;
if ((v & 0xFFFFFFFFF0000000) == 0) return 4;
if ((v & 0xFFFFFFF800000000) == 0) return 5;
if ((v & 0xFFFFFC0000000000) == 0) return 6;
if ((v & 0xFFFE000000000000) == 0) return 7;
if ((v & 0xFF00000000000000) == 0) return 8;
return 9;
}
int SizeSignedLexVarint(int64_t v) {
if (v < 0) v = (-v + 1);
if ((v & 0xFFFFFFFFFFFFFFC0) == 0) return 1;
if ((v & 0xFFFFFFFFFFFFE000) == 0) return 2;
if ((v & 0xFFFFFFFFFFF00000) == 0) return 3;
if ((v & 0xFFFFFFFFF8000000) == 0) return 4;
if ((v & 0xFFFFFFFC00000000) == 0) return 5;
if ((v & 0xFFFFFE0000000000) == 0) return 6;
if ((v & 0xFFFF000000000000) == 0) return 7;
if ((v & 0xFF80000000000000) == 0) return 8;
return 9;
}
std::vector<uint8_t> EncodeUnsignedLexVarint(uint64_t v) {
int size = SizeUnsignedLexVarint(v);
std::vector<uint8_t> bytes(size);
for (int i = size-1; i >= 0; i--) {