// MovieIdHandler.h - Thrift handler implementing MovieIdServiceIf.
#ifndef MEDIA_MICROSERVICES_MOVIEIDHANDLER_H
#define MEDIA_MICROSERVICES_MOVIEIDHANDLER_H

#include <cstdint>
#include <cstdlib>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <string>

#include <mongoc.h>
#include <libmemcached/memcached.h>
#include <libmemcached/util.h>
#include <bson/bson.h>

#include "../../gen-cpp/MovieIdService.h"
#include "../../gen-cpp/ComposeReviewService.h"
#include "../../gen-cpp/RatingService.h"
#include "../ClientPool.h"
#include "../ThriftClient.h"
#include "../logger.h"
#include "../tracing.h"
#include <xtrace/xtrace.h>
#include <xtrace/baggage.h>


namespace media_service {

/**
 * Handler implementing the MovieIdServiceIf interface.
 *
 * The handler works against four externally supplied pools: a memcached
 * client pool, a MongoDB client pool, and Thrift client pools for the
 * compose-review and rating services. It only stores the pointers it is
 * given; the defaulted destructor does not release any of them.
 */
class MovieIdHandler : public MovieIdServiceIf {
 public:
  /**
   * Stores the four pool pointers for later use by the service methods.
   *
   * @param memcached_client_pool pool of memcached clients
   * @param mongodb_client_pool   pool of MongoDB clients
   * @param compose_client_pool   pool of compose-review-service clients
   * @param rating_client_pool    pool of rating-service clients
   */
  MovieIdHandler(
      memcached_pool_st *memcached_client_pool,
      mongoc_client_pool_t *mongodb_client_pool,
      ClientPool<ThriftClient<ComposeReviewServiceClient>> *compose_client_pool,
      ClientPool<ThriftClient<RatingServiceClient>> *rating_client_pool);

  ~MovieIdHandler() override = default;

  /**
   * Resolves `title` to a movie id and forwards the id and the rating to
   * the compose-review and rating services.
   *
   * @param req_id  request identifier forwarded to the downstream services
   * @param title   movie title to look up
   * @param rating  rating forwarded to the rating service
   * @param carrier tracing context propagated by the caller
   */
  void UploadMovieId(int64_t req_id,
                     const std::string &title,
                     int32_t rating,
                     const std::map<std::string, std::string> &carrier) override;

  /**
   * Stores a new (title, movie_id) pair in MongoDB.
   *
   * @param req_id   request identifier (used for tracing)
   * @param title    movie title to register
   * @param movie_id id to associate with the title
   * @param carrier  tracing context propagated by the caller
   */
  void RegisterMovieId(int64_t req_id,
                       const std::string &title,
                       const std::string &movie_id,
                       const std::map<std::string, std::string> &carrier) override;

