MovieReviewHandler.h 15.8 KB
Newer Older
Yu Gan's avatar
Yu Gan committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#ifndef MEDIA_MICROSERVICES_MOVIEREVIEWHANDLER_H
#define MEDIA_MICROSERVICES_MOVIEREVIEWHANDLER_H

#include <iostream>
#include <string>

#include <mongoc.h>
#include <bson/bson.h>

#include "../../gen-cpp/MovieReviewService.h"
#include "../../gen-cpp/ReviewStorageService.h"
#include "../logger.h"
#include "../tracing.h"
#include "../ClientPool.h"
#include "../RedisClient.h"
#include "../ThriftClient.h"
17 18
#include <xtrace/xtrace.h>
#include <xtrace/baggage.h>
Yu Gan's avatar
Yu Gan committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55

namespace media_service {
class MovieReviewHandler : public MovieReviewServiceIf {
 public:
  MovieReviewHandler(
      ClientPool<RedisClient> *,
      mongoc_client_pool_t *,
      ClientPool<ThriftClient<ReviewStorageServiceClient>> *);
  ~MovieReviewHandler() override = default;
  void UploadMovieReview(int64_t, const std::string&, int64_t, int64_t,
                         const std::map<std::string, std::string> &) override;
  void ReadMovieReviews(std::vector<Review> & _return, int64_t req_id,
      const std::string& movie_id, int32_t start, int32_t stop, 
      const std::map<std::string, std::string> & carrier) override;
  
 private:
  ClientPool<RedisClient> *_redis_client_pool;
  mongoc_client_pool_t *_mongodb_client_pool;
  ClientPool<ThriftClient<ReviewStorageServiceClient>> *_review_client_pool;
};

MovieReviewHandler::MovieReviewHandler(
    ClientPool<RedisClient> *redis_client_pool,
    mongoc_client_pool_t *mongodb_pool,
    ClientPool<ThriftClient<ReviewStorageServiceClient>> *review_storage_client_pool) {
  _redis_client_pool = redis_client_pool;
  _mongodb_client_pool = mongodb_pool;
  _review_client_pool = review_storage_client_pool;
}

void MovieReviewHandler::UploadMovieReview(
    int64_t req_id,
    const std::string& movie_id,
    int64_t review_id,
    int64_t timestamp,
    const std::map<std::string, std::string> & carrier) {

56 57 58 59 60 61 62 63 64 65
  std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
  if (baggage_it != carrier.end()) {
    SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
  }

  if (!XTrace::IsTracing()) {
    XTrace::StartTrace("MovieReviewHandler");
  }
  XTRACE("MovieReviewHandler::UploadMovieReview", {{"RequestID", std::to_string(req_id)}});

Yu Gan's avatar
Yu Gan committed
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
  // Initialize a span
  TextMapReader reader(carrier);
  std::map<std::string, std::string> writer_text_map;
  TextMapWriter writer(writer_text_map);
  auto parent_span = opentracing::Tracer::Global()->Extract(reader);
  auto span = opentracing::Tracer::Global()->StartSpan(
      "UploadMovieReview",
      { opentracing::ChildOf(parent_span->get()) });
  opentracing::Tracer::Global()->Inject(span->context(), writer);

  mongoc_client_t *mongodb_client = mongoc_client_pool_pop(
      _mongodb_client_pool);
  if (!mongodb_client) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MONGODB_ERROR;
    se.message = "Failed to pop a client from MongoDB pool";
82
    XTRACE("Failed to pop a client from MongoDB pool");
Yu Gan's avatar
Yu Gan committed
83 84 85 86 87 88 89 90 91
    throw se;
  }

  auto collection = mongoc_client_get_collection(
      mongodb_client, "movie-review", "movie-review");
  if (!collection) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_MONGODB_ERROR;
    se.message = "Failed to create collection movie-review from DB movie-review";
92
    XTRACE("Failed to create collection movie-review from DB movie-review");
Yu Gan's avatar
Yu Gan committed
93 94 95 96 97 98
    mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
    throw se;
  }

  bson_t *query = bson_new();
  BSON_APPEND_UTF8(query, "movie_id", movie_id.c_str());
