UniSet @VERSION@
LogDB.h
1/*
2 * Copyright (c) 2015 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// --------------------------------------------------------------------------
20// --------------------------------------------------------------------------
21#ifndef LogDB_H_
22#define LogDB_H_
23// --------------------------------------------------------------------------
24#include <queue>
25#include <deque>
26#include <memory>
27#include <mutex>
28#include <condition_variable>
29#include <chrono>
30#include <atomic>
31#include <ev++.h>
32#include <sigc++/sigc++.h>
33#include <Poco/JSON/Object.h>
34#include <Poco/Net/WebSocket.h>
35#include <Poco/ObjectPool.h>
36#include "UniSetTypes.h"
37#include "LogAgregator.h"
38#include "DebugStream.h"
39#include "SQLiteInterface.h"
40#include "EventLoopServer.h"
41#include "UTCPStream.h"
42#include "LogReader.h"
43#include "LogServer.h"
44#include "UHttpRequestHandler.h"
45#include "UHttpServer.h"
46#include "UTCPCore.h"
47// -------------------------------------------------------------------------
48namespace uniset
49{
50 //------------------------------------------------------------------------------------------
55 class LogDB:
56 public EventLoopServer
57#ifndef DISABLE_REST_API
58 , public Poco::Net::HTTPRequestHandler
59#endif
60 {
// LogDB derives from EventLoopServer and, unless DISABLE_REST_API is defined,
// also from Poco::Net::HTTPRequestHandler.
// NOTE(review): the members declared further down (an SQLiteInterface, a vector of
// Log objects, a queue of query strings) suggest that this class stores text received
// from log servers in an SQLite database -- confirm against LogDB.cc.
61 public:
62 LogDB( const std::string& name, int argc, const char* const* argv, const std::string& prefix );
63 virtual ~LogDB();
64
// Static factory returning a shared LogDB; `prefix` defaults to "logdb-".
// NOTE(review): `prefix` is presumably the prefix of the command-line option names -- confirm.
66 static std::shared_ptr<LogDB> init_logdb( int argc, const char* const* argv, const std::string& prefix = "logdb-" );
67
// NOTE(review): presumably prints the command-line help -- confirm in LogDB.cc.
69 static void help_print();
70
// Returns the `dblog` stream held by this object.
71 inline std::shared_ptr<DebugStream> log()
72 {
73 return dblog;
74 }
75
// NOTE(review): `async` presumably selects whether the event loop runs in the
// background or blocks the caller -- confirm in LogDB.cc.
76 void run( bool async );
77#ifndef DISABLE_REST_API
// handleRequest() overrides a virtual of a base class (it is marked `override`);
// Poco::Net::HTTPRequestHandler is a base under the same DISABLE_REST_API guard.
78 virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
79 void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
80 void handleOverload(Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp);
81#endif
82
83 protected:
84
// Forward declarations of the nested helper classes.
85 class Log;
86 class LogWebSocket;
87
// Overrides of virtual hooks inherited from a base class.
// NOTE(review): presumably the EventLoopServer prepare/finish callbacks -- confirm.
88 virtual void evfinish() override;
89 virtual void evprepare() override;
// Handlers taking a libev watcher reference and an event mask.
90 void onCheckBuffer( ev::timer& t, int revents );
91 void onActivate( ev::async& watcher, int revents ) ;
// addLog()/log2File() take the same (Log*, const std::string&) arguments that
// Log::ReadSignal carries.
// NOTE(review): presumably connected to Log::signal_on_read() -- confirm in LogDB.cc.
92 void addLog( Log* log, const std::string& txt );
93 void log2File( Log* log, const std::string& txt );
94
// NOTE(review): an empty `logname` (the default) presumably means "all logs" -- confirm.
95 size_t getCountOfRecords( const std::string& logname = "" );
96 size_t getFirstOfOldRecord( size_t maxnum );
97
98 // quote escaping (quotes are doubled, for sqlite)
99 static std::string qEscapeString( const std::string& s );
100
101#ifndef DISABLE_REST_API
// REST helpers: each returns a Poco JSON object pointer and receives the HTTP response object.
102 Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
103 Poco::JSON::Object::Ptr httpGetRequest( Poco::Net::HTTPServerResponse& resp, const std::string& cmd, const Poco::URI::QueryParameters& p );
104 Poco::JSON::Object::Ptr httpGetList( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
105 Poco::JSON::Object::Ptr httpGetLogs( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
106 Poco::JSON::Object::Ptr httpGetCount( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
107 Poco::JSON::Object::Ptr httpGetStatus( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
108 Poco::JSON::Object::Ptr httpDownload( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
109 Poco::JSON::Object::Ptr httpLogControl(std::ostream& out, Poco::Net::HTTPServerRequest& req,
110 Poco::Net::HTTPServerResponse& resp, const std::string& logname, const Poco::URI::QueryParameters& params );
111
112 void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req,
113 Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
114 void httpWebSocketConnectPage( Poco::Net::HTTPServerRequest& req,
115 Poco::Net::HTTPServerResponse& resp, const std::string& logname, const Poco::URI::QueryParameters& p );
116
117 bool supportsGzip( Poco::Net::HTTPServerRequest& request );
118
119 // builds a WHERE condition from a string of the form XX[m|h|d|M]
120 // XX m - minutes, h - hours, d - days, M - months
121 static std::string qLast( const std::string& p );
122
123 // converts a string 'YYYYMMDD' or 'YYYY/MM/DD' into a date 'YYYY-MM-DD'
124 static std::string qDate(const std::string& p, const char sep = '-');
125
// Creation and removal of LogWebSocket objects.
// NOTE(review): presumably these maintain the `wsocks` list -- confirm in LogDB.cc.
126 std::shared_ptr<LogWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp,
127 const std::string& logname, const Poco::URI::QueryParameters& p );
128 void delWebSocket( std::shared_ptr<LogWebSocket>& ws );
129
130#endif
131 std::string myname;
// Database handle (owned exclusively through unique_ptr) and database file name.
132 std::unique_ptr<SQLiteInterface> db;
133 std::string dbfile;
134
// NOTE(review): "localtime" is presumably a modifier used when formatting timestamps
// in SQL queries -- confirm in LogDB.cc.
135 std::string tmsFormat = { "localtime" };
137 bool activate = { false };
138 std::chrono::steady_clock::time_point startTime;
139
// Queue of strings; the type name suggests pending SQL queries (TODO confirm).
140 typedef std::queue<std::string> QueryBuffer;
141 QueryBuffer qbuf;
142 size_t qbufSize = { 1000 }; // message buffer size.
143
// Timer and period (seconds, default 1.0) associated with flushing the buffer.
144 ev::timer flushBufferTimer;
145 double tmFlushBuffer_sec = { 1.0 };
146 void flushBuffer();
147 void rotateDB();
148
// Default record limit is 200 * 1000.
// NOTE(review): presumably the threshold at which rotateDB() removes old records -- confirm.
149 size_t maxdbRecords = { 200 * 1000 };
150 size_t numOverflow = { 0 }; // computed from the "overflow factor" parameter (float)
151
// libev signal watchers and their handler.
152 ev::sig sigTERM;
153 ev::sig sigQUIT;
154 ev::sig sigINT;
155 void onTerminate( ev::sig& evsig, int revents );
156
157 ev::async wsactivate; // activation of LogWebSocket objects
158
// Optional LogServer instance with its host and port (empty host / port 0 by default).
159 std::shared_ptr<uniset::LogServer> logserv;
160 std::string logserv_host = {""};
161 int logserv_port = {0};
162
// One Log object per log source; LogDB keeps them in its `logservers` vector.
// NOTE(review): the members (ip/port, a UTCPStream, libev io/timer/async watchers)
// suggest a TCP client connection to one remote log server -- confirm in LogDB.cc.
163 class Log
164 {
165 public:
166 std::string name;
167 std::string ip;
168 int port = { 0 };
169 std::string cmd;
170 std::string usercmd;
171 std::string lastcmd;
172 std::string peername;
173 std::string description;
174
175 std::shared_ptr<DebugStream> dblog;
176 std::shared_ptr<DebugStream> logfile;
177
178 bool isConnected() const;
179
// Attaches this object to the given libev loop (TODO confirm exact behaviour in LogDB.cc).
180 void set( ev::dynamic_loop& loop );
// Handlers taking a libev watcher reference and, for most, an event mask.
181 void check( ev::timer& t, int revents );
182 void event( ev::io& watcher, int revents );
183 void read( ev::io& watcher );
184 void oncommand( ev::async& watcher, int revents );
185 void write( ev::io& io );
186 void close();
187
// Signal type: slots receive (Log*, const std::string&) and return void.
188 typedef sigc::signal<void, Log*, const std::string&> ReadSignal;
189 ReadSignal signal_on_read();
190
191 void setCheckConnectionTime( double sec );
192 void setReadBufSize( size_t sz );
193 void setCommand( const std::string& cmd );
194
195 protected:
196 void ioprepare();
// Declared noexcept: reports its outcome through the bool return value.
197 bool connect() noexcept;
198
199 private:
200 ReadSignal sigRead;
201 ev::io io;
202 ev::timer iocheck;
203 ev::async iocmd;
204
// Default 5.0 seconds; changed through setCheckConnectionTime() (TODO confirm).
205 double checkConnection_sec = { 5.0 };
206
207 std::shared_ptr<UTCPStream> tcp;
208 std::vector<char> buf; // buffer for reading messages
209
210 static const size_t reservsize = { 1000 };
211 std::string text;
212
// NOTE(review): both containers below hold raw UTCPCore::Buffer pointers; who deletes
// them is not visible in this header -- verify in LogDB.cc.
213 // buffer for outgoing data (write buffer)
214 std::queue<UTCPCore::Buffer*> wbuf;
215
216 // queue of commands to send
217 std::vector<UTCPCore::Buffer*> cmdbuf;
219 };
220
// All configured log sources and the diagnostic stream returned by log().
221 std::vector< std::shared_ptr<Log> > logservers;
222 std::shared_ptr<DebugStream> dblog;
223
224#ifndef DISABLE_REST_API
// HTTP server instance and its settings (only when the REST API is compiled in).
225 std::shared_ptr<Poco::Net::HTTPServer> httpserv;
226 std::string httpHost = { "" };
227 int httpPort = { 0 };
228 std::string httpCORS_allow = { "*" };
229 std::string httpReplyAddr = { "" };
230 std::string httpJsonContentType = {"text/json; charset=UTF-8" };
231 std::string httpHtmlContentType = {"text/html; charset=UTF-8" };
232 std::string utf8Code = "UTF-8";
// Atomic counter, initialised to 0.
// NOTE(review): presumably the number of HTTP requests currently being handled -- confirm.
233 std::atomic<size_t> httpActiveRequests{0};
234 size_t httpMaxThreads = { 0 }; // calculated automatically
235 size_t httpMaxQueued = { 0 };
236 size_t httpMaxRequests = { 10 }; // maximum number of simultaneous HTTP requests (not WS)
237 static constexpr size_t reservedForMonitoring = 2; // reserve for /status, /help, etc.
238
// Defaults for websocket sessions; the names mirror the LogWebSocket setters.
// NOTE(review): presumably passed to each new LogWebSocket in newWebSocket() -- confirm.
239 double wsHeartbeatTime_sec = { 3.0 };
240 double wsSendTime_sec = { 0.5 };
241 size_t wsMaxSend = { 200 };
242 double wsBackpressureTime_sec = { 15.0 };
243 size_t wsQueueBytesLimit = { 2 * 1024 * 1024 };
244 size_t wsFrameBytesLimit = { 64 * 1024 };
// Both flags are false by default.
245 bool httpEnabledLogControl = { false };
246 bool httpEnabledDownload = { false };
247 double wsPongTimeout_sec = { 10.0 };
248 double wsMaxLifetime_sec = { 0 }; // 0 = unlimited
249
250 std::string wsPageTemplate = "";
251
260 public Poco::Net::WebSocket
261 {
// LogWebSocket: a Poco::Net::WebSocket subclass bound to one Log object.
// NOTE(review): the line that opens this class declaration is not present in this
// listing; the class name is taken from the constructor and destructor below.
262 public:
263 LogWebSocket(Poco::Net::HTTPServerRequest* req,
264 Poco::Net::HTTPServerResponse* resp,
265 std::shared_ptr<Log>& log );
266
267 virtual ~LogWebSocket();
268
269 // admittedly it is ugly that this is public
270 std::shared_ptr<DebugStream> dblog;
271
272 bool isActive();
273 void set( ev::dynamic_loop& loop );
274
// send()/ping() take a libev timer reference and an event mask.
275 void send( ev::timer& t, int revents );
276 void ping( ev::timer& t, int revents );
// Same (Log*, const std::string&) arguments as Log::ReadSignal.
// NOTE(review): presumably the slot stored in `con` below -- confirm in LogDB.cc.
277 void add( Log* log, const std::string& txt );
278
279 void term();
280 void waitCompletion();
281
282 // configuration
// NOTE(review): "Hearbeat" is spelled this way in the public name (sic); renaming it
// would change the interface.
283 void setHearbeatTime( const double& sec );
284 void setSendPeriod( const double& sec );
285 void setMaxSendCount( size_t val );
286 void setBackpressureTimeout( const double& sec );
287 void setPendingNotice( const std::string& msg );
288 void setQueueBytesLimit( size_t bytes );
289 void setMaxFrameBytes( size_t bytes );
290 void setPongTimeout( const double& sec );
291 void setMaxLifetime( const double& sec );
292
293 protected:
294 void read( ev::io& w, int revents );
295 void checkPongTimeout( ev::timer& t, int revents );
296
297 void enqueueMessage( const std::string& msg );
298 void buildFramesFromMessages();
299 void clearFrames();
300 void logQueueStats( const std::string& reason );
301 void write();
302 void handleBackpressure();
303
// Send timer with its period (seconds) and per-send limit.
304 ev::timer iosend;
305 double send_sec = { 0.5 };
306 size_t maxsend = { 200 };
307
// Ping timer and its period (seconds).
308 ev::timer ioping;
309 double ping_sec = { 3.0 };
310
311 ev::io ioread;
312 ev::timer iopongcheck;
313 double pongTimeout_sec = { 10.0 };
314 std::atomic_bool waitingPong = { false };
315 std::chrono::steady_clock::time_point lastPingSent;
316 std::chrono::steady_clock::time_point sessionStart;
317 double maxLifetime_sec = { 0 }; // 0 = unlimited
318
// NOTE(review): presumably used by waitCompletion()/term() to wait for and signal the
// end of the session -- confirm in LogDB.cc.
319 std::mutex finishmut;
320 std::condition_variable finish;
321
322 std::atomic_bool cancelled = { false };
323
324 sigc::connection con; // subscription to newly arriving logs..
325
// Raw pointers received by the constructor; their lifetime is not visible in this
// header (TODO verify they outlive this object).
326 Poco::Net::HTTPServerRequest* req;
327 Poco::Net::HTTPServerResponse* resp;
328
329 // queue of data to send..
330 std::queue<UTCPCore::Buffer*> wbuf;
331 std::deque<std::string> msgQueue;
332 size_t queuedBytes = { 0 };
333 size_t queueBytesLimit = { 2 * 1024 * 1024 }; // message buffer limit
334 size_t maxFrameBytes = { 64 * 1024 }; // limit on the size of a single frame
335 std::chrono::steady_clock::time_point lastDiag;
336 size_t lostByOverflow = { 0 };
337 size_t backpressureCount = { 0 };
338 std::chrono::steady_clock::time_point backpressureStart;
339 bool backpressureActive = { false };
// NOTE(review): the default here is 5.0 while LogDB::wsBackpressureTime_sec defaults to
// 15.0; presumably overridden through setBackpressureTimeout() -- confirm.
340 double backpressureTimeout_sec = { 5.0 };
341 std::string pendingNotice;
342
// Pool of UTCPCore::Buffer objects.
// NOTE(review): presumably sized with the capacity/peak values below -- confirm.
343 std::unique_ptr<Poco::ObjectPool<uniset::UTCPCore::Buffer>> bufPool;
344 size_t bufPoolCapacity = { 256 };
345 size_t bufPoolPeak = { 2000 };
346
347 std::shared_ptr<Log> log;
348 };
349
351 {
// LogWebSocketGuard: holds a shared LogWebSocket and a raw LogDB pointer.
// NOTE(review): the line that opens this class declaration is not present in this
// listing; the class name is taken from the constructor below.
352 public:
353
// Stores a copy of the shared pointer and the LogDB pointer.
354 LogWebSocketGuard( std::shared_ptr<LogWebSocket>& s, LogDB* l ):
355 ws(s), logdb(l) {}
356
// NOTE(review): the line naming the function this body belongs to is missing from the
// listing -- presumably the destructor. The body calls ws->term() and then
// logdb->delWebSocket(ws); neither pointer is checked for null.
358 {
359 ws->term();
360 logdb->delWebSocket(ws);
361 }
362
363
364 private:
365 std::shared_ptr<LogWebSocket> ws;
366 LogDB* logdb;
367 };
368
// Grants LogWebSocketGuard access to non-public members (it calls delWebSocket()).
369 friend class LogWebSocketGuard;
370
// List of LogWebSocket objects and an rwmutex declared next to it.
// NOTE(review): presumably wsocksMutex protects wsocks -- confirm in LogDB.cc.
// NOTE(review): std::list is used here, but <list> is not among the #include lines at
// the top of this header; it is presumably pulled in transitively.
371 std::list<std::shared_ptr<LogWebSocket>> wsocks;
372 uniset::uniset_rwmutex wsocksMutex;
373 size_t maxwsocks = { 50 }; // maximum number of websockets
374
376 public Poco::Net::HTTPRequestHandlerFactory
377 {
// LogDBRequestHandlerFactory: a Poco::Net::HTTPRequestHandlerFactory implementation
// that keeps a raw pointer to a LogDB.
// NOTE(review): the line that opens this class declaration is not present in this
// listing; the class name is taken from the constructor below.
378 public:
// Stores the given LogDB pointer.
// NOTE(review): the single-argument constructor is not `explicit`.
379 LogDBRequestHandlerFactory( LogDB* l ): logdb(l) {}
380 virtual ~LogDBRequestHandlerFactory() {}
381
// Overrides the factory method of the base class; defined outside this header.
382 virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
383
384 private:
385 LogDB* logdb;
386 };
387
// LogDBOverloadRequestHandler is not declared in this header; it is only named here as a friend.
// NOTE(review): presumably the handler that calls handleOverload() -- confirm in LogDB.cc.
388 friend class LogDBOverloadRequestHandler;
389#endif
390
// The private section is empty.
391 private:
392 };
393 // ----------------------------------------------------------------------------------
394} // end of namespace uniset
395//------------------------------------------------------------------------------------------
396#endif
Definition LogDB.cc:1035
The EventLoopServer class Реализация общей части всех процессов использующих libev....
Definition EventLoopServer.h:18
Definition LogDB.h:164
void setCommand(const std::string &cmd)
Definition LogDB.cc:831
Definition LogDB.h:351
Definition LogDB.h:261
Definition LogDB.h:60
std::string tmsFormat
Definition LogDB.h:135
Poco::JSON::Object::Ptr httpGetList(Poco::Net::HTTPServerResponse &resp, const Poco::URI::QueryParameters &p)
Definition LogDB.cc:1301
static std::shared_ptr< LogDB > init_logdb(int argc, const char *const *argv, const std::string &prefix="logdb-")
Definition LogDB.cc:555
static void help_print()
Definition LogDB.cc:568
Definition Mutex.h:32
Definition AccessConfig.h:30
-client