Mstdlib-1.24.0
Thread Task Pipeline

Typedefs

typedef struct M_thread_pipeline_task M_thread_pipeline_task_t
 
typedef struct M_thread_pipeline_steps M_thread_pipeline_steps_t
 
typedef struct M_thread_pipeline M_thread_pipeline_t
 
typedef M_bool(* M_thread_pipeline_task_cb) (M_thread_pipeline_task_t *task)
 
typedef void(* M_thread_pipeline_taskfinish_cb) (M_thread_pipeline_task_t *task, M_thread_pipeline_result_t result)
 

Enumerations

enum  M_thread_pipeline_flags_t {
  M_THREAD_PIPELINE_FLAG_NONE = 0 ,
  M_THREAD_PIPELINE_FLAG_NOABORT = 1 << 0
}
 
enum  M_thread_pipeline_result_t {
  M_THREAD_PIPELINE_RESULT_SUCCESS = 1 ,
  M_THREAD_PIPELINE_RESULT_FAIL = 2 ,
  M_THREAD_PIPELINE_RESULT_ABORT = 3
}
 

Functions

M_thread_pipeline_steps_t * M_thread_pipeline_steps_create (void)
 
M_bool M_thread_pipeline_steps_insert (M_thread_pipeline_steps_t *steps, M_thread_pipeline_task_cb task_cb)
 
void M_thread_pipeline_steps_destroy (M_thread_pipeline_steps_t *steps)
 
M_thread_pipeline_t * M_thread_pipeline_create (const M_thread_pipeline_steps_t *steps, int flags, M_thread_pipeline_taskfinish_cb finish_cb)
 
M_bool M_thread_pipeline_task_insert (M_thread_pipeline_t *pipeline, M_thread_pipeline_task_t *task)
 
void M_thread_pipeline_wait (M_thread_pipeline_t *pipeline, size_t queue_limit)
 
size_t M_thread_pipeline_queue_count (M_thread_pipeline_t *pipeline)
 
M_bool M_thread_pipeline_status (M_thread_pipeline_t *pipeline)
 
void M_thread_pipeline_destroy (M_thread_pipeline_t *pipeline)
 

Detailed Description

Implementation of a thread pipeline. Useful if there is a series of tasks which must be completed in order, and each task has one or more CPU or I/O intensive steps. This allows handoff to a dedicated thread for each step, while ensuring each task result is processed in a serialized manner. For CPU intensive workloads this helps in spreading load across multiple CPU cores, and also allows I/O to be embedded into a step that can run without blocking CPU.

Example:

/* User-defined task state handed from step to step in the example pipeline.
 * The field is called `name` because every callback in this example accesses
 * it as task->name. */
struct M_thread_pipeline_task {
	const char *name;    /* Item identifier: passed to fetch_data() and used to build the "<name>.out" output filename */
	M_uint8    *buf;     /* Data produced by the most recent step; each step replaces it */
	size_t      buf_len; /* Number of bytes in buf */
};
/* Task-finish callback: releases the task's data buffer and then the task
 * structure itself. The result code is not inspected. */
static void finish_cb(M_thread_pipeline_task_t *task, M_thread_pipeline_result_t result)
{
	(void)result; /* cleanup is the same regardless of outcome */

	M_free(task->buf);
	M_free(task);
}
/* Step callback: obtains the input data for task->name via fetch_data() and
 * stores it (with its length) in the task.
 * Returns M_TRUE when a buffer was obtained, M_FALSE when fetch_data()
 * returned NULL. */
static M_bool fetch_cb(M_thread_pipeline_task_t *task)
{
	task->buf = fetch_data(task->name, &task->buf_len);

	return (task->buf != NULL) ? M_TRUE : M_FALSE;
}
/* Step callback: replaces the task buffer with the output of my_compress()
 * and frees the previous (uncompressed) buffer.
 * Returns M_TRUE when my_compress() produced a buffer, M_FALSE otherwise. */
