xref: /third_party/lzma/C/MtCoder.c (revision 370b324c)
1/* MtCoder.c -- Multi-thread Coder
22023-04-13 : Igor Pavlov : Public domain */
3
4#include "Precomp.h"
5
6#include "MtCoder.h"
7
8#ifndef Z7_ST
9
10static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
11{
12  Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
13  UInt64 inSize2 = 0;
14  UInt64 outSize2 = 0;
15  if (inSize != (UInt64)(Int64)-1)
16  {
17    inSize2 = inSize - p->inSize;
18    p->inSize = inSize;
19  }
20  if (outSize != (UInt64)(Int64)-1)
21  {
22    outSize2 = outSize - p->outSize;
23    p->outSize = outSize;
24  }
25  return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
26}
27
28
29void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
30{
31  p->vt.Progress = MtProgressThunk_Progress;
32}
33
34
35
/*
  RINOK_THREAD(x): evaluates (x) once and makes the enclosing function
  return SZ_ERROR_THREAD if the result is non-zero.
  The expansion is a braced block, and the call sites in this file use it
  without a trailing semicolon.
*/
36#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37
38
/* Forward declaration: MtCoderThread_CreateAndStart() passes ThreadFunc
   to Thread_Create() before ThreadFunc is defined. */
39static THREAD_FUNC_DECL ThreadFunc(void *pp);
40
41
42static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
43{
44  WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
45  if (wres == 0)
46  {
47    t->stop = False;
48    if (!Thread_WasCreated(&t->thread))
49      wres = Thread_Create(&t->thread, ThreadFunc, t);
50    if (wres == 0)
51      wres = Event_Set(&t->startEvent);
52  }
53  if (wres == 0)
54    return SZ_OK;
55  return MY_SRes_HRESULT_FROM_WRes(wres);
56}
57
58
59static void MtCoderThread_Destruct(CMtCoderThread *t)
60{
61  if (Thread_WasCreated(&t->thread))
62  {
63    t->stop = 1;
64    Event_Set(&t->startEvent);
65    Thread_Wait_Close(&t->thread);
66  }
67
68  Event_Close(&t->startEvent);
69
70  if (t->inBuf)
71  {
72    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
73    t->inBuf = NULL;
74  }
75}
76
77
78
79
80/*
81  ThreadFunc2() returns:
82  SZ_OK           - in all normal cases (even for stream error or memory allocation error)
83  SZ_ERROR_THREAD - in case of failure in system synch function
84*/
85
/*
  One coding pass of a worker thread (called from ThreadFunc).

  Each loop iteration handles one input block:
    1) wait for mtc->readEvent (the "read phase" token),
    2) obtain the block: read it from mtc->inStream into t->inBuf,
       or take the next slice of mtc->inData,
    3) wait for a slot on mtc->blocksSemaphore and take the next
       slot index (bi) from mtc->blockIndex,
    4) optionally start one more worker thread,
    5) signal mtc->readEvent so that another thread can read,
    6) call mtCallback->Code() for the block and store the outcome
       in mtc->blocks[bi],
    7) hand the block to the writer: signal writeEvents[bi]
       (MTCODER_USE_WRITE_THREAD), or write it from this thread if
       it is the next block in output order.
*/
86static SRes ThreadFunc2(CMtCoderThread *t)
87{
88  CMtCoder *mtc = t->mtCoder;
89
90  for (;;)
91  {
92    unsigned bi;
93    SRes res;
94    SRes res2;
95    BoolInt finished;
96    unsigned bufIndex;
97    size_t size;
98    const Byte *inData;
99    UInt64 readProcessed = 0;
100
    /* Enter the read phase. readEvent is created as an auto-reset event
       in MtCoder_Code(). */
101    RINOK_THREAD(Event_Wait(&mtc->readEvent))
102
103    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock other threads */
104
    /* stopReading was set by a thread that saw the last block or an error:
       pass the event on to the next waiter and end this pass. */
105    if (mtc->stopReading)
106    {
107      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
108    }
109
    /* Start from the error state already recorded in mtProgress. */
110    res = MtProgress_GetError(&mtc->mtProgress);
111
112    size = 0;
113    inData = NULL;
114    finished = True;
115
116    if (res == SZ_OK)
117    {
118      size = mtc->blockSize;
119      if (mtc->inStream)
120      {
        /* Stream input: the per-thread buffer is allocated on first use. */
121        if (!t->inBuf)
122        {
123          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
124          if (!t->inBuf)
125            res = SZ_ERROR_MEM;
126        }
127        if (res == SZ_OK)
128        {
          /* size is in/out: requested block size in, bytes obtained out.
             readProcessed is advanced regardless of the returned res. */
129          res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
130          readProcessed = mtc->readProcessed + size;
131          mtc->readProcessed = readProcessed;
132        }
133        if (res != SZ_OK)
134        {
135          mtc->readRes = res;
136          /* after reading error - we can stop encoding of previous blocks */
137          MtProgress_SetError(&mtc->mtProgress, res);
138        }
139        else
          /* a short read marks this block as the last one */
140          finished = (size != mtc->blockSize);
141      }
142      else
143      {
        /* Memory input: the block is a slice of mtc->inData starting at
           readProcessed, at most blockSize bytes long. */
144        size_t rem;
145        readProcessed = mtc->readProcessed;
146        rem = mtc->inDataSize - (size_t)readProcessed;
147        if (size > rem)
148          size = rem;
149        inData = mtc->inData + (size_t)readProcessed;
150        readProcessed += size;
151        mtc->readProcessed = readProcessed;
152        finished = (mtc->inDataSize == (size_t)readProcessed);
153      }
154    }
155
156    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
157
158    res2 = SZ_OK;
159
    /* res2 records a synchronization failure; it becomes this function's
       return value after readEvent has been signaled below. */
160    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
161    {
162      res2 = SZ_ERROR_THREAD;
163      if (res == SZ_OK)
164      {
165        res = res2;
166        // MtProgress_SetError(&mtc->mtProgress, res);
167      }
168    }
169
    /* bi is this block's slot in mtc->blocks; the shared index wraps
       around at numBlocksMax. */
170    bi = mtc->blockIndex;
171
172    if (++mtc->blockIndex >= mtc->numBlocksMax)
173      mtc->blockIndex = 0;
174
    /* (unsigned)-1 means "no output buffer attached to this block". */
175    bufIndex = (unsigned)(int)-1;
176
177    if (res == SZ_OK)
178      res = MtProgress_GetError(&mtc->mtProgress);
179
180    if (res != SZ_OK)
181      finished = True;
182
183    if (!finished)
184    {
      /* More input is expected: start one more worker, unless the thread
         limit is reached or readProcessed already equals expectedDataSize. */
185      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
186          && mtc->expectedDataSize != readProcessed)
187      {
188        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
189        if (res == SZ_OK)
190          mtc->numStartedThreads++;
191        else
192        {
193          MtProgress_SetError(&mtc->mtProgress, res);
194          finished = True;
195        }
196      }
197    }
198
199    if (finished)
200      mtc->stopReading = True;
201
    /* Leave the read phase: the next waiting thread may proceed. */
202    RINOK_THREAD(Event_Set(&mtc->readEvent))
203
204    if (res2 != SZ_OK)
205      return res2;
206
207    if (res == SZ_OK)
208    {
      /* Pop an output buffer index from the free list (guarded by mtc->cs). */
209      CriticalSection_Enter(&mtc->cs);
210      bufIndex = mtc->freeBlockHead;
211      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
212      CriticalSection_Leave(&mtc->cs);
213
214      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
215          mtc->inStream ? t->inBuf : inData, size, finished);
216
217      // MtProgress_Reinit(&mtc->mtProgress, t->index);
218
219      if (res != SZ_OK)
220        MtProgress_SetError(&mtc->mtProgress, res);
221    }
222
    /* Publish the outcome of this block for whoever writes slot bi. */
