1e5c31af7Sopenharmony_ci/*-------------------------------------------------------------------------
2e5c31af7Sopenharmony_ci * drawElements Quality Program Test Executor
3e5c31af7Sopenharmony_ci * ------------------------------------------
4e5c31af7Sopenharmony_ci *
5e5c31af7Sopenharmony_ci * Copyright 2014 The Android Open Source Project
6e5c31af7Sopenharmony_ci *
7e5c31af7Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
8e5c31af7Sopenharmony_ci * you may not use this file except in compliance with the License.
9e5c31af7Sopenharmony_ci * You may obtain a copy of the License at
10e5c31af7Sopenharmony_ci *
11e5c31af7Sopenharmony_ci *      http://www.apache.org/licenses/LICENSE-2.0
12e5c31af7Sopenharmony_ci *
13e5c31af7Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
14e5c31af7Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
15e5c31af7Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16e5c31af7Sopenharmony_ci * See the License for the specific language governing permissions and
17e5c31af7Sopenharmony_ci * limitations under the License.
18e5c31af7Sopenharmony_ci *
19e5c31af7Sopenharmony_ci *//*!
20e5c31af7Sopenharmony_ci * \file
21e5c31af7Sopenharmony_ci * \brief Tcp/Ip communication link.
22e5c31af7Sopenharmony_ci *//*--------------------------------------------------------------------*/
23e5c31af7Sopenharmony_ci
24e5c31af7Sopenharmony_ci#include "xeTcpIpLink.hpp"
25e5c31af7Sopenharmony_ci#include "xsProtocol.hpp"
26e5c31af7Sopenharmony_ci#include "deClock.h"
27e5c31af7Sopenharmony_ci#include "deInt32.h"
28e5c31af7Sopenharmony_ci
29e5c31af7Sopenharmony_cinamespace xe
30e5c31af7Sopenharmony_ci{
31e5c31af7Sopenharmony_ci
// Send queue sizing, used when constructing the send thread's block buffer.
enum
{
	// Also the size of the scratch array filled per send-loop iteration.
	SEND_BUFFER_BLOCK_SIZE		= 1024,

	// Second argument given to the block buffer constructor.
	SEND_BUFFER_NUM_BLOCKS		= 64
};
37e5c31af7Sopenharmony_ci
38e5c31af7Sopenharmony_ci// Utilities for writing messages out.
39e5c31af7Sopenharmony_ci
40e5c31af7Sopenharmony_cistatic void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41e5c31af7Sopenharmony_ci{
42e5c31af7Sopenharmony_ci	deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43e5c31af7Sopenharmony_ci	xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44e5c31af7Sopenharmony_ci	dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45e5c31af7Sopenharmony_ci}
46e5c31af7Sopenharmony_ci
47e5c31af7Sopenharmony_cistatic void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48e5c31af7Sopenharmony_ci{
49e5c31af7Sopenharmony_ci	writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50e5c31af7Sopenharmony_ci	dst.flush();
51e5c31af7Sopenharmony_ci}
52e5c31af7Sopenharmony_ci
53e5c31af7Sopenharmony_cistatic void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54e5c31af7Sopenharmony_ci{
55e5c31af7Sopenharmony_ci	int		nameSize			= (int)strlen(name)		+ 1;
56e5c31af7Sopenharmony_ci	int		paramsSize			= (int)strlen(params)	+ 1;
57e5c31af7Sopenharmony_ci	int		workDirSize			= (int)strlen(workDir)	+ 1;
58e5c31af7Sopenharmony_ci	int		caseListSize		= (int)strlen(caseList)	+ 1;
59e5c31af7Sopenharmony_ci	int		totalSize			= xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60e5c31af7Sopenharmony_ci
61e5c31af7Sopenharmony_ci	writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62e5c31af7Sopenharmony_ci	dst.write(nameSize,		(const deUint8*)name);
63e5c31af7Sopenharmony_ci	dst.write(paramsSize,	(const deUint8*)params);
64e5c31af7Sopenharmony_ci	dst.write(workDirSize,	(const deUint8*)workDir);
65e5c31af7Sopenharmony_ci	dst.write(caseListSize,	(const deUint8*)caseList);
66e5c31af7Sopenharmony_ci	dst.flush();
67e5c31af7Sopenharmony_ci}
68e5c31af7Sopenharmony_ci
69e5c31af7Sopenharmony_cistatic void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70e5c31af7Sopenharmony_ci{
71e5c31af7Sopenharmony_ci	writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72e5c31af7Sopenharmony_ci	dst.flush();
73e5c31af7Sopenharmony_ci}
74e5c31af7Sopenharmony_ci
75e5c31af7Sopenharmony_ci// TcpIpLinkState
76e5c31af7Sopenharmony_ci
77e5c31af7Sopenharmony_ciTcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78e5c31af7Sopenharmony_ci	: m_state					(initialState)
79e5c31af7Sopenharmony_ci	, m_error					(initialErr)
80e5c31af7Sopenharmony_ci	, m_lastKeepaliveReceived	(0)
81e5c31af7Sopenharmony_ci	, m_stateChangedCallback	(DE_NULL)
82e5c31af7Sopenharmony_ci	, m_testLogDataCallback		(DE_NULL)
83e5c31af7Sopenharmony_ci	, m_infoLogDataCallback		(DE_NULL)
84e5c31af7Sopenharmony_ci	, m_userPtr					(DE_NULL)
85e5c31af7Sopenharmony_ci{
86e5c31af7Sopenharmony_ci}
87e5c31af7Sopenharmony_ci
88e5c31af7Sopenharmony_ciTcpIpLinkState::~TcpIpLinkState (void)
89e5c31af7Sopenharmony_ci{
90e5c31af7Sopenharmony_ci}
91e5c31af7Sopenharmony_ci
92e5c31af7Sopenharmony_ciCommLinkState TcpIpLinkState::getState (void) const
93e5c31af7Sopenharmony_ci{
94e5c31af7Sopenharmony_ci	de::ScopedLock lock(m_lock);
95e5c31af7Sopenharmony_ci
96e5c31af7Sopenharmony_ci	return m_state;
97e5c31af7Sopenharmony_ci}
98e5c31af7Sopenharmony_ci
99e5c31af7Sopenharmony_ciCommLinkState TcpIpLinkState::getState (std::string& error) const
100e5c31af7Sopenharmony_ci{
101e5c31af7Sopenharmony_ci	de::ScopedLock lock(m_lock);
102e5c31af7Sopenharmony_ci
103e5c31af7Sopenharmony_ci	error = m_error;
104e5c31af7Sopenharmony_ci	return m_state;
105e5c31af7Sopenharmony_ci}
106e5c31af7Sopenharmony_ci
107e5c31af7Sopenharmony_civoid TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108e5c31af7Sopenharmony_ci{
109e5c31af7Sopenharmony_ci	de::ScopedLock lock(m_lock);
110e5c31af7Sopenharmony_ci
111e5c31af7Sopenharmony_ci	m_stateChangedCallback		= stateChangedCallback;
112e5c31af7Sopenharmony_ci	m_testLogDataCallback		= testLogDataCallback;
113e5c31af7Sopenharmony_ci	m_infoLogDataCallback		= infoLogDataCallback;
114e5c31af7Sopenharmony_ci	m_userPtr					= userPtr;
115e5c31af7Sopenharmony_ci}
116e5c31af7Sopenharmony_ci
117e5c31af7Sopenharmony_civoid TcpIpLinkState::setState (CommLinkState state, const char* error)
118e5c31af7Sopenharmony_ci{
119e5c31af7Sopenharmony_ci	CommLink::StateChangedFunc	callback	= DE_NULL;
120e5c31af7Sopenharmony_ci	void*						userPtr		= DE_NULL;
121e5c31af7Sopenharmony_ci
122e5c31af7Sopenharmony_ci	{
123e5c31af7Sopenharmony_ci		de::ScopedLock lock(m_lock);
124e5c31af7Sopenharmony_ci
125e5c31af7Sopenharmony_ci		m_state = state;
126e5c31af7Sopenharmony_ci		m_error	= error;
127e5c31af7Sopenharmony_ci
128e5c31af7Sopenharmony_ci		callback	= m_stateChangedCallback;
129e5c31af7Sopenharmony_ci		userPtr		= m_userPtr;
130e5c31af7Sopenharmony_ci	}
131e5c31af7Sopenharmony_ci
132e5c31af7Sopenharmony_ci	if (callback)
133e5c31af7Sopenharmony_ci		callback(userPtr, state, error);
134e5c31af7Sopenharmony_ci}
135e5c31af7Sopenharmony_ci
136e5c31af7Sopenharmony_civoid TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
137e5c31af7Sopenharmony_ci{
138e5c31af7Sopenharmony_ci	CommLink::LogDataFunc	callback	= DE_NULL;
139e5c31af7Sopenharmony_ci	void*					userPtr		= DE_NULL;
140e5c31af7Sopenharmony_ci
141e5c31af7Sopenharmony_ci	m_lock.lock();
142e5c31af7Sopenharmony_ci	callback	= m_testLogDataCallback;
143e5c31af7Sopenharmony_ci	userPtr		= m_userPtr;
144e5c31af7Sopenharmony_ci	m_lock.unlock();
145e5c31af7Sopenharmony_ci
146e5c31af7Sopenharmony_ci	if (callback)
147e5c31af7Sopenharmony_ci		callback(userPtr, bytes, numBytes);
148e5c31af7Sopenharmony_ci}
149e5c31af7Sopenharmony_ci
150e5c31af7Sopenharmony_civoid TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
151e5c31af7Sopenharmony_ci{
152e5c31af7Sopenharmony_ci	CommLink::LogDataFunc	callback	= DE_NULL;
153e5c31af7Sopenharmony_ci	void*					userPtr		= DE_NULL;
154e5c31af7Sopenharmony_ci
155e5c31af7Sopenharmony_ci	m_lock.lock();
156e5c31af7Sopenharmony_ci	callback	= m_infoLogDataCallback;
157e5c31af7Sopenharmony_ci	userPtr		= m_userPtr;
158e5c31af7Sopenharmony_ci	m_lock.unlock();
159e5c31af7Sopenharmony_ci
160e5c31af7Sopenharmony_ci	if (callback)
161e5c31af7Sopenharmony_ci		callback(userPtr, bytes, numBytes);
162e5c31af7Sopenharmony_ci}
163e5c31af7Sopenharmony_ci
164e5c31af7Sopenharmony_civoid TcpIpLinkState::onKeepaliveReceived (void)
165e5c31af7Sopenharmony_ci{
166e5c31af7Sopenharmony_ci	de::ScopedLock lock(m_lock);
167e5c31af7Sopenharmony_ci	m_lastKeepaliveReceived = deGetMicroseconds();
168e5c31af7Sopenharmony_ci}
169e5c31af7Sopenharmony_ci
170e5c31af7Sopenharmony_cideUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171e5c31af7Sopenharmony_ci{
172e5c31af7Sopenharmony_ci	de::ScopedLock lock(m_lock);
173e5c31af7Sopenharmony_ci	return m_lastKeepaliveReceived;
174e5c31af7Sopenharmony_ci}
175e5c31af7Sopenharmony_ci
176e5c31af7Sopenharmony_ci// TcpIpSendThread
177e5c31af7Sopenharmony_ci
178e5c31af7Sopenharmony_ciTcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179e5c31af7Sopenharmony_ci	: m_socket		(socket)
180e5c31af7Sopenharmony_ci	, m_state		(state)
181e5c31af7Sopenharmony_ci	, m_buffer		(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182e5c31af7Sopenharmony_ci	, m_isRunning	(false)
183e5c31af7Sopenharmony_ci{
184e5c31af7Sopenharmony_ci}
185e5c31af7Sopenharmony_ci
186e5c31af7Sopenharmony_ciTcpIpSendThread::~TcpIpSendThread (void)
187e5c31af7Sopenharmony_ci{
188e5c31af7Sopenharmony_ci}
189e5c31af7Sopenharmony_ci
190e5c31af7Sopenharmony_civoid TcpIpSendThread::start (void)
191e5c31af7Sopenharmony_ci{
192e5c31af7Sopenharmony_ci	DE_ASSERT(!m_isRunning);
193e5c31af7Sopenharmony_ci
194e5c31af7Sopenharmony_ci	// Reset state.
195e5c31af7Sopenharmony_ci	m_buffer.clear();
196e5c31af7Sopenharmony_ci	m_isRunning = true;
197e5c31af7Sopenharmony_ci
198e5c31af7Sopenharmony_ci	de::Thread::start();
199e5c31af7Sopenharmony_ci}
200e5c31af7Sopenharmony_ci
201e5c31af7Sopenharmony_civoid TcpIpSendThread::run (void)
202e5c31af7Sopenharmony_ci{
203e5c31af7Sopenharmony_ci	try
204e5c31af7Sopenharmony_ci	{
205e5c31af7Sopenharmony_ci		deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206e5c31af7Sopenharmony_ci
207e5c31af7Sopenharmony_ci		while (!m_buffer.isCanceled())
208e5c31af7Sopenharmony_ci		{
209e5c31af7Sopenharmony_ci			size_t			numToSend	= 0;
210e5c31af7Sopenharmony_ci			size_t			numSent		= 0;
211e5c31af7Sopenharmony_ci			deSocketResult	result		= DE_SOCKETRESULT_LAST;
212e5c31af7Sopenharmony_ci
213e5c31af7Sopenharmony_ci			try
214e5c31af7Sopenharmony_ci			{
215e5c31af7Sopenharmony_ci				// Wait for single byte and then try to read more.
216e5c31af7Sopenharmony_ci				m_buffer.read(1, &buf[0]);
217e5c31af7Sopenharmony_ci				numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218e5c31af7Sopenharmony_ci			}
219e5c31af7Sopenharmony_ci			catch (const de::BlockBuffer<deUint8>::CanceledException&)
220e5c31af7Sopenharmony_ci			{
221e5c31af7Sopenharmony_ci				// Handled in loop condition.
222e5c31af7Sopenharmony_ci			}
223e5c31af7Sopenharmony_ci
224e5c31af7Sopenharmony_ci			while (numSent < numToSend)
225e5c31af7Sopenharmony_ci			{
226e5c31af7Sopenharmony_ci				result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227e5c31af7Sopenharmony_ci
228e5c31af7Sopenharmony_ci				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229e5c31af7Sopenharmony_ci					XE_FAIL("Connection closed");
230e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231e5c31af7Sopenharmony_ci					XE_FAIL("Connection terminated");
232e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_ERROR)
233e5c31af7Sopenharmony_ci					XE_FAIL("Socket error");
234e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235e5c31af7Sopenharmony_ci				{
236e5c31af7Sopenharmony_ci					// \note Socket should not be in non-blocking mode.
237e5c31af7Sopenharmony_ci					DE_ASSERT(numSent == 0);
238e5c31af7Sopenharmony_ci					deYield();
239e5c31af7Sopenharmony_ci				}
240e5c31af7Sopenharmony_ci				else
241e5c31af7Sopenharmony_ci					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242e5c31af7Sopenharmony_ci			}
243e5c31af7Sopenharmony_ci		}
244e5c31af7Sopenharmony_ci	}
245e5c31af7Sopenharmony_ci	catch (const std::exception& e)
246e5c31af7Sopenharmony_ci	{
247e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_ERROR, e.what());
248e5c31af7Sopenharmony_ci	}
249e5c31af7Sopenharmony_ci}
250e5c31af7Sopenharmony_ci
251e5c31af7Sopenharmony_civoid TcpIpSendThread::stop (void)
252e5c31af7Sopenharmony_ci{
253e5c31af7Sopenharmony_ci	if (m_isRunning)
254e5c31af7Sopenharmony_ci	{
255e5c31af7Sopenharmony_ci		m_buffer.cancel();
256e5c31af7Sopenharmony_ci		join();
257e5c31af7Sopenharmony_ci		m_isRunning = false;
258e5c31af7Sopenharmony_ci	}
259e5c31af7Sopenharmony_ci}
260e5c31af7Sopenharmony_ci
261e5c31af7Sopenharmony_ci// TcpIpRecvThread
262e5c31af7Sopenharmony_ci
263e5c31af7Sopenharmony_ciTcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264e5c31af7Sopenharmony_ci	: m_socket		(socket)
265e5c31af7Sopenharmony_ci	, m_state		(state)
266e5c31af7Sopenharmony_ci	, m_curMsgPos	(0)
267e5c31af7Sopenharmony_ci	, m_isRunning	(false)
268e5c31af7Sopenharmony_ci{
269e5c31af7Sopenharmony_ci}
270e5c31af7Sopenharmony_ci
271e5c31af7Sopenharmony_ciTcpIpRecvThread::~TcpIpRecvThread (void)
272e5c31af7Sopenharmony_ci{
273e5c31af7Sopenharmony_ci}
274e5c31af7Sopenharmony_ci
275e5c31af7Sopenharmony_civoid TcpIpRecvThread::start (void)
276e5c31af7Sopenharmony_ci{
277e5c31af7Sopenharmony_ci	DE_ASSERT(!m_isRunning);
278e5c31af7Sopenharmony_ci
279e5c31af7Sopenharmony_ci	// Reset state.
280e5c31af7Sopenharmony_ci	m_curMsgPos = 0;
281e5c31af7Sopenharmony_ci	m_isRunning = true;
282e5c31af7Sopenharmony_ci
283e5c31af7Sopenharmony_ci	de::Thread::start();
284e5c31af7Sopenharmony_ci}
285e5c31af7Sopenharmony_ci
286e5c31af7Sopenharmony_civoid TcpIpRecvThread::run (void)
287e5c31af7Sopenharmony_ci{
288e5c31af7Sopenharmony_ci	try
289e5c31af7Sopenharmony_ci	{
290e5c31af7Sopenharmony_ci		for (;;)
291e5c31af7Sopenharmony_ci		{
292e5c31af7Sopenharmony_ci			bool				hasHeader		= m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293e5c31af7Sopenharmony_ci			bool				hasPayload		= false;
294e5c31af7Sopenharmony_ci			size_t				messageSize		= 0;
295e5c31af7Sopenharmony_ci			xs::MessageType		messageType		= (xs::MessageType)0;
296e5c31af7Sopenharmony_ci
297e5c31af7Sopenharmony_ci			if (hasHeader)
298e5c31af7Sopenharmony_ci			{
299e5c31af7Sopenharmony_ci				xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300e5c31af7Sopenharmony_ci				hasPayload = m_curMsgPos >= messageSize;
301e5c31af7Sopenharmony_ci			}
302e5c31af7Sopenharmony_ci
303e5c31af7Sopenharmony_ci			if (hasPayload)
304e5c31af7Sopenharmony_ci			{
305e5c31af7Sopenharmony_ci				// Process message.
306e5c31af7Sopenharmony_ci				handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307e5c31af7Sopenharmony_ci				m_curMsgPos = 0;
308e5c31af7Sopenharmony_ci			}
309e5c31af7Sopenharmony_ci			else
310e5c31af7Sopenharmony_ci			{
311e5c31af7Sopenharmony_ci				// Try to receive missing bytes.
312e5c31af7Sopenharmony_ci				size_t				curSize			= hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313e5c31af7Sopenharmony_ci				size_t				bytesToRecv		= curSize-m_curMsgPos;
314e5c31af7Sopenharmony_ci				size_t				numRecv			= 0;
315e5c31af7Sopenharmony_ci				deSocketResult		result			= DE_SOCKETRESULT_LAST;
316e5c31af7Sopenharmony_ci
317e5c31af7Sopenharmony_ci				if (m_curMsgBuf.size() < curSize)
318e5c31af7Sopenharmony_ci					m_curMsgBuf.resize(curSize);
319e5c31af7Sopenharmony_ci
320e5c31af7Sopenharmony_ci				result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321e5c31af7Sopenharmony_ci
322e5c31af7Sopenharmony_ci				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323e5c31af7Sopenharmony_ci					XE_FAIL("Connection closed");
324e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325e5c31af7Sopenharmony_ci					XE_FAIL("Connection terminated");
326e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_ERROR)
327e5c31af7Sopenharmony_ci					XE_FAIL("Socket error");
328e5c31af7Sopenharmony_ci				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329e5c31af7Sopenharmony_ci				{
330e5c31af7Sopenharmony_ci					// \note Socket should not be in non-blocking mode.
331e5c31af7Sopenharmony_ci					DE_ASSERT(numRecv == 0);
332e5c31af7Sopenharmony_ci					deYield();
333e5c31af7Sopenharmony_ci				}
334e5c31af7Sopenharmony_ci				else
335e5c31af7Sopenharmony_ci				{
336e5c31af7Sopenharmony_ci					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337e5c31af7Sopenharmony_ci					DE_ASSERT(numRecv <= bytesToRecv);
338e5c31af7Sopenharmony_ci					m_curMsgPos += numRecv;
339e5c31af7Sopenharmony_ci					// Continue receiving bytes / handle message in next iter.
340e5c31af7Sopenharmony_ci				}
341e5c31af7Sopenharmony_ci			}
342e5c31af7Sopenharmony_ci		}
343e5c31af7Sopenharmony_ci	}
344e5c31af7Sopenharmony_ci	catch (const std::exception& e)
345e5c31af7Sopenharmony_ci	{
346e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_ERROR, e.what());
347e5c31af7Sopenharmony_ci	}
348e5c31af7Sopenharmony_ci}
349e5c31af7Sopenharmony_ci
350e5c31af7Sopenharmony_civoid TcpIpRecvThread::stop (void)
351e5c31af7Sopenharmony_ci{
352e5c31af7Sopenharmony_ci	if (m_isRunning)
353e5c31af7Sopenharmony_ci	{
354e5c31af7Sopenharmony_ci		// \note Socket must be closed before terminating receive thread.
355e5c31af7Sopenharmony_ci		XE_CHECK(!m_socket.isReceiveOpen());
356e5c31af7Sopenharmony_ci
357e5c31af7Sopenharmony_ci		join();
358e5c31af7Sopenharmony_ci		m_isRunning = false;
359e5c31af7Sopenharmony_ci	}
360e5c31af7Sopenharmony_ci}
361e5c31af7Sopenharmony_ci
362e5c31af7Sopenharmony_civoid TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
363e5c31af7Sopenharmony_ci{
364e5c31af7Sopenharmony_ci	switch (messageType)
365e5c31af7Sopenharmony_ci	{
366e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_KEEPALIVE:
367e5c31af7Sopenharmony_ci			m_state.onKeepaliveReceived();
368e5c31af7Sopenharmony_ci			break;
369e5c31af7Sopenharmony_ci
370e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_PROCESS_STARTED:
371e5c31af7Sopenharmony_ci			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372e5c31af7Sopenharmony_ci			m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373e5c31af7Sopenharmony_ci			break;
374e5c31af7Sopenharmony_ci
375e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376e5c31af7Sopenharmony_ci		{
377e5c31af7Sopenharmony_ci			xs::ProcessLaunchFailedMessage msg(data, dataSize);
378e5c31af7Sopenharmony_ci			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379e5c31af7Sopenharmony_ci			m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380e5c31af7Sopenharmony_ci			break;
381e5c31af7Sopenharmony_ci		}
382e5c31af7Sopenharmony_ci
383e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_PROCESS_FINISHED:
384e5c31af7Sopenharmony_ci		{
385e5c31af7Sopenharmony_ci			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386e5c31af7Sopenharmony_ci			xs::ProcessFinishedMessage msg(data, dataSize);
387e5c31af7Sopenharmony_ci			m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388e5c31af7Sopenharmony_ci			DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389e5c31af7Sopenharmony_ci			break;
390e5c31af7Sopenharmony_ci		}
391e5c31af7Sopenharmony_ci
392e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393e5c31af7Sopenharmony_ci		case xs::MESSAGETYPE_INFO:
394e5c31af7Sopenharmony_ci			// Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395e5c31af7Sopenharmony_ci			if (data[dataSize-1] == 0)
396e5c31af7Sopenharmony_ci				dataSize -= 1;
397e5c31af7Sopenharmony_ci
398e5c31af7Sopenharmony_ci			if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399e5c31af7Sopenharmony_ci			{
400e5c31af7Sopenharmony_ci				XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401e5c31af7Sopenharmony_ci				m_state.onTestLogData(&data[0], dataSize);
402e5c31af7Sopenharmony_ci			}
403e5c31af7Sopenharmony_ci			else
404e5c31af7Sopenharmony_ci				m_state.onInfoLogData(&data[0], dataSize);
405e5c31af7Sopenharmony_ci			break;
406e5c31af7Sopenharmony_ci
407e5c31af7Sopenharmony_ci		default:
408e5c31af7Sopenharmony_ci			XE_FAIL("Unknown message");
409e5c31af7Sopenharmony_ci	}
410e5c31af7Sopenharmony_ci}
411e5c31af7Sopenharmony_ci
412e5c31af7Sopenharmony_ci// TcpIpLink
413e5c31af7Sopenharmony_ci
414e5c31af7Sopenharmony_ciTcpIpLink::TcpIpLink (void)
415e5c31af7Sopenharmony_ci	: m_state			(COMMLINKSTATE_ERROR, "Not connected")
416e5c31af7Sopenharmony_ci	, m_sendThread		(m_socket, m_state)
417e5c31af7Sopenharmony_ci	, m_recvThread		(m_socket, m_state)
418e5c31af7Sopenharmony_ci	, m_keepaliveTimer	(DE_NULL)
419e5c31af7Sopenharmony_ci{
420e5c31af7Sopenharmony_ci	m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421e5c31af7Sopenharmony_ci	XE_CHECK(m_keepaliveTimer);
422e5c31af7Sopenharmony_ci}
423e5c31af7Sopenharmony_ci
424e5c31af7Sopenharmony_ciTcpIpLink::~TcpIpLink (void)
425e5c31af7Sopenharmony_ci{
426e5c31af7Sopenharmony_ci	try
427e5c31af7Sopenharmony_ci	{
428e5c31af7Sopenharmony_ci		closeConnection();
429e5c31af7Sopenharmony_ci	}
430e5c31af7Sopenharmony_ci	catch (...)
431e5c31af7Sopenharmony_ci	{
432e5c31af7Sopenharmony_ci		// Can't do much except to ignore error.
433e5c31af7Sopenharmony_ci	}
434e5c31af7Sopenharmony_ci	deTimer_destroy(m_keepaliveTimer);
435e5c31af7Sopenharmony_ci}
436e5c31af7Sopenharmony_ci
437e5c31af7Sopenharmony_civoid TcpIpLink::closeConnection (void)
438e5c31af7Sopenharmony_ci{
439e5c31af7Sopenharmony_ci	{
440e5c31af7Sopenharmony_ci		deSocketState state = m_socket.getState();
441e5c31af7Sopenharmony_ci		if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442e5c31af7Sopenharmony_ci			m_socket.shutdown();
443e5c31af7Sopenharmony_ci	}
444e5c31af7Sopenharmony_ci
445e5c31af7Sopenharmony_ci	if (deTimer_isActive(m_keepaliveTimer))
446e5c31af7Sopenharmony_ci		deTimer_disable(m_keepaliveTimer);
447e5c31af7Sopenharmony_ci
448e5c31af7Sopenharmony_ci	if (m_sendThread.isRunning())
449e5c31af7Sopenharmony_ci		m_sendThread.stop();
450e5c31af7Sopenharmony_ci
451e5c31af7Sopenharmony_ci	if (m_recvThread.isRunning())
452e5c31af7Sopenharmony_ci		m_recvThread.stop();
453e5c31af7Sopenharmony_ci
454e5c31af7Sopenharmony_ci	if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455e5c31af7Sopenharmony_ci		m_socket.close();
456e5c31af7Sopenharmony_ci}
457e5c31af7Sopenharmony_ci
458e5c31af7Sopenharmony_civoid TcpIpLink::connect (const de::SocketAddress& address)
459e5c31af7Sopenharmony_ci{
460e5c31af7Sopenharmony_ci	XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461e5c31af7Sopenharmony_ci	XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462e5c31af7Sopenharmony_ci	XE_CHECK(!m_sendThread.isRunning());
463e5c31af7Sopenharmony_ci	XE_CHECK(!m_recvThread.isRunning());
464e5c31af7Sopenharmony_ci
465e5c31af7Sopenharmony_ci	m_socket.connect(address);
466e5c31af7Sopenharmony_ci
467e5c31af7Sopenharmony_ci	try
468e5c31af7Sopenharmony_ci	{
469e5c31af7Sopenharmony_ci		// Clear error and set state to ready.
470e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_READY, "");
471e5c31af7Sopenharmony_ci		m_state.onKeepaliveReceived();
472e5c31af7Sopenharmony_ci
473e5c31af7Sopenharmony_ci		// Launch threads.
474e5c31af7Sopenharmony_ci		m_sendThread.start();
475e5c31af7Sopenharmony_ci		m_recvThread.start();
476e5c31af7Sopenharmony_ci
477e5c31af7Sopenharmony_ci		XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478e5c31af7Sopenharmony_ci	}
479e5c31af7Sopenharmony_ci	catch (const std::exception& e)
480e5c31af7Sopenharmony_ci	{
481e5c31af7Sopenharmony_ci		closeConnection();
482e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_ERROR, e.what());
483e5c31af7Sopenharmony_ci		throw;
484e5c31af7Sopenharmony_ci	}
485e5c31af7Sopenharmony_ci}
486e5c31af7Sopenharmony_ci
487e5c31af7Sopenharmony_civoid TcpIpLink::disconnect (void)
488e5c31af7Sopenharmony_ci{
489e5c31af7Sopenharmony_ci	try
490e5c31af7Sopenharmony_ci	{
491e5c31af7Sopenharmony_ci		closeConnection();
492e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
493e5c31af7Sopenharmony_ci	}
494e5c31af7Sopenharmony_ci	catch (const std::exception& e)
495e5c31af7Sopenharmony_ci	{
496e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_ERROR, e.what());
497e5c31af7Sopenharmony_ci	}
498e5c31af7Sopenharmony_ci}
499e5c31af7Sopenharmony_ci
500e5c31af7Sopenharmony_civoid TcpIpLink::reset (void)
501e5c31af7Sopenharmony_ci{
502e5c31af7Sopenharmony_ci	// \note Just clears error state if we are connected.
503e5c31af7Sopenharmony_ci	if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
504e5c31af7Sopenharmony_ci	{
505e5c31af7Sopenharmony_ci		m_state.setState(COMMLINKSTATE_READY, "");
506e5c31af7Sopenharmony_ci
507e5c31af7Sopenharmony_ci		// \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
508e5c31af7Sopenharmony_ci	}
509e5c31af7Sopenharmony_ci	else
510e5c31af7Sopenharmony_ci		disconnect(); // Abnormal state/usage. Disconnect socket.
511e5c31af7Sopenharmony_ci}
512e5c31af7Sopenharmony_ci
513e5c31af7Sopenharmony_civoid TcpIpLink::keepaliveTimerCallback (void* ptr)
514e5c31af7Sopenharmony_ci{
515e5c31af7Sopenharmony_ci	TcpIpLink*	link			= static_cast<TcpIpLink*>(ptr);
516e5c31af7Sopenharmony_ci	deUint64	lastKeepalive	= link->m_state.getLastKeepaliveRecevied();
517e5c31af7Sopenharmony_ci	deUint64	curTime			= deGetMicroseconds();
518e5c31af7Sopenharmony_ci
519e5c31af7Sopenharmony_ci	// Check for timeout.
520e5c31af7Sopenharmony_ci	if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521e5c31af7Sopenharmony_ci		link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
522e5c31af7Sopenharmony_ci
523e5c31af7Sopenharmony_ci	// Enqueue new keepalive.
524e5c31af7Sopenharmony_ci	try
525e5c31af7Sopenharmony_ci	{
526e5c31af7Sopenharmony_ci		writeKeepalive(link->m_sendThread.getBuffer());
527e5c31af7Sopenharmony_ci	}
528e5c31af7Sopenharmony_ci	catch (const de::BlockBuffer<deUint8>::CanceledException&)
529e5c31af7Sopenharmony_ci	{
530e5c31af7Sopenharmony_ci		// Ignore. Can happen in connection teardown.
531e5c31af7Sopenharmony_ci	}
532e5c31af7Sopenharmony_ci}
533e5c31af7Sopenharmony_ci
534e5c31af7Sopenharmony_ciCommLinkState TcpIpLink::getState (void) const
535e5c31af7Sopenharmony_ci{
536e5c31af7Sopenharmony_ci	return m_state.getState();
537e5c31af7Sopenharmony_ci}
538e5c31af7Sopenharmony_ci
539e5c31af7Sopenharmony_ciCommLinkState TcpIpLink::getState (std::string& message) const
540e5c31af7Sopenharmony_ci{
541e5c31af7Sopenharmony_ci	return m_state.getState(message);
542e5c31af7Sopenharmony_ci}
543e5c31af7Sopenharmony_ci
544e5c31af7Sopenharmony_civoid TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
545e5c31af7Sopenharmony_ci{
546e5c31af7Sopenharmony_ci	m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
547e5c31af7Sopenharmony_ci}
548e5c31af7Sopenharmony_ci
549e5c31af7Sopenharmony_civoid TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
550e5c31af7Sopenharmony_ci{
551e5c31af7Sopenharmony_ci	XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
552e5c31af7Sopenharmony_ci
553e5c31af7Sopenharmony_ci	m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554e5c31af7Sopenharmony_ci	writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
555e5c31af7Sopenharmony_ci}
556e5c31af7Sopenharmony_ci
557e5c31af7Sopenharmony_civoid TcpIpLink::stopTestProcess (void)
558e5c31af7Sopenharmony_ci{
559e5c31af7Sopenharmony_ci	XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560e5c31af7Sopenharmony_ci	writeStopExecution(m_sendThread.getBuffer());
561e5c31af7Sopenharmony_ci}
562e5c31af7Sopenharmony_ci
563e5c31af7Sopenharmony_ci} // xe
564