1/* 2 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "span_join.h" 17#include <vector> 18#include "string_help.h" 19 20namespace SysTuning { 21namespace TraceStreamer { 22 23const std::string TS_COLUMN_NAME = "ts"; 24const std::string DUR_COLUMN_NAME = "dur"; 25const uint32_t RESULT = 2; 26constexpr int32_t MINSIZE = 5; 27constexpr int32_t MAXSIZE = 1024; 28constexpr int32_t NEXT_NUMBER = 1; 29constexpr int32_t TSANDDUR_COLUMN = 2; 30constexpr int32_t PARTITIONED_COUNT = 3; 31 32enum class Index : int32_t { TS, DUR, PARTITION }; 33 34SpanJoin::SpanJoin(const TraceDataCache *dataCache) : TableBase(dataCache) 35{ 36 tableColumn_ = {}; 37 tablePriKey_ = {}; 38 tableFirstDesc_ = {}; 39 tableSecondDesc_ = {}; 40} 41 42void SpanJoin::Init(int32_t argc, const char *const *argv) 43{ 44 if (argc < MINSIZE) { 45 return; 46 } 47 // Parse the fields of the two tables separately 48 TableParse tableFirstParse; 49 Parse(std::string(reinterpret_cast<const char *>(argv[3])), tableFirstParse); 50 TableParse tableSecondParse; 51 Parse(std::string(reinterpret_cast<const char *>(argv[4])), tableSecondParse); 52 53 // you must ensure that the two partitions exist and are the same when using 54 if (tableFirstDesc_.partition != tableSecondDesc_.partition) { 55 return; 56 } 57 isSamepartitioning_ = true; 58 GetTableField(tableFirstParse, tableFirstDesc_); 59 GetTableField(tableSecondParse, tableSecondDesc_); 60 tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER"); 61 tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER"); 62 tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER"); 63 CreateCols(tableFirstDesc_, tableColumn_); 64 CreateCols(tableSecondDesc_, tableColumn_); 65 std::vector<std::string> primaryKeys = {"ts"}; 66 primaryKeys.push_back(tableFirstDesc_.partition); 67 tablePriKey_ = primaryKeys; 68 return; 69} 70 71void SpanJoin::CreateCols(TableDesc &tableDesc, std::vector<TableBase::ColumnInfo> &cols) 72{ 73 for (int32_t i = 0; i < tableDesc.cols.size(); i++) { 74 auto &n = tableDesc.cols.at(i).name_; 75 if (IsTsOrDurCol(n)) { 76 continue; 77 } 78 auto columnInfo = &mTableColumnInfo_[cols.size()]; 79 columnInfo->tableDesc = &tableDesc; 80 columnInfo->colIdx = i; 81 if (!DeduplicationForColumn(n, cols)) { 82 continue; 83 } 84 cols.emplace_back(n, tableDesc.cols.at(i).type_); 85 } 86} 87 88bool SpanJoin::DeduplicationForColumn(const std::string &name, std::vector<ColumnInfo> &cols) 89{ 90 for (size_t i = 0; i < cols.size(); i++) { 91 if (name == cols.at(i).name_) { 92 return false; 93 } 94 } 95 return true; 96} 97 98void SpanJoin::Parse(const std::string &tablePartition, TableParse &tableParse) 99{ 100 std::vector<std::string> result = base::SplitStringToVec(tablePartition, " "); 101 if (result.size() < PARTITIONED_COUNT) { 102 TS_LOGW("span_join sql is invalid!"); 103 } 104 tableParse.name = result.at(0); 105 if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) { 106 TS_LOGW("sql has not PARTITIONED"); 107 return; 108 } 109 tableParse.partitionCol = result.at(RESULT); 110 return; 111} 112 113bool SpanJoin::IsTsOrDurCol(const std::string &name) 114{ 115 if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) { 116 return true; 117 } 118 return false; 119} 120 121void SpanJoin::GetTableField(const TableParse &tableParse, TableDesc &tableDesc) 122{ 123 std::vector<TableBase::ColumnInfo> cols; 124 GetColumns(dataCache_, tableParse.name, cols); 125 int32_t tsDurCount = 0; 126 for (int32_t i = 0; i < cols.size(); i++) { 127 auto col = cols.at(i); 128 if (IsTsOrDurCol(col.name_)) { 129 tsDurCount++; 130 } 131 if (col.name_ == TS_COLUMN_NAME) { 132 tableDesc.tsIdx = i; 133 } else if (col.name_ == DUR_COLUMN_NAME) { 134 tableDesc.durIdx = i; 135 } else if (col.name_ == tableParse.partitionCol) { 136 tableDesc.partitionIdx = i; 137 } 138 } 139 if (tsDurCount != TSANDDUR_COLUMN) { 140 return; 141 } 142 tableDesc.name = tableParse.name; 143 tableDesc.partition = tableParse.partitionCol; 144 tableDesc.cols = std::move(cols); 145 return; 146} 147 148void SpanJoin::GetColumns(const TraceDataCache *dataCache, 149 const std::string &tableName, 150 std::vector<TableBase::ColumnInfo> &columns) 151{ 152 char sql[MAXSIZE]; 153 std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")"; 154 int32_t n = snprintf_s(sql, sizeof(sql), sizeof(sql), querySql.c_str(), tableName.c_str()); 155 if (n < 0 || n >= sizeof(sql)) { 156 TS_LOGE(" Failed to format SQL string "); 157 } 158 sqlite3_stmt *stmt = nullptr; 159 int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr); 160 while (!ret) { 161 int32_t err = sqlite3_step(stmt); 162 if (err == SQLITE_ROW) { 163 columns.emplace_back((reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0))), 164 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 1))); 165 continue; 166 } 167 if (err == SQLITE_DONE) { 168 break; 169 } 170 ret = err; 171 } 172 return; 173} 174 175SpanJoin::CaclSpan::CaclSpan(TableBase *tableBase, const TableDesc *tableDesc, sqlite3 *db) 176 : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin *>(tableBase)) 177{ 178} 179 180SpanJoin::CaclSpan::~CaclSpan() = default; 181 182int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value **argv) 183{ 184 sqlQuery_ = GetSqlQuery(); 185 bool status = IsQueryNext(); 186 if (!status) { 187 return SQLITE_ERROR; 188 } 189 return SQLITE_OK; 190} 191 192std::string SpanJoin::CaclSpan::GetSqlQuery() 193{ 194 std::vector<std::string> columnNames; 195 for (int32_t i = 0; i < desc_->cols.size(); i++) { 196 columnNames.push_back(desc_->cols.at(i).name_); 197 } 198 auto str = GetMergeColumns(columnNames); 199 std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;"; 200 return sql; 201} 202 203void SpanJoin::CaclSpan::setResult(sqlite3_context *context, size_t index) const 204{ 205 if (partitionState_ != PartitionState::TS_REAL) { 206 sqlite3_result_null(context); 207 return; 208 } 209 int32_t sqliteType = sqlite3_column_type(stmt_, index); 210 if (sqliteType == SQLITE_TEXT) { 211 sqlite3_result_text(context, reinterpret_cast<const char *>(sqlite3_column_int64(stmt_, index)), -1, 212 reinterpret_cast<sqlite3_destructor_type>(-1)); 213 } else if (sqliteType == SQLITE_INTEGER) { 214 sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index)); 215 } else if (sqliteType == SQLITE_FLOAT) { 216 sqlite3_result_double(context, sqlite3_column_double(stmt_, index)); 217 } 218} 219 220bool SpanJoin::CaclSpan::GetCursorNext() 221{ 222 int32_t res; 223 int32_t rowType; 224 do { 225 res = sqlite3_step(stmt_); 226 rowType = sqlite3_column_type(stmt_, desc_->partitionIdx); 227 } while (res == SQLITE_ROW && rowType == SQLITE_NULL); 228 if (res != SQLITE_ROW) { 229 isEof_ = true; 230 } else { 231 isEof_ = false; 232 } 233 234 return res == SQLITE_ROW || res == SQLITE_DONE; 235} 236 237void SpanJoin::CaclSpan::Next() 238{ 239 GetNextState(); 240 SearchNextslice(); 241} 242 243bool SpanJoin::CaclSpan::IsQueryNext() 244{ 245 int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr); 246 isEof_ = res != SQLITE_OK; 247 if (res != SQLITE_OK) { 248 return true; 249 } 250 auto status = GetCursorNext(); 251 if (!status) { 252 return false; 253 } 254 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx)); 255 status = SearchNextslice(); 256 return status; 257} 258 259bool SpanJoin::CaclSpan::SearchNextslice() 260{ 261 while (partitionState_ != PartitionState::TS_REAL) { 262 bool status = GetNextState(); 263 if (!status) { 264 return false; 265 } 266 } 267 return true; 268} 269 270bool SpanJoin::CaclSpan::GetNextState() 271{ 272 switch (partitionState_) { 273 case PartitionState::TS_REAL: { 274 GetCursorNext(); 275 partitionState_ = PartitionState::TS_PARTITION; 276 ts_ = endTs_; 277 if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) { 278 endTs_ = std::numeric_limits<int64_t>::max(); 279 } else { 280 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx)); 281 } 282 return true; 283 } 284 case PartitionState::TS_PARTITION: { 285 if (endTs_ == std::numeric_limits<int64_t>::max()) { 286 partitionState_ = PartitionState::TS_MISSING; 287 if (isEof_) { 288 missPartitionEnd_ = std::numeric_limits<int32_t>::max(); 289 } else { 290 missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx)); 291 } 292 missPartitionStart_ = partition_ + NEXT_NUMBER; 293 ts_ = 0; 294 } else { 295 partitionState_ = PartitionState::TS_REAL; 296 ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx)); 297 endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx)); 298 } 299 return true; 300 } 301 case PartitionState::TS_MISSING: { 302 if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) { 303 partitionState_ = PartitionState::TS_EOF; 304 } else { 305 partitionState_ = PartitionState::TS_PARTITION; 306 ts_ = 0; 307 endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx)); 308 partition_ = missPartitionEnd_; 309 } 310 return true; 311 } 312 default: 313 return false; 314 } 315} 316 317std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string> &columns) 318{ 319 std::string str; 320 int32_t size = columns.size(); 321 for (int32_t i = 0; i < size; i++) { 322 if (i == size - 1) { 323 str += columns.at(i); 324 } else { 325 str += columns.at(i) + ", "; 326 } 327 } 328 return str; 329} 330 331int64_t SpanJoin::CaclSpan::GetPatitonForMiss() 332{ 333 return partitionState_ == PartitionState::TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_; 334} 335 336std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor() 337{ 338 return std::make_unique<Cursor>(dataCache_, this); 339} 340 341SpanJoin::Cursor::Cursor(const TraceDataCache *dataCache, SpanJoin *table) 342 : TableBase::Cursor(dataCache, table, 0), 343 tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_), 344 tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_), 345 spanTable_(table) 346{ 347} 348 349int32_t SpanJoin::Cursor::Filter(const FilterConstraints &fc, sqlite3_value **argv) 350{ 351 tableFirst_.InitQuerySql(argv); 352 tableSecond_.InitQuerySql(argv); 353 auto status = IsFindSpan(); 354 if (!status) { 355 return SQLITE_ERROR; 356 } 357 return SQLITE_OK; 358} 359 360bool SpanJoin::Cursor::CaclOverLap() 361{ 362 if (tableFirst_.ts_ >= tableSecond_.ts_) { 363 if ((tableFirst_.partitionState_ == PartitionState::TS_REAL && 364 tableSecond_.partitionState_ == PartitionState::TS_REAL) || 365 tableFirst_.ts_ < tableSecond_.endTs_) { 366 return true; 367 } 368 } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) { 369 return true; 370 } 371 return false; 372} 373 374bool SpanJoin::Cursor::IsFindSpan() 375{ 376 for (;;) { 377 if (tableFirst_.isEof_ || tableSecond_.isEof_) { 378 break; 379 } 380 queryNext_ = FindQueryResult(); 381 if (CaclOverLap()) { 382 break; 383 } 384 queryNext_->Next(); 385 } 386 return true; 387} 388 389SpanJoin::CaclSpan *SpanJoin::Cursor::FindQueryResult() 390{ 391 if (!spanTable_->isSamepartitioning_) { 392 return nullptr; 393 } 394 395 auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_, 396 tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false); 397 auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_, 398 tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false); 399 if (tableFirstResult < tableSecondResult) { 400 return &tableFirst_; 401 } 402 return &tableSecond_; 403} 404 405int32_t SpanJoin::Cursor::Column(int32_t column) const 406{ 407 switch (static_cast<Index>(column)) { 408 case Index::TS: { 409 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_))); 410 break; 411 } 412 case Index::DUR: { 413 sqlite3_result_int64(context_, 414 static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) - 415 std::max(tableFirst_.ts_, tableSecond_.ts_))); 416 break; 417 } 418 case Index::PARTITION: { 419 auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_ 420 : tableSecond_.partition_; 421 sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult)); 422 break; 423 } 424 default: { 425 const auto ColumnInfo = spanTable_->mTableColumnInfo_[column]; 426 if (ColumnInfo.tableDesc == tableFirst_.desc_) { 427 tableFirst_.setResult(context_, ColumnInfo.colIdx); 428 } else { 429 tableSecond_.setResult(context_, ColumnInfo.colIdx); 430 } 431 } 432 } 433 return SQLITE_OK; 434} 435 436} // namespace TraceStreamer 437} // namespace SysTuning 438