UniSet @VERSION@
UNetReceiver.h
1/*
2 * Copyright (c) 2015 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Lesser Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// -----------------------------------------------------------------------------
17#ifndef UNetReceiver_H_
18#define UNetReceiver_H_
19// -----------------------------------------------------------------------------
20#include <ostream>
21#include <memory>
22#include <string>
23#include <vector>
24#include <unordered_map>
25#include <sigc++/sigc++.h>
26#include <ev++.h>
27#include "UniSetObject.h"
28#include "Trigger.h"
29#include "Mutex.h"
30#include "SMInterface.h"
31#include "SharedMemory.h"
32#include "UDPPacket.h"
33#include "CommonEventLoop.h"
34#include "UNetTransport.h"
35#ifndef DISABLE_REST_API
36#include <Poco/JSON/Object.h>
37#endif
38// --------------------------------------------------------------------------
39namespace uniset
40{
41 // -----------------------------------------------------------------------------
42 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
43 * ===============
44 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
45 * что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
46 * Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
47 * куда поместить пакет в буфере. Есть два индекса
48 * rnum - (read number) номер последнего обработанного пакета + 1
49 * wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
50 * WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
51 *
52 * При этом обработка ведётся по порядку (только пакеты идущие подряд)
53 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
54 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
55 * Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
56 * Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
57 * либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
58 *
59 * КЭШ
60 * ===
61 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
62 * Идея проста: сделан вектор размером с количеством принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
63 * Порядковый номер данных в пакете является индексом в кэше.
64 * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
65 * ID который пришёл в пакете - элемент кэша обновляется.
66 * Если количество пришедших данных не совпадают с размером кэша - кэш обновляется.
67 *
68 * КЭШ (ДОПОЛНЕНИЕ)
69 * ===
70 * Т.к. в общем случае, данные могут быть разбиты на несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
71 * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
72 * Кэш в map добавляется тогда, когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим он используется для этого пакета.
73 * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
74 * т.е. на то что UNetSender не будет с течением времени менять количество отправляемых пакетов
75 * (работать будет, просто в map останутся лежать записи для неиспользуемых пакетов)
76 *
77 * ОПТИМИЗАЦИЯ
78 * ===
79 * В кэше так же хранится crc последних принятых данных. Если crc совпадает с тем, что пришло в пакете, то обработки не происходит.
80 * crc хранится отдельно для дискретных и отдельно для аналоговых датчиков.
81 * Эту оптимизацию можно отключить параметром --prefix-recv-ignore-crc или recvIgnoreCRC="1" в конф. файле.
82 *
83 * Обработка сбоев в номере пакетов
84 * =========================================================================
85 * Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
86 * то считается, что произошёл сбой или узел который посылал пакеты - перезагрузился
87 * Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
88 * реинициализация и обработка продолжается с нового номера.
89 *
90 * =========================================================================
91 * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся, пакет не обрабатываем.
92 *
93 * Создание соединения (открытие сокета)
94 * ======================================
95 * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
96 * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
97 * открыть сокет.. и так бесконечно, пока не получится. Это важно для систем, где в момент загрузки программы
98 * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
99 * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть.
100 * Если такая логика не требуется, то можно задать в конструкторе
101 * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
102 * выкинуто исключение при неудачной попытке создания соединения.
103 */
104 // -----------------------------------------------------------------------------
105 class UNetReceiver final:
106 protected EvWatcher,
107 public std::enable_shared_from_this<UNetReceiver>
108 {
109 public:
110 UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
111 , bool nocheckConnection = false
112 , const std::string& prefix = "unet" );
113 virtual ~UNetReceiver();
114
115 void start();
116 void stop();
117
118 inline std::string getName() const noexcept
119 {
120 return myname;
121 }
122
123 // блокировать сохранение данных в SM
124 void setLockUpdate( bool st ) noexcept;
125 bool isLockUpdate() const noexcept;
126
127 void resetTimeout() noexcept;
128
129 bool isInitOK() const noexcept;
130 bool isRecvOK() const noexcept;
131 size_t getLostPacketsNum() const noexcept;
132
133 void setReceiveTimeout( timeout_t msec ) noexcept;
134 void setUpdatePause( timeout_t msec ) noexcept;
135 void setLostTimeout( timeout_t msec ) noexcept;
136 void setPrepareTime( timeout_t msec ) noexcept;
137 void setCheckConnectionPause( timeout_t msec ) noexcept;
138 void setMaxDifferens( unsigned long set ) noexcept;
139 void setEvrunTimeout(timeout_t msec ) noexcept;
140 void setInitPause( timeout_t msec ) noexcept;
141 void setBufferSize( size_t sz ) noexcept;
142 void setMaxReceiveAtTime( size_t sz ) noexcept;
143 void setIgnoreCRC( bool set ) noexcept;
144
145 void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
146 void setLostPacketsID( uniset::ObjectId id ) noexcept;
147 void setModeID( uniset::ObjectId id ) noexcept;
148
149 void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
150
151 inline std::string getTransportID() const noexcept
152 {
153 return transport->ID();
154 }
155
157 enum Event
158 {
160 evTimeout
161 };
162
163 enum class Mode : int
164 {
165 mEnabled = 0,
166 mDisabled = 1
167 };
168
169 typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
170 void connectEvent( EventSlot sl ) noexcept;
171
172 // --------------------------------------------------------------------
173 inline std::shared_ptr<DebugStream> getLog() noexcept
174 {
175 return unetlog;
176 }
177
178 std::string getShortInfo() const noexcept;
179
180#ifndef DISABLE_REST_API
181 Poco::JSON::Object::Ptr httpInfo( Poco::JSON::Object::Ptr root ) const;
182#endif
183
184 protected:
185
186 const std::shared_ptr<SMInterface> shm;
187 std::shared_ptr<DebugStream> unetlog;
188
189 enum ReceiveRetCode
190 {
191 retOK = 0,
192 retError = 1,
193 retNoData = 2
194 };
195
196 ReceiveRetCode receive() noexcept;
197 void update() noexcept;
198 void callback( ev::io& watcher, int revents ) noexcept;
199 void readEvent( ev::io& watcher ) noexcept;
200 void updateEvent( ev::periodic& watcher, int revents ) noexcept;
201 void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
202 void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
203 void onForceUpdate( ev::async& watcher, int revents ) noexcept;
204 void initEvent( ev::timer& watcher, int revents ) noexcept;
205 virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
206 virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
207 virtual std::string wname() const noexcept override
208 {
209 return myname;
210 }
211
212 void initIterators() noexcept;
213 bool createConnection( bool throwEx = false );
214 bool checkConnection();
215 size_t rnext( size_t num );
216
217 private:
218 UNetReceiver() {}
219
220 timeout_t updatepause = { 100 };
222 std::unique_ptr<UNetReceiveTransport> transport;
223 std::string addr;
224 std::string myname;
225 ev::io evReceive;
226 ev::periodic evCheckConnection;
227 ev::periodic evStatistic;
228 ev::periodic evUpdate;
229 ev::timer evInitPause;
230 ev::async evForceUpdate;
231
232 // счётчики для подсчёта статистики
233 size_t recvCount = { 0 };
234 size_t upCount = { 0 };
235 std::chrono::steady_clock::time_point t_start;
236 std::chrono::steady_clock::time_point t_end;
237 std::chrono::steady_clock::time_point t_stats;
238
239 // текущая статистика
240 struct Stats
241 {
242 float recvPerSec = {0};
243 float upPerSec = {0};
244 size_t upProcessingTime_microsec = {0};
245 size_t recvProcessingTime_microsec = {0};
246 };
247
248 Stats stats;
249
250 // делаем loop общим.. одним на всех!
251 static CommonEventLoop loop;
252
253 double checkConnectionTime = { 10.0 }; // sec
254 std::mutex checkConnMutex;
255
256 PassiveTimer ptRecvTimeout;
257 PassiveTimer ptPrepare;
258 timeout_t recvTimeout = { 5000 }; // msec
259 timeout_t prepareTime = { 2000 };
260 timeout_t evrunTimeout = { 15000 };
261 timeout_t lostTimeout = { 200 };
262 size_t maxReceiveCount = { 5 }; // количество читаемых за один раз
263
264 double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
265 std::atomic_bool initOK = { false };
266
267 PassiveTimer ptLostTimeout;
268 size_t lostPackets = { 0 };
271 IOController::IOStateList::iterator itRespond;
272 bool respondInvert = { false };
273 uniset::ObjectId sidLostPackets = { uniset::DefaultObjectId };
274 IOController::IOStateList::iterator itLostPackets;
275
276 // режим работы
278 IOController::IOStateList::iterator itMode;
279 Mode mode = { Mode::mEnabled };
280
281 std::atomic_bool activated = { false };
282
283 size_t cbufSize = { 100 };
284 std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
285 size_t wnum = { 1 };
286 size_t rnum = { 0 };
287 UniSetUDP::UDPMessage* pack; // текущий обрабатываемый пакет
288
292 size_t maxDifferens = { 20 };
293
294 std::atomic_bool lockUpdate = { false };
296 EventSlot slEvent;
297 Trigger trTimeout;
298 std::mutex tmMutex;
299
300 struct CacheItem
301 {
302 long id = { uniset::DefaultObjectId };
303 IOController::IOStateList::iterator ioit;
304
305 CacheItem():
306 id(uniset::DefaultObjectId) {}
307 };
308 typedef std::vector<CacheItem> CacheVec;
309
310 struct CacheInfo
311 {
312 uint16_t crc;
313 CacheVec items;
314
315 CacheInfo(): crc(0) {}
316 };
317
318 // ключом является UDPMessage::getDataID()
319 typedef std::unordered_map<long, CacheInfo> CacheMap;
320 CacheMap d_icache_map;
321 CacheMap a_icache_map;
322 size_t cacheMissed; // количество промахов
323 bool ignoreCRC = { false };
325 Trigger trOnMode;
327 CacheInfo* getDCache( UniSetUDP::UDPMessage* upack ) noexcept;
328 CacheInfo* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
329 };
330 // --------------------------------------------------------------------------
331} // end of namespace uniset
332// -----------------------------------------------------------------------------
333namespace std
334{
335 std::string to_string( const uniset::UNetReceiver::Mode& p );
336}
337// -----------------------------------------------------------------------------
338#endif // UNetReceiver_H_
339// -----------------------------------------------------------------------------
Definition CommonEventLoop.h:19
Definition UNetReceiver.h:108
Event
Definition UNetReceiver.h:158
@ evTimeout
Definition UNetReceiver.h:160
@ evOK
Definition UNetReceiver.h:159
Mode
Definition UNetReceiver.h:164
STL namespace.
Definition AccessConfig.h:30
const ObjectId DefaultObjectId
Definition UniSetTypes.h:71
long ObjectId
Definition UniSetTypes_i.idl:30