CoreDX DDS Content Filter Example in C++
Here is an example demonstrating the use of Content Filters in 'C++'.
This example consists of two applications: one application that publishes data on a Topic that includes a string, and two long integer values (x and count); and one application that subscribes to this topic with a filter on the 'x' content field.
The source code presented here is the 'C++' programming language version of the applications. It can interoperate with 'filter' applications written in other languages, or running on different hardware platforms.
file: filter.ddl
struct FilterMsg { string msg; long x; long count; };
file: filter_pub.cc
/**************************************************************** * * file: filter_pub.c * desc: Provides a simple C++ ContentFilter example * This publishing application will send data * to the example 'filter_sub' subscribing * application. * **************************************************************** * * This file is provided by Twin Oaks Computing, Inc * as an example. It is provided in the hope that it will be * useful but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE. TOC Inc assumes no liability or responsibilty for * the use of this information for any purpose. * ****************************************************************/ #include#include #ifdef _WIN32 #include #else #include #endif #include #include "filter.hh" #include "filterTypeSupport.hh" #include "filterDataWriter.hh" #ifdef _WIN32 # define SLEEP(d) Sleep((d)*1000) #else # define SLEEP(d) sleep((d)) #endif /**************************************************************** * main() * * Perform CoreDX DDS setup activities: * - create a Domain Participant * - create a Publisher * - register the FilterMsg data type * - create a Topic * - create a DataWriter * Write data ****************************************************************/ int main(int argc, char * argv[]) { DomainParticipant * domain; Publisher * publisher; Topic * topic; DataWriter * dw; FilterMsg filterMsg; ReturnCode_t retval; class DomainParticipantFactory * dpf = DomainParticipantFactory::get_instance(); /* create a DomainParticipant */ domain = dpf->create_participant( 0, PARTICIPANT_QOS_DEFAULT, NULL, 0 ); if ( domain == NULL ) { printf("ERROR creating domain participant.\n"); return -1; } /* create a Publisher */ publisher = domain->create_publisher(PUBLISHER_QOS_DEFAULT, NULL, 0 ); if ( publisher == NULL ) { printf("ERROR creating publisher.\n"); return -1; } /* Register the data type with the CoreDX middleware. * This is required before creating a Topic with * this data type. */ FilterMsgTypeSupport filterMsgTS; retval = filterMsgTS.register_type( domain, NULL ); if (retval != RETCODE_OK) { printf("ERROR registering type\n"); return -1; } /* Create a DDS Topic with the FilterMsg data type. */ topic = domain->create_topic("TestTopic", "FilterMsg", TOPIC_QOS_DEFAULT, NULL, 0 ); if ( topic == NULL ) { printf("ERROR creating topic.\n"); return -1; } /* Create a DataWriter on the filter topic, with * default QoS settings and no listeners. */ dw = publisher->create_datawriter( topic, DATAWRITER_QOS_DEFAULT, NULL, 0 ); if (dw == NULL) { printf("ERROR creating data writer\n"); return -1; } /* Initialize the data to send. The FilterMsg data type * has just one filter member. * Note: Alwyas initialize a filter member with * allocated memory -- the destructor will free * all filter members. */ filterMsg.msg = new char[strlen("Hello WORLD from C++!")+1]; strcpy(filterMsg.msg, "Hello WORLD from C++!"); filterMsg.x = 0; filterMsg.count = 0; while ( 1 ) { ReturnCode_t ret = dw->write ( &filterMsg, HANDLE_NIL ); printf("wrote sample: x: %d count: %d\n", filterMsg.x, filterMsg.count); fflush(stdout); if ( ret != RETCODE_OK) { printf("ERROR writing sample\n"); return -1; } SLEEP(1); filterMsg.x = (filterMsg.x + 1)%15; filterMsg.count ++; } /* Cleanup */ retval = domain -> delete_contained_entities(); if ( retval != DDS_RETCODE_OK ) printf("ERROR (%s): Unable to cleanup DDS entities\n", DDS_error(retval)); return 0; }
file: filter_sub.cc
/**************************************************************** * * file: filter_sub.c * desc: Provides a simple C++ content filter example. * This subscribing application will receive data * from the example 'filter_pub' publishing * application. * **************************************************************** * * This file is provided by Twin Oaks Computing, Inc * as an example. It is provided in the hope that it will be * useful but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE. TOC Inc assumes no liability or responsibilty for * the use of this information for any purpose. * ****************************************************************/ #include#ifdef _WIN32 #include #else #include #endif #include #include "filter.hh" #include "filterTypeSupport.hh" #include "filterDataReader.hh" #ifdef _WIN32 # define SLEEP(d) Sleep((d)*1000) #else # define SLEEP(d) sleep((d)) #endif int all_done = 0; /**************************************************************** * Construct a DataReaderListener and override the * on_data_available() method with our own. All other * listener methods will be default (no-op) functions. ****************************************************************/ class DRListener : public DataReaderListener { public: void on_data_available( DataReader * dr ); }; /**************************************************************** * DataReader Listener Method: on_data_avail() * * This listener method is called when data is available to * be read on this DataReader. ****************************************************************/ void DRListener::on_data_available( DataReader * dr) { FilterMsgPtrSeq samples; SampleInfoSeq samples_info; ReturnCode_t retval; SampleStateMask ss = DDS_ANY_SAMPLE_STATE; ViewStateMask vs = DDS_ANY_VIEW_STATE; InstanceStateMask is = DDS_ANY_INSTANCE_STATE; /* Convert to our type-specific DataReader */ FilterMsgDataReader * reader = FilterMsgDataReader::narrow( dr ); /* Take any and all available samples. The take() operation * will remove the samples from the DataReader so they * won't be available on subsequent read() or take() calls. */ retval = reader->take( &samples, &samples_info, LENGTH_UNLIMITED, ss, vs, is ); if ( retval == RETCODE_OK ) { /* iterrate through the samples */ for ( unsigned int i = 0;i < samples.size(); i++) { /* If this sample does not contain valid data, * it is a dispose or other non-data command, * and, accessing any member from this sample * would be invalid. */ if ( samples_info[i]->valid_data) printf("Sample Received: msg: %s x: %d count: %d\n", samples[i]->msg, samples[i]->x, samples[i]->count); } fflush(stdout); /* read() and take() always "loan" the data, we need to * return it so CoreDX can release resources associated * with it. */ reader->return_loan( &samples, &samples_info ); } else { printf("ERROR (%s) taking samples from DataReader\n", DDS_error(retval)); } } /**************************************************************** * main() * * Perform CoreDX DDS setup activities: * - create a Domain Participant * - create a Subscriber * - register the FilterMsg data type * - create a Topic * - create a DataReader and attach the listener created above * And wait for data ****************************************************************/ int main(int argc, char * argv[]) { DomainParticipant * domain; Subscriber * subscriber; Topic * topic; ContentFilteredTopic* filtered_topic; StringSeq filter_params; DataReader * dr; DRListener drListener; ReturnCode_t retval; int count = 0; /* Get an instance of the DDS DomainPartiticpantFactory -- * we will use this to create a DomainParticipant. */ DomainParticipantFactory * dpf = DomainParticipantFactory::get_instance(); /* create a DomainParticipant */ domain = dpf->create_participant( 0, PARTICIPANT_QOS_DEFAULT, NULL, 0 ); if ( domain == NULL ) { printf("ERROR creating domain participant.\n"); return -1; } /* create a Subscriber */ subscriber = domain->create_subscriber(SUBSCRIBER_QOS_DEFAULT, NULL, 0 ); if ( subscriber == NULL ) { printf("ERROR creating subscriber\n"); return -1; } /* Register the data type with the CoreDX middleware. * This is required before creating a Topic with * this data type. */ FilterMsgTypeSupport filterMsgTS; retval = filterMsgTS.register_type( domain, NULL ); if (retval != RETCODE_OK) { printf("ERROR (%s) registering type\n", DDS_error(retval)); return -1; } /* create a DDS Topic with the FilterMsg data type. */ topic = domain->create_topic( "TestTopic", "FilterMsg", TOPIC_QOS_DEFAULT, NULL, 0 ); if ( ! topic ) { printf("ERROR creating topic\n"); return -1; } /* create a ContentFilteredTopic associated with 'topic' */ filtered_topic = domain->create_contentfilteredtopic( "FilteredTestTopic", topic, "x>%0 and x<%1", filter_params ); if ( ! filtered_topic ) { printf("ERROR creating filtered topic\n"); return -1; } /* initialize the filter parameters */ filter_params.push_back((char*)"0"); filter_params.push_back((char*)"5"); filtered_topic->set_expression_parameters(filter_params); printf("Set Filter: x>0 and x<5\n"); /* create a DDS_DataReader on the filter topic (notice * the TopicDescription is used) with default QoS settings, * and attach our listener with our on_data_available method. */ dr = subscriber->create_datareader( (TopicDescription*)filtered_topic, DATAREADER_QOS_DEFAULT, &drListener, DATA_AVAILABLE_STATUS ); if ( ! dr ) { printf("ERROR creating data reader\n"); return -1; } /* Wait forever. When data arrives at our DataReader, * our dr_on_data_avilalbe method will be invoked. */ while ( !all_done ) { SLEEP(1); count ++; if ((count%10)==0) /* multiple of 10 */ { filter_params[0] = (char*)"0"; filter_params[1] = (char*)"5"; filtered_topic->set_expression_parameters(filter_params); printf("Set Filter: x>0 and x<5\n"); } else if ((count%5)==0) /* multiple of 5 */ { filter_params[0] = (char*)"5"; filter_params[1] = (char*)"10"; filtered_topic->set_expression_parameters(filter_params); printf("Set Filter: x>5 and x<10\n"); } fflush(stdout); } /* Cleanup */ retval = domain -> delete_contained_entities(); if ( retval != DDS_RETCODE_OK ) printf("ERROR (%s): Unable to cleanup DDS entities\n", DDS_error(retval)); return 0; }