dynamic_graph_manager
ros_queued_subscribe.hxx
Go to the documentation of this file.
1 
10 #ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
11 #define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
12 #include <dynamic-graph/linear-algebra.h>
13 #include <dynamic-graph/signal-cast-helper.h>
14 #include <dynamic-graph/signal-caster.h>
15 #include <std_msgs/Float64.h>
16 #include <boost/bind.hpp>
17 #include <boost/date_time/posix_time/posix_time.hpp>
18 #include <vector>
19 #include "dynamic_graph_manager/Matrix.h"
20 #include "dynamic_graph_manager/Vector.h"
21 
22 namespace dg = dynamicgraph;
23 typedef boost::mutex::scoped_lock scoped_lock;
24 
25 namespace dynamic_graph
26 {
27 namespace internal
28 {
29 static const int BUFFER_SIZE = 1 << 10;
30 
31 template <typename T>
32 struct Add
33 {
34  void operator()(RosQueuedSubscribe& rosSubscribe,
35  const std::string& type,
36  const std::string& signal,
37  const std::string& topic)
38  {
39  typedef typename DgToRos<T>::dg_t dg_t;
40  typedef typename DgToRos<T>::ros_const_ptr_t ros_const_ptr_t;
41  typedef BindedSignal<dg_t, BUFFER_SIZE> BindedSignal_t;
42  typedef typename BindedSignal_t::Signal_t Signal_t;
43 
44  // Initialize the bindedSignal object.
45  BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
46  DgToRos<T>::setDefault(bs->last);
47 
48  // Initialize the signal.
49  boost::format signalName("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
50  signalName % rosSubscribe.getName() % type % signal;
51 
52  bs->signal.reset(new Signal_t(signalName.str()));
53  bs->signal->setFunction(
54  boost::bind(&BindedSignal_t::reader, bs, _1, _2));
55  rosSubscribe.signalRegistration(*bs->signal);
56 
57  // Initialize the subscriber.
58  typedef boost::function<void(const ros_const_ptr_t& data)> callback_t;
59  callback_t callback = boost::bind(
60  &BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);
61 
62  // Keep 50 messages in queue, but only 20 are sent every 100ms
63  // -> No message should be lost because of a full buffer
64  bs->subscriber = boost::make_shared<ros::Subscriber>(
65  rosSubscribe.nh().subscribe(topic, BUFFER_SIZE, callback));
66 
67  RosQueuedSubscribe::bindedSignal_t bindedSignal(bs);
68  rosSubscribe.bindedSignal()[signal] = bindedSignal;
69  }
70 };
71 
72 // template <typename T, typename R>
73 template <typename T, int N>
74 template <typename R>
75 void BindedSignal<T, N>::writer(const R& data)
76 {
77  // synchronize with method clear
78  boost::mutex::scoped_lock lock(wmutex);
79  if (full())
80  {
81  rmutex.lock();
82  frontIdx = (frontIdx + 1) % N;
83  rmutex.unlock();
84  }
85  converter(buffer[backIdx], data);
86  // No need to synchronize with reader here because:
87  // - if the buffer was not empty, then it stays not empty,
88  // - if it was empty, then the current value will be used at next time. It
89  // means the transmission bandwidth is too low.
90  if (!init)
91  {
92  last = buffer[backIdx];
93  init = true;
94  }
95  backIdx = (backIdx + 1) % N;
96 }
97 
98 template <typename T, int N>
99 T& BindedSignal<T, N>::reader(T& data, int time)
100 {
101  // synchronize with method clear:
102  // If reading from the list cannot be done, then return last value.
103  scoped_lock lock(rmutex, boost::try_to_lock);
104  if (!lock.owns_lock() || entity->readQueue() == -1 ||
105  time < entity->readQueue())
106  {
107  data = last;
108  }
109  else
110  {
111  if (empty())
112  data = last;
113  else
114  {
115  data = buffer[frontIdx];
116  frontIdx = (frontIdx + 1) % N;
117  last = data;
118  }
119  }
120  return data;
121 }
122 } // end of namespace internal.
123 
124 template <typename T>
125 void RosQueuedSubscribe::add(const std::string& type,
126  const std::string& signal,
127  const std::string& topic)
128 {
129  internal::Add<T>()(*this, type, signal, topic);
130 }
131 } // namespace dynamic_graph
132 
133 #endif
boost::function< void(int)> callback_t
Simple shortcut for code writing convenience.
Definition: ros_robot_state_publisher.hpp:43
this is this package namespace in order to avoid naming conflict
Definition: device.hh:22
void converter(D &dst, const S &src)
Handle ROS <-> dynamic-graph conversion.
Definition of the command.
Definition: ros_robot_state_publisher.hpp:164