27 #include <curl/curl.h> 36 #endif //ENABLE_CFITSIO 68 if (Select(
pipefd[1], 0, 1, MCS_SELECT_WRITE) == -1)
109 sprintf(filename,
"/tmp/mcspipe%04d",
filecount);
114 if (mkfifo(filename, S_IREAD | S_IWRITE) == 0)
145 while (Select(
pipefd[0], 0, 100, MCS_SELECT_READ) != 1) ;
165 while (Select(
pipefd[1], 0, 100, MCS_SELECT_WRITE) != 1) ;
209 bool mcs::URLReader::fl_curl_global_init =
false;
211 mcs::URLReader::URLReader() :
Pipe()
213 if (! fl_curl_global_init) {
214 fl_curl_global_init =
true;
215 curl_global_init(CURL_GLOBAL_ALL);
222 mcs::URLReader::~URLReader()
226 string mcs::URLReader::url()
229 bool mcs::URLReader::chkLocal(
string& url)
231 bool local = (bool) (
232 (url.find(
"://") == string::npos) ||
233 (url.find(
"file://") == 0)
241 int mcs::URLReader::OpenAsFD(
string url)
243 local = chkLocal(url);
249 thr->startDetached();
254 const char* mcs::URLReader::OpenAsFifo(
string url)
256 local = chkLocal(url);
262 thr->startDetached();
269 void mcs::URLReader::Download(
string url,
string fn)
271 ofstream out(fn.c_str());
274 throw MCS_ERROR(MSG_CANT_OPEN_FILE, fn.csz);
276 int fd = OpenAsFD(url);
280 while ((n = read(fd, buf, 1024)))
289 void mcs::URLReader::Close()
300 Event e = *thr->error();
312 int mcs::URLReader::thread_run(
void* p)
314 URLReader* This = (URLReader*) p;
315 This->thread_fetch();
320 void mcs::URLReader::thread_fetch()
326 int fd = open(lurl.c_str(), O_RDONLY);
332 throw MCS_ERROR(MSG_CANT_OPEN_FILE, lurl.c_str());
335 while ((len = read(fd, buf, 1024)))
336 cb_write(buf, len, 1,
this);
342 CURL* curl = curl_easy_init();
347 res = curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
348 if (res != CURLE_OK)
goto error;
351 res = curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 60);
352 if (res != CURLE_OK)
goto error;
354 res = curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
355 if (res != CURLE_OK)
goto error;
357 res = curl_easy_setopt(curl, CURLOPT_URL, lurl.c_str());
358 if (res != CURLE_OK)
goto error;
363 res = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, cb_write);
364 if (res != CURLE_OK)
goto error;
366 res = curl_easy_setopt(curl, CURLOPT_WRITEDATA,
this);
367 if (res != CURLE_OK)
goto error;
369 res = curl_easy_setopt(curl, CURLOPT_PRIVATE,
this);
370 if (res != CURLE_OK)
goto error;
372 res = curl_easy_perform(curl);
373 if (res != 0)
goto error;
380 curl_easy_cleanup((CURL*) curl);
381 throw MCS_ERROR(MSG_CURL_ERROR, curl_easy_strerror(res));
386 size_t mcs::URLReader::cb_write(
void *ptr,
size_t size,
size_t nmemb,
389 URLReader* p = (URLReader*) This;
391 if (p->consumerHasGone())
396 return p->Write(ptr, size, nmemb);
400 void mcs::URLReader::flush()
406 unsigned int mcs::URLReader::Write(
void *ptr,
unsigned int size,
408 {
return write(
openWrite(), ptr, size * nmemb); }
413 int mcs::URLReader::Read(
char* buf,
int maxlen)
414 {
return read(
openRead(), buf, maxlen); }
440 fits_get_errstatus(status, buf);
449 FITSReader::~FITSReader()
453 #define FITSKEYLEN 70 454 #define CHECK_FITS_ERROR if (status) throw MCS_ERROR(MSG_FITS_ERROR, fitsError(status).c_str()) 457 int FITSReader::HDUCount()
460 fitsfile *fptr = (fitsfile*) this->fptr;
462 fits_get_num_hdus(fptr, &nhdu, &status);
468 int FITSReader::currentHDU()
471 fitsfile *fptr = (fitsfile*) this->fptr;
472 fits_get_hdu_num(fptr, &i);
477 void FITSReader::selectHDU(
string name,
int extver)
480 fitsfile *fptr = (fitsfile*) this->fptr;
483 fits_movnam_hdu(fptr, ANY_HDU, (
char*) name.c_str(), extver, &status);
486 fits_get_hdu_num(fptr, &i);
491 bool FITSReader::selectNextHDU()
494 int exttype = ANY_HDU;
496 fitsfile *fptr = (fitsfile*) this->fptr;
498 fits_movrel_hdu(fptr, 1, &exttype, &status);
499 if (status == END_OF_FILE)
503 selectHDU(currentHDU());
509 #include <json/json.h> 515 std::string trimfs( std::string s ) {
517 ss.erase( 0, ss.find_first_not_of(
" '\t\n" ) );
518 ss.erase( ss.find_last_not_of(
" '\t\n" ) + 1);
523 void FITSReader::read_json_header(
const string& json_string){
527 bool parsingSuccessful = reader.parse( json_string, root );
529 if ( !parsingSuccessful )
532 std::cout <<
"Failed to parse configuration\n" 533 << reader.getFormattedErrorMessages();
543 const Json::Value fitskeys = root[
"keywords"];
545 unsigned int nkeys= fitskeys.size();
548 cout <<
"Found " << nkeys <<
" keys in JSON string" << endl;
562 if(header_comments.size())
563 header_comments.clear();
569 cout <<
"clear comments "<< header.size() <<
" and " << header_comments.size() << endl;
572 cout <<
"clear comments OK"<<endl;
574 for (
int index = 0; index < nkeys; ++index ){
577 key=fitskeys[index][
"key"].asString();
578 value=fitskeys[index][
"value"].asString();
580 value = trimfs(value);
581 cout <<
"Adding " << index <<
" : " << key <<
" = " << value << endl;
584 d->setName(
string(key));
594 cout <<
"Done loading JSON keys ! Record list: header count = " << header.count() << endl;
596 for(
int i=0;i<header.count();i++){
597 cout << i <<
" : " << header[i].name() <<
" value " << header[i].sval()<< endl;
607 void FITSReader::selectHDU(
int hdunum)
610 long int repeat, width;
616 char buf[FITSKEYLEN];
620 char head_comment[80];
625 fitsfile *fptr = (fitsfile*) this->fptr;
626 fits_get_num_hdus(fptr, &nhdu, &status);
628 if ((hdunum <= 0) || (hdunum > nhdu))
633 fits_movabs_hdu(fptr, hdunum, &hdutype, &status);
636 fits_get_hdrspace(fptr, &nkeys, NULL, &status);
640 header_comments.clear();
642 for (i=1; i<=nkeys; i++) {
643 fits_read_keyn(fptr, i, head_name, head_val, head_comment, &status);
646 Data* d =
new Data(
string(head_val));
647 d->setName(
string(head_name));
650 d =
new Data(
string(head_comment));
651 d->setName(
string(head_name));
652 header_comments.addField(d);
655 fits_get_hdu_type(fptr, &hdutype, &status);
658 if ((hdutype == BINARY_TBL) ||
659 (hdutype == ASCII_TBL) ) {
661 fits_get_num_rows(fptr, &nrows, &status);
665 fits_get_num_cols(fptr, &ncols, &status);
668 for (i=0; i< ncols; i++) {
669 sprintf(reqColName,
"%d", i+1);
670 fits_get_colname(fptr, CASEINSEN, reqColName, buf, &j, &status);
671 fits_get_coltype(fptr, j, &fits_type, &repeat, &width, &status);
675 throw MCS_ERROR(MSG_TYPE_NOT_HANDLED, i, fits_type);
680 Data* d =
new Data(NULL, type, buf, width, flunsign);
688 init(i, nrows, &meta);
728 void FITSReader::open(
Buffer& buf)
735 memfilep = memfile[0];
736 memfilesize = memfile.size();
737 fits_open_memfile(&fptr,
"mem.fits", READONLY,
738 &memfilep, &memfilesize,
744 fits_get_num_hdus(fptr, &nhdu, &status);
753 void FITSReader::open(
string fn)
758 #if defined(ENABLE_CURL) 759 local = URLReader::chkLocal(fn);
766 if (local && this->ffile_is_compressed(fn)) {
767 fits_open_file(&fptr, fn.c_str(), READONLY, &status);
770 fits_open_file(&fptr, fn.c_str(), READONLY, &status);
773 #if defined(ENABLE_CURL) 782 int fd = url.OpenAsFD(fn.c_str());
788 while ((n = read(fd, tmp, 8192)))
791 memfilep = memfile[0];
792 memfilesize = memfile.size();
793 fits_open_memfile(&fptr,
"mem.fits", READONLY,
794 &memfilep, &memfilesize,
815 fits_get_num_hdus(fptr, &nhdu, &status);
823 bool FITSReader::fetch(
unsigned int newpos,
bool random)
826 fitsfile *fptr = (fitsfile*) this->fptr;
831 for (i=0; i<ncols; i++) {
832 void* p = rec()[i].buffer();
836 Types2FITS(rec()[i].type(), rec()[i].isUnsigned(), fitstype);
838 fits_read_col(fptr, fitstype, i+1, newpos+1, 1, 1, NULL,
847 if ( (fitstype == TFLOAT) && ! (isfinite( *(
float*)p) ) ) rec()[i].setNull(1);
848 if ( (fitstype == TDOUBLE) && ! (isfinite( *(
double*)p) ) ) rec()[i].setNull(1);
855 void FITSReader::close() {
857 fitsfile *fptr = (fitsfile*) this->fptr;
860 fits_close_file(fptr, &status);
866 bool FITSReader::ffile_is_compressed(
string fn) {
870 std::ifstream in(fn.c_str(), ios_base::in | ios_base::binary);
876 if ( (memcmp(buf,
"\037\213", 2) == 0) ||
877 (memcmp(buf,
"\120\113", 2) == 0) ||
878 (memcmp(buf,
"\037\036", 2) == 0) ||
879 (memcmp(buf,
"\037\235", 2) == 0) ||
880 (memcmp(buf,
"\037\240", 2) == 0) )
887 cerr <<
"ffile_is_compressed: "<< e.
msg() << endl;
892 #endif //ENABLE_CFITSIO bool enter(int op=1, unsigned int timeout=0)
Enter, or try to enter a critical section.
void closeRead()
Close the file descriptor associated with the consumer side of the pipe.
#define MCS_RS_USEMETAREC
Flag for RecordSet::init().
bool Types2FITS(Types dbt, bool isunsigned, int &fits)
Convert a MCS type into a FITSIO type.
int openRead()
Return the file descriptor for the consumer side of the pipe.
int pipefd[2]
Pipe file descriptors (0 read side, 1 write side).
void closeWrite()
Close the file descriptor associated with the producer side of the pipe.
unsigned int size()
Return size of the buffer.
#define MCS_SUBST_LEADING
To be used with subst(), substitute only if "what" is at the beginning. See subst().
void create()
Create an unnamed pipe, both threads will read/write using file descriptors.
bool isReady()
Returns true if the pipe has been set-up (through the create() or createNamed() methods).
Hold informations about an event.
A dynamic array of Data objects.
void sleep_ms(unsigned int millisec)
A millisecond resolution sleep function.
The base class that implement the data abstraction layer.
#define MCS_ERROR(A, rest...)
Facility to easily pass all necessary parameter to an Event constructor.
string msg()
Returns the message.
static int filecount
Used to generate a unique filename.
string fitsError(int status)
Return a description of a FITS error.
string createNamed()
Create a named pipe, that is a FIFO special file and returns the filename.
int openWrite()
Return the file descriptor for the producer side of the pipe.
Main include file for all MCS based applications.
void addField(Data *d)
Wrapper around Dynamic_Array.push.
#define MCS_RS_KNOW_NROWS
Flag for RecordSet::init().
bool VarLenType(Types type)
Tell if "type" is a variable length type.
#define MCS_STATE_RUNNING
Thread state: the separate thread is executing the run() method.
A general purpose data type.
#define MCS_RS_RANDOM
Flag for RecordSet::init().
string filename()
If the pipe has been set-up through the createNamed() method this returns the generated file name...
string subst(string s, string what, string with, int op=0)
Perform substitutions on a string.
string pipefn
Name of the FIFO file.
bool flcreated
Tell if a pipe has been created.
bool FITS2Types(int fits, Types &dbt, bool &isunsigned)
Convert a FITSIO type into a MCS type.
bool consumerHasGone()
Tell if the consumer thread is still reading.
bool named
If it is a named pipe (a FIFO file) or not.
int leave()
Leave a critical section.
Synchro synchro
To protect the filecount variable.
Namespace for MCS library.
Types
Enumeration of base type for Data.