1// StreamBinder.cpp
2
3#include "StdAfx.h"
4
5#include "../../Common/MyCom.h"
6
7#include "StreamBinder.h"
8
9Z7_CLASS_IMP_COM_1(
10  CBinderInStream
11  , ISequentialInStream
12)
13  CStreamBinder *_binder;
14public:
15  ~CBinderInStream() { _binder->CloseRead_CallOnce(); }
16  CBinderInStream(CStreamBinder *binder): _binder(binder) {}
17};
18
19Z7_COM7F_IMF(CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize))
20  { return _binder->Read(data, size, processedSize); }
21
22
23Z7_CLASS_IMP_COM_1(
24  CBinderOutStream
25  , ISequentialOutStream
26)
27  CStreamBinder *_binder;
28public:
29  ~CBinderOutStream() { _binder->CloseWrite(); }
30  CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
31};
32
33Z7_COM7F_IMF(CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize))
34  { return _binder->Write(data, size, processedSize); }
35
36
37static HRESULT Event_Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent &event)
38{
39  const WRes wres = event.CreateIfNotCreated_Reset();
40  return HRESULT_FROM_WIN32(wres);
41}
42
43HRESULT CStreamBinder::Create_ReInit()
44{
45  RINOK(Event_Create_or_Reset(_canRead_Event))
46  // RINOK(Event_Create_or_Reset(_canWrite_Event))
47
48  // _canWrite_Semaphore.Close();
49  // we need at least 3 items of maxCount: 1 for normal unlock in Read(), 2 items for unlock in CloseRead_CallOnce()
50  _canWrite_Semaphore.OptCreateInit(0, 3);
51
52  // _readingWasClosed = false;
53  _readingWasClosed2 = false;
54
55  _waitWrite = true;
56  _bufSize = 0;
57  _buf = NULL;
58  ProcessedSize = 0;
59  // WritingWasCut = false;
60  return S_OK;
61}
62
63
64void CStreamBinder::CreateStreams2(CMyComPtr<ISequentialInStream> &inStream, CMyComPtr<ISequentialOutStream> &outStream)
65{
66  inStream = new CBinderInStream(this);
67  outStream = new CBinderOutStream(this);
68}
69
// If _canRead_Event is signaled and (_bufSize == 0), the stream is finished:
// Read() then returns S_OK with 0 bytes processed.
71
72HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
73{
74  if (processedSize)
75    *processedSize = 0;
76  if (size != 0)
77  {
78    if (_waitWrite)
79    {
80      WRes wres = _canRead_Event.Lock();
81      if (wres != 0)
82        return HRESULT_FROM_WIN32(wres);
83      _waitWrite = false;
84    }
85    if (size > _bufSize)
86      size = _bufSize;
87    if (size != 0)
88    {
89      memcpy(data, _buf, size);
90      _buf = ((const Byte *)_buf) + size;
91      ProcessedSize += size;
92      if (processedSize)
93        *processedSize = size;
94      _bufSize -= size;
95
96      /*
97      if (_bufSize == 0), then we have read whole buffer
98      we have two ways here:
99        - if we       check (_bufSize == 0) here, we unlock Write only after full data Reading - it reduces the number of syncs
100        - if we don't check (_bufSize == 0) here, we unlock Write after partial data Reading
101      */
102      if (_bufSize == 0)
103      {
104        _waitWrite = true;
105        // _canWrite_Event.Set();
106        _canWrite_Semaphore.Release();
107      }
108    }
109  }
110  return S_OK;
111}
112
113
114HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
115{
116  if (processedSize)
117    *processedSize = 0;
118  if (size == 0)
119    return S_OK;
120
121  if (!_readingWasClosed2)
122  {
123    _buf = data;
124    _bufSize = size;
125    _canRead_Event.Set();
126
127    /*
128    _canWrite_Event.Lock();
129    if (_readingWasClosed)
130      _readingWasClosed2 = true;
131    */
132
133    _canWrite_Semaphore.Lock();
134
135    // _bufSize : is remain size that was not read
136    size -= _bufSize;
137
138    // size : is size of data that was read
139    if (size != 0)
140    {
141      // if some data was read, then we report that size and return
142      if (processedSize)
143        *processedSize = size;
144      return S_OK;
145    }
146    _readingWasClosed2 = true;
147  }
148
149  // WritingWasCut = true;
150  return k_My_HRESULT_WritingWasCut;
151}
152