UniSet @VERSION@
OPCUAExchange.h
1/*
2 * Copyright (c) 2023 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Lesser Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// -----------------------------------------------------------------------------
17#ifndef OPCUAExchange_H_
18#define OPCUAExchange_H_
19// -----------------------------------------------------------------------------
20#include <vector>
21#include <memory>
22#include <list>
23#include <string>
24#include <regex>
25#include <optional>
26#include <chrono>
27#include <mutex>
28#include <unordered_map>
29#include "UniXML.h"
30#include "ThreadCreator.h"
31#include "PassiveTimer.h"
32#include "Trigger.h"
33#include "IONotifyController.h"
34#include "UniSetObject.h"
35#include "Mutex.h"
36#include "MessageType.h"
37#include "SMInterface.h"
38#include "IOBase.h"
39#include "SharedMemory.h"
40#include "LogServer.h"
41#include "DebugStream.h"
42#include "LogAgregator.h"
43#include "OPCUAClient.h"
44#include "USingleProcess.h"
45// -------------------------------------------------------------------------
46#ifndef vmonit
47#define vmonit( var ) vmon.add( #var, var )
48#endif
49// -------------------------------------------------------------------------
50namespace uniset
51{
52 // ---------------------------------------------------------------------
182 // ---------------------------------------------------------------------
185 private USingleProcess,
186 public UniSetObject
187 {
188 public:
189 OPCUAExchange( uniset::ObjectId id, xmlNode* cnode,
190 uniset::ObjectId icID, const std::shared_ptr<SharedMemory>& shm = nullptr,
191 const std::string& _prefix = "opcua" );
192 virtual ~OPCUAExchange();
193
194 static std::shared_ptr<OPCUAExchange> init_opcuaexchange(int argc, const char* const* argv,
195 uniset::ObjectId icID, const std::shared_ptr<SharedMemory>& ic = nullptr,
196 const std::string& prefix = "opcua");
197
198 static void help_print( int argc, const char* const* argv );
199
200 virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
201
202 static uint8_t firstBit( uint32_t mask );
203
204 // offset = firstBit(mask)
205 static uint32_t getBits( uint32_t value, uint32_t mask, uint8_t offset );
206 // if mask = 0 return value
207 static uint32_t setBits( uint32_t value, uint32_t set, uint32_t mask, uint8_t offset );
208 // if mask=0 return set
209 static uint32_t forceSetBits( uint32_t value, uint32_t set, uint32_t mask, uint8_t offset );
210
211 using Tick = uint8_t;
212
213 static const size_t numChannels = 2;
215 {
216 std::vector<std::vector<OPCUAClient::ResultVar>> results;
217 std::vector<std::vector<opcua::ua::ReadValueId>> ids;
218 };
220 {
221 std::vector<std::vector<opcua::ua::WriteValue>> ids;
222 };
225 public IOBase
226 {
227 // т.к. IOBase содержит rwmutex с запрещённым конструктором копирования
228 // приходится здесь тоже объявлять разрешенными только операции "перемещения"
229 OPCAttribute(const OPCAttribute& r ) = delete;
230 OPCAttribute& operator=(const OPCAttribute& r) = delete;
231 OPCAttribute(OPCAttribute&& r ) = default;
232 OPCAttribute& operator=(OPCAttribute&& r) = default;
233 OPCAttribute() = default;
234
236 int32_t val { 0 };
237 Tick tick = { 0 }; // на каждом ли тике работать с этим аттрибутом
238 uint32_t mask = { 0 };
239 uint8_t offset = { 0 };
240 OPCUAClient::VarType vtype = { OPCUAClient::VarType::Int32 };
241
242 // with precision
243 float as_float();
244
245 // with precision/noprecision
246 int32_t set( float val );
247
248 std::string attrName = {""};
249 struct RdValue
250 {
251 std::shared_ptr<ReadGroup> gr;
252 size_t grIndex = {0}; // индекс в запросе (номер в запросе)
253 size_t grNumber = {0}; // Номер группы запроса в общем списке
254 int32_t get();
255 float getF();
256 opcua::StatusCode status();
257 const opcua::ua::ReadValueId& ref();
258 // Subscription
259 uint32_t subscriptionId = {0U};
260 uint32_t monitoredItemId = {0U};
261 bool subscriptionState = {false};
262 };
263 RdValue rval[numChannels];
264
265 struct WrValue
266 {
267 std::shared_ptr<WriteGroup> gr;
268 size_t grIndex = {0}; // индекс в запросе (номер в запросе)
269 size_t grNumber = {0}; // Номер группы запроса в общем списке
270 bool set( int32_t val );
271 bool setF( float val );
272 opcua::StatusCode status();
273 const opcua::ua::WriteValue& ref();
274 static opcua::Variant initValue(const std::string& stype, int32_t defvalue );
275 };
276 WrValue wval[numChannels];
277
278 friend std::ostream& operator<<(std::ostream& os, const OPCAttribute& inf );
279 friend std::ostream& operator<<(std::ostream& os, const std::shared_ptr<OPCAttribute>& inf );
280 };
281
284 {
285 emNone = 0,
290 emLastNumber
291 };
292
295 {
296 smNone = 0,
298 smAny = 2,
299 smLastNumber
300 };
301
304 {
305 std::chrono::system_clock::time_point time; // first occurrence
306 std::chrono::system_clock::time_point lastSeen; // last occurrence
307 size_t channel { 0 };
308 std::string operation; // "read" | "write" | "connect"
309 UA_StatusCode statusCode { UA_STATUSCODE_GOOD };
310 std::string nodeid; // опционально, для read/write
311 size_t count { 1 };
312 };
313
314 typedef std::list<IOBase> ThresholdList;
315 // т.к. пороговые датчики не связаны напрямую с обменом, создаём для них отдельный список
316 // и отдельно его проверяем потом
317 ThresholdList thrlist;
318
319#ifndef DISABLE_REST_API
320 // HTTP API
321 virtual Poco::JSON::Object::Ptr httpHelp( const Poco::URI::QueryParameters& p ) override;
322 virtual Poco::JSON::Object::Ptr httpRequest( const UHttp::HttpRequestContext& ctx ) override;
323 virtual Poco::JSON::Object::Ptr httpGetMyInfo( Poco::JSON::Object::Ptr root ) override;
324#endif
325
326 protected:
327
328 enum Timers
329 {
330 tmUpdates
331 };
332
333 struct Channel;
334 void channel1Thread();
335 void channel2Thread();
336 void channelThread( Channel* ch );
337 bool prepare();
338 void channelExchange( Tick tick, Channel* ch, bool writeOn );
339 void updateFromChannel( Channel* ch );
340 void updateToChannel( Channel* ch );
341 void updateFromSM();
342 void writeToSM();
343 bool isUpdateSM( bool wrFunc ) const noexcept;
344
345 virtual void sysCommand( const uniset::SystemMessage* sm ) override;
346 virtual void askSensors( UniversalIO::UIOCommand cmd );
347 virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
348 virtual void timerInfo( const uniset::TimerMessage* sm ) override;
349 virtual bool activateObject() override;
350 virtual bool deactivateObject() override;
351
352#ifndef DISABLE_REST_API
353 // Публичные HTTP-обработчики параметров
354 Poco::JSON::Object::Ptr httpGetParam( const Poco::URI::QueryParameters& p );
355 Poco::JSON::Object::Ptr httpSetParam( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
356 Poco::JSON::Object::Ptr httpStatus();
357 Poco::JSON::Object::Ptr buildLogServerInfo();
358
359 // Новые HTTP endpoints
360 Poco::JSON::Object::Ptr httpGet( const Poco::URI::QueryParameters& p );
361 Poco::JSON::Object::Ptr httpSensors( const Poco::URI::QueryParameters& p );
362 Poco::JSON::Object::Ptr httpSensor( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
363 Poco::JSON::Object::Ptr httpDiagnostics( const Poco::URI::QueryParameters& p );
364 Poco::JSON::Object::Ptr httpTakeControl( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
365 Poco::JSON::Object::Ptr httpReleaseControl( Poco::Net::HTTPServerResponse& resp, const Poco::URI::QueryParameters& p );
366
367 // Вспомогательные методы
368 std::string formatTime( const std::chrono::system_clock::time_point& tp ) const;
369 Poco::JSON::Object::Ptr sensorToJson( const std::shared_ptr<OPCAttribute>& attr, bool detailed = false, const std::string& cachedName = "" ) const;
370
371 // Защитный флаг: запретить /setparam при false
372 bool httpEnabledSetParams { true };
373#endif
374
375 // Вспомогательные методы
376 void addError( size_t channel, const std::string& operation, UA_StatusCode status, const std::string& nodeid = "" );
377
378 // Флаги управления режимом через HTTP (используются и вне REST API)
379 bool httpControlAllow { false };
380 std::atomic_bool httpControlActive { false };
382 // чтение файла конфигурации
383 void readConfiguration();
384 bool initIOItem( UniXML::iterator& it );
385 bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
386 bool waitSM();
387 bool tryConnect(Channel* ch);
388 void initOutputs();
389
390 xmlNode* confnode = { nullptr };
391 timeout_t polltime = { 100 };
392 timeout_t updatetime = { 100 };
394 typedef std::vector< std::shared_ptr<OPCAttribute> > IOList;
395 IOList iolist;
396 size_t maxItem = { 0 };
397 size_t maxReadItems = { 0 };
398 size_t maxWriteItems = { 0 };
399
400 struct Channel
401 {
402 size_t num;
403 size_t idx;
404 std::shared_ptr<OPCUAClient> client;
405 uniset::Trigger trStatus;
406 uniset::PassiveTimer ptTimeout;
407 std::atomic_bool status = { false };
408 std::string addr;
409 std::string user;
410 std::string pass;
411 std::unordered_map<Tick, std::shared_ptr<ReadGroup>> readValues;
412 std::unordered_map<Tick, std::shared_ptr<WriteGroup>> writeValues;
414 IOController::IOStateList::iterator respond_it;
415 std::atomic_bool needSubscription = { false }; // Требуется создать подписку
416 };
417 Channel channels[numChannels];
418 uniset::Trigger noConnections;
419 std::atomic_uint32_t currentChannel = { 0 };
420
421 uniset::timeout_t reconnectPause = { 10000 };
422 int filtersize = { 0 };
423 float filterT = { 0.0 };
424
425 std::string s_field;
426 std::string s_fvalue;
427 std::optional<std::regex> s_fvalue_re;
428
429 std::shared_ptr<SMInterface> shm;
430 std::string prop_prefix;
431 const std::string argprefix;
432
433 PassiveTimer ptHeartBeat;
434 uniset::ObjectId sidHeartBeat;
435 int maxHeartBeat = { 10 };
436 IOController::IOStateList::iterator itHeartBeat;
437
438 bool force = { false };
439 bool force_out = { false };
440 bool writeToAllChannels = { false };
441 timeout_t smReadyTimeout = { 15000 };
442 bool enableSubscription = {false};
443 double publishingInterval = { 0.0 };
444 double samplingInterval = { -1.0 };
445 uint16_t timeoutIterate = {100};
446 uint16_t stopOnError = {0U};
447 std::atomic<uint32_t> connectCount = {0U};
448 std::atomic_bool subscription_ok = {false};
449 std::atomic_bool newSessionActivated = {false};
450
451 std::atomic_bool activated = { false };
452 std::atomic_bool cancelled = { false };
453 std::atomic_bool readconf_ok = { false };
454
455 int activateTimeout;
456 uniset::ObjectId sidTestSMReady = { uniset::DefaultObjectId };
458 IOController::IOStateList::iterator itRespond;
459
460 std::shared_ptr<LogAgregator> loga;
461 std::shared_ptr<DebugStream> opclog;
462 std::shared_ptr<LogServer> logserv;
463 std::string logserv_host = {""};
464 int logserv_port = {0};
465
466 std::shared_ptr< ThreadCreator<OPCUAExchange> > thrChannel[numChannels];
467
469 IOController::IOStateList::iterator itExchangeMode;
470 std::atomic<long> exchangeMode = { emNone };
471 std::atomic<long> sensorExchangeMode = { emNone };
473 VMonitor vmon;
474
475 struct ErrorKey
476 {
477 size_t channel;
478 std::string operation;
479 UA_StatusCode statusCode;
480 std::string nodeid;
481
482 bool operator==( const ErrorKey& other ) const noexcept
483 {
484 return channel == other.channel &&
485 statusCode == other.statusCode &&
486 operation == other.operation &&
487 nodeid == other.nodeid;
488 }
489 };
490
492 {
493 size_t operator()( const ErrorKey& k ) const noexcept
494 {
495 size_t h1 = std::hash<size_t> {}(k.channel);
496 size_t h2 = std::hash<UA_StatusCode> {}(k.statusCode);
497 size_t h3 = std::hash<std::string> {}(k.operation);
498 size_t h4 = std::hash<std::string> {}(k.nodeid);
499 return ((h1 ^ (h2 << 1)) ^ (h3 << 2)) ^ (h4 << 3);
500 }
501 };
502
503 // Диагностика: кольцевой буфер ошибок (с дедупликацией)
504 mutable std::list<ErrorRecord> errorHistory;
505 mutable std::unordered_map<ErrorKey, std::list<ErrorRecord>::iterator, ErrorKeyHash> errorHistoryIndex;
506 size_t errorHistoryMax { 100 };
507 mutable std::mutex errorHistoryMutex;
508
509 // Счётчики ошибок (не сбрасываются)
510 std::atomic<uint64_t> totalReadErrors { 0 };
511 std::atomic<uint64_t> totalWriteErrors { 0 };
512 std::atomic<uint64_t> totalConnectionLosses { 0 };
513
514 // Время запуска для uptime
515 std::chrono::steady_clock::time_point startTime;
516
517 private:
518 void doCreateSubscription(int nchannel);
519 };
520 // --------------------------------------------------------------------------
521} // end of namespace uniset
522// -----------------------------------------------------------------------------
523#endif // OPCUAExchange_H_
524// -----------------------------------------------------------------------------
Definition OPCUAExchange.h:187
timeout_t smReadyTimeout
Definition OPCUAExchange.h:441
bool writeToAllChannels
Definition OPCUAExchange.h:440
uniset::ObjectId sidExchangeMode
Definition OPCUAExchange.h:468
bool force
Definition OPCUAExchange.h:438
timeout_t updatetime
Definition OPCUAExchange.h:392
virtual bool activateObject() override
Активизация объекта (переопределяется для необходимых действий после активизации)
Definition OPCUAExchange.cc:1465
bool httpControlAllow
Definition OPCUAExchange.h:379
bool force_out
Definition OPCUAExchange.h:439
uint16_t stopOnError
Definition OPCUAExchange.h:446
xmlNode * confnode
Definition OPCUAExchange.h:390
std::atomic< long > sensorExchangeMode
Definition OPCUAExchange.h:471
virtual bool deactivateObject() override
Деактивация объекта (переопределяется для необходимых действий при завершении работы)
Definition OPCUAExchange.cc:1479
IOList iolist
Definition OPCUAExchange.h:395
std::atomic< uint32_t > connectCount
Definition OPCUAExchange.h:447
timeout_t polltime
Definition OPCUAExchange.h:391
StopMode
Definition OPCUAExchange.h:295
@ smFirstOnly
Definition OPCUAExchange.h:297
@ smAny
Definition OPCUAExchange.h:298
@ smNone
Definition OPCUAExchange.h:296
ExchangeMode
Definition OPCUAExchange.h:284
@ emSkipExchange
Definition OPCUAExchange.h:289
@ emNone
Definition OPCUAExchange.h:285
@ emWriteOnly
Definition OPCUAExchange.h:286
@ emSkipSaveToSM
Definition OPCUAExchange.h:288
@ emReadOnly
Definition OPCUAExchange.h:287
std::atomic< long > exchangeMode
Definition OPCUAExchange.h:470
std::atomic_bool httpControlActive
Definition OPCUAExchange.h:380
size_t errorHistoryMax
Definition OPCUAExchange.h:506
Пассивный таймер
Definition PassiveTimer.h:94
Definition MessageType.h:127
Definition MessageType.h:171
Definition MessageType.h:214
Definition Trigger.h:31
Definition USingleProcess.h:28
Definition UniSetObject.h:80
Definition UniXML.h:44
Definition VMonitor.h:117
Definition Mutex.h:32
Definition AccessConfig.h:30
const ObjectId DefaultObjectId
Definition UniSetTypes.h:71
long ObjectId
Definition UniSetTypes_i.idl:30
Definition IOBase.h:35
UniversalIO::IOType stype
Definition IOBase.h:103
Definition OPCUAExchange.h:401
Definition OPCUAExchange.h:492
Definition OPCUAExchange.h:476
Definition OPCUAExchange.h:304
Definition OPCUAExchange.h:250
Definition OPCUAExchange.h:266
Definition OPCUAExchange.h:226
Definition OPCUAExchange.h:215
Definition OPCUAExchange.h:220
Definition UniSetTypes_i.idl:65
Definition UHttpRequestHandler.h:87