订阅消息(异步)

在创建topic之后,再开启一个线程用于获取topic上的数据。

开启新的线程

加入sub_thread

System::Thread pub_thread_, sub_thread_;

创建

auto sub_thread_fn = [](MessageTest* msg_test) {
  while (1) {
    msg_test->pub_thread_.SleepUntil(1);
  }
};
this->pub_thread_.Create(sub_thread_fn, this, "msg_test_sub_thread", 256,
                           System::Thread::MEDIUM);

订阅话题

订阅话题时需要等待话题创建完成,此处如果先订阅再创建话题,就会产生死锁。所以应该把话题的创建放在构造函数里,订阅话题应当在线程运行之后。

使用话题名订阅话题,订阅时需要指定消息的存放位置

auto tp_sub = Message::Subscriber<Data>("test_topic", msg_test->pub_data_);

或者使用话题对象来订阅

auto tp_sub = Message::Subscriber<Data>(msg_test->topic_, msg_test->pub_data_);

最后从话题中得到数据

tp_sub.DumpData();

示例

因为代码现在运行在linux上,在tp_sub.DumpData()的后面加入printf("%d\n", msg_test->pub_data_.d2),可以直接在命令行查看运行的结果

结果

mod_message_test.cpp

#include "mod_message_test.hpp"

#include <thread.hpp>

using namespace Module;

MessageTest::MessageTest(Param& param) : param_(param), topic_("test_topic") {
  auto pub_thread_fn = [](MessageTest* msg_test) {
    while (1) {
      msg_test->pub_data_.d2++;
      msg_test->topic_.Publish(msg_test->pub_data_);
      msg_test->pub_thread_.SleepUntil(1);
    }
  };
  this->pub_thread_.Create(pub_thread_fn, this, "msg_test_pub_thread", 256,
                           System::Thread::MEDIUM);

  auto sub_thread_fn = [](MessageTest* msg_test) {
    auto tp_sub = Message::Subscriber<Data>("test_topic", msg_test->pub_data_);
    while (1) {
      tp_sub.DumpData();
      msg_test->pub_thread_.SleepUntil(1);
    }
  };
  this->pub_thread_.Create(sub_thread_fn, this, "msg_test_sub_thread", 256,
                           System::Thread::MEDIUM);
}

mod_message_test.hpp

#include <module.hpp>
#include <thread.hpp>

#include "om.hpp"

namespace Module {
class MessageTest {
 public:
  struct Param {};

  struct Data {
    float d1;
    int d2;
    char d3;
  };

  MessageTest(Param& param);

private:
  Param& param_;

  Data pub_data_;

  Data sub_data_;

  Message::Topic<Data> topic_;

  System::Thread pub_thread_, sub_thread_;
};
}  // namespace Module