diff --git a/source/ps/UserReport.cpp b/source/ps/UserReport.cpp index a36cdd68e1..9a429dcf78 100644 --- a/source/ps/UserReport.cpp +++ b/source/ps/UserReport.cpp @@ -36,10 +36,11 @@ #include "ps/ConfigDB.h" #include "ps/Profiler2.h" #include "ps/Pyrogenesis.h" +#include "ps/TaskManager.h" +#include "ps/Future.h" #include "ps/Threading.h" #include -#include #include #include #include @@ -49,6 +50,7 @@ #include #include #include +#include #define DEBUG_UPLOADS 0 @@ -104,7 +106,7 @@ class CUserReporterWorker public: CUserReporterWorker(const std::string& userID, const std::string& url) : m_URL(url), m_UserID(userID), m_Enabled(false), m_Shutdown(false), m_Status("disabled"), - m_PauseUntilTime(timer_Time()), m_LastUpdateTime(timer_Time()) + m_PauseUntilTime(timer_Time()), m_LastUpdateTime(timer_Time()), m_ProxyInitialised(false) { // Set up libcurl: @@ -149,8 +151,6 @@ public: // Disable the Accept header because it's a waste of a dozen bytes m_Headers = curl_slist_append(m_Headers, "Accept: "); curl_easy_setopt(m_Curl, CURLOPT_HTTPHEADER, m_Headers); - - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); } ~CUserReporterWorker() @@ -164,14 +164,12 @@ public: */ void SetEnabled(bool enabled) { - std::lock_guard lock(m_WorkerMutex); - if (enabled != m_Enabled) { + std::lock_guard lock(m_WorkerMutex); m_Enabled = enabled; - - // Wake up the worker thread - m_WorkerCV.notify_all(); } + + TryStartTask(); } /** @@ -187,14 +185,7 @@ public: std::lock_guard lock(m_WorkerMutex); m_Shutdown = true; } - - // Wake up the worker thread - m_WorkerCV.notify_all(); - - // Wait for it to shut down cleanly - // TODO: should have a timeout in case of network hangs - m_WorkerThread.join(); - + CleanupFuture(); return true; } @@ -217,8 +208,7 @@ public: m_ReportQueue.push_back(report); } - // Wake up the worker thread - m_WorkerCV.notify_all(); + TryStartTask(); } /** @@ -230,91 +220,73 @@ public: double now = timer_Time(); if (now > m_LastUpdateTime + TIMER_CHECK_INTERVAL) { - // Wake up the worker thread - m_WorkerCV.notify_all(); - m_LastUpdateTime = now; + TryStartTask(); } } private: - static void RunThread(CUserReporterWorker* data) + void CleanupFuture() { - debug_SetThreadName("CUserReportWorker"); - g_Profiler2.RegisterCurrentThread("userreport"); - - data->Run(); + if (m_Future.Valid()) + m_Future.Get(); } - void Run() + void TryStartTask() { - // Set libcurl's proxy configuration - // (This has to be done in the thread because it's potentially very slow) + if (m_Future.Valid() && !m_Future.IsDone()) + return; + if (!m_Enabled) + return; + if (m_Shutdown) + return; + if (timer_Time() < m_PauseUntilTime) + return; + + CleanupFuture(); + + // Start or replace the future + m_Future = {g_TaskManager, std::bind_front(&CUserReporterWorker::RunTask, this)}; + } + + void RunTask() + { + PROFILE2("CUserReporterWorker::RunTask"); + + InitializeProxyIfNeeded(); + + while (ShouldRun()) + { + if (!ProcessReport()) + break; + } + } + + +private: + + void InitializeProxyIfNeeded() + { + if (m_ProxyInitialised) + return; + + PROFILE2("CUserReporterWorker::InitializeProxyIfNeeded"); + SetStatus("proxy"); std::wstring proxy; - { - PROFILE2("get proxy config"); if (sys_get_proxy_config(wstring_from_utf8(m_URL), proxy) == INFO::OK) curl_easy_setopt(m_Curl, CURLOPT_PROXY, utf8_from_wstring(proxy).c_str()); } + m_ProxyInitialised = true; SetStatus("waiting"); - - /* - * We use a condition_variable to let the thread be woken up when it has - * work to do. Various actions from the main thread can wake it: - * * SetEnabled() - * * Shutdown() - * * Submit() - * * Retransmission timeouts, once every several seconds - * - * If multiple actions have triggered wakeups, we might respond to - * all of those actions after the first wakeup, which is okay (we'll do - * nothing during the subsequent wakeups). We should never hang due to - * processing fewer actions than wakeups. - * - * Retransmission timeouts are triggered via the main thread. - */ - - // Wait until the main thread wakes us up - while (true) - { - g_Profiler2.RecordRegionEnter("condition_variable wait"); - - std::unique_lock lock(m_WorkerMutex); - m_WorkerCV.wait(lock); - lock.unlock(); - - g_Profiler2.RecordRegionLeave(); - - // Handle shutdown requests as soon as possible - if (GetShutdown()) - return; - - // If we're not enabled, ignore this wakeup - if (!GetEnabled()) - continue; - - // If we're still pausing due to a failed connection, - // go back to sleep again - if (timer_Time() < m_PauseUntilTime) - continue; - - // We're enabled, so process as many reports as possible - while (ProcessReport()) - { - // Handle shutdowns while we were sending the report - if (GetShutdown()) - return; - } - } } - bool GetEnabled() + bool ShouldRun() { std::lock_guard lock(m_WorkerMutex); - return m_Enabled; + return m_Enabled && !m_Shutdown; } bool GetShutdown() @@ -334,7 +306,7 @@ private: bool ProcessReport() { - PROFILE2("process report"); + PROFILE2("CUserReporterWorker::ProcessReport"); std::shared_ptr report; @@ -482,10 +454,10 @@ private: } private: - // Thread-related members: - std::thread m_WorkerThread; + // Concurrency members: std::mutex m_WorkerMutex; - std::condition_variable m_WorkerCV; + Future m_Future; + bool m_ProxyInitialised; // Shared by main thread and worker thread: // These variables are all protected by m_WorkerMutex