99
  XTRACE("MongoFindMovie start");
Yu Gan's avatar
Yu Gan committed
100 101 102 103
  auto find_span = opentracing::Tracer::Global()->StartSpan(
      "MongoFindMovie", {opentracing::ChildOf(&span->context())});
  mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(
      collection, query, nullptr, nullptr);
104
  XTRACE("MongoFindMovie finish");
Yu Gan's avatar
Yu Gan committed
105 106 107 108 109 110 111 112 113 114
  const bson_t *doc;
  bool found = mongoc_cursor_next(cursor, &doc);
  if (!found) {
    bson_t *new_doc = BCON_NEW(
        "movie_id", BCON_UTF8(movie_id.c_str()),
        "reviews",
        "[", "{", "review_id", BCON_INT64(review_id),
        "timestamp", BCON_INT64(timestamp), "}", "]"
    );
    bson_error_t error;
115
    XTRACE("MongoInsert start");
Yu Gan's avatar
Yu Gan committed
116 117 118 119 120
    auto insert_span = opentracing::Tracer::Global()->StartSpan(
        "MongoInsert", {opentracing::ChildOf(&span->context())});
    bool plotinsert = mongoc_collection_insert_one(
        collection, new_doc, nullptr, nullptr, &error);
    insert_span->Finish();
121
    XTRACE("MongoInsert finish");
Yu Gan's avatar
Yu Gan committed
122 123 124
    if (!plotinsert) {
      LOG(error) << "Failed to insert movie review of movie " << movie_id
                 << " to MongoDB: " << error.message;
125
      XTRACE("Failed to insert movie review of movie " + movie_id + " to MongoDB");
Yu Gan's avatar
Yu Gan committed
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = error.message;
      bson_destroy(new_doc);
      bson_destroy(query);
      mongoc_cursor_destroy(cursor);
      mongoc_collection_destroy(collection);
      mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      throw se;
    }
    bson_destroy(new_doc);
  } else {
    bson_t *update = BCON_NEW(
        "$push", "{",
        "reviews", "{",
        "$each", "[", "{",
        "review_id", BCON_INT64(review_id),
        "timestamp", BCON_INT64(timestamp),
        "}", "]",
        "$position", BCON_INT32(0),
        "}",
        "}"
    );
    bson_error_t error;
    bson_t reply;
151
    XTRACE("MongoUpdate start");
Yu Gan's avatar
Yu Gan committed
152 153 154 155 156 157
    auto update_span = opentracing::Tracer::Global()->StartSpan(
        "MongoUpdate.", {opentracing::ChildOf(&span->context())});
    bool plotupdate = mongoc_collection_find_and_modify(
        collection, query, nullptr, update, nullptr, false, false,
        true, &reply, &error);
    update_span->Finish();
158
    XTRACE("MongoUpdate finish");
Yu Gan's avatar
Yu Gan committed
159 160 161
    if (!plotupdate) {
      LOG(error) << "Failed to update movie-review for movie " << movie_id
                 << " to MongoDB: " << error.message;
162
      XTRACE("Failed to update movie-review for movie " + movie_id + " to MongoDB");
Yu Gan's avatar
Yu Gan committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = error.message;
      bson_destroy(update);
      bson_destroy(query);
      bson_destroy(&reply);
      mongoc_cursor_destroy(cursor);
      mongoc_collection_destroy(collection);
      mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      throw se;
    }
    bson_destroy(update);
    bson_destroy(&reply);
  }
  bson_destroy(query);
  mongoc_cursor_destroy(cursor);
  mongoc_collection_destroy(collection);
  mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);

  auto redis_client_wrapper = _redis_client_pool->Pop();
  if (!redis_client_wrapper) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_REDIS_ERROR;
    se.message = "Cannot connected to Redis server";
187
    XTRACE("Cannot connect to Redis server");
