MDEV-39492 Parallel Query: Study how to create worker threads#5099
MDEV-39492 Parallel Query: Study how to create worker threads#5099mariadb-RexJohnston wants to merge 1 commit into
Conversation
Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query. 2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select. Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD. The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished. It then waits for a signal, then processes it's message queue. The thread management data is allocated on the stack in JOIN::exec. Everything else is allocated using my_malloc() and my_free(). Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command. We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test.
There was a problem hiding this comment.
Code Review
This pull request implements a framework for parallel query execution, introducing worker thread management, background thread creation, and an error propagation system. It adds Performance Schema instrumentation and a new system variable, parallel_worker_threads, to control the number of workers. The code review identified several critical issues, including a race condition in worker status checks, missing synchronization when accessing the global thread list, and potential double-free and memory leak scenarios during error handling. Suggestions were also made to use monotonic time for timeouts and to adopt more specific naming for global macros.
| if (workers[i].finished) | ||
| continue; | ||
|
|
||
| if ((int)i != killed_from) | ||
| { | ||
| workers[i].thd->killed= thd->killed; |
There was a problem hiding this comment.
Critical race condition: The manager thread checks workers[i].finished without a lock. If it is false, it proceeds to access workers[i].thd at line 357. However, the worker thread can set finished = true and then clear thd = nullptr (at line 161) in the window between the manager's check and its access. This will cause a null pointer dereference and crash the server. Access to worker->thd and worker->finished must be synchronized using a mutex or atomic variables.
| server_threads.erase(worker->thd); | ||
| destroy_background_thd(worker->thd); |
| my_free(const_cast<char*>(workers[i].thd->db.str)); | ||
| server_threads.erase(workers[i].thd); | ||
| destroy_background_thd(workers[i].thd); |
There was a problem hiding this comment.
There is a potential double-free here. If destroy_background_thd eventually calls THD::free_connection(), it will attempt to free db.str. Since you are manually calling my_free on db.str at line 277 just before destroy_background_thd, this will result in a double-free error.
if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr,
parallel_worker_thread_func, &workers[i]))
{
server_threads.erase(workers[i].thd);
destroy_background_thd(workers[i].thd);
for (uint j= 0; j < i; j++)| ~pwt_management() | ||
| { | ||
| if (workers) | ||
| join_parallel_workers(current_thd); | ||
| } |
There was a problem hiding this comment.
Memory leak: If init_parallel_workers fails, it frees the workers array and sets it to nullptr. However, some worker threads might have already started and pushed events into the parallel_messages queue. Since the destructor only calls join_parallel_workers (which drains the queue) if workers is not null, any messages already in the queue will be leaked. The queue should be drained regardless of the workers pointer state.
| #ifdef _WIN32 | ||
| #define MAX_THREAD_NAME 256 | ||
| #elif defined(__linux__) | ||
| #define MAX_THREAD_NAME 16 | ||
| #elif defined(__FreeBSD__) || defined(__OpenBSD__) | ||
| #define MAX_THREAD_NAME 19 | ||
| #include <pthread_np.h> | ||
| #elif defined(__apple_build_version__) | ||
| #include <sys/proc_info.h> | ||
| #define MAX_THREAD_NAME MAXTHREADNAMESIZE | ||
| #endif |
| bool error_to_queue( THD *thd, pwt_queued_event **event, uint error, | ||
| Sql_condition::enum_warning_level level, const char *msg ) | ||
| { | ||
| *event= (pwt_queued_event*) my_malloc(PSI_INSTRUMENT_ME, | ||
| sizeof(pwt_queued_event), | ||
| MYF(0)); | ||
| if (!*event) | ||
| return false; | ||
| (*event)->error= (pwt_error_message*) my_malloc(PSI_INSTRUMENT_ME, | ||
| sizeof(pwt_error_message), | ||
| MYF(0)); | ||
| if (!(*event)->error) | ||
| { | ||
| my_free(*event); | ||
| return false; | ||
| } | ||
| (*event)->data= nullptr; | ||
| (*event)->error->level= level; | ||
| if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) | ||
| (*event)->error->worker_errno= thd->killed_errno(); | ||
| (*event)->error->code= error; | ||
| (*event)->error->message= (char *) my_malloc(PSI_INSTRUMENT_ME, strlen(msg)+1, | ||
| MYF(0)); | ||
| if (!(*event)->error->message) | ||
| { | ||
| my_free((*event)->error); | ||
| my_free(*event); | ||
| return false; | ||
| } | ||
| strmake((*event)->error->message, msg, strlen(msg)); | ||
| return true; | ||
| } |
There was a problem hiding this comment.
In error_to_queue, if any my_malloc call fails (e.g., at line 49), the function returns false but the *event pointer in the caller's scope will point to memory that has already been freed (at line 54). While the current caller doesn't use the pointer on failure, this is a dangerous API pattern. It is safer to set *event = nullptr before returning false.
bool error_to_queue( THD *thd, pwt_queued_event **event, uint error,
Sql_condition::enum_warning_level level, const char *msg )
{
*event= (pwt_queued_event*) my_malloc(PSI_INSTRUMENT_ME,
sizeof(pwt_queued_event),
MYF(0));
if (!*event)
return false;
(*event)->error= (pwt_error_message*) my_malloc(PSI_INSTRUMENT_ME,
sizeof(pwt_error_message),
MYF(0));
if (!(*event)->error)
{
my_free(*event);
*event= nullptr;
return false;
}
(*event)->data= nullptr;
(*event)->error->level= level;
if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR)
(*event)->error->worker_errno= thd->killed_errno();
(*event)->error->code= error;
(*event)->error->message= (char *) my_malloc(PSI_INSTRUMENT_ME, strlen(msg)+1,
MYF(0));
if (!(*event)->error->message)
{
my_free((*event)->error);
my_free(*event);
*event= nullptr;
return false;
}
strmake((*event)->error->message, msg, strlen(msg));
return true;
}| START: in lieu of work, wait 10 seconds, push out an error or a warning, | ||
| wait another 10 seconds then exit | ||
| */ | ||
| abs_timeout.tv_sec= time(0)+10; |
Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query.
2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select.
Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD.
The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished.
It then waits for a signal, then processes it's message queue.
The thread management data is allocated on the stack in JOIN::exec. Everything else is allocated using my_malloc() and my_free().
Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command.
We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test.