Skip to content
Snippets Groups Projects
stress_test_controller.h 12.40 KiB
#ifndef _CLOCKWORK_CONTROLLER_STRESS_TEST_CONTROLLER_H_
#define _CLOCKWORK_CONTROLLER_STRESS_TEST_CONTROLLER_H_

#include "clockwork/network/controller.h"
#include "clockwork/api/worker_api.h"
#include "controller.h"
#include <iomanip>
#include <sstream>
#include <algorithm>
#include <numeric>
#include <cmath>
#include <unordered_map>
#include <unistd.h>
#include <sys/types.h>
#include <tbb/concurrent_queue.h>

using namespace clockwork;

bool log_actions = false;

class StressTestGPU {
public:
	unsigned worker_id;
	unsigned worker_gpu_id;
	int outstanding_loadweights;
	int outstanding_infer;
	std::deque<unsigned> models_on_gpu;
	std::deque<unsigned> models_not_on_gpu;

	unsigned load_weights_errors = 0;
	unsigned infer_errors = 0;
	std::vector<uint64_t> load_weights_measurements;
	std::vector<uint64_t> load_weights_e2e;
	std::vector<uint64_t> infer_measurements;
	std::vector<uint64_t> infer_e2e;
	StressTestGPU(): 
		models_on_gpu(), 
		models_not_on_gpu(), 
		load_weights_measurements(), 
		load_weights_e2e(),
		infer_measurements(), 
		infer_e2e(),
		outstanding_loadweights(0),
		outstanding_infer(0) {}
};

// Simple controller implementation that send requests to worker and wait for response
class StressTestController : public Controller {
public:
	bool stress_infer = true;
	bool stress_loadweights = true;
	bool send_inputs = false;

	std::string model_path = "/home/jcmace/clockwork-modelzoo-volta/resnet50_v2/model";
	unsigned duplicates = 40;
	unsigned max_models_on_gpu = 20;
	size_t input_size = send_inputs ? 602112 : 0;
	char* input;

	int max_outstanding_loadweights = 2;
	int max_outstanding_infer = 2;

	unsigned num_gpus = 2;

	std::atomic_int action_id_seed;

	std::recursive_mutex mutex;
	uint64_t profiled_load_weights;
	uint64_t profiled_exec;
	std::vector<StressTestGPU> gpus;

	unsigned pending_workers;

	std::thread printer;

	std::map<unsigned, std::function<void(std::shared_ptr<workerapi::Result>)>> callbacks;

	tbb::concurrent_queue<std::shared_ptr<workerapi::Result>> results;

	StressTestController(int client_port, std::vector<std::pair<std::string, std::string>> worker_host_port_pairs):
		Controller::Controller(client_port, worker_host_port_pairs), action_id_seed(0), 
		pending_workers(workers.size()),
		results(),
		printer(&StressTestController::printerThread, this) {
		threading::initLoggerThread(printer);
		input = static_cast<char*>(malloc(input_size));

		init();
	}

	std::string stats(std::vector<uint64_t> v, std::vector<uint64_t> e2e, uint64_t profiled, uint64_t duration) {
		if (v.size() == 0) {
			return "throughput=0";
		}

		const auto [min, max] = std::minmax_element(v.begin(), v.end());
		const auto [e2emin, e2emax] = std::minmax_element(e2e.begin(), e2e.end());
		int count = v.size();
		double sum = std::accumulate(v.begin(), v.end(), 0.0);
		double sume2e = std::accumulate(e2e.begin(), e2e.end(), 0.0);
		double throughput = count * 1000000000.0 / static_cast<double>(duration);

		std::stringstream s;
		s << std::fixed << std::setprecision(2);
		s << "profiled=" << profiled << " min=" << *min << " max=" << *max << " mean=" << (sum/count) << " e2e=" << (sume2e/count) << " e2emax=" << *e2emax << " throughput=" << throughput << " efficiency=" << (sum/((float) duration));
		return s.str();
	}

	void printerThread() {
		uint64_t last_print = util::now();
		uint64_t print_interval = 1000000000UL;

		while (true) {
			if (last_print + print_interval > util::now()) {
				usleep(10000);
				continue;
			}

			std::vector<StressTestGPU> gpu_copies;
			uint64_t now;
			{
				std::lock_guard<std::recursive_mutex> lock(mutex);

				now = util::now();

				for (StressTestGPU &gpu : this->gpus) {
					gpu_copies.push_back(gpu);
					gpu.load_weights_measurements.clear();
					gpu.load_weights_e2e.clear();
					gpu.infer_measurements.clear();
					gpu.infer_e2e.clear();
					gpu.infer_errors = 0;
					gpu.load_weights_errors = 0;
				}
			}
			uint64_t duration = now - last_print;
			last_print = now;

			std::stringstream report;
			for (auto &gpu : gpu_copies) {
				report << "LoadWeights W" << gpu.worker_id << " GPU" << gpu.worker_gpu_id
				       << " errors=" << gpu.load_weights_errors 
				       << " " << stats(gpu.load_weights_measurements, gpu.load_weights_e2e, profiled_load_weights, duration) << std::endl;
			}
			for (auto &gpu : gpu_copies) {
				report << "Infer W" << gpu.worker_id << " GPU" << gpu.worker_gpu_id 
					   << " errors=" << gpu.infer_errors 
					   << " " << stats(gpu.infer_measurements, gpu.infer_e2e, profiled_exec, duration) << std::endl;
			}

			std::cout << report.str();
		}
	}
	
