1/* MtDec.c -- Multi-thread Decoder 22023-04-02 : Igor Pavlov : Public domain */ 3 4#include "Precomp.h" 5 6// #define SHOW_DEBUG_INFO 7 8// #include <stdio.h> 9#include <string.h> 10 11#ifdef SHOW_DEBUG_INFO 12#include <stdio.h> 13#endif 14 15#include "MtDec.h" 16 17#ifndef Z7_ST 18 19#ifdef SHOW_DEBUG_INFO 20#define PRF(x) x 21#else 22#define PRF(x) 23#endif 24 25#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d)) 26 27void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress) 28{ 29 p->progress = progress; 30 p->res = SZ_OK; 31 p->totalInSize = 0; 32 p->totalOutSize = 0; 33} 34 35 36SRes MtProgress_Progress_ST(CMtProgress *p) 37{ 38 if (p->res == SZ_OK && p->progress) 39 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK) 40 p->res = SZ_ERROR_PROGRESS; 41 return p->res; 42} 43 44 45SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize) 46{ 47 SRes res; 48 CriticalSection_Enter(&p->cs); 49 50 p->totalInSize += inSize; 51 p->totalOutSize += outSize; 52 if (p->res == SZ_OK && p->progress) 53 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK) 54 p->res = SZ_ERROR_PROGRESS; 55 res = p->res; 56 57 CriticalSection_Leave(&p->cs); 58 return res; 59} 60 61 62SRes MtProgress_GetError(CMtProgress *p) 63{ 64 SRes res; 65 CriticalSection_Enter(&p->cs); 66 res = p->res; 67 CriticalSection_Leave(&p->cs); 68 return res; 69} 70 71 72void MtProgress_SetError(CMtProgress *p, SRes res) 73{ 74 CriticalSection_Enter(&p->cs); 75 if (p->res == SZ_OK) 76 p->res = res; 77 CriticalSection_Leave(&p->cs); 78} 79 80 81#define RINOK_THREAD(x) RINOK_WRes(x) 82 83 84struct CMtDecBufLink_ 85{ 86 struct CMtDecBufLink_ *next; 87 void *pad[3]; 88}; 89 90typedef struct CMtDecBufLink_ CMtDecBufLink; 91 92#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink) 93#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET) 94 95 96 97static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp); 98 99 100static WRes MtDecThread_CreateEvents(CMtDecThread *t) 101{ 102 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite); 103 if (wres == 0) 104 { 105 wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead); 106 if (wres == 0) 107 return SZ_OK; 108 } 109 return wres; 110} 111 112 113static SRes MtDecThread_CreateAndStart(CMtDecThread *t) 114{ 115 WRes wres = MtDecThread_CreateEvents(t); 116 // wres = 17; // for test 117 if (wres == 0) 118 { 119 if (Thread_WasCreated(&t->thread)) 120 return SZ_OK; 121 wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t); 122 if (wres == 0) 123 return SZ_OK; 124 } 125 return MY_SRes_HRESULT_FROM_WRes(wres); 126} 127 128 129void MtDecThread_FreeInBufs(CMtDecThread *t) 130{ 131 if (t->inBuf) 132 { 133 void *link = t->inBuf; 134 t->inBuf = NULL; 135 do 136 { 137 void *next = ((CMtDecBufLink *)link)->next; 138 ISzAlloc_Free(t->mtDec->alloc, link); 139 link = next; 140 } 141 while (link); 142 } 143} 144 145 146static void MtDecThread_CloseThread(CMtDecThread *t) 147{ 148 if (Thread_WasCreated(&t->thread)) 149 { 150 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */ 151 Event_Set(&t->canRead); 152 Thread_Wait_Close(&t->thread); 153 } 154 155 Event_Close(&t->canRead); 156 Event_Close(&t->canWrite); 157} 158 159static void MtDec_CloseThreads(CMtDec *p) 160{ 161 unsigned i; 162 for (i = 0; i < MTDEC_THREADS_MAX; i++) 163 MtDecThread_CloseThread(&p->threads[i]); 164} 165 166static void MtDecThread_Destruct(CMtDecThread *t) 167{ 168 MtDecThread_CloseThread(t); 169 MtDecThread_FreeInBufs(t); 170} 171 172 173 174static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted) 175{ 176 SRes res; 177 CriticalSection_Enter(&p->mtProgress.cs); 178 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex); 179 res = p->mtProgress.res; 180 CriticalSection_Leave(&p->mtProgress.cs); 181 return res; 182} 183 184static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted) 185{ 186 SRes res; 187 CriticalSection_Enter(&p->mtProgress.cs); 188 189 p->mtProgress.totalInSize += inSize; 190 p->mtProgress.totalOutSize += outSize; 191 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress) 192 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK) 193 p->mtProgress.res = SZ_ERROR_PROGRESS; 194 195 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex); 196 res = p->mtProgress.res; 197 198 CriticalSection_Leave(&p->mtProgress.cs); 199 200 return res; 201} 202 203static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex) 204{ 205 CriticalSection_Enter(&p->mtProgress.cs); 206 if (!p->needInterrupt || interruptIndex < p->interruptIndex) 207 { 208 p->interruptIndex = interruptIndex; 209 p->needInterrupt = True; 210 } 211 CriticalSection_Leave(&p->mtProgress.cs); 212} 213 214Byte *MtDec_GetCrossBuff(CMtDec *p) 215{ 216 Byte *cr = p->crossBlock; 217 if (!cr) 218 { 219 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize); 220 if (!cr) 221 return NULL; 222 p->crossBlock = cr; 223 } 224 return MTDEC__DATA_PTR_FROM_LINK(cr); 225} 226 227 228/* 229 MtDec_ThreadFunc2() returns: 230 0 - in all normal cases (even for stream error or memory allocation error) 231 (!= 0) - WRes error return by system threading function 232*/ 233 234// #define MTDEC_ProgessStep (1 << 22) 235#define MTDEC_ProgessStep (1 << 0) 236 237static WRes MtDec_ThreadFunc2(CMtDecThread *t) 238{ 239 CMtDec *p = t->mtDec; 240 241 PRF_STR_INT("MtDec_ThreadFunc2", t->index) 242 243 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index); 244 245 for (;;) 246 { 247 SRes res, codeRes; 248 BoolInt wasInterrupted, isAllocError, overflow, finish; 249 SRes threadingErrorSRes; 250 BoolInt needCode, needWrite, needContinue; 251 252 size_t inDataSize_Start; 253 UInt64 inDataSize; 254 // UInt64 inDataSize_Full; 255 256 UInt64 blockIndex; 257 258 UInt64 inPrev = 0; 259 UInt64 outPrev = 0; 260 UInt64 inCodePos; 261 UInt64 outCodePos; 262 263 Byte *afterEndData = NULL; 264 size_t afterEndData_Size = 0; 265 BoolInt afterEndData_IsCross = False; 266 267 BoolInt canCreateNewThread = False; 268 // CMtDecCallbackInfo parse; 269 CMtDecThread *nextThread; 270 271 PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index) 272 273 RINOK_THREAD(Event_Wait(&t->canRead)) 274 if (p->exitThread) 275 return 0; 276 277 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index) 278 279 // if (t->index == 3) return 19; // for test 280 281 blockIndex = p->blockIndex++; 282 283 // PRF(printf("\ncanRead\n")) 284 285 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted); 286 287 finish = p->readWasFinished; 288 needCode = False; 289 needWrite = False; 290 isAllocError = False; 291 overflow = False; 292 293 inDataSize_Start = 0; 294 inDataSize = 0; 295 // inDataSize_Full = 0; 296 297 if (res == SZ_OK && !wasInterrupted) 298 { 299 // if (p->inStream) 300 { 301 CMtDecBufLink *prev = NULL; 302 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf; 303 size_t crossSize = p->crossEnd - p->crossStart; 304 305 PRF(printf("\ncrossSize = %d\n", crossSize)); 306 307 for (;;) 308 { 309 if (!link) 310 { 311 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize); 312 if (!link) 313 { 314 finish = True; 315 // p->allocError_for_Read_BlockIndex = blockIndex; 316 isAllocError = True; 317 break; 318 } 319 link->next = NULL; 320 if (prev) 321 { 322 // static unsigned g_num = 0; 323 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev)); 324 prev->next = link; 325 } 326 else 327 t->inBuf = (void *)link; 328 } 329 330 { 331 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link); 332 Byte *parseData = data; 333 size_t size; 334 335 if (crossSize != 0) 336 { 337 inDataSize = crossSize; 338 // inDataSize_Full = inDataSize; 339 inDataSize_Start = crossSize; 340 size = crossSize; 341 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart; 342 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d", 343 (int)p->crossStart, (int)p->crossEnd, (int)finish)); 344 } 345 else 346 { 347 size = p->inBufSize; 348 349 res = SeqInStream_ReadMax(p->inStream, data, &size); 350 351 // size = 10; // test 352 353 inDataSize += size; 354 // inDataSize_Full = inDataSize; 355 if (!prev) 356 inDataSize_Start = size; 357 358 p->readProcessed += size; 359 finish = (size != p->inBufSize); 360 if (finish) 361 p->readWasFinished = True; 362 363 // res = E_INVALIDARG; // test 364 365 if (res != SZ_OK) 366 { 367 // PRF(printf("\nRead error = %d\n", res)) 368 // we want to decode all data before error 369 p->readRes = res; 370 // p->readError_BlockIndex = blockIndex; 371 p->readWasFinished = True; 372 finish = True; 373 res = SZ_OK; 374 // break; 375 } 376 377 if (inDataSize - inPrev >= MTDEC_ProgessStep) 378 { 379 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted); 380 if (res != SZ_OK || wasInterrupted) 381 break; 382 inPrev = inDataSize; 383 } 384 } 385 386 { 387 CMtDecCallbackInfo parse; 388 389 parse.startCall = (prev == NULL); 390 parse.src = parseData; 391 parse.srcSize = size; 392 parse.srcFinished = finish; 393 parse.canCreateNewThread = True; 394 395 PRF(printf("\nParse size = %d\n", (unsigned)size)); 396 397 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse); 398 399 PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state)); 400 401 needWrite = True; 402 canCreateNewThread = parse.canCreateNewThread; 403 404 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize); 405 406 if ( 407 // parseRes != SZ_OK || 408 // inDataSize - (size - parse.srcSize) > p->inBlockMax 409 // || 410 parse.state == MTDEC_PARSE_OVERFLOW 411 // || wasInterrupted 412 ) 413 { 414 // Overflow or Parse error - switch from MT decoding to ST decoding 415 finish = True; 416 overflow = True; 417 418 { 419 PRF(printf("\n Overflow")); 420 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished)); 421 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize)); 422 } 423 424 if (crossSize != 0) 425 memcpy(data, parseData, size); 426 p->crossStart = 0; 427 p->crossEnd = 0; 428 break; 429 } 430 431 if (crossSize != 0) 432 { 433 memcpy(data, parseData, parse.srcSize); 434 p->crossStart += parse.srcSize; 435 } 436 437 if (parse.state != MTDEC_PARSE_CONTINUE || finish) 438 { 439 // we don't need to parse in current thread anymore 440 441 if (parse.state == MTDEC_PARSE_END) 442 finish = True; 443 444 needCode = True; 445 // p->crossFinished = finish; 446 447 if (parse.srcSize == size) 448 { 449 // full parsed - no cross transfer 450 p->crossStart = 0; 451 p->crossEnd = 0; 452 break; 453 } 454 455 if (parse.state == MTDEC_PARSE_END) 456 { 457 afterEndData = parseData + parse.srcSize; 458 afterEndData_Size = size - parse.srcSize; 459 if (crossSize != 0) 460 afterEndData_IsCross = True; 461 // we reduce data size to required bytes (parsed only) 462 inDataSize -= afterEndData_Size; 463 if (!prev) 464 inDataSize_Start = parse.srcSize; 465 break; 466 } 467 468 { 469 // partial parsed - need cross transfer 470 if (crossSize != 0) 471 inDataSize = parse.srcSize; // it's only parsed now 472 else 473 { 474 // partial parsed - is not in initial cross block - we need to copy new data to cross block 475 Byte *cr = MtDec_GetCrossBuff(p); 476 if (!cr) 477 { 478 { 479 PRF(printf("\ncross alloc error error\n")); 480 // res = SZ_ERROR_MEM; 481 finish = True; 482 // p->allocError_for_Read_BlockIndex = blockIndex; 483 isAllocError = True; 484 break; 485 } 486 } 487 488 { 489 size_t crSize = size - parse.srcSize; 490 inDataSize -= crSize; 491 p->crossEnd = crSize; 492 p->crossStart = 0; 493 memcpy(cr, parseData + parse.srcSize, crSize); 494 } 495 } 496 497 // inDataSize_Full = inDataSize; 498 if (!prev) 499 inDataSize_Start = parse.srcSize; // it's partial size (parsed only) 500 501 finish = False; 502 break; 503 } 504 } 505 506 if (parse.srcSize != size) 507 { 508 res = SZ_ERROR_FAIL; 509 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res)); 510 break; 511 } 512 } 513 } 514 515 prev = link; 516 link = link->next; 517 518 if (crossSize != 0) 519 { 520 crossSize = 0; 521 p->crossStart = 0; 522 p->crossEnd = 0; 523 } 524 } 525 } 526 527 if (res == SZ_OK) 528 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted); 529 } 530 531 codeRes = SZ_OK; 532 533 if (res == SZ_OK && needCode && !wasInterrupted) 534 { 535 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index); 536 if (codeRes != SZ_OK) 537 { 538 needCode = False; 539 finish = True; 540 // SZ_ERROR_MEM is expected error here. 541 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later. 542 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding. 543 } 544 } 545 546 if (res != SZ_OK || wasInterrupted) 547 finish = True; 548 549 nextThread = NULL; 550 threadingErrorSRes = SZ_OK; 551 552 if (!finish) 553 { 554 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread) 555 { 556 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]); 557 if (res2 == SZ_OK) 558 { 559 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads)); 560 p->numStartedThreads++; 561 } 562 else 563 { 564 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads)); 565 if (p->numStartedThreads == 1) 566 { 567 // if only one thread is possible, we leave muti-threading code 568 finish = True; 569 needCode = False; 570 threadingErrorSRes = res2; 571 } 572 else 573 p->numStartedThreads_Limit = p->numStartedThreads; 574 } 575 } 576 577 if (!finish) 578 { 579 unsigned nextIndex = t->index + 1; 580 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex]; 581 RINOK_THREAD(Event_Set(&nextThread->canRead)) 582 // We have started executing for new iteration (with next thread) 583 // And that next thread now is responsible for possible exit from decoding (threading_code) 584 } 585 } 586 587 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite) 588 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case 589 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block): 590 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration 591 // - otherwise we stop decoding and exit from MtDec_ThreadFunc2() 592 593 // Don't change (finish) variable in the further code 594 595 596 // ---------- CODE ---------- 597 598 inPrev = 0; 599 outPrev = 0; 600 inCodePos = 0; 601 outCodePos = 0; 602 603 if (res == SZ_OK && needCode && codeRes == SZ_OK) 604 { 605 BoolInt isStartBlock = True; 606 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf; 607 608 for (;;) 609 { 610 size_t inSize; 611 int stop; 612 613 if (isStartBlock) 614 inSize = inDataSize_Start; 615 else 616 { 617 UInt64 rem = inDataSize - inCodePos; 618 inSize = p->inBufSize; 619 if (inSize > rem) 620 inSize = (size_t)rem; 621 } 622 623 inCodePos += inSize; 624 stop = True; 625 626 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index, 627 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize, 628 (inCodePos == inDataSize), // srcFinished 629 &inCodePos, &outCodePos, &stop); 630 631 if (codeRes != SZ_OK) 632 { 633 PRF(printf("\nCode Interrupt error = %x\n", codeRes)); 634 // we interrupt only later blocks 635 MtDec_Interrupt(p, blockIndex); 636 break; 637 } 638 639 if (stop || inCodePos == inDataSize) 640 break; 641 642 { 643 const UInt64 inDelta = inCodePos - inPrev; 644 const UInt64 outDelta = outCodePos - outPrev; 645 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep) 646 { 647 // Sleep(1); 648 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted); 649 if (res != SZ_OK || wasInterrupted) 650 break; 651 inPrev = inCodePos; 652 outPrev = outCodePos; 653 } 654 } 655 656 link = link->next; 657 isStartBlock = False; 658 } 659 } 660 661 662 // ---------- WRITE ---------- 663 664 RINOK_THREAD(Event_Wait(&t->canWrite)) 665 666 { 667 BoolInt isErrorMode = False; 668 BoolInt canRecode = True; 669 BoolInt needWriteToStream = needWrite; 670 671 if (p->exitThread) return 0; // it's never executed in normal cases 672 673 if (p->wasInterrupted) 674 wasInterrupted = True; 675 else 676 { 677 if (codeRes != SZ_OK) // || !needCode // check it !!! 678 { 679 p->wasInterrupted = True; 680 p->codeRes = codeRes; 681 if (codeRes == SZ_ERROR_MEM) 682 isAllocError = True; 683 } 684 685 if (threadingErrorSRes) 686 { 687 p->wasInterrupted = True; 688 p->threadingErrorSRes = threadingErrorSRes; 689 needWriteToStream = False; 690 } 691 if (isAllocError) 692 { 693 p->wasInterrupted = True; 694 p->isAllocError = True; 695 needWriteToStream = False; 696 } 697 if (overflow) 698 { 699 p->wasInterrupted = True; 700 p->overflow = True; 701 needWriteToStream = False; 702 } 703 } 704 705 if (needCode) 706 { 707 if (wasInterrupted) 708 { 709 inCodePos = 0; 710 outCodePos = 0; 711 } 712 { 713 const UInt64 inDelta = inCodePos - inPrev; 714 const UInt64 outDelta = outCodePos - outPrev; 715 // if (inDelta != 0 || outDelta != 0) 716 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta); 717 } 718 } 719 720 needContinue = (!finish); 721 722 // if (res == SZ_OK && needWrite && !wasInterrupted) 723 if (needWrite) 724 { 725 // p->inProcessed += inCodePos; 726 727 PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size)); 728 729 res = p->mtCallback->Write(p->mtCallbackObject, t->index, 730 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite 731 afterEndData, afterEndData_Size, afterEndData_IsCross, 732 &needContinue, 733 &canRecode); 734 735 // res = SZ_ERROR_FAIL; // for test 736 737 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue)); 738 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed)); 739 740 if (res != SZ_OK) 741 { 742 PRF(printf("\nWrite error = %d\n", res)); 743 isErrorMode = True; 744 p->wasInterrupted = True; 745 } 746 if (res != SZ_OK 747 || (!needContinue && !finish)) 748 { 749 PRF(printf("\nWrite Interrupt error = %x\n", res)); 750 MtDec_Interrupt(p, blockIndex); 751 } 752 } 753 754 if (canRecode) 755 if (!needCode 756 || res != SZ_OK 757 || p->wasInterrupted 758 || codeRes != SZ_OK 759 || wasInterrupted 760 || p->numFilledThreads != 0 761 || isErrorMode) 762 { 763 if (p->numFilledThreads == 0) 764 p->filledThreadStart = t->index; 765 if (inDataSize != 0 || !finish) 766 { 767 t->inDataSize_Start = inDataSize_Start; 768 t->inDataSize = inDataSize; 769 p->numFilledThreads++; 770 } 771 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads)); 772 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart)); 773 } 774 775 if (!finish) 776 { 777 RINOK_THREAD(Event_Set(&nextThread->canWrite)) 778 } 779 else 780 { 781 if (needContinue) 782 { 783 // we restore decoding with new iteration 784 RINOK_THREAD(Event_Set(&p->threads[0].canWrite)) 785 } 786 else 787 { 788 // we exit from decoding 789 if (t->index == 0) 790 return SZ_OK; 791 p->exitThread = True; 792 } 793 RINOK_THREAD(Event_Set(&p->threads[0].canRead)) 794 } 795 } 796 } 797} 798 799#ifdef _WIN32 800#define USE_ALLOCA 801#endif 802 803#ifdef USE_ALLOCA 804#ifdef _WIN32 805#include <malloc.h> 806#else 807#include <stdlib.h> 808#endif 809#endif 810 811 812static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp) 813{ 814 WRes res; 815 816 CMtDecThread *t = (CMtDecThread *)pp; 817 CMtDec *p; 818 819 // fprintf(stdout, "\n%d = %p\n", t->index, &t); 820 821 res = MtDec_ThreadFunc2(t); 822 p = t->mtDec; 823 if (res == 0) 824 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes; 825 { 826 // it's unexpected situation for some threading function error 827 if (p->exitThreadWRes == 0) 828 p->exitThreadWRes = res; 829 PRF(printf("\nthread exit error = %d\n", res)); 830 p->exitThread = True; 831 Event_Set(&p->threads[0].canRead); 832 Event_Set(&p->threads[0].canWrite); 833 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res)); 834 } 835 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res; 836} 837 838static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp) 839{ 840 #ifdef USE_ALLOCA 841 CMtDecThread *t = (CMtDecThread *)pp; 842 // fprintf(stderr, "\n%d = %p - before", t->index, &t); 843 t->allocaPtr = alloca(t->index * 128); 844 #endif 845 return MtDec_ThreadFunc1(pp); 846} 847 848 849int MtDec_PrepareRead(CMtDec *p) 850{ 851 if (p->crossBlock && p->crossStart == p->crossEnd) 852 { 853 ISzAlloc_Free(p->alloc, p->crossBlock); 854 p->crossBlock = NULL; 855 } 856 857 { 858 unsigned i; 859 for (i = 0; i < MTDEC_THREADS_MAX; i++) 860 if (i > p->numStartedThreads 861 || p->numFilledThreads <= 862 (i >= p->filledThreadStart ? 863 i - p->filledThreadStart : 864 i + p->numStartedThreads - p->filledThreadStart)) 865 MtDecThread_FreeInBufs(&p->threads[i]); 866 } 867 868 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd); 869} 870 871 872const Byte *MtDec_Read(CMtDec *p, size_t *inLim) 873{ 874 while (p->numFilledThreads != 0) 875 { 876 CMtDecThread *t = &p->threads[p->filledThreadStart]; 877 878 if (*inLim != 0) 879 { 880 { 881 void *link = t->inBuf; 882 void *next = ((CMtDecBufLink *)link)->next; 883 ISzAlloc_Free(p->alloc, link); 884 t->inBuf = next; 885 } 886 887 if (t->inDataSize == 0) 888 { 889 MtDecThread_FreeInBufs(t); 890 if (--p->numFilledThreads == 0) 891 break; 892 if (++p->filledThreadStart == p->numStartedThreads) 893 p->filledThreadStart = 0; 894 t = &p->threads[p->filledThreadStart]; 895 } 896 } 897 898 { 899 size_t lim = t->inDataSize_Start; 900 if (lim != 0) 901 t->inDataSize_Start = 0; 902 else 903 { 904 UInt64 rem = t->inDataSize; 905 lim = p->inBufSize; 906 if (lim > rem) 907 lim = (size_t)rem; 908 } 909 t->inDataSize -= lim; 910 *inLim = lim; 911 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf); 912 } 913 } 914 915 { 916 size_t crossSize = p->crossEnd - p->crossStart; 917 if (crossSize != 0) 918 { 919 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart; 920 *inLim = crossSize; 921 p->crossStart = 0; 922 p->crossEnd = 0; 923 return data; 924 } 925 *inLim = 0; 926 if (p->crossBlock) 927 { 928 ISzAlloc_Free(p->alloc, p->crossBlock); 929 p->crossBlock = NULL; 930 } 931 return NULL; 932 } 933} 934 935 936void MtDec_Construct(CMtDec *p) 937{ 938 unsigned i; 939 940 p->inBufSize = (size_t)1 << 18; 941 942 p->numThreadsMax = 0; 943 944 p->inStream = NULL; 945 946 // p->inData = NULL; 947 // p->inDataSize = 0; 948 949 p->crossBlock = NULL; 950 p->crossStart = 0; 951 p->crossEnd = 0; 952 953 p->numFilledThreads = 0; 954 955 p->progress = NULL; 956 p->alloc = NULL; 957 958 p->mtCallback = NULL; 959 p->mtCallbackObject = NULL; 960 961 p->allocatedBufsSize = 0; 962 963 for (i = 0; i < MTDEC_THREADS_MAX; i++) 964 { 965 CMtDecThread *t = &p->threads[i]; 966 t->mtDec = p; 967 t->index = i; 968 t->inBuf = NULL; 969 Event_Construct(&t->canRead); 970 Event_Construct(&t->canWrite); 971 Thread_CONSTRUCT(&t->thread) 972 } 973 974 // Event_Construct(&p->finishedEvent); 975 976 CriticalSection_Init(&p->mtProgress.cs); 977} 978 979 980static void MtDec_Free(CMtDec *p) 981{ 982 unsigned i; 983 984 p->exitThread = True; 985 986 for (i = 0; i < MTDEC_THREADS_MAX; i++) 987 MtDecThread_Destruct(&p->threads[i]); 988 989 // Event_Close(&p->finishedEvent); 990 991 if (p->crossBlock) 992 { 993 ISzAlloc_Free(p->alloc, p->crossBlock); 994 p->crossBlock = NULL; 995 } 996} 997 998 999void MtDec_Destruct(CMtDec *p) 1000{ 1001 MtDec_Free(p); 1002 1003 CriticalSection_Delete(&p->mtProgress.cs); 1004} 1005 1006 1007SRes MtDec_Code(CMtDec *p) 1008{ 1009 unsigned i; 1010 1011 p->inProcessed = 0; 1012 1013 p->blockIndex = 1; // it must be larger than not_defined index (0) 1014 p->isAllocError = False; 1015 p->overflow = False; 1016 p->threadingErrorSRes = SZ_OK; 1017 1018 p->needContinue = True; 1019 1020 p->readWasFinished = False; 1021 p->needInterrupt = False; 1022 p->interruptIndex = (UInt64)(Int64)-1; 1023 1024 p->readProcessed = 0; 1025 p->readRes = SZ_OK; 1026 p->codeRes = SZ_OK; 1027 p->wasInterrupted = False; 1028 1029 p->crossStart = 0; 1030 p->crossEnd = 0; 1031 1032 p->filledThreadStart = 0; 1033 p->numFilledThreads = 0; 1034 1035 { 1036 unsigned numThreads = p->numThreadsMax; 1037 if (numThreads > MTDEC_THREADS_MAX) 1038 numThreads = MTDEC_THREADS_MAX; 1039 p->numStartedThreads_Limit = numThreads; 1040 p->numStartedThreads = 0; 1041 } 1042 1043 if (p->inBufSize != p->allocatedBufsSize) 1044 { 1045 for (i = 0; i < MTDEC_THREADS_MAX; i++) 1046 { 1047 CMtDecThread *t = &p->threads[i]; 1048 if (t->inBuf) 1049 MtDecThread_FreeInBufs(t); 1050 } 1051 if (p->crossBlock) 1052 { 1053 ISzAlloc_Free(p->alloc, p->crossBlock); 1054 p->crossBlock = NULL; 1055 } 1056 1057 p->allocatedBufsSize = p->inBufSize; 1058 } 1059 1060 MtProgress_Init(&p->mtProgress, p->progress); 1061 1062 // RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent)) 1063 p->exitThread = False; 1064 p->exitThreadWRes = 0; 1065 1066 { 1067 WRes wres; 1068 SRes sres; 1069 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++]; 1070 // wres = MtDecThread_CreateAndStart(nextThread); 1071 wres = MtDecThread_CreateEvents(nextThread); 1072 if (wres == 0) { wres = Event_Set(&nextThread->canWrite); 1073 if (wres == 0) { wres = Event_Set(&nextThread->canRead); 1074 if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread); 1075 wres = (WRes)(UINT_PTR)res; 1076 if (wres != 0) 1077 { 1078 p->needContinue = False; 1079 MtDec_CloseThreads(p); 1080 }}}} 1081 1082 // wres = 17; // for test 1083 // wres = Event_Wait(&p->finishedEvent); 1084 1085 sres = MY_SRes_HRESULT_FROM_WRes(wres); 1086 1087 if (sres != 0) 1088 p->threadingErrorSRes = sres; 1089 1090 if ( 1091 // wres == 0 1092 // wres != 0 1093 // || p->mtc.codeRes == SZ_ERROR_MEM 1094 p->isAllocError 1095 || p->threadingErrorSRes != SZ_OK 1096 || p->overflow) 1097 { 1098 // p->needContinue = True; 1099 } 1100 else 1101 p->needContinue = False; 1102 1103 if (p->needContinue) 1104 return SZ_OK; 1105 1106 // if (sres != SZ_OK) 1107 return sres; 1108 // return SZ_ERROR_FAIL; 1109 } 1110} 1111 1112#endif 1113 1114#undef PRF 1115