223    {
224      CMtCoderBlock *block = &mtc->blocks[bi];
225      block->res = res;
226      block->bufIndex = bufIndex;
227      block->finished = finished;
228    }
229
230    #ifdef MTCODER_USE_WRITE_THREAD
      /* The loop in MtCoder_Code() waits on writeEvents[bi] and does the write. */
231      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
232    #else
233    {
234      unsigned wi;
235      {
        /* Under mtc->cs: if slot bi is the next one to be written
           (writeIndex == bi), this thread takes over writing and sets
           writeIndex to (unsigned)-1; otherwise the slot is only marked
           as ready in ReadyBlocks. */
236        CriticalSection_Enter(&mtc->cs);
237        wi = mtc->writeIndex;
238        if (wi == bi)
239          mtc->writeIndex = (unsigned)(int)-1;
240        else
241          mtc->ReadyBlocks[bi] = True;
242        CriticalSection_Leave(&mtc->cs);
243      }
244
245      if (wi != bi)
246      {
        /* Not the writer: end the pass after the last block or an error,
           otherwise go on with the next block. */
247        if (res != SZ_OK || finished)
248          return 0;
249        continue;
250      }
251
252      if (mtc->writeRes != SZ_OK)
253        res = mtc->writeRes;
254
      /* Writer loop: handle slot wi, then keep going while the following
         slot has already been marked ready. */
255      for (;;)
256      {
257        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
258        {
259          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
260          if (res != SZ_OK)
261          {
262            mtc->writeRes = res;
263            MtProgress_SetError(&mtc->mtProgress, res);
264          }
265        }
266
267        if (++wi >= mtc->numBlocksMax)
268          wi = 0;
269        {
270          BoolInt isReady;
271
272          CriticalSection_Enter(&mtc->cs);
273
          /* Push the output buffer back onto the free list. */
274          if (bufIndex != (unsigned)(int)-1)
275          {
276            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
277            mtc->freeBlockHead = bufIndex;
278          }
279
280          isReady = mtc->ReadyBlocks[wi];
281
          /* Next slot ready: consume its flag and write it in the next
             iteration. Otherwise record wi as the next slot to write and
             stop writing. */
282          if (isReady)
283            mtc->ReadyBlocks[wi] = False;
284          else
285            mtc->writeIndex = wi;
286
287          CriticalSection_Leave(&mtc->cs);
288
          /* One slot is free again for the read phase. */
289          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
290
291          if (!isReady)
292            break;
293        }
294
295        {
          /* Load the published outcome of the next slot; the first error
             seen so far is kept in res. */
296          CMtCoderBlock *block = &mtc->blocks[wi];
297          if (res == SZ_OK && block->res != SZ_OK)
298            res = block->res;
299          bufIndex = block->bufIndex;
300          finished = block->finished;
301        }
302      }
303    }
304    #endif
305
    /* End of pass after the last block or any error. Errors were already
       recorded via MtProgress_SetError(), readRes or writeRes above. */
