1/*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "span_join.h"
17#include <vector>
18#include "string_help.h"
19
20namespace SysTuning {
21namespace TraceStreamer {
22
// Names of the two span columns that are handled specially by the join.
const std::string TS_COLUMN_NAME = "ts";
const std::string DUR_COLUMN_NAME = "dur";
// Position of the partition column name inside a split "<table> PARTITIONED <column>" argument.
constexpr uint32_t RESULT = 2;
// Minimum number of module arguments Init() accepts.
constexpr int32_t MINSIZE = 5;
// Size, in bytes, of the buffer used to format the column-lookup SQL.
constexpr int32_t MAXSIZE = 1024;
// Step between a partition value and the one right after/before it.
constexpr int32_t NEXT_NUMBER = 1;
// Number of ts/dur columns a source table is expected to have.
constexpr int32_t TSANDDUR_COLUMN = 2;
// Minimum number of space-separated tokens in a table argument.
constexpr int32_t PARTITIONED_COUNT = 3;

// Indices of the fixed leading output columns of the joined table.
enum class Index : int32_t {
    TS,
    DUR,
    PARTITION
};
33
34SpanJoin::SpanJoin(const TraceDataCache *dataCache) : TableBase(dataCache)
35{
36    tableColumn_ = {};
37    tablePriKey_ = {};
38    tableFirstDesc_ = {};
39    tableSecondDesc_ = {};
40}
41
42void SpanJoin::Init(int32_t argc, const char *const *argv)
43{
44    if (argc < MINSIZE) {
45        return;
46    }
47    // Parse the fields of the two tables separately
48    TableParse tableFirstParse;
49    Parse(std::string(reinterpret_cast<const char *>(argv[3])), tableFirstParse);
50    TableParse tableSecondParse;
51    Parse(std::string(reinterpret_cast<const char *>(argv[4])), tableSecondParse);
52
53    // you must ensure that the two partitions exist and are the same when using
54    if (tableFirstDesc_.partition != tableSecondDesc_.partition) {
55        return;
56    }
57    isSamepartitioning_ = true;
58    GetTableField(tableFirstParse, tableFirstDesc_);
59    GetTableField(tableSecondParse, tableSecondDesc_);
60    tableColumn_.emplace_back(TS_COLUMN_NAME, "INTEGER");
61    tableColumn_.emplace_back(DUR_COLUMN_NAME, "INTEGER");
62    tableColumn_.emplace_back(tableFirstDesc_.partition, "INTEGER");
63    CreateCols(tableFirstDesc_, tableColumn_);
64    CreateCols(tableSecondDesc_, tableColumn_);
65    std::vector<std::string> primaryKeys = {"ts"};
66    primaryKeys.push_back(tableFirstDesc_.partition);
67    tablePriKey_ = primaryKeys;
68    return;
69}
70
71void SpanJoin::CreateCols(TableDesc &tableDesc, std::vector<TableBase::ColumnInfo> &cols)
72{
73    for (int32_t i = 0; i < tableDesc.cols.size(); i++) {
74        auto &n = tableDesc.cols.at(i).name_;
75        if (IsTsOrDurCol(n)) {
76            continue;
77        }
78        auto columnInfo = &mTableColumnInfo_[cols.size()];
79        columnInfo->tableDesc = &tableDesc;
80        columnInfo->colIdx = i;
81        if (!DeduplicationForColumn(n, cols)) {
82            continue;
83        }
84        cols.emplace_back(n, tableDesc.cols.at(i).type_);
85    }
86}
87
88bool SpanJoin::DeduplicationForColumn(const std::string &name, std::vector<ColumnInfo> &cols)
89{
90    for (size_t i = 0; i < cols.size(); i++) {
91        if (name == cols.at(i).name_) {
92            return false;
93        }
94    }
95    return true;
96}
97
98void SpanJoin::Parse(const std::string &tablePartition, TableParse &tableParse)
99{
100    std::vector<std::string> result = base::SplitStringToVec(tablePartition, " ");
101    if (result.size() < PARTITIONED_COUNT) {
102        TS_LOGW("span_join sql is invalid!");
103    }
104    tableParse.name = result.at(0);
105    if (0 != strcasecmp(result.at(1).c_str(), "PARTITIONED")) {
106        TS_LOGW("sql has not PARTITIONED");
107        return;
108    }
109    tableParse.partitionCol = result.at(RESULT);
110    return;
111}
112
113bool SpanJoin::IsTsOrDurCol(const std::string &name)
114{
115    if (name == TS_COLUMN_NAME || name == DUR_COLUMN_NAME) {
116        return true;
117    }
118    return false;
119}
120
121void SpanJoin::GetTableField(const TableParse &tableParse, TableDesc &tableDesc)
122{
123    std::vector<TableBase::ColumnInfo> cols;
124    GetColumns(dataCache_, tableParse.name, cols);
125    int32_t tsDurCount = 0;
126    for (int32_t i = 0; i < cols.size(); i++) {
127        auto col = cols.at(i);
128        if (IsTsOrDurCol(col.name_)) {
129            tsDurCount++;
130        }
131        if (col.name_ == TS_COLUMN_NAME) {
132            tableDesc.tsIdx = i;
133        } else if (col.name_ == DUR_COLUMN_NAME) {
134            tableDesc.durIdx = i;
135        } else if (col.name_ == tableParse.partitionCol) {
136            tableDesc.partitionIdx = i;
137        }
138    }
139    if (tsDurCount != TSANDDUR_COLUMN) {
140        return;
141    }
142    tableDesc.name = tableParse.name;
143    tableDesc.partition = tableParse.partitionCol;
144    tableDesc.cols = std::move(cols);
145    return;
146}
147
148void SpanJoin::GetColumns(const TraceDataCache *dataCache,
149                          const std::string &tableName,
150                          std::vector<TableBase::ColumnInfo> &columns)
151{
152    char sql[MAXSIZE];
153    std::string querySql = "SELECT name, type from PRAGMA_table_info(\"%s\")";
154    int32_t n = snprintf_s(sql, sizeof(sql), sizeof(sql), querySql.c_str(), tableName.c_str());
155    if (n < 0 || n >= sizeof(sql)) {
156        TS_LOGE(" Failed to format SQL string ");
157    }
158    sqlite3_stmt *stmt = nullptr;
159    int32_t ret = sqlite3_prepare_v2(dataCache->db_, sql, n, &stmt, nullptr);
160    while (!ret) {
161        int32_t err = sqlite3_step(stmt);
162        if (err == SQLITE_ROW) {
163            columns.emplace_back((reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0))),
164                                 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 1)));
165            continue;
166        }
167        if (err == SQLITE_DONE) {
168            break;
169        }
170        ret = err;
171    }
172    return;
173}
174
175SpanJoin::CaclSpan::CaclSpan(TableBase *tableBase, const TableDesc *tableDesc, sqlite3 *db)
176    : desc_(tableDesc), db_(db), table_(reinterpret_cast<SpanJoin *>(tableBase))
177{
178}
179
180SpanJoin::CaclSpan::~CaclSpan() = default;
181
182int32_t SpanJoin::CaclSpan::InitQuerySql(sqlite3_value **argv)
183{
184    sqlQuery_ = GetSqlQuery();
185    bool status = IsQueryNext();
186    if (!status) {
187        return SQLITE_ERROR;
188    }
189    return SQLITE_OK;
190}
191
192std::string SpanJoin::CaclSpan::GetSqlQuery()
193{
194    std::vector<std::string> columnNames;
195    for (int32_t i = 0; i < desc_->cols.size(); i++) {
196        columnNames.push_back(desc_->cols.at(i).name_);
197    }
198    auto str = GetMergeColumns(columnNames);
199    std::string sql = "SELECT " + str + " FROM " + desc_->name + " ORDER BY " + desc_->partition + ", " + "ts;";
200    return sql;
201}
202
203void SpanJoin::CaclSpan::setResult(sqlite3_context *context, size_t index) const
204{
205    if (partitionState_ != PartitionState::TS_REAL) {
206        sqlite3_result_null(context);
207        return;
208    }
209    int32_t sqliteType = sqlite3_column_type(stmt_, index);
210    if (sqliteType == SQLITE_TEXT) {
211        sqlite3_result_text(context, reinterpret_cast<const char *>(sqlite3_column_int64(stmt_, index)), -1,
212                            reinterpret_cast<sqlite3_destructor_type>(-1));
213    } else if (sqliteType == SQLITE_INTEGER) {
214        sqlite3_result_int64(context, sqlite3_column_int64(stmt_, index));
215    } else if (sqliteType == SQLITE_FLOAT) {
216        sqlite3_result_double(context, sqlite3_column_double(stmt_, index));
217    }
218}
219
220bool SpanJoin::CaclSpan::GetCursorNext()
221{
222    int32_t res;
223    int32_t rowType;
224    do {
225        res = sqlite3_step(stmt_);
226        rowType = sqlite3_column_type(stmt_, desc_->partitionIdx);
227    } while (res == SQLITE_ROW && rowType == SQLITE_NULL);
228    if (res != SQLITE_ROW) {
229        isEof_ = true;
230    } else {
231        isEof_ = false;
232    }
233
234    return res == SQLITE_ROW || res == SQLITE_DONE;
235}
236
237void SpanJoin::CaclSpan::Next()
238{
239    GetNextState();
240    SearchNextslice();
241}
242
243bool SpanJoin::CaclSpan::IsQueryNext()
244{
245    int32_t res = sqlite3_prepare_v2(db_, sqlQuery_.c_str(), static_cast<int32_t>(sqlQuery_.size()), &stmt_, nullptr);
246    isEof_ = res != SQLITE_OK;
247    if (res != SQLITE_OK) {
248        return true;
249    }
250    auto status = GetCursorNext();
251    if (!status) {
252        return false;
253    }
254    missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
255    status = SearchNextslice();
256    return status;
257}
258
259bool SpanJoin::CaclSpan::SearchNextslice()
260{
261    while (partitionState_ != PartitionState::TS_REAL) {
262        bool status = GetNextState();
263        if (!status) {
264            return false;
265        }
266    }
267    return true;
268}
269
270bool SpanJoin::CaclSpan::GetNextState()
271{
272    switch (partitionState_) {
273        case PartitionState::TS_REAL: {
274            GetCursorNext();
275            partitionState_ = PartitionState::TS_PARTITION;
276            ts_ = endTs_;
277            if (isEof_ || partition_ != sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx))) {
278                endTs_ = std::numeric_limits<int64_t>::max();
279            } else {
280                endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
281            }
282            return true;
283        }
284        case PartitionState::TS_PARTITION: {
285            if (endTs_ == std::numeric_limits<int64_t>::max()) {
286                partitionState_ = PartitionState::TS_MISSING;
287                if (isEof_) {
288                    missPartitionEnd_ = std::numeric_limits<int32_t>::max();
289                } else {
290                    missPartitionEnd_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->partitionIdx));
291                }
292                missPartitionStart_ = partition_ + NEXT_NUMBER;
293                ts_ = 0;
294            } else {
295                partitionState_ = PartitionState::TS_REAL;
296                ts_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
297                endTs_ = ts_ + sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->durIdx));
298            }
299            return true;
300        }
301        case PartitionState::TS_MISSING: {
302            if (missPartitionEnd_ == std::numeric_limits<int32_t>::max()) {
303                partitionState_ = PartitionState::TS_EOF;
304            } else {
305                partitionState_ = PartitionState::TS_PARTITION;
306                ts_ = 0;
307                endTs_ = sqlite3_column_int64(stmt_, static_cast<int32_t>(desc_->tsIdx));
308                partition_ = missPartitionEnd_;
309            }
310            return true;
311        }
312        default:
313            return false;
314    }
315}
316
317std::string SpanJoin::CaclSpan::GetMergeColumns(std::vector<std::string> &columns)
318{
319    std::string str;
320    int32_t size = columns.size();
321    for (int32_t i = 0; i < size; i++) {
322        if (i == size - 1) {
323            str += columns.at(i);
324        } else {
325            str += columns.at(i) + ", ";
326        }
327    }
328    return str;
329}
330
331int64_t SpanJoin::CaclSpan::GetPatitonForMiss()
332{
333    return partitionState_ == PartitionState::TS_MISSING ? missPartitionEnd_ - NEXT_NUMBER : partition_;
334}
335
336std::unique_ptr<TableBase::Cursor> SpanJoin::CreateCursor()
337{
338    return std::make_unique<Cursor>(dataCache_, this);
339}
340
341SpanJoin::Cursor::Cursor(const TraceDataCache *dataCache, SpanJoin *table)
342    : TableBase::Cursor(dataCache, table, 0),
343      tableFirst_(table, &table->tableFirstDesc_, dataCache_->db_),
344      tableSecond_(table, &table->tableSecondDesc_, dataCache_->db_),
345      spanTable_(table)
346{
347}
348
349int32_t SpanJoin::Cursor::Filter(const FilterConstraints &fc, sqlite3_value **argv)
350{
351    tableFirst_.InitQuerySql(argv);
352    tableSecond_.InitQuerySql(argv);
353    auto status = IsFindSpan();
354    if (!status) {
355        return SQLITE_ERROR;
356    }
357    return SQLITE_OK;
358}
359
360bool SpanJoin::Cursor::CaclOverLap()
361{
362    if (tableFirst_.ts_ >= tableSecond_.ts_) {
363        if ((tableFirst_.partitionState_ == PartitionState::TS_REAL &&
364             tableSecond_.partitionState_ == PartitionState::TS_REAL) ||
365            tableFirst_.ts_ < tableSecond_.endTs_) {
366            return true;
367        }
368    } else if (tableFirst_.ts_ <= tableSecond_.ts_ && tableSecond_.ts_ < tableFirst_.endTs_) {
369        return true;
370    }
371    return false;
372}
373
374bool SpanJoin::Cursor::IsFindSpan()
375{
376    for (;;) {
377        if (tableFirst_.isEof_ || tableSecond_.isEof_) {
378            break;
379        }
380        queryNext_ = FindQueryResult();
381        if (CaclOverLap()) {
382            break;
383        }
384        queryNext_->Next();
385    }
386    return true;
387}
388
389SpanJoin::CaclSpan *SpanJoin::Cursor::FindQueryResult()
390{
391    if (!spanTable_->isSamepartitioning_) {
392        return nullptr;
393    }
394
395    auto tableFirstResult = std::make_tuple(tableFirst_.GetPatitonForMiss(), tableFirst_.endTs_,
396                                            tableFirst_.partitionState_ == PartitionState::TS_REAL ? true : false);
397    auto tableSecondResult = std::make_tuple(tableSecond_.GetPatitonForMiss(), tableSecond_.endTs_,
398                                             tableSecond_.partitionState_ == PartitionState::TS_REAL ? true : false);
399    if (tableFirstResult < tableSecondResult) {
400        return &tableFirst_;
401    }
402    return &tableSecond_;
403}
404
405int32_t SpanJoin::Cursor::Column(int32_t column) const
406{
407    switch (static_cast<Index>(column)) {
408        case Index::TS: {
409            sqlite3_result_int64(context_, static_cast<sqlite3_int64>(std::max(tableFirst_.ts_, tableSecond_.ts_)));
410            break;
411        }
412        case Index::DUR: {
413            sqlite3_result_int64(context_,
414                                 static_cast<sqlite3_int64>(std::min(tableFirst_.endTs_, tableSecond_.endTs_) -
415                                                            std::max(tableFirst_.ts_, tableSecond_.ts_)));
416            break;
417        }
418        case Index::PARTITION: {
419            auto partResult = tableFirst_.partitionState_ == PartitionState::TS_REAL ? tableFirst_.partition_
420                                                                                     : tableSecond_.partition_;
421            sqlite3_result_int64(context_, static_cast<sqlite3_int64>(partResult));
422            break;
423        }
424        default: {
425            const auto ColumnInfo = spanTable_->mTableColumnInfo_[column];
426            if (ColumnInfo.tableDesc == tableFirst_.desc_) {
427                tableFirst_.setResult(context_, ColumnInfo.colIdx);
428            } else {
429                tableSecond_.setResult(context_, ColumnInfo.colIdx);
430            }
431        }
432    }
433    return SQLITE_OK;
434}
435
436} // namespace TraceStreamer
437} // namespace SysTuning
438