xref: /third_party/lzma/C/MtDec.c (revision 370b324c)
1/* MtDec.c -- Multi-thread Decoder
22023-04-02 : Igor Pavlov : Public domain */
3
4#include "Precomp.h"
5
6// #define SHOW_DEBUG_INFO
7
8// #include <stdio.h>
9#include <string.h>
10
11#ifdef SHOW_DEBUG_INFO
12#include <stdio.h>
13#endif
14
15#include "MtDec.h"
16
17#ifndef Z7_ST
18
/*
  Debug tracing helpers.

  PRF(x)            - expands to x when SHOW_DEBUG_INFO is defined, and to nothing otherwise.
  PRF_STR_INT(s, d) - prints string literal s followed by d converted to unsigned.

  PRF_STR_INT() is written at its call sites WITHOUT a trailing semicolon, so the
  macro must supply the terminating ';' itself. Otherwise a SHOW_DEBUG_INFO build
  expands to "printf(...)" immediately followed by the next statement and fails
  to compile. The value is printed with %u because it is cast to unsigned.
*/
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d));)
26
27void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress)
28{
29  p->progress = progress;
30  p->res = SZ_OK;
31  p->totalInSize = 0;
32  p->totalOutSize = 0;
33}
34
35
36SRes MtProgress_Progress_ST(CMtProgress *p)
37{
38  if (p->res == SZ_OK && p->progress)
39    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
40      p->res = SZ_ERROR_PROGRESS;
41  return p->res;
42}
43
44
45SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
46{
47  SRes res;
48  CriticalSection_Enter(&p->cs);
49
50  p->totalInSize += inSize;
51  p->totalOutSize += outSize;
52  if (p->res == SZ_OK && p->progress)
53    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
54      p->res = SZ_ERROR_PROGRESS;
55  res = p->res;
56
57  CriticalSection_Leave(&p->cs);
58  return res;
59}
60
61
62SRes MtProgress_GetError(CMtProgress *p)
63{
64  SRes res;
65  CriticalSection_Enter(&p->cs);
66  res = p->res;
67  CriticalSection_Leave(&p->cs);
68  return res;
69}
70
71
72void MtProgress_SetError(CMtProgress *p, SRes res)
73{
74  CriticalSection_Enter(&p->cs);
75  if (p->res == SZ_OK)
76    p->res = res;
77  CriticalSection_Leave(&p->cs);
78}
79
80
81#define RINOK_THREAD(x) RINOK_WRes(x)
82
83
/*
  Header stored at the beginning of every input buffer allocation.
  Buffers of one thread form a singly linked list through (next).
  The header occupies four pointer-sized slots; payload bytes start right after it.
  NOTE(review): pad[] presumably exists to keep the payload aligned - confirm.
*/
typedef struct CMtDecBufLink_
{
  struct CMtDecBufLink_ *next;
  void *pad[3];
} CMtDecBufLink;