306    if (finished || res != SZ_OK)
307      return 0;
308  }
309}
310
311
312static THREAD_FUNC_DECL ThreadFunc(void *pp)
313{
314  CMtCoderThread *t = (CMtCoderThread *)pp;
315  for (;;)
316  {
317    if (Event_Wait(&t->startEvent) != 0)
318      return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
319    if (t->stop)
320      return 0;
321    {
322      SRes res = ThreadFunc2(t);
323      CMtCoder *mtc = t->mtCoder;
324      if (res != SZ_OK)
325      {
326        MtProgress_SetError(&mtc->mtProgress, res);
327      }
328
329      #ifndef MTCODER_USE_WRITE_THREAD
330      {
331        unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
332        if (numFinished == mtc->numStartedThreads)
333          if (Event_Set(&mtc->finishedEvent) != 0)
334            return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
335      }
336      #endif
337    }
338  }
339}
340
341
342
343void MtCoder_Construct(CMtCoder *p)
344{
345  unsigned i;
346
347  p->blockSize = 0;
348  p->numThreadsMax = 0;
349  p->expectedDataSize = (UInt64)(Int64)-1;
350
351  p->inStream = NULL;
352  p->inData = NULL;
353  p->inDataSize = 0;
354
355  p->progress = NULL;
356  p->allocBig = NULL;
357
358  p->mtCallback = NULL;
359  p->mtCallbackObject = NULL;
360
361  p->allocatedBufsSize = 0;
362
363  Event_Construct(&p->readEvent);
364  Semaphore_Construct(&p->blocksSemaphore);
365
366  for (i = 0; i < MTCODER_THREADS_MAX; i++)
367  {
368    CMtCoderThread *t = &p->threads[i];
369    t->mtCoder = p;
370    t->index = i;
371    t->inBuf = NULL;
372    t->stop = False;
373    Event_Construct(&t->startEvent);
374    Thread_CONSTRUCT(&t->thread)
375  }
376
377  #ifdef MTCODER_USE_WRITE_THREAD
378    for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
379      Event_Construct(&p->writeEvents[i]);
380  #else
381    Event_Construct(&p->finishedEvent);
382  #endif
383
384  CriticalSection_Init(&p->cs);
385  CriticalSection_Init(&p->mtProgress.cs);
386}
387
388
389
390
391static void MtCoder_Free(CMtCoder *p)
392{
393  unsigned i;
394
395  /*
396  p->stopReading = True;
397  if (Event_IsCreated(&p->readEvent))
398    Event_Set(&p->readEvent);
399  */
400
401  for (i = 0; i < MTCODER_THREADS_MAX; i++)
402    MtCoderThread_Destruct(&p->threads[i]);
403
404  Event_Close(&p->readEvent);
405  Semaphore_Close(&p->blocksSemaphore);
406
407  #ifdef MTCODER_USE_WRITE_THREAD
408    for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
409      Event_Close(&p->writeEvents[i]);
410  #else
411    Event_Close(&p->finishedEvent);
412  #endif
413}
414
415
416void MtCoder_Destruct(CMtCoder *p)
417{
418  MtCoder_Free(p);
419
420  CriticalSection_Delete(&p->cs);
421  CriticalSection_Delete(&p->mtProgress.cs);
422}
423
424
/*
  Runs one multi-threaded coding pass with the settings stored in *p
  (blockSize, numThreadsMax, inStream or inData/inDataSize, mtCallback, ...).

  The function prepares the shared state, starts the first worker thread
  and then either acts as the writer itself (MTCODER_USE_WRITE_THREAD) or
  waits for p->finishedEvent.

  Returns:
    SZ_OK on success;
    SZ_ERROR_THREAD when a RINOK_THREAD() check fails;
    otherwise the first non-OK value among: the wait/block result,
    p->readRes, p->mtProgress.res and (without MTCODER_USE_WRITE_THREAD)
    p->writeRes.

  MtCoder_Free(p) is called when the final result is not SZ_OK.
  NOTE(review): the early returns made by RINOK_THREAD() / RINOK() do not
  go through that MtCoder_Free() call.
*/
425SRes MtCoder_Code(CMtCoder *p)
426{
427  unsigned numThreads = p->numThreadsMax;
428  unsigned numBlocksMax;
429  unsigned i;
430  SRes res = SZ_OK;
431
432  if (numThreads > MTCODER_THREADS_MAX)
433    numThreads = MTCODER_THREADS_MAX;
434  numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
435
  /* One extra block slot for each threshold the block size is below
     (2^26, 2^24 and 2^22 bytes), capped at MTCODER_BLOCKS_MAX. */
436  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
437  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
438  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
439
440  if (numBlocksMax > MTCODER_BLOCKS_MAX)
441    numBlocksMax = MTCODER_BLOCKS_MAX;
442
  /* Block size differs from the one recorded in allocatedBufsSize:
     drop the per-thread input buffers. ThreadFunc2() allocates a buffer
     again when it finds inBuf == NULL. */
443  if (p->blockSize != p->allocatedBufsSize)
444  {
445    for (i = 0; i < MTCODER_THREADS_MAX; i++)
446    {
447      CMtCoderThread *t = &p->threads[i];
448      if (t->inBuf)
449      {
450        ISzAlloc_Free(p->allocBig, t->inBuf);
451        t->inBuf = NULL;
452      }
453    }
454    p->allocatedBufsSize = p->blockSize;
455  }
456
457  p->readRes = SZ_OK;
458
459  MtProgress_Init(&p->mtProgress, p->progress);
460
  /* Create (if needed) and reset the events used to hand blocks over. */
461  #ifdef MTCODER_USE_WRITE_THREAD
462    for (i = 0; i < numBlocksMax; i++)
463    {
464      RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
465    }
466  #else
467    RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
468  #endif
469
470  {
471    RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
472    RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax))
473  }
474
  /* Free list of output buffer indexes: i -> i + 1, terminated by
     (unsigned)-1, with index 0 as the head. */