 private:
  // Borrowed pools; see the class comment for ownership.
  memcached_pool_st *_memcached_client_pool;
  mongoc_client_pool_t *_mongodb_client_pool;
  ClientPool<ThriftClient<ComposeReviewServiceClient>> *_compose_client_pool;
  ClientPool<ThriftClient<RatingServiceClient>> *_rating_client_pool;
};

/**
 * Stores the pool pointers used by the service methods.
 *
 * Declared `inline` because this definition lives in a header: without it,
 * including the header from more than one translation unit produces
 * multiple-definition link errors.
 *
 * @param memcached_client_pool pool of memcached clients
 * @param mongodb_client_pool   pool of MongoDB clients
 * @param compose_client_pool   pool of compose-review-service clients
 * @param rating_client_pool    pool of rating-service clients
 */
inline MovieIdHandler::MovieIdHandler(
    memcached_pool_st *memcached_client_pool,
    mongoc_client_pool_t *mongodb_client_pool,
    ClientPool<ThriftClient<ComposeReviewServiceClient>> *compose_client_pool,
    ClientPool<ThriftClient<RatingServiceClient>> *rating_client_pool)
    : _memcached_client_pool(memcached_client_pool),
      _mongodb_client_pool(mongodb_client_pool),
      _compose_client_pool(compose_client_pool),
      _rating_client_pool(rating_client_pool) {}

void MovieIdHandler::UploadMovieId(
    int64_t req_id,
    const std::string &title,
    int32_t rating,
    const std::map<std::string, std::string> & carrier) {

63 64 65 66 67 68 69 70 71 72
  std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
  if (baggage_it != carrier.end()) {
    SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
  }

  if (!XTrace::IsTracing()) {
    XTrace::StartTrace("MovieIdHandler");
  }
  XTRACE("MovieIdHandler::UploadMovieId", {{"RequestID", std::to_string(req_id)}});

Yu Gan's avatar
Yu Gan committed
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
  // Initialize a span
  TextMapReader reader(carrier);
  std::map<std::string, std::string> writer_text_map;
  TextMapWriter writer(writer_text_map);
  auto parent_span = opentracing::Tracer::Global()->Extract(reader);
  auto span = opentracing::Tracer::Global()->StartSpan(
      "UploadMovieId",
      { opentracing::ChildOf(parent_span->get()) });
  opentracing::Tracer::Global()->Inject(span->context(), writer);

  memcached_return_t memcached_rc;
  memcached_st *memcached_client = memcached_pool_pop(
      _memcached_client_pool, true, &memcached_rc);
  if (!memcached_client) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
    se.message = "Failed to pop a client from memcached pool";
90
    XTRACE("Failed to pop a client from memcached pool");
Yu Gan's avatar
Yu Gan committed
91 92 93 94 95
    throw se;
  }

  size_t movie_id_size;
  uint32_t memcached_flags;
96
  XTRACE("Looking for the movie id in memcached");
Yu Gan's avatar
Yu Gan committed
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
  // Look for the movie id from memcached

  auto get_span = opentracing::Tracer::Global()->StartSpan(
      "MmcGetMovieId", { opentracing::ChildOf(&span->context()) });

  char* movie_id_mmc = memcached_get(
      memcached_client,
      title.c_str(),
      title.length(),
      &movie_id_size,
      &memcached_flags,
      &memcached_rc);
  if (!movie_id_mmc && memcached_rc != MEMCACHED_NOTFOUND) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
    se.message = memcached_strerror(memcached_client, memcached_rc);
    memcached_pool_push(_memcached_client_pool, memcached_client);
    throw se;
  }
  get_span->Finish();
  memcached_pool_push(_memcached_client_pool, memcached_client);
  std::string movie_id_str;

  // If cached in memcached
  if (movie_id_mmc) {
    LOG(debug) << "Get movie_id " << movie_id_mmc
        << " cache hit from Memcached";
    movie_id_str = std::string(movie_id_mmc);
125
    XTRACE("Cache hit in Memcached for movie_id " + std::string(movie_id_mmc));
Yu Gan's avatar
Yu Gan committed
126 127 128 129 130
    free(movie_id_mmc);
  }

    // If not cached in memcached
  else {
131
    XTRACE("Cache miss in Memcached for movie_id. Looking for movie in MongoDB");
Yu Gan's avatar
Yu Gan committed
132 133 134 135 136 137
    mongoc_client_t *mongodb_client = mongoc_client_pool_pop(
        _mongodb_client_pool);
    if (!mongodb_client) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = "Failed to pop a client from MongoDB pool";
138
      XTRACE("Failed to pop a client from MongoDB pool");
Yu Gan's avatar
Yu Gan committed
139 140 141 142 143 144 145 146 147 148
      free(movie_id_mmc);
      throw se;
    }
    auto collection = mongoc_client_get_collection(
        mongodb_client, "movie-id", "movie-id");

    if (!collection) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = "Failed to create collection user from DB movie-id";
149
      XTRACE("Failed to create collection user from DB movie-id");
Yu Gan's avatar
Yu Gan committed
150 151 152 153 154 155 156 157
      free(movie_id_mmc);
      mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      throw se;
    }

    bson_t *query = bson_new();
    BSON_APPEND_UTF8(query, "title", title.c_str());

158
    XTRACE("Start MongoFindMovieId");