static M_bool compress_cb(M_thread_pipeline_task_t *task)
{
	/* Keep the input pointer and length: task->buf and task->buf_len are
	 * overwritten by the call below. */
	M_uint8 *plain     = task->buf;
	size_t   plain_len = task->buf_len;

	task->buf = my_compress(plain, plain_len, &task->buf_len);
	M_free(plain);

	return (task->buf != NULL) ? M_TRUE : M_FALSE;
}
/* Step callback: replaces the task buffer with the output of my_encrypt()
 * and frees the previous (compressed) buffer.
 * Returns M_TRUE when my_encrypt() produced a buffer, M_FALSE otherwise. */
static M_bool encrypt_cb(M_thread_pipeline_task_t *task)
{
	/* Keep the input pointer and length: task->buf and task->buf_len are
	 * overwritten by the call below. */
	M_uint8 *packed     = task->buf;
	size_t   packed_len = task->buf_len;

	task->buf = my_encrypt(packed, packed_len, &task->buf_len);
	M_free(packed);

	return (task->buf != NULL) ? M_TRUE : M_FALSE;
}
/* Step callback: writes the task buffer to the file "<name>.out".
 *
 * Returns M_TRUE only when the file was opened and the whole buffer was
 * written; M_FALSE on any open/write error or on a short write. */
static M_bool write_cb(M_thread_pipeline_task_t *task)
{
	M_fs_file_t *fp        = NULL;
	M_fs_error_t err       = M_FS_ERROR_SUCCESS;
	size_t       wrote_len = 0;
	char         filename[1024];

	M_snprintf(filename, sizeof(filename), "%s.out", task->name);

	/* Open the generated output name, not task->name itself.
	 * NOTE(review): buf_size 0 and perms NULL are presumed to select the
	 * library defaults -- confirm against the M_fs_file_open() docs. */
	err = M_fs_file_open(&fp, filename, 0,
		M_FS_FILE_MODE_READ|M_FS_FILE_MODE_WRITE|M_FS_FILE_MODE_OVERWRITE, NULL);
	if (err != M_FS_ERROR_SUCCESS)
		goto done;

	err = M_fs_file_write(fp, task->buf, task->buf_len, &wrote_len, M_FS_FILE_RW_FULLBUF);
	if (err == M_FS_ERROR_SUCCESS && wrote_len != task->buf_len) {
		/* A short write is reported as a failure of this step. */
		err = M_FS_ERROR_FILE_2BIG;
	}

done:
	/* Reached on success and on failure; fp is still NULL if the open failed. */
	if (fp != NULL)
		M_fs_file_close(fp);

	return (err == M_FS_ERROR_SUCCESS) ? M_TRUE : M_FALSE;
}
int main()
{
const char *tasks[] = {
"red",
"white",
"blue",
"yellow",
"green",
"brown",
NULL
};
M_thread_pipeline_t *pipeline = NULL;
size_t i;
M_thread_pipeline_steps_insert(steps, compress_cb);
M_thread_pipeline_steps_insert(steps, encrypt_cb);
for (i=0; tasks[i] != NULL; i++) {
M_thread_pipeline_task_t *task = M_malloc_zero(sizeof(*task));
task->name = tasks[i];
}
M_thread_pipeline_wait(pipeline, 0);
return 0;
}
size_t M_snprintf(char *buf, size_t size, const char *fmt,...)
struct M_fs_file M_fs_file_t
Definition: m_fs.h:132
M_fs_error_t
Definition: m_fs.h:154
@ M_FS_FILE_RW_FULLBUF
Definition: m_fs.h:263
@ M_FS_FILE_MODE_READ
Definition: m_fs.h:247
@ M_FS_FILE_MODE_OVERWRITE
Definition: m_fs.h:251
@ M_FS_FILE_MODE_WRITE
Definition: m_fs.h:248
@ M_FS_ERROR_FILE_2BIG
Definition: m_fs.h:167
@ M_FS_ERROR_SUCCESS
Definition: m_fs.h:155
void M_fs_file_close(M_fs_file_t *fd)
M_fs_error_t M_fs_file_open(M_fs_file_t **fd, const char *path, size_t buf_size, M_uint32 mode, const M_fs_perms_t *perms)
M_fs_error_t M_fs_file_write(M_fs_file_t *fd, const unsigned char *buf, size_t count, size_t *wrote_len, M_uint32 flags)
void * M_malloc_zero(size_t size) M_ALLOC_SIZE(1) M_WARN_UNUSED_RESULT M_MALLOC
void M_free(void *ptr) M_FREE(1)
void M_thread_pipeline_wait(M_thread_pipeline_t *pipeline, size_t queue_limit)
struct M_thread_pipeline M_thread_pipeline_t
Definition: m_thread_pipeline.h:173
struct M_thread_pipeline_task M_thread_pipeline_task_t
Definition: m_thread_pipeline.h:163
M_bool M_thread_pipeline_task_insert(M_thread_pipeline_t *pipeline, M_thread_pipeline_task_t *task)
M_thread_pipeline_result_t
Definition: m_thread_pipeline.h:182
void M_thread_pipeline_destroy(M_thread_pipeline_t *pipeline)
M_thread_pipeline_steps_t * M_thread_pipeline_steps_create(void)
M_thread_pipeline_t * M_thread_pipeline_create(const M_thread_pipeline_steps_t *steps, int flags, M_thread_pipeline_taskfinish_cb finish_cb)
M_bool M_thread_pipeline_steps_insert(M_thread_pipeline_steps_t *steps, M_thread_pipeline_task_cb task_cb)
void M_thread_pipeline_steps_destroy(M_thread_pipeline_steps_t *steps)
struct M_thread_pipeline_steps M_thread_pipeline_steps_t
Definition: m_thread_pipeline.h:168
@ M_THREAD_PIPELINE_FLAG_NONE
Definition: m_thread_pipeline.h:155

