29#ifndef ETL_MESSAGE_BROKER_INCLUDED
30#define ETL_MESSAGE_BROKER_INCLUDED
34#include "message_types.h"
36#include "message_router.h"
51 class subscription_node
64 void set_next(subscription_node* sub)
70 subscription_node* get_next()
const
78 set_next(ETL_NULLPTR);
82 void append(subscription_node* sub)
84 if (sub != ETL_NULLPTR)
86 sub->set_next(get_next());
91 subscription_node* p_next;
131 using etl::imessage_router::receive;
176 initialise_insertion_point(new_sub.get_router(), &new_sub);
182 initialise_insertion_point(&router, ETL_NULLPTR);
188 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, msg);
193 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, shared_msg);
197 virtual void receive(etl::message_router_id_t destination_router_id,
205 subscription* sub =
static_cast<subscription*
>(head.get_next());
207 while (sub != ETL_NULLPTR)
209 message_id_span_t message_ids = sub->message_id_list();
211 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
213 if (itr != message_ids.end())
217 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
218 destination_router_id == router->get_message_router_id())
220 router->receive(msg);
224 sub = sub->next_subscription();
233 successor.receive(destination_router_id, msg);
238 virtual void receive(etl::message_router_id_t destination_router_id,
246 subscription* sub =
static_cast<subscription*
>(head.get_next());
248 while (sub != ETL_NULLPTR)
250 message_id_span_t message_ids = sub->message_id_list();
252 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
254 if (itr != message_ids.end())
258 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS ||
259 destination_router_id == router->get_message_router_id())
261 router->receive(shared_msg);
265 sub = sub->next_subscription();
276 using imessage_router::accepts;
293 ETL_DEPRECATED
virtual bool is_null_router() const ETL_OVERRIDE
299 virtual bool is_producer() const ETL_OVERRIDE
305 virtual bool is_consumer() const ETL_OVERRIDE
313 return head.get_next() == ETL_NULLPTR;
323 subscription_node* p_sub = head.get_next();
324 subscription_node* p_sub_previous = &head;
326 while (p_sub != ETL_NULLPTR)
329 if (
static_cast<subscription*
>(p_sub)->get_router() == p_target_router)
332 p_sub_previous->set_next(p_sub->get_next());
340 p_sub = p_sub->get_next();
341 p_sub_previous = p_sub_previous->get_next();
344 if (p_new_sub != ETL_NULLPTR)
347 p_sub_previous->append(p_new_sub);
351 subscription_node head;
This is the base of all message routers.
Definition: message_router_generator.h:121
Definition: message_broker.h:100
Message broker.
Definition: message_broker.h:47
void subscribe(etl::message_broker::subscription &new_sub)
Subscribe to the broker.
Definition: message_broker.h:174
message_broker()
Constructor.
Definition: message_broker.h:136
message_broker(etl::message_router_id_t id_)
Constructor.
Definition: message_broker.h:154
message_broker(etl::message_router_id_t id_, etl::imessage_router &successor_)
Constructor.
Definition: message_broker.h:164
message_broker(etl::imessage_router &successor_)
Constructor.
Definition: message_broker.h:145
virtual bool accepts(etl::message_id_t) const ETL_OVERRIDE
Message brokers accept all messages.
Definition: message_broker.h:281
Router id is out of the legal range.
Definition: message_router_generator.h:101
Definition: shared_message.h:49
Span - Fixed Extent.
Definition: span.h:62
bool has_successor() const
Does this have a successor?
Definition: successor.h:184
successor_type & get_successor() const
Definition: successor.h:174
successor()
Default constructor.
Definition: successor.h:81
#define ETL_ASSERT(b, e)
Definition: error_handler.h:316
bitset_ext
Definition: absolute.h:38
uint_least8_t message_id_t
Allow alternative type for message id.
Definition: message_types.h:40