Yu Gan's avatar
Yu Gan committed
159 160 161 162 163 164 165
    auto find_span = opentracing::Tracer::Global()->StartSpan(
        "MongoFindMovieId", { opentracing::ChildOf(&span->context()) });
    mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(
        collection, query, nullptr, nullptr);
    const bson_t *doc;
    bool found = mongoc_cursor_next(cursor, &doc);
    find_span->Finish();
166
    XTRACE("Finish MongoFindMovieId");
Yu Gan's avatar
Yu Gan committed
167 168 169 170 171 172

    if (found) {
      bson_iter_t iter;
      if (bson_iter_init_find(&iter, doc, "movie_id")) {
        movie_id_str = std::string(bson_iter_value(&iter)->value.v_utf8.str);
        LOG(debug) << "Find movie " << movie_id_str << " cache miss";
173
        XTRACE("Find movie " + movie_id_str + " cache miss");
Yu Gan's avatar
Yu Gan committed
174 175 176 177 178 179 180 181 182
      } else {
        LOG(error) << "Attribute movie_id is not find in MongoDB";
        bson_destroy(query);
        mongoc_cursor_destroy(cursor);
        mongoc_collection_destroy(collection);
        mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
        ServiceException se;
        se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
        se.message = "Attribute movie_id is not find in MongoDB";
183
        XTRACE("Attribute movie_id not found in MongoDB");
Yu Gan's avatar
Yu Gan committed
184 185 186 187 188 189 190 191 192 193 194 195
        free(movie_id_mmc);
        throw se;
      }
    } else {
      LOG(error) << "Movie " << title << " is not found in MongoDB";
      bson_destroy(query);
      mongoc_cursor_destroy(cursor);
      mongoc_collection_destroy(collection);
      mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      ServiceException se;
      se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
      se.message = "Movie " + title + " is not found in MongoDB";
196
      XTRACE("Movie " + title + " not found in MongoDB");
Yu Gan's avatar
Yu Gan committed
197 198 199 200 201 202 203 204 205 206 207 208
      free(movie_id_mmc);
      throw se;
    }
    bson_destroy(query);
    mongoc_cursor_destroy(cursor);
    mongoc_collection_destroy(collection);
    mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
  }
  
  std::future<void> set_future;
  std::future<void> movie_id_future;
  std::future<void> rating_future;
209 210 211

  Baggage set_baggage = BRANCH_CURRENT_BAGGAGE();

Yu Gan's avatar
Yu Gan committed
212
  set_future = std::async(std::launch::async, [&]() {
213 214
    BAGGAGE(set_baggage);
    XTRACE("Start Memcached SetMovieId");
Yu Gan's avatar
Yu Gan committed
215 216 217 218 219 220 221 222 223 224 225 226 227 228
    memcached_client = memcached_pool_pop(
        _memcached_client_pool, true, &memcached_rc);
    auto set_span = opentracing::Tracer::Global()->StartSpan(
        "MmcSetMovieId", { opentracing::ChildOf(&span->context()) });
    // Upload the movie id to memcached
    memcached_rc = memcached_set(
        memcached_client,
        title.c_str(),
        title.length(),
        movie_id_str.c_str(),
        movie_id_str.length(),
        static_cast<time_t>(0),
        static_cast<uint32_t>(0)
    );
229
    XTRACE("End Memcached SetMovieId");
Yu Gan's avatar
Yu Gan committed
230 231 232 233
    set_span->Finish();
    if (memcached_rc != MEMCACHED_SUCCESS) {
      LOG(warning) << "Failed to set movie_id to Memcached: "
                   << memcached_strerror(memcached_client, memcached_rc);
234
      XTRACE("Failed to set movie_id to Memcached: " + std::string(memcached_strerror(memcached_client, memcached_rc)));
Yu Gan's avatar
Yu Gan committed
235 236 237 238
    }
    memcached_pool_push(_memcached_client_pool, memcached_client);    
  });

239 240
  Baggage movie_baggage = BRANCH_CURRENT_BAGGAGE();

