MCS  0.3.3-alpha7
Readers.cc
1 // ----------------------------------------------------------------------^
2 // Copyright (C) 2004, 2005, 2006, 2007, 2008 Giorgio Calderone
3 // (mailto: <gcalderone@ifc.inaf.it>)
4 //
5 // This file is part of MCS.
6 //
7 // MCS is free software; you can redistribute it and/or modify
8 // it under the terms of the GNU General Public License as published by
9 // the Free Software Foundation; either version 2 of the License, or
10 // (at your option) any later version.
11 //
12 // MCS is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU General Public License
18 // along with MCS; if not, write to the Free Software
19 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 //
21 // ----------------------------------------------------------------------$
22 #include "mcs.hh"
23 using namespace mcs;
24 //#undef ENABLE_CURL
25 //CURL includes
26 #if ENABLE_CURL
27 #include <curl/curl.h>
28 #else
29 #undef ENABLE_CURL
30 #endif
31 
32 //CFITSIO include
33 #if ENABLE_CFITSIO
34 #include <fitsio.h>
35 //#include <fstream>
36 #endif //ENABLE_CFITSIO
37 
38 
39 //Synchro Pipe::synchro;
40 int Pipe::filecount = 0;
41 
43 {
44  pipefn = "";
45  pipefd[0] = 0;
46  pipefd[1] = 0;
47  flcreated = false;
48  named = false;
49 }
50 
52 {
53  //Assume that is always the thread who reads from the pipe that
54  //manages the object creation/destruction
55  //closeRead();
56 }
57 
59 { return flcreated; }
60 
62 { return pipefn; }
63 
64 
66 {
67  if (pipefd[1])
68  if (Select(pipefd[1], 0, 1, MCS_SELECT_WRITE) == -1)
69  return true;
70 
71  return false;
72 }
73 
74 
76 {
77  if (flcreated)
78  throw MCS_ERROR(MSG_PIPE_YET_OPENED);
79 
80  if (pipefd[0] + pipefd[1])
81  throw MCS_ERROR(MSG_PIPE_YET_OPENED);
82 
83  if (pipe(pipefd) != 0)
84  throw MCS_ERROR(MSG_CALLING_PIPE);
85 
86  flcreated = true;
87  named = false;
88 }
89 
91 {
92  char filename[20];
93 
94  if (flcreated)
95  throw MCS_ERROR(MSG_PIPE_YET_OPENED);
96 
97  if (pipefd[0] + pipefd[1])
98  throw MCS_ERROR(MSG_PIPE_YET_OPENED);
99 
100  do {
101  /*
102  Due to the fact that a Pipe object is typically used by two
103  concurrent threads, and that the FIFO file is not open just as
104  it is created (instead it is opened in the openRead() method)
105  it is not safe to use tempnam-like functions. So we provide our
106  own mechanisms to solve filename conflicts.
107  */
108  synchro.enter();
109  sprintf(filename, "/tmp/mcspipe%04d", filecount);
110  filecount++;
111  filecount = filecount % 10000;
112  synchro.leave();
113 
114  if (mkfifo(filename, S_IREAD | S_IWRITE) == 0)
115  break;
116 
117  sleep_ms(20);
118  } while (1);
119 
120  pipefn = filename;
121  flcreated = true;
122  named = true;
123 
124  return pipefn;
125 }
126 
127 
128 
129 
130 
131 
133 {
134  if (! flcreated)
135  throw MCS_ERROR(MSG_PIPE_NOT_CREATED);
136 
137  if (! pipefd[0]) {
138  if (named) {
139  pipefd[0] = open(pipefn.c_str(), O_RDONLY);
140  unlink(pipefn.c_str()); //Remove the FIFO, it is no longer needed
141  if (pipefd[0] == -1) throw MCS_ERROR(MSG_CANT_OPEN_FILE, pipefn.c_str());
142  }
143 
144  //Ensure the other side hand of the pipe has been opened
145  while (Select(pipefd[0], 0, 100, MCS_SELECT_READ) != 1) ;
146  }
147 
148  return pipefd[0];
149 }
150 
151 
153 {
154  if (! flcreated)
155  throw MCS_ERROR(MSG_PIPE_NOT_CREATED);
156 
157  if (! pipefd[1]) {
158  if (named) {
159  pipefd[1] = open(pipefn.c_str(), O_WRONLY);
160  unlink(pipefn.c_str()); //Remove the FIFO, it is no longer needed
161  if (pipefd[1] == -1) throw MCS_ERROR(MSG_CANT_OPEN_FILE, pipefn.c_str());
162  }
163 
164  //Ensure the other side hand of the pipe has been opened
165  while (Select(pipefd[1], 0, 100, MCS_SELECT_WRITE) != 1) ;
166  }
167 
168  return pipefd[1];
169 }
170 
171 
173 {
174  if (! flcreated) return; //Already closed
175  if (named) unlink(pipefn.c_str()); //Remove the FIFO
176 
177  if (pipefd[0]) {
178  //Close read side of the pipe
179  close(pipefd[0]);
180  pipefd[0] = 0;
181  }
182 
183  pipefn = "";
184  flcreated = false;
185  named = false;
186 }
187 
189 {
190  if (! flcreated) return; //Already closed
191  if (named) unlink(pipefn.c_str()); //Remove the FIFO
192 
193  if (pipefd[1]) {
194  //Close write side of the pipe
195  close(pipefd[1]);
196  pipefd[1] = 0;
197  }
198 }
199 
200 
201 
202 
203 
204 
205 
206 
207 
208 #if ENABLE_CURL
209 bool mcs::URLReader::fl_curl_global_init = false;
210 
211 mcs::URLReader::URLReader() : Pipe()
212 {
213  if (! fl_curl_global_init) {
214  fl_curl_global_init = true;
215  curl_global_init(CURL_GLOBAL_ALL);
216  }
217 
218  thr = NULL;
219 }
220 
221 
222 mcs::URLReader::~URLReader()
223 { Close(); }
224 
225 
226 string mcs::URLReader::url()
227 { return lurl; }
228 
229 bool mcs::URLReader::chkLocal(string& url)
230 {
231  bool local = (bool) (
232  (url.find("://") == string::npos) ||
233  (url.find("file://") == 0)
234  );
235 
236  if (local) url = subst(url, "file://", "", MCS_SUBST_LEADING);
237  return local;
238 }
239 
240 
241 int mcs::URLReader::OpenAsFD(string url)
242 {
243  local = chkLocal(url);
244  this->lurl = url;
245 
246  create();
247 
248  thr = new ThreadFunc(thread_run, this);
249  thr->startDetached();
250  return openRead();
251 }
252 
253 
254 const char* mcs::URLReader::OpenAsFifo(string url)
255 {
256  local = chkLocal(url);
257  this->lurl = url;
258 
259  string fn = createNamed();
260 
261  thr = new ThreadFunc(thread_run, this);
262  thr->startDetached();
263 
264  return fn.c_str();
265 }
266 
267 
268 
269 void mcs::URLReader::Download(string url, string fn)
270 {
271  ofstream out(fn.c_str());
272 
273  if (! out.is_open())
274  throw MCS_ERROR(MSG_CANT_OPEN_FILE, fn.csz);
275 
276  int fd = OpenAsFD(url);
277  char buf[1024];
278  int n;
279 
280  while ((n = read(fd, buf, 1024)))
281  out.write(buf, n);
282 
283  out.close();
284  Close();
285 }
286 
287 
288 
289 void mcs::URLReader::Close()
290 {
291  if (thr) {
292  closeRead();
293  lurl = "";
294 
295  while (thr->state() <= MCS_STATE_RUNNING)
296  sleep_ms(20);
297 
298  if (thr->error())
299  {
300  Event e = *thr->error();
301  delete thr;
302  thr = NULL;
303  throw e;
304  }
305  delete thr;
306  thr = NULL;
307  }
308 }
309 
310 
311 
312 int mcs::URLReader::thread_run(void* p)
313 {
314  URLReader* This = (URLReader*) p;
315  This->thread_fetch();
316  return 0;
317 }
318 
319 
320 void mcs::URLReader::thread_fetch()
321 {
322  openWrite(); //Open file for writing, it is necessary here otherwise the
323  //openRead() on the other thread won't return
324 
325  if (local) {
326  int fd = open(lurl.c_str(), O_RDONLY);
327  char buf[1024];
328  int len;
329 
330  if (fd == -1) {
331  closeWrite();
332  throw MCS_ERROR(MSG_CANT_OPEN_FILE, lurl.c_str());
333  }
334 
335  while ((len = read(fd, buf, 1024)))
336  cb_write(buf, len, 1, this);
337 
338  closeWrite();
339  }
340  else {
341  CURLcode res;
342  CURL* curl = curl_easy_init();
343 
344  if (! curl)
345  throw MCS_ERROR(MSG_CURL_INIT);
346 
347  res = curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
348  if (res != CURLE_OK) goto error;
349 
350  //Timeout in seconds
351  res = curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 60);
352  if (res != CURLE_OK) goto error;
353 
354  res = curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
355  if (res != CURLE_OK) goto error;
356 
357  res = curl_easy_setopt(curl, CURLOPT_URL, lurl.c_str());
358  if (res != CURLE_OK) goto error;
359 
360  //res = curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, cb_progress);
361  //if (res != CURLE_OK) goto error;
362 
363  res = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, cb_write);
364  if (res != CURLE_OK) goto error;
365 
366  res = curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
367  if (res != CURLE_OK) goto error;
368 
369  res = curl_easy_setopt(curl, CURLOPT_PRIVATE, this);
370  if (res != CURLE_OK) goto error;
371 
372  res = curl_easy_perform(curl);
373  if (res != 0) goto error;
374 
375  closeWrite();
376  return;
377 
378  error:
379  closeWrite();
380  curl_easy_cleanup((CURL*) curl);
381  throw MCS_ERROR(MSG_CURL_ERROR, curl_easy_strerror(res));
382  }
383 }
384 
385 
386 size_t mcs::URLReader::cb_write(void *ptr, size_t size, size_t nmemb,
387  void *This)
388 {
389  URLReader* p = (URLReader*) This;
390 
391  if (p->consumerHasGone()) //The pipe has been closed on the consumer side
392  return 0; //This will abort CURL data fetching
393 
394  //The Write() method is virtual because some derived class may wants
395  //to write in a different manner
396  return p->Write(ptr, size, nmemb);
397 }
398 
399 
400 void mcs::URLReader::flush()
401 { fsync(openWrite()); }
402 
403 
404 //In the base class version of the Write() method simply writes on
405 //output file descriptor.
406 unsigned int mcs::URLReader::Write(void *ptr, unsigned int size,
407  unsigned int nmemb)
408 { return write(openWrite(), ptr, size * nmemb); }
409 
410 
411 
412 
413 int mcs::URLReader::Read(char* buf, int maxlen)
414 { return read(openRead(), buf, maxlen); }
415 
416 
417 //Event* mcs::URLReader::thread_error()
418 //{
419 // if (thr)
420 // return thr->error();
421 //
422 // return NULL;
423 //}
424 
425 
426 //int mcs::URLReader::cb_progress(void *clientp,
427 // double dltotal, double dlnow,
428 // double ultotal, double ulnow)
429 //{
430 // cout << "Download statistics: " << dlnow << " : " << dltotal << endl;
431 // return 0; //Continue reading
432 //}
433 #endif
434 
435 
436 
437 #if ENABLE_CFITSIO
438 string mcs::fitsError(int status) {
439  char buf[31];
440  fits_get_errstatus(status, buf);
441  return string(buf);
442 }
443 
444 
445 FITSReader::FITSReader() : RecordSet()
446 { fptr = NULL; }
447 
448 
449 FITSReader::~FITSReader()
450 {}
451 
452 
453 #define FITSKEYLEN 70
454 #define CHECK_FITS_ERROR if (status) throw MCS_ERROR(MSG_FITS_ERROR, fitsError(status).c_str())
455 
456 
457 int FITSReader::HDUCount()
458 {
459  int status = 0;
460  fitsfile *fptr = (fitsfile*) this->fptr;
461 
462  fits_get_num_hdus(fptr, &nhdu, &status);
463 
464  return nhdu;
465 }
466 
467 
468 int FITSReader::currentHDU()
469 {
470  int i;
471  fitsfile *fptr = (fitsfile*) this->fptr;
472  fits_get_hdu_num(fptr, &i);
473 
474  return i;
475 }
476 
477 void FITSReader::selectHDU(string name, int extver)
478 {
479  int status, i;
480  fitsfile *fptr = (fitsfile*) this->fptr;
481 
482  status = 0;
483  fits_movnam_hdu(fptr, ANY_HDU, (char*) name.c_str(), extver, &status);
484  CHECK_FITS_ERROR;
485 
486  fits_get_hdu_num(fptr, &i);
487  selectHDU(i);
488 }
489 
490 
491 bool FITSReader::selectNextHDU()
492 {
493  int status = 0;
494  int exttype = ANY_HDU;
495 
496  fitsfile *fptr = (fitsfile*) this->fptr;
497 
498  fits_movrel_hdu(fptr, 1, &exttype, &status);
499  if (status == END_OF_FILE)
500  return false;
501 
502  CHECK_FITS_ERROR;
503  selectHDU(currentHDU());
504 
505  return true;
506 }
507 
508 
509 #include <json/json.h>
510 
511 /*
512  Trim input string for leading and trailing quote (') and spaces.
513 
514 */
515 std::string trimfs( std::string s ) {
516  string ss = s;
517  ss.erase( 0, ss.find_first_not_of( " '\t\n" ) );
518  ss.erase( ss.find_last_not_of( " '\t\n" ) + 1);
519  return ss;
520 }
521 
522 
523 void FITSReader::read_json_header(const string& json_string){
524 
525  Json::Value root; // will contains the root value after parsing.
526  Json::Reader reader;
527  bool parsingSuccessful = reader.parse( json_string, root );
528 
529  if ( !parsingSuccessful )
530  {
531  // report to the user the failure and their locations in the document.
532  std::cout << "Failed to parse configuration\n"
533  << reader.getFormattedErrorMessages();
534  return;
535  }
536 
537  // Get the value of the member of root named 'encoding', return 'UTF-8' if there is no
538  // such member.
539  // std::string encoding = root.get("encoding", "UTF-8" ).asString();
540 
541  // Get the value of the member of root named 'encoding', return a 'null' value if
542  // there is no such member.
543  const Json::Value fitskeys = root["keywords"];
544 
545  unsigned int nkeys= fitskeys.size();
546  string key, value;
547 
548  cout << "Found " << nkeys << " keys in JSON string" << endl;
549 
550  // pthread_init()
551 
552  //cout << "XX"<<endl;
553 
554  //header_x.addField(new Data("tototot"));
555  // header_comments.addField(new Data("tototot"));
556 
557  //cout << "XX"<<endl;
558 
559 
560  //cout << "clear comments "<< header.size() << " and " << header_comments.size() << endl;
561 
562  if(header_comments.size())
563  header_comments.clear();
564 
565 
566  // if(header_x.size())
567  // header_x.clear();
568 
569  cout << "clear comments "<< header.size() << " and " << header_comments.size() << endl;
570 
571  header.clear();
572  cout << "clear comments OK"<<endl;
573 
574  for ( int index = 0; index < nkeys; ++index ){ // Iterates over the sequence elements.
575 
576 
577  key=fitskeys[index]["key"].asString();
578  value=fitskeys[index]["value"].asString();
579 
580  value = trimfs(value);
581  cout << "Adding " << index << " : " << key <<" = " << value << endl;
582 
583  Data* d = new Data(string(value));
584  d->setName(string(key));
585  header.addField(d);
586 
587 
588  // d = new Data(string(head_comment));
589  // d->setName(string(head_name));
590  // header_comments.addField(d);
591 
592  }
593 
594  cout << "Done loading JSON keys ! Record list: header count = " << header.count() << endl;
595 
596  for(int i=0;i<header.count();i++){
597  cout << i << " : " << header[i].name() << " value " << header[i].sval()<< endl;
598  }
599 
600 
601 
602 
603  // sleep(5);
604 
605 }
606 
607 void FITSReader::selectHDU(int hdunum)
608 {
609  int status;
610  long int repeat, width;
611  int i, j;
612  int fits_type;
613  Types type;
614  bool flunsign;
615  string s;
616  char buf[FITSKEYLEN];
617  char reqColName[5];
618  char head_name[80];
619  char head_val[80];
620  char head_comment[80];
621  Record meta;
622  int nkeys;
623  int hdutype;
624 
625  fitsfile *fptr = (fitsfile*) this->fptr;
626  fits_get_num_hdus(fptr, &nhdu, &status);
627 
628  if ((hdunum <= 0) || (hdunum > nhdu))
629  throw; //ERROR
630 
631  status = 0;
632  hdutype = ANY_HDU;
633  fits_movabs_hdu(fptr, hdunum, &hdutype, &status);
634  CHECK_FITS_ERROR;
635 
636  fits_get_hdrspace(fptr, &nkeys, NULL, &status);
637  CHECK_FITS_ERROR;
638 
639  header.clear();
640  header_comments.clear();
641 
642  for (i=1; i<=nkeys; i++) {
643  fits_read_keyn(fptr, i, head_name, head_val, head_comment, &status);
644  CHECK_FITS_ERROR;
645 
646  Data* d = new Data(string(head_val));
647  d->setName(string(head_name));
648  header.addField(d);
649 
650  d = new Data(string(head_comment));
651  d->setName(string(head_name));
652  header_comments.addField(d);
653  }
654 
655  fits_get_hdu_type(fptr, &hdutype, &status);
656  CHECK_FITS_ERROR;
657 
658  if ((hdutype == BINARY_TBL) ||
659  (hdutype == ASCII_TBL) ) {
660  //Number of records
661  fits_get_num_rows(fptr, &nrows, &status);
662  CHECK_FITS_ERROR;
663 
664  //Number of fields
665  fits_get_num_cols(fptr, &ncols, &status);
666  CHECK_FITS_ERROR;
667 
668  for (i=0; i< ncols; i++) {
669  sprintf(reqColName, "%d", i+1);
670  fits_get_colname(fptr, CASEINSEN, reqColName, buf, &j, &status);
671  fits_get_coltype(fptr, j, &fits_type, &repeat, &width, &status);
672  CHECK_FITS_ERROR;
673 
674  if (! FITS2Types(fits_type, type, flunsign))
675  throw MCS_ERROR(MSG_TYPE_NOT_HANDLED, i, fits_type);
676 
677  if (! VarLenType(type))
678  width = 0;
679 
680  Data* d = new Data(NULL, type, buf, width, flunsign);
681  meta.addField(d);
682  }
683 
685  if (local)
686  i |= MCS_RS_RANDOM;
687 
688  init(i, nrows, &meta);
689  startFetch();
690  }
691  else { //Empty recordset
693  init(i, 0, &meta);
694  }
695 }
696 
697 //static FILE* _origstdin_ = stdin;
698 
699 //int fits_open_pipe(fitsfile** fptr, int filedes, int rwmode, int* status)
700 //{
701 // if (filedes == 0)
702 // stdin = _origstdin_;
703 // else
704 // _origstdin_ = fdopen(filedes, "r");
705 //
706 // return fits_open_file(fptr, "-", READONLY, status);
707 //}
708 
709 
710 //void FITSReader::open(int fd)
711 //{
712 // int status = 0;
713 // fitsfile *fptr;
714 //
715 // fits_open_pipe(&fptr, fd, READONLY, &status);
716 // CHECK_FITS_ERROR;
717 // this->fptr = fptr;
718 //
719 // fits_get_num_hdus(fptr, &nhdu, &status);
720 // CHECK_FITS_ERROR;
721 //
722 // selectHDU(1);
723 //}
724 
725 
726 //static FILE* _origstdin_ = stdin;
727 
728 void FITSReader::open(Buffer& buf)
729 {
730  int status;
731  fitsfile *fptr;
732 
733  memfile.set(buf[0], buf.size(), AUTO_FREE);
734 
735  memfilep = memfile[0];
736  memfilesize = memfile.size();
737  fits_open_memfile(&fptr, "mem.fits", READONLY,
738  &memfilep, &memfilesize,
739  0, NULL, &status);
740 
741  CHECK_FITS_ERROR;
742  this->fptr = fptr;
743 
744  fits_get_num_hdus(fptr, &nhdu, &status);
745  CHECK_FITS_ERROR;
746 
747  selectHDU(1);
748 }
749 
750 
751 
752 
753 void FITSReader::open(string fn)
754 {
755  int status;
756  fitsfile *fptr;
757 
758 #if defined(ENABLE_CURL)
759  local = URLReader::chkLocal(fn);
760 #else
761  local = true;
762 #endif
763 
764  status = 0;
765  //if (local) {
766  if (local && this->ffile_is_compressed(fn)) {
767  fits_open_file(&fptr, fn.c_str(), READONLY, &status);
768  }
769  else if (local) {
770  fits_open_file(&fptr, fn.c_str(), READONLY, &status);
771  }
772 
773 #if defined(ENABLE_CURL)
774  else {
775  //The FITSReader can read also remote files (using its parent class
776  //URLReader). Note that CFITSIO cannot read a FITS file as a
777  //stream so actually it is entirely loaded into memory and then
778  //parsed.
779 
780  //Open the remote file as a file descriptor
781  URLReader url;
782  int fd = url.OpenAsFD(fn.c_str());
783  int n;
784  char tmp[8192];
785 
786  //Load the FITS file into memory
787  memfile.free();
788  while ((n = read(fd, tmp, 8192)))
789  memfile(n) << tmp;
790 
791  memfilep = memfile[0];
792  memfilesize = memfile.size();
793  fits_open_memfile(&fptr, "mem.fits", READONLY,
794  &memfilep, &memfilesize,
795  0, NULL, &status);
796 
798  //stdin = fdopen(fd, "r");
799  //
801  //fits_open_file(&fptr, "-", READONLY, &status);
802  //
804  //stdin = _origstdin_;
805 
806  //Close the remote file descriptor. If an error occurred during timeout
807  //an exception will be thrown here.
808  url.Close();
809  }
810 #endif
811 
812  CHECK_FITS_ERROR;
813  this->fptr = fptr;
814 
815  fits_get_num_hdus(fptr, &nhdu, &status);
816  CHECK_FITS_ERROR;
817 
818  selectHDU(1);
819 }
820 
821 
822 
823 bool FITSReader::fetch(unsigned int newpos, bool random)
824 {
825  int i, status = 0;
826  fitsfile *fptr = (fitsfile*) this->fptr;
827 
828  if (newpos >= nrows)
829  return false;
830 
831  for (i=0; i<ncols; i++) {
832  void* p = rec()[i].buffer();
833  int fitstype;
834  int isnull;
835 
836  Types2FITS(rec()[i].type(), rec()[i].isUnsigned(), fitstype);
837 
838  fits_read_col(fptr, fitstype, i+1, newpos+1, 1, 1, NULL,
839  (VarLenType(rec()[i].type()) ? &p : p),
840  &isnull, &status);
841  CHECK_FITS_ERROR;
842 
843  //rec()[i].setNull(isnull);
844  rec()[i].setNull(0);
845  //if ( (fitstype == TFLOAT) && (isnan( *(float*)p) ) ) rec()[i].setNull(1);
846  //if ( (fitstype == TDOUBLE) && (isnan( *(double*)p) ) ) rec()[i].setNull(1);
847  if ( (fitstype == TFLOAT) && ! (isfinite( *(float*)p) ) ) rec()[i].setNull(1);
848  if ( (fitstype == TDOUBLE) && ! (isfinite( *(double*)p) ) ) rec()[i].setNull(1);
849  }
850 
851  return true;
852 }
853 
854 
855 void FITSReader::close() {
856  int status = 0;
857  fitsfile *fptr = (fitsfile*) this->fptr;
858 
859  if (fptr) {
860  fits_close_file(fptr, &status);
861  fptr = NULL;
862  CHECK_FITS_ERROR;
863  }
864 }
865 
866 bool FITSReader::ffile_is_compressed(string fn) {
867  char buf[2];
868 
869  try {
870  std::ifstream in(fn.c_str(), ios_base::in | ios_base::binary);
871 
872  in.read(&buf[0], 2);
873 
874  in.close();
875 
876  if ( (memcmp(buf, "\037\213", 2) == 0) || /* GZIP */
877  (memcmp(buf, "\120\113", 2) == 0) || /* PKZIP */
878  (memcmp(buf, "\037\036", 2) == 0) || /* PACK */
879  (memcmp(buf, "\037\235", 2) == 0) || /* LZW */
880  (memcmp(buf, "\037\240", 2) == 0) ) /* LZH */
881  {
882  return true; /* compressed file */
883  } else {
884  return false; /* not a compressed file */
885  }
886  } catch(Event& e) {
887  cerr <<"ffile_is_compressed: "<< e.msg() << endl;
888  return false;
889  }
890 
891 }
892 #endif //ENABLE_CFITSIO
893 
894 
895 
896 
897 
898 
899 //FileReader::FileReader() : RecordSet()
900 //{
901 // fd = 0;
902 //}
903 //
904 //FileReader::~FileReader()
905 //{
906 // this->close();
907 //}
908 //
909 //void FileReader::open(string url)
910 //{
911 // this->close();
912 //
913 // if (master.count() == 0)
914 // throw;
915 //
916 // fd = input.OpenAsFD(url);
917 // init(0);
918 // startFetch();
919 //}
920 //
921 //
922 //bool FileReader::fetch(unsigned int newpos, bool random)
923 //{
924 // if (sep.length() > 0) { //Variable length record
925 // ;
926 //
927 // }
928 // else { //Fixed length record
929 // size_t recsize = 0;
930 // int i;
931 // for (i=0; i<master.count(); i++) //Compute record size
932 // recsize += master[i].maxLength();
933 //
934 // buf.chkAllocation(recsize);
935 // unsigned int size = read(fd, buf[0], recsize);
936 //
937 // if (size < recsize)
938 // throw;
939 //
940 // //Copy data into the current record object
941 // rec() = master;
942 // unsigned int cur = 0;
943 // for (i=0; i<master.count(); i++) {
944 // buf(cur, rec()[i].maxLength()) >> rec()[i].buffer();
945 // cur += rec()[i].maxLength();
946 // }
947 // }
948 //
949 // return true;
950 //}
951 //
952 //void FileReader::close()
953 //{
954 // sep = "";
955 // eor = "";
956 // master.clear();
957 //}
958 //
959 //void FileReader::setMaster(Record& rec)
960 //{
961 // sep = "";
962 // eor = "";
963 // master = rec;
964 //}
965 //
966 //void FileReader::setMaster(Record& rec, string sep)
967 //{
968 // this->sep = sep;
969 // eor = "";
970 // master = rec;
971 //}
972 //
973 //void FileReader::setMaster(Record& rec, string sep, string eor)
974 //{
975 // this->sep = sep;
976 // this->eor = eor;
977 // master = rec;
978 //}
bool enter(int op=1, unsigned int timeout=0)
Enter, or try to enter a critical section.
Definition: Thread.cc:143
void closeRead()
Close the file descriptor associated with the consumer side of the pipe.
Definition: Readers.cc:172
#define MCS_RS_USEMETAREC
Flag for RecordSet::init().
Definition: mcs.hh:4449
bool Types2FITS(Types dbt, bool isunsigned, int &fits)
Convert a MCS type into a FITSIO type.
Definition: Data.cc:564
int openRead()
Return the file descriptor for the consumer side of the pipe.
Definition: Readers.cc:132
int pipefd[2]
Pipe file descriptors (0 read side, 1 write side).
Definition: mcs.hh:7706
void closeWrite()
Close the file descriptor associated with the producer side of the pipe.
Definition: Readers.cc:188
unsigned int size()
Return size of the buffer.
Definition: Record.cc:208
#define MCS_SUBST_LEADING
To be used with subst(), substitute only if "what" is at the beginning. See subst().
Definition: mcs.hh:502
void create()
Create an unnamed pipe, both threads will read/write using file descriptors.
Definition: Readers.cc:75
bool isReady()
Returns true if the pipe has been set-up (through the create() or createNamed() methods).
Definition: Readers.cc:58
Hold informations about an event.
Definition: mcs.hh:814
A dynamic array of Data objects.
Definition: mcs.hh:4170
Pipe()
Constructor.
Definition: Readers.cc:42
~Pipe()
Destructor.
Definition: Readers.cc:51
void sleep_ms(unsigned int millisec)
A millisecond resolution sleep function.
Definition: Thread.cc:603
The base class that implement the data abstraction layer.
Definition: mcs.hh:4510
#define MCS_ERROR(A, rest...)
Facility to easily pass all necessary parameter to an Event constructor.
Definition: mcs.hh:964
string msg()
Returns the message.
Definition: Event.cc:104
static int filecount
Used to generate a unique filename.
Definition: mcs.hh:7700
string fitsError(int status)
Return a description of a FITS error.
Definition: Readers.cc:438
High level buffer.
Definition: mcs.hh:1095
string createNamed()
Create a named pipe, that is a FIFO special file and returns the filename.
Definition: Readers.cc:90
int openWrite()
Return the file descriptor for the producer side of the pipe.
Definition: Readers.cc:152
Main include file for all MCS based applications.
void addField(Data *d)
Wrapper around Dynamic_Array.push.
Definition: Record.cc:364
#define MCS_RS_KNOW_NROWS
Flag for RecordSet::init().
Definition: mcs.hh:4457
bool VarLenType(Types type)
Tell if "type" is a variable length type.
Definition: Data.cc:349
#define MCS_STATE_RUNNING
Thread state: the separate thread is executing the run() method.
Definition: mcs.hh:2409
A general purpose data type.
Definition: mcs.hh:3092
#define MCS_RS_RANDOM
Flag for RecordSet::init().
Definition: mcs.hh:4466
string filename()
If the pipe has been set-up through the createNamed() method this returns the generated file name...
Definition: Readers.cc:61
string subst(string s, string what, string with, int op=0)
Perform substitutions on a string.
Definition: Utils.cc:135
string pipefn
Name of the FIFO file.
Definition: mcs.hh:7703
bool flcreated
Tell if a pipe has been created.
Definition: mcs.hh:7715
bool FITS2Types(int fits, Types &dbt, bool &isunsigned)
Convert a FITSIO type into a MCS type.
Definition: Data.cc:526
bool consumerHasGone()
Tell if the consumer thread is still reading.
Definition: Readers.cc:65
bool named
If it is a named pipe (a FIFO file) or not.
Definition: mcs.hh:7718
int leave()
Leave a critical section.
Definition: Thread.cc:194
Synchro synchro
To protect the filecount variable.
Definition: mcs.hh:7697
Namespace for MCS library.
Types
Enumeration of base type for Data.
Definition: mcs.hh:54

mcslogo

MCS (My Customizable Server) ver. 0.3.3-alpha7
Documentation generated on Mon May 28 07:39:41 UTC 2018