cwEucon.h/cpp : Initial updates to support fader bank array server.

This commit is contained in:
kevin.larke 2020-04-03 10:36:10 -04:00
parent 6d623c3a2a
commit f2bfc13419
3 changed files with 318 additions and 208 deletions

View File

@ -15,6 +15,7 @@
#include <utility> // std::forward #include <utility> // std::forward
#include <limits> // std::numeric_limits< #include <limits> // std::numeric_limits<
#include <atomic> #include <atomic>
#include <cstdint>
#if defined(OS_LINUX) || defined(OS_OSX) #if defined(OS_LINUX) || defined(OS_OSX)
#define cwPOSIX_FILE_SYS #define cwPOSIX_FILE_SYS

View File

@ -3,16 +3,17 @@
#include "cwLog.h" #include "cwLog.h"
#include "cwCommonImpl.h" #include "cwCommonImpl.h"
#include "cwMem.h" #include "cwMem.h"
#include "cwTime.h" #include "cwTime.h"
#include "cwThread.h" #include "cwThread.h"
#include "cwTcpSocket.h" #include "cwSocket.h"
#include "cwTcpSocketSrv.h"
#include "cwUtility.h" #include "cwUtility.h"
#include "cwEuCon.h" #include "cwEuCon.h"
#include "cwText.h"
#include "cwNumericConvert.h"
#include "dns_sd/dns_sd_const.h" #include "dns_sd/dns_sd_const.h"
#define RESPONSE_1 "\x0a\x00\x00\x00\x6d\x62\x70\x31\x39\x00\x00\x00\x44\x45" \ #define RESPONSE_1 "\x0a\x00\x00\x00\x6d\x62\x70\x31\x39\x00\x00\x00\x44\x45" \
@ -302,7 +303,7 @@ namespace cw
enum enum
{ {
kSendHandshake_0_Id, // send [0x0a, ...] kSendHandshake_0_Id, // send [0x0a, ...]
kWaitForHandshake_1_Id, // wait for first heart beat -> then send [0x0c ...] kWaitForHandshake_1_Id, // wait for first heart beat -> then send [0x0c ...]
kWaitForHandshake_2_Id, // wait for [0x0d ...] -> then send response_3_a kWaitForHandshake_2_Id, // wait for [0x0d ...] -> then send response_3_a
kResponse_3_A_Id, kResponse_3_A_Id,
kResponse_3_B_Id, kResponse_3_B_Id,
@ -310,177 +311,203 @@ namespace cw
kResponse_4_B_Id, kResponse_4_B_Id,
kRunning_Id kRunning_Id
}; };
struct eucon_str;
typedef struct fbank_str
{
struct eucon_str* eucon;
unsigned fbIndex;
unsigned sockUserId;
unsigned protoState;
unsigned cbCnt;
time::spec_t t0;
struct fbank_str* link;
uint32_t remoteAddr;
} fbank_t;
typedef struct eucon_str typedef struct eucon_str
{ {
srv::handle_t udpH; sock::handle_t sockMgrH;
srv::handle_t tcpH; fbank_t* fbankL;
unsigned udpRecvBufByteN; unsigned maxFaderBankN;
unsigned tcpRecvBufByteN; unsigned sockTimeOutMs;
socket::portNumber_t servicePort;
unsigned protoState;
unsigned cbCnt;
time::spec_t t0;
} eucon_t; } eucon_t;
inline eucon_t* _handleToPtr( handle_t h ) inline eucon_t* _handleToPtr( handle_t h )
{ return handleToPtr<handle_t,eucon_t>(h); } { return handleToPtr<handle_t,eucon_t>(h); }
rc_t _destroyFBank( eucon_t* p, fbank_t* fb )
{
rc_t rc = kOkRC;
if((rc = sock::destroy(p->sockMgrH, fb->sockUserId )) != kOkRC )
rc = cwLogError(rc,"Fader bank (index:%i) destroy failed.", fb->fbIndex );
return rc;
}
rc_t _destroy( eucon_t* p) rc_t _destroy( eucon_t* p)
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
rc_t rc0;
fbank_t* fb = p->fbankL;
for(; fb!=nullptr; fb=fb->link)
if((rc0 = _destroyFBank(p,fb)) != kOkRC )
rc = rc0;;
srv::destroy(p->udpH); if( p->sockMgrH.isValid() )
srv::destroy(p->tcpH); if((rc = sock::destroyMgr(p->sockMgrH)) != kOkRC )
return rc;
mem::release(p); mem::release(p);
return rc; return rc;
} }
rc_t _init( eucon_t* p ) rc_t _send_response( fbank_t* fb, const char* packet, unsigned packetN )
{
rc_t rc = kOkRC;
return rc;
}
rc_t _send_response( socket::handle_t sockH, const char* packet, unsigned packetN )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
// send the initial handshake // send the initial handshake
if((rc = socket::send( sockH, packet, packetN )) != kOkRC ) if((rc = sock::send( fb->eucon->sockMgrH, fb->sockUserId, kInvalidId, packet, packetN )) != kOkRC )
{ {
rc = cwLogError(rc,"TCP '%s' send failed."); rc = cwLogError(rc,"TCP '%s' send failed on fader bank index:%i.",fb->fbIndex);
} }
return rc; return rc;
} }
rc_t _sendHandshake_0( socket::handle_t sockH, const char* label ) rc_t _sendHandshake_0( fbank_t* fb )
{ {
return _send_response(sockH,RESPONSE_1,sizeof(RESPONSE_1)-1); printf("%i : Sent HS-0: 0x0a\n", fb->fbIndex);
return _send_response(fb,RESPONSE_1,sizeof(RESPONSE_1)-1);
} }
rc_t _sendHandshake_1( socket::handle_t sockH ) rc_t _sendHandshake_1( fbank_t* fb )
{ return _send_response(fb,RESPONSE_2,sizeof(RESPONSE_2)-1); }
fbank_t* _createFBank( eucon_t* p, unsigned fbIndex )
{ {
return _send_response(sockH,RESPONSE_2,sizeof(RESPONSE_2)-1); fbank_t* fb = mem::allocZ<fbank_t>();
fb->eucon = p;
fb->fbIndex = fbIndex;
fb->sockUserId = kBaseSockUserId + fbIndex;
fb->protoState = kSendHandshake_0_Id;
fb->link = p->fbankL;
p->fbankL = fb;
return fb;
} }
fbank_t* _fbIndexToFBank( eucon_t* p, unsigned fbIndex, bool showErrorFl=true )
void _udpReceiveCallback( void* arg, const void* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
{ {
rc_t rc = kOkRC; fbank_t* fb = p->fbankL;
eucon_t* p = (eucon_t*)arg; for(; fb!=nullptr; fb=fb->link)
const uint16_t* u = (const uint16_t*)data; if( fb->fbIndex == fbIndex )
return fb;
// if this is a DNS-SD reply if( showErrorFl )
if( cwIsFlag(ntohs(u[1]), kReplyHdrDnsFl ) ) cwLogError(kInvalidId,"Fader bank index %i is not valid.", fbIndex );
{
const char* name = (const char*)(u+6); return nullptr;
const char* label = "MC Mix - 1";
printf("%.*s|%li\n", name[0], name+1, strlen(label) );
// if this a 'MC Mix' DNS-SD SRV reply
if( strncmp(name+1, label, name[0]) == 0 )
{
// get the address of the advertising 'MC Mix'
char addrBuf[ INET_ADDRSTRLEN+1 ];
if(socket::addrToString( fromAddr, addrBuf, INET_ADDRSTRLEN ) == kOkRC )
{
socket::handle_t sockH = socketHandle(p->tcpH);
// if the socket is not connected
if( socket::isConnected(sockH) == false )
{
// Connect to the 'MC Mix' service
if((rc = socket::connect( sockH, addrBuf, p->servicePort )) != kOkRC )
{
cwLogError(rc,"Connection to '%s' service failed.", label );
}
else
{
printf("Connected.\n");
if( true )
{
struct sockaddr_in addr;
char aBuf[ INET_ADDRSTRLEN+1 ];
socket::peername( sockH, &addr );
socket::addrToString( &addr, aBuf, INET_ADDRSTRLEN);
printf("PEER: %i %s\n", addr.sin_port,aBuf );
}
// Send the initial handshake to the Surface
_sendHandshake_0( sockH, label );
printf("Sent HS-0: 0x0a\n");
// FENCE
p->protoState = kWaitForHandshake_1_Id;
}
}
}
}
}
} }
void _tcpReceiveCallback( void* arg, const void* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr ) fbank_t* _ipAddrToFBank( eucon_t* p, uint32_t ipAddr, bool showErrorFl=true )
{
fbank_t* fb = p->fbankL;
for(; fb!=nullptr; fb=fb->link)
if( fb->remoteAddr == ipAddr )
return fb;
if( showErrorFl )
cwLogError(kInvalidId,"Fader bank with dest. addr 0x%x was not found.", ipAddr );
return nullptr;
}
void _tcpCallback( void* arg, sock::cbOpId_t cbOpId, unsigned userId, unsigned connId, const void* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
{ {
eucon_t* p = static_cast<eucon_t*>(arg); eucon_t* p = static_cast<eucon_t*>(arg);
fbank_t* fb;
if( dataByteCnt > 0) switch( cbOpId )
{ {
if( dataByteCnt >= 4 ) case sock::kConnectCbId:
{ cwLogInfo("Connected: user:%i conn:%i", userId, connId );
//printHex(data,dataByteCnt); return;
unsigned hdr = *(const unsigned*)data; case sock::kDisconnectCbId:
cwLogInfo("Disconnected: user:%i conn:%i", userId, connId );
return;
switch( p->protoState ) case sock::kReceiveCbId:
{ break;
case kWaitForHandshake_1_Id:
if( hdr == 0x0b )
{
p->protoState = kWaitForHandshake_2_Id;
_sendHandshake_1( socketHandle(p->tcpH) );
printf("Rcvd (0x0b) HS 1 - sent 0x0c\n");
}
break;
case kWaitForHandshake_2_Id: default:
if( hdr == 0x0d ) cwLogError(kInvalidIdRC,"An invalid socket callback id (%i) was received.",cbOpId);
{ return;
p->protoState = kResponse_3_A_Id;
printf("Rcvd (0x0d) HS 2 - Sending setup data\n");
}
break;
}
}
} }
switch(p->protoState) // get the fader bank this message is intended for
if((fb = _ipAddrToFBank( p, fromAddr->sin_addr.s_addr )) == nullptr )
{
cwLogError(kOpFailRC,"Fader bank not found. TCP message not delivered.");
return;
}
if( data!=nullptr && dataByteCnt >= 4 )
{
//printHex(data,dataByteCnt);
unsigned hdr = *(const unsigned*)data;
switch( fb->protoState )
{
case kWaitForHandshake_1_Id:
if( hdr == 0x0b )
{
fb->protoState = kWaitForHandshake_2_Id;
_sendHandshake_1( fb );
printf("Rcvd (0x0b) HS 1 - sent 0x0c\n");
}
break;
case kWaitForHandshake_2_Id:
if( hdr == 0x0d )
{
fb->protoState = kResponse_3_A_Id;
printf("Rcvd (0x0d) HS 2 - Sending setup data\n");
}
break;
}
}
switch(fb->protoState)
{ {
case kResponse_3_A_Id: case kResponse_3_A_Id:
_send_response(socketHandle(p->tcpH),RESPONSE_3_A,sizeof(RESPONSE_3_A)-1); _send_response(fb,RESPONSE_3_A,sizeof(RESPONSE_3_A)-1);
p->protoState = kResponse_3_B_Id; fb->protoState = kResponse_3_B_Id;
break; break;
case kResponse_3_B_Id: case kResponse_3_B_Id:
_send_response(socketHandle(p->tcpH),RESPONSE_3_B,sizeof(RESPONSE_3_B)-1); _send_response(fb,RESPONSE_3_B,sizeof(RESPONSE_3_B)-1);
p->protoState = kResponse_4_A_Id; fb->protoState = kResponse_4_A_Id;
break; break;
case kResponse_4_A_Id: case kResponse_4_A_Id:
_send_response(socketHandle(p->tcpH),RESPONSE_4_A,sizeof(RESPONSE_4_A)-1); _send_response(fb,RESPONSE_4_A,sizeof(RESPONSE_4_A)-1);
p->protoState = kResponse_4_B_Id; fb->protoState = kResponse_4_B_Id;
break; break;
case kResponse_4_B_Id: case kResponse_4_B_Id:
_send_response(socketHandle(p->tcpH),RESPONSE_4_B,sizeof(RESPONSE_4_B)-1); _send_response(fb,RESPONSE_4_B,sizeof(RESPONSE_4_B)-1);
p->protoState = kRunning_Id; fb->protoState = kRunning_Id;
break; break;
case kRunning_Id: case kRunning_Id:
@ -491,82 +518,143 @@ namespace cw
} }
p->cbCnt+=1; fb->cbCnt+=1;
if( p->cbCnt % 20 == 0 ) if( fb->cbCnt % 20 == 0 )
{ {
time::spec_t t1; time::spec_t t1;
time::get(t1); time::get(t1);
//unsigned ms = time::elapsedMs( &p->t0, &t1 ); //unsigned ms = time::elapsedMs( &p->t0, &t1 );
//printf("cb: %i %i\n",p->cbCnt,ms); //printf("cb: %i %i\n",p->cbCnt,ms);
} }
} }
rc_t _on_McMix_DNS_SD_SRV( eucon_t* p, const char* numberText, unsigned numberTextCharN, const struct sockaddr_in* fromAddr )
{
rc_t rc = kOkRC;
unsigned tcpFlags = sock::kTcpFl | sock::kBlockingFl | sock::kStreamFl | sock::kReuseAddrFl | sock::kReusePortFl;
fbank_t* fb = nullptr;
unsigned fbIndex = 0;
sock::portNumber_t fbPort = fromAddr->sin_port;
char fbIP[ INET_ADDRSTRLEN+1 ];
// copy the "MC Mix" suffix number into a zero terminated string
char numbBuf[ numberTextCharN + 1 ];
strncpy(numbBuf,numberText,numberTextCharN);
numbBuf[numberTextCharN] = '\0';
// convert the "MC Mix" suffix to a number
if((rc = string_to_number<unsigned>(numbBuf,fbIndex)) != kOkRC )
return cwLogError(kSyntaxErrorRC,"The 'MC Mix' suffix number could not be parsed.");
// validate the value
if( !(0 < fbIndex && fbIndex <= p->maxFaderBankN ) )
return cwLogError(kInvalidArgRC,"The fader bank index %i is not valid.", fbIndex );
// convert fbIndex to a zero based index
fbIndex -= 1;
// If this fader bank does not already exist ....
if((fb = _fbIndexToFBank(p,fbIndex,false)) == nullptr )
{
// ... then create it
if((fb = _createFBank(p,fbIndex)) == nullptr )
return cwLogError(kOpFailRC,"The fader bank index %i failed.", fbIndex );
}
// convert the fromAddr to a string
if((rc = sock::addrToString( fromAddr, fbIP, INET_ADDRSTRLEN)) != kOkRC )
return cwLogError(rc,"IP address to string conversion failed.", fbIndex);
// create the TCP socket
if((rc = sock::create( p->sockMgrH, fb->sockUserId, sock::kInvalidPortNumber, tcpFlags, p->sockTimeOutMs, _tcpCallback, p, fbIP, fbPort )) != kOkRC )
return cwLogError(rc,"The TCP socket for fader bank index %i failed. ", fbIndex);
fb->remoteAddr = fromAddr->sin_addr.s_addr;
// Send the initial handshake to the fader bank
_sendHandshake_0( fb );
fb->protoState = kWaitForHandshake_1_Id;
return rc;
}
void _udpCallback( void* arg, sock::cbOpId_t cbId, unsigned userId, unsigned connId, const void* data, unsigned dataByteCnt, const struct sockaddr_in* fromAddr )
{
rc_t rc = kOkRC;
eucon_t* p = (eucon_t*)arg;
const uint16_t* u = (const uint16_t*)data;
// if this is a DNS-SD reply
if( cwIsFlag(ntohs(u[1]), kReplyHdrDnsFl ) )
{
const char* name = (const char*)(u+6);
const char* label = "MC Mix - ";
printf("%.*s|%li\n", name[0], name+1, strlen(label) );
// if this a 'MC Mix' DNS-SD SRV reply
if( strncmp(label, name+1, strlen(label)) == 0 )
{
unsigned n = strlen(label) + 1;
if((rc = _on_McMix_DNS_SD_SRV( p, name+n, name[0]-n, fromAddr )) != kOkRC )
cwLogError(rc,"%.*s initialization failed.",name[0],name+1);
}
}
}
} }
} }
} }
cw::rc_t cw::net::eucon::create( handle_t& hRef, socket::portNumber_t tcpPort, socket::portNumber_t servicePort, unsigned recvBufByteN, unsigned timeOutMs )
cw::rc_t cw::net::eucon::create( handle_t& hRef, const args_t& args )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
uint16_t mdnsPort = 5353; unsigned udpFlags = sock::kBlockingFl | sock::kReuseAddrFl | sock::kReusePortFl | sock::kMultiCastTtlFl | sock::kMultiCastLoopFl;
const char* mdnsIp = "224.0.0.251";
unsigned udpRecvBufByteN = recvBufByteN;
unsigned tcpRecvBufByteN = recvBufByteN;
unsigned udpTimeOutMs = timeOutMs;
unsigned tcpTimeOutMs = timeOutMs;
if((rc = destroy(hRef)) != kOkRC ) if((rc = destroy(hRef)) != kOkRC )
return rc; return rc;
eucon_t* p = mem::allocZ<eucon_t>(); eucon_t* p = mem::allocZ<eucon_t>();
p->udpRecvBufByteN = udpRecvBufByteN;
p->tcpRecvBufByteN = tcpRecvBufByteN; // create the socket manager
p->servicePort = servicePort; if((rc = sock::createMgr(p->sockMgrH, args.recvBufByteN, args.maxSockN)) != kOkRC )
p->protoState = kSendHandshake_0_Id; {
rc = cwLogError(rc,"Socket manager create failed during eucon mgr. allocation.");
// create the mDNS UDP socket server goto errLabel;
if((rc = srv::create( }
p->udpH,
mdnsPort, // create the MDNS/DNS SD UDP socket
socket::kBlockingFl | socket::kReuseAddrFl | socket::kReusePortFl | socket::kMultiCastTtlFl | socket::kMultiCastLoopFl, if((rc = sock::create( p->sockMgrH, kUdpSockUserId, args.mdnsPort, udpFlags, args.sockTimeOutMs, _udpCallback, p )) != kOkRC )
srv::kUseRecvFromFl, {
_udpReceiveCallback, rc = cwLogError(rc,"The Eucon controller UDP socket creation failed. ");
p, goto errLabel;
p->udpRecvBufByteN,
udpTimeOutMs,
NULL,
socket::kInvalidPortNumber )) != kOkRC )
{
return cwLogError(rc,"mDNS UDP socket create failed.");
} }
// add the mDNS socket to the multicast group // add the mDNS socket to the multicast group
if((rc = join_multicast_group( socketHandle(p->udpH), mdnsIp )) != kOkRC ) if((rc = join_multicast_group( p->sockMgrH, kUdpSockUserId, args.mdnsIP )) != kOkRC )
{
rc = cwLogError(rc,"The Eucon controller UDP socket could not be added to the multicast group. ");
goto errLabel; goto errLabel;
}
// set the TTL for multicast // set the TTL for multicast
if((rc = set_multicast_time_to_live( socketHandle(p->udpH), 255 )) != kOkRC ) if((rc = set_multicast_time_to_live( p->sockMgrH, kUdpSockUserId, 255 )) != kOkRC )
goto errLabel; {
rc = cwLogError(rc,"The multicast assignment to Eucon controller UDP socket failed. ");
// create the service TCP socket server
if((rc = srv::create(
p->tcpH,
tcpPort,
socket::kTcpFl | socket::kBlockingFl | socket::kStreamFl,
0,
_tcpReceiveCallback,
p,
p->tcpRecvBufByteN,
tcpTimeOutMs,
NULL,
socket::kInvalidPortNumber)) != kOkRC )
{
rc = cwLogError(rc,"mDNS TCP socket create failed.");
goto errLabel; goto errLabel;
} }
p->maxFaderBankN = args.maxFaderBankN;
p->sockTimeOutMs = args.sockTimeOutMs;
hRef.set(p); hRef.set(p);
errLabel: errLabel:
@ -592,60 +680,63 @@ cw::rc_t cw::net::eucon::destroy( handle_t& hRef )
} }
cw::rc_t cw::net::eucon::start( handle_t h ) cw::rc_t cw::net::eucon::exec( handle_t h, unsigned sockTimeOutMs )
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
eucon_t* p = _handleToPtr(h); eucon_t* p = _handleToPtr(h);
unsigned totalReadByteN = 0;
if((rc = sock::receive_all(p->sockMgrH, sockTimeOutMs, totalReadByteN )) != kOkRC )
return rc;
time::get(p->t0); switch( rc )
{
case kOkRC:
case kTimeOutRC:
rc = kOkRC;
break;
default:
cwLogError(rc,"EuCon network receive failed.");
};
if((rc = _init(p)) != kOkRC )
return rc;
// start the mDNS socket server
if((rc = srv::start( p->udpH )) != kOkRC )
return rc;
// start the TCP socket server
if((rc = srv::start( p->tcpH )) != kOkRC )
return rc;
return rc; return rc;
} }
bool quitFl = false;
void sighandler(int sig)
{
quitFl = true;
}
cw::rc_t cw::net::eucon::test() cw::rc_t cw::net::eucon::test()
{ {
rc_t rc = kOkRC; rc_t rc = kOkRC;
socket::portNumber_t tcpPort = 49170; handle_t h;
socket::portNumber_t defaultServicePort = 49168; args_t args;
const unsigned sbufN = 31;
unsigned recvBufByteN = 4096;
unsigned timeOutMs = 50;
handle_t h;
char sbuf[ sbufN+1 ];
// create the EuCon server args.recvBufByteN = 4096;
if((rc = create( h, tcpPort, defaultServicePort, recvBufByteN, timeOutMs )) != kOkRC ) args.mdnsIP = "224.0.0.251";
args.mdnsPort = 5353;
args.sockTimeOutMs = 50;
args.maxFaderBankN = 8;
args.maxSockN = 50;
cw::log::createGlobal();
// create the EuCon controller
if((rc = create( h, args )) != kOkRC )
return cwLogError(rc,"Unable to create EuCon server."); return cwLogError(rc,"Unable to create EuCon server.");
// start the EuCon server while( !quitFl )
if((rc = start( h )) != kOkRC )
goto errLabel;
while( true )
{ {
printf("? "); exec(h,args.sockTimeOutMs);
if( std::fgets(sbuf,sbufN,stdin) == sbuf )
{
if( strcmp(sbuf,"quit\n") == 0 )
break;
}
} }
errLabel:
rc = destroy(h); rc = destroy(h);
return rc; return rc;

View File

@ -7,11 +7,29 @@ namespace cw
{ {
namespace eucon namespace eucon
{ {
enum
{
kUdpSockUserId=1,
kTcpSockUserId=2,
kBaseSockUserId=3
};
typedef handle<struct eucon_str> handle_t; typedef handle<struct eucon_str> handle_t;
rc_t create( handle_t& hRef, socket::portNumber_t tcpPort, socket::portNumber_t servicePort, unsigned recvBufByteN, unsigned timeOutMs ); typedef struct args_str
{
unsigned recvBufByteN;
const char* mdnsIP;
sock::portNumber_t mdnsPort;
unsigned sockTimeOutMs;
sock::portNumber_t tcpPort;
unsigned maxSockN;
unsigned maxFaderBankN;
} args_t;
rc_t create( handle_t& hRef, const args_t& a );
rc_t destroy( handle_t& hRef ); rc_t destroy( handle_t& hRef );
rc_t start( handle_t h ); rc_t exec( handle_t h, unsigned sockTimeOutMs );
rc_t test(); rc_t test();
} }
} }