Use Future for the userreporter

Previously we relied on a raw thread.
Pull Request: #8537
Reviewed by: @phosit
Refs: #5874
This commit is contained in:
Stan 2025-11-14 22:17:59 +01:00
parent 5609492731
commit 84d4dbc506
No known key found for this signature in database
GPG key ID: 244943DFF8370D60

View file

@ -36,10 +36,11 @@
#include "ps/ConfigDB.h"
#include "ps/Profiler2.h"
#include "ps/Pyrogenesis.h"
#include "ps/TaskManager.h"
#include "ps/Future.h"
#include "ps/Threading.h"
#include <algorithm>
#include <condition_variable>
#include <cstring>
#include <ctime>
#include <deque>
@ -49,6 +50,7 @@
#include <string>
#include <thread>
#include <utility>
#include <functional>
#define DEBUG_UPLOADS 0
@ -104,7 +106,7 @@ class CUserReporterWorker
public:
CUserReporterWorker(const std::string& userID, const std::string& url) :
m_URL(url), m_UserID(userID), m_Enabled(false), m_Shutdown(false), m_Status("disabled"),
m_PauseUntilTime(timer_Time()), m_LastUpdateTime(timer_Time())
m_PauseUntilTime(timer_Time()), m_LastUpdateTime(timer_Time()), m_ProxyInitialised(false)
{
// Set up libcurl:
@ -149,8 +151,6 @@ public:
// Disable the Accept header because it's a waste of a dozen bytes
m_Headers = curl_slist_append(m_Headers, "Accept: ");
curl_easy_setopt(m_Curl, CURLOPT_HTTPHEADER, m_Headers);
m_WorkerThread = std::thread(Threading::HandleExceptions<RunThread>::Wrapper, this);
}
~CUserReporterWorker()
@ -164,14 +164,12 @@ public:
*/
void SetEnabled(bool enabled)
{
std::lock_guard<std::mutex> lock(m_WorkerMutex);
if (enabled != m_Enabled)
{
std::lock_guard<std::mutex> lock(m_WorkerMutex);
m_Enabled = enabled;
// Wake up the worker thread
m_WorkerCV.notify_all();
}
TryStartTask();
}
/**
@ -187,14 +185,7 @@ public:
std::lock_guard<std::mutex> lock(m_WorkerMutex);
m_Shutdown = true;
}
// Wake up the worker thread
m_WorkerCV.notify_all();
// Wait for it to shut down cleanly
// TODO: should have a timeout in case of network hangs
m_WorkerThread.join();
CleanupFuture();
return true;
}
@ -217,8 +208,7 @@ public:
m_ReportQueue.push_back(report);
}
// Wake up the worker thread
m_WorkerCV.notify_all();
TryStartTask();
}
/**
@ -230,91 +220,73 @@ public:
double now = timer_Time();
if (now > m_LastUpdateTime + TIMER_CHECK_INTERVAL)
{
// Wake up the worker thread
m_WorkerCV.notify_all();
m_LastUpdateTime = now;
TryStartTask();
}
}
private:
static void RunThread(CUserReporterWorker* data)
void CleanupFuture()
{
debug_SetThreadName("CUserReportWorker");
g_Profiler2.RegisterCurrentThread("userreport");
data->Run();
if (m_Future.Valid())
m_Future.Get();
}
void Run()
void TryStartTask()
{
// Set libcurl's proxy configuration
// (This has to be done in the thread because it's potentially very slow)
if (m_Future.Valid() && !m_Future.IsDone())
return;
if (!m_Enabled)
return;
if (m_Shutdown)
return;
if (timer_Time() < m_PauseUntilTime)
return;
CleanupFuture();
// Start or replace the future
m_Future = {g_TaskManager, std::bind_front(&CUserReporterWorker::RunTask, this)};
}
void RunTask()
{
PROFILE2("CUserReporterWorker::RunTask");
InitializeProxyIfNeeded();
while (ShouldRun())
{
if (!ProcessReport())
break;
}
}
private:
void InitializeProxyIfNeeded()
{
if (m_ProxyInitialised)
return;
PROFILE2("CUserReporterWorker::InitializeProxyIfNeeded");
SetStatus("proxy");
std::wstring proxy;
{
PROFILE2("get proxy config");
if (sys_get_proxy_config(wstring_from_utf8(m_URL), proxy) == INFO::OK)
curl_easy_setopt(m_Curl, CURLOPT_PROXY, utf8_from_wstring(proxy).c_str());
}
m_ProxyInitialised = true;
SetStatus("waiting");
/*
* We use a condition_variable to let the thread be woken up when it has
* work to do. Various actions from the main thread can wake it:
* * SetEnabled()
* * Shutdown()
* * Submit()
* * Retransmission timeouts, once every several seconds
*
* If multiple actions have triggered wakeups, we might respond to
* all of those actions after the first wakeup, which is okay (we'll do
* nothing during the subsequent wakeups). We should never hang due to
* processing fewer actions than wakeups.
*
* Retransmission timeouts are triggered via the main thread.
*/
// Wait until the main thread wakes us up
while (true)
{
g_Profiler2.RecordRegionEnter("condition_variable wait");
std::unique_lock<std::mutex> lock(m_WorkerMutex);
m_WorkerCV.wait(lock);
lock.unlock();
g_Profiler2.RecordRegionLeave();
// Handle shutdown requests as soon as possible
if (GetShutdown())
return;
// If we're not enabled, ignore this wakeup
if (!GetEnabled())
continue;
// If we're still pausing due to a failed connection,
// go back to sleep again
if (timer_Time() < m_PauseUntilTime)
continue;
// We're enabled, so process as many reports as possible
while (ProcessReport())
{
// Handle shutdowns while we were sending the report
if (GetShutdown())
return;
}
}
}
bool GetEnabled()
bool ShouldRun()
{
std::lock_guard<std::mutex> lock(m_WorkerMutex);
return m_Enabled;
return m_Enabled && !m_Shutdown;
}
bool GetShutdown()
@ -334,7 +306,7 @@ private:
bool ProcessReport()
{
PROFILE2("process report");
PROFILE2("CUserReporterWorker::ProcessReport");
std::shared_ptr<CUserReport> report;
@ -482,10 +454,10 @@ private:
}
private:
// Thread-related members:
std::thread m_WorkerThread;
// Concurrency members:
std::mutex m_WorkerMutex;
std::condition_variable m_WorkerCV;
Future<void> m_Future;
bool m_ProxyInitialised;
// Shared by main thread and worker thread:
// These variables are all protected by m_WorkerMutex