1/* MtCoder.c -- Multi-thread Coder 22023-04-13 : Igor Pavlov : Public domain */ 3 4#include "Precomp.h" 5 6#include "MtCoder.h" 7 8#ifndef Z7_ST 9 10static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize) 11{ 12 Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk) 13 UInt64 inSize2 = 0; 14 UInt64 outSize2 = 0; 15 if (inSize != (UInt64)(Int64)-1) 16 { 17 inSize2 = inSize - p->inSize; 18 p->inSize = inSize; 19 } 20 if (outSize != (UInt64)(Int64)-1) 21 { 22 outSize2 = outSize - p->outSize; 23 p->outSize = outSize; 24 } 25 return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2); 26} 27 28 29void MtProgressThunk_CreateVTable(CMtProgressThunk *p) 30{ 31 p->vt.Progress = MtProgressThunk_Progress; 32} 33 34 35 36#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } 37 38 39static THREAD_FUNC_DECL ThreadFunc(void *pp); 40 41 42static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) 43{ 44 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent); 45 if (wres == 0) 46 { 47 t->stop = False; 48 if (!Thread_WasCreated(&t->thread)) 49 wres = Thread_Create(&t->thread, ThreadFunc, t); 50 if (wres == 0) 51 wres = Event_Set(&t->startEvent); 52 } 53 if (wres == 0) 54 return SZ_OK; 55 return MY_SRes_HRESULT_FROM_WRes(wres); 56} 57 58 59static void MtCoderThread_Destruct(CMtCoderThread *t) 60{ 61 if (Thread_WasCreated(&t->thread)) 62 { 63 t->stop = 1; 64 Event_Set(&t->startEvent); 65 Thread_Wait_Close(&t->thread); 66 } 67 68 Event_Close(&t->startEvent); 69 70 if (t->inBuf) 71 { 72 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf); 73 t->inBuf = NULL; 74 } 75} 76 77 78 79 80/* 81 ThreadFunc2() returns: 82 SZ_OK - in all normal cases (even for stream error or memory allocation error) 83 SZ_ERROR_THREAD - in case of failure in system synch function 84*/ 85 86static SRes ThreadFunc2(CMtCoderThread *t) 87{ 88 CMtCoder *mtc = t->mtCoder; 89 90 for (;;) 91 { 92 unsigned bi; 93 SRes res; 94 SRes res2; 95 BoolInt finished; 96 unsigned bufIndex; 97 size_t size; 98 const Byte *inData; 99 UInt64 readProcessed = 0; 100 101 RINOK_THREAD(Event_Wait(&mtc->readEvent)) 102 103 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */ 104 105 if (mtc->stopReading) 106 { 107 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD; 108 } 109 110 res = MtProgress_GetError(&mtc->mtProgress); 111 112 size = 0; 113 inData = NULL; 114 finished = True; 115 116 if (res == SZ_OK) 117 { 118 size = mtc->blockSize; 119 if (mtc->inStream) 120 { 121 if (!t->inBuf) 122 { 123 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize); 124 if (!t->inBuf) 125 res = SZ_ERROR_MEM; 126 } 127 if (res == SZ_OK) 128 { 129 res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size); 130 readProcessed = mtc->readProcessed + size; 131 mtc->readProcessed = readProcessed; 132 } 133 if (res != SZ_OK) 134 { 135 mtc->readRes = res; 136 /* after reading error - we can stop encoding of previous blocks */ 137 MtProgress_SetError(&mtc->mtProgress, res); 138 } 139 else 140 finished = (size != mtc->blockSize); 141 } 142 else 143 { 144 size_t rem; 145 readProcessed = mtc->readProcessed; 146 rem = mtc->inDataSize - (size_t)readProcessed; 147 if (size > rem) 148 size = rem; 149 inData = mtc->inData + (size_t)readProcessed; 150 readProcessed += size; 151 mtc->readProcessed = readProcessed; 152 finished = (mtc->inDataSize == (size_t)readProcessed); 153 } 154 } 155 156 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */ 157 158 res2 = SZ_OK; 159 160 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0) 161 { 162 res2 = SZ_ERROR_THREAD; 163 if (res == SZ_OK) 164 { 165 res = res2; 166 // MtProgress_SetError(&mtc->mtProgress, res); 167 } 168 } 169 170 bi = mtc->blockIndex; 171 172 if (++mtc->blockIndex >= mtc->numBlocksMax) 173 mtc->blockIndex = 0; 174 175 bufIndex = (unsigned)(int)-1; 176 177 if (res == SZ_OK) 178 res = MtProgress_GetError(&mtc->mtProgress); 179 180 if (res != SZ_OK) 181 finished = True; 182 183 if (!finished) 184 { 185 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit 186 && mtc->expectedDataSize != readProcessed) 187 { 188 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); 189 if (res == SZ_OK) 190 mtc->numStartedThreads++; 191 else 192 { 193 MtProgress_SetError(&mtc->mtProgress, res); 194 finished = True; 195 } 196 } 197 } 198 199 if (finished) 200 mtc->stopReading = True; 201 202 RINOK_THREAD(Event_Set(&mtc->readEvent)) 203 204 if (res2 != SZ_OK) 205 return res2; 206 207 if (res == SZ_OK) 208 { 209 CriticalSection_Enter(&mtc->cs); 210 bufIndex = mtc->freeBlockHead; 211 mtc->freeBlockHead = mtc->freeBlockList[bufIndex]; 212 CriticalSection_Leave(&mtc->cs); 213 214 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex, 215 mtc->inStream ? t->inBuf : inData, size, finished); 216 217 // MtProgress_Reinit(&mtc->mtProgress, t->index); 218 219 if (res != SZ_OK) 220 MtProgress_SetError(&mtc->mtProgress, res); 221 } 222 223 { 224 CMtCoderBlock *block = &mtc->blocks[bi]; 225 block->res = res; 226 block->bufIndex = bufIndex; 227 block->finished = finished; 228 } 229 230 #ifdef MTCODER_USE_WRITE_THREAD 231 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) 232 #else 233 { 234 unsigned wi; 235 { 236 CriticalSection_Enter(&mtc->cs); 237 wi = mtc->writeIndex; 238 if (wi == bi) 239 mtc->writeIndex = (unsigned)(int)-1; 240 else 241 mtc->ReadyBlocks[bi] = True; 242 CriticalSection_Leave(&mtc->cs); 243 } 244 245 if (wi != bi) 246 { 247 if (res != SZ_OK || finished) 248 return 0; 249 continue; 250 } 251 252 if (mtc->writeRes != SZ_OK) 253 res = mtc->writeRes; 254 255 for (;;) 256 { 257 if (res == SZ_OK && bufIndex != (unsigned)(int)-1) 258 { 259 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex); 260 if (res != SZ_OK) 261 { 262 mtc->writeRes = res; 263 MtProgress_SetError(&mtc->mtProgress, res); 264 } 265 } 266 267 if (++wi >= mtc->numBlocksMax) 268 wi = 0; 269 { 270 BoolInt isReady; 271 272 CriticalSection_Enter(&mtc->cs); 273 274 if (bufIndex != (unsigned)(int)-1) 275 { 276 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead; 277 mtc->freeBlockHead = bufIndex; 278 } 279 280 isReady = mtc->ReadyBlocks[wi]; 281 282 if (isReady) 283 mtc->ReadyBlocks[wi] = False; 284 else 285 mtc->writeIndex = wi; 286 287 CriticalSection_Leave(&mtc->cs); 288 289 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore)) 290 291 if (!isReady) 292 break; 293 } 294 295 { 296 CMtCoderBlock *block = &mtc->blocks[wi]; 297 if (res == SZ_OK && block->res != SZ_OK) 298 res = block->res; 299 bufIndex = block->bufIndex; 300 finished = block->finished; 301 } 302 } 303 } 304 #endif 305 306 if (finished || res != SZ_OK) 307 return 0; 308 } 309} 310 311 312static THREAD_FUNC_DECL ThreadFunc(void *pp) 313{ 314 CMtCoderThread *t = (CMtCoderThread *)pp; 315 for (;;) 316 { 317 if (Event_Wait(&t->startEvent) != 0) 318 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD; 319 if (t->stop) 320 return 0; 321 { 322 SRes res = ThreadFunc2(t); 323 CMtCoder *mtc = t->mtCoder; 324 if (res != SZ_OK) 325 { 326 MtProgress_SetError(&mtc->mtProgress, res); 327 } 328 329 #ifndef MTCODER_USE_WRITE_THREAD 330 { 331 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); 332 if (numFinished == mtc->numStartedThreads) 333 if (Event_Set(&mtc->finishedEvent) != 0) 334 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD; 335 } 336 #endif 337 } 338 } 339} 340 341 342 343void MtCoder_Construct(CMtCoder *p) 344{ 345 unsigned i; 346 347 p->blockSize = 0; 348 p->numThreadsMax = 0; 349 p->expectedDataSize = (UInt64)(Int64)-1; 350 351 p->inStream = NULL; 352 p->inData = NULL; 353 p->inDataSize = 0; 354 355 p->progress = NULL; 356 p->allocBig = NULL; 357 358 p->mtCallback = NULL; 359 p->mtCallbackObject = NULL; 360 361 p->allocatedBufsSize = 0; 362 363 Event_Construct(&p->readEvent); 364 Semaphore_Construct(&p->blocksSemaphore); 365 366 for (i = 0; i < MTCODER_THREADS_MAX; i++) 367 { 368 CMtCoderThread *t = &p->threads[i]; 369 t->mtCoder = p; 370 t->index = i; 371 t->inBuf = NULL; 372 t->stop = False; 373 Event_Construct(&t->startEvent); 374 Thread_CONSTRUCT(&t->thread) 375 } 376 377 #ifdef MTCODER_USE_WRITE_THREAD 378 for (i = 0; i < MTCODER_BLOCKS_MAX; i++) 379 Event_Construct(&p->writeEvents[i]); 380 #else 381 Event_Construct(&p->finishedEvent); 382 #endif 383 384 CriticalSection_Init(&p->cs); 385 CriticalSection_Init(&p->mtProgress.cs); 386} 387 388 389 390 391static void MtCoder_Free(CMtCoder *p) 392{ 393 unsigned i; 394 395 /* 396 p->stopReading = True; 397 if (Event_IsCreated(&p->readEvent)) 398 Event_Set(&p->readEvent); 399 */ 400 401 for (i = 0; i < MTCODER_THREADS_MAX; i++) 402 MtCoderThread_Destruct(&p->threads[i]); 403 404 Event_Close(&p->readEvent); 405 Semaphore_Close(&p->blocksSemaphore); 406 407 #ifdef MTCODER_USE_WRITE_THREAD 408 for (i = 0; i < MTCODER_BLOCKS_MAX; i++) 409 Event_Close(&p->writeEvents[i]); 410 #else 411 Event_Close(&p->finishedEvent); 412 #endif 413} 414 415 416void MtCoder_Destruct(CMtCoder *p) 417{ 418 MtCoder_Free(p); 419 420 CriticalSection_Delete(&p->cs); 421 CriticalSection_Delete(&p->mtProgress.cs); 422} 423 424 425SRes MtCoder_Code(CMtCoder *p) 426{ 427 unsigned numThreads = p->numThreadsMax; 428 unsigned numBlocksMax; 429 unsigned i; 430 SRes res = SZ_OK; 431 432 if (numThreads > MTCODER_THREADS_MAX) 433 numThreads = MTCODER_THREADS_MAX; 434 numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads); 435 436 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; 437 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; 438 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; 439 440 if (numBlocksMax > MTCODER_BLOCKS_MAX) 441 numBlocksMax = MTCODER_BLOCKS_MAX; 442 443 if (p->blockSize != p->allocatedBufsSize) 444 { 445 for (i = 0; i < MTCODER_THREADS_MAX; i++) 446 { 447 CMtCoderThread *t = &p->threads[i]; 448 if (t->inBuf) 449 { 450 ISzAlloc_Free(p->allocBig, t->inBuf); 451 t->inBuf = NULL; 452 } 453 } 454 p->allocatedBufsSize = p->blockSize; 455 } 456 457 p->readRes = SZ_OK; 458 459 MtProgress_Init(&p->mtProgress, p->progress); 460 461 #ifdef MTCODER_USE_WRITE_THREAD 462 for (i = 0; i < numBlocksMax; i++) 463 { 464 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i])) 465 } 466 #else 467 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent)) 468 #endif 469 470 { 471 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent)) 472 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax)) 473 } 474 475 for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++) 476 p->freeBlockList[i] = i + 1; 477 p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1; 478 p->freeBlockHead = 0; 479 480 p->readProcessed = 0; 481 p->blockIndex = 0; 482 p->numBlocksMax = numBlocksMax; 483 p->stopReading = False; 484 485 #ifndef MTCODER_USE_WRITE_THREAD 486 p->writeIndex = 0; 487 p->writeRes = SZ_OK; 488 for (i = 0; i < MTCODER_BLOCKS_MAX; i++) 489 p->ReadyBlocks[i] = False; 490 p->numFinishedThreads = 0; 491 #endif 492 493 p->numStartedThreadsLimit = numThreads; 494 p->numStartedThreads = 0; 495 496 // for (i = 0; i < numThreads; i++) 497 { 498 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; 499 RINOK(MtCoderThread_CreateAndStart(nextThread)) 500 } 501 502 RINOK_THREAD(Event_Set(&p->readEvent)) 503 504 #ifdef MTCODER_USE_WRITE_THREAD 505 { 506 unsigned bi = 0; 507 508 for (;; bi++) 509 { 510 if (bi >= numBlocksMax) 511 bi = 0; 512 513 RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) 514 515 { 516 const CMtCoderBlock *block = &p->blocks[bi]; 517 unsigned bufIndex = block->bufIndex; 518 BoolInt finished = block->finished; 519 if (res == SZ_OK && block->res != SZ_OK) 520 res = block->res; 521 522 if (bufIndex != (unsigned)(int)-1) 523 { 524 if (res == SZ_OK) 525 { 526 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex); 527 if (res != SZ_OK) 528 MtProgress_SetError(&p->mtProgress, res); 529 } 530 531 CriticalSection_Enter(&p->cs); 532 { 533 p->freeBlockList[bufIndex] = p->freeBlockHead; 534 p->freeBlockHead = bufIndex; 535 } 536 CriticalSection_Leave(&p->cs); 537 } 538 539 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore)) 540 541 if (finished) 542 break; 543 } 544 } 545 } 546 #else 547 { 548 WRes wres = Event_Wait(&p->finishedEvent); 549 res = MY_SRes_HRESULT_FROM_WRes(wres); 550 } 551 #endif 552 553 if (res == SZ_OK) 554 res = p->readRes; 555 556 if (res == SZ_OK) 557 res = p->mtProgress.res; 558 559 #ifndef MTCODER_USE_WRITE_THREAD 560 if (res == SZ_OK) 561 res = p->writeRes; 562 #endif 563 564 if (res != SZ_OK) 565 MtCoder_Free(p); 566 return res; 567} 568 569#endif 570 571#undef RINOK_THREAD 572