diff --git a/cwEuCon.cpp b/cwEuCon.cpp index 69cd9c9..622c43d 100644 --- a/cwEuCon.cpp +++ b/cwEuCon.cpp @@ -16,6 +16,8 @@ #include "dns_sd/dns_sd_const.h" +#define HEART_BEAT "\x04\x00\x00\x00" + #define RESPONSE_1 "\x0a\x00\x00\x00\x6d\x62\x70\x31\x39\x00\x00\x00\x44\x45" \ "\x00\x00\x78\x3f\x39\xe5\xfe\x7f\x00\x00\xc0\x32\x17\x49\xe2\x7f" \ "\x00\x00\xe0\x7d\x2e\x01\x00\x60\x00\x00\x00\x00\x00\x00\x00\x00" \ @@ -320,7 +322,8 @@ namespace cw unsigned sockUserId; unsigned protoState; unsigned cbCnt; - time::spec_t t0; + time::spec_t nextRecvHbTs; + time::spec_t nextSendHbTs; struct fbank_str* link; uint32_t remoteAddr; } fbank_t; @@ -332,7 +335,8 @@ namespace cw fbank_t* fbankL; // List of fader banks unsigned maxFaderBankN; // maximum number of fader banks unsigned sockTimeOutMs; // socket time out - unsigned faderTcpPort; // Fader TCP port TODO: we shouuld be getting this from the MDNS SRV record + unsigned faderTcpPort; // Fader TCP port TODO: we shouuld be getting this from the MDNS SRV record + unsigned heartBeatPeriodMs; } eucon_t; inline eucon_t* _handleToPtr( handle_t h ) @@ -374,14 +378,29 @@ namespace cw return rc; } - rc_t _send_response( fbank_t* fb, const char* packet, unsigned packetN ) + rc_t _disconnect( fbank_t* fb ) + { + rc_t rc = kOkRC; + + if((rc = sock::destroy(fb->eucon->sockMgrH, fb->sockUserId)) != kOkRC ) + rc = cwLogError(rc,"Socket destroy failed on disconnect attempt on fader bank index:%i.",fb->fbIndex); + + fb->protoState = kSendHandshake_0_Id; + + return rc; + + } + + rc_t _send_response( fbank_t* fb, const char* buf, unsigned bufByteN ) { rc_t rc = kOkRC; // send the initial handshake - if((rc = sock::send( fb->eucon->sockMgrH, fb->sockUserId, kInvalidId, packet, packetN )) != kOkRC ) + if((rc = sock::send( fb->eucon->sockMgrH, fb->sockUserId, kInvalidId, buf, bufByteN )) != kOkRC ) { - rc = cwLogError(rc,"TCP '%s' send failed on fader bank index:%i.",fb->fbIndex); + rc = cwLogError(rc,"TCP '%s' send failed on fader bank index:%i. Disconnecting.",fb->fbIndex); + + _disconnect(fb); } return rc; @@ -515,11 +534,21 @@ namespace cw break; case kResponse_4_B_Id: - _send_response(fb,RESPONSE_4_B,sizeof(RESPONSE_4_B)-1); + _send_response(fb,RESPONSE_4_B,sizeof(RESPONSE_4_B)-1); + fb->protoState = kRunning_Id; + // set the initial next heart-beat times for this fader bank + time::futureMs(fb->nextSendHbTs,fb->eucon->heartBeatPeriodMs); + time::futureMs(fb->nextRecvHbTs,fb->eucon->heartBeatPeriodMs*2); break; case kRunning_Id: + + // if this is a heart-beat + if( ((uint8_t*)data)[0] == 0x03 ) + time::futureMs(fb->nextRecvHbTs,fb->eucon->heartBeatPeriodMs+1000); + + printf("%i : Rcv: %i : ",fb->fbIndex, dataByteCnt ); for(unsigned i=0; icbCnt+=1; - - if( fb->cbCnt % 20 == 0 ) - { - time::spec_t t1; - time::get(t1); - //unsigned ms = time::elapsedMs( &p->t0, &t1 ); - //printf("cb: %i %i\n",p->cbCnt,ms); - } } rc_t _on_McMix_DNS_SD_TXT( eucon_t* p, const char* numberText, unsigned numberTextCharN, const struct sockaddr_in* fromAddr ) @@ -581,7 +601,7 @@ namespace cw return cwLogError(rc,"The TCP socket for fader bank index %i failed. ", fbIndex); fb->remoteAddr = fromAddr->sin_addr.s_addr; - + // Send the initial handshake to the fader bank _sendHandshake_0( fb ); @@ -662,9 +682,10 @@ cw::rc_t cw::eucon::create( handle_t& hRef, const args_t& args ) goto errLabel; } - p->maxFaderBankN = args.maxFaderBankN; - p->sockTimeOutMs = args.sockTimeOutMs; - p->faderTcpPort = args.faderTcpPort; + p->maxFaderBankN = args.maxFaderBankN; + p->sockTimeOutMs = args.sockTimeOutMs; + p->faderTcpPort = args.faderTcpPort; + p->heartBeatPeriodMs = args.heartBeatPeriodMs; hRef.set(p); errLabel: @@ -692,10 +713,12 @@ cw::rc_t cw::eucon::destroy( handle_t& hRef ) cw::rc_t cw::eucon::exec( handle_t h, unsigned sockTimeOutMs ) { - rc_t rc = kOkRC; - eucon_t* p = _handleToPtr(h); - unsigned totalReadByteN = 0; - + rc_t rc = kOkRC; + eucon_t* p = _handleToPtr(h); + unsigned totalReadByteN = 0; + time::spec_t t0; + + // poll sockets for incoming EuCon messages if((rc = sock::receive_all(p->sockMgrH, sockTimeOutMs, totalReadByteN )) != kOkRC ) return rc; @@ -707,9 +730,30 @@ cw::rc_t cw::eucon::exec( handle_t h, unsigned sockTimeOutMs ) break; default: - cwLogError(rc,"EuCon network receive failed."); + rc = cwLogError(rc,"EuCon network receive failed."); }; - + + // get the current time + time::get(t0); + + // check the health of each fader bank + for(fbank_t* fb = p->fbankL; fb!=nullptr; fb=fb->link) + if( fb->protoState == kRunning_Id ) + { + // has it been more than 'heartBeatPerioMs' millisecnods since we received a heart beat from this fader bank + if( time::isGTE( t0, fb->nextRecvHbTs ) ) + { + cwLogInfo("Missed heart-beat disconnecting fader bank index %i.",fb->fbIndex ); + _disconnect(fb); + } + + // is it time to send a heart-beat to this fader bank + if( time::isGTE( t0, fb->nextSendHbTs ) ) + { + _send_response( fb, HEART_BEAT, sizeof(HEART_BEAT)-1 ); + time::futureMs(fb->nextSendHbTs,fb->eucon->heartBeatPeriodMs); + } + } return rc; @@ -820,7 +864,7 @@ cw::rc_t cw::eucon::test() args.maxFaderBankN = 8; args.faderTcpPort = 49168; args.maxSockN = 50; - + args.heartBeatPeriodMs = 4000; if((rc = appBegin(args,app)) != kOkRC ) return 1; diff --git a/cwEuCon.h b/cwEuCon.h index 332f2a0..51bf60b 100644 --- a/cwEuCon.h +++ b/cwEuCon.h @@ -23,6 +23,7 @@ namespace cw sock::portNumber_t faderTcpPort; // Fader TCP port (e.g. 49168) unsigned maxSockN; // maximum number of socket to allow in the socket manager unsigned maxFaderBankN; // maximum number of fader banks to support + unsigned heartBeatPeriodMs; // time between heart-beat messages from/to the fader banks } args_t; // Create the EuCon simulation manager.