ewmscp  ..
Public Member Functions | Private Attributes | List of all members
messageQueue::kafka Class Reference

#include <kafkaQueue.h>

Inheritance diagram for messageQueue::kafka:
[legend]
Collaboration diagram for messageQueue::kafka:
[legend]

Public Member Functions

 kafka (const std::string &aName)
 
void init (bool isConsumer=false)
 
 ~kafka () override
 
void send (const std::string &aMessage, const std::string &aTopic="") override
 
std::string receive (std::chrono::system_clock::duration timeout) override
 
bool isSenderConfigured () const override
 
bool isReceiverConfigured () const override
 
- Public Member Functions inherited from messageQueue::queue
 queue (const std::string &aName)
 
virtual ~queue ()=default
 

Private Attributes

std::map< std::string, std::unique_ptr< RdKafka::Topic > > topics
 
RdKafka::Producer * producer
 
RdKafka::Consumer * consumer
 
RdKafka::Conf * tconf
 
options::single< std::string > topicName
 
options::map< std::string > producerCfgStrings
 
options::map< std::string > topicCfgStrings
 

Detailed Description

Definition at line 14 of file kafkaQueue.h.

Constructor & Destructor Documentation

◆ kafka()

messageQueue::kafka::kafka ( const std::string &  aName)

Definition at line 6 of file kafkaQueue.cpp.

6  :
7  queue(aName),
8  producer(nullptr),
9  consumer(nullptr),
10  topicName('\0', aName + "-topic", aName + " queue topic name"),
11  producerCfgStrings('\0', aName + "-pcfg", aName + " producer/consumer config strings"),
12  topicCfgStrings('\0', aName + "-tcfg", aName + " topic config strings") {
13  }

◆ ~kafka()

messageQueue::kafka::~kafka ( )
override

Definition at line 49 of file kafkaQueue.cpp.

49  {
50  if (producer) {
51  producer->flush(1000);
52  }
53  delete producer;
54  if (consumer && !topics.empty()) {
55  consumer->stop(topics.begin()->second.get(), 0);
56  }
57  delete consumer;
58  }

References consumer, producer, and topics.

Member Function Documentation

◆ init()

void messageQueue::kafka::init ( bool  isConsumer = false)

Definition at line 14 of file kafkaQueue.cpp.

14  {
15  if (! producerCfgStrings.empty()) {
16  std::string errstr; // evil expensive reporting...
17  RdKafka::Conf *pconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
18  for (const auto& item : producerCfgStrings) {
19  if (pconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
20  throw std::runtime_error(errstr);
21  }
22  }
23  tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
24  for (const auto& item : topicCfgStrings) {
25  if (tconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
26  throw std::runtime_error(errstr);
27  }
28  }
29  if (isConsumer) {
30  consumer = RdKafka::Consumer::create(pconf, errstr);
31  decltype(topics)::mapped_type topic(RdKafka::Topic::create(consumer, topicName, tconf, errstr));
32  if (topic == nullptr) {
33  throw std::runtime_error(errstr);
34  }
35  topics.emplace(topicName, std::move(topic));
36  consumer->start(topics.begin()->second.get(), 0, RdKafka::Topic::OFFSET_END);
37  } else {
38  producer = RdKafka::Producer::create(pconf, errstr);
39  if (topicName.fIsSet()) {
40  decltype(topics)::mapped_type topic(RdKafka::Topic::create(producer, topicName, tconf, errstr));
41  if (topic == nullptr) {
42  throw std::runtime_error(errstr);
43  }
44  topics.emplace(topicName, std::move(topic));
45  }
46  }
47  }
48  }

References consumer, options::base::fIsSet(), producer, producerCfgStrings, tconf, topicCfgStrings, topicName, and topics.

Referenced by followKafkaRequestProvider::followKafkaRequestProvider().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ isReceiverConfigured()

bool messageQueue::kafka::isReceiverConfigured ( ) const
overridevirtual

Implements messageQueue::queue.

Definition at line 106 of file kafkaQueue.cpp.

106  {
107  return consumer != nullptr;
108  }

References consumer.

◆ isSenderConfigured()

bool messageQueue::kafka::isSenderConfigured ( ) const
overridevirtual

Implements messageQueue::queue.

Definition at line 103 of file kafkaQueue.cpp.

103  {
104  return producer != nullptr;
105  }

References producer.

◆ receive()

std::string messageQueue::kafka::receive ( std::chrono::system_clock::duration  timeout)
overridevirtual

Implements messageQueue::queue.

Definition at line 84 of file kafkaQueue.cpp.