Yu Gan's avatar
Yu Gan committed
241
  movie_id_future = std::async(std::launch::async, [&]() {
242
    BAGGAGE(movie_baggage);
Yu Gan's avatar
Yu Gan committed
243 244 245 246 247
    auto compose_client_wrapper = _compose_client_pool->Pop();
    if (!compose_client_wrapper) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
      se.message = "Failed to connected to compose-review-service";
248
      XTRACE("Failed to connect to compose-review-service");
Yu Gan's avatar
Yu Gan committed
249 250 251 252
      throw se;
    }
    auto compose_client = compose_client_wrapper->GetClient();
    try {
253
      writer_text_map["baggage"] = GET_CURRENT_BAGGAGE().str();
Yu Gan's avatar
Yu Gan committed
254 255 256 257
      compose_client->UploadMovieId(req_id, movie_id_str, writer_text_map);
    } catch (...) {
      _compose_client_pool->Push(compose_client_wrapper);
      LOG(error) << "Failed to upload movie_id to compose-review-service";
258
      XTRACE("Failed to upload movie_id to compose-review-service");
Yu Gan's avatar
Yu Gan committed
259 260 261 262 263
      throw;
    }
    _compose_client_pool->Push(compose_client_wrapper);
  });

264
  Baggage rating_baggage = BRANCH_CURRENT_BAGGAGE();
Yu Gan's avatar
Yu Gan committed
265
  rating_future = std::async(std::launch::async, [&]() {
266
    BAGGAGE(rating_baggage);
Yu Gan's avatar
Yu Gan committed
267 268 269 270 271
    auto rating_client_wrapper = _rating_client_pool->Pop();
    if (!rating_client_wrapper) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
      se.message = "Failed to connected to rating-service";
272
      XTRACE("Failed to connect to rating-service");
Yu Gan's avatar
Yu Gan committed
273 274 275 276
      throw se;
    }
    auto rating_client = rating_client_wrapper->GetClient();
    try {
277
      writer_text_map["baggage"] = GET_CURRENT_BAGGAGE().str();
Yu Gan's avatar
Yu Gan committed
278 279 280 281 282 283 284 285 286 287 288
      rating_client->UploadRating(req_id, movie_id_str, rating, writer_text_map);
    } catch (...) {
      _rating_client_pool->Push(rating_client_wrapper);
      LOG(error) << "Failed to upload rating to rating-service";
      throw;
    }
    _rating_client_pool->Push(rating_client_wrapper);
  });

  try {
    movie_id_future.get();
289
    JOIN_CURRENT_BAGGAGE(movie_baggage);
Yu Gan's avatar
Yu Gan committed
290
    rating_future.get();
291
    JOIN_CURRENT_BAGGAGE(rating_baggage);
Yu Gan's avatar
Yu Gan committed
292
    set_future.get();
293
    JOIN_CURRENT_BAGGAGE(set_baggage);
Yu Gan's avatar
Yu Gan committed
294 295 296 297 298
  } catch (...) {
    throw;
  }

  span->Finish();
299 300
  XTRACE("MovieHandler::UploadMovieId complete");
  DELETE_CURRENT_BAGGAGE();
Yu Gan's avatar
Yu Gan committed
301 302 303 304 305 306 307 308
}