Typedef Documentation

◆ M_thread_pipeline_task_t

typedef struct M_thread_pipeline_task M_thread_pipeline_task_t

◆ M_thread_pipeline_steps_t

typedef struct M_thread_pipeline_steps M_thread_pipeline_steps_t

◆ M_thread_pipeline_t

typedef struct M_thread_pipeline M_thread_pipeline_t

◆ M_thread_pipeline_task_cb

typedef M_bool(* M_thread_pipeline_task_cb) (M_thread_pipeline_task_t *task)

User-defined callback for each step

Parameters
[in] task: User-defined task data to be operated on
Returns
M_TRUE if completed successfully, M_FALSE otherwise

◆ M_thread_pipeline_taskfinish_cb

typedef void(* M_thread_pipeline_taskfinish_cb) (M_thread_pipeline_task_t *task, M_thread_pipeline_result_t result)

User-defined, and required, callback at the completion of each task. This may be called:

  • Upon completion of task, whether successful or not (see result)
  • Upon abort due to a prior task failure if the pipeline is configured to abort all other tasks if a single task fails (default).

At a minimum, this must free any memory associated with the user-defined task structure.
Parameters
[in] task: User-defined task structure
[in] result: The result of the task, one of M_thread_pipeline_result_t.

Enumeration Type Documentation

◆ M_thread_pipeline_flags_t

Flags for pipeline initialization

Enumerator
M_THREAD_PIPELINE_FLAG_NONE 

No flags, normal operation

M_THREAD_PIPELINE_FLAG_NOABORT 

Do not abort all other enqueued tasks due to a failure of another task

◆ M_thread_pipeline_result_t

Result codes passed to M_thread_pipeline_taskfinish_cb()

Enumerator
M_THREAD_PIPELINE_RESULT_SUCCESS 

Task completed successfully

M_THREAD_PIPELINE_RESULT_FAIL 

Task failed – record error in user-defined task structure

M_THREAD_PIPELINE_RESULT_ABORT 

Task was forcibly aborted due to a failure of another task, or M_thread_pipeline_destroy() was called before completion

Function Documentation

◆ M_thread_pipeline_steps_create()

M_thread_pipeline_steps_t * M_thread_pipeline_steps_create ( void  )

Initialize an empty pipeline step list

Returns
Initialized and empty step list, or NULL on failure. Must be freed with M_thread_pipeline_steps_destroy().

