2 #include <librdkafka/rdkafkacpp.h>
10 topicName(
'\0', aName +
"-topic", aName +
" queue topic name"),
11 producerCfgStrings(
'\0', aName +
"-pcfg", aName +
" producer/consumer config strings"),
12 topicCfgStrings(
'\0', aName +
"-tcfg", aName +
" topic config strings") {
17 RdKafka::Conf *pconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
19 if (pconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
20 throw std::runtime_error(errstr);
23 tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
25 if (
tconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
26 throw std::runtime_error(errstr);
30 consumer = RdKafka::Consumer::create(pconf, errstr);
32 if (topic ==
nullptr) {
33 throw std::runtime_error(errstr);
36 consumer->start(
topics.begin()->second.get(), 0, RdKafka::Topic::OFFSET_END);
38 producer = RdKafka::Producer::create(pconf, errstr);
41 if (topic ==
nullptr) {
42 throw std::runtime_error(errstr);
60 const std::string& aTopic) {
61 RdKafka::Topic* topic;
65 auto it =
topics.find(aTopic);
68 decltype(
topics)::mapped_type newTopic(RdKafka::Topic::create(
producer, aTopic,
tconf, errstr));
69 if (newTopic ==
nullptr) {
70 throw std::runtime_error(errstr);
72 auto result =
topics.emplace(aTopic, std::move(newTopic));
74 aTopic,
"kafkaSender",
"new dynamic topic created");
77 topic = it->second.get();
79 producer->produce(topic, RdKafka::Topic::PARTITION_UA,
80 RdKafka::Producer::RK_MSG_COPY,
81 const_cast<void*
>(
static_cast<const void *
>(aMessage.data())), aMessage.size(),
86 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
87 if (message->err() == RdKafka::ERR_NO_ERROR) {
88 auto retval = std::string(
reinterpret_cast<char *
>(message->payload()),
92 }
else if (message->err() != RdKafka::ERR_REQUEST_TIMED_OUT &&
93 message->err() != RdKafka::ERR__PARTITION_EOF &&
94 message->err() != RdKafka::ERR__TIMED_OUT) {
95 throw std::runtime_error(RdKafka::err2str(message->err())
96 + std::to_string(message->err()));