casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
TableStreamReader.h
Go to the documentation of this file.
00001 #ifndef TABLE_STREAM_READER_H
00002 #define TABLE_STREAM_READER_H
00003 #include "Misc.h"
00004 #include <libxml/parser.h>
00005 #include <libxml/tree.h>
00006 #include "ASDM.h"
00007 #include "Entity.h"
00008 #include "EndianStream.h"
00009 #include "ConversionException.h"
00010 
00011 #include <sys/types.h>
00012 #include <sys/stat.h>
00013 #include <unistd.h>
00014 
00015 #define READBUFFERSIZE ( 50 * 1024 * 1024 )
00016 namespace asdm {
00039   template<class T, class R> class TableStreamReader {
00040   public:
00044     TableStreamReader(){currentState = S_CLOSED; readBuffer = (char *) malloc (READBUFFERSIZE);}
00045 
00049     virtual ~TableStreamReader(){;}
00050 
00056     void open(const std::string& directory){
00057       checkState(T_OPEN, "TableStreamReader::open");
00058       // Open the file.
00059       tablePath = directory + "/"+ T::name() + ".bin";
00060       tableFile.open(tablePath.c_str(), ios::in|ios::binary);
00061       if (!tableFile.is_open())
00062         throw asdm::ConversionException("Could not open file " + tablePath, T::name());
00063 
00064       //streambuf * sb_p = tableFile.rdbuf()->pubsetbuf(readBuffer, READBUFFERSIZE);
00065       //cout << (unsigned long long) sb_p << endl;
00066 
00067       // Determine the size of the file.
00068       struct stat filestatus;
00069       stat( tablePath.c_str(), &filestatus);
00070       fileSizeInBytes = filestatus.st_size;
00071 
00072       // Locate the xmlPartMIMEHeader.
00073       std::string xmlPartMIMEHeader = "CONTENT-ID: <HEADER.XML>\n\n";
00074       CharComparator comparator(&tableFile, 10000);
00075       std::istreambuf_iterator<char> BEGIN(tableFile.rdbuf());
00076       std::istreambuf_iterator<char> END;
00077       std::istreambuf_iterator<char> it = std::search(BEGIN, END, xmlPartMIMEHeader.begin(), xmlPartMIMEHeader.end(), comparator);
00078       if ((it == END) || (tableFile.tellg() > 10000)) { 
00079         tableFile.seekg(0);
00080         xmlPartMIMEHeader = "CONTENT-ID: <HEADER.XML>\r\n\r\n";
00081         it = BEGIN;
00082         it = std::search(BEGIN, END, xmlPartMIMEHeader.begin(), xmlPartMIMEHeader.end(), comparator);
00083         if ((it == END) || (tableFile.tellg() > 10000)) 
00084           throw asdm::ConversionException("failed to detect the beginning of the XML header.", T::name());
00085       }
00086       // Locate the binaryPartMIMEHeader while accumulating the characters of the xml header.   
00087       std::string binPartMIMEHeader = "--MIME_BOUNDARY\nCONTENT-TYPE: BINARY/OCTET-STREAM\nCONTENT-ID: <CONTENT.BIN>\n\n";
00088       std::string xmlHeader;
00089       CharCompAccumulator compaccumulator(&xmlHeader, &tableFile, 100000);
00090       ++it;
00091       it = std::search(it, END, binPartMIMEHeader.begin(), binPartMIMEHeader.end(), compaccumulator);
00092       if ((it == END) || (tableFile.tellg() > 100000)) 
00093         throw asdm::ConversionException("failed to detect the beginning of the binary part", T::name());
00094       ++it;
00095       xmlHeader.erase(xmlHeader.end() - (binPartMIMEHeader.size() + 1), xmlHeader.end());
00096 
00097       //
00098       // We have the xmlHeader , let's parse it.
00099       //
00100       xmlDoc *doc;
00101       doc = xmlReadMemory(xmlHeader.data(), xmlHeader.size(), "BinaryTableHeader.xml", NULL, XML_PARSE_NOBLANKS);
00102       if ( doc == NULL ) 
00103         throw ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name());
00104           
00105       xmlNode* root_element = xmlDocGetRootElement(doc);
00106       if ( root_element == NULL || root_element->type != XML_ELEMENT_NODE )
00107         throw asdm::ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name());
00108     
00109       const ByteOrder* byteOrder = NULL;
00110       if ( std::string("ASDMBinaryTable").compare((const char*) root_element->name) == 0) {
00111         // Then it's an "old fashioned" MIME file for tables.
00112         // Just try to deserialize it with Big_Endian for the bytes ordering.
00113         byteOrder = asdm::ByteOrder::Big_Endian;       
00114         attributesSeq = T::defaultAttributesNamesInBin();
00115       }
00116       else if (std::string(T::name()+"Table").compare((const char*) root_element->name) == 0) {
00117         // It's a new (and correct) MIME file for tables.
00118         //
00119         // 1st )  Look for a BulkStoreRef element with an attribute byteOrder.
00120         //
00121         xmlNode* bulkStoreRef = 0;
00122         xmlNode* child = root_element->children;
00123       
00124         // Skip the two first children (Entity and ContainerEntity).
00125         bulkStoreRef = (child ==  0) ? 0 : ( (child->next) == 0 ? 0 : child->next->next );
00126       
00127         if ( bulkStoreRef == 0 || (bulkStoreRef->type != XML_ELEMENT_NODE)  || (std::string("BulkStoreRef").compare((const char*) bulkStoreRef->name) != 0))
00128           throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/BulkStoreRef'. Invalid XML header '"+ xmlHeader + "'.", T::name());
00129         
00130         // We found BulkStoreRef, now look for its attribute byteOrder.
00131         _xmlAttr* byteOrderAttr = 0;
00132         for (struct _xmlAttr* attr = bulkStoreRef->properties; attr; attr = attr->next) 
00133           if (string("byteOrder").compare((const char*) attr->name) == 0) {
00134             byteOrderAttr = attr;
00135             break;
00136           }
00137       
00138         if (byteOrderAttr == 0) 
00139           throw asdm::ConversionException("Could not find the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader +"'.", T::name());
00140       
00141         string byteOrderValue = string((const char*) byteOrderAttr->children->content);
00142         if (!(byteOrder = asdm::ByteOrder::fromString(byteOrderValue)))
00143           throw asdm::ConversionException("No valid value retrieved for the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader + "'.", T::name());
00144                 
00145         //
00146         // 2nd) Look for the Attributes element and grab the names of the elements it contains.
00147         //
00148         xmlNode* attributes = bulkStoreRef->next;
00149         if ( attributes == 0 || (attributes->type != XML_ELEMENT_NODE)  || (string("Attributes").compare((const char*) attributes->name) != 0))  
00150           throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/Attributes'. Invalid XML header '"+ xmlHeader + "'.", T::name());
00151  
00152         xmlNode* childOfAttributes = attributes->children;
00153         
00154         while ( childOfAttributes != 0 && (childOfAttributes->type == XML_ELEMENT_NODE) ) {
00155           attributesSeq.push_back(string((const char*) childOfAttributes->name));
00156           childOfAttributes = childOfAttributes->next;
00157         }
00158       }
00159       // Create an EndianIFStream from the substring containing the binary part.
00160       eifs = asdm::EndianIFStream (&tableFile, byteOrder);
00161     
00162       asdm::Entity entity = Entity::fromBin((EndianIStream &)eifs);
00163     
00164       // We do nothing with that but we have to read it.
00165       asdm::Entity containerEntity = Entity::fromBin((EndianIStream &)eifs);
00166 
00167       // Let's read numRows but ignore it and rely on the value specified in the ASDM.xml file.    
00168       int numRows = ((EndianIStream &)eifs).readInt();
00169       
00170       // Memorize the starting point of rows.
00171       whereRowsStart = tableFile.tellg();
00172 
00173       // Update the state
00174       currentState = S_OPENED;
00175     }
00176 
00181     void reset() {
00182       checkState(T_RESET, "TableStreamReader::reset");
00183       clear();
00184       tableFile.seekg(whereRowsStart);
00185     }
00186 
00193     const std::vector<R*>& nextNRows(unsigned int nRows) {
00194       checkState(T_READ, "TableStreamReader::nextNRows"); 
00195       clear();
00196       unsigned int nread = 0;
00197       T& tableRef =  (T&) asdm.getTable(T::name());
00198       while ( hasRows() && nread < nRows ) {
00199         rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef, attributesSeq));
00200         nread++;
00201       }
00202       return rows;
00203     }
00204 
00213     const std::vector<R*>& untilNBytes(unsigned int nBytes) {
00214       checkState(T_READ, "TableStreamReader::untilNBytes"); 
00215       clear();
00216       off_t whereAmI  = tableFile.tellg();
00217       if (!hasRows()) return rows;
00218 
00219       T& tableRef = (T&) asdm.getTable(T::name());
00220       do {
00221         rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef , attributesSeq));
00222       }
00223       while (((tableFile.tellg() - whereAmI) < nBytes) && hasRows());
00224       return rows;
00225     }
00226 
00230     bool hasRows() {
00231       checkState(T_CHECK, "TableStreamReader::hasRows");
00232       return tableFile.tellg() < (fileSizeInBytes - 19);
00233     }
00234 
00238     void close() {
00239       checkState(T_CLOSE, "TableStreamReader::close"); 
00240       clear();
00241       if (tableFile.is_open()) tableFile.close();
00242       free(readBuffer);
00243       // Update the state.
00244       currentState = S_CLOSED;
00245     }
00246     
00247   private:
00248     std::string                 tablePath;
00249     std::ifstream               tableFile;
00250     off_t                       fileSizeInBytes;
00251     asdm::EndianIFStream        eifs;
00252     std::vector<std::string>    attributesSeq;
00253     asdm::ASDM                  asdm;
00254     std::vector<R*>             rows;
00255 
00256     char*                       readBuffer;
00257 
00258     streampos whereRowsStart;
00259 
00260     enum State {S_CLOSED, S_OPENED};
00261     enum Transition {T_OPEN, T_CHECK, T_RESET, T_READ, T_CLOSE};
00262     State currentState;
00263 
00264     void checkState(Transition t, const string& methodName) const {
00265       switch (currentState) {
00266       case S_CLOSED:
00267         if (t == T_OPEN) return;
00268 
00269       case S_OPENED:
00270         if (t == T_CHECK || t == T_RESET || t == T_READ || t == T_CLOSE) return;
00271       }
00272       throw ConversionException("Invalid call of method '" + methodName + "' in the current context.", T::name());
00273     }
00277     void clear() {
00278       for (unsigned int i = 0; i < rows.size(); i++)
00279         if (rows[i]) delete rows[i];
00280       rows.clear();
00281     }
00282   };
00283 } // end namespace asdm
00284 #endif