// SPDX-License-Identifier: Apache-2.0
// ----------------------------------------------------------------------------
// Copyright 2011-2024 Arm Limited
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// ----------------------------------------------------------------------------

/**
 * @brief Functions and data declarations for the outer context.
 *
 * The outer context includes thread-pool management, which is slower to
 * compile due to increased use of C++ stdlib. The inner context used in the
 * majority of the codec library does not include this.
 */

#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
#define ASTCENC_INTERNAL_ENTRY_INCLUDED

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>

#include "astcenc_internal.h"

/* ============================================================================
  Parallel execution control
============================================================================ */

/**
 * @brief A simple counter-based manager for parallel task execution.
 *
 * Task processing consists of:
 *
 *     * A single-threaded init stage.
 *     * A multi-threaded processing stage.
 *     * A condition variable so threads can wait for processing completion.
 *
 * The init stage is executed by the first thread to arrive in the critical section; there is no
 * main thread in the thread pool.
 *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may therefore each execute a different number of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must
 * map these integers to an actual processing partition in a specific problem domain.
 *
 * The exit wait condition is needed to ensure processing has finished before a worker thread can
 * progress to the next stage of the pipeline. Specifically, a worker may exit the processing stage
 * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
 *
 * The basic usage model:
 *
 *     // --------- From single-threaded code ---------
 *
 *     // Reset the tracker state
 *     manager->reset()
 *
 *     // --------- From multi-threaded code ---------
 *
 *     // Run the stage init; only the first thread actually runs the lambda
 *     manager->init(<lambda>)
 *
 *     unsigned int task_count;
 *     do
 *     {
 *         // Request a task assignment
 *         unsigned int base_index = manager->get_task_assignment(<granule>, task_count);
 *
 *         // Process any tasks we were given (task_count <= granule size)
 *         if (task_count)
 *         {
 *             // Run the user task processing code for N tasks here
 *             ...
 *
 *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
 *         }
 *     } while (task_count);
 *
 *     // Wait for all threads to complete tasks before progressing
 *     manager->wait()
 *
 *     // Run the stage term; only the first thread actually runs the lambda
 *     manager->term(<lambda>)
 */
class ParallelManager
{
private:
	/** @brief Lock used for critical section and condition synchronization. */
	std::mutex m_lock;

	/** @brief True if the stage init() step has been executed. */
	bool m_init_done;

	/** @brief True if the stage term() step has been executed. */
	bool m_term_done;

	/** @brief Condition variable for tracking stage processing completion. */
	std::condition_variable m_complete;

	/** @brief Number of tasks started, but not necessarily finished. */
	std::atomic<unsigned int> m_start_count;

	/** @brief Number of tasks finished. */
	unsigned int m_done_count;

	/** @brief Number of tasks that need to be processed. */
	unsigned int m_task_count;

	/** @brief Progress callback (optional). */
	astcenc_progress_callback m_callback;

	/** @brief Lock used for callback synchronization. */
	std::mutex m_callback_lock;

	/** @brief Minimum progress before making a callback. */
	float m_callback_min_diff;

	/** @brief Last progress callback value. */
	float m_callback_last_value;

public:
	/** @brief Create a new ParallelManager. */
	ParallelManager()
	{
		reset();
	}

	/**
	 * @brief Reset the tracker for a new processing batch.
	 *
	 * This must be called from single-threaded code before starting the multi-threaded processing
	 * operations.
	 */
	void reset()
	{
		m_init_done = false;
		m_term_done = false;
		m_start_count = 0;
		m_done_count = 0;
		m_task_count = 0;
		m_callback_last_value = 0.0f;
		m_callback_min_diff = 1.0f;
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param init_func   Callable which executes the stage initialization. It must return the
	 *                    total number of tasks in the stage.
	 */
	void init(std::function<unsigned int(void)> init_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_task_count = init_func();
			m_init_done = true;
		}
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param task_count   Total number of tasks needing processing.
	 * @param callback     Function pointer for progress status callbacks.
	 */
	void init(unsigned int task_count, astcenc_progress_callback callback)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_callback = callback;
			m_task_count = task_count;
			m_init_done = true;

			// Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
			float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
			m_callback_min_diff = astc::max(min_diff, 1.0f);
		}
	}
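
	// Worked example for the callback threshold computed in init() above, using hypothetical
	// task counts: for task_count = 8192 the threshold is max((4096 / 8192) * 100, 1) = 50%,
	// while for task_count = 1000000 it is max(0.41, 1) = 1%, i.e. at most ~100 callbacks.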

	/**
	 * @brief Request a task assignment.
	 *
	 * Assign up to @c granule tasks to the caller for processing.
	 *
	 * @param      granule   Maximum number of tasks that can be assigned.
	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
	 *
	 * @return Task index of the first assigned task; assigned tasks increment from this.
	 */
	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
	{
		unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
		if (base >= m_task_count)
		{
			count = 0;
			return 0;
		}

		count = astc::min(m_task_count - base, granule);
		return base;
	}

	/**
	 * @brief Complete a task assignment.
	 *
	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
	 * completes the processing of the stage.
	 *
	 * @param count   The number of completed tasks.
	 */
	void complete_task_assignment(unsigned int count)
	{
		// Note: m_done_count cannot be a plain atomic outside of the mutex; without the lock there
		// would be a race between the update here and the predicate check in wait() on other threads
		unsigned int local_count;
		float local_last_value;
		{
			std::unique_lock<std::mutex> lck(m_lock);
			m_done_count += count;
			local_count = m_done_count;
			local_last_value = m_callback_last_value;

			if (m_done_count == m_task_count)
			{
				// Ensure the progress bar hits 100%
				if (m_callback)
				{
					std::unique_lock<std::mutex> cblck(m_callback_lock);
					m_callback(100.0f);
					m_callback_last_value = 100.0f;
				}

				lck.unlock();
				m_complete.notify_all();
			}
		}

		// Process progress callback if we have one
		if (m_callback)
		{
			// Initial lockless test - have we progressed enough to emit?
			float num = static_cast<float>(local_count);
			float den = static_cast<float>(m_task_count);
			float this_value = (num / den) * 100.0f;
			bool report_test = (this_value - local_last_value) > m_callback_min_diff;

			// Recheck under lock, because another thread might report first
			if (report_test)
			{
				std::unique_lock<std::mutex> cblck(m_callback_lock);
				bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
				if (report_retest)
				{
					m_callback(this_value);
					m_callback_last_value = this_value;
				}
			}
		}
	}

	/**
	 * @brief Wait for stage processing to complete.
	 */
	void wait()
	{
		std::unique_lock<std::mutex> lck(m_lock);
		m_complete.wait(lck, [this]{ return m_done_count == m_task_count; });
	}

	/**
	 * @brief Trigger the pipeline stage term step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * work pool termination. The caller must have called @c wait() prior to calling this function
	 * to ensure that processing is complete.
	 *
	 * @param term_func   Callable which executes the stage termination.
	 */
	void term(std::function<void(void)> term_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_term_done)
		{
			term_func();
			m_term_done = true;
		}
	}
};
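
/**
 * @brief Illustrative worker-thread usage of @c ParallelManager.
 *
 * This is a sketch only, not part of the library API. The stage task count of 1024, the granule
 * size of 16, and the @c process_task() function are hypothetical stand-ins for user code.
 *
 *     void worker_thread(ParallelManager& manager)
 *     {
 *         // The first thread to arrive runs the init lambda and sets the task count; others wait
 *         manager.init([]() -> unsigned int { return 1024; });
 *
 *         unsigned int count = 0;
 *         do
 *         {
 *             // Claim up to 16 task tickets; count returns the number actually assigned
 *             unsigned int base = manager.get_task_assignment(16, count);
 *             for (unsigned int i = 0; i < count; i++)
 *             {
 *                 process_task(base + i);
 *             }
 *
 *             // Flag the claimed tickets as complete
 *             if (count)
 *             {
 *                 manager.complete_task_assignment(count);
 *             }
 *         } while (count);
 *
 *         // Block until every worker has completed its assigned tasks
 *         manager.wait();
 *
 *         // The first thread to arrive runs the term lambda; later arrivals skip it
 *         manager.term([]() { });
 *     }
 */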

/**
 * @brief The astcenc compression context.
 */
struct astcenc_context
{
	/** @brief The context internal state. */
	astcenc_contexti context;

#if !defined(ASTCENC_DECOMPRESS_ONLY)
	/** @brief The parallel manager for averages computation. */
	ParallelManager manage_avg;

	/** @brief The parallel manager for compression. */
	ParallelManager manage_compress;
#endif

	/** @brief The parallel manager for decompression. */
	ParallelManager manage_decompress;
};

#endif