ewmscp  ..
kafkaQueue.cpp
Go to the documentation of this file.
1 #include "kafkaQueue.h"
2 #include <librdkafka/rdkafkacpp.h>
3 #include <errMsgQueue.h>
4 
5 namespace messageQueue {
6  kafka::kafka(const std::string& aName):
7  queue(aName),
8  producer(nullptr),
9  consumer(nullptr),
10  topicName('\0', aName + "-topic", aName + " queue topic name"),
11  producerCfgStrings('\0', aName + "-pcfg", aName + " producer/consumer config strings"),
12  topicCfgStrings('\0', aName + "-tcfg", aName + " topic config strings") {
13  }
14  void kafka::init(bool isConsumer) {
15  if (! producerCfgStrings.empty()) {
16  std::string errstr; // evil expensive reporting...
17  RdKafka::Conf *pconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
18  for (const auto& item : producerCfgStrings) {
19  if (pconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
20  throw std::runtime_error(errstr);
21  }
22  }
23  tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
24  for (const auto& item : topicCfgStrings) {
25  if (tconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
26  throw std::runtime_error(errstr);
27  }
28  }
29  if (isConsumer) {
30  consumer = RdKafka::Consumer::create(pconf, errstr);
31  decltype(topics)::mapped_type topic(RdKafka::Topic::create(consumer, topicName, tconf, errstr));
32  if (topic == nullptr) {
33  throw std::runtime_error(errstr);
34  }
35  topics.emplace(topicName, std::move(topic));
36  consumer->start(topics.begin()->second.get(), 0, RdKafka::Topic::OFFSET_END);
37  } else {
38  producer = RdKafka::Producer::create(pconf, errstr);
39  if (topicName.fIsSet()) {
40  decltype(topics)::mapped_type topic(RdKafka::Topic::create(producer, topicName, tconf, errstr));
41  if (topic == nullptr) {
42  throw std::runtime_error(errstr);
43  }
44  topics.emplace(topicName, std::move(topic));
45  }
46  }
47  }
48  }
50  if (producer) {
51  producer->flush(1000);
52  }
53  delete producer;
54  if (consumer && !topics.empty()) {
55  consumer->stop(topics.begin()->second.get(), 0);
56  }
57  delete consumer;
58  }
59  void kafka::send(const std::string& aMessage,
60  const std::string& aTopic) {
61  RdKafka::Topic* topic;
62  if (aTopic.empty()) {
63  topic = topics[topicName].get();
64  } else {
65  auto it = topics.find(aTopic);
66  if (it == topics.end()) {
67  std::string errstr;
68  decltype(topics)::mapped_type newTopic(RdKafka::Topic::create(producer, aTopic, tconf, errstr));
69  if (newTopic == nullptr) {
70  throw std::runtime_error(errstr);
71  }
72  auto result = topics.emplace(aTopic, std::move(newTopic));
74  aTopic, "kafkaSender", "new dynamic topic created");
75  it = result.first;
76  }
77  topic = it->second.get();
78  }
79  producer->produce(topic, RdKafka::Topic::PARTITION_UA,
80  RdKafka::Producer::RK_MSG_COPY,
81  const_cast<void*>(static_cast<const void *>(aMessage.data())), aMessage.size(),
82  nullptr, nullptr);
83  }
84  std::string kafka::receive(std::chrono::system_clock::duration timeout) {
85  auto message = consumer->consume(topics.begin()->second.get(), 0,
86  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
87  if (message->err() == RdKafka::ERR_NO_ERROR) {
88  auto retval = std::string(reinterpret_cast<char *>(message->payload()),
89  message->len());
90  delete message;
91  return retval;
92  } else if (message->err() != RdKafka::ERR_REQUEST_TIMED_OUT &&
93  message->err() != RdKafka::ERR__PARTITION_EOF &&
94  message->err() != RdKafka::ERR__TIMED_OUT) {
95  throw std::runtime_error(RdKafka::err2str(message->err())
96  + std::to_string(message->err()));
97  }
98  delete message;
99  return "";
100  }
101 
102 
104  return producer != nullptr;
105  }
107  return consumer != nullptr;
108  }
109 } // end namespace messageQueue
errMsgQueue.h
errMsg::location
class for defining the location of an error message in the source code.
Definition: errMsgQueue.h:14
messageQueue::kafka::send
void send(const std::string &aMessage, const std::string &aTopic="") override
Definition: kafkaQueue.cpp:59
messageQueue::kafka::producer
RdKafka::Producer * producer
Definition: kafkaQueue.h:17
options::base::fIsSet
virtual bool fIsSet() const
check if this option was set, regardless of whether it came from the command line or a config file
Definition: Options.h:263
messageQueue::kafka::isReceiverConfigured
bool isReceiverConfigured() const override
Definition: kafkaQueue.cpp:106
messageQueue::kafka::kafka
kafka(const std::string &aName)
Definition: kafkaQueue.cpp:6
errMsg::level::debug
@ debug
kafkaQueue.h
messageQueue::kafka::init
void init(bool isConsumer=false)
Definition: kafkaQueue.cpp:14
messageQueue::kafka::topicCfgStrings
options::map< std::string > topicCfgStrings
Definition: kafkaQueue.h:22
messageQueue::kafka::~kafka
~kafka() override
Definition: kafkaQueue.cpp:49
messageQueue::kafka::tconf
RdKafka::Conf * tconf
Definition: kafkaQueue.h:19
consumer
void consumer(buffqueue *queue, buffqueue *results, unsigned long blockSize)
Definition: fileopstest.cpp:317
messageQueue::kafka::topics
std::map< std::string, std::unique_ptr< RdKafka::Topic > > topics
Definition: kafkaQueue.h:16
messageQueue::kafka::topicName
options::single< std::string > topicName
Definition: kafkaQueue.h:20
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
messageQueue::kafka::isSenderConfigured
bool isSenderConfigured() const override
Definition: kafkaQueue.cpp:103
messageQueue::queue
Definition: messageQueue.h:8
messageQueue::kafka::receive
std::string receive(std::chrono::system_clock::duration timeout) override
Definition: kafkaQueue.cpp:84
messageQueue::kafka::producerCfgStrings
options::map< std::string > producerCfgStrings
Definition: kafkaQueue.h:21
messageQueue::kafka::consumer
RdKafka::Consumer * consumer
Definition: kafkaQueue.h:18
messageQueue
Definition: kafkaQueue.cpp:5