 |
ewmscp
..
|
Go to the documentation of this file.
20 #include <condition_variable>
23 #include <forward_list>
36 #include <sys/resource.h>
51 #include <json/json.h>
68 defineStatic(
uid,
'\0',
"uid",
"uid of created files and directories", -1);
69 defineStatic(
gid,
'\0',
"gid",
"gid of created files and directories", -1);
72 "always follow symbolic links in SOURCE",
false);
84 "access mode for copied files", 0644);
87 "prefix for statistics log lines",
"");
90 "name of request provider",
93 "print stat values at the end anyway");
97 "name of statPrefix in json output",
"");
99 "extra field to add to json messages");
104 "name of outputHandler",
"posixFile");
106 "name of inputHandler",
"posixFile");
131 if (sigNum == SIGUSR2) {
145 throw std::logic_error(
"stop request handler cannot be instatiated twice");
148 throw std::logic_error(
"illegal handler id value");
164 static std::mutex instanceProtector;
165 std::unique_lock<decltype(instanceProtector)> lock(instanceProtector);
170 "stopRequest",
"requestStop",
"no stop request instance");
177 throw std::runtime_error(
reason);
179 throw std::runtime_error(
"SIGTERM received");
186 std::remove_reference<decltype(opt)>::type::size_type
start = 0;
187 auto stop = opt.find(
',',
start);
189 auto keyword = opt.substr(
start, stop -
start);
190 if (keyword ==
"mode") {
192 }
else if (keyword ==
"ownership") {
194 }
else if (keyword ==
"timestamps") {
196 }
else if (keyword ==
"attr" || keyword ==
"xattr") {
198 }
else if (keyword ==
"acls") {
200 }
else if (keyword ==
"all") {
207 throw std::invalid_argument(
"bad --preserve argument " + keyword);
209 if (stop == std::remove_reference<decltype(opt)>::type::npos) {
213 stop = opt.find(
',',
start);
226 while (
auto request = delayedRequests.
dequeue()) {
233 std::ostream& hashStream,
234 std::ostream& logStream,
235 std::ostream& statStream) {
237 bool timedOut =
true;
238 auto lastQueueSize = requests.
size();
239 auto lastQueueSizePrint = std::chrono::system_clock::now();
241 while (timedOut ==
true) {
242 while (
auto request = results.
dequeue(std::chrono::seconds(1), timedOut)) {
243 request->printResults(hashStream, logStream);
251 auto now = std::chrono::system_clock::now();
254 statStream <<
statPrefix <<
"statistics values at " << std::fixed
255 << std::chrono::duration<double>(now.time_since_epoch()).count() <<
"\n";
256 statStream <<
statPrefix <<
"outstanding requests: " << requests.
size() <<
"\n";
261 auto queueSize = requests.
size();
263 if (lastQueueSize != queueSize &&
265 now - lastQueueSizePrint > std::chrono::seconds(1)) {
266 statStream <<
statPrefix <<
"outstanding requests: " << requests.
size() <<
"\n";
267 lastQueueSize = queueSize;
268 lastQueueSizePrint = now;
279 }
catch (
const std::exception& e) {
281 "printThread",
"caught exception ",e.what());
289 "print error location (file,line,v fct) instead of prefix");
296 errStream << msg->getLoc().getFile() <<
":" << msg->getLoc().getLine()
297 <<
": in " << msg->getLoc().getFunc() <<
"(): ";
301 errStream << msg->getObject() <<
" " << msg->getAction() <<
" " << msg->getMessage() <<
"\n";
305 if (!prefixJsonName.empty()) {
308 for (
auto& item : jsonExtraFields) {
309 root[item.first] = item.second;
312 root[
"path"] = msg->getObject();
313 root[
"operation"] = msg->getAction();
314 root[
"message"] = msg->getMessage();
315 root[
"time"] = std::chrono::duration_cast<std::chrono::duration<double>>(msg->getTimeStamp().time_since_epoch()).count();
316 static Json::FastWriter jsonWriter;
317 errQueue->
send(jsonWriter.write(root));
319 errQueue->
send(msg->getMessage());
323 }
catch (
const std::exception& e) {
333 template <
class T>
void stopQueue(T& queue,
const std::string& name) {
336 }
catch (
const std::logic_error& e) {
337 if (queue.size() > 1) {
373 int main(
int argc,
const char* argv[]) {
374 options::parser parser(EWMSCP_VERSION
" https://stash.desy.de/projects/LSDMA/repos/ewmscp/commits/" EWMSCP_COMMIT,
"", {});
376 "source files to be copied");
378 "destination file or directory");
380 "copy all source arguments into directory");
382 "use full source file name under DIRECTORY",
false);
386 "user of created files and directories",
"");
392 "group of created files and directories",
"");
396 "preserve the given attribtes");
398 "preserve mode,ownership,timestamps");
401 "file to write hash results into");
403 "file to write log messages into");
405 "file to write statistics into");
407 "file to write errors to");
410 "message to show at start or stop");
413 "set core dump limit to inf, 0 else",
false);
419 "name of a checksum");
429 parser.fParse(argc, argv);
433 if (allowCoreDumps) {
434 struct rlimit new_limit = {RLIM_INFINITY, RLIM_INFINITY};
435 throwcall::good0(setrlimit(RLIMIT_CORE, &new_limit),
"can't set coredump limit to infinity");
437 struct rlimit new_limit = {0, 0};
438 throwcall::good0(setrlimit(RLIMIT_CORE, &new_limit),
"can't set coredump limit to 0");
443 pidFile << getpid() <<
"\n";
449 kafkaOutQueue.init();
450 if (kafkaOutQueue.isSenderConfigured()) {
453 kafkaErrQueue.init();
454 if (kafkaErrQueue.isSenderConfigured()) {
455 errorOutput = &kafkaErrQueue;
460 std::thread errPrinterThread(
printErrors, std::ref(errStreamBase.getStream()),
464 auto& hashStream = hashStreamBase.getStream();
466 auto& logStream = logStreamBase.getStream();
468 auto& statStream = statStreamBase.getStream();
471 for (
const auto& name : checksumCreatorNames) {
477 "ewmscp",
"starting",
478 EWMSCP_VERSION,
" pid: ", getpid());
480 if (!startStopMsg.empty()) {
486 if (group.fIsSet()) {
487 auto entry = getgrnam(group.c_str());
492 throw std::runtime_error(
"groupname " + group +
" not found.");
497 auto entry = getpwnam(user.c_str());
502 throw std::runtime_error(
"username " + user +
" not found.");
508 nThreads = sysconf(_SC_NPROCESSORS_ONLN);
515 if (preserveOpt.fIsSet()) {
532 if (target.fIsSet() ||
noCopy) {
533 sources.push_back(destination);
534 std::string& dstRef(destination);
547 if ((
nThreads > 1) || reqProv->isFollowMode()) {
548 for (
unsigned int i = 0; i <
nThreads; i++) {
556 std::ref(hashStream), std::ref(logStream), std::ref(statStream));
563 if (reqProv->isFollowMode()) {
568 reqProv->prepareMappings(sources, destination);
570 reqProv->printMappings(logStream);
572 reqProv->processSources(sources);
573 if (reqProv->isFollowMode()) {
580 request->process(threadData);
581 request->printResults(hashStream, logStream);
592 }
catch (
const std::exception& e) {
595 "badly: ", e.what(),
", pid: ", getpid());
597 errPrinterThread.join();
603 "ewmscp",
"ended",
"sucessfully, pid: ", getpid());
605 if (!startStopMsg.empty()) {
609 errPrinterThread.join();
decltype(resultOutput) resultOutput(nullptr)
copyRequest::timedQueue delayedRequests
class that contains the parser, i.e. does the option handling
copyRequest::simpleQueue results
decltype(printStatAnyway) printStatAnyway('\0', "printStatAnyway", "print stat values at the end anyway")
static options::single< std::string > pidFileName('\0', "pidFile", "name of PID file")
static void instantiate(handlerIdType id)
class for defining the location of an error message in the source code.
void stopQueue(T &queue, const std::string &name)
class specialisation for options of type bool
decltype(dereference) dereference( 'L', "dereference", "always follow symbolic links in SOURCE", false)
static options::single< bool > perFileThreads
decltype(readRateLimit) readRateLimit("readRateLimit")
decltype(queue.size()) size() const
copyRequest::simpleQueue requests
decltype(requestProviderName) requestProviderName('\0', "requestProvider", "name of request provider", "cmdLine")
static base * newHandler(const std::string &name)
static requestProvider * newProvider(const std::string &choice, decltype(requests) aRequests, decltype(delayedRequests) aDelayedRequests, decltype(parents) aParents, decltype(InputHandler) aInputHandler, decltype(OutputHandler) aOutputHandler)
std::ostream * errStream(nullptr)
static void waitForAllInstancesGone()
void delayRequest(copyRequest::timedQueue &delayedRequests, copyRequest::simpleQueue &requests)
runs as its own thread to move delayed requests from the delay queue to the requests queue as soon a...
decltype(quiet) quiet( 'q', "quiet", "be as quiet as possible", false)
decltype(workDir) workDir('\0', "workDir", "use DIRECTORY as work dir")
static bool retry(std::unique_ptr< base > &request, timedQueue &delayedRequests)
decltype(continueOnError) continueOnError('\0', "continueOnError", "don't stop on errors in non-follow mode", false)
static std::vector< std::atomic< unsigned int > > * processMultiplicities
static volatile std::atomic< bool > stopRequested
static waitQueues::simple< message > & getQueue()
get reference to the message queue
decltype(gid) gid('\0', "gid", "gid of created files and directories", -1)
static void addAllowedNamesToOption(T &option)
void sigUsrHandler(int sigNum)
static void processQueue(simpleQueue &queue, simpleQueue &resultQueue, timedQueue &delayedRequests)
static handlerIdType handlerId
static void addAllowedNamesToOption(T &option)
static void sigHandler(int)
decltype(fileRateLimit) fileRateLimit("fileRateLimit")
int main(int argc, const char *argv[])
static void addAllowedNamesToOption(options::single< std::string > &option)
static std::function< void(std::ostream &)> & getStatPrinter()
we can't have static functions virtual, so we explicitly use a function to give us the proper print f...
decltype(modeBits) modeBits('\0', "mode", "access mode for copied files", 0644)
virtual void send(const std::string &aMessage, const std::string &aTopic="")=0
void set(const std::string &opt)
decltype(outputHandlerName) outputHandlerName('\0', "outputHandler", "name of outputHandler", "posixFile")
#define defineStaticNoArg(var)
defines a static variable that needs no arguments to its constructor
static volatile std::atomic< bool > statPrintRequested(false)
static const std::string & getNameByLevel(level aLevel)
void enqueue(std::unique_ptr< T > &item)
decltype(inputHandlerName) inputHandlerName('\0', "inputHandler", "name of inputHandler", "posixFile")
static options::single< bool > printErrorLocation('\0', "printErrorLocation", "print error location (file,line,v fct) instead of prefix")
decltype(preserve) preserve
set of properties to preserve in the copy
static volatile std::atomic< bool > statResetRequested(false)
class for thread-specific data
void joinThread(std::thread &t)
static void waitForAllInstancesGone()
decltype(noCopy) noCopy('\0', "noCopy", "do not create a copy", false)
decltype(checksumCreators) checksumCreators
decltype(statPrefix) statPrefix('\0', "statPrefix", "prefix for statistics log lines", "")
void emit(level aLogLevel, const location &loc, const std::string &aObject, const std::string &aAction, const Args &... args)
function to create and enqueue a message, this is the only way that messages should be created!
decltype(verbose) verbose( 'v', "verbose", "explain what is being done", false)
static std::string reason
#define timerInst(subfunc)
std::unique_ptr< T > dequeue()
static void ThrowUpReasonably()
static void pidFileRemover()
void printErrors(std::ostream &errStream, messageQueue::queue *errQueue)
static factoryClass * newFactory(const std::string &aName)
std::forward_list< std::thread > workers
void good0(T call, const Args &... args)
template function to wrap system calls that return 0 on success
queuesAndThreads()=default
std::unique_ptr< T > dequeue(bool mayCreateNew, Types ... args)
class to make sure all threads are joined before thread object is deleted in case of an exception to ...
decltype(uid) uid('\0', "uid", "uid of created files and directories", -1)
static void RequestStop(const std::string &aReason)
decltype(nThreads) nThreads('\0', "nThreads", "number of simultaneous copys", 1)
static bool checkForInstances()
~queuesAndThreads() noexcept(true)
#define defineStatic(var,...)
defines a static variable and instantiates the constructor with the variable number of arguments.
void printResults(copyRequest::simpleQueue &results, copyRequest::simpleQueue &requests, std::ostream &hashStream, std::ostream &logStream, std::ostream &statStream)
decltype(writeRateLimit) writeRateLimit("writeRateLimit")