Commit a79cf43c authored by Vaastav Anand's avatar Vaastav Anand

Add instrumentation for cast info service

parent 21ca0db9
......@@ -15,6 +15,8 @@
#include "../ThriftClient.h"
#include "../logger.h"
#include "../tracing.h"
#include <xtrace/xtrace.h>
#include <xtrace/baggage.h>
namespace media_service {
......@@ -51,6 +53,17 @@ void CastInfoHandler::WriteCastInfo(
bool gender,
const std::string &intro,
const std::map<std::string, std::string> &carrier) {
std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
if (baggage_it != carrier.end()) {
SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
}
if (!XTrace::IsTracing()) {
XTrace::StartTrace("CastInfoHandler");
}
XTRACE("CastInfoHandler::WriteCastInfo", {{"RequestID", std::to_string(req_id)}});
// Initialize a span
TextMapReader reader(carrier);
std::map<std::string, std::string> writer_text_map;
......@@ -73,6 +86,7 @@ void CastInfoHandler::WriteCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = "Failed to pop a client from MongoDB pool";
XTRACE("Failed to pop a client from MongoDB pool");
throw se;
}
auto collection = mongoc_client_get_collection(
......@@ -81,11 +95,13 @@ void CastInfoHandler::WriteCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = "Failed to create collection cast-info from DB cast-info";
XTRACE("Failed to create collection cast-info from DB cast-info");
mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
throw se;
}
bson_error_t error;
XTRACE("Inserting cast-info to MongoDB");
auto insert_span = opentracing::Tracer::Global()->StartSpan(
"MongoInsertCastInfo", { opentracing::ChildOf(&span->context()) });
bool plotinsert = mongoc_collection_insert_one (
......@@ -97,17 +113,22 @@ void CastInfoHandler::WriteCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = error.message;
XTRACE("Failed to insert cast-info to MongoDB");
bson_destroy(new_doc);
mongoc_collection_destroy(collection);
mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
throw se;
}
XTRACE("Successfully inserted cast-info to MongoDB");
bson_destroy(new_doc);
mongoc_collection_destroy(collection);
mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
span->Finish();
XTRACE("CastInfoHandler::WriteCastInfo complete");
DELETE_CURRENT_BAGGAGE();
}
void CastInfoHandler::ReadCastInfo(
......@@ -116,6 +137,16 @@ void CastInfoHandler::ReadCastInfo(
const std::vector<int64_t> &cast_info_ids,
const std::map<std::string, std::string> &carrier) {
std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
if (baggage_it != carrier.end()) {
SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
}
if (!XTrace::IsTracing()) {
XTrace::StartTrace("CastInfoHandler");
}
XTRACE("CastInfoHandler::ReadCastInfo", {{"RequestID", std::to_string(req_id)}});
// Initialize a span
TextMapReader reader(carrier);
std::map<std::string, std::string> writer_text_map;
......@@ -127,6 +158,7 @@ void CastInfoHandler::ReadCastInfo(
opentracing::Tracer::Global()->Inject(span->context(), writer);
if (cast_info_ids.empty()) {
XTRACE("Empty cast info ids");
return;
}
......@@ -135,6 +167,7 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
se.message = "cast_info_ids are duplicated";
XTRACE("Cast Info Ids are duplicated");
throw se;
}
......@@ -146,6 +179,7 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
se.message = "Failed to pop a client from memcached pool";
XTRACE("Failed to pop a client from memcached pool");
throw se;
}
char** keys;
......@@ -166,6 +200,7 @@ void CastInfoHandler::ReadCastInfo(
<< memcached_strerror(memcached_client, memcached_rc);
ServiceException se;
se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
XTRACE("Cannot get cast_info_ids of request " + std::to_string(req_id) + ": " + memcached_strerror(memcached_client, memcached_rc));
se.message = memcached_strerror(memcached_client, memcached_rc);
memcached_pool_push(_memcached_client_pool, memcached_client);
throw se;
......@@ -176,12 +211,14 @@ void CastInfoHandler::ReadCastInfo(
char *return_value;
size_t return_value_length;
uint32_t flags;
XTRACE("MEMCACHED mget start");
auto get_span = opentracing::Tracer::Global()->StartSpan(
"MmcMgetCastInfo", { opentracing::ChildOf(&span->context()) });
while (true) {
return_value = memcached_fetch(memcached_client, return_key,
&return_key_length, &return_value_length, &flags, &memcached_rc);
if (return_value == nullptr) {
XTRACE("Memcached mget finished");
LOG(debug) << "Memcached mget finished";
break;
}
......@@ -189,6 +226,7 @@ void CastInfoHandler::ReadCastInfo(
free(return_value);
memcached_quit(memcached_client);
memcached_pool_push(_memcached_client_pool, memcached_client);
XTRACE("Cannot get components of request " + std::to_string(req_id));
LOG(error) << "Cannot get components of request " << req_id;
ServiceException se;
se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
......@@ -217,9 +255,11 @@ void CastInfoHandler::ReadCastInfo(
std::vector<std::future<void>> set_futures;
std::map<int64_t, std::string> cast_info_json_map;
std::vector<Baggage> set_baggages;
// Find the rest in MongoDB
if (!cast_info_ids_not_cached.empty()) {
XTRACE("Finding the rest of the cast info in MongoDB");
bson_t *query = bson_new();
bson_t query_child;
bson_t query_cast_info_id_list;
......@@ -242,6 +282,7 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = "Failed to pop a client from MongoDB pool";
XTRACE("Failed to pop a client from MongoDB pool");
throw se;
}
auto collection = mongoc_client_get_collection(
......@@ -250,6 +291,7 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = "Failed to create collection user from DB user";
XTRACE("Failed to create collection user from DB user");
mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
throw se;
}
......@@ -290,6 +332,7 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MONGODB_ERROR;
se.message = error.message;
XTRACE(error.message);
throw se;
}
bson_destroy(query);
......@@ -297,9 +340,12 @@ void CastInfoHandler::ReadCastInfo(
mongoc_collection_destroy(collection);
mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
Baggage set_future_baggage = BRANCH_CURRENT_BAGGAGE();
set_baggages.emplace_back(set_future_baggage);
// Upload cast-info to memcached
set_futures.emplace_back(std::async(std::launch::async, [&]() {
memcached_return_t _rc;
BAGGAGE(set_future_baggage);
auto _memcached_client = memcached_pool_pop(
_memcached_client_pool, true, &_rc);
if (!_memcached_client) {
......@@ -307,8 +353,10 @@ void CastInfoHandler::ReadCastInfo(
ServiceException se;
se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
se.message = "Failed to pop a client from memcached pool";
XTRACE("Failed to pop a client from memcached pool");
throw se;
}
XTRACE("MemcachedSetCastInfo start");
auto set_span = opentracing::Tracer::Global()->StartSpan(
"MmcSetCastInfo", {opentracing::ChildOf(&span->context())});
for (auto & it : cast_info_json_map) {
......@@ -324,19 +372,25 @@ void CastInfoHandler::ReadCastInfo(
}
memcached_pool_push(_memcached_client_pool, _memcached_client);
set_span->Finish();
XTRACE("MemcachedSetPost complete");
}));
}
if (return_map.size() != cast_info_ids.size()) {
try {
for (auto &it : set_futures) { it.get(); }
for (std::vector<int>::size_type i = 0; i < set_futures.size(); i++) {
set_futures[i].get();
JOIN_CURRENT_BAGGAGE(set_baggages[i]);
}
} catch (...) {
LOG(warning) << "Failed to set cast-info to memcached";
XTRACE("Failed to set cast-info to memcached");
}
LOG(error) << "cast-info-service return set incomplete";
ServiceException se;
se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
se.message = "cast-info-service return set incomplete";
XTRACE("cast-info-service return set incomplete");
throw se;
}
......@@ -345,10 +399,17 @@ void CastInfoHandler::ReadCastInfo(
}
try {
for (auto &it : set_futures) { it.get(); }
for (std::vector<int>::size_type i = 0; i < set_futures.size(); i++) {
set_futures[i].get();
JOIN_CURRENT_BAGGAGE(set_baggages[i]);
}
} catch (...) {
LOG(warning) << "Failed to set cast-info to memcached";
XTRACE("Failed to set cast-info to memcached");
}
XTRACE("CastInfoHandler::ReadCastInfo complete");
DELETE_CURRENT_BAGGAGE();
}
} // namespace media_service
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment