Skip to content

MDEV-39492 Parallel Query: Study how to create worker threads#5099

Open
mariadb-RexJohnston wants to merge 1 commit into
mainfrom
13.0-MDEV-39492-2
Open

MDEV-39492 Parallel Query: Study how to create worker threads#5099
mariadb-RexJohnston wants to merge 1 commit into
mainfrom
13.0-MDEV-39492-2

Conversation

@mariadb-RexJohnston
Copy link
Copy Markdown
Member

Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query.

2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select.

Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD.

The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished.
It then waits for a signal, then processes it's message queue.

The thread management data is allocated on the stack in JOIN::exec. Everything else is allocated using my_malloc() and my_free().

Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command.

We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test.

Introduces parallel_worker_threads variable to control the number
of worker threads created by a parallel execution query.

2 new files, sql_parallel_workers.h sql_parallel_workers.cc which
contain structures for the creation, management and deletion of
parallel worker threads (pwt_ in the name).  Main management
class created in the stack in JOIN::exec, implemented for the
top level select.

Current parallel_worker_thread_func sleeps for 10 seconds, generates
a warning, signals the main thread, sleeps 10 seconds, signals the
main thread again, sets it's finished flag and cleans it's THD.

The main thread loops through worker threads, looking for finished
thread and cleans them up if they have finished.
It then waits for a signal, then processes it's message queue.

The thread management data is allocated on the stack in JOIN::exec.
Everything else is allocated using my_malloc() and my_free().

Threads are registed in server_threads, so are visible in
information_schema.processlist and the show processlist command.

We check that a kill query on a parallel worker is passed onto it's
manager and the query is properly aborted, and that a kill connection
is handled properly in parallel_worker.test.
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a framework for parallel query execution, introducing worker thread management, background thread creation, and an error propagation system. It adds Performance Schema instrumentation and a new system variable, parallel_worker_threads, to control the number of workers. The code review identified several critical issues, including a race condition in worker status checks, missing synchronization when accessing the global thread list, and potential double-free and memory leak scenarios during error handling. Suggestions were also made to use monotonic time for timeouts and to adopt more specific naming for global macros.

Comment on lines +352 to +357
if (workers[i].finished)
continue;

if ((int)i != killed_from)
{
workers[i].thd->killed= thd->killed;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Critical race condition: The manager thread checks workers[i].finished without a lock. If it is false, it proceeds to access workers[i].thd at line 357. However, the worker thread can set finished = true and then clear thd = nullptr (at line 161) in the window between the manager's check and its access. This will cause a null pointer dereference and crash the server. Access to worker->thd and worker->finished must be synchronized using a mutex or atomic variables.

Comment on lines +159 to +160
server_threads.erase(worker->thd);
destroy_background_thd(worker->thd);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing server_threads (inserting or erasing) typically requires holding LOCK_thread_count to prevent race conditions with other threads iterating over the global thread list (e.g., during SHOW PROCESSLIST).

Comment on lines +277 to +279
my_free(const_cast<char*>(workers[i].thd->db.str));
server_threads.erase(workers[i].thd);
destroy_background_thd(workers[i].thd);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a potential double-free here. If destroy_background_thd eventually calls THD::free_connection(), it will attempt to free db.str. Since you are manually calling my_free on db.str at line 277 just before destroy_background_thd, this will result in a double-free error.

      if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr,
                              parallel_worker_thread_func, &workers[i]))
      {
        server_threads.erase(workers[i].thd);
        destroy_background_thd(workers[i].thd);
        for (uint j= 0; j < i; j++)

Comment on lines +85 to +89
~pwt_management()
{
if (workers)
join_parallel_workers(current_thd);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Memory leak: If init_parallel_workers fails, it frees the workers array and sets it to nullptr. However, some worker threads might have already started and pushed events into the parallel_messages queue. Since the destructor only calls join_parallel_workers (which drains the queue) if workers is not null, any messages already in the queue will be leaked. The queue should be drained regardless of the workers pointer state.

Comment thread include/my_pthread.h
Comment on lines +627 to +637
#ifdef _WIN32
#define MAX_THREAD_NAME 256
#elif defined(__linux__)
#define MAX_THREAD_NAME 16
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
#define MAX_THREAD_NAME 19
#include <pthread_np.h>
#elif defined(__apple_build_version__)
#include <sys/proc_info.h>
#define MAX_THREAD_NAME MAXTHREADNAMESIZE
#endif
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The macro name MAX_THREAD_NAME is quite generic and could potentially conflict with definitions in system headers or other libraries. It is recommended to use a more specific prefix, such as MY_MAX_THREAD_NAME or MARIADB_MAX_THREAD_NAME.

Comment on lines +28 to +59
bool error_to_queue( THD *thd, pwt_queued_event **event, uint error,
Sql_condition::enum_warning_level level, const char *msg )
{
*event= (pwt_queued_event*) my_malloc(PSI_INSTRUMENT_ME,
sizeof(pwt_queued_event),
MYF(0));
if (!*event)
return false;
(*event)->error= (pwt_error_message*) my_malloc(PSI_INSTRUMENT_ME,
sizeof(pwt_error_message),
MYF(0));
if (!(*event)->error)
{
my_free(*event);
return false;
}
(*event)->data= nullptr;
(*event)->error->level= level;
if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR)
(*event)->error->worker_errno= thd->killed_errno();
(*event)->error->code= error;
(*event)->error->message= (char *) my_malloc(PSI_INSTRUMENT_ME, strlen(msg)+1,
MYF(0));
if (!(*event)->error->message)
{
my_free((*event)->error);
my_free(*event);
return false;
}
strmake((*event)->error->message, msg, strlen(msg));
return true;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In error_to_queue, if any my_malloc call fails (e.g., at line 49), the function returns false but the *event pointer in the caller's scope will point to memory that has already been freed (at line 54). While the current caller doesn't use the pointer on failure, this is a dangerous API pattern. It is safer to set *event = nullptr before returning false.

bool error_to_queue( THD *thd, pwt_queued_event **event, uint error,
                     Sql_condition::enum_warning_level level, const char *msg )
{
  *event= (pwt_queued_event*) my_malloc(PSI_INSTRUMENT_ME,
                                           sizeof(pwt_queued_event),
                                           MYF(0));
  if (!*event)
    return false;
  (*event)->error= (pwt_error_message*) my_malloc(PSI_INSTRUMENT_ME,
                                               sizeof(pwt_error_message),
                                               MYF(0));
  if (!(*event)->error)
  {
    my_free(*event);
    *event= nullptr;
    return false;
  }
  (*event)->data= nullptr;
  (*event)->error->level= level;
  if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR)
    (*event)->error->worker_errno= thd->killed_errno();
  (*event)->error->code= error;
  (*event)->error->message= (char *) my_malloc(PSI_INSTRUMENT_ME, strlen(msg)+1,
                                               MYF(0));
  if (!(*event)->error->message)
  {
    my_free((*event)->error);
    my_free(*event);
    *event= nullptr;
    return false;
  }
  strmake((*event)->error->message, msg, strlen(msg));
  return true;
}

START: in lieu of work, wait 10 seconds, push out an error or a warning,
wait another 10 seconds then exit
*/
abs_timeout.tv_sec= time(0)+10;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using time(0) for timeouts is discouraged as it relies on the system wall clock, which can jump. It is better to use my_time(0) or, where possible, monotonic time utilities provided by the server to ensure consistent timeout behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

2 participants