1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ConnectionPool"
16 #include "connection_pool.h"
17 
18 #include <base_transaction.h>
19 
20 #include <condition_variable>
21 #include <iterator>
22 #include <mutex>
23 #include <sstream>
24 #include <vector>
25 
26 #include "connection.h"
27 #include "logger.h"
28 #include "rdb_common.h"
29 #include "rdb_errno.h"
30 #include "rdb_fault_hiview_reporter.h"
31 #include "rdb_sql_statistic.h"
32 #include "sqlite_global_config.h"
33 #include "sqlite_utils.h"
34 
35 namespace OHOS {
36 namespace NativeRdb {
37 using namespace OHOS::Rdb;
38 using namespace std::chrono;
39 using Conn = Connection;
40 using ConnPool = ConnectionPool;
41 using SharedConn = std::shared_ptr<Connection>;
42 using SharedConns = std::vector<std::shared_ptr<Connection>>;
43 using SqlStatistic = DistributedRdb::SqlStatistic;
44 using Reportor = RdbFaultHiViewReporter;
45 constexpr int32_t TRANSACTION_TIMEOUT(2);
46 
Create(const RdbStoreConfig &config, int &errCode)47 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
48 {
49     std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
50     if (pool == nullptr) {
51         LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
52         errCode = E_ERROR;
53         return nullptr;
54     }
55     std::shared_ptr<Connection> conn;
56     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
57         std::tie(errCode, conn) = pool->Init();
58         if (errCode != E_SQLITE_CORRUPT) {
59             break;
60         }
61         config.SetIter(ITER_V1);
62     }
63     std::string dbPath;
64     (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
65     LOG_INFO("code:%{public}d app[%{public}s:%{public}s] path[%{public}s] "
66              "cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
67              "%{public}s",
68         errCode, config.GetBundleName().c_str(), config.GetModuleName().c_str(),
69         SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(), config.GetHaMode(), config.IsEncrypt(),
70         config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(), config.IsReadOnly(),
71         Reportor::FormatBrief(Connection::Collect(config), SqliteUtils::Anonymous(config.GetName())).c_str());
72     return errCode == E_OK ? pool : nullptr;
73 }
74 
HandleDataCorruption( const RdbStoreConfig &storeConfig, int &errCode)75 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption(
76     const RdbStoreConfig &storeConfig, int &errCode)
77 {
78     std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
79     auto &[rebuiltType, pool] = result;
80 
81     errCode = Connection::Repair(storeConfig);
82     if (errCode == E_OK) {
83         rebuiltType = RebuiltType::REPAIRED;
84     } else {
85         Connection::Delete(storeConfig);
86         rebuiltType = RebuiltType::REBUILT;
87     }
88     pool = Create(storeConfig, errCode);
89     if (errCode != E_OK) {
90         LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d, errno",
91             static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
92             storeConfig.IsEncrypt(), errCode, errno);
93     } else {
94         Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild"), false);
95     }
96 
97     return result;
98 }
99 
ConnectionPool(const RdbStoreConfig &storeConfig)100 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
101     : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
102       transactionUsed_(false)
103 {
104     attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
105 }
106 
Init(bool isAttach, bool needWriter)107 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
108 {
109     const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
110     std::pair<int32_t, std::shared_ptr<Connection>> result;
111     auto &[errCode, conn] = result;
112     errCode = config.Initialize();
113     if (errCode != E_OK) {
114         return result;
115     }
116 
117     if (config.GetRoleType() == OWNER && !config.IsReadOnly()) {
118         // write connect count is 1
119         std::shared_ptr<ConnPool::ConnNode> node;
120         std::tie(errCode, node) = writers_.Initialize(
121             [this, isAttach]() {
122                 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
123                 return Connection::Create(config, true);
124             },
125             1, config.GetWriteTime(), true, needWriter);
126         conn = Convert2AutoConn(node);
127         if (errCode != E_OK) {
128             return result;
129         }
130     }
131 
132     maxReader_ = GetMaxReaders(config);
133     // max read connect count is 64
134     if (maxReader_ > 64) {
135         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
136     }
137     auto [ret, node] = readers_.Initialize(
138         [this, isAttach]() {
139             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
140             return Connection::Create(config, false);
141         },
142         maxReader_, config.GetReadTime(), maxReader_ == 0);
143     errCode = ret;
144     return result;
145 }
146 
~ConnectionPool()147 ConnPool::~ConnectionPool()
148 {
149     CloseAllConnections();
150 }
151 
GetMaxReaders(const RdbStoreConfig &config)152 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
153 {
154     if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
155         config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
156         return config.GetReadConSize();
157     } else {
158         return 0;
159     }
160 }
161 
Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)162 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
163 {
164     if (node == nullptr) {
165         return nullptr;
166     }
167 
168     auto conn = node->GetConnect();
169     if (conn == nullptr) {
170         return nullptr;
171     }
172     if (isTrans) {
173         transCount_++;
174     }
175 
176     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
177         auto realPool = pool.lock();
178         if (realPool == nullptr) {
179             return;
180         }
181         if (isTrans) {
182             realPool->transCount_--;
183         }
184         realPool->ReleaseNode(node, !isTrans);
185         node = nullptr;
186     });
187 }
188 
CloseAllConnections()189 void ConnPool::CloseAllConnections()
190 {
191     writers_.Clear();
192     readers_.Clear();
193 }
194 
IsInTransaction()195 bool ConnPool::IsInTransaction()
196 {
197     return isInTransaction_.load();
198 }
199 
SetInTransaction(bool isInTransaction)200 void ConnPool::SetInTransaction(bool isInTransaction)
201 {
202     isInTransaction_.store(isInTransaction);
203 }
204 
CreateTransConn(bool limited)205 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
206 {
207     if (transCount_ >= MAX_TRANS && limited) {
208         writers_.Dump("NO TRANS", transCount_ + isInTransaction_);
209         return { E_DATABASE_BUSY, nullptr };
210     }
211     auto [errCode, node] = writers_.Create();
212     return { errCode, Convert2AutoConn(node, true) };
213 }
214 
AcquireConnection(bool isReadOnly)215 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
216 {
217     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
218     return Acquire(isReadOnly);
219 }
220 
AcquireAll(int32_t time)221 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
222 {
223     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
224     using namespace std::chrono;
225     std::pair<SharedConn, SharedConns> result;
226     auto &[writer, readers] = result;
227     auto interval = duration_cast<milliseconds>(seconds(time));
228     auto start = steady_clock::now();
229     auto writerNodes = writers_.AcquireAll(interval);
230     if (writerNodes.empty()) {
231         return {};
232     }
233     writer = Convert2AutoConn(writerNodes.front());
234 
235     auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
236     if (writer == nullptr || usedTime >= interval) {
237         return {};
238     }
239 
240     if (maxReader_ == 0) {
241         return result;
242     }
243 
244     readers_.Disable();
245     auto nodes = readers_.AcquireAll(interval - usedTime);
246     if (nodes.empty()) {
247         readers_.Enable();
248         return {};
249     }
250 
251     for (auto node : nodes) {
252         auto conn = Convert2AutoConn(node);
253         if (conn == nullptr) {
254             continue;
255         }
256         readers.push_back(conn);
257     }
258     return result;
259 }
260 
Acquire(bool isReadOnly, std::chrono::milliseconds ms)261 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
262 {
263     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
264     auto node = container->Acquire(ms);
265     if (node == nullptr) {
266         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
267         container->Dump(header, transCount_ + isInTransaction_);
268         return nullptr;
269     }
270     return Convert2AutoConn(node);
271 }
272 
AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)273 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
274 {
275     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
276     if (maxReader_ != 0) {
277         return Acquire(isReadOnly, ms);
278     }
279     auto node = writers_.Acquire(ms);
280     if (node == nullptr) {
281         writers_.Dump("writers_", transCount_ + isInTransaction_);
282         return nullptr;
283     }
284     auto conn = node->connect_;
285     writers_.Release(node);
286     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
287         auto realPool = pool.lock();
288         if (realPool == nullptr) {
289             return;
290         }
291         realPool->writers_.cond_.notify_all();
292     });
293 }
294 
ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)295 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node,  bool reuse)
296 {
297     if (node == nullptr) {
298         return;
299     }
300 
301     auto transCount = transCount_ + isInTransaction_;
302     auto errCode = node->Unused(transCount);
303     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
304         writers_.Dump("WAL writers_", transCount);
305     }
306 
307     auto &container = node->IsWriter() ? writers_ : readers_;
308     if (reuse) {
309         container.Release(node);
310     } else {
311         container.Drop(node);
312     }
313 }
314 
AcquireTransaction()315 int ConnPool::AcquireTransaction()
316 {
317     std::unique_lock<std::mutex> lock(transMutex_);
318     if (transCondition_.wait_for(
319         lock, std::chrono::seconds(TRANSACTION_TIMEOUT),
320         [this] { return !transactionUsed_; })) {
321         transactionUsed_ = true;
322         return E_OK;
323     }
324     LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
325     return E_DATABASE_BUSY;
326 }
327 
ReleaseTransaction()328 void ConnPool::ReleaseTransaction()
329 {
330     {
331         std::unique_lock<std::mutex> lock(transMutex_);
332         transactionUsed_ = false;
333     }
334     transCondition_.notify_one();
335 }
336 
RestartReaders()337 int ConnPool::RestartReaders()
338 {
339     readers_.Clear();
340     auto [errCode, node] = readers_.Initialize(
341         [this]() { return Connection::Create(config_, false); }, maxReader_, config_.GetReadTime(), maxReader_ == 0);
342     return errCode;
343 }
344 
345 /**
346  * The database locale.
347  */
ConfigLocale(const std::string &localeStr)348 int ConnPool::ConfigLocale(const std::string &localeStr)
349 {
350     auto errCode = readers_.ConfigLocale(localeStr);
351     if (errCode != E_OK) {
352         return errCode;
353     }
354     return writers_.ConfigLocale(localeStr);
355 }
356 
357 /**
358  * Rename the backed up database.
359  */
ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath, const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)360 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
361     const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
362 {
363     if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
364         LOG_ERROR("Connection pool is busy now!");
365         return E_ERROR;
366     }
367     if (config_.GetDBType() == DB_VECTOR) {
368         CloseAllConnections();
369         auto [retVal, connection] = CreateTransConn();
370 
371         if (connection == nullptr) {
372             LOG_ERROR("Get null connection.");
373             return retVal;
374         }
375         retVal = connection->Restore(backupPath, newKey, slaveStatus);
376         if (retVal != E_OK) {
377             LOG_ERROR("RdDbRestore error.");
378             return retVal;
379         }
380         CloseAllConnections();
381         auto [errCode, node] = Init();
382         return errCode;
383     }
384     return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
385 }
386 
RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)387 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
388 {
389     if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
390         auto connection = AcquireConnection(false);
391         if (connection == nullptr) {
392             return E_DATABASE_BUSY;
393         }
394         return connection->Restore(backupPath, {}, slaveStatus);
395     }
396 
397     return RestoreMasterDb(newPath, backupPath);
398 }
399 
RestoreMasterDb(const std::string &newPath, const std::string &backupPath)400 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
401 {
402     if (!CheckIntegrity(backupPath)) {
403         LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
404         return E_SQLITE_CORRUPT;
405     }
406     SqliteUtils::DeleteFile(backupPath + "-shm");
407     SqliteUtils::DeleteFile(backupPath + "-wal");
408 
409     CloseAllConnections();
410     Connection::Delete(config_);
411 
412     if (config_.GetPath() != newPath) {
413         RdbStoreConfig config(newPath);
414         config.SetPath(newPath);
415         Connection::Delete(config);
416     }
417 
418     int ret = E_OK;
419     if (!SqliteUtils::CopyFile(backupPath, newPath)) {
420         ret = E_ERROR;
421     }
422     auto result = Init();
423     if (result.first != E_OK) {
424         CloseAllConnections();
425         Connection::Delete(config_);
426         result = Init();
427     }
428     return ret == E_OK ? result.first : ret;
429 }
430 
GetTransactionStack()431 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
432 {
433     return transactionStack_;
434 }
435 
GetTransactionStackMutex()436 std::mutex &ConnPool::GetTransactionStackMutex()
437 {
438     return transactionStackMutex_;
439 }
440 
DisableWal()441 std::pair<int32_t, std::shared_ptr<Conn>> ConnPool::DisableWal()
442 {
443     return Init(true, true);
444 }
445 
EnableWal()446 int ConnPool::EnableWal()
447 {
448     auto [errCode, node] = Init();
449     return errCode;
450 }
451 
Dump(bool isWriter, const char *header)452 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
453 {
454     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
455     container->Dump(header, transCount_ + isInTransaction_);
456     return E_OK;
457 }
458 
ConnNode(std::shared_ptr<Conn> conn)459 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
460 {
461 }
462 
GetConnect()463 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
464 {
465     tid_ = gettid();
466     time_ = steady_clock::now();
467     return connect_;
468 }
469 
GetUsingTime() const470 int64_t ConnPool::ConnNode::GetUsingTime() const
471 {
472     auto time = steady_clock::now() - time_;
473     return duration_cast<milliseconds>(time).count();
474 }
475 
Unused(int32_t count)476 int32_t ConnPool::ConnNode::Unused(int32_t count)
477 {
478     time_ = steady_clock::now();
479     if (connect_ == nullptr) {
480         return E_OK;
481     }
482 
483     connect_->ClearCache();
484     if (!connect_->IsWriter()) {
485         tid_ = 0;
486     }
487 
488     if (count > 0) {
489         return E_OK;
490     }
491     auto timeout = time_ > (failedTime_ + minutes(CHECK_POINT_INTERVAL)) || time_ < failedTime_;
492     int32_t errCode = connect_->TryCheckPoint(timeout);
493     if (errCode == E_INNER_WARNING || errCode == E_NOT_SUPPORT) {
494         return E_OK;
495     }
496 
497     failedTime_ = errCode != E_OK ? time_ : steady_clock::time_point();
498     return errCode;
499 }
500 
IsWriter() const501 bool ConnPool::ConnNode::IsWriter() const
502 {
503     if (connect_ != nullptr) {
504         return connect_->IsWriter();
505     }
506     return false;
507 }
508 
Initialize( Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)509 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(
510     Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
511 {
512     std::shared_ptr<ConnNode> connNode = nullptr;
513     {
514         std::unique_lock<decltype(mutex_)> lock(mutex_);
515         disable_ = disable;
516         max_ = max;
517         creator_ = creator;
518         timeout_ = std::chrono::seconds(timeout);
519         for (int i = 0; i < max; ++i) {
520             auto errCode = ExtendNode();
521             if (errCode != E_OK) {
522                 nodes_.clear();
523                 details_.clear();
524                 return { errCode, nullptr };
525             }
526         }
527 
528         if (acquire && count_ > 0) {
529             connNode = nodes_.back();
530             nodes_.pop_back();
531             count_--;
532         }
533     }
534     cond_.notify_all();
535     return { E_OK, connNode };
536 }
537 
ConfigLocale(const std::string &locale)538 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
539 {
540     std::unique_lock<decltype(mutex_)> lock(mutex_);
541     if (total_ != count_) {
542         return E_DATABASE_BUSY;
543     }
544     for (auto it = details_.begin(); it != details_.end();) {
545         auto conn = it->lock();
546         if (conn == nullptr || conn->connect_ == nullptr) {
547             it = details_.erase(it);
548             continue;
549         }
550         conn->connect_->ConfigLocale(locale);
551     }
552     return E_OK;
553 }
554 
Acquire(std::chrono::milliseconds milliS)555 std::shared_ptr<ConnPool::ConnNode> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
556 {
557     std::unique_lock<decltype(mutex_)> lock(mutex_);
558     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
559     if (max_ == 0) {
560         return nullptr;
561     }
562     auto waiter = [this]() -> bool {
563         if (count_ > 0) {
564             return true;
565         }
566 
567         if (disable_) {
568             return false;
569         }
570         return ExtendNode() == E_OK;
571     };
572     if (cond_.wait_for(lock, interval, waiter)) {
573         if (nodes_.empty()) {
574             LOG_ERROR("nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
575                 count_, max_, total_, left_, right_);
576             count_ = 0;
577             return nullptr;
578         }
579         auto node = nodes_.back();
580         nodes_.pop_back();
581         count_--;
582         return node;
583     }
584     return nullptr;
585 }
586 
Create()587 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Create()
588 {
589     if (creator_ == nullptr) {
590         return { E_NOT_SUPPORT, nullptr };
591     }
592 
593     auto [errCode, conn] = creator_();
594     if (conn == nullptr) {
595         return { errCode, nullptr };
596     }
597 
598     auto node = std::make_shared<ConnNode>(conn);
599     if (node == nullptr) {
600         return { E_ERROR, nullptr };
601     }
602     node->id_ = MIN_TRANS_ID + trans_;
603     conn->SetId(node->id_);
604     details_.push_back(node);
605     trans_++;
606     return { E_OK, node };
607 }
608 
ExtendNode()609 int32_t ConnPool::Container::ExtendNode()
610 {
611     if (creator_ == nullptr) {
612         return E_ERROR;
613     }
614     auto [errCode, conn] = creator_();
615     if (conn == nullptr) {
616         return errCode;
617     }
618     auto node = std::make_shared<ConnNode>(conn);
619     node->id_ = right_++;
620     conn->SetId(node->id_);
621     nodes_.push_back(node);
622     details_.push_back(node);
623     count_++;
624     total_++;
625     return E_OK;
626 }
627 
AcquireAll(std::chrono::milliseconds milliS)628 std::list<std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::AcquireAll(std::chrono::milliseconds milliS)
629 {
630     std::list<std::shared_ptr<ConnNode>> nodes;
631     int32_t count = 0;
632     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
633     auto time = std::chrono::steady_clock::now() + interval;
634     std::unique_lock<decltype(mutex_)> lock(mutex_);
635     while (count < total_ && cond_.wait_until(lock, time, [this]() { return count_ > 0; })) {
636         nodes.merge(std::move(nodes_));
637         nodes_.clear();
638         count += count_;
639         count_ = 0;
640     }
641 
642     if (count != total_) {
643         count_ = count;
644         nodes_ = std::move(nodes);
645         nodes.clear();
646         return nodes;
647     }
648     auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
649         for (auto &node : nodes) {
650             if (node->connect_ == nullptr) {
651                 continue;
652             }
653             if (node->connect_.use_count() != 1) {
654                 return false;
655             }
656         }
657         return true;
658     };
659     bool failed = false;
660     while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
661     }
662     if (failed) {
663         count_ = count;
664         nodes_ = std::move(nodes);
665         nodes.clear();
666     }
667     return nodes;
668 }
669 
Disable()670 void ConnPool::Container::Disable()
671 {
672     disable_ = true;
673     cond_.notify_one();
674 }
675 
Enable()676 void ConnPool::Container::Enable()
677 {
678     disable_ = false;
679     cond_.notify_one();
680 }
681 
Release(std::shared_ptr<ConnNode> node)682 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
683 {
684     {
685         std::unique_lock<decltype(mutex_)> lock(mutex_);
686         if (node->id_ < left_ || node->id_ >= right_) {
687             return E_OK;
688         }
689         if (count_ == max_) {
690             total_ = total_ > count_ ? total_ - 1 : count_;
691             RelDetails(node);
692         } else {
693             nodes_.push_front(node);
694             count_++;
695         }
696     }
697     cond_.notify_one();
698     return E_OK;
699 }
700 
Drop(std::shared_ptr<ConnNode> node)701 int32_t ConnectionPool::Container::Drop(std::shared_ptr<ConnNode> node)
702 {
703     {
704         std::unique_lock<decltype(mutex_)> lock(mutex_);
705         RelDetails(node);
706     }
707     cond_.notify_one();
708     return E_OK;
709 }
710 
RelDetails(std::shared_ptr<ConnNode> node)711 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
712 {
713     for (auto it = details_.begin(); it != details_.end();) {
714         auto detailNode = it->lock();
715         if (detailNode == nullptr || detailNode->id_ == node->id_) {
716             it = details_.erase(it);
717         } else {
718             it++;
719         }
720     }
721     return E_OK;
722 }
723 
CheckIntegrity(const std::string &dbPath)724 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
725 {
726     RdbStoreConfig config(config_);
727     config.SetPath(dbPath);
728     config.SetIntegrityCheck(IntegrityCheck::FULL);
729     auto [ret, connection] = Connection::Create(config, true);
730     return ret == E_OK;
731 }
732 
Clear()733 int32_t ConnPool::Container::Clear()
734 {
735     std::list<std::shared_ptr<ConnNode>> nodes;
736     std::list<std::weak_ptr<ConnNode>> details;
737     {
738         std::unique_lock<decltype(mutex_)> lock(mutex_);
739         nodes = std::move(nodes_);
740         details = std::move(details_);
741         disable_ = true;
742         total_ = 0;
743         count_ = 0;
744         if (right_ > MAX_RIGHT) {
745             right_ = 0;
746         }
747         left_ = right_;
748         creator_ = nullptr;
749     }
750     nodes.clear();
751     details.clear();
752     return 0;
753 }
754 
IsFull()755 bool ConnPool::Container::IsFull()
756 {
757     std::unique_lock<decltype(mutex_)> lock(mutex_);
758     return total_ == count_;
759 }
760 
Dump(const char *header, int32_t count)761 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
762 {
763     std::string info;
764     std::vector<std::shared_ptr<ConnNode>> details;
765     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
766                         std::to_string(total_) + "," + std::to_string(count_) + "]";
767     {
768         std::unique_lock<decltype(mutex_)> lock(mutex_);
769         details.reserve(details_.size());
770         for (auto &detail : details_) {
771             auto node = detail.lock();
772             if (node == nullptr) {
773                 continue;
774             }
775             details.push_back(node);
776         }
777     }
778 
779     for (auto &node : details) {
780         info.append("<")
781             .append(std::to_string(node->id_))
782             .append(",")
783             .append(std::to_string(node->tid_))
784             .append(",")
785             .append(std::to_string(node->GetUsingTime()))
786             .append(">");
787         // 256 represent that limit to info length
788         if (info.size() > 256) {
789             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
790             info.clear();
791         }
792     }
793     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
794     return 0;
795 }
796 } // namespace NativeRdb
797 } // namespace OHOS
798