/* Byte offset of the payload inside a buffer allocation. */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* Address of the payload that follows the link header. */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
94
95
96
97static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp);
98
99
100static WRes MtDecThread_CreateEvents(CMtDecThread *t)
101{
102  WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite);
103  if (wres == 0)
104  {
105    wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead);
106    if (wres == 0)
107      return SZ_OK;
108  }
109  return wres;
110}
111
112
113static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
114{
115  WRes wres = MtDecThread_CreateEvents(t);
116  // wres = 17; // for test
117  if (wres == 0)
118  {
119    if (Thread_WasCreated(&t->thread))
120      return SZ_OK;
121    wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t);
122    if (wres == 0)
123      return SZ_OK;
124  }
125  return MY_SRes_HRESULT_FROM_WRes(wres);
126}
127
128
129void MtDecThread_FreeInBufs(CMtDecThread *t)
130{
131  if (t->inBuf)
132  {
133    void *link = t->inBuf;
134    t->inBuf = NULL;
135    do
136    {
137      void *next = ((CMtDecBufLink *)link)->next;
138      ISzAlloc_Free(t->mtDec->alloc, link);
139      link = next;
140    }
141    while (link);
142  }
143}
144
145
146static void MtDecThread_CloseThread(CMtDecThread *t)
147{
148  if (Thread_WasCreated(&t->thread))
149  {
150    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
151    Event_Set(&t->canRead);
152    Thread_Wait_Close(&t->thread);
153  }
154
155  Event_Close(&t->canRead);
156  Event_Close(&t->canWrite);
157}
158
159static void MtDec_CloseThreads(CMtDec *p)
160{
161  unsigned i;
162  for (i = 0; i < MTDEC_THREADS_MAX; i++)
163    MtDecThread_CloseThread(&p->threads[i]);
164}
165
166static void MtDecThread_Destruct(CMtDecThread *t)
167{
168  MtDecThread_CloseThread(t);
169  MtDecThread_FreeInBufs(t);
170}
171
172
173
174static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
175{
176  SRes res;
177  CriticalSection_Enter(&p->mtProgress.cs);
178  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
179  res = p->mtProgress.res;
180  CriticalSection_Leave(&p->mtProgress.cs);
181  return res;
182}
183
184static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
185{
186  SRes res;
187  CriticalSection_Enter(&p->mtProgress.cs);
188
189  p->mtProgress.totalInSize += inSize;
190  p->mtProgress.totalOutSize += outSize;
191  if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
192    if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
193      p->mtProgress.res = SZ_ERROR_PROGRESS;
194
195  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
196  res = p->mtProgress.res;
197
198  CriticalSection_Leave(&p->mtProgress.cs);
199
200  return res;
201}
202
203static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
204{
205  CriticalSection_Enter(&p->mtProgress.cs);
206  if (!p->needInterrupt || interruptIndex < p->interruptIndex)
207  {
208    p->interruptIndex = interruptIndex;
209    p->needInterrupt = True;
210  }
211  CriticalSection_Leave(&p->mtProgress.cs);
212}
213
214Byte *MtDec_GetCrossBuff(CMtDec *p)
215{
216  Byte *cr = p->crossBlock;
217  if (!cr)
218  {
219    cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
220    if (!cr)
221      return NULL;
222    p->crossBlock = cr;
223  }
224  return MTDEC__DATA_PTR_FROM_LINK(cr);
225}
226
227
228/*
229  MtDec_ThreadFunc2() returns:
230  0      - in all normal cases (even for stream error or memory allocation error)
231  (!= 0) - WRes error returned by a system threading function
232*/
233
234// #define MTDEC_ProgessStep (1 << 22)
235#define MTDEC_ProgessStep (1 << 0)
236
/*
  Main loop of one decoder thread slot (t). Each iteration processes one block
  in three phases:

    READ/PARSE : waits for t->canRead, then fills the slot's chain of input
                 buffers (first from the cross buffer, then from p->inStream)
                 and calls mtCallback->Parse() on every chunk until the
                 callback stops asking to continue.
    CODE       : calls mtCallback->Code() for each filled buffer of the block.
    WRITE      : waits for t->canWrite, then calls mtCallback->Write() and
                 records error / overflow / interrupt state in (p).

  While (finish) is False the read turn and later the write turn are handed to
  the next slot in the ring (slot 0 follows the last started slot).
  When (finish) is True the function either restarts from slot 0 (needContinue)
  or ends decoding.

  Returns 0 in all normal cases; a non-zero WRes is returned only when a
  threading primitive (Event_Wait / Event_Set) fails.
*/
237static WRes MtDec_ThreadFunc2(CMtDecThread *t)
238{
239  CMtDec *p = t->mtDec;
240
241  PRF_STR_INT("MtDec_ThreadFunc2", t->index)
242
243  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
244
245  for (;;)
246  {
247    SRes res, codeRes;
248    BoolInt wasInterrupted, isAllocError, overflow, finish;
249    SRes threadingErrorSRes;
250    BoolInt needCode, needWrite, needContinue;
251
252    size_t inDataSize_Start;
253    UInt64 inDataSize;
254    // UInt64 inDataSize_Full;
255
256    UInt64 blockIndex;
257
258    UInt64 inPrev = 0;
259    UInt64 outPrev = 0;
260    UInt64 inCodePos;
261    UInt64 outCodePos;
262
263    Byte *afterEndData = NULL;
264    size_t afterEndData_Size = 0;
265    BoolInt afterEndData_IsCross = False;
266
267    BoolInt canCreateNewThread = False;
268    // CMtDecCallbackInfo parse;
269    CMtDecThread *nextThread;
270
271    PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index)
272
    // ---------- READ / PARSE ----------
    // Wait for the read turn. p->exitThread is set by whoever shuts decoding down.
273    RINOK_THREAD(Event_Wait(&t->canRead))
274    if (p->exitThread)
275      return 0;
276
277    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index)
278
279    // if (t->index == 3) return 19; // for test
280
    // Blocks are numbered in the order in which threads get the read turn.
    // The number is compared with p->interruptIndex to detect interruption.
