1#include "tracing/node_trace_writer.h" 2 3#include "util-inl.h" 4 5#include <fcntl.h> 6#include <cstring> 7 8namespace node { 9namespace tracing { 10 11NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern) 12 : log_file_pattern_(log_file_pattern) {} 13 14void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) { 15 CHECK_NULL(tracing_loop_); 16 tracing_loop_ = loop; 17 18 flush_signal_.data = this; 19 int err = uv_async_init(tracing_loop_, &flush_signal_, 20 [](uv_async_t* signal) { 21 NodeTraceWriter* trace_writer = 22 ContainerOf(&NodeTraceWriter::flush_signal_, signal); 23 trace_writer->FlushPrivate(); 24 }); 25 CHECK_EQ(err, 0); 26 27 exit_signal_.data = this; 28 err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb); 29 CHECK_EQ(err, 0); 30} 31 32void NodeTraceWriter::WriteSuffix() { 33 // If our final log file has traces, then end the file appropriately. 34 // This means that if no trace events are recorded, then no trace file is 35 // produced. 36 bool should_flush = false; 37 { 38 Mutex::ScopedLock scoped_lock(stream_mutex_); 39 if (total_traces_ > 0) { 40 total_traces_ = kTracesPerFile; // Act as if we reached the file limit. 41 should_flush = true; 42 } 43 } 44 if (should_flush) { 45 Flush(true); 46 } 47} 48 49NodeTraceWriter::~NodeTraceWriter() { 50 WriteSuffix(); 51 uv_fs_t req; 52 if (fd_ != -1) { 53 CHECK_EQ(0, uv_fs_close(nullptr, &req, fd_, nullptr)); 54 uv_fs_req_cleanup(&req); 55 } 56 uv_async_send(&exit_signal_); 57 Mutex::ScopedLock scoped_lock(request_mutex_); 58 while (!exited_) { 59 exit_cond_.Wait(scoped_lock); 60 } 61} 62 63void replace_substring(std::string* target, 64 const std::string& search, 65 const std::string& insert) { 66 size_t pos = target->find(search); 67 for (; pos != std::string::npos; pos = target->find(search, pos)) { 68 target->replace(pos, search.size(), insert); 69 pos += insert.size(); 70 } 71} 72 73void NodeTraceWriter::OpenNewFileForStreaming() { 74 ++file_num_; 75 uv_fs_t req; 76 77 // Evaluate a JS-style template string, it accepts the values ${pid} and 78 // ${rotation} 79 std::string filepath(log_file_pattern_); 80 replace_substring(&filepath, "${pid}", std::to_string(uv_os_getpid())); 81 replace_substring(&filepath, "${rotation}", std::to_string(file_num_)); 82 83 if (fd_ != -1) { 84 CHECK_EQ(uv_fs_close(nullptr, &req, fd_, nullptr), 0); 85 uv_fs_req_cleanup(&req); 86 } 87 88 fd_ = uv_fs_open(nullptr, &req, filepath.c_str(), 89 O_CREAT | O_WRONLY | O_TRUNC, 0644, nullptr); 90 uv_fs_req_cleanup(&req); 91 if (fd_ < 0) { 92 fprintf(stderr, "Could not open trace file %s: %s\n", 93 filepath.c_str(), 94 uv_strerror(fd_)); 95 fd_ = -1; 96 } 97} 98 99void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) { 100 Mutex::ScopedLock scoped_lock(stream_mutex_); 101 // If this is the first trace event, open a new file for streaming. 102 if (total_traces_ == 0) { 103 OpenNewFileForStreaming(); 104 // Constructing a new JSONTraceWriter object appends "{\"traceEvents\":[" 105 // to stream_. 106 // In other words, the constructor initializes the serialization stream 107 // to a state where we can start writing trace events to it. 108 // Repeatedly constructing and destroying json_trace_writer_ allows 109 // us to use V8's JSON writer instead of implementing our own. 110 json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_)); 111 } 112 ++total_traces_; 113 json_trace_writer_->AppendTraceEvent(trace_event); 114} 115 116void NodeTraceWriter::FlushPrivate() { 117 std::string str; 118 int highest_request_id; 119 { 120 Mutex::ScopedLock stream_scoped_lock(stream_mutex_); 121 if (total_traces_ >= kTracesPerFile) { 122 total_traces_ = 0; 123 // Destroying the member JSONTraceWriter object appends "]}" to 124 // stream_ - in other words, ending a JSON file. 125 json_trace_writer_.reset(); 126 } 127 // str() makes a copy of the contents of the stream. 128 str = stream_.str(); 129 stream_.str(""); 130 stream_.clear(); 131 } 132 { 133 Mutex::ScopedLock request_scoped_lock(request_mutex_); 134 highest_request_id = num_write_requests_; 135 } 136 WriteToFile(std::move(str), highest_request_id); 137} 138 139void NodeTraceWriter::Flush(bool blocking) { 140 Mutex::ScopedLock scoped_lock(request_mutex_); 141 { 142 // We need to lock the mutexes here in a nested fashion; stream_mutex_ 143 // protects json_trace_writer_, and without request_mutex_ there might be 144 // a time window in which the stream state changes? 145 Mutex::ScopedLock stream_mutex_lock(stream_mutex_); 146 if (!json_trace_writer_) 147 return; 148 } 149 int request_id = ++num_write_requests_; 150 int err = uv_async_send(&flush_signal_); 151 CHECK_EQ(err, 0); 152 if (blocking) { 153 // Wait until data associated with this request id has been written to disk. 154 // This guarantees that data from all earlier requests have also been 155 // written. 156 while (request_id > highest_request_id_completed_) { 157 request_cond_.Wait(scoped_lock); 158 } 159 } 160} 161 162void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) { 163 if (fd_ == -1) return; 164 165 uv_buf_t buf = uv_buf_init(nullptr, 0); 166 { 167 Mutex::ScopedLock lock(request_mutex_); 168 write_req_queue_.emplace(WriteRequest { 169 std::move(str), highest_request_id 170 }); 171 if (write_req_queue_.size() == 1) { 172 buf = uv_buf_init( 173 const_cast<char*>(write_req_queue_.front().str.c_str()), 174 write_req_queue_.front().str.length()); 175 } 176 } 177 // Only one write request for the same file descriptor should be active at 178 // a time. 179 if (buf.base != nullptr && fd_ != -1) { 180 StartWrite(buf); 181 } 182} 183 184void NodeTraceWriter::StartWrite(uv_buf_t buf) { 185 int err = uv_fs_write( 186 tracing_loop_, &write_req_, fd_, &buf, 1, -1, 187 [](uv_fs_t* req) { 188 NodeTraceWriter* writer = 189 ContainerOf(&NodeTraceWriter::write_req_, req); 190 writer->AfterWrite(); 191 }); 192 CHECK_EQ(err, 0); 193} 194 195void NodeTraceWriter::AfterWrite() { 196 CHECK_GE(write_req_.result, 0); 197 uv_fs_req_cleanup(&write_req_); 198 199 uv_buf_t buf = uv_buf_init(nullptr, 0); 200 { 201 Mutex::ScopedLock scoped_lock(request_mutex_); 202 int highest_request_id = write_req_queue_.front().highest_request_id; 203 write_req_queue_.pop(); 204 highest_request_id_completed_ = highest_request_id; 205 request_cond_.Broadcast(scoped_lock); 206 if (!write_req_queue_.empty()) { 207 buf = uv_buf_init( 208 const_cast<char*>(write_req_queue_.front().str.c_str()), 209 write_req_queue_.front().str.length()); 210 } 211 } 212 if (buf.base != nullptr && fd_ != -1) { 213 StartWrite(buf); 214 } 215} 216 217// static 218void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) { 219 NodeTraceWriter* trace_writer = 220 ContainerOf(&NodeTraceWriter::exit_signal_, signal); 221 // Close both flush_signal_ and exit_signal_. 222 uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_), 223 [](uv_handle_t* signal) { 224 NodeTraceWriter* trace_writer = 225 ContainerOf(&NodeTraceWriter::flush_signal_, 226 reinterpret_cast<uv_async_t*>(signal)); 227 uv_close( 228 reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_), 229 [](uv_handle_t* signal) { 230 NodeTraceWriter* trace_writer = 231 ContainerOf(&NodeTraceWriter::exit_signal_, 232 reinterpret_cast<uv_async_t*>(signal)); 233 Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_); 234 trace_writer->exited_ = true; 235 trace_writer->exit_cond_.Signal(scoped_lock); 236 }); 237 }); 238} 239} // namespace tracing 240} // namespace node 241