Mstdlib-1.24.0
m_thread_pipeline.h
1/* The MIT License (MIT)
2 *
3 * Copyright (c) 2021 Monetra Technologies, LLC.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 * THE SOFTWARE.
22 */
23
24#ifndef __M_THREAD_PIPELINE_H__
25#define __M_THREAD_PIPELINE_H__
26
27/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
28
29#include <mstdlib/base/m_defs.h>
30#include <mstdlib/base/m_types.h>
31#include <mstdlib/thread/m_thread.h>
32
33/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
34
35__BEGIN_DECLS
36
37/*! \addtogroup m_thread_pipeline Thread Task Pipeline
38 * \ingroup m_thread
39 *
40 * Implementation of a thread pipeline. Useful if there are a series of tasks which
41 * must be completed in order, and each task has one or more CPU or I/O intensive
42 * steps. This allows handoff to a dedicated thread for each step, while ensuring
43 * each task result is processed in a serialized manner. For CPU intensive workloads
44 * this helps in spreading load across multiple CPU cores, and also allows I/O to
45 * be embedded into a step that can run without blocking CPU.
46 *
47 * Example:
48 *
49 * \code{.c}
50 * struct M_thread_pipeline_task {
51 * const char *name;
52 * M_uint8 *buf;
53 * size_t buf_len;
54 * };
55 *
56 * static void finish_cb(M_thread_pipeline_task_t *task, M_thread_pipeline_result_t result)
57 * {
58 * M_free(task->buf);
59 * M_free(task);
60 * }
61 *
62 * static M_bool fetch_cb(M_thread_pipeline_task_t *task)
63 * {
64 * task->buf = fetch_data(task->name, &task->buf_len);
65 * if (!task->buf)
66 * return M_FALSE;
67 * return M_TRUE;
68 * }
69 *
70 * static M_bool compress_cb(M_thread_pipeline_task_t *task)
71 * {
72 * M_uint8 *uncompressed = task->buf;
73 * size_t uncompressed_len = task->buf_len;
74 *
75 * task->buf = my_compress(uncompressed, uncompressed_len, &task->buf_len);
76 * M_free(uncompressed);
77 * if (!task->buf)
78 * return M_FALSE;
79 * return M_TRUE;
80 * }
81 *
82 * static M_bool encrypt_cb(M_thread_pipeline_task_t *task)
83 * {
84 * M_uint8 *compressed = task->buf;
85 * size_t compressed_len = task->buf_len;
86 *
87 * task->buf = my_encrypt(compressed, compressed_len, &task->buf_len);
88 * M_free(compressed);
89 * if (task->buf == NULL)
90 * return M_FALSE;
91 * return M_TRUE;
92 * }
93 *
94 * static M_bool write_cb(M_thread_pipeline_task_t *task)
95 * {
96 * M_fs_file_t *fp = NULL;
97 * M_fs_error_t err = M_FS_ERROR_SUCCESS;
98 * size_t wrote_len = 0; char filename[1024];
99 * M_snprintf(filename, sizeof(filename), "%s.out", task->name);
100 * err = M_fs_file_open(&fp, filename, 0,
101 * M_FS_FILE_MODE_READ|M_FS_FILE_MODE_WRITE|M_FS_FILE_MODE_OVERWRITE, NULL);
102 * if (err != M_FS_ERROR_SUCCESS)
103 * goto fail;
104 *
105 * err = M_fs_file_write(fp, task->buf, task->buf_len, &wrote_len, M_FS_FILE_RW_FULLBUF);
106 * if (err == M_FS_ERROR_SUCCESS && wrote_len != task->buf_len) {
107 * err = M_FS_ERROR_FILE_2BIG;
108 * }
109 * fail:
110 * M_fs_file_close(fp);
111 * return err == M_FS_ERROR_SUCCESS?M_TRUE:M_FALSE;
112 * }
113 *
114 * int main()
115 * {
116 * const char *tasks[] = {
117 * "red",
118 * "white",
119 * "blue",
120 * "yellow",
121 * "green",
122 * "brown",
123 * NULL
124 * };
125 * M_thread_pipeline_t *pipeline = NULL;
126 * M_thread_pipeline_steps_t *steps = NULL;
127 * size_t i;
128 *
129 * steps = M_thread_pipeline_steps_create();
130 * M_thread_pipeline_steps_insert(steps, fetch_cb);
131 * M_thread_pipeline_steps_insert(steps, compress_cb);
132 * M_thread_pipeline_steps_insert(steps, encrypt_cb);
133 * M_thread_pipeline_steps_insert(steps, write_cb);
134 *
135 * pipeline = M_thread_pipeline_create(steps, M_THREAD_PIPELINE_FLAG_NONE, finish_cb);
136 * M_thread_pipeline_steps_destroy(steps);
137 *
138 * for (i=0; tasks[i] != NULL; i++) {
139 * M_thread_pipeline_task_t *task = M_malloc_zero(sizeof(*task));
140 * task->name = tasks[i];
141 * M_thread_pipeline_task_insert(pipeline, task);
142 * }
143 *
144 * M_thread_pipeline_wait(pipeline, 0);
145 * M_thread_pipeline_destroy(pipeline);
146 * return 0;
147 * }
148 * \endcode
149 *
150 * @{
151 */
152
153/*! Flags for pipeline initialization, passed as the flags argument of M_thread_pipeline_create() */
154typedef enum {
155 M_THREAD_PIPELINE_FLAG_NONE = 0, /*!< No flags, normal operation */
156 M_THREAD_PIPELINE_FLAG_NOABORT = 1 << 0 /*!< Do not abort all other enqueued tasks due to a failure of another task */
157} M_thread_pipeline_flags_t;
158
159/*! Caller-defined structure holding the data for one task. Only the type name is declared here;
160 * the caller supplies the definition. It is the only data element passed from thread to thread,
161 * so it must track its own state based on knowing how the pipeline is configured */
162struct M_thread_pipeline_task;
163typedef struct M_thread_pipeline_task M_thread_pipeline_task_t;
164
165/*! Opaque list of steps passed into M_thread_pipeline_create(). Initialized with
166 * M_thread_pipeline_steps_create() and destroyed with M_thread_pipeline_steps_destroy() */
167struct M_thread_pipeline_steps;
168typedef struct M_thread_pipeline_steps M_thread_pipeline_steps_t;
169
170/*! Opaque handle holding the internal state of a thread pipeline. Initialized via
171 * M_thread_pipeline_create() and destroyed via M_thread_pipeline_destroy() */
172struct M_thread_pipeline;
173typedef struct M_thread_pipeline M_thread_pipeline_t;
174
175/*! User-defined callback for each step
176 * \param[in] task User-defined task data to be operated on
177 * \return M_TRUE if completed successfully, M_FALSE otherwise
178 */
180
181/*! Result codes passed to M_thread_pipeline_taskfinish_cb() */
182typedef enum {
183 M_THREAD_PIPELINE_RESULT_SUCCESS = 1, /*!< Task completed successfully */
184 M_THREAD_PIPELINE_RESULT_FAIL = 2, /*!< Task failed -- record error in user-defined task structure */
185 M_THREAD_PIPELINE_RESULT_ABORT = 3 /*!< Task was forcibly aborted due to a failure of another task,
186 or M_thread_pipeline_destroy() was called before completion */
187} M_thread_pipeline_result_t;
188
189/*! User-defined, and required, callback at the completion of each task.
190 * This may be called:
191 * - Upon completion of task, whether successful or not (see result)
192 * - Upon abort due to a prior task failure if the pipeline is configured
193 * to abort all other tasks if a single task fails (default).
194 * At a minimum, this must free any memory associated with the user-defined
195 * task structure.
196 *
197 * \param[in] task User-defined task structure
198 * \param[in] result the result of the task, one of M_thread_pipeline_result_t.
199 */
201
202/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
203
204/*! Initialize an empty pipeline step list
205 * \return initialized and empty step list. NULL on failure.
206 * freed with M_thread_pipeline_steps_destroy()*/
208
209/*! Insert a step into the task pipeline
210 *
211 * \param[in] steps Initialized pipeline steps structure from M_thread_pipeline_steps_create()
212 * \param[in] task_cb Task to perform
213 * \return M_TRUE on success, M_FALSE on usage error.
214 */
216
217/*! Destroy the task step list initialized with M_thread_pipeline_steps_create()
218 *
219 * \param[in] steps Initialized piipeline steps structure from M_thread_pipeline_steps_create()
220 */
222
223/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
224
225/*! Initialize the thread pipeline with the various steps to be performed for each task.
226 * This will spawn one thread per step and immediately start all threads. There is no
227 * additional function to start the pipeline other than to insert each task to be
228 * processed.
229 *
230 * \param[in] steps Pointer to steps to perform for each task. The passed in pointer
231 * is internally duplicated, so it may be destroyed immediately
232 * after this function returns.
233 * \param[in] flags One or more pipeline flags from M_thread_pipeline_flags_t
234 * \param[in] finish_cb Callback to be called after each task is completed. At a minimum,
235 * this should free the memory assocated with the task pointer. The
236 * finish_cb is not called from the same thread as enqueued it so
237 * proper thread concurrency protections (e.g. mutexes) must be
238 * in place.
239 * \return initialized M_thread_pipeline_t or NULL on failure (usage, thread limits)
240 */
242
243/*! Insert a task into the thread pipeline.
244 *
245 * This function will enqueue tasks into an internal task list indefinitely and will not block. If it
246 * is desirable to cap the enqueued task list, please see M_thread_pipeline_wait() and M_thread_pipeline_queue_count().
247 *
248 * \param[in] pipeline Pointer to the initialized thread pipeline returned from M_thread_pipeline_create().
249 * \param[in] task User-defined task structure describing task to be perfomed for each step.
250 * It is the responsibility of the user to define their own private
251 * struct M_thread_pipeline_task with all members and necessary state tracking
252 * to perform each step callback. It is guaranteed that no more than 1 step will
253 * be accessing this structure in parallel.
254 * \return M_TRUE if task is inserted, M_FALSE on either usage error or if task could not be
255 * enqueued as a prior step had failed.
256 */
258
259/*! Wait for pipeline tasks/steps to complete until the queued task count drops to the limit specified.
260 * \param[in] pipeline Pipeline initialized with M_thread_pipeline_create()
261 * \param[in] queue_limit Will block until the queued task list is reduced to
262 * this size or smaller. Use 0 to wait until all tasks are
263 * completed.
264 */
265M_API void M_thread_pipeline_wait(M_thread_pipeline_t *pipeline, size_t queue_limit);
266
267
268/*! Count of queued tasks, this includes the task currently being processed if any.
269 * \param[in] pipeline Pipeline initialized with M_thread_pipeline_create()
270 * \return count of queued tasks.
271 */
273
274
275/*! Retrieve if the pipeline is in a good state. The only time a pipeline will
276 * not be in a good state is if a step failed.
277 * \return M_TRUE if in a good state
278 */
280
281
282/*! Destroy the thread pipeline. If there are any outstanding tasks/steps, they will be
283 * aborted and return an abort error code to their finish_cb
284 * \param[in] pipeline Pipeline initialized with M_thread_pipeline_create()
285 */
287
288
289
290/*! @} */
291
292__END_DECLS
293
294#endif /* __M_THREAD_PIPELINE_H__ */
void M_thread_pipeline_wait(M_thread_pipeline_t *pipeline, size_t queue_limit)
void(* M_thread_pipeline_taskfinish_cb)(M_thread_pipeline_task_t *task, M_thread_pipeline_result_t result)
Definition: m_thread_pipeline.h:200
struct M_thread_pipeline M_thread_pipeline_t
Definition: m_thread_pipeline.h:173
struct M_thread_pipeline_task M_thread_pipeline_task_t
Definition: m_thread_pipeline.h:163
M_bool(* M_thread_pipeline_task_cb)(M_thread_pipeline_task_t *task)
Definition: m_thread_pipeline.h:179
M_bool M_thread_pipeline_status(M_thread_pipeline_t *pipeline)
M_thread_pipeline_flags_t
Definition: m_thread_pipeline.h:154
M_bool M_thread_pipeline_task_insert(M_thread_pipeline_t *pipeline, M_thread_pipeline_task_t *task)
M_thread_pipeline_result_t
Definition: m_thread_pipeline.h:182
void M_thread_pipeline_destroy(M_thread_pipeline_t *pipeline)
M_thread_pipeline_steps_t * M_thread_pipeline_steps_create(void)
M_thread_pipeline_t * M_thread_pipeline_create(const M_thread_pipeline_steps_t *steps, int flags, M_thread_pipeline_taskfinish_cb finish_cb)
M_bool M_thread_pipeline_steps_insert(M_thread_pipeline_steps_t *steps, M_thread_pipeline_task_cb task_cb)
void M_thread_pipeline_steps_destroy(M_thread_pipeline_steps_t *steps)
struct M_thread_pipeline_steps M_thread_pipeline_steps_t
Definition: m_thread_pipeline.h:168
size_t M_thread_pipeline_queue_count(M_thread_pipeline_t *pipeline)
@ M_THREAD_PIPELINE_FLAG_NOABORT
Definition: m_thread_pipeline.h:156
@ M_THREAD_PIPELINE_FLAG_NONE
Definition: m_thread_pipeline.h:155
@ M_THREAD_PIPELINE_RESULT_SUCCESS
Definition: m_thread_pipeline.h:183
@ M_THREAD_PIPELINE_RESULT_FAIL
Definition: m_thread_pipeline.h:184
@ M_THREAD_PIPELINE_RESULT_ABORT
Definition: m_thread_pipeline.h:185