281    blockIndex = p->blockIndex++;
282
283    // PRF(printf("\ncanRead\n"))
284
    // Zero deltas: this call only reports current totals and fetches error / interrupt state.
285    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
286
287    finish = p->readWasFinished;
288    needCode = False;
289    needWrite = False;
290    isAllocError = False;
291    overflow = False;
292
293    inDataSize_Start = 0;
294    inDataSize = 0;
295    // inDataSize_Full = 0;
296
297    if (res == SZ_OK && !wasInterrupted)
298    {
299      // if (p->inStream)
300      {
301        CMtDecBufLink *prev = NULL;
302        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        // Number of unparsed bytes left in the cross buffer by the previous block.
303        size_t crossSize = p->crossEnd - p->crossStart;
304
305        PRF(printf("\ncrossSize = %d\n", crossSize));
306
        // One pass of this loop handles one buffer link of the current block.
307        for (;;)
308        {
          // Reuse a link kept from an earlier block, or allocate and append a new one.
309          if (!link)
310          {
311            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
312            if (!link)
313            {
              // Allocation failure is not reported via (res); it is recorded in the WRITE phase.
314              finish = True;
315              // p->allocError_for_Read_BlockIndex = blockIndex;
316              isAllocError = True;
317              break;
318            }
319            link->next = NULL;
320            if (prev)
321            {
322              // static unsigned g_num = 0;
323              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
324              prev->next = link;
325            }
326            else
327              t->inBuf = (void *)link;
328          }
329
330          {
331            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
332            Byte *parseData = data;
333            size_t size;
334
            // The first chunk is parsed in place from the cross buffer, if it holds data.
335            if (crossSize != 0)
336            {
337              inDataSize = crossSize;
338              // inDataSize_Full = inDataSize;
339              inDataSize_Start = crossSize;
340              size = crossSize;
341              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
342              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
343                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
344            }
345            else
346            {
              // Otherwise a full buffer is requested from the input stream.
347              size = p->inBufSize;
348
349              res = SeqInStream_ReadMax(p->inStream, data, &size);
350
351              // size = 10; // test
352
353              inDataSize += size;
354              // inDataSize_Full = inDataSize;
355              if (!prev)
356                inDataSize_Start = size;
357
358              p->readProcessed += size;
              // A short read is treated as the end of the input stream.
359              finish = (size != p->inBufSize);
360              if (finish)
361                p->readWasFinished = True;
362
363              // res = E_INVALIDARG; // test
364
365              if (res != SZ_OK)
366              {
367                // PRF(printf("\nRead error = %d\n", res))
368                // we want to decode all data before error
369                p->readRes = res;
370                // p->readError_BlockIndex = blockIndex;
371                p->readWasFinished = True;
372                finish = True;
373                res = SZ_OK;
374                // break;
375              }
376
377              if (inDataSize - inPrev >= MTDEC_ProgessStep)
378              {
379                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
380                if (res != SZ_OK || wasInterrupted)
381                  break;
382                inPrev = inDataSize;
383              }
384            }
385
386            {
387              CMtDecCallbackInfo parse;
388
389              parse.startCall = (prev == NULL);
390              parse.src = parseData;
391              parse.srcSize = size;
392              parse.srcFinished = finish;
393              parse.canCreateNewThread = True;
394
395              PRF(printf("\nParse size = %d\n", (unsigned)size));
396
              // After the call, parse.srcSize is used as the number of bytes the callback consumed.
397              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
398
399              PRF(printf("   Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
400
401              needWrite = True;
402              canCreateNewThread = parse.canCreateNewThread;
403
404              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
405
406              if (
407                  // parseRes != SZ_OK ||
408                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
409                  // ||
410                  parse.state == MTDEC_PARSE_OVERFLOW
411                  // || wasInterrupted
412                  )
413              {
414                // Overflow or Parse error - switch from MT decoding to ST decoding
415                finish = True;
416                overflow = True;
417
418                {
419                  PRF(printf("\n Overflow"));
420                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
421                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
422                }
423
                // Move the whole cross chunk into this slot's own buffer and empty the cross buffer.
424                if (crossSize != 0)
425                  memcpy(data, parseData, size);
426                p->crossStart = 0;
427                p->crossEnd = 0;
428                break;
429              }
430
              // Copy only the consumed part of the cross chunk into this slot's buffer.
431              if (crossSize != 0)
432              {
433                memcpy(data, parseData, parse.srcSize);
434                p->crossStart += parse.srcSize;
435              }
436
437              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
438              {
439                // we don't need to parse in current thread anymore
440
441                if (parse.state == MTDEC_PARSE_END)
442                  finish = True;
443
444                needCode = True;
445                // p->crossFinished = finish;
446
447                if (parse.srcSize == size)
448                {
449                  // full parsed - no cross transfer
450                  p->crossStart = 0;
451                  p->crossEnd = 0;
452                  break;
453                }
454
455                if (parse.state == MTDEC_PARSE_END)
456                {
                  // Bytes after the end marker are not part of the block; they are passed to Write().
457                  afterEndData = parseData + parse.srcSize;
458                  afterEndData_Size = size - parse.srcSize;
459                  if (crossSize != 0)
460                    afterEndData_IsCross = True;
461                  // we reduce data size to required bytes (parsed only)
462                  inDataSize -= afterEndData_Size;
463                  if (!prev)
464                    inDataSize_Start = parse.srcSize;
465                  break;
466                }
467
468                {
469                  // partial parsed - need cross transfer
470                  if (crossSize != 0)
471                    inDataSize = parse.srcSize; // it's only parsed now
472                  else
473                  {
474                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
475                    Byte *cr = MtDec_GetCrossBuff(p);
476                    if (!cr)
477                    {
478                      {
479                        PRF(printf("\ncross alloc error error\n"));
480                        // res = SZ_ERROR_MEM;
481                        finish = True;
482                        // p->allocError_for_Read_BlockIndex = blockIndex;
483                        isAllocError = True;
484                        break;
485                      }
486                    }
487
488                    {
                      // The unparsed tail becomes the new content of the cross buffer.
489                      size_t crSize = size - parse.srcSize;
490                      inDataSize -= crSize;
491                      p->crossEnd = crSize;
492                      p->crossStart = 0;
493                      memcpy(cr, parseData + parse.srcSize, crSize);
494                    }
495                  }
496
497                  // inDataSize_Full = inDataSize;
498                  if (!prev)
499                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
500
                  // Data remains for the next block, so decoding goes on with the next thread.
501                  finish = False;
502                  break;
503                }
504              }
505
              // Parse asked to continue but did not consume the whole chunk: treated as a failure.
506              if (parse.srcSize != size)
507              {
508                res = SZ_ERROR_FAIL;
509                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
510                break;
511              }
512            }
513          }
514
515          prev = link;
516          link = link->next;
517
          // The cross chunk was consumed completely; later chunks come from the stream.
518          if (crossSize != 0)
519          {
520            crossSize = 0;
521            p->crossStart = 0;
522            p->crossEnd = 0;
523          }
524        }
525      }
526
527      if (res == SZ_OK)
528        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
529    }
530
531    codeRes = SZ_OK;
532
533    if (res == SZ_OK && needCode && !wasInterrupted)
534    {
535      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
536      if (codeRes != SZ_OK)
537      {
538        needCode = False;
539        finish = True;
540        // SZ_ERROR_MEM is expected error here.
541        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
542        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
543      }
544    }
545
546    if (res != SZ_OK || wasInterrupted)
547      finish = True;
548
549    nextThread = NULL;
550    threadingErrorSRes = SZ_OK;
551
552    if (!finish)
553    {
      // Start one more worker slot if the limit allows it and Parse() permitted it.
554      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
555      {
556        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
557        if (res2 == SZ_OK)
558        {
559          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
560          p->numStartedThreads++;
561        }
562        else
563        {
564          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
565          if (p->numStartedThreads == 1)
566          {
567            // if only one thread is possible, we leave multi-threading code
568            finish = True;
569            needCode = False;
570            threadingErrorSRes = res2;
571          }
572          else
            // Keep working with the threads that were started successfully.
573            p->numStartedThreads_Limit = p->numStartedThreads;
574        }
575      }
576
577      if (!finish)
578      {
        // Hand the read turn to the next slot in the ring; slot 0 follows the last started slot.
579        unsigned nextIndex = t->index + 1;
580        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
581        RINOK_THREAD(Event_Set(&nextThread->canRead))
582        // We have started executing for new iteration (with next thread)
583        // And that next thread now is responsible for possible exit from decoding (threading_code)
584      }
585    }
586
587    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
588    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
589    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
590    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
591    //   - otherwise we stop decoding and exit from MtDec_ThreadFunc2()
592
593    // Don't change (finish) variable in the further code
594
595
596    // ---------- CODE ----------
597
598    inPrev = 0;
599    outPrev = 0;
600    inCodePos = 0;
601    outCodePos = 0;
602
603    if (res == SZ_OK && needCode && codeRes == SZ_OK)
604    {
605      BoolInt isStartBlock = True;
606      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
607
      // One Code() call per buffer link of the block.
608      for (;;)
609      {
610        size_t inSize;
611        int stop;
612
        // The first link holds inDataSize_Start bytes; later links are full except possibly the last.
613        if (isStartBlock)
614          inSize = inDataSize_Start;
615        else
616        {
617          UInt64 rem = inDataSize - inCodePos;
618          inSize = p->inBufSize;
619          if (inSize > rem)
620            inSize = (size_t)rem;
621        }
622
623        inCodePos += inSize;
624        stop = True;
625
626        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
627            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
628            (inCodePos == inDataSize), // srcFinished
629            &inCodePos, &outCodePos, &stop);
630
631        if (codeRes != SZ_OK)
632        {
633          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
634          // we interrupt only later blocks
635          MtDec_Interrupt(p, blockIndex);
636          break;
637        }
638
639        if (stop || inCodePos == inDataSize)
640          break;
641
642        {
          // Report only the progress made since the previous report.
643          const UInt64 inDelta = inCodePos - inPrev;
644          const UInt64 outDelta = outCodePos - outPrev;
645          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
646          {
647            // Sleep(1);
648            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
649            if (res != SZ_OK || wasInterrupted)
650              break;
651            inPrev = inCodePos;
652            outPrev = outCodePos;
653          }
654        }
655
656        link = link->next;
657        isStartBlock = False;
658      }
659    }
660
661
662    // ---------- WRITE ----------
663
    // Wait for the write turn; it is passed along the ring in the same order as the read turn.
664    RINOK_THREAD(Event_Wait(&t->canWrite))
665
666  {
667    BoolInt isErrorMode = False;
668    BoolInt canRecode = True;
669    BoolInt needWriteToStream = needWrite;
670
671    if (p->exitThread) return 0; // it's never executed in normal cases
672
    // Once any block has set p->wasInterrupted, all following blocks are treated as interrupted.
673    if (p->wasInterrupted)
674      wasInterrupted = True;
675    else
676    {
677      if (codeRes != SZ_OK) // || !needCode // check it !!!
678      {
679        p->wasInterrupted = True;
680        p->codeRes = codeRes;
681        if (codeRes == SZ_ERROR_MEM)
682          isAllocError = True;
683      }
684
685      if (threadingErrorSRes)
686      {
687        p->wasInterrupted = True;
688        p->threadingErrorSRes = threadingErrorSRes;
689        needWriteToStream = False;
690      }
691      if (isAllocError)
692      {
693        p->wasInterrupted = True;
694        p->isAllocError = True;
695        needWriteToStream = False;
696      }
697      if (overflow)
698      {
699        p->wasInterrupted = True;
700        p->overflow = True;
701        needWriteToStream = False;
702      }
703    }
704
705    if (needCode)
706    {
      // For an interrupted block the positions are reset to 0, so the unsigned deltas below
      // wrap around and cancel what this block has already added to the progress totals.
707      if (wasInterrupted)
708      {
709        inCodePos = 0;
710        outCodePos = 0;
711      }
712      {
713        const UInt64 inDelta = inCodePos - inPrev;
714        const UInt64 outDelta = outCodePos - outPrev;
715        // if (inDelta != 0 || outDelta != 0)
716        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
717      }
718    }
719
720    needContinue = (!finish);
721
722    // if (res == SZ_OK && needWrite && !wasInterrupted)
723    if (needWrite)
724    {
725      // p->inProcessed += inCodePos;
726
727      PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
728
      // Write() is called even for failed / interrupted blocks; the third argument tells
      // the callback whether output must really be written. It may change needContinue and canRecode.
729      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
730          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
731          afterEndData, afterEndData_Size, afterEndData_IsCross,
732          &needContinue,
733          &canRecode);
734
735      // res = SZ_ERROR_FAIL; // for test
736
737      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
738      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
739
740      if (res != SZ_OK)
741      {
742        PRF(printf("\nWrite error = %d\n", res));
743        isErrorMode = True;
744        p->wasInterrupted = True;
745      }
746      if (res != SZ_OK
747          || (!needContinue && !finish))
748      {
749        PRF(printf("\nWrite Interrupt error = %x\n", res));
750        MtDec_Interrupt(p, blockIndex);
751      }
752    }
753
    // If this block was not decoded successfully (or an earlier block is already kept),
    // keep its input buffers registered as a "filled thread" so that MtDec_Read()
    // can return the same input again.
754    if (canRecode)
755    if (!needCode
756        || res != SZ_OK
757        || p->wasInterrupted
758        || codeRes != SZ_OK
759        || wasInterrupted
760        || p->numFilledThreads != 0
761        || isErrorMode)
762    {
763      if (p->numFilledThreads == 0)
764        p->filledThreadStart = t->index;
765      if (inDataSize != 0 || !finish)
766      {
767        t->inDataSize_Start = inDataSize_Start;
768        t->inDataSize = inDataSize;
769        p->numFilledThreads++;
770      }
771      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
772      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
773    }
774
775    if (!finish)
776    {
      // Pass the write turn to the slot that already received the read turn.
777      RINOK_THREAD(Event_Set(&nextThread->canWrite))
778    }
779    else
780    {
781      if (needContinue)
782      {
783        // we restore decoding with new iteration
784        RINOK_THREAD(Event_Set(&p->threads[0].canWrite))
785      }
786      else
787      {
788        // we exit from decoding
789        if (t->index == 0)
790          return SZ_OK;
        // Another slot ends decoding: wake slot 0 below so that it sees exitThread and returns.
791        p->exitThread = True;
792      }
793      RINOK_THREAD(Event_Set(&p->threads[0].canRead))
794    }
795  }
796  }
797}
798
799#ifdef _WIN32
800#define USE_ALLOCA
801#endif
802
803#ifdef USE_ALLOCA
804#ifdef _WIN32
805#include <malloc.h>
806#else
807#include <stdlib.h>
808#endif
809#endif
810
811
812static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp)
813{
814  WRes res;
815
816  CMtDecThread *t = (CMtDecThread *)pp;
817  CMtDec *p;
818
819  // fprintf(stdout, "\n%d = %p\n", t->index, &t);
820
821  res = MtDec_ThreadFunc2(t);
822  p = t->mtDec;
823  if (res == 0)
824    return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
825  {
826    // it's unexpected situation for some threading function error
827    if (p->exitThreadWRes == 0)
828      p->exitThreadWRes = res;
829    PRF(printf("\nthread exit error = %d\n", res));
830    p->exitThread = True;
831    Event_Set(&p->threads[0].canRead);
832    Event_Set(&p->threads[0].canWrite);
833    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
834  }
835  return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
836}
837
838static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp)
839{
840  #ifdef USE_ALLOCA
841  CMtDecThread *t = (CMtDecThread *)pp;
842  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
843  t->allocaPtr = alloca(t->index * 128);
844  #endif
845  return MtDec_ThreadFunc1(pp);
846}
847
848
849int MtDec_PrepareRead(CMtDec *p)
850{
851  if (p->crossBlock && p->crossStart == p->crossEnd)
852  {
853    ISzAlloc_Free(p->alloc, p->crossBlock);
854    p->crossBlock = NULL;
855  }
856
857  {
858    unsigned i;
859    for (i = 0; i < MTDEC_THREADS_MAX; i++)
860      if (i > p->numStartedThreads
861          || p->numFilledThreads <=
862            (i >= p->filledThreadStart ?
863              i - p->filledThreadStart :
864              i + p->numStartedThreads - p->filledThreadStart))
865        MtDecThread_FreeInBufs(&p->threads[i]);
866  }
867
868  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
869}
870
871
872const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
873{
874  while (p->numFilledThreads != 0)
875  {
876    CMtDecThread *t = &p->threads[p->filledThreadStart];
877
878    if (*inLim != 0)
879    {
880      {
881        void *link = t->inBuf;
882        void *next = ((CMtDecBufLink *)link)->next;
883        ISzAlloc_Free(p->alloc, link);
884        t->inBuf = next;
885      }
886
887      if (t->inDataSize == 0)
888      {
889        MtDecThread_FreeInBufs(t);
890        if (--p->numFilledThreads == 0)
891          break;
892        if (++p->filledThreadStart == p->numStartedThreads)
893          p->filledThreadStart = 0;
894        t = &p->threads[p->filledThreadStart];
895      }
896    }
897
898    {
899      size_t lim = t->inDataSize_Start;
900      if (lim != 0)
901        t->inDataSize_Start = 0;
902      else
903      {
904        UInt64 rem = t->inDataSize;
905        lim = p->inBufSize;
906        if (lim > rem)
907          lim = (size_t)rem;
908      }
909      t->inDataSize -= lim;
910      *inLim = lim;
911      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
912    }
913  }
914
915  {
916    size_t crossSize = p->crossEnd - p->crossStart;
917    if (crossSize != 0)
918    {
919      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
920      *inLim = crossSize;
921      p->crossStart = 0;
922      p->crossEnd = 0;
923      return data;
924    }
925    *inLim = 0;
926    if (p->crossBlock)
927    {
928      ISzAlloc_Free(p->alloc, p->crossBlock);
929      p->crossBlock = NULL;
930    }
931    return NULL;
932  }
933}
934
935
936void MtDec_Construct(CMtDec *p)
937{
938  unsigned i;
939
940  p->inBufSize = (size_t)1 << 18;
941
942  p->numThreadsMax = 0;
943
944  p->inStream = NULL;
945
946  // p->inData = NULL;
947  // p->inDataSize = 0;
948
949  p->crossBlock = NULL;
950  p->crossStart = 0;
951  p->crossEnd = 0;
952
953  p->numFilledThreads = 0;
954
955  p->progress = NULL;
956  p->alloc = NULL;
957
958  p->mtCallback = NULL;
959  p->mtCallbackObject = NULL;
960
961  p->allocatedBufsSize = 0;
962
963  for (i = 0; i < MTDEC_THREADS_MAX; i++)
964  {
965    CMtDecThread *t = &p->threads[i];
966    t->mtDec = p;
967    t->index = i;
968    t->inBuf = NULL;
969    Event_Construct(&t->canRead);
970    Event_Construct(&t->canWrite);
971    Thread_CONSTRUCT(&t->thread)
972  }
973
974  // Event_Construct(&p->finishedEvent);
975
976  CriticalSection_Init(&p->mtProgress.cs);
977}
978
979
980static void MtDec_Free(CMtDec *p)
981{
982  unsigned i;
983
984  p->exitThread = True;
985
986  for (i = 0; i < MTDEC_THREADS_MAX; i++)
987    MtDecThread_Destruct(&p->threads[i]);
988
989  // Event_Close(&p->finishedEvent);
990
991  if (p->crossBlock)
992  {
993    ISzAlloc_Free(p->alloc, p->crossBlock);
994    p->crossBlock = NULL;
995  }
996}
997
998
999void MtDec_Destruct(CMtDec *p)
1000{
1001  MtDec_Free(p);
1002
1003  CriticalSection_Delete(&p->mtProgress.cs);
1004}
1005
1006
1007SRes MtDec_Code(CMtDec *p)
1008{
1009  unsigned i;
1010
1011  p->inProcessed = 0;
1012
1013  p->blockIndex = 1; // it must be larger than not_defined index (0)
1014  p->isAllocError = False;
1015  p->overflow = False;
1016  p->threadingErrorSRes = SZ_OK;
1017
1018  p->needContinue = True;
1019
1020  p->readWasFinished = False;
1021  p->needInterrupt = False;
1022  p->interruptIndex = (UInt64)(Int64)-1;
1023
1024  p->readProcessed = 0;
1025  p->readRes = SZ_OK;
1026  p->codeRes = SZ_OK;
1027  p->wasInterrupted = False;
1028
1029  p->crossStart = 0;
1030  p->crossEnd = 0;
1031
1032  p->filledThreadStart = 0;
1033  p->numFilledThreads = 0;
1034
1035  {
1036    unsigned numThreads = p->numThreadsMax;
1037    if (numThreads > MTDEC_THREADS_MAX)
1038      numThreads = MTDEC_THREADS_MAX;
1039    p->numStartedThreads_Limit = numThreads;
1040    p->numStartedThreads = 0;
1041  }
1042
1043  if (p->inBufSize != p->allocatedBufsSize)
1044  {
1045    for (i = 0; i < MTDEC_THREADS_MAX; i++)
1046    {
1047      CMtDecThread *t = &p->threads[i];
1048      if (t->inBuf)
1049        MtDecThread_FreeInBufs(t);
1050    }
1051    if (p->crossBlock)
1052    {
1053      ISzAlloc_Free(p->alloc, p->crossBlock);
1054      p->crossBlock = NULL;
1055    }
1056
1057    p->allocatedBufsSize = p->inBufSize;
1058  }
1059
1060  MtProgress_Init(&p->mtProgress, p->progress);
1061
1062  // RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
1063  p->exitThread = False;
1064  p->exitThreadWRes = 0;
1065
1066  {
1067    WRes wres;
1068    SRes sres;
1069    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1070    // wres = MtDecThread_CreateAndStart(nextThread);
1071    wres = MtDecThread_CreateEvents(nextThread);
1072    if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1073    if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1074    if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread);
1075    wres = (WRes)(UINT_PTR)res;
1076    if (wres != 0)
1077    {
1078      p->needContinue = False;
1079      MtDec_CloseThreads(p);
1080    }}}}
1081
1082    // wres = 17; // for test
1083    // wres = Event_Wait(&p->finishedEvent);
1084
1085    sres = MY_SRes_HRESULT_FROM_WRes(wres);
1086
1087    if (sres != 0)
1088      p->threadingErrorSRes = sres;
1089
1090    if (
1091        // wres == 0
1092        // wres != 0
1093        // || p->mtc.codeRes == SZ_ERROR_MEM
1094        p->isAllocError
1095        || p->threadingErrorSRes != SZ_OK
1096        || p->overflow)
1097    {
1098      // p->needContinue = True;
1099    }
1100    else
1101      p->needContinue = False;
1102
1103    if (p->needContinue)
1104      return SZ_OK;
1105
1106    // if (sres != SZ_OK)
1107    return sres;
1108    // return SZ_ERROR_FAIL;
1109  }
1110}
1111
1112#endif
1113
1114#undef PRF
1115