Yu Gan's avatar
Yu Gan committed
188 189 190
    throw se;
  }
  auto redis_client = redis_client_wrapper->GetClient();
191
  XTRACE("RedisUpdate start");
Yu Gan's avatar
Yu Gan committed
192 193 194 195 196 197 198 199 200 201 202 203 204 205
  auto redis_span = opentracing::Tracer::Global()->StartSpan(
      "RedisUpdate", {opentracing::ChildOf(&span->context())});
  auto num_reviews = redis_client->zcard(movie_id);
  redis_client->sync_commit();
  auto num_reviews_reply = num_reviews.get();
  std::vector<std::string> options{"NX"};
  if (num_reviews_reply.ok() && num_reviews_reply.as_integer()) {
    std::multimap<std::string, std::string> value = {{
      std::to_string(timestamp), std::to_string(review_id)}};
    redis_client->zadd(movie_id, options, value);
    redis_client->sync_commit();
  }
  _redis_client_pool->Push(redis_client_wrapper);
  redis_span->Finish();
206
  XTRACE("RedisUpdate finish");
Yu Gan's avatar
Yu Gan committed
207
  span->Finish();
208 209
  XTRACE("MovieReviewHandler::UpdateMovieReview finish");
  DELETE_CURRENT_BAGGAGE();
Yu Gan's avatar
Yu Gan committed
210 211 212 213 214 215 216
}

void MovieReviewHandler::ReadMovieReviews(
    std::vector<Review> & _return, int64_t req_id,
    const std::string& movie_id, int32_t start, int32_t stop,
    const std::map<std::string, std::string> & carrier) {
  
217 218 219 220 221 222 223 224 225
  std::map<std::string, std::string>::const_iterator baggage_it = carrier.find("baggage");
  if (baggage_it != carrier.end()) {
    SET_CURRENT_BAGGAGE(Baggage::deserialize(baggage_it->second));
  }

  if (!XTrace::IsTracing()) {
    XTrace::StartTrace("MovieReviewHandler");
  }
  XTRACE("MovieReviewHandler::ReadMovieReviews", {{"RequestID", std::to_string(req_id)}});
Yu Gan's avatar
Yu Gan committed
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
  // Initialize a span
  TextMapReader reader(carrier);
  std::map<std::string, std::string> writer_text_map;
  TextMapWriter writer(writer_text_map);
  auto parent_span = opentracing::Tracer::Global()->Extract(reader);
  auto span = opentracing::Tracer::Global()->StartSpan(
      "ReadMovieReviews",
      { opentracing::ChildOf(parent_span->get()) });
  opentracing::Tracer::Global()->Inject(span->context(), writer);

  if (stop <= start || start < 0) {
    return;
  }

  auto redis_client_wrapper = _redis_client_pool->Pop();
  if (!redis_client_wrapper) {
    ServiceException se;
    se.errorCode = ErrorCode::SE_REDIS_ERROR;
    se.message = "Cannot connected to Redis server";
245
    XTRACE("Cannot connect to Redis server");
Yu Gan's avatar
Yu Gan committed
246 247 248
    throw se;
  }
  auto redis_client = redis_client_wrapper->GetClient();
249
  XTRACE("RedisFind start");
Yu Gan's avatar
Yu Gan committed
250 251 252 253 254
  auto redis_span = opentracing::Tracer::Global()->StartSpan(
      "RedisFind", {opentracing::ChildOf(&span->context())});
  auto review_ids_future = redis_client->zrevrange(movie_id, start, stop - 1);
  redis_client->commit();
  redis_span->Finish();
255
  XTRACE("RedisFind finish");
Yu Gan's avatar
Yu Gan committed
256 257 258 259 260 261

  cpp_redis::reply review_ids_reply;
  try {
    review_ids_reply = review_ids_future.get();
  } catch (...) {
    LOG(error) << "Failed to read review_ids from movie-review-redis";
262
    XTRACE("Failed to read review_ids from movie-review-redis");
Yu Gan's avatar
Yu Gan committed
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
    _redis_client_pool->Push(redis_client_wrapper);
    throw;
  }
  _redis_client_pool->Push(redis_client_wrapper);
  std::vector<int64_t> review_ids;
  auto review_ids_reply_array = review_ids_reply.as_array();
  for (auto &review_id_reply : review_ids_reply_array) {
    review_ids.emplace_back(std::stoul(review_id_reply.as_string()));
  }

  int mongo_start = start + review_ids.size();
  std::multimap<std::string, std::string> redis_update_map;
  if (mongo_start < stop) {
    // Instead find review_ids from mongodb
    mongoc_client_t *mongodb_client = mongoc_client_pool_pop(
        _mongodb_client_pool);
    if (!mongodb_client) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = "Failed to pop a client from MongoDB pool";
283
      XTRACE("Failed to pop a client from MongoDB pool");
Yu Gan's avatar
Yu Gan committed
284 285 286 287 288 289 290 291
      throw se;
    }
    auto collection = mongoc_client_get_collection(
        mongodb_client, "movie-review", "movie-review");
    if (!collection) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      se.message = "Failed to create collection movie-review from MongoDB";
292
      XTRACE("Failed to create collection movie-review from MongoDB");
Yu Gan's avatar
Yu Gan committed
293 294 295 296 297 298 299 300 301 302
      throw se;
    }

    bson_t *query = BCON_NEW("movie_id", BCON_UTF8(movie_id.c_str()));
    bson_t *opts = BCON_NEW(
        "projection", "{",
        "reviews", "{",
        "$slice", "[",
        BCON_INT32(0), BCON_INT32(stop),
        "]", "}", "}");
