#include <kafkaQueue.h>
|
| kafka (const std::string &aName) |
|
void | init (bool isConsumer=false) |
|
| ~kafka () override |
|
void | send (const std::string &aMessage, const std::string &aTopic="") override |
|
std::string | receive (std::chrono::system_clock::duration timeout) override |
|
bool | isSenderConfigured () const override |
|
bool | isReceiverConfigured () const override |
|
| queue (const std::string &aName) |
|
virtual | ~queue ()=default |
|
Definition at line 14 of file kafkaQueue.h.
◆ kafka()
messageQueue::kafka::kafka |
( |
const std::string & |
aName | ) |
|
◆ ~kafka()
messageQueue::kafka::~kafka |
( |
| ) |
|
|
override |
◆ init()
void messageQueue::kafka::init |
( |
bool |
isConsumer = false | ) |
|
Definition at line 14 of file kafkaQueue.cpp.
17 RdKafka::Conf *pconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
19 if (pconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
20 throw std::runtime_error(errstr);
23 tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
25 if (
tconf->set(item.first, item.second, errstr) != RdKafka::Conf::CONF_OK) {
26 throw std::runtime_error(errstr);
30 consumer = RdKafka::Consumer::create(pconf, errstr);
32 if (topic ==
nullptr) {
33 throw std::runtime_error(errstr);
36 consumer->start(
topics.begin()->second.get(), 0, RdKafka::Topic::OFFSET_END);
38 producer = RdKafka::Producer::create(pconf, errstr);
41 if (topic ==
nullptr) {
42 throw std::runtime_error(errstr);
References consumer, options::base::fIsSet(), producer, producerCfgStrings, tconf, topicCfgStrings, topicName, and topics.
Referenced by followKafkaRequestProvider::followKafkaRequestProvider().
◆ isReceiverConfigured()
bool messageQueue::kafka::isReceiverConfigured |
( |
| ) |
const |
|
overridevirtual |
◆ isSenderConfigured()
bool messageQueue::kafka::isSenderConfigured |
( |
| ) |
const |
|
overridevirtual |
◆ receive()
std::string messageQueue::kafka::receive |
( |
std::chrono::system_clock::duration |
timeout | ) |
|
|
overridevirtual |
Implements messageQueue::queue.
Definition at line 84 of file kafkaQueue.cpp.
86 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
87 if (message->err() == RdKafka::ERR_NO_ERROR) {
88 auto retval = std::string(
reinterpret_cast<char *
>(message->payload()),
92 }
else if (message->err() != RdKafka::ERR_REQUEST_TIMED_OUT &&
93 message->err() != RdKafka::ERR__PARTITION_EOF &&
94 message->err() != RdKafka::ERR__TIMED_OUT) {
95 throw std::runtime_error(RdKafka::err2str(message->err())
96 + std::to_string(message->err()));
References consumer, and topics.
Referenced by followKafkaRequestProvider::processSources().
◆ send()
void messageQueue::kafka::send |
( |
const std::string & |
aMessage, |
|
|
const std::string & |
aTopic = "" |
|
) |
| |
|
overridevirtual |
Implements messageQueue::queue.
Definition at line 59 of file kafkaQueue.cpp.
61 RdKafka::Topic* topic;
65 auto it =
topics.find(aTopic);
69 if (newTopic ==
nullptr) {
70 throw std::runtime_error(errstr);
72 auto result =
topics.emplace(aTopic, std::move(newTopic));
74 aTopic,
"kafkaSender",
"new dynamic topic created");
77 topic = it->second.get();
79 producer->produce(topic, RdKafka::Topic::PARTITION_UA,
80 RdKafka::Producer::RK_MSG_COPY,
81 const_cast<void*
>(
static_cast<const void *
>(aMessage.data())), aMessage.size(),
References errMsg::debug, errMsg::emit(), producer, tconf, topicName, and topics.
◆ consumer
RdKafka::Consumer* messageQueue::kafka::consumer |
|
private |
◆ producer
RdKafka::Producer* messageQueue::kafka::producer |
|
private |
◆ producerCfgStrings
options::map<std::string> messageQueue::kafka::producerCfgStrings |
|
private |
◆ tconf
RdKafka::Conf* messageQueue::kafka::tconf |
|
private |
◆ topicCfgStrings
options::map<std::string> messageQueue::kafka::topicCfgStrings |
|
private |
◆ topicName
◆ topics
std::map<std::string, std::unique_ptr<RdKafka::Topic> > messageQueue::kafka::topics |
|
private |
The documentation for this class was generated from the following files:
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!