	void save_callback(unsigned action_id, std::function<void(std::shared_ptr<workerapi::Result>)> callback) {
		auto it = callbacks.find(action_id);
		CHECK(it == callbacks.end()) << "Action " << action_id << " already exists";
		callbacks.insert(std::make_pair(action_id, callback));
	}

	void callback(std::shared_ptr<workerapi::Result> result) {
		auto it = callbacks.find(result->id);
		CHECK(it != callbacks.end()) << "Received result with no callback";

		auto f = it->second;
		callbacks.erase(it);

		f(result);
	}

	void init() {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		for (unsigned worker_id = 0; worker_id < this->workers.size(); worker_id++) {
			unsigned action_id = action_id_seed++;

			auto load_model = std::make_shared<workerapi::LoadModelFromDisk>();
			load_model->id = action_id;
			load_model->model_id = 0;
			load_model->model_path = model_path;
			load_model->earliest = 0;
			load_model->latest = util::now() + 600000000000UL;
			load_model->no_of_copies = duplicates;

			save_callback(action_id, std::bind(&StressTestController::onLoadModelsComplete, this, worker_id, std::placeholders::_1));

			this->workers[worker_id]->sendAction(load_model);
		}
	}

	void onLoadModelsComplete(unsigned worker_id, std::shared_ptr<workerapi::Result> result) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		if (auto error = std::dynamic_pointer_cast<workerapi::ErrorResult>(result)) {
			CHECK(false) << "StressTestController unable to load initial models " << result->str();
		} else if (auto load = std::dynamic_pointer_cast<workerapi::LoadModelFromDiskResult>(result)) {
			profiled_load_weights = load->weights_load_time_nanos;
			profiled_exec = load->batch_size_exec_times_nanos[0];
			for (unsigned i = 0; i < num_gpus; i++) {
				StressTestGPU gpu;
				gpu.worker_id = worker_id;
				gpu.worker_gpu_id = i;
				for (unsigned i = 0; i < duplicates; i++) {
					gpu.models_not_on_gpu.push_back(i);
				}
				gpus.push_back(gpu);
			}
			pending_workers--;
		} else {
			CHECK(false) << "Unexpected response to EvictWeights action";
		}
	}

	bool evictNext(unsigned gpu_id) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		auto &gpu = gpus[gpu_id];

		if (gpu.models_on_gpu.size() <= max_models_on_gpu) return false;

		unsigned model_id = gpu.models_on_gpu.front();
		gpu.models_on_gpu.pop_front();

		unsigned action_id = action_id_seed++;

		auto evict = std::make_shared<workerapi::EvictWeights>();
		evict->id = action_id;
		evict->model_id = model_id;
		evict->gpu_id = gpu.worker_gpu_id;
		evict->earliest = util::now();
		evict->latest = util::now() + 10000000000UL; // 10s

		save_callback(action_id, std::bind(&StressTestController::onEvictWeightsComplete, this, model_id, gpu_id, std::placeholders::_1));

		this->workers[gpu.worker_id]->sendAction(evict);

		return true;
	}

	void onEvictWeightsComplete(unsigned model_id, unsigned gpu_id, std::shared_ptr<workerapi::Result> result) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		if (auto error = std::dynamic_pointer_cast<workerapi::ErrorResult>(result)) {
			CHECK(false) << "Error in evict weights action, which should never happen except for fatal errors";
		} else if (auto evict = std::dynamic_pointer_cast<workerapi::EvictWeightsResult>(result)) {
			auto &gpu = gpus[gpu_id];
			gpu.models_not_on_gpu.push_back(model_id);
		} else {
			CHECK(false) << "Unexpected response to EvictWeights action";
		}
	}

	bool loadNext(unsigned gpu_id) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		auto &gpu = gpus[gpu_id];

		if (gpu.models_not_on_gpu.size() == 0) return false;
		if (gpu.outstanding_loadweights >= max_outstanding_loadweights) return false;

		unsigned model_id = gpu.models_not_on_gpu.front();
		gpu.models_not_on_gpu.pop_front();

		unsigned action_id = action_id_seed++;

		auto load = std::make_shared<workerapi::LoadWeights>();
		load->id = action_id;
		load->gpu_id = gpu.worker_gpu_id;
		load->model_id = model_id;
		load->earliest = util::now();
		load->latest = util::now() + 10000000000UL; // 10s

		save_callback(action_id, std::bind(&StressTestController::onLoadWeightsComplete, this, model_id, gpu_id, util::now(), std::placeholders::_1));

		if (log_actions) std::cout << "S: " << load->str() << std::endl;
		this->workers[gpu.worker_id]->sendAction(load);

		gpu.outstanding_loadweights++;

		return true;
	}

	void onLoadWeightsComplete(unsigned model_id, unsigned gpu_id, uint64_t submitted_at, std::shared_ptr<workerapi::Result> result) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		auto &gpu = gpus[gpu_id];
		if (auto error = std::dynamic_pointer_cast<workerapi::ErrorResult>(result)) {
			std::cout << "LoadWeights error " << result->str() << std::endl;
			gpu.load_weights_errors++;
			gpu.models_not_on_gpu.push_back(model_id);
		} else if (auto load = std::dynamic_pointer_cast<workerapi::LoadWeightsResult>(result)) {
			gpu.models_on_gpu.push_back(model_id);
			gpu.load_weights_measurements.push_back(load->duration);
			gpu.load_weights_e2e.push_back(util::now() - submitted_at);
		} else {
			CHECK(false) << "Unexpected response to LoadWeights action";
		}
		gpu.outstanding_loadweights--;
	}

	bool inferNext(unsigned gpu_id) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		auto &gpu = gpus[gpu_id];
		if (gpu.outstanding_infer >= max_outstanding_infer) return false;

		unsigned next_model_ix = 6 + gpu.outstanding_infer;
		if (gpu.models_on_gpu.size() <= next_model_ix) return false;

		unsigned model_id = gpu.models_on_gpu[next_model_ix++];
		unsigned action_id = action_id_seed++;

		auto infer = std::make_shared<workerapi::Infer>();
		infer->id = action_id;
		infer->model_id = model_id;
		infer->gpu_id = gpu.worker_gpu_id;
		infer->batch_size = 1;
		infer->input = input;
		infer->input_size = input_size;
		infer->earliest = util::now();
		infer->latest = util::now() + 10000000000UL; // 10s

		save_callback(action_id, std::bind(&StressTestController::onInferComplete, this, model_id, gpu_id, util::now(), std::placeholders::_1));


		if (log_actions) std::cout << "S: " << infer->str() << std::endl;
		this->workers[gpu.worker_id]->sendAction(infer);

		gpu.outstanding_infer++;

		return true;
	}

	void onInferComplete(unsigned model_id, unsigned gpu_id, uint64_t submitted_at, std::shared_ptr<workerapi::Result> result) {
		std::lock_guard<std::recursive_mutex> lock(mutex);

		auto &gpu = gpus[gpu_id];
		if (auto error = std::dynamic_pointer_cast<workerapi::ErrorResult>(result)) {
			std::cout << "Infer error " << result->str() << std::endl;
			gpu.infer_errors++;
		} else if (auto infer = std::dynamic_pointer_cast<workerapi::InferResult>(result)) {
			gpu.infer_measurements.push_back(infer->exec.duration);
			gpu.infer_e2e.push_back(util::now() - submitted_at);
		} else {
			CHECK(false) << "Unexpected response to LoadWeights action";
		}
		gpu.outstanding_infer--;
	}

	virtual void sendResult(std::shared_ptr<workerapi::Result> result) {
		if (log_actions) std::cout << "R: " << result->str() << std::endl;

		std::lock_guard<std::recursive_mutex> lock(mutex);

		callback(result);

		if (pending_workers > 0) return;

		for (unsigned i = 0; i < gpus.size(); i++) {
			if (stress_loadweights) {
				while (evictNext(i));	
			}
			while (loadNext(i));
			if (stress_infer) {
				while (inferNext(i));
			}
		}
	}

	virtual void infer(clientapi::InferenceRequest &request, std::function<void(clientapi::InferenceResponse&)> callback) {
		CHECK(false) << "infer from client not supported";
	}
	virtual void uploadModel(clientapi::UploadModelRequest &request, std::function<void(clientapi::UploadModelResponse&)> callback) {
		CHECK(false) << "uploadModel not supported";
	}
	virtual void evict(clientapi::EvictRequest &request, std::function<void(clientapi::EvictResponse&)> callback) {
		CHECK(false) << "evict not supported";
	}
	virtual void loadRemoteModel(clientapi::LoadModelFromRemoteDiskRequest &request, std::function<void(clientapi::LoadModelFromRemoteDiskResponse&)> callback) {
		CHECK(false) << "loadRemoteModel not supported";
	}
};

#endif