Commit 85470e22 authored by Jonathan Mace

Merge branch 'thread-management' into 'master'

Thread management

See merge request !14
parents 96d14e54 cc9550c0
Showing 370 additions and 182 deletions
@@ -58,6 +58,7 @@ add_library( clockwork
 	src/clockwork/runtime.cpp
 	src/clockwork/memory.cpp
 	src/clockwork/worker.cpp
+	src/clockwork/thread.cpp
 	src/clockwork/model/memfile.cpp
 	src/clockwork/model/model.cpp
 	src/clockwork/model/batched.cpp
...
@@ -7,6 +7,7 @@
 #include "clockwork/test/util.h"
 #include "clockwork/model/so.h"
 #include "clockwork/model/cuda.h"
+#include "clockwork/thread.h"
 #include <nvml.h>

 using namespace clockwork;
@@ -113,7 +114,7 @@ void print_system_status() {
 	std::cout << "GPU core affinity:" << std::endl;
 	for (unsigned i = 0; i < num_gpus; i++) {
-		std::vector<unsigned> cores = util::get_gpu_core_affinity(i);
+		std::vector<unsigned> cores = threading::getGPUCoreAffinity(i);
 		std::cout << " GPU " << i << " =";
 		for (unsigned &core : cores) {
 			std::cout << " " << core;
...
@@ -8,6 +8,7 @@
 #include <atomic>
 #include <thread>
 #include <unistd.h>
+#include "clockwork/thread.h"

 using namespace clockwork;
@@ -77,8 +78,6 @@ public:
 	}

 	void run(unsigned gpu_id, void* host, size_t host_size, std::vector<unsigned> cores) {
-		util::set_cores(cores);
-
 		status = cudaSetDevice(gpu_id);
 		if (!successful()) { countdown_begin--; countdown_end--; return; };
@@ -162,7 +161,7 @@ TEST_CASE("Profile concurrent transfers on single GPU", "[singlegpu]") {
 	std::atomic_int countdown_end(1);
 	std::atomic_bool alive(true);
-	auto cores0 = util::get_gpu_core_affinity(0);
+	auto cores0 = threading::getGPUCoreAffinity(0);
 	TransferThread t0(0, cores0, countdown_begin, countdown_end, alive, host, host_size);
@@ -203,7 +202,7 @@ TEST_CASE("Profile concurrent transfers on same GPU", "[samegpu]") {
 	std::atomic_int countdown_end(2);
 	std::atomic_bool alive(true);
-	auto cores0 = util::get_gpu_core_affinity(0);
+	auto cores0 = threading::getGPUCoreAffinity(0);
 	TransferThread t0(0, cores0, countdown_begin, countdown_end, alive, host, host_size);
 	TransferThread t1(0, cores0, countdown_begin, countdown_end, alive, host, host_size);
@@ -246,8 +245,8 @@ TEST_CASE("Profile concurrent transfers on multiple GPUs", "[multigpu]") {
 	std::atomic_int countdown_end(2);
 	std::atomic_bool alive(true);
-	auto cores0 = util::get_gpu_core_affinity(0);
-	auto cores1 = util::get_gpu_core_affinity(1);
+	auto cores0 = threading::getGPUCoreAffinity(0);
+	auto cores1 = threading::getGPUCoreAffinity(1);
 	TransferThread t0(0, cores0, countdown_begin, countdown_end, alive, host, host_size);
 	TransferThread t1(1, cores1, countdown_begin, countdown_end, alive, host, host_size);
...
@@ -13,6 +13,7 @@
 #include "clockwork/memory.h"
 #include "clockwork/model/model.h"
 #include <sys/mman.h>
+#include "clockwork/thread.h"

 using namespace clockwork;
@@ -375,8 +376,7 @@ public:
 	ModelExecWithModuleLoad(int i, Experiment* experiment, PageCache* weights_cache, MemoryPool* io_pool, MemoryPool* workspace_pool, std::vector<model::Model*> models, std::string input) :
 			experiment(experiment), weights_cache(weights_cache), io_pool(io_pool), workspace_pool(workspace_pool), models(models), alive(true), input(input), iterations(0),
 			ready(false), started(false) {
-		util::set_core((i+7) % util::get_num_cores());
-		util::setCurrentThreadMaxPriority();
+		threading::setMaxPriority();
 	}

 	void start(bool with_module_load) {
...
@@ -5,6 +5,7 @@
 #include "clockwork/memory.h"
 #include "clockwork/cache.h"
 #include "clockwork/cuda_common.h"
+#include "clockwork/thread.h"

 using namespace clockwork;
@@ -33,7 +34,7 @@ void check_model(int page_size, int iterations, std::string model_path) {
 	util::setCudaFlags();
 	util::initializeCudaStream();
-	util::setCurrentThreadMaxPriority();
+	threading::setMaxPriority();

 	clockwork::model::BatchedModel* model = load_model(model_path);
...
@@ -5,6 +5,7 @@
 #include <string>
 #include <iostream>
 #include "clockwork/workload/azure.h"
+#include "clockwork/thread.h"

 using namespace clockwork;
@@ -17,7 +18,7 @@ std::pair<std::string, std::string> split(std::string addr) {
 int main(int argc, char *argv[])
 {
-	util::setCurrentThreadMaxPriority();
+	threading::initProcess();
 	if (argc != 3) {
 		std::cerr << "Usage: client [address] [workload]" << std::endl;
...
#include "clockwork/controller/controller.h" #include "clockwork/controller/controller.h"
#include <sstream> #include <sstream>
#include "clockwork/thread.h"
using namespace clockwork; using namespace clockwork;
using namespace clockwork::controller; using namespace clockwork::controller;
...@@ -517,6 +518,7 @@ ControllerWithStartupPhase::ControllerWithStartupPhase( ...@@ -517,6 +518,7 @@ ControllerWithStartupPhase::ControllerWithStartupPhase(
scheduler(scheduler), scheduler(scheduler),
startup_thread(&ControllerWithStartupPhase::runStartup, this), startup_thread(&ControllerWithStartupPhase::runStartup, this),
request_telemetry(request_telemetry) { request_telemetry(request_telemetry) {
threading::initHighPriorityThread(startup_thread);
} }
void ControllerWithStartupPhase::runStartup() { void ControllerWithStartupPhase::runStartup() {
......
@@ -174,19 +174,6 @@ public:
 		if (print_debug) std::cout << "Worker <-- " << infer->str() << std::endl;
 	}

-	void inferTimeoutOnController(PendingInfer* pending) {
-		sendInferErrorToClient(clockworkTimeout, "", pending->request, pending->callback);
-
-		// Populate telemetry
-		pending->telemetry.result_received = util::now();
-		pending->telemetry.status = clockworkError; // TODO: use clockworkTimeout instead
-		pending->telemetry.worker_duration = 0;
-
-		printer->log(pending->telemetry);
-
-		delete pending;
-	}
-
 	void check_gpu_queue(GPU* gpu) {
 		while (gpu->outstanding < max_outstanding && gpu->queue.size() > 0) {
 			PendingInfer* next = gpu->queue.front();
@@ -195,7 +182,12 @@ public:
 				gpu->outstanding++;
 				sendInferActionToWorker(next);
 			} else {
-				inferTimeoutOnController(next);
+				sendInferErrorToClient(
+					clockworkTimeout, "",
+					next->request,
+					next->callback
+				);
+				delete next;
 			}
 		}
 	}
...
@@ -49,7 +49,7 @@ class StressTestController : public Controller {
 public:
 	bool stress_infer = true;
 	bool stress_loadweights = true;
-	bool send_inputs = true;
+	bool send_inputs = false;
 	std::string model_path = "/home/jcmace/clockwork-modelzoo-volta/resnet50_v2/model";
 	unsigned duplicates = 40;
@@ -82,6 +82,7 @@ public:
 			pending_workers(workers.size()),
 			results(),
 			printer(&StressTestController::printerThread, this) {
+		threading::initLoggerThread(printer);
 		input = static_cast<char*>(malloc(input_size));
 		init();
...
@@ -80,7 +80,9 @@ void Connection::ls(LSRequest &request, std::function<void(LSResponse&)> callback) {
 	send_request(*rpc);
 }

-ConnectionManager::ConnectionManager() : alive(true), network_thread(&ConnectionManager::run, this) {}
+ConnectionManager::ConnectionManager() : alive(true), network_thread(&ConnectionManager::run, this) {
+	threading::initNetworkThread(network_thread);
+}

 void ConnectionManager::run() {
 	while (alive) {
...
#include "clockwork/network/controller.h" #include "clockwork/network/controller.h"
#include "clockwork/thread.h"
namespace clockwork { namespace clockwork {
namespace network { namespace network {
...@@ -161,7 +162,9 @@ void WorkerConnection::sendAction(std::shared_ptr<workerapi::Action> action) { ...@@ -161,7 +162,9 @@ void WorkerConnection::sendAction(std::shared_ptr<workerapi::Action> action) {
} }
} }
WorkerManager::WorkerManager() : alive(true), network_thread(&WorkerManager::run, this) {} WorkerManager::WorkerManager() : alive(true), network_thread(&WorkerManager::run, this) {
threading::initNetworkThread(network_thread);
}
void WorkerManager::run() { void WorkerManager::run() {
while (alive) { while (alive) {
...@@ -343,6 +346,7 @@ Server::Server(clientapi::ClientAPI* api, int port) : ...@@ -343,6 +346,7 @@ Server::Server(clientapi::ClientAPI* api, int port) :
io_service(), io_service(),
alive(true), alive(true),
network_thread(&Server::run, this, port) { network_thread(&Server::run, this, port) {
threading::initNetworkThread(network_thread);
} }
void Server::shutdown(bool awaitShutdown) { void Server::shutdown(bool awaitShutdown) {
......
#include "clockwork/network/worker.h" #include "clockwork/network/worker.h"
#include "clockwork/util.h" #include "clockwork/util.h"
#include <sstream> #include <sstream>
#include "clockwork/thread.h"
namespace clockwork { namespace clockwork {
namespace network { namespace network {
...@@ -90,7 +91,6 @@ public: ...@@ -90,7 +91,6 @@ public:
}; };
void Connection::print() { void Connection::print() {
util::unsetCurrentThreadMaxPriority();
uint64_t print_every = 10000000000UL; // 10s uint64_t print_every = 10000000000UL; // 10s
uint64_t last_print = util::now(); uint64_t last_print = util::now();
...@@ -273,6 +273,7 @@ void Connection::sendResult(std::shared_ptr<workerapi::Result> result) { ...@@ -273,6 +273,7 @@ void Connection::sendResult(std::shared_ptr<workerapi::Result> result) {
void Connection::ready() { void Connection::ready() {
this->printer = std::thread(&Connection::print, this); this->printer = std::thread(&Connection::print, this);
threading::initLoggerThread(this->printer);
} }
void Connection::closed() { void Connection::closed() {
...@@ -285,6 +286,7 @@ Server::Server(ClockworkWorker* worker, int port) : ...@@ -285,6 +286,7 @@ Server::Server(ClockworkWorker* worker, int port) :
worker(worker), worker(worker),
io_service(), io_service(),
network_thread(&Server::run, this, port) { network_thread(&Server::run, this, port) {
threading::initNetworkThread(network_thread);
} }
Server::~Server() {} Server::~Server() {}
...@@ -302,7 +304,6 @@ void Server::join() { ...@@ -302,7 +304,6 @@ void Server::join() {
} }
void Server::run(int port) { void Server::run(int port) {
util::setCurrentThreadMaxPriority();
try { try {
auto endpoint = tcp::endpoint(tcp::v4(), port); auto endpoint = tcp::endpoint(tcp::v4(), port);
is_started.store(true); is_started.store(true);
......
#include "clockwork/api/worker_api.h" #include "clockwork/api/worker_api.h"
#include "clockwork/runtime.h" #include "clockwork/runtime.h"
#include "clockwork/action.h" #include "clockwork/action.h"
#include "clockwork/thread.h"
namespace clockwork { namespace clockwork {
...@@ -21,23 +22,15 @@ void BaseExecutor::join() { ...@@ -21,23 +22,15 @@ void BaseExecutor::join() {
} }
} }
CPUExecutor::CPUExecutor(TaskType type, std::vector<unsigned> cores) : BaseExecutor(type) { CPUExecutor::CPUExecutor(TaskType type) : BaseExecutor(type) {
for (unsigned i = 0; i < cores.size(); i++) { threads.push_back(std::thread(&CPUExecutor::executorMain, this, 0));
threads.push_back(std::thread(&CPUExecutor::executorMain, this, i, cores[i])); for (auto &thread : threads) threading::initGPUThread(0, thread);
}
} }
void CPUExecutor::executorMain(unsigned executor_id, unsigned core) { void CPUExecutor::executorMain(unsigned executor_id) {
std::cout << TaskTypeName(type) << "-" << executor_id << " binding to core " << core << std::endl; std::cout << TaskTypeName(type) << "-" << executor_id << " started" << std::endl;
// util::set_core(core);
// util::setCurrentThreadMaxPriority();
while (alive.load()) { while (alive.load()) {
// TODO: possibility off too many outstanding asyc tasks
// TODO: queue should spin-wait rather than blocking
// TODO: shutdown queue or use try_dequeue
// Currently, CPUExecutor is only used for LoadModelTask // Currently, CPUExecutor is only used for LoadModelTask
LoadModelFromDiskTask* next = dynamic_cast<LoadModelFromDiskTask*>(queue.dequeue()); LoadModelFromDiskTask* next = dynamic_cast<LoadModelFromDiskTask*>(queue.dequeue());
...@@ -55,91 +48,29 @@ void CPUExecutor::executorMain(unsigned executor_id, unsigned core) { ...@@ -55,91 +48,29 @@ void CPUExecutor::executorMain(unsigned executor_id, unsigned core) {
} }
} }
GPUExecutorShared::GPUExecutorShared(TaskType type, std::vector<unsigned> cores, unsigned num_gpus): GPUExecutorExclusive::GPUExecutorExclusive(TaskType type, unsigned gpu_id):
BaseExecutor(type), num_gpus(num_gpus) { BaseExecutor(type), gpu_id(gpu_id) {
for (unsigned i = 0; i < cores.size(); i++) { threads.push_back(std::thread(&GPUExecutorExclusive::executorMain, this, 0));
threads.push_back(std::thread(&GPUExecutorShared::executorMain, this, i, cores[i])); for (auto &thread : threads) threading::initGPUThread(gpu_id, thread);
}
} }
void GPUExecutorShared::executorMain(unsigned executor_id, unsigned core) { void GPUExecutorExclusive::executorMain(unsigned executor_id) {
std::cout << "GPU" << gpu_id << "-" << TaskTypeName(type) << "-" << executor_id << " started" << std::endl;
int priority = 0; int priority = 0;
if (type==TaskType::PCIe_H2D_Inputs || type==TaskType::PCIe_D2H_Output) { if (type==TaskType::PCIe_H2D_Inputs || type==TaskType::PCIe_D2H_Output) {
priority = -1; priority = -1;
} }
std::cout << TaskTypeName(type) << "-" << executor_id << " binding to core " << core << " with GPU priority " << priority << std::endl;
// util::set_core(core);
util::setCurrentThreadMaxPriority();
std::vector<cudaStream_t> streams;
for (unsigned gpu_id = 0; gpu_id < num_gpus; gpu_id++) {
cudaStream_t stream;
CUDA_CALL(cudaSetDevice(gpu_id));
CUDA_CALL(cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, priority));
streams.push_back(stream);
}
int prev_gpu_id = -1;
while (alive.load()) {
// TODO: possibility off too many outstanding asyc tasks
// TODO: queue should spin-wait rather than blocking
// TODO: shutdown queue or use try_dequeue
Task* next = queue.dequeue();
if (next != nullptr) {
// For tasks of type PCIe_H2D_Weights, we do not want two streams
// to transfer weights in parallel; therefore, whenever there is a
// a change in the stream, we synchronize host until the previous
// stream has been entirely flushed out
if (type == PCIe_H2D_Weights and prev_gpu_id != -1 and prev_gpu_id != next->gpu_id) {
CUDA_CALL(cudaSetDevice(next->gpu_id));
CUDA_CALL(cudaStreamSynchronize(streams[prev_gpu_id]));
prev_gpu_id = next->gpu_id;
}
auto telemetry = next->telemetry;
telemetry->dequeued = util::hrt();
next->run(streams[next->gpu_id]);
telemetry->exec_complete = util::hrt();
}
}
std::vector<Task*> tasks = queue.drain();
for (Task* task : tasks) {
task->cancel();
}
}
GPUExecutorExclusive::GPUExecutorExclusive(TaskType type, std::vector<unsigned> cores, unsigned gpu_id, int priority):
BaseExecutor(type), gpu_id(gpu_id), priority(priority) {
for (unsigned i = 0; i < cores.size(); i++) {
threads.push_back(std::thread(&GPUExecutorExclusive::executorMain, this, i, cores[i]));
}
}
void GPUExecutorExclusive::executorMain(unsigned executor_id, unsigned core) {
std::cout << TaskTypeName(type) << "-" << executor_id << "(GPU " << gpu_id << ") binding to core " << core << std::endl;
// util::set_core(core);
util::setCurrentThreadMaxPriority();
util::initializeCudaStream(gpu_id, priority); util::initializeCudaStream(gpu_id, priority);
cudaStream_t stream = util::Stream(); cudaStream_t stream = util::Stream();
while (alive.load()) { while (alive.load()) {
// TODO: possibility off too many outstanding asyc tasks
// TODO: queue should spin-wait rather than blocking
// TODO: shutdown queue or use try_dequeue
Task* next = queue.dequeue(); Task* next = queue.dequeue();
if (next != nullptr) { if (next != nullptr) {
// util::setCurrentThreadMaxPriority();
auto telemetry = next->telemetry; auto telemetry = next->telemetry;
telemetry->dequeued = util::hrt(); telemetry->dequeued = util::hrt();
...@@ -155,10 +86,9 @@ void GPUExecutorExclusive::executorMain(unsigned executor_id, unsigned core) { ...@@ -155,10 +86,9 @@ void GPUExecutorExclusive::executorMain(unsigned executor_id, unsigned core) {
} }
AsyncTaskChecker::AsyncTaskChecker(std::vector<unsigned> cores) : alive(true) { AsyncTaskChecker::AsyncTaskChecker() : alive(true) {
for (unsigned i = 0; i < cores.size(); i++) { threads.push_back(std::thread(&AsyncTaskChecker::executorMain, this, 0));
threads.push_back(std::thread(&AsyncTaskChecker::executorMain, this, i, cores[i])); for (auto &thread : threads) threading::initGPUThread(1, thread);
}
} }
void AsyncTaskChecker::enqueue(AsyncTask* task) { void AsyncTaskChecker::enqueue(AsyncTask* task) {
...@@ -175,11 +105,8 @@ void AsyncTaskChecker::join() { ...@@ -175,11 +105,8 @@ void AsyncTaskChecker::join() {
} }
} }
void AsyncTaskChecker::executorMain(unsigned executor_id, unsigned core) { void AsyncTaskChecker::executorMain(unsigned executor_id) {
std::cout << "Checker-" << executor_id << " binding to core " << core << std::endl; std::cout << "AsyncTaskChecker-" << executor_id << " started" << std::endl;
// util::set_core(core);
util::setCurrentThreadMaxPriority();
//util::initializeCudaStream(GPU_ID_0); // TODO Is this call necessary?
std::vector<AsyncTask*> pending_tasks; std::vector<AsyncTask*> pending_tasks;
while (alive.load() || pending_tasks.size() > 0) { while (alive.load() || pending_tasks.size() > 0) {
......
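A point worth noting in the refactor above: executor constructors now spawn their threads first and then hand them to threading::initGPUThread, which applies affinity and scheduler settings to the already-running thread through its native pthread handle. Below is a minimal standalone sketch of that mechanism; the helper name and core number are illustrative, not part of this MR, and it is Linux-only (SCHED_FIFO requires rtprio privileges).

#include <pthread.h>
#include <sched.h>
#include <thread>

// Hypothetical helper: pin an already-running std::thread to one core and
// raise it to real-time priority, the same mechanism used by
// threading::initHighPriorityThread in this MR.
void pin_and_boost(std::thread &t, unsigned core) {
	cpu_set_t cpuset;
	CPU_ZERO(&cpuset);
	CPU_SET(core, &cpuset); // restrict the thread to a single core
	pthread_setaffinity_np(t.native_handle(), sizeof(cpu_set_t), &cpuset);

	sched_param params;
	params.sched_priority = sched_get_priority_max(SCHED_FIFO);
	pthread_setschedparam(t.native_handle(), SCHED_FIFO, &params); // needs rtprio
}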
@@ -42,35 +42,24 @@ public:
 	void shutdown();
 	void join();

-	virtual void executorMain(unsigned executor_id, unsigned core) = 0;
+	virtual void executorMain(unsigned executor_id) = 0;
 };

 class CPUExecutor : public BaseExecutor {
 public:
-	CPUExecutor(TaskType type, std::vector<unsigned> cores);
-	void executorMain(unsigned executor_id, unsigned core);
-};
-
-class GPUExecutorShared : public BaseExecutor {
-private:
-	unsigned num_gpus;
-
-public:
-	GPUExecutorShared(TaskType type, std::vector<unsigned> cores, unsigned num_gpus);
-	void executorMain(unsigned executor_id, unsigned core);
+	CPUExecutor(TaskType type);
+	void executorMain(unsigned executor_id);
 };

 class GPUExecutorExclusive : public BaseExecutor {
 private:
 	unsigned gpu_id;
-	unsigned priority;

 public:
-	GPUExecutorExclusive(TaskType type, std::vector<unsigned> cores, unsigned gpu_id, int priority = 0);
-	void executorMain(unsigned executor_id, unsigned core);
+	GPUExecutorExclusive(TaskType type, unsigned gpu_id);
+	void executorMain(unsigned executor_id);
 };

 class AsyncTaskChecker {
@@ -81,12 +70,12 @@ private:

 public:
-	AsyncTaskChecker(std::vector<unsigned> cores);
+	AsyncTaskChecker();

 	void enqueue(AsyncTask* task);
 	void shutdown();
 	void join();

-	void executorMain(unsigned executor_id, unsigned core);
+	void executorMain(unsigned executor_id);
 };
@@ -150,39 +139,6 @@ public:
 protected:

-	// Utility class for allocating cores
-	class CoreAllocator {
-	public:
-		std::vector<unsigned> usage_count;
-
-		CoreAllocator() {
-			usage_count.resize(util::get_num_cores(), 0);
-		}
-
-		int try_acquire(unsigned gpu_id) {
-			std::vector<unsigned> preferred = util::get_gpu_core_affinity(gpu_id);
-
-			for (unsigned i = preferred.size()-1; i >= 0; i--) {
-				unsigned core = preferred[i];
-				if (usage_count[core] == 0) {
-					usage_count[core]++;
-					return core;
-				}
-			}
-
-			for (unsigned core = 0; core < usage_count.size(); core++) {
-				if (usage_count[core] == 0) {
-					usage_count[core]++;
-					return core;
-				}
-			}
-
-			return -1;
-		}
-
-		unsigned acquire(unsigned gpu_id) {
-			int core = try_acquire(gpu_id);
-			CHECK(core >= 0) << "Unable to acquire core for GPU " << gpu_id << "; all cores exhausted";
-			return static_cast<unsigned>(core);
-		}
-	};
-
 	void initialize(ClockworkWorkerConfig &config) {
@@ -190,17 +146,15 @@ protected:
 		manager = new MemoryManager(config);

-		CoreAllocator cores;
-
 		for (unsigned gpu_id = 0; gpu_id < num_gpus; gpu_id++) {
 			event_pools.push_back(new CudaEventPool(gpu_id));

-			gpu_executors.push_back(new GPUExecutorExclusive(GPU, {cores.acquire(gpu_id)}, gpu_id)); // Type 3
-			weights_executors.push_back(new GPUExecutorExclusive(PCIe_H2D_Weights, {cores.acquire(gpu_id)}, gpu_id)); // Type 1
-			inputs_executors.push_back(new GPUExecutorExclusive(PCIe_H2D_Inputs, {cores.acquire(gpu_id)}, gpu_id, -1)); // Type 2
-			outputs_executors.push_back(new GPUExecutorExclusive(PCIe_D2H_Output, {cores.acquire(gpu_id)}, gpu_id, -1)); // Type 4
+			gpu_executors.push_back(new GPUExecutorExclusive(GPU, gpu_id)); // Type 3
+			weights_executors.push_back(new GPUExecutorExclusive(PCIe_H2D_Weights, gpu_id)); // Type 1
+			inputs_executors.push_back(new GPUExecutorExclusive(PCIe_H2D_Inputs, gpu_id)); // Type 2
+			outputs_executors.push_back(new GPUExecutorExclusive(PCIe_D2H_Output, gpu_id)); // Type 4

-			AsyncTaskChecker* c1 = new AsyncTaskChecker({cores.acquire(gpu_id)});
+			AsyncTaskChecker* c1 = new AsyncTaskChecker();

 			gpu_checkers.push_back(c1);
 			weights_checkers.push_back(c1);
@@ -210,7 +164,7 @@ protected:
 			all_checkers.push_back(c1);
 		}

-		load_model_executor = new CPUExecutor(CPU, {cores.acquire(0)}); // Type 0
+		load_model_executor = new CPUExecutor(CPU); // Type 0

 		std::string task_file_path = config.telemetry_log_dir + "/" + config.task_telemetry_log_file;
 		std::string action_file_path = config.telemetry_log_dir + "/" + config.action_telemetry_log_file;
...
@@ -15,6 +15,7 @@
 #include <pods/binary.h>
 #include <pods/buffers.h>
 #include <pods/streams.h>
+#include "clockwork/thread.h"

 namespace clockwork {
@@ -42,6 +43,7 @@ private:
 public:
 	ActionTelemetryFileLogger(std::string output_filename) : output_filename(output_filename), alive(true) {
 		thread = std::thread(&ActionTelemetryFileLogger::main, this);
+		threading::initLoggerThread(thread);
 	}

 	void shutdown(bool awaitCompletion) {
...
@@ -20,6 +20,7 @@
 #include <pods/streams.h>
 #include <tbb/concurrent_queue.h>
 #include <iomanip>
+#include "clockwork/thread.h"

 namespace clockwork {
@@ -103,7 +104,10 @@ public:
 	std::atomic_int errors = 0;

 	ClientTelemetrySummarizer(uint64_t print_interval = 10000000000UL) :
-			thread(&ClientTelemetrySummarizer::run, this), print_interval(print_interval) {}
+			thread(&ClientTelemetrySummarizer::run, this), print_interval(print_interval) {
+		threading::initLoggerThread(thread);
+	}

 	void run() {
 		uint64_t last_print = util::now();
...
@@ -22,6 +22,7 @@
 #include "clockwork/api/api_common.h"
 #include <tuple>
 #include "clockwork/api/worker_api.h"
+#include "clockwork/thread.h"

 namespace clockwork {
@@ -70,6 +71,7 @@ public:
 	void start() {
 		this->thread = std::thread(&AsyncControllerActionTelemetryLogger::run, this);
+		threading::initLoggerThread(thread);
 	}

 	void run() {
...
@@ -19,6 +19,7 @@
 #include <tbb/concurrent_queue.h>
 #include <iomanip>
 #include "clockwork/api/api_common.h"
+#include "clockwork/thread.h"

 namespace clockwork {
@@ -61,6 +62,7 @@ public:
 	void start() {
 		thread = std::thread(&AsyncRequestTelemetryLogger::run, this);
+		threading::initLoggerThread(thread);
 	}

 	void run() {
...
@@ -15,6 +15,7 @@
 #include <pods/binary.h>
 #include <pods/buffers.h>
 #include <pods/streams.h>
+#include "clockwork/thread.h"

 namespace clockwork {
@@ -43,6 +44,7 @@ private:
 public:
 	TaskTelemetryFileLogger(std::string output_filename) : output_filename(output_filename), alive(true) {
 		thread = std::thread(&TaskTelemetryFileLogger::main, this);
+		threading::initLoggerThread(thread);
 	}

 	void shutdown(bool awaitCompletion) {
...
src/clockwork/thread.cpp (new file):
#include "clockwork/thread.h"
#include <cuda_runtime.h>
#include <nvml.h>
#include <thread>
#include <algorithm>
#include <sstream>
#include <dmlc/logging.h>
#include "clockwork/cuda_common.h"
#include "clockwork/util.h"
namespace clockwork {
namespace threading {
// The priority scheduler in use. SCHED_FIFO or SCHED_RR
int scheduler = SCHED_FIFO;
/*
Globally manages assignment of threads to cores, since RT priority is fragile
*/
class CoreManager {
public:
const int init_pool_size = 2;
const int default_pool_size = 2;
std::vector<bool> in_use;
std::vector<std::vector<unsigned>> gpu_affinity;
std::vector<unsigned> init_pool;
std::vector<unsigned> default_pool;
CoreManager() : in_use(coreCount(), false) {
in_use[0] = true; // Don't use core 0
in_use[1] = true; // Don't use core 1
unsigned gpu_count = util::get_num_gpus();
for (unsigned i = 0; i < gpu_count; i++) {
gpu_affinity.push_back(getGPUCoreAffinity(i));
}
if (gpu_count == 0) gpu_count = 1;
for (unsigned i = 0; i < init_pool_size; i++) {
init_pool.push_back(alloc(i % gpu_count));
}
for (unsigned i = 0; i < default_pool_size; i++) {
default_pool.push_back(alloc(i % gpu_count));
}
}
unsigned alloc(unsigned gpu_id) {
if (gpu_id < gpu_affinity.size()) {
for (unsigned i = 0; i < gpu_affinity[gpu_id].size(); i++) {
unsigned core = gpu_affinity[gpu_id][i];
if (!in_use[core]) {
in_use[core] = true;
return core;
}
}
}
// Couldn't get a core with GPU affinity; get a different core
for (unsigned i = 0; i < in_use.size(); i++) {
if (!in_use[i]) {
in_use[i] = true;
return i;
}
}
CHECK(false) << "All cores exhausted for GPU " << gpu_id;
return 0;
}
std::vector<unsigned> alloc(unsigned count, unsigned gpu_id) {
std::cout << "Alloc " << count << " on " << gpu_id << " " << str() << std::endl;
std::vector<unsigned> result;
for (unsigned i = 0; i < count; i++) {
result.push_back(alloc(gpu_id));
}
return result;
}
std::string str() {
unsigned allocated = 0;
for (unsigned i = 0; i < in_use.size(); i++) {
allocated += in_use[i];
}
std::stringstream ss;
ss << (in_use.size() - allocated) << "/" << in_use.size() << " cores free";
return ss.str();
}
};
bool init = false;
CoreManager manager;
// Initializes a clockwork process
void initProcess() {
// Bind to the init pool and set priority to max
setCores(manager.init_pool, pthread_self());
setPriority(scheduler, maxPriority(scheduler), pthread_self());
init = true;
}
void initHighPriorityThread(int num_cores, int gpu_affinity, std::thread &thread) {
if (init) {
auto cores = manager.alloc(num_cores, gpu_affinity);
setCores(cores, thread.native_handle());
setPriority(scheduler, maxPriority(scheduler), thread.native_handle());
} else {
std::cout << "Warning: trying to initialize high priority thread without threading initialized" << std::endl;
}
}
void initHighPriorityThread(std::thread &thread) {
initHighPriorityThread(1, 1, thread);
}
void initLowPriorityThread(std::thread &thread) {
setCores(manager.default_pool, thread.native_handle());
setDefaultPriority(thread.native_handle());
}
void initNetworkThread(std::thread &thread) {
initHighPriorityThread(1, 0, thread);
}
void initLoggerThread(std::thread &thread) {
initLowPriorityThread(thread);
}
void initGPUThread(int gpu_id, std::thread &thread) {
initHighPriorityThread(2, 0, thread);
}
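// Placement policy implemented by the helpers above: initProcess binds the
// calling (main) thread to the shared init pool at real-time priority;
// initHighPriorityThread allocates dedicated cores, preferring cores with
// NVML affinity to the given GPU, and applies SCHED_FIFO at max priority;
// initLowPriorityThread shares the default pool under SCHED_OTHER. Network
// threads get 1 dedicated core; GPU executor threads get 2.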
unsigned coreCount() {
return std::thread::hardware_concurrency();
}
void setCore(unsigned core) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
CHECK(rc == 0) << "Unable to set thread affinity: " << rc;
}
void setCores(std::vector<unsigned> cores, pthread_t thread) {
CHECK(cores.size() > 0) << "Trying to bind to empty core set";
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (unsigned core : cores) {
CPU_SET(core, &cpuset);
}
int rc = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
CHECK(rc == 0) << "Unable to set thread affinity: " << rc;
}
void setAllCores() {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (unsigned i = 0; i < coreCount(); i++) {
CPU_SET(i, &cpuset);
}
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
CHECK(rc == 0) << "Unable to set thread affinity: " << rc;
}
void addCore(unsigned core) {
auto cores = currentCores();
cores.push_back(core);
setCores(cores, pthread_self());
}
void removeCore(unsigned core) {
auto cores = currentCores();
auto it = std::remove(cores.begin(), cores.end(), core);
cores.erase(it, cores.end());
setCores(cores, pthread_self());
}
std::vector<unsigned> currentCores() {
cpu_set_t cpuset;
int rc = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
CHECK(rc == 0) << "Unable to get thread affinity: " << rc;
std::vector<unsigned> cores;
for (unsigned i = 0; i < coreCount(); i++) {
if (CPU_ISSET(i, &cpuset) > 0) {
cores.push_back(i);
}
}
return cores;
}
std::vector<unsigned> getGPUCoreAffinity(unsigned gpu_id) {
unsigned len = (coreCount() + 63) / 64;
std::vector<uint64_t> bitmaps(len);
nvmlReturn_t status;
status = nvmlInit();
CHECK(status == NVML_SUCCESS);
nvmlDevice_t device;
status = nvmlDeviceGetHandleByIndex(gpu_id, &device);
CHECK(status == NVML_SUCCESS);
// Fill bitmaps with the ideal CPU affinity for the device
// (see https://helpmanual.io/man3/nvmlDeviceGetCpuAffinity/)
status = nvmlDeviceGetCpuAffinity(device, bitmaps.size(), bitmaps.data());
CHECK(status == NVML_SUCCESS);
std::vector<unsigned> cores;
unsigned core = 0;
for (unsigned i = 0; i < bitmaps.size(); i++) {
for (unsigned j = 0; j < 64; j++) {
if (((bitmaps[i] >> j) & 0x01) == 0x01) {
cores.push_back(core);
}
core++;
}
}
status = nvmlShutdown();
CHECK(status == NVML_SUCCESS);
return cores;
}
int minPriority(int scheduler) {
return sched_get_priority_min(scheduler);
}
int maxPriority(int scheduler) {
return sched_get_priority_max(scheduler);
}
void setDefaultPriority() {
setDefaultPriority(pthread_self());
}
void setDefaultPriority(pthread_t thread) {
setPriority(SCHED_OTHER, 0, thread);
}
void setMaxPriority() {
setPriority(SCHED_FIFO, maxPriority(SCHED_FIFO), pthread_self());
}
void setPriority(int scheduler, int priority, pthread_t thId) {
struct sched_param params;
	params.sched_priority = priority;
int ret = pthread_setschedparam(thId, scheduler, &params);
CHECK(ret == 0) << "Unable to set thread priority. Don't forget to set `rtprio` to unlimited in `limits.conf`. See Clockwork README for instructions";
int policy = 0;
ret = pthread_getschedparam(thId, &policy, &params);
CHECK(ret == 0) << "Unable to verify thread scheduler params";
CHECK(policy == scheduler) << "Unable to verify thread scheduler params";
}
int currentScheduler() {
pthread_t thId = pthread_self();
struct sched_param params;
int policy = 0;
int ret = pthread_getschedparam(thId, &policy, &params);
CHECK(ret == 0) << "Unable to get current thread scheduler params";
return policy;
}
int currentPriority() {
pthread_t thId = pthread_self();
struct sched_param params;
int policy = 0;
int ret = pthread_getschedparam(thId, &policy, &params);
CHECK(ret == 0) << "Unable to get current thread scheduler params";
return params.sched_priority;
}
}
}
\ No newline at end of file
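For orientation, here is a minimal sketch of how the call sites changed above combine the new threading helpers at process startup. This is assumed usage, not code from this MR; the lambdas are placeholders for the real network, logging, and executor loops.

#include "clockwork/thread.h"
#include <thread>

int main() {
	// Bind the main thread to the init pool at real-time priority
	clockwork::threading::initProcess();

	std::thread network([] { /* run io_service */ });
	clockwork::threading::initNetworkThread(network);  // 1 dedicated core, RT priority

	std::thread logger([] { /* drain telemetry queues */ });
	clockwork::threading::initLoggerThread(logger);    // shared default pool, SCHED_OTHER

	std::thread executor([] { /* dequeue and run GPU tasks */ });
	clockwork::threading::initGPUThread(0, executor);  // 2 dedicated cores for GPU 0

	network.join();
	logger.join();
	executor.join();
}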