MAV'RIC
mavlink_protobuf_manager.hpp
1 #ifndef MAVLINKPROTOBUFMANAGER_HPP
2 #define MAVLINKPROTOBUFMANAGER_HPP
3 
4 #include <deque>
5 #include <google/protobuf/message.h>
6 #include <iostream>
7 #include <tr1/memory>
8 
9 #include <checksum.h>
10 #include <common/mavlink.h>
11 #include <mavlink_types.h>
12 #include <pixhawk/pixhawk.pb.h>
13 
14 namespace mavlink
15 {
16 
18 {
19 public:
21  : mRegisteredTypeCount(0)
22  , mStreamID(0)
23  , mVerbose(false)
24  , kExtendedHeaderSize(MAVLINK_EXTENDED_HEADER_LEN)
25  , kExtendedPayloadMaxSize(MAVLINK_MAX_EXTENDED_PAYLOAD_LEN)
26  {
27  // register GLOverlay
28  {
29  std::tr1::shared_ptr<px::GLOverlay> msg(new px::GLOverlay);
30  registerType(msg);
31  }
32 
33  // register ObstacleList
34  {
35  std::tr1::shared_ptr<px::ObstacleList> msg(new px::ObstacleList);
36  registerType(msg);
37  }
38 
39  // register ObstacleMap
40  {
41  std::tr1::shared_ptr<px::ObstacleMap> msg(new px::ObstacleMap);
42  registerType(msg);
43  }
44 
45  // register Path
46  {
47  std::tr1::shared_ptr<px::Path> msg(new px::Path);
48  registerType(msg);
49  }
50 
51  // register PointCloudXYZI
52  {
53  std::tr1::shared_ptr<px::PointCloudXYZI> msg(new px::PointCloudXYZI);
54  registerType(msg);
55  }
56 
57  // register PointCloudXYZRGB
58  {
59  std::tr1::shared_ptr<px::PointCloudXYZRGB> msg(new px::PointCloudXYZRGB);
60  registerType(msg);
61  }
62 
63  // register RGBDImage
64  {
65  std::tr1::shared_ptr<px::RGBDImage> msg(new px::RGBDImage);
66  registerType(msg);
67  }
68 
69  srand(time(NULL));
70  mStreamID = rand() + 1;
71  }
72 
73  bool fragmentMessage(uint8_t system_id, uint8_t component_id,
74  uint8_t target_system, uint8_t target_component,
75  const google::protobuf::Message& protobuf_msg,
76  std::vector<mavlink_extended_message_t>& fragments) const
77  {
78  TypeMap::const_iterator it = mTypeMap.find(protobuf_msg.GetTypeName());
79  if (it == mTypeMap.end())
80  {
81  std::cout << "# WARNING: Protobuf message with type "
82  << protobuf_msg.GetTypeName() << " is not registered."
83  << std::endl;
84  return false;
85  }
86 
87  uint8_t typecode = it->second;
88 
89  std::string data = protobuf_msg.SerializeAsString();
90 
91  int fragmentCount = (protobuf_msg.ByteSize() + kExtendedPayloadMaxSize - 1) / kExtendedPayloadMaxSize;
92  unsigned int offset = 0;
93 
94  for (int i = 0; i < fragmentCount; ++i)
95  {
96  mavlink_extended_message_t fragment;
97 
98  // write extended header data
99  uint8_t* payload = reinterpret_cast<uint8_t*>(fragment.base_msg.payload64);
100  unsigned int length = 0;
101  uint8_t flags = 0;
102 
103  if (i < fragmentCount - 1)
104  {
105  length = kExtendedPayloadMaxSize;
106  flags |= 0x1;
107  }
108  else
109  {
110  length = protobuf_msg.ByteSize() - kExtendedPayloadMaxSize * (fragmentCount - 1);
111  }
112 
113  memcpy(payload, &target_system, 1);
114  memcpy(payload + 1, &target_component, 1);
115  memcpy(payload + 2, &typecode, 1);
116  memcpy(payload + 3, &length, 4);
117  memcpy(payload + 7, &mStreamID, 2);
118  memcpy(payload + 9, &offset, 4);
119  memcpy(payload + 13, &flags, 1);
120 
121  fragment.base_msg.msgid = MAVLINK_MSG_ID_EXTENDED_MESSAGE;
122  mavlink_finalize_message(&fragment.base_msg, system_id, component_id, kExtendedHeaderSize, 0);
123 
124  // write extended payload data
125  fragment.extended_payload_len = length;
126  memcpy(fragment.extended_payload, &data[offset], length);
127 
128  fragments.push_back(fragment);
129  offset += length;
130  }
131 
132  if (mVerbose)
133  {
134  std::cerr << "# INFO: Split extended message with size "
135  << protobuf_msg.ByteSize() << " into "
136  << fragmentCount << " fragments." << std::endl;
137  }
138 
139  return true;
140  }
141 
142  bool cacheFragment(mavlink_extended_message_t& msg)
143  {
144  if (!validFragment(msg))
145  {
146  if (mVerbose)
147  {
148  std::cerr << "# WARNING: Message is not a valid fragment. "
149  << "Dropping message..." << std::endl;
150  }
151  return false;
152  }
153 
154  // read extended header
155  uint8_t* payload = reinterpret_cast<uint8_t*>(msg.base_msg.payload64);
156  uint8_t typecode = 0;
157  unsigned int length = 0;
158  unsigned short streamID = 0;
159  unsigned int offset = 0;
160  uint8_t flags = 0;
161 
162  memcpy(&typecode, payload + 2, 1);
163  memcpy(&length, payload + 3, 4);
164  memcpy(&streamID, payload + 7, 2);
165  memcpy(&offset, payload + 9, 4);
166  memcpy(&flags, payload + 13, 1);
167 
168  if (typecode >= mTypeMap.size())
169  {
170  std::cout << "# WARNING: Protobuf message with type code "
171  << static_cast<int>(typecode) << " is not registered." << std::endl;
172  return false;
173  }
174 
175  bool reassemble = false;
176 
177  FragmentQueue::iterator it = mFragmentQueue.find(streamID);
178  if (it == mFragmentQueue.end())
179  {
180  if (offset == 0)
181  {
182  mFragmentQueue[streamID].push_back(msg);
183 
184  if ((flags & 0x1) != 0x1)
185  {
186  reassemble = true;
187  }
188 
189  if (mVerbose)
190  {
191  std::cerr << "# INFO: Added fragment to new queue."
192  << std::endl;
193  }
194  }
195  else
196  {
197  if (mVerbose)
198  {
199  std::cerr << "# WARNING: Message is not a valid fragment. "
200  << "Dropping message..." << std::endl;
201  }
202  }
203  }
204  else
205  {
206  std::deque<mavlink_extended_message_t>& queue = it->second;
207 
208  if (queue.empty())
209  {
210  if (offset == 0)
211  {
212  queue.push_back(msg);
213 
214  if ((flags & 0x1) != 0x1)
215  {
216  reassemble = true;
217  }
218  }
219  else
220  {
221  if (mVerbose)
222  {
223  std::cerr << "# WARNING: Message is not a valid fragment. "
224  << "Dropping message..." << std::endl;
225  }
226  }
227  }
228  else
229  {
230  if (fragmentDataSize(queue.back()) + fragmentOffset(queue.back()) != offset)
231  {
232  if (mVerbose)
233  {
234  std::cerr << "# WARNING: Previous fragment(s) have been lost. "
235  << "Dropping message and clearing queue..." << std::endl;
236  }
237  queue.clear();
238  }
239  else
240  {
241  queue.push_back(msg);
242 
243  if ((flags & 0x1) != 0x1)
244  {
245  reassemble = true;
246  }
247  }
248  }
249  }
250 
251  if (reassemble)
252  {
253  std::deque<mavlink_extended_message_t>& queue = mFragmentQueue[streamID];
254 
255  std::string data;
256  for (size_t i = 0; i < queue.size(); ++i)
257  {
258  mavlink_extended_message_t& mavlink_msg = queue.at(i);
259 
260  data.append(reinterpret_cast<char*>(&mavlink_msg.extended_payload[0]),
261  static_cast<size_t>(mavlink_msg.extended_payload_len));
262  }
263 
264  mMessages.at(typecode)->ParseFromString(data);
265 
266  mMessageAvailable.at(typecode) = true;
267 
268  queue.clear();
269 
270  if (mVerbose)
271  {
272  std::cerr << "# INFO: Reassembled fragments for message with typename "
273  << mMessages.at(typecode)->GetTypeName() << " and size "
274  << mMessages.at(typecode)->ByteSize()
275  << "." << std::endl;
276  }
277  }
278 
279  return true;
280  }
281 
282  bool getMessage(std::tr1::shared_ptr<google::protobuf::Message>& msg)
283  {
284  for (size_t i = 0; i < mMessageAvailable.size(); ++i)
285  {
286  if (mMessageAvailable.at(i))
287  {
288  msg = mMessages.at(i);
289  mMessageAvailable.at(i) = false;
290 
291  return true;
292  }
293  }
294 
295  return false;
296  }
297 
298 private:
299  void registerType(const std::tr1::shared_ptr<google::protobuf::Message>& msg)
300  {
301  mTypeMap[msg->GetTypeName()] = mRegisteredTypeCount;
302  ++mRegisteredTypeCount;
303  mMessages.push_back(msg);
304  mMessageAvailable.push_back(false);
305  }
306 
307  bool validFragment(const mavlink_extended_message_t& msg) const
308  {
309  if (msg.base_msg.magic != MAVLINK_STX ||
310  msg.base_msg.len != kExtendedHeaderSize ||
311  msg.base_msg.msgid != MAVLINK_MSG_ID_EXTENDED_MESSAGE)
312  {
313  return false;
314  }
315 
316  uint16_t checksum;
317  checksum = crc_calculate(reinterpret_cast<const uint8_t*>(&msg.base_msg.len), MAVLINK_CORE_HEADER_LEN);
318  crc_accumulate_buffer(&checksum, reinterpret_cast<const char*>(&msg.base_msg.payload64), kExtendedHeaderSize);
319 #if MAVLINK_CRC_EXTRA
320  static const uint8_t mavlink_message_crcs[256] = MAVLINK_MESSAGE_CRCS;
321  crc_accumulate(mavlink_message_crcs[msg.base_msg.msgid], &checksum);
322 #endif
323 
324  if (mavlink_ck_a(&(msg.base_msg)) != (uint8_t)(checksum & 0xFF) &&
325  mavlink_ck_b(&(msg.base_msg)) != (uint8_t)(checksum >> 8))
326  {
327  return false;
328  }
329 
330  return true;
331  }
332 
333  unsigned int fragmentDataSize(const mavlink_extended_message_t& msg) const
334  {
335  const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);
336 
337  return *(reinterpret_cast<const unsigned int*>(payload + 3));
338  }
339 
340  unsigned int fragmentOffset(const mavlink_extended_message_t& msg) const
341  {
342  const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);
343 
344  return *(reinterpret_cast<const unsigned int*>(payload + 9));
345  }
346 
347  int mRegisteredTypeCount;
348  unsigned short mStreamID;
349  bool mVerbose;
350 
351  typedef std::map<std::string, uint8_t> TypeMap;
352  TypeMap mTypeMap;
353  std::vector< std::tr1::shared_ptr<google::protobuf::Message> > mMessages;
354  std::vector<bool> mMessageAvailable;
355 
356  typedef std::map<unsigned short, std::deque<mavlink_extended_message_t> > FragmentQueue;
357  FragmentQueue mFragmentQueue;
358 
359  const int kExtendedHeaderSize;
372  const int kExtendedPayloadMaxSize;
373 };
374 
375 }
376 
377 #endif