1cc1dc7a3Sopenharmony_ci// SPDX-License-Identifier: Apache-2.0
2cc1dc7a3Sopenharmony_ci// ----------------------------------------------------------------------------
3cc1dc7a3Sopenharmony_ci// Copyright 2011-2024 Arm Limited
4cc1dc7a3Sopenharmony_ci//
5cc1dc7a3Sopenharmony_ci// Licensed under the Apache License, Version 2.0 (the "License"); you may not
6cc1dc7a3Sopenharmony_ci// use this file except in compliance with the License. You may obtain a copy
7cc1dc7a3Sopenharmony_ci// of the License at:
8cc1dc7a3Sopenharmony_ci//
9cc1dc7a3Sopenharmony_ci//     http://www.apache.org/licenses/LICENSE-2.0
10cc1dc7a3Sopenharmony_ci//
11cc1dc7a3Sopenharmony_ci// Unless required by applicable law or agreed to in writing, software
12cc1dc7a3Sopenharmony_ci// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13cc1dc7a3Sopenharmony_ci// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14cc1dc7a3Sopenharmony_ci// License for the specific language governing permissions and limitations
15cc1dc7a3Sopenharmony_ci// under the License.
16cc1dc7a3Sopenharmony_ci// ----------------------------------------------------------------------------
17cc1dc7a3Sopenharmony_ci
18cc1dc7a3Sopenharmony_ci/**
19cc1dc7a3Sopenharmony_ci * @brief Functions and data declarations for the outer context.
20cc1dc7a3Sopenharmony_ci *
21cc1dc7a3Sopenharmony_ci * The outer context includes thread-pool management, which is slower to
22cc1dc7a3Sopenharmony_ci * compile due to increased use of C++ stdlib. The inner context used in the
23cc1dc7a3Sopenharmony_ci * majority of the codec library does not include this.
24cc1dc7a3Sopenharmony_ci */
25cc1dc7a3Sopenharmony_ci
26cc1dc7a3Sopenharmony_ci#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
27cc1dc7a3Sopenharmony_ci#define ASTCENC_INTERNAL_ENTRY_INCLUDED
28cc1dc7a3Sopenharmony_ci
29cc1dc7a3Sopenharmony_ci#include <atomic>
30cc1dc7a3Sopenharmony_ci#include <condition_variable>
31cc1dc7a3Sopenharmony_ci#include <functional>
32cc1dc7a3Sopenharmony_ci#include <mutex>
33cc1dc7a3Sopenharmony_ci
34cc1dc7a3Sopenharmony_ci#include "astcenc_internal.h"
35cc1dc7a3Sopenharmony_ci
36cc1dc7a3Sopenharmony_ci/* ============================================================================
37cc1dc7a3Sopenharmony_ci  Parallel execution control
38cc1dc7a3Sopenharmony_ci============================================================================ */
39cc1dc7a3Sopenharmony_ci
40cc1dc7a3Sopenharmony_ci/**
41cc1dc7a3Sopenharmony_ci * @brief A simple counter-based manager for parallel task execution.
42cc1dc7a3Sopenharmony_ci *
43cc1dc7a3Sopenharmony_ci * The task processing execution consists of:
44cc1dc7a3Sopenharmony_ci *
45cc1dc7a3Sopenharmony_ci *     * A single-threaded init stage.
46cc1dc7a3Sopenharmony_ci *     * A multi-threaded processing stage.
47cc1dc7a3Sopenharmony_ci *     * A condition variable so threads can wait for processing completion.
48cc1dc7a3Sopenharmony_ci *
 * The init stage will be executed by the first thread to arrive in the critical section; there is
 * no main thread in the thread pool.
51cc1dc7a3Sopenharmony_ci *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may therefore each execute a different number of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
 * these integers to an actual processing partition in a specific problem domain.
56cc1dc7a3Sopenharmony_ci *
 * The exit wait condition is needed to ensure processing has finished before a worker thread can
 * progress to the next stage of the pipeline. Specifically, a worker may exit the processing stage
 * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
61cc1dc7a3Sopenharmony_ci *
62cc1dc7a3Sopenharmony_ci * The basic usage model:
63cc1dc7a3Sopenharmony_ci *
64cc1dc7a3Sopenharmony_ci *     // --------- From single-threaded code ---------
65cc1dc7a3Sopenharmony_ci *
66cc1dc7a3Sopenharmony_ci *     // Reset the tracker state
67cc1dc7a3Sopenharmony_ci *     manager->reset()
68cc1dc7a3Sopenharmony_ci *
69cc1dc7a3Sopenharmony_ci *     // --------- From multi-threaded code ---------
70cc1dc7a3Sopenharmony_ci *
71cc1dc7a3Sopenharmony_ci *     // Run the stage init; only first thread actually runs the lambda
72cc1dc7a3Sopenharmony_ci *     manager->init(<lambda>)
73cc1dc7a3Sopenharmony_ci *
 *     unsigned int task_count;
 *     do
 *     {
 *         // Request a task assignment
 *         unsigned int base_index = manager->get_task_assignment(<granule>, task_count);
 *
 *         // Process any tasks we were given (task_count <= granule size)
 *         if (task_count)
 *         {
 *             // Run the user task processing code for N tasks here
 *             ...
 *
 *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
 *         }
 *     } while (task_count);
 *
 *     // Wait for all threads to complete tasks before progressing
 *     manager->wait()
 *
 *     // Run the stage term; only first thread actually runs the lambda
 *     manager->term(<lambda>)
96cc1dc7a3Sopenharmony_ci */
97cc1dc7a3Sopenharmony_ciclass ParallelManager
98cc1dc7a3Sopenharmony_ci{
99cc1dc7a3Sopenharmony_ciprivate:
100cc1dc7a3Sopenharmony_ci	/** @brief Lock used for critical section and condition synchronization. */
101cc1dc7a3Sopenharmony_ci	std::mutex m_lock;
102cc1dc7a3Sopenharmony_ci
103cc1dc7a3Sopenharmony_ci	/** @brief True if the stage init() step has been executed. */
104cc1dc7a3Sopenharmony_ci	bool m_init_done;
105cc1dc7a3Sopenharmony_ci
106cc1dc7a3Sopenharmony_ci	/** @brief True if the stage term() step has been executed. */
107cc1dc7a3Sopenharmony_ci	bool m_term_done;
108cc1dc7a3Sopenharmony_ci
109cc1dc7a3Sopenharmony_ci	/** @brief Condition variable for tracking stage processing completion. */
110cc1dc7a3Sopenharmony_ci	std::condition_variable m_complete;
111cc1dc7a3Sopenharmony_ci
112cc1dc7a3Sopenharmony_ci	/** @brief Number of tasks started, but not necessarily finished. */
113cc1dc7a3Sopenharmony_ci	std::atomic<unsigned int> m_start_count;
114cc1dc7a3Sopenharmony_ci
115cc1dc7a3Sopenharmony_ci	/** @brief Number of tasks finished. */
116cc1dc7a3Sopenharmony_ci	unsigned int m_done_count;
117cc1dc7a3Sopenharmony_ci
118cc1dc7a3Sopenharmony_ci	/** @brief Number of tasks that need to be processed. */
119cc1dc7a3Sopenharmony_ci	unsigned int m_task_count;
120cc1dc7a3Sopenharmony_ci
121cc1dc7a3Sopenharmony_ci	/** @brief Progress callback (optional). */
122cc1dc7a3Sopenharmony_ci	astcenc_progress_callback m_callback;
123cc1dc7a3Sopenharmony_ci
124cc1dc7a3Sopenharmony_ci	/** @brief Lock used for callback synchronization. */
125cc1dc7a3Sopenharmony_ci	std::mutex m_callback_lock;
126cc1dc7a3Sopenharmony_ci
127cc1dc7a3Sopenharmony_ci	/** @brief Minimum progress before making a callback. */
128cc1dc7a3Sopenharmony_ci	float m_callback_min_diff;
129cc1dc7a3Sopenharmony_ci
130cc1dc7a3Sopenharmony_ci	/** @brief Last progress callback value. */
131cc1dc7a3Sopenharmony_ci	float m_callback_last_value;
132cc1dc7a3Sopenharmony_ci
133cc1dc7a3Sopenharmony_cipublic:
134cc1dc7a3Sopenharmony_ci	/** @brief Create a new ParallelManager. */
135cc1dc7a3Sopenharmony_ci	ParallelManager()
136cc1dc7a3Sopenharmony_ci	{
137cc1dc7a3Sopenharmony_ci		reset();
138cc1dc7a3Sopenharmony_ci	}
139cc1dc7a3Sopenharmony_ci
140cc1dc7a3Sopenharmony_ci	/**
141cc1dc7a3Sopenharmony_ci	 * @brief Reset the tracker for a new processing batch.
142cc1dc7a3Sopenharmony_ci	 *
143cc1dc7a3Sopenharmony_ci	 * This must be called from single-threaded code before starting the multi-threaded processing
144cc1dc7a3Sopenharmony_ci	 * operations.
145cc1dc7a3Sopenharmony_ci	 */
146cc1dc7a3Sopenharmony_ci	void reset()
147cc1dc7a3Sopenharmony_ci	{
148cc1dc7a3Sopenharmony_ci		m_init_done = false;
149cc1dc7a3Sopenharmony_ci		m_term_done = false;
150cc1dc7a3Sopenharmony_ci		m_start_count = 0;
151cc1dc7a3Sopenharmony_ci		m_done_count = 0;
152cc1dc7a3Sopenharmony_ci		m_task_count = 0;
153cc1dc7a3Sopenharmony_ci		m_callback_last_value = 0.0f;
154cc1dc7a3Sopenharmony_ci		m_callback_min_diff = 1.0f;
155cc1dc7a3Sopenharmony_ci	}
156cc1dc7a3Sopenharmony_ci
157cc1dc7a3Sopenharmony_ci	/**
158cc1dc7a3Sopenharmony_ci	 * @brief Trigger the pipeline stage init step.
159cc1dc7a3Sopenharmony_ci	 *
160cc1dc7a3Sopenharmony_ci	 * This can be called from multi-threaded code. The first thread to hit this will process the
161cc1dc7a3Sopenharmony_ci	 * initialization. Other threads will block and wait for it to complete.
162cc1dc7a3Sopenharmony_ci	 *
163cc1dc7a3Sopenharmony_ci	 * @param init_func   Callable which executes the stage initialization. It must return the
164cc1dc7a3Sopenharmony_ci	 *                    total number of tasks in the stage.
165cc1dc7a3Sopenharmony_ci	 */
166cc1dc7a3Sopenharmony_ci	void init(std::function<unsigned int(void)> init_func)
167cc1dc7a3Sopenharmony_ci	{
168cc1dc7a3Sopenharmony_ci		std::lock_guard<std::mutex> lck(m_lock);
169cc1dc7a3Sopenharmony_ci		if (!m_init_done)
170cc1dc7a3Sopenharmony_ci		{
171cc1dc7a3Sopenharmony_ci			m_task_count = init_func();
172cc1dc7a3Sopenharmony_ci			m_init_done = true;
173cc1dc7a3Sopenharmony_ci		}
174cc1dc7a3Sopenharmony_ci	}
175cc1dc7a3Sopenharmony_ci
176cc1dc7a3Sopenharmony_ci	/**
177cc1dc7a3Sopenharmony_ci	 * @brief Trigger the pipeline stage init step.
178cc1dc7a3Sopenharmony_ci	 *
179cc1dc7a3Sopenharmony_ci	 * This can be called from multi-threaded code. The first thread to hit this will process the
180cc1dc7a3Sopenharmony_ci	 * initialization. Other threads will block and wait for it to complete.
181cc1dc7a3Sopenharmony_ci	 *
182cc1dc7a3Sopenharmony_ci	 * @param task_count   Total number of tasks needing processing.
183cc1dc7a3Sopenharmony_ci	 * @param callback     Function pointer for progress status callbacks.
184cc1dc7a3Sopenharmony_ci	 */
185cc1dc7a3Sopenharmony_ci	void init(unsigned int task_count, astcenc_progress_callback callback)
186cc1dc7a3Sopenharmony_ci	{
187cc1dc7a3Sopenharmony_ci		std::lock_guard<std::mutex> lck(m_lock);
188cc1dc7a3Sopenharmony_ci		if (!m_init_done)
189cc1dc7a3Sopenharmony_ci		{
190cc1dc7a3Sopenharmony_ci			m_callback = callback;
191cc1dc7a3Sopenharmony_ci			m_task_count = task_count;
192cc1dc7a3Sopenharmony_ci			m_init_done = true;
193cc1dc7a3Sopenharmony_ci
194cc1dc7a3Sopenharmony_ci			// Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
195cc1dc7a3Sopenharmony_ci			float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
196cc1dc7a3Sopenharmony_ci			m_callback_min_diff = astc::max(min_diff, 1.0f);
197cc1dc7a3Sopenharmony_ci		}
198cc1dc7a3Sopenharmony_ci	}
199cc1dc7a3Sopenharmony_ci
200cc1dc7a3Sopenharmony_ci	/**
201cc1dc7a3Sopenharmony_ci	 * @brief Request a task assignment.
202cc1dc7a3Sopenharmony_ci	 *
203cc1dc7a3Sopenharmony_ci	 * Assign up to @c granule tasks to the caller for processing.
204cc1dc7a3Sopenharmony_ci	 *
205cc1dc7a3Sopenharmony_ci	 * @param      granule   Maximum number of tasks that can be assigned.
206cc1dc7a3Sopenharmony_ci	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
207cc1dc7a3Sopenharmony_ci	 *
208cc1dc7a3Sopenharmony_ci	 * @return Task index of the first assigned task; assigned tasks increment from this.
209cc1dc7a3Sopenharmony_ci	 */
210cc1dc7a3Sopenharmony_ci	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
211cc1dc7a3Sopenharmony_ci	{
212cc1dc7a3Sopenharmony_ci		unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
213cc1dc7a3Sopenharmony_ci		if (base >= m_task_count)
214cc1dc7a3Sopenharmony_ci		{
215cc1dc7a3Sopenharmony_ci			count = 0;
216cc1dc7a3Sopenharmony_ci			return 0;
217cc1dc7a3Sopenharmony_ci		}
218cc1dc7a3Sopenharmony_ci
219cc1dc7a3Sopenharmony_ci		count = astc::min(m_task_count - base, granule);
220cc1dc7a3Sopenharmony_ci		return base;
221cc1dc7a3Sopenharmony_ci	}
222cc1dc7a3Sopenharmony_ci
223cc1dc7a3Sopenharmony_ci	/**
224cc1dc7a3Sopenharmony_ci	 * @brief Complete a task assignment.
225cc1dc7a3Sopenharmony_ci	 *
226cc1dc7a3Sopenharmony_ci	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
227cc1dc7a3Sopenharmony_ci	 * completes the processing of the stage.
228cc1dc7a3Sopenharmony_ci	 *
229cc1dc7a3Sopenharmony_ci	 * @param count   The number of completed tasks.
230cc1dc7a3Sopenharmony_ci	 */
231cc1dc7a3Sopenharmony_ci	void complete_task_assignment(unsigned int count)
232cc1dc7a3Sopenharmony_ci	{
233cc1dc7a3Sopenharmony_ci		// Note: m_done_count cannot use an atomic without the mutex; this has a race between the
234cc1dc7a3Sopenharmony_ci		// update here and the wait() for other threads
235cc1dc7a3Sopenharmony_ci		unsigned int local_count;
236cc1dc7a3Sopenharmony_ci		float local_last_value;
237cc1dc7a3Sopenharmony_ci		{
238cc1dc7a3Sopenharmony_ci			std::unique_lock<std::mutex> lck(m_lock);
239cc1dc7a3Sopenharmony_ci			m_done_count += count;
240cc1dc7a3Sopenharmony_ci			local_count = m_done_count;
241cc1dc7a3Sopenharmony_ci			local_last_value = m_callback_last_value;
242cc1dc7a3Sopenharmony_ci
243cc1dc7a3Sopenharmony_ci			if (m_done_count == m_task_count)
244cc1dc7a3Sopenharmony_ci			{
245cc1dc7a3Sopenharmony_ci				// Ensure the progress bar hits 100%
246cc1dc7a3Sopenharmony_ci				if (m_callback)
247cc1dc7a3Sopenharmony_ci				{
248cc1dc7a3Sopenharmony_ci					std::unique_lock<std::mutex> cblck(m_callback_lock);
249cc1dc7a3Sopenharmony_ci					m_callback(100.0f);
250cc1dc7a3Sopenharmony_ci					m_callback_last_value = 100.0f;
251cc1dc7a3Sopenharmony_ci				}
252cc1dc7a3Sopenharmony_ci
253cc1dc7a3Sopenharmony_ci				lck.unlock();
254cc1dc7a3Sopenharmony_ci				m_complete.notify_all();
255cc1dc7a3Sopenharmony_ci			}
256cc1dc7a3Sopenharmony_ci		}
257cc1dc7a3Sopenharmony_ci
258cc1dc7a3Sopenharmony_ci		// Process progress callback if we have one
259cc1dc7a3Sopenharmony_ci		if (m_callback)
260cc1dc7a3Sopenharmony_ci		{
261cc1dc7a3Sopenharmony_ci			// Initial lockless test - have we progressed enough to emit?
262cc1dc7a3Sopenharmony_ci			float num = static_cast<float>(local_count);
263cc1dc7a3Sopenharmony_ci			float den = static_cast<float>(m_task_count);
264cc1dc7a3Sopenharmony_ci			float this_value =  (num / den) * 100.0f;
265cc1dc7a3Sopenharmony_ci			bool report_test = (this_value - local_last_value) > m_callback_min_diff;
266cc1dc7a3Sopenharmony_ci
267cc1dc7a3Sopenharmony_ci			// Recheck under lock, because another thread might report first
268cc1dc7a3Sopenharmony_ci			if (report_test)
269cc1dc7a3Sopenharmony_ci			{
270cc1dc7a3Sopenharmony_ci				std::unique_lock<std::mutex> cblck(m_callback_lock);
271cc1dc7a3Sopenharmony_ci				bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
272cc1dc7a3Sopenharmony_ci				if (report_retest)
273cc1dc7a3Sopenharmony_ci				{
274cc1dc7a3Sopenharmony_ci					m_callback(this_value);
275cc1dc7a3Sopenharmony_ci					m_callback_last_value = this_value;
276cc1dc7a3Sopenharmony_ci				}
277cc1dc7a3Sopenharmony_ci			}
278cc1dc7a3Sopenharmony_ci		}
279cc1dc7a3Sopenharmony_ci	}
280cc1dc7a3Sopenharmony_ci
281cc1dc7a3Sopenharmony_ci	/**
282cc1dc7a3Sopenharmony_ci	 * @brief Wait for stage processing to complete.
283cc1dc7a3Sopenharmony_ci	 */
284cc1dc7a3Sopenharmony_ci	void wait()
285cc1dc7a3Sopenharmony_ci	{
286cc1dc7a3Sopenharmony_ci		std::unique_lock<std::mutex> lck(m_lock);
287cc1dc7a3Sopenharmony_ci		m_complete.wait(lck, [this]{ return m_done_count == m_task_count; });
288cc1dc7a3Sopenharmony_ci	}
289cc1dc7a3Sopenharmony_ci
290cc1dc7a3Sopenharmony_ci	/**
291cc1dc7a3Sopenharmony_ci	 * @brief Trigger the pipeline stage term step.
292cc1dc7a3Sopenharmony_ci	 *
293cc1dc7a3Sopenharmony_ci	 * This can be called from multi-threaded code. The first thread to hit this will process the
294cc1dc7a3Sopenharmony_ci	 * work pool termination. Caller must have called @c wait() prior to calling this function to
295cc1dc7a3Sopenharmony_ci	 * ensure that processing is complete.
296cc1dc7a3Sopenharmony_ci	 *
297cc1dc7a3Sopenharmony_ci	 * @param term_func   Callable which executes the stage termination.
298cc1dc7a3Sopenharmony_ci	 */
299cc1dc7a3Sopenharmony_ci	void term(std::function<void(void)> term_func)
300cc1dc7a3Sopenharmony_ci	{
301cc1dc7a3Sopenharmony_ci		std::lock_guard<std::mutex> lck(m_lock);
302cc1dc7a3Sopenharmony_ci		if (!m_term_done)
303cc1dc7a3Sopenharmony_ci		{
304cc1dc7a3Sopenharmony_ci			term_func();
305cc1dc7a3Sopenharmony_ci			m_term_done = true;
306cc1dc7a3Sopenharmony_ci		}
307cc1dc7a3Sopenharmony_ci	}
308cc1dc7a3Sopenharmony_ci};
309cc1dc7a3Sopenharmony_ci
/**
 * @brief The astcenc codec context.
 *
 * Bundles the inner context state with the parallel managers that coordinate the multi-threaded
 * processing stages. Despite being used for compression, it also carries the decompression
 * manager. The compression-related managers are compiled out when @c ASTCENC_DECOMPRESS_ONLY is
 * defined.
 */
struct astcenc_context
{
	/** @brief The context internal state. */
	astcenc_contexti context;

#if !defined(ASTCENC_DECOMPRESS_ONLY)
	/** @brief The parallel manager for averages computation (compression builds only). */
	ParallelManager manage_avg;

	/** @brief The parallel manager for compression (compression builds only). */
	ParallelManager manage_compress;
#endif

	/** @brief The parallel manager for decompression; present in all builds. */
	ParallelManager manage_decompress;
};
329cc1dc7a3Sopenharmony_ci
330cc1dc7a3Sopenharmony_ci#endif
331