84  {
85  auto message = consumer->consume(topics.begin()->second.get(), 0,
86  std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
87  if (message->err() == RdKafka::ERR_NO_ERROR) {
88  auto retval = std::string(reinterpret_cast<char *>(message->payload()),
89  message->len());
90  delete message;
91  return retval;
92  } else if (message->err() != RdKafka::ERR_REQUEST_TIMED_OUT &&
93  message->err() != RdKafka::ERR__PARTITION_EOF &&
94  message->err() != RdKafka::ERR__TIMED_OUT) {
95  throw std::runtime_error(RdKafka::err2str(message->err())
96  + std::to_string(message->err()));
97  }
98  delete message;
99  return "";
100  }

References consumer, and topics.

Referenced by followKafkaRequestProvider::processSources().

Here is the caller graph for this function:

◆ send()

void messageQueue::kafka::send ( const std::string &  aMessage,
const std::string &  aTopic = "" 
)
overridevirtual

Implements messageQueue::queue.

Definition at line 59 of file kafkaQueue.cpp.

60  {
61  RdKafka::Topic* topic;
62  if (aTopic.empty()) {
63  topic = topics[topicName].get();
64  } else {
65  auto it = topics.find(aTopic);
66  if (it == topics.end()) {
67  std::string errstr;
68  decltype(topics)::mapped_type newTopic(RdKafka::Topic::create(producer, aTopic, tconf, errstr));
69  if (newTopic == nullptr) {
70  throw std::runtime_error(errstr);
71  }
72  auto result = topics.emplace(aTopic, std::move(newTopic));
74  aTopic, "kafkaSender", "new dynamic topic created");
75  it = result.first;
76  }
77  topic = it->second.get();
78  }
79  producer->produce(topic, RdKafka::Topic::PARTITION_UA,
80  RdKafka::Producer::RK_MSG_COPY,
81  const_cast<void*>(static_cast<const void *>(aMessage.data())), aMessage.size(),
82  nullptr, nullptr);
83  }

References errMsg::debug, errMsg::emit(), producer, tconf, topicName, and topics.

Here is the call graph for this function:

Member Data Documentation

◆ consumer

RdKafka::Consumer* messageQueue::kafka::consumer
private

Definition at line 18 of file kafkaQueue.h.

Referenced by init(), isReceiverConfigured(), receive(), and ~kafka().

◆ producer

RdKafka::Producer* messageQueue::kafka::producer
private

Definition at line 17 of file kafkaQueue.h.

Referenced by init(), isSenderConfigured(), send(), and ~kafka().

◆ producerCfgStrings

options::map<std::string> messageQueue::kafka::producerCfgStrings
private

Definition at line 21 of file kafkaQueue.h.

Referenced by init().

◆ tconf

RdKafka::Conf* messageQueue::kafka::tconf
private

Definition at line 19 of file kafkaQueue.h.

Referenced by init(), and send().

◆ topicCfgStrings

options::map<std::string> messageQueue::kafka::topicCfgStrings
private

Definition at line 22 of file kafkaQueue.h.

Referenced by init().

◆ topicName

options::single<std::string> messageQueue::kafka::topicName
private

Definition at line 20 of file kafkaQueue.h.

Referenced by init(), and send().

◆ topics

std::map<std::string, std::unique_ptr<RdKafka::Topic> > messageQueue::kafka::topics
private

Definition at line 16 of file kafkaQueue.h.

Referenced by init(), receive(), send(), and ~kafka().


The documentation for this class was generated from the following files:
messageQueue::queue::queue
queue(const std::string &aName)
Definition: messageQueue.h:11
errMsg::location
class for defining the location of a error message in the source code.
Definition: errMsgQueue.h:14
messageQueue::kafka::producer
RdKafka::Producer * producer
Definition: kafkaQueue.h:17
RdKafka
Definition: kafkaQueue.h:7
options::base::fIsSet
virtual bool fIsSet() const
check if this option was set, regardless of from command line or config file
Definition: Options.h:263
errMsg::level::debug
@ debug
messageQueue::kafka::topicCfgStrings
options::map< std::string > topicCfgStrings
Definition: kafkaQueue.h:22
messageQueue::kafka::tconf
RdKafka::Conf * tconf
Definition: kafkaQueue.h:19
messageQueue::kafka::topics
std::map< std::string, std::unique_ptr< RdKafka::Topic > > topics
Definition: kafkaQueue.h:16
messageQueue::kafka::topicName
options::single< std::string > topicName
Definition: kafkaQueue.h:20
errMsg::emit
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
Definition: errMsgQueue.h:148
messageQueue::kafka::producerCfgStrings
options::map< std::string > producerCfgStrings
Definition: kafkaQueue.h:21
messageQueue::kafka::consumer
RdKafka::Consumer * consumer
Definition: kafkaQueue.h:18