303
    XTRACE("MongoFindMovieReviews start");
Yu Gan's avatar
Yu Gan committed
304 305 306 307 308
    auto find_span = opentracing::Tracer::Global()->StartSpan(
        "MongoFindMovieReviews", {opentracing::ChildOf(&span->context())});
    mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(
        collection, query, opts, nullptr);
    find_span->Finish();
309
    XTRACE("MongoFindMovieReviews finish");
Yu Gan's avatar
Yu Gan committed
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
    const bson_t *doc;
    bool found = mongoc_cursor_next(cursor, &doc);
    if (found) {
      bson_iter_t iter_0;
      bson_iter_t iter_1;
      bson_iter_t review_id_child;
      bson_iter_t timestamp_child;
      int idx = 0;
      bson_iter_init(&iter_0, doc);
      bson_iter_init(&iter_1, doc);
      while (bson_iter_find_descendant(&iter_0,
                                       ("reviews." + std::to_string(idx)
                                           + ".review_id").c_str(),
                                       &review_id_child)
          && BSON_ITER_HOLDS_INT64(&review_id_child)
          && bson_iter_find_descendant(&iter_1,
                                       ("reviews." + std::to_string(idx)
                                           + ".timestamp").c_str(),
                                       &timestamp_child)
          && BSON_ITER_HOLDS_INT64(&timestamp_child)) {
        auto curr_review_id = bson_iter_int64(&review_id_child);
        auto curr_timestamp = bson_iter_int64(&timestamp_child);
        if (idx >= mongo_start) {
          review_ids.emplace_back(curr_review_id);
        }
        redis_update_map.insert(
            {std::to_string(curr_timestamp), std::to_string(curr_review_id)});
        bson_iter_init(&iter_0, doc);
        bson_iter_init(&iter_1, doc);
        idx++;
      }
    }
    find_span->Finish();
    bson_destroy(opts);
    bson_destroy(query);
    mongoc_cursor_destroy(cursor);
    mongoc_collection_destroy(collection);
    mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
  }

350
  Baggage review_baggage = BRANCH_CURRENT_BAGGAGE();