◆ M_thread_pipeline_steps_insert()

M_bool M_thread_pipeline_steps_insert ( M_thread_pipeline_steps_t * steps,
M_thread_pipeline_task_cb  task_cb 
)

Insert a step into the task pipeline

Parameters
[in] steps: Initialized pipeline steps structure from M_thread_pipeline_steps_create()
[in] task_cb: Task to perform
Returns
M_TRUE on success, M_FALSE on usage error.

◆ M_thread_pipeline_steps_destroy()

void M_thread_pipeline_steps_destroy ( M_thread_pipeline_steps_t * steps)

Destroy the task step list initialized with M_thread_pipeline_steps_create()

Parameters
[in] steps: Initialized pipeline steps structure from M_thread_pipeline_steps_create()

◆ M_thread_pipeline_create()

M_thread_pipeline_t * M_thread_pipeline_create ( const M_thread_pipeline_steps_t * steps,
int  flags,
M_thread_pipeline_taskfinish_cb  finish_cb 
)

Initialize the thread pipeline with the various steps to be performed for each task. This will spawn one thread per step and immediately start all threads. There is no additional function to start the pipeline other than to insert each task to be processed.

Parameters
[in] steps: Pointer to steps to perform for each task. The passed in pointer is internally duplicated, so it may be destroyed immediately after this function returns.
[in] flags: One or more pipeline flags from M_thread_pipeline_flags_t
[in] finish_cb: Callback to be called after each task is completed. At a minimum, this should free the memory associated with the task pointer. The finish_cb is not called from the thread that enqueued the task, so proper thread concurrency protections (e.g. mutexes) must be in place.
Returns
initialized M_thread_pipeline_t or NULL on failure (usage, thread limits)

◆ M_thread_pipeline_task_insert()

M_bool M_thread_pipeline_task_insert ( M_thread_pipeline_t * pipeline,
M_thread_pipeline_task_t * task 
)

Insert a task into the thread pipeline.

This function will enqueue tasks into an internal task list indefinitely and will not block. If it is desirable to cap the enqueued task list, please see M_thread_pipeline_wait() and M_thread_pipeline_queue_count().

Parameters
[in] pipeline: Pointer to the initialized thread pipeline returned from M_thread_pipeline_create().
[in] task: User-defined task structure describing the task to be performed for each step. It is the responsibility of the user to define their own private struct M_thread_pipeline_task with all members and necessary state tracking to perform each step callback. It is guaranteed that no more than 1 step will be accessing this structure in parallel.
Returns
M_TRUE if task is inserted, M_FALSE on either usage error or if task could not be enqueued as a prior step had failed.

◆ M_thread_pipeline_wait()

void M_thread_pipeline_wait ( M_thread_pipeline_t * pipeline,
size_t  queue_limit 
)

Wait for pipeline tasks/steps to complete until the number of queued tasks has dropped to the specified queue limit.

Parameters
[in] pipeline: Pipeline initialized with M_thread_pipeline_create()
[in] queue_limit: Will block until the queued task list is reduced to this size or smaller. Use 0 to wait until all tasks are completed.

◆ M_thread_pipeline_queue_count()

size_t M_thread_pipeline_queue_count ( M_thread_pipeline_t * pipeline)

Count of queued tasks, this includes the task currently being processed if any.

Parameters
[in] pipeline: Pipeline initialized with M_thread_pipeline_create()
Returns
count of queued tasks.

◆ M_thread_pipeline_status()

M_bool M_thread_pipeline_status ( M_thread_pipeline_t * pipeline)

Check whether the pipeline is in a good state. The only time a pipeline will not be in a good state is if a step failed.

Returns
M_TRUE if in a good state

◆ M_thread_pipeline_destroy()

void M_thread_pipeline_destroy ( M_thread_pipeline_t * pipeline)

Destroy the thread pipeline. If there are any outstanding tasks/steps, they will be aborted and return an abort error code to their finish_cb

Parameters
[in] pipeline: Pipeline initialized with M_thread_pipeline_create()