Fix a race in ResourceLoader::load_threaded_request()

When multiple threads try to load the same sub-resource the
ResourceLoader's anti-deadlock protection thinks that the other threads
loading the dependency are old threads blocked on a dependency.

The current thread will then immediately start to load the sub-resource
since they can't wait for it. This can then lead to multiple threads
loading the same resource concurrently.

We fix the problem by recording in the task when loading has ACTUALLY
started, and if so we simply wait.

This is no worse than before, at the point where we are waiting now we
previously re-started the load entirely.

Tested by running the MRP from the below issue (with and without asan
and msan), and loading and running various public and private Godot
projects.

This also fixes a use-after-free on resource_changed_connections when
multiple threads end up running the same import. This is a pre-existing
bug but this code widened the race window.

This also fixes a pre-existing memory leak in load_threaded_request,
when the user calls the function we capture the Ref<LoadToken> but
immediately drop the ref. Causing the refcount to go to 1. Later in
_run_load_task we unreference manually, bringing the refcount to 0. This
then later prevents the Ref<> from memdeleting the LoadToken.

This fixes #118085
This commit is contained in:
HP van Braam
2026-04-22 00:33:15 +02:00
parent 3e4f548068
commit f63ab5fbd9
2 changed files with 65 additions and 9 deletions
+59 -8
View File
@@ -337,12 +337,42 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
void ResourceLoader::_run_load_task(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
bool wait = false;
{
MutexLock thread_load_lock(thread_load_mutex);
if (cleaning_tasks) {
load_task.status = THREAD_LOAD_FAILED;
return;
}
int thread_index = WorkerThreadPool::get_singleton()->get_thread_index();
if (load_task.started_load && load_task.thread_index != thread_index) {
wait = true;
} else {
load_task.started_load = true;
load_task.thread_index = thread_index;
}
}
if (wait) {
// There are a couple of reasons why we got here:
// 1) We re-started the task in _load_complete_inner but we also
// got started via the original task in the WorkerThreadPool
// 2) There's a race between multiple threads in _load_complete_inner
// and more than one thread thought they had to restart
//
// This task was already running, wait for the other thread to complete.
ThreadLoadStatus status;
do {
OS::get_singleton()->delay_usec(1000);
thread_load_mutex.lock();
status = load_task.status;
thread_load_mutex.unlock();
} while (status == THREAD_LOAD_IN_PROGRESS);
load_task.load_token->unreference();
return;
}
ThreadLoadTask *curr_load_task_backup = curr_load_task;
@@ -487,6 +517,8 @@ String ResourceLoader::_validate_local_path(const String &p_path) {
Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, CacheMode p_cache_mode) {
Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true);
// We need to keep at least one reference to the token until it is done.
token->reference();
return token.is_valid() ? OK : FAILED;
}
@@ -843,16 +875,32 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
bool loader_is_wtp = load_task.task_id != 0;
if (loader_is_wtp) {
// Loading thread is in the worker pool.
p_thread_load_lock.temp_unlock();
int load_nesting_backup = load_nesting;
load_nesting = 0;
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
int thread_index = WorkerThreadPool::get_singleton()->get_thread_index();
int task_thread_index = load_task.thread_index;
bool restart = false;
bool running = load_task.started_load;
p_thread_load_lock.temp_unlock();
if (running && task_thread_index != thread_index) {
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
restart = true;
}
} else {
restart = true;
}
DEV_ASSERT(load_nesting == 0);
load_nesting = load_nesting_backup;
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
if (restart) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
@@ -907,10 +955,13 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (resource.is_valid()) {
if (load_task_ptr->parent_task) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
load_task_ptr->parent_task->resource_changed_connections.push_back(rcc);
if (!load_task_ptr->connections_propagated) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
load_task_ptr->parent_task->resource_changed_connections.push_back(rcc);
}
load_task_ptr->connections_propagated = true;
}
} else {
p_thread_load_lock.temp_unlock();
+6 -1
View File
@@ -181,6 +181,7 @@ private:
struct ThreadLoadTask {
WorkerThreadPool::TaskID task_id = 0; // Used if run on a worker thread from the pool.
Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load).
int thread_index = -1;
ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism.
uint32_t awaiters_count = 0;
LoadToken *load_token = nullptr;
@@ -200,6 +201,8 @@ private:
bool need_wait : 1;
bool in_progress_check : 1; // Measure against recursion cycles in progress reporting. Cycles are not expected, but can happen due to how it's currently implemented.
bool use_sub_threads : 1;
bool started_load : 1;
bool connections_propagated : 1;
struct ResourceChangedConnection {
Resource *source = nullptr;
@@ -212,7 +215,9 @@ private:
awaited(false),
need_wait(true),
in_progress_check(false),
use_sub_threads(false) {}
use_sub_threads(false),
started_load(false),
connections_propagated(false) {}
};
static void _run_load_task(void *p_userdata);