475  for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
476    p->freeBlockList[i] = i + 1;
477  p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
478  p->freeBlockHead = 0;
479
480  p->readProcessed = 0;
481  p->blockIndex = 0;
482  p->numBlocksMax = numBlocksMax;
483  p->stopReading = False;
484
485  #ifndef MTCODER_USE_WRITE_THREAD
486    p->writeIndex = 0;
487    p->writeRes = SZ_OK;
488    for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
489      p->ReadyBlocks[i] = False;
490    p->numFinishedThreads = 0;
491  #endif
492
493  p->numStartedThreadsLimit = numThreads;
494  p->numStartedThreads = 0;
495
  /* Only the first worker (threads[0]) is started here; further workers
     are started from ThreadFunc2(). numStartedThreads is incremented
     before the start result is checked. */
496  // for (i = 0; i < numThreads; i++)
497  {
498    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
499    RINOK(MtCoderThread_CreateAndStart(nextThread))
500  }
501
  /* Open the read phase: one worker waiting on readEvent may proceed. */
502  RINOK_THREAD(Event_Set(&p->readEvent))
503
504  #ifdef MTCODER_USE_WRITE_THREAD
505  {
    /* The calling thread is the writer: it visits the block slots in
       ring order and waits for each slot's event in turn. */
506    unsigned bi = 0;
507
508    for (;; bi++)
509    {
510      if (bi >= numBlocksMax)
511        bi = 0;
512
513      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
514
515      {
516        const CMtCoderBlock *block = &p->blocks[bi];
517        unsigned bufIndex = block->bufIndex;
518        BoolInt finished = block->finished;
        /* res keeps the first error seen; later blocks are not written
           after an error, but their buffers are still recycled. */
519        if (res == SZ_OK && block->res != SZ_OK)
520          res = block->res;
521
522        if (bufIndex != (unsigned)(int)-1)
523        {
524          if (res == SZ_OK)
525          {
526            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
527            if (res != SZ_OK)
528              MtProgress_SetError(&p->mtProgress, res);
529          }
530
          /* Push the buffer index back onto the free list. */
531          CriticalSection_Enter(&p->cs);
532          {
533            p->freeBlockList[bufIndex] = p->freeBlockHead;
534            p->freeBlockHead = bufIndex;
535          }
536          CriticalSection_Leave(&p->cs);
537        }
538
        /* The slot may be reused by the workers. */
539        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
540
541        if (finished)
542          break;
543      }
544    }
545  }
546  #else
547  {
    /* Workers do the writing themselves; ThreadFunc() signals
       finishedEvent when numFinishedThreads reaches numStartedThreads. */
548    WRes wres = Event_Wait(&p->finishedEvent);
549    res = MY_SRes_HRESULT_FROM_WRes(wres);
550  }
551  #endif
552
  /* Merge the error sources; the first non-OK value wins. */
553  if (res == SZ_OK)
554    res = p->readRes;
555
556  if (res == SZ_OK)
557    res = p->mtProgress.res;
558
559  #ifndef MTCODER_USE_WRITE_THREAD
560    if (res == SZ_OK)
561      res = p->writeRes;
562  #endif
563
  /* On failure, threads and synchronization objects are released here. */
564  if (res != SZ_OK)
565    MtCoder_Free(p);
566  return res;
567}
568
569#endif
570
571#undef RINOK_THREAD
572