1e5c31af7Sopenharmony_ci/*------------------------------------------------------------------------- 2e5c31af7Sopenharmony_ci * drawElements Quality Program Test Executor 3e5c31af7Sopenharmony_ci * ------------------------------------------ 4e5c31af7Sopenharmony_ci * 5e5c31af7Sopenharmony_ci * Copyright 2014 The Android Open Source Project 6e5c31af7Sopenharmony_ci * 7e5c31af7Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 8e5c31af7Sopenharmony_ci * you may not use this file except in compliance with the License. 9e5c31af7Sopenharmony_ci * You may obtain a copy of the License at 10e5c31af7Sopenharmony_ci * 11e5c31af7Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 12e5c31af7Sopenharmony_ci * 13e5c31af7Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 14e5c31af7Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 15e5c31af7Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16e5c31af7Sopenharmony_ci * See the License for the specific language governing permissions and 17e5c31af7Sopenharmony_ci * limitations under the License. 18e5c31af7Sopenharmony_ci * 19e5c31af7Sopenharmony_ci *//*! 20e5c31af7Sopenharmony_ci * \file 21e5c31af7Sopenharmony_ci * \brief Tcp/Ip communication link. 22e5c31af7Sopenharmony_ci *//*--------------------------------------------------------------------*/ 23e5c31af7Sopenharmony_ci 24e5c31af7Sopenharmony_ci#include "xeTcpIpLink.hpp" 25e5c31af7Sopenharmony_ci#include "xsProtocol.hpp" 26e5c31af7Sopenharmony_ci#include "deClock.h" 27e5c31af7Sopenharmony_ci#include "deInt32.h" 28e5c31af7Sopenharmony_ci 29e5c31af7Sopenharmony_cinamespace xe 30e5c31af7Sopenharmony_ci{ 31e5c31af7Sopenharmony_ci 32e5c31af7Sopenharmony_cienum 33e5c31af7Sopenharmony_ci{ 34e5c31af7Sopenharmony_ci SEND_BUFFER_BLOCK_SIZE = 1024, 35e5c31af7Sopenharmony_ci SEND_BUFFER_NUM_BLOCKS = 64 36e5c31af7Sopenharmony_ci}; 37e5c31af7Sopenharmony_ci 38e5c31af7Sopenharmony_ci// Utilities for writing messages out. 39e5c31af7Sopenharmony_ci 40e5c31af7Sopenharmony_cistatic void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize) 41e5c31af7Sopenharmony_ci{ 42e5c31af7Sopenharmony_ci deUint8 hdr[xs::MESSAGE_HEADER_SIZE]; 43e5c31af7Sopenharmony_ci xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE); 44e5c31af7Sopenharmony_ci dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]); 45e5c31af7Sopenharmony_ci} 46e5c31af7Sopenharmony_ci 47e5c31af7Sopenharmony_cistatic void writeKeepalive (de::BlockBuffer<deUint8>& dst) 48e5c31af7Sopenharmony_ci{ 49e5c31af7Sopenharmony_ci writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE); 50e5c31af7Sopenharmony_ci dst.flush(); 51e5c31af7Sopenharmony_ci} 52e5c31af7Sopenharmony_ci 53e5c31af7Sopenharmony_cistatic void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList) 54e5c31af7Sopenharmony_ci{ 55e5c31af7Sopenharmony_ci int nameSize = (int)strlen(name) + 1; 56e5c31af7Sopenharmony_ci int paramsSize = (int)strlen(params) + 1; 57e5c31af7Sopenharmony_ci int workDirSize = (int)strlen(workDir) + 1; 58e5c31af7Sopenharmony_ci int caseListSize = (int)strlen(caseList) + 1; 59e5c31af7Sopenharmony_ci int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize; 60e5c31af7Sopenharmony_ci 61e5c31af7Sopenharmony_ci writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize); 62e5c31af7Sopenharmony_ci dst.write(nameSize, (const deUint8*)name); 63e5c31af7Sopenharmony_ci dst.write(paramsSize, (const deUint8*)params); 64e5c31af7Sopenharmony_ci dst.write(workDirSize, (const deUint8*)workDir); 65e5c31af7Sopenharmony_ci dst.write(caseListSize, (const deUint8*)caseList); 66e5c31af7Sopenharmony_ci dst.flush(); 67e5c31af7Sopenharmony_ci} 68e5c31af7Sopenharmony_ci 69e5c31af7Sopenharmony_cistatic void writeStopExecution (de::BlockBuffer<deUint8>& dst) 70e5c31af7Sopenharmony_ci{ 71e5c31af7Sopenharmony_ci writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE); 72e5c31af7Sopenharmony_ci dst.flush(); 73e5c31af7Sopenharmony_ci} 74e5c31af7Sopenharmony_ci 75e5c31af7Sopenharmony_ci// TcpIpLinkState 76e5c31af7Sopenharmony_ci 77e5c31af7Sopenharmony_ciTcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr) 78e5c31af7Sopenharmony_ci : m_state (initialState) 79e5c31af7Sopenharmony_ci , m_error (initialErr) 80e5c31af7Sopenharmony_ci , m_lastKeepaliveReceived (0) 81e5c31af7Sopenharmony_ci , m_stateChangedCallback (DE_NULL) 82e5c31af7Sopenharmony_ci , m_testLogDataCallback (DE_NULL) 83e5c31af7Sopenharmony_ci , m_infoLogDataCallback (DE_NULL) 84e5c31af7Sopenharmony_ci , m_userPtr (DE_NULL) 85e5c31af7Sopenharmony_ci{ 86e5c31af7Sopenharmony_ci} 87e5c31af7Sopenharmony_ci 88e5c31af7Sopenharmony_ciTcpIpLinkState::~TcpIpLinkState (void) 89e5c31af7Sopenharmony_ci{ 90e5c31af7Sopenharmony_ci} 91e5c31af7Sopenharmony_ci 92e5c31af7Sopenharmony_ciCommLinkState TcpIpLinkState::getState (void) const 93e5c31af7Sopenharmony_ci{ 94e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 95e5c31af7Sopenharmony_ci 96e5c31af7Sopenharmony_ci return m_state; 97e5c31af7Sopenharmony_ci} 98e5c31af7Sopenharmony_ci 99e5c31af7Sopenharmony_ciCommLinkState TcpIpLinkState::getState (std::string& error) const 100e5c31af7Sopenharmony_ci{ 101e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 102e5c31af7Sopenharmony_ci 103e5c31af7Sopenharmony_ci error = m_error; 104e5c31af7Sopenharmony_ci return m_state; 105e5c31af7Sopenharmony_ci} 106e5c31af7Sopenharmony_ci 107e5c31af7Sopenharmony_civoid TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr) 108e5c31af7Sopenharmony_ci{ 109e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 110e5c31af7Sopenharmony_ci 111e5c31af7Sopenharmony_ci m_stateChangedCallback = stateChangedCallback; 112e5c31af7Sopenharmony_ci m_testLogDataCallback = testLogDataCallback; 113e5c31af7Sopenharmony_ci m_infoLogDataCallback = infoLogDataCallback; 114e5c31af7Sopenharmony_ci m_userPtr = userPtr; 115e5c31af7Sopenharmony_ci} 116e5c31af7Sopenharmony_ci 117e5c31af7Sopenharmony_civoid TcpIpLinkState::setState (CommLinkState state, const char* error) 118e5c31af7Sopenharmony_ci{ 119e5c31af7Sopenharmony_ci CommLink::StateChangedFunc callback = DE_NULL; 120e5c31af7Sopenharmony_ci void* userPtr = DE_NULL; 121e5c31af7Sopenharmony_ci 122e5c31af7Sopenharmony_ci { 123e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 124e5c31af7Sopenharmony_ci 125e5c31af7Sopenharmony_ci m_state = state; 126e5c31af7Sopenharmony_ci m_error = error; 127e5c31af7Sopenharmony_ci 128e5c31af7Sopenharmony_ci callback = m_stateChangedCallback; 129e5c31af7Sopenharmony_ci userPtr = m_userPtr; 130e5c31af7Sopenharmony_ci } 131e5c31af7Sopenharmony_ci 132e5c31af7Sopenharmony_ci if (callback) 133e5c31af7Sopenharmony_ci callback(userPtr, state, error); 134e5c31af7Sopenharmony_ci} 135e5c31af7Sopenharmony_ci 136e5c31af7Sopenharmony_civoid TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const 137e5c31af7Sopenharmony_ci{ 138e5c31af7Sopenharmony_ci CommLink::LogDataFunc callback = DE_NULL; 139e5c31af7Sopenharmony_ci void* userPtr = DE_NULL; 140e5c31af7Sopenharmony_ci 141e5c31af7Sopenharmony_ci m_lock.lock(); 142e5c31af7Sopenharmony_ci callback = m_testLogDataCallback; 143e5c31af7Sopenharmony_ci userPtr = m_userPtr; 144e5c31af7Sopenharmony_ci m_lock.unlock(); 145e5c31af7Sopenharmony_ci 146e5c31af7Sopenharmony_ci if (callback) 147e5c31af7Sopenharmony_ci callback(userPtr, bytes, numBytes); 148e5c31af7Sopenharmony_ci} 149e5c31af7Sopenharmony_ci 150e5c31af7Sopenharmony_civoid TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const 151e5c31af7Sopenharmony_ci{ 152e5c31af7Sopenharmony_ci CommLink::LogDataFunc callback = DE_NULL; 153e5c31af7Sopenharmony_ci void* userPtr = DE_NULL; 154e5c31af7Sopenharmony_ci 155e5c31af7Sopenharmony_ci m_lock.lock(); 156e5c31af7Sopenharmony_ci callback = m_infoLogDataCallback; 157e5c31af7Sopenharmony_ci userPtr = m_userPtr; 158e5c31af7Sopenharmony_ci m_lock.unlock(); 159e5c31af7Sopenharmony_ci 160e5c31af7Sopenharmony_ci if (callback) 161e5c31af7Sopenharmony_ci callback(userPtr, bytes, numBytes); 162e5c31af7Sopenharmony_ci} 163e5c31af7Sopenharmony_ci 164e5c31af7Sopenharmony_civoid TcpIpLinkState::onKeepaliveReceived (void) 165e5c31af7Sopenharmony_ci{ 166e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 167e5c31af7Sopenharmony_ci m_lastKeepaliveReceived = deGetMicroseconds(); 168e5c31af7Sopenharmony_ci} 169e5c31af7Sopenharmony_ci 170e5c31af7Sopenharmony_cideUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const 171e5c31af7Sopenharmony_ci{ 172e5c31af7Sopenharmony_ci de::ScopedLock lock(m_lock); 173e5c31af7Sopenharmony_ci return m_lastKeepaliveReceived; 174e5c31af7Sopenharmony_ci} 175e5c31af7Sopenharmony_ci 176e5c31af7Sopenharmony_ci// TcpIpSendThread 177e5c31af7Sopenharmony_ci 178e5c31af7Sopenharmony_ciTcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state) 179e5c31af7Sopenharmony_ci : m_socket (socket) 180e5c31af7Sopenharmony_ci , m_state (state) 181e5c31af7Sopenharmony_ci , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS) 182e5c31af7Sopenharmony_ci , m_isRunning (false) 183e5c31af7Sopenharmony_ci{ 184e5c31af7Sopenharmony_ci} 185e5c31af7Sopenharmony_ci 186e5c31af7Sopenharmony_ciTcpIpSendThread::~TcpIpSendThread (void) 187e5c31af7Sopenharmony_ci{ 188e5c31af7Sopenharmony_ci} 189e5c31af7Sopenharmony_ci 190e5c31af7Sopenharmony_civoid TcpIpSendThread::start (void) 191e5c31af7Sopenharmony_ci{ 192e5c31af7Sopenharmony_ci DE_ASSERT(!m_isRunning); 193e5c31af7Sopenharmony_ci 194e5c31af7Sopenharmony_ci // Reset state. 195e5c31af7Sopenharmony_ci m_buffer.clear(); 196e5c31af7Sopenharmony_ci m_isRunning = true; 197e5c31af7Sopenharmony_ci 198e5c31af7Sopenharmony_ci de::Thread::start(); 199e5c31af7Sopenharmony_ci} 200e5c31af7Sopenharmony_ci 201e5c31af7Sopenharmony_civoid TcpIpSendThread::run (void) 202e5c31af7Sopenharmony_ci{ 203e5c31af7Sopenharmony_ci try 204e5c31af7Sopenharmony_ci { 205e5c31af7Sopenharmony_ci deUint8 buf[SEND_BUFFER_BLOCK_SIZE]; 206e5c31af7Sopenharmony_ci 207e5c31af7Sopenharmony_ci while (!m_buffer.isCanceled()) 208e5c31af7Sopenharmony_ci { 209e5c31af7Sopenharmony_ci size_t numToSend = 0; 210e5c31af7Sopenharmony_ci size_t numSent = 0; 211e5c31af7Sopenharmony_ci deSocketResult result = DE_SOCKETRESULT_LAST; 212e5c31af7Sopenharmony_ci 213e5c31af7Sopenharmony_ci try 214e5c31af7Sopenharmony_ci { 215e5c31af7Sopenharmony_ci // Wait for single byte and then try to read more. 216e5c31af7Sopenharmony_ci m_buffer.read(1, &buf[0]); 217e5c31af7Sopenharmony_ci numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]); 218e5c31af7Sopenharmony_ci } 219e5c31af7Sopenharmony_ci catch (const de::BlockBuffer<deUint8>::CanceledException&) 220e5c31af7Sopenharmony_ci { 221e5c31af7Sopenharmony_ci // Handled in loop condition. 222e5c31af7Sopenharmony_ci } 223e5c31af7Sopenharmony_ci 224e5c31af7Sopenharmony_ci while (numSent < numToSend) 225e5c31af7Sopenharmony_ci { 226e5c31af7Sopenharmony_ci result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent); 227e5c31af7Sopenharmony_ci 228e5c31af7Sopenharmony_ci if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 229e5c31af7Sopenharmony_ci XE_FAIL("Connection closed"); 230e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 231e5c31af7Sopenharmony_ci XE_FAIL("Connection terminated"); 232e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_ERROR) 233e5c31af7Sopenharmony_ci XE_FAIL("Socket error"); 234e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 235e5c31af7Sopenharmony_ci { 236e5c31af7Sopenharmony_ci // \note Socket should not be in non-blocking mode. 237e5c31af7Sopenharmony_ci DE_ASSERT(numSent == 0); 238e5c31af7Sopenharmony_ci deYield(); 239e5c31af7Sopenharmony_ci } 240e5c31af7Sopenharmony_ci else 241e5c31af7Sopenharmony_ci DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS); 242e5c31af7Sopenharmony_ci } 243e5c31af7Sopenharmony_ci } 244e5c31af7Sopenharmony_ci } 245e5c31af7Sopenharmony_ci catch (const std::exception& e) 246e5c31af7Sopenharmony_ci { 247e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_ERROR, e.what()); 248e5c31af7Sopenharmony_ci } 249e5c31af7Sopenharmony_ci} 250e5c31af7Sopenharmony_ci 251e5c31af7Sopenharmony_civoid TcpIpSendThread::stop (void) 252e5c31af7Sopenharmony_ci{ 253e5c31af7Sopenharmony_ci if (m_isRunning) 254e5c31af7Sopenharmony_ci { 255e5c31af7Sopenharmony_ci m_buffer.cancel(); 256e5c31af7Sopenharmony_ci join(); 257e5c31af7Sopenharmony_ci m_isRunning = false; 258e5c31af7Sopenharmony_ci } 259e5c31af7Sopenharmony_ci} 260e5c31af7Sopenharmony_ci 261e5c31af7Sopenharmony_ci// TcpIpRecvThread 262e5c31af7Sopenharmony_ci 263e5c31af7Sopenharmony_ciTcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state) 264e5c31af7Sopenharmony_ci : m_socket (socket) 265e5c31af7Sopenharmony_ci , m_state (state) 266e5c31af7Sopenharmony_ci , m_curMsgPos (0) 267e5c31af7Sopenharmony_ci , m_isRunning (false) 268e5c31af7Sopenharmony_ci{ 269e5c31af7Sopenharmony_ci} 270e5c31af7Sopenharmony_ci 271e5c31af7Sopenharmony_ciTcpIpRecvThread::~TcpIpRecvThread (void) 272e5c31af7Sopenharmony_ci{ 273e5c31af7Sopenharmony_ci} 274e5c31af7Sopenharmony_ci 275e5c31af7Sopenharmony_civoid TcpIpRecvThread::start (void) 276e5c31af7Sopenharmony_ci{ 277e5c31af7Sopenharmony_ci DE_ASSERT(!m_isRunning); 278e5c31af7Sopenharmony_ci 279e5c31af7Sopenharmony_ci // Reset state. 280e5c31af7Sopenharmony_ci m_curMsgPos = 0; 281e5c31af7Sopenharmony_ci m_isRunning = true; 282e5c31af7Sopenharmony_ci 283e5c31af7Sopenharmony_ci de::Thread::start(); 284e5c31af7Sopenharmony_ci} 285e5c31af7Sopenharmony_ci 286e5c31af7Sopenharmony_civoid TcpIpRecvThread::run (void) 287e5c31af7Sopenharmony_ci{ 288e5c31af7Sopenharmony_ci try 289e5c31af7Sopenharmony_ci { 290e5c31af7Sopenharmony_ci for (;;) 291e5c31af7Sopenharmony_ci { 292e5c31af7Sopenharmony_ci bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE; 293e5c31af7Sopenharmony_ci bool hasPayload = false; 294e5c31af7Sopenharmony_ci size_t messageSize = 0; 295e5c31af7Sopenharmony_ci xs::MessageType messageType = (xs::MessageType)0; 296e5c31af7Sopenharmony_ci 297e5c31af7Sopenharmony_ci if (hasHeader) 298e5c31af7Sopenharmony_ci { 299e5c31af7Sopenharmony_ci xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize); 300e5c31af7Sopenharmony_ci hasPayload = m_curMsgPos >= messageSize; 301e5c31af7Sopenharmony_ci } 302e5c31af7Sopenharmony_ci 303e5c31af7Sopenharmony_ci if (hasPayload) 304e5c31af7Sopenharmony_ci { 305e5c31af7Sopenharmony_ci // Process message. 306e5c31af7Sopenharmony_ci handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE); 307e5c31af7Sopenharmony_ci m_curMsgPos = 0; 308e5c31af7Sopenharmony_ci } 309e5c31af7Sopenharmony_ci else 310e5c31af7Sopenharmony_ci { 311e5c31af7Sopenharmony_ci // Try to receive missing bytes. 312e5c31af7Sopenharmony_ci size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE; 313e5c31af7Sopenharmony_ci size_t bytesToRecv = curSize-m_curMsgPos; 314e5c31af7Sopenharmony_ci size_t numRecv = 0; 315e5c31af7Sopenharmony_ci deSocketResult result = DE_SOCKETRESULT_LAST; 316e5c31af7Sopenharmony_ci 317e5c31af7Sopenharmony_ci if (m_curMsgBuf.size() < curSize) 318e5c31af7Sopenharmony_ci m_curMsgBuf.resize(curSize); 319e5c31af7Sopenharmony_ci 320e5c31af7Sopenharmony_ci result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv); 321e5c31af7Sopenharmony_ci 322e5c31af7Sopenharmony_ci if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 323e5c31af7Sopenharmony_ci XE_FAIL("Connection closed"); 324e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 325e5c31af7Sopenharmony_ci XE_FAIL("Connection terminated"); 326e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_ERROR) 327e5c31af7Sopenharmony_ci XE_FAIL("Socket error"); 328e5c31af7Sopenharmony_ci else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 329e5c31af7Sopenharmony_ci { 330e5c31af7Sopenharmony_ci // \note Socket should not be in non-blocking mode. 331e5c31af7Sopenharmony_ci DE_ASSERT(numRecv == 0); 332e5c31af7Sopenharmony_ci deYield(); 333e5c31af7Sopenharmony_ci } 334e5c31af7Sopenharmony_ci else 335e5c31af7Sopenharmony_ci { 336e5c31af7Sopenharmony_ci DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS); 337e5c31af7Sopenharmony_ci DE_ASSERT(numRecv <= bytesToRecv); 338e5c31af7Sopenharmony_ci m_curMsgPos += numRecv; 339e5c31af7Sopenharmony_ci // Continue receiving bytes / handle message in next iter. 340e5c31af7Sopenharmony_ci } 341e5c31af7Sopenharmony_ci } 342e5c31af7Sopenharmony_ci } 343e5c31af7Sopenharmony_ci } 344e5c31af7Sopenharmony_ci catch (const std::exception& e) 345e5c31af7Sopenharmony_ci { 346e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_ERROR, e.what()); 347e5c31af7Sopenharmony_ci } 348e5c31af7Sopenharmony_ci} 349e5c31af7Sopenharmony_ci 350e5c31af7Sopenharmony_civoid TcpIpRecvThread::stop (void) 351e5c31af7Sopenharmony_ci{ 352e5c31af7Sopenharmony_ci if (m_isRunning) 353e5c31af7Sopenharmony_ci { 354e5c31af7Sopenharmony_ci // \note Socket must be closed before terminating receive thread. 355e5c31af7Sopenharmony_ci XE_CHECK(!m_socket.isReceiveOpen()); 356e5c31af7Sopenharmony_ci 357e5c31af7Sopenharmony_ci join(); 358e5c31af7Sopenharmony_ci m_isRunning = false; 359e5c31af7Sopenharmony_ci } 360e5c31af7Sopenharmony_ci} 361e5c31af7Sopenharmony_ci 362e5c31af7Sopenharmony_civoid TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize) 363e5c31af7Sopenharmony_ci{ 364e5c31af7Sopenharmony_ci switch (messageType) 365e5c31af7Sopenharmony_ci { 366e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_KEEPALIVE: 367e5c31af7Sopenharmony_ci m_state.onKeepaliveReceived(); 368e5c31af7Sopenharmony_ci break; 369e5c31af7Sopenharmony_ci 370e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_PROCESS_STARTED: 371e5c31af7Sopenharmony_ci XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message"); 372e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING); 373e5c31af7Sopenharmony_ci break; 374e5c31af7Sopenharmony_ci 375e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED: 376e5c31af7Sopenharmony_ci { 377e5c31af7Sopenharmony_ci xs::ProcessLaunchFailedMessage msg(data, dataSize); 378e5c31af7Sopenharmony_ci XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message"); 379e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str()); 380e5c31af7Sopenharmony_ci break; 381e5c31af7Sopenharmony_ci } 382e5c31af7Sopenharmony_ci 383e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_PROCESS_FINISHED: 384e5c31af7Sopenharmony_ci { 385e5c31af7Sopenharmony_ci XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message"); 386e5c31af7Sopenharmony_ci xs::ProcessFinishedMessage msg(data, dataSize); 387e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED); 388e5c31af7Sopenharmony_ci DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code. 389e5c31af7Sopenharmony_ci break; 390e5c31af7Sopenharmony_ci } 391e5c31af7Sopenharmony_ci 392e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_PROCESS_LOG_DATA: 393e5c31af7Sopenharmony_ci case xs::MESSAGETYPE_INFO: 394e5c31af7Sopenharmony_ci // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol. 395e5c31af7Sopenharmony_ci if (data[dataSize-1] == 0) 396e5c31af7Sopenharmony_ci dataSize -= 1; 397e5c31af7Sopenharmony_ci 398e5c31af7Sopenharmony_ci if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA) 399e5c31af7Sopenharmony_ci { 400e5c31af7Sopenharmony_ci XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message"); 401e5c31af7Sopenharmony_ci m_state.onTestLogData(&data[0], dataSize); 402e5c31af7Sopenharmony_ci } 403e5c31af7Sopenharmony_ci else 404e5c31af7Sopenharmony_ci m_state.onInfoLogData(&data[0], dataSize); 405e5c31af7Sopenharmony_ci break; 406e5c31af7Sopenharmony_ci 407e5c31af7Sopenharmony_ci default: 408e5c31af7Sopenharmony_ci XE_FAIL("Unknown message"); 409e5c31af7Sopenharmony_ci } 410e5c31af7Sopenharmony_ci} 411e5c31af7Sopenharmony_ci 412e5c31af7Sopenharmony_ci// TcpIpLink 413e5c31af7Sopenharmony_ci 414e5c31af7Sopenharmony_ciTcpIpLink::TcpIpLink (void) 415e5c31af7Sopenharmony_ci : m_state (COMMLINKSTATE_ERROR, "Not connected") 416e5c31af7Sopenharmony_ci , m_sendThread (m_socket, m_state) 417e5c31af7Sopenharmony_ci , m_recvThread (m_socket, m_state) 418e5c31af7Sopenharmony_ci , m_keepaliveTimer (DE_NULL) 419e5c31af7Sopenharmony_ci{ 420e5c31af7Sopenharmony_ci m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this); 421e5c31af7Sopenharmony_ci XE_CHECK(m_keepaliveTimer); 422e5c31af7Sopenharmony_ci} 423e5c31af7Sopenharmony_ci 424e5c31af7Sopenharmony_ciTcpIpLink::~TcpIpLink (void) 425e5c31af7Sopenharmony_ci{ 426e5c31af7Sopenharmony_ci try 427e5c31af7Sopenharmony_ci { 428e5c31af7Sopenharmony_ci closeConnection(); 429e5c31af7Sopenharmony_ci } 430e5c31af7Sopenharmony_ci catch (...) 431e5c31af7Sopenharmony_ci { 432e5c31af7Sopenharmony_ci // Can't do much except to ignore error. 433e5c31af7Sopenharmony_ci } 434e5c31af7Sopenharmony_ci deTimer_destroy(m_keepaliveTimer); 435e5c31af7Sopenharmony_ci} 436e5c31af7Sopenharmony_ci 437e5c31af7Sopenharmony_civoid TcpIpLink::closeConnection (void) 438e5c31af7Sopenharmony_ci{ 439e5c31af7Sopenharmony_ci { 440e5c31af7Sopenharmony_ci deSocketState state = m_socket.getState(); 441e5c31af7Sopenharmony_ci if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED) 442e5c31af7Sopenharmony_ci m_socket.shutdown(); 443e5c31af7Sopenharmony_ci } 444e5c31af7Sopenharmony_ci 445e5c31af7Sopenharmony_ci if (deTimer_isActive(m_keepaliveTimer)) 446e5c31af7Sopenharmony_ci deTimer_disable(m_keepaliveTimer); 447e5c31af7Sopenharmony_ci 448e5c31af7Sopenharmony_ci if (m_sendThread.isRunning()) 449e5c31af7Sopenharmony_ci m_sendThread.stop(); 450e5c31af7Sopenharmony_ci 451e5c31af7Sopenharmony_ci if (m_recvThread.isRunning()) 452e5c31af7Sopenharmony_ci m_recvThread.stop(); 453e5c31af7Sopenharmony_ci 454e5c31af7Sopenharmony_ci if (m_socket.getState() != DE_SOCKETSTATE_CLOSED) 455e5c31af7Sopenharmony_ci m_socket.close(); 456e5c31af7Sopenharmony_ci} 457e5c31af7Sopenharmony_ci 458e5c31af7Sopenharmony_civoid TcpIpLink::connect (const de::SocketAddress& address) 459e5c31af7Sopenharmony_ci{ 460e5c31af7Sopenharmony_ci XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED); 461e5c31af7Sopenharmony_ci XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR); 462e5c31af7Sopenharmony_ci XE_CHECK(!m_sendThread.isRunning()); 463e5c31af7Sopenharmony_ci XE_CHECK(!m_recvThread.isRunning()); 464e5c31af7Sopenharmony_ci 465e5c31af7Sopenharmony_ci m_socket.connect(address); 466e5c31af7Sopenharmony_ci 467e5c31af7Sopenharmony_ci try 468e5c31af7Sopenharmony_ci { 469e5c31af7Sopenharmony_ci // Clear error and set state to ready. 470e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_READY, ""); 471e5c31af7Sopenharmony_ci m_state.onKeepaliveReceived(); 472e5c31af7Sopenharmony_ci 473e5c31af7Sopenharmony_ci // Launch threads. 474e5c31af7Sopenharmony_ci m_sendThread.start(); 475e5c31af7Sopenharmony_ci m_recvThread.start(); 476e5c31af7Sopenharmony_ci 477e5c31af7Sopenharmony_ci XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL)); 478e5c31af7Sopenharmony_ci } 479e5c31af7Sopenharmony_ci catch (const std::exception& e) 480e5c31af7Sopenharmony_ci { 481e5c31af7Sopenharmony_ci closeConnection(); 482e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_ERROR, e.what()); 483e5c31af7Sopenharmony_ci throw; 484e5c31af7Sopenharmony_ci } 485e5c31af7Sopenharmony_ci} 486e5c31af7Sopenharmony_ci 487e5c31af7Sopenharmony_civoid TcpIpLink::disconnect (void) 488e5c31af7Sopenharmony_ci{ 489e5c31af7Sopenharmony_ci try 490e5c31af7Sopenharmony_ci { 491e5c31af7Sopenharmony_ci closeConnection(); 492e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_ERROR, "Not connected"); 493e5c31af7Sopenharmony_ci } 494e5c31af7Sopenharmony_ci catch (const std::exception& e) 495e5c31af7Sopenharmony_ci { 496e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_ERROR, e.what()); 497e5c31af7Sopenharmony_ci } 498e5c31af7Sopenharmony_ci} 499e5c31af7Sopenharmony_ci 500e5c31af7Sopenharmony_civoid TcpIpLink::reset (void) 501e5c31af7Sopenharmony_ci{ 502e5c31af7Sopenharmony_ci // \note Just clears error state if we are connected. 503e5c31af7Sopenharmony_ci if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED) 504e5c31af7Sopenharmony_ci { 505e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_READY, ""); 506e5c31af7Sopenharmony_ci 507e5c31af7Sopenharmony_ci // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers? 508e5c31af7Sopenharmony_ci } 509e5c31af7Sopenharmony_ci else 510e5c31af7Sopenharmony_ci disconnect(); // Abnormal state/usage. Disconnect socket. 511e5c31af7Sopenharmony_ci} 512e5c31af7Sopenharmony_ci 513e5c31af7Sopenharmony_civoid TcpIpLink::keepaliveTimerCallback (void* ptr) 514e5c31af7Sopenharmony_ci{ 515e5c31af7Sopenharmony_ci TcpIpLink* link = static_cast<TcpIpLink*>(ptr); 516e5c31af7Sopenharmony_ci deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied(); 517e5c31af7Sopenharmony_ci deUint64 curTime = deGetMicroseconds(); 518e5c31af7Sopenharmony_ci 519e5c31af7Sopenharmony_ci // Check for timeout. 520e5c31af7Sopenharmony_ci if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000) 521e5c31af7Sopenharmony_ci link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout"); 522e5c31af7Sopenharmony_ci 523e5c31af7Sopenharmony_ci // Enqueue new keepalive. 524e5c31af7Sopenharmony_ci try 525e5c31af7Sopenharmony_ci { 526e5c31af7Sopenharmony_ci writeKeepalive(link->m_sendThread.getBuffer()); 527e5c31af7Sopenharmony_ci } 528e5c31af7Sopenharmony_ci catch (const de::BlockBuffer<deUint8>::CanceledException&) 529e5c31af7Sopenharmony_ci { 530e5c31af7Sopenharmony_ci // Ignore. Can happen in connection teardown. 531e5c31af7Sopenharmony_ci } 532e5c31af7Sopenharmony_ci} 533e5c31af7Sopenharmony_ci 534e5c31af7Sopenharmony_ciCommLinkState TcpIpLink::getState (void) const 535e5c31af7Sopenharmony_ci{ 536e5c31af7Sopenharmony_ci return m_state.getState(); 537e5c31af7Sopenharmony_ci} 538e5c31af7Sopenharmony_ci 539e5c31af7Sopenharmony_ciCommLinkState TcpIpLink::getState (std::string& message) const 540e5c31af7Sopenharmony_ci{ 541e5c31af7Sopenharmony_ci return m_state.getState(message); 542e5c31af7Sopenharmony_ci} 543e5c31af7Sopenharmony_ci 544e5c31af7Sopenharmony_civoid TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr) 545e5c31af7Sopenharmony_ci{ 546e5c31af7Sopenharmony_ci m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr); 547e5c31af7Sopenharmony_ci} 548e5c31af7Sopenharmony_ci 549e5c31af7Sopenharmony_civoid TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList) 550e5c31af7Sopenharmony_ci{ 551e5c31af7Sopenharmony_ci XE_CHECK(m_state.getState() == COMMLINKSTATE_READY); 552e5c31af7Sopenharmony_ci 553e5c31af7Sopenharmony_ci m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING); 554e5c31af7Sopenharmony_ci writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList); 555e5c31af7Sopenharmony_ci} 556e5c31af7Sopenharmony_ci 557e5c31af7Sopenharmony_civoid TcpIpLink::stopTestProcess (void) 558e5c31af7Sopenharmony_ci{ 559e5c31af7Sopenharmony_ci XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR); 560e5c31af7Sopenharmony_ci writeStopExecution(m_sendThread.getBuffer()); 561e5c31af7Sopenharmony_ci} 562e5c31af7Sopenharmony_ci 563e5c31af7Sopenharmony_ci} // xe 564