Yu Gan's avatar
Yu Gan committed
351 352
  std::future<std::vector<Review>> review_future = std::async(
      std::launch::async, [&]() {
353
        BAGGAGE(review_baggage);
Yu Gan's avatar
Yu Gan committed
354 355 356 357 358
        auto review_client_wrapper = _review_client_pool->Pop();
        if (!review_client_wrapper) {
          ServiceException se;
          se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
          se.message = "Failed to connected to review-storage-service";
359
          XTRACE("Failed to connect to review-storage-service");
Yu Gan's avatar
Yu Gan committed
360 361 362 363 364
          throw se;
        }
        std::vector<Review> _return_reviews;
        auto review_client = review_client_wrapper->GetClient();
        try {
365
          writer_text_map["baggage"] = GET_CURRENT_BAGGAGE().str();
Yu Gan's avatar
Yu Gan committed
366 367 368 369 370
          review_client->ReadReviews(
              _return_reviews, req_id, review_ids, writer_text_map);
        } catch (...) {
          _review_client_pool->Push(review_client_wrapper);
          LOG(error) << "Failed to read review from review-storage-service";
371
          XTRACE("Failed to read review from review-storage-service");
Yu Gan's avatar
Yu Gan committed
372 373 374 375 376 377 378 379 380 381 382 383 384 385
          throw;
        }
        _review_client_pool->Push(review_client_wrapper);
        return _return_reviews;
      });

  std::future<cpp_redis::reply> zadd_reply_future;
  if (!redis_update_map.empty()) {
    // Update Redis
    redis_client_wrapper = _redis_client_pool->Pop();
    if (!redis_client_wrapper) {
      ServiceException se;
      se.errorCode = ErrorCode::SE_REDIS_ERROR;
      se.message = "Cannot connected to Redis server";
386
      XTRACE("Cannot connect to Redis server");
Yu Gan's avatar
Yu Gan committed
387 388 389
      throw se;
    }
    redis_client = redis_client_wrapper->GetClient();
390
    XTRACE("RedisUpdate start");
Yu Gan's avatar
Yu Gan committed
391 392 393 394 395 396 397 398
    auto redis_update_span = opentracing::Tracer::Global()->StartSpan(
        "RedisUpdate", {opentracing::ChildOf(&span->context())});
    redis_client->del(std::vector<std::string>{movie_id});
    std::vector<std::string> options{"NX"};
    zadd_reply_future = redis_client->zadd(
        movie_id, options, redis_update_map);
    redis_client->commit();
    redis_update_span->Finish();
399
    XTRACE("RedisUpdate finish");
Yu Gan's avatar
Yu Gan committed
400 401 402 403
  }

  try {
    _return = review_future.get();
404
    JOIN_CURRENT_BAGGAGE(review_baggage);
Yu Gan's avatar
Yu Gan committed
405 406
  } catch (...) {
    LOG(error) << "Failed to get review from review-storage-service";
407
    XTRACE("Failed to get review from review-storage-service");
Yu Gan's avatar
Yu Gan committed
408 409 410 411 412
    if (!redis_update_map.empty()) {
      try {
        zadd_reply_future.get();
      } catch (...) {
        LOG(error) << "Failed to Update Redis Server";
413
        XTRACE("Failed to Update Redis Server");
Yu Gan's avatar
Yu Gan committed
414 415 416 417 418 419 420 421 422 423 424
      }
      _redis_client_pool->Push(redis_client_wrapper);
    }
    throw;
  }

  if (!redis_update_map.empty()) {
    try {
      zadd_reply_future.get();
    } catch (...) {
      LOG(error) << "Failed to Update Redis Server";
425
      XTRACE("Failed to Update Redis Server");
Yu Gan's avatar
Yu Gan committed
426 427 428 429 430 431 432
      _redis_client_pool->Push(redis_client_wrapper);
      throw;
    }
    _redis_client_pool->Push(redis_client_wrapper);
  }

  span->Finish();
433
  XTRACE("MovieReviewHandler::ReadMovieReviews finish");
Yu Gan's avatar
Yu Gan committed
434 435 436 437 438 439 440
  
}

} // namespace media_service


#endif //MEDIA_MICROSERVICES_MOVIEREVIEWHANDLER_H