UniSet @VERSION@
UWebSocketGate.h
1/*
2 * Copyright (c) 2017 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// --------------------------------------------------------------------------
20// --------------------------------------------------------------------------
21#ifndef UWebSocketGate_H_
22#define UWebSocketGate_H_
23// --------------------------------------------------------------------------
24#include <queue>
25#include <memory>
26#include <mutex>
27#include <condition_variable>
28#include <chrono>
29#include <ev++.h>
30#include <sigc++/sigc++.h>
31#include <Poco/JSON/Object.h>
32#include <Poco/Net/WebSocket.h>
33#include <Poco/ObjectPool.h>
34#include "UniSetTypes.h"
35#include "LogAgregator.h"
36#include "UniSetObject.h"
37#include "DebugStream.h"
38#include "SharedMemory.h"
39#include "SMInterface.h"
40#include "EventLoopServer.h"
41#include "UTCPStream.h"
42#include "UHttpRequestHandler.h"
43#include "UHttpServer.h"
44#include "UTCPCore.h"
45#include "RunLock.h"
46// -------------------------------------------------------------------------
47namespace uniset
48{
49 //------------------------------------------------------------------------------------------
51 public UniSetObject,
52 public EventLoopServer
53#ifndef DISABLE_REST_API
54 , public Poco::Net::HTTPRequestHandler
55#endif
56 {
57 public:
// Constructor: takes the object id, an xmlNode (presumably this object's
// configuration node), the id passed as shmID (presumably the SharedMemory
// object), an optional SharedMemory pointer and a parameter-name prefix.
// NOTE(review): the default prefix here is "-ws", whereas init_wsgate()
// defaults to "ws-" — confirm which spelling is intended.
58 UWebSocketGate( uniset::ObjectId id, xmlNode* cnode
59 , uniset::ObjectId shmID
60 , const std::shared_ptr<SharedMemory>& ic = nullptr
61 , const std::string& prefix = "-ws" );
62
// Virtual destructor.
63 virtual ~UWebSocketGate();
64
// Static factory: returns a shared_ptr to a UWebSocketGate built from the
// command-line arguments (argc/argv), shmID, an optional SharedMemory pointer
// and a prefix (default "ws-").
// NOTE(review): the constructor's default prefix is "-ws", not "ws-" — confirm.
66 static std::shared_ptr<UWebSocketGate> init_wsgate( int argc, const char* const* argv
67 , uniset::ObjectId shmID
68 , const std::shared_ptr<SharedMemory>& ic = nullptr
69 , const std::string& prefix = "ws-" );
70
// Static helper; presumably prints command-line help — verify in the .cc file.
72 static void help_print();
73
// Returns a copy of the mylog shared pointer (the DebugStream member).
74 inline std::shared_ptr<DebugStream> log()
75 {
76 return mylog;
77 }
// Returns a copy of the loga shared pointer (the LogAgregator member).
78 inline std::shared_ptr<uniset::LogAgregator> logAgregator() noexcept
79 {
80 return loga;
81 }
82
// HTTP-facing API; compiled only when DISABLE_REST_API is not defined.
83#ifndef DISABLE_REST_API
// Override of the Poco::Net::HTTPRequestHandler entry point.
84 virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
// Presumably handles a request that opens a WebSocket session — verify in the .cc file.
85 void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
// Each of the following returns a Poco JSON object; content is defined in the .cc file.
86 Poco::JSON::Object::Ptr httpStatus();
87 Poco::JSON::Object::Ptr httpList();
88 Poco::JSON::Object::Ptr httpHelpApi();
89#endif
90
// JSON error helpers: one returns a new object, the other takes an existing
// object by reference (presumably to fill it in place).
// NOTE(review): std::string_view is used here but <string_view> is not among
// the includes visible in this listing; presumably it arrives transitively.
91 static Poco::JSON::Object::Ptr error_to_json( std::string_view err );
92 static void fill_error_json( Poco::JSON::Object::Ptr& p, std::string_view err );
93
94 protected:
95
// Forward declaration of the nested per-connection WebSocket class.
96 class UWebSocket;
97
// Object activation hook (overridden for actions needed after activation).
98 virtual bool activateObject() override;
// Object deactivation hook (overridden for actions needed on shutdown).
99 virtual bool deactivateObject() override;
100 virtual void sysCommand( const uniset::SystemMessage* sm ) override;
101 void run( bool async );
// evfinish()/evprepare() are overrides, presumably of EventLoopServer hooks.
102 virtual void evfinish() override;
103 virtual void evprepare() override;
// The (watcher&, int revents) signatures match libev (ev++) watcher callbacks;
// presumably they are bound to the ev::timer / ev::async members in the .cc file.
104 void onCheckBuffer( ev::timer& t, int revents );
105 void onActivate( ev::async& watcher, int revents ) ;
106 void onCommand( ev::async& watcher, int revents );
107
// HTTP/WebSocket helpers; compiled only when DISABLE_REST_API is not defined.
108#ifndef DISABLE_REST_API
109 void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
110 void httpWebSocketConnectPage(std::ostream& out, Poco::Net::HTTPServerRequest& req,
111 Poco::Net::HTTPServerResponse& resp, const std::string& params );
112
// Creation and removal of UWebSocket objects. delWebSocket() is also what the
// UWebSocketGuard helper calls on its stored socket.
113 std::shared_ptr<UWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp );
114 void delWebSocket( std::shared_ptr<UWebSocket>& ws );
115
// Takes the response, an HTTP status and a message, and returns a JSON object
// (presumably the error body — verify in the .cc file).
116 Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
117 void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp );
118#endif
119 void terminate();
120
121 ev::async wsactivate; // activation of WebSockets
// Shared async watcher; UWebSocket::set() also accepts a shared_ptr<ev::async>,
// so presumably this is the one handed to each socket — TODO confirm.
122 std::shared_ptr<ev::async> wscmd;
123
124 void checkMessages( ev::timer& t, int revents );
125 virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
126 virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
// Timer watcher; presumably drives checkMessages() every check_sec seconds — TODO confirm.
127 ev::timer iocheck;
128 double check_sec = { 0.05 };
129 int maxMessagesProcessing = { 200 };
130
// Logging, shared-memory interface and run lock.
131 std::shared_ptr<DebugStream> mylog;
132 std::shared_ptr<uniset::LogAgregator> loga;
133 std::shared_ptr<SMInterface> shm;
134 std::unique_ptr<uniset::RunLock> runlock;
135
// Log server handle and its host/port (defaults: empty host, port 0).
136 std::shared_ptr<uniset::LogServer> logserv;
137 std::string logserv_host = {""};
138 int logserv_port = { 0 };
139
// Start of the REST/WebSocket-only section; the matching #endif comes after
// the nested classes further down the file.
140#ifndef DISABLE_REST_API
// HTTP server handle and its settings (defaults: empty host, port 0, CORS "*").
141 std::shared_ptr<Poco::Net::HTTPServer> httpserv;
142 std::string httpHost = { "" };
143 int httpPort = { 0 };
144 std::string httpCORS_allow = { "*" };
145
// Gate-level WebSocket defaults. The names mirror the UWebSocket setters
// (setHearbeatTime, setSendPeriod, setMaxSendCount, setMaxCmdCount,
// setPongTimeout, setMaxLifetime); presumably they are applied to each new
// socket — TODO confirm.
146 double wsHeartbeatTime_sec = { 3.0 };
147 double wsSendTime_sec = { 0.2 };
148 size_t wsMaxSend = { 5000 };
149 size_t wsMaxCmd = { 200 };
150 double wsPongTimeout_sec = { 5.0 };
151 double wsMaxLifetime_sec = { 0 }; // 0 = unlimited
152
// Gate-level JSON object pool sizes. Note that they differ from the defaults
// of the UWebSocket constructor (100 / 500).
153 int jpoolCapacity = { 200 };
154 int jpoolPeakCapacity = { 5000 };
155
164 public Poco::Net::WebSocket
165 {
166 public:
// Constructor: takes raw pointers to the HTTP request/response and the JSON
// object pool sizes (defaults 100 / 500).
167 UWebSocket( Poco::Net::HTTPServerRequest* req,
168 Poco::Net::HTTPServerResponse* resp,
169 int jpoolCapacity = 100,
170 int jpoolPeakCapacity = 500 );
171
172 virtual ~UWebSocket();
173
// Returns a string describing this socket; the format is defined in the .cc file.
174 std::string getInfo() const noexcept;
175
176 bool isActive();
// Takes an event loop and a shared async watcher; presumably attaches this
// socket's watchers to the loop — TODO confirm.
177 void set( ev::dynamic_loop& loop, std::shared_ptr<ev::async> a );
178
// The (watcher&, int revents) signatures match libev (ev++) watcher callbacks;
// presumably bound to iosend / ioping / iorecv / iopong / iolifetime.
179 void send( ev::timer& t, int revents );
180 void ping( ev::timer& t, int revents );
181 void read( ev::io& io, int revents );
182 void pong( ev::timer& t, int revents );
183 void onLifetimeExpired( ev::timer& t, int revents );
184
// Per-sensor record: command string, value, error text and cached name.
// Instances are held via shared_ptr in smap and qcmd.
// NOTE(review): the constructor initializes a member named id, but its
// declaration (listing line 190) is missing from this listing.
185 struct sinfo
186 {
// Stores the given command and sensor id.
187 sinfo( const std::string& _cmd, uniset::ObjectId _id ): id(_id), cmd(_cmd) {}
188
189 std::string err; // error while working with the sensor (e.g. when subscribing)
191 std::string cmd = "";
192 long value = { 0 }; // set value
194 // cache
195 std::string name;
196 };
197
// Per-sensor commands addressed by ObjectId. Presumably each one queues a
// sinfo into qcmd that doCommand() later executes — verify in the .cc file.
198 void ask( uniset::ObjectId id );
199 void del( uniset::ObjectId id );
200 void get( uniset::ObjectId id );
201 void set( uniset::ObjectId id, long value );
202 void freeze( uniset::ObjectId id, long value );
203 void unfreeze( uniset::ObjectId id );
204 void sensorInfo( const uniset::SensorMessage* sm );
205 void doCommand( const std::shared_ptr<SMInterface>& ui );
// JSON helpers: the to_* variants return a new object, the fill_* variants
// take an existing object by reference.
206 static Poco::JSON::Object::Ptr to_short_json( const std::shared_ptr<sinfo>& si );
207 static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::shared_ptr<sinfo>& si );
208 static void fill_short_json( Poco::JSON::Object::Ptr& p, const std::shared_ptr<sinfo>& si );
209 static void fill_json( Poco::JSON::Object::Ptr& p, const uniset::SensorMessage* sm, const std::shared_ptr<sinfo>& si );
210
// Shutdown helpers; presumably term() requests termination and
// waitCompletion() blocks on the finish condition variable — TODO confirm.
211 void term();
212 void waitCompletion();
213
214 // configuration
// NOTE(review): "Hearbeat" looks like a misspelling of "Heartbeat"; the name
// is part of the interface, so it is left unchanged here.
215 void setHearbeatTime( const double& sec );
216 void setSendPeriod( const double& sec );
217 void setMaxSendCount( size_t val );
218 void setMaxCmdCount( size_t val );
219 void setPongTimeout( const double& sec );
220 void setMaxLifetime( const double& sec );
221
// Public log stream of this socket (no default initializer is visible here).
222 std::shared_ptr<DebugStream> mylog;
223
224 protected:
225
// Internal send/receive helpers; their behavior is defined in the .cc file.
226 void write();
227 void sendResponse( const std::shared_ptr<sinfo>& si );
228 void sendShortResponse( const std::shared_ptr<sinfo>& si );
// Takes a command as text; presumably parses a command received from the client — TODO confirm.
229 void onCommand( std::string_view cmd );
230 void sendError( std::string_view message );
// Takes a JSON object by reference; presumably hands it back to one of the jpool* pools.
231 void returnObjectToPool( Poco::JSON::Object::Ptr& json );
232
// --- sending ---
233 ev::timer iosend;
234 double send_sec = { 0.5 };
235 size_t maxsend = { 5000 };
236 size_t maxcmd = { 200 };
237 const int Kbuf = { 10 }; // message buffer factor (maxsend is multiplied by Kbuf)
238 static const size_t sbufLen = 100 * 1024;
239 // special limit (lower than the maximum)
240 // to guarantee that an object fits into the buffer completely
241 static const size_t sbufLim = (size_t)(0.8 * sbufLen);
242 char sbuf[sbufLen]; // buffer used to convert json into a byte stream (see send)
243
// --- ping / pong ---
244 ev::timer ioping;
245 double ping_sec = { 3.0 };
246 static const std::string ping_str;
247 ev::timer iopong;
248 double pongTimeout_sec = { 5.0 };
249 size_t pongCounter = { 0 };
250
// --- session lifetime ---
251 ev::timer iolifetime;
252 double maxLifetime_sec = { 0 }; // 0 = unlimited
253 std::chrono::steady_clock::time_point sessionStart;
254
// --- receiving ---
255 ev::io iorecv;
256 char rbuf[64 * 1024];
257 timeout_t recvTimeout = { 200 }; // msec
258 std::shared_ptr<ev::async> cmdsignal;
259
// --- synchronization ---
260 std::mutex finishmut;
261 std::condition_variable finish;
262 std::mutex dataMutex; // protects smap/jbuf from concurrent threads
263
// NOTE(review): std::atomic_bool and std::unordered_map are used below, but
// <atomic> and <unordered_map> are not among the includes visible in this
// listing; presumably they arrive transitively.
264 std::atomic_bool cancelled = { false };
265
266 std::unordered_map<uniset::ObjectId, std::shared_ptr<sinfo> > smap;
267 std::queue< std::shared_ptr<sinfo> > qcmd; // command queue
268
// Raw pointers to the HTTP request/response (same types as the constructor arguments).
269 Poco::Net::HTTPServerRequest* req;
270 Poco::Net::HTTPServerResponse* resp;
271
272 // queue of json objects to send
273 std::queue<Poco::JSON::Object::Ptr> jbuf;
274 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolSM;
275 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolErr;
276 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolShortSM;
277
278 // queue of data to be sent
279 std::unique_ptr<Poco::ObjectPool< uniset::UTCPCore::Buffer >> wbufpool;
280 std::queue<uniset::UTCPCore::Buffer*> wbuf;
281 size_t maxsize; // computed from maxsend (see the constructor)
282 };
283
285 {
286 public:
287
// Constructor: copies the socket shared_ptr into ws and stores the raw gate
// pointer in wsgate.
288 UWebSocketGuard( std::shared_ptr<UWebSocket>& s, UWebSocketGate* g ):
289 ws(s), wsgate(g) {}
290
// The declarator for the body below (listing line 291) is missing from this
// listing; presumably it is the destructor. The body calls delWebSocket() on
// the stored socket.
// NOTE(review): wsgate is dereferenced without a null check.
292 {
293 wsgate->delWebSocket(ws);
294 }
295
296 private:
297 std::shared_ptr<UWebSocket> ws;
298 UWebSocketGate* wsgate;
299 };
300
// Grants UWebSocketGuard access to non-public members; it calls the protected delWebSocket().
301 friend class UWebSocketGuard;
302
// List of sockets and a read/write mutex; presumably the mutex guards the list — TODO confirm.
// NOTE(review): std::list is used, but <list> is not among the includes
// visible in this listing.
303 std::list<std::shared_ptr<UWebSocket>> wsocks;
304 uniset::uniset_rwmutex wsocksMutex;
305 size_t maxwsocks = { 50 }; // maximum number of websockets
306
308 public Poco::Net::HTTPRequestHandlerFactory
309 {
310 public:
313
// Override of Poco::Net::HTTPRequestHandlerFactory::createRequestHandler().
// Returns a raw HTTPRequestHandler pointer; what it points to is defined in the .cc file.
314 virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
315
316 private:
// Raw pointer to the gate. The line that initializes it (the factory's
// constructor) is not visible in this listing.
317 UWebSocketGate* wsgate;
318 };
319#endif
320
321 private:
322
323 };
324 // ----------------------------------------------------------------------------------
325} // end of namespace uniset
326//------------------------------------------------------------------------------------------
327#endif
The EventLoopServer class Реализация общей части всех процессов использующих libev....
Definition EventLoopServer.h:18
Definition MessageType.h:127
Definition MessageType.h:171
Definition UWebSocketGate.h:285
Definition UWebSocketGate.h:165
timeout_t recvTimeout
Definition UWebSocketGate.h:257
Definition UWebSocketGate.h:56
virtual bool activateObject() override
Активизация объекта (переопределяется для необходимых действий после активизации)
Definition UWebSocketGate.cc:838
static std::shared_ptr< UWebSocketGate > init_wsgate(int argc, const char *const *argv, uniset::ObjectId shmID, const std::shared_ptr< SharedMemory > &ic=nullptr, const std::string &prefix="ws-")
Definition UWebSocketGate.cc:414
virtual bool deactivateObject() override
Деактивация объекта (переопределяется для необходимых действий при завершении работы)
Definition UWebSocketGate.cc:828
static void help_print()
Definition UWebSocketGate.cc:430
Definition UniSetObject.h:80
std::shared_ptr< UInterface > ui
Definition UniSetObject.h:136
Definition Mutex.h:32
Definition AccessConfig.h:30
const ObjectId DefaultObjectId
Definition UniSetTypes.h:71
long ObjectId
Definition UniSetTypes_i.idl:30
-client
Definition UniSetTypes_i.idl:65
Definition UWebSocketGate.h:186