void MovieIdHandler::RegisterMovieId (
    const int64_t req_id,
    const std::string &title,
    const std::string &movie_id,
    const std::map<std::string, std::string> & carrier) {

309 310 311 312 313 314 315 316 317 318
  std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
  if (baggage_it != carrier.end()) {
    SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
  }

  if (!XTrace::IsTracing()) {
    XTrace::StartTrace("MovieIdHandler");
  }
  XTRACE("MovieIdHandler::RegisterMovieId", {{"RequestID", std::to_string(req_id)}});

Yu Gan's avatar
Yu Gan committed
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
  // Initialize a span
  TextMapReader reader(carrier);
  std::map<std::string, std::string> writer_text_map;
  TextMapWriter writer(writer_text_map);
  auto parent_span = opentracing::Tracer::Global()->Extract(reader);
  auto span = opentracing::Tracer::Global()->StartSpan(
      "RegisterMovieId",
      { opentracing::ChildOf(parent_span->get()) });
  opentracing::Tracer::Global()->Inject(span->context(), writer);

  mongoc_client_t *mongodb_client = mongoc_client_pool_pop(
      _mongodb_client_pool);
  if (!mongodb_client) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MONGODB_ERROR;
    se.message = "Failed to pop a client from MongoDB pool";
335
    XTRACE("Failed to pop a client from MongoDB pool");
Yu Gan's avatar
Yu Gan committed
336 337 338 339 340 341 342 343
    throw se;
  }
  auto collection = mongoc_client_get_collection(
      mongodb_client, "movie-id", "movie-id");
  if (!collection) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MONGODB_ERROR;
    se.message = "Failed to create collection movie_id from DB movie-id";
344
    XTRACE("Failed to create collection movie_id from DB movie-id");
Yu Gan's avatar
Yu Gan committed
345 346 347 348 349 350 351 352
    mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
    throw se;
  }

  // Check if the username has existed in the database
  bson_t *query = bson_new();
  BSON_APPEND_UTF8(query, "title", title.c_str());

353
  XTRACE("MongoFindMovie start");
Yu Gan's avatar
Yu Gan committed
354 355 356 357 358 359 360
  auto find_span = opentracing::Tracer::Global()->StartSpan(
      "MongoFindMovie", { opentracing::ChildOf(&span->context()) });
  mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(
      collection, query, nullptr, nullptr);
  const bson_t *doc;
  bool found = mongoc_cursor_next(cursor, &doc);
  find_span->Finish();
361
  XTRACE("MongoFindMovie finish");
Yu Gan's avatar
Yu Gan committed
362 363 364 365 366 367

  if (found) {
    LOG(warning) << "Movie "<< title << " already existed in MongoDB";
    ServiceException se;
    se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
    se.message = "Movie " + title + " already existed in MongoDB";
368
    XTRACE("Movie " + title + " already existed in MongoDB");
Yu Gan's avatar
Yu Gan committed
369 370 371 372 373 374 375 376 377 378
    mongoc_cursor_destroy(cursor);
    mongoc_collection_destroy(collection);
    mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
    throw se;
  } else {
    bson_t *new_doc = bson_new();
    BSON_APPEND_UTF8(new_doc, "title", title.c_str());
    BSON_APPEND_UTF8(new_doc, "movie_id", movie_id.c_str());
    bson_error_t error;

379
    XTRACE("MongoInsertMovie start");
Yu Gan's avatar
Yu Gan committed
380 381 382 383 384
    auto insert_span = opentracing::Tracer::Global()->StartSpan(
        "MongoInsertMovie", { opentracing::ChildOf(&span->context()) });
    bool plotinsert = mongoc_collection_insert_one (
        collection, new_doc, nullptr, nullptr, &error);
    insert_span->Finish();
385
    XTRACE("MongoInsertMovie finish");
Yu Gan's avatar
Yu Gan committed
386 387 388 389 390 391 392

    if (!plotinsert) {
      LOG(error) << "Failed to insert movie_id of " << title
          << " to MongoDB: " << error.message;
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = error.message;
393
      XTRACE("Failed to insert movie_id of " + title + " to MongoDB");
Yu Gan's avatar
Yu Gan committed
394 395 396 397 398 399 400 401 402 403 404 405 406
      bson_destroy(new_doc);
      mongoc_cursor_destroy(cursor);
      mongoc_collection_destroy(collection);
      mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      throw se;
    }
    bson_destroy(new_doc);
  }
  mongoc_cursor_destroy(cursor);
  mongoc_collection_destroy(collection);
  mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);

  span->Finish();
407 408
  XTRACE("MovieIdService::RegisterMovieId complete");
  DELETE_CURRENT_BAGGAGE();
Yu Gan's avatar
Yu Gan committed
409 410 411 412
}
} // namespace media_service

#endif //MEDIA_MICROSERVICES_MOVIEIDHANDLER_H