casa
$Rev:20696$
|
00001 #ifndef TABLE_STREAM_READER_H 00002 #define TABLE_STREAM_READER_H 00003 #include "Misc.h" 00004 #include <libxml/parser.h> 00005 #include <libxml/tree.h> 00006 #include "ASDM.h" 00007 #include "Entity.h" 00008 #include "EndianStream.h" 00009 #include "ConversionException.h" 00010 00011 #include <sys/types.h> 00012 #include <sys/stat.h> 00013 #include <unistd.h> 00014 00015 #define READBUFFERSIZE ( 50 * 1024 * 1024 ) 00016 namespace asdm { 00039 template<class T, class R> class TableStreamReader { 00040 public: 00044 TableStreamReader(){currentState = S_CLOSED; readBuffer = (char *) malloc (READBUFFERSIZE);} 00045 00049 virtual ~TableStreamReader(){;} 00050 00056 void open(const std::string& directory){ 00057 checkState(T_OPEN, "TableStreamReader::open"); 00058 // Open the file. 00059 tablePath = directory + "/"+ T::name() + ".bin"; 00060 tableFile.open(tablePath.c_str(), ios::in|ios::binary); 00061 if (!tableFile.is_open()) 00062 throw asdm::ConversionException("Could not open file " + tablePath, T::name()); 00063 00064 //streambuf * sb_p = tableFile.rdbuf()->pubsetbuf(readBuffer, READBUFFERSIZE); 00065 //cout << (unsigned long long) sb_p << endl; 00066 00067 // Determine the size of the file. 00068 struct stat filestatus; 00069 stat( tablePath.c_str(), &filestatus); 00070 fileSizeInBytes = filestatus.st_size; 00071 00072 // Locate the xmlPartMIMEHeader. 00073 std::string xmlPartMIMEHeader = "CONTENT-ID: <HEADER.XML>\n\n"; 00074 CharComparator comparator(&tableFile, 10000); 00075 std::istreambuf_iterator<char> BEGIN(tableFile.rdbuf()); 00076 std::istreambuf_iterator<char> END; 00077 std::istreambuf_iterator<char> it = std::search(BEGIN, END, xmlPartMIMEHeader.begin(), xmlPartMIMEHeader.end(), comparator); 00078 if ((it == END) || (tableFile.tellg() > 10000)) { 00079 tableFile.seekg(0); 00080 xmlPartMIMEHeader = "CONTENT-ID: <HEADER.XML>\r\n\r\n"; 00081 it = BEGIN; 00082 it = std::search(BEGIN, END, xmlPartMIMEHeader.begin(), xmlPartMIMEHeader.end(), comparator); 00083 if ((it == END) || (tableFile.tellg() > 10000)) 00084 throw asdm::ConversionException("failed to detect the beginning of the XML header.", T::name()); 00085 } 00086 // Locate the binaryPartMIMEHeader while accumulating the characters of the xml header. 00087 std::string binPartMIMEHeader = "--MIME_BOUNDARY\nCONTENT-TYPE: BINARY/OCTET-STREAM\nCONTENT-ID: <CONTENT.BIN>\n\n"; 00088 std::string xmlHeader; 00089 CharCompAccumulator compaccumulator(&xmlHeader, &tableFile, 100000); 00090 ++it; 00091 it = std::search(it, END, binPartMIMEHeader.begin(), binPartMIMEHeader.end(), compaccumulator); 00092 if ((it == END) || (tableFile.tellg() > 100000)) 00093 throw asdm::ConversionException("failed to detect the beginning of the binary part", T::name()); 00094 ++it; 00095 xmlHeader.erase(xmlHeader.end() - (binPartMIMEHeader.size() + 1), xmlHeader.end()); 00096 00097 // 00098 // We have the xmlHeader , let's parse it. 00099 // 00100 xmlDoc *doc; 00101 doc = xmlReadMemory(xmlHeader.data(), xmlHeader.size(), "BinaryTableHeader.xml", NULL, XML_PARSE_NOBLANKS); 00102 if ( doc == NULL ) 00103 throw ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name()); 00104 00105 xmlNode* root_element = xmlDocGetRootElement(doc); 00106 if ( root_element == NULL || root_element->type != XML_ELEMENT_NODE ) 00107 throw asdm::ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name()); 00108 00109 const ByteOrder* byteOrder = NULL; 00110 if ( std::string("ASDMBinaryTable").compare((const char*) root_element->name) == 0) { 00111 // Then it's an "old fashioned" MIME file for tables. 00112 // Just try to deserialize it with Big_Endian for the bytes ordering. 00113 byteOrder = asdm::ByteOrder::Big_Endian; 00114 attributesSeq = T::defaultAttributesNamesInBin(); 00115 } 00116 else if (std::string(T::name()+"Table").compare((const char*) root_element->name) == 0) { 00117 // It's a new (and correct) MIME file for tables. 00118 // 00119 // 1st ) Look for a BulkStoreRef element with an attribute byteOrder. 00120 // 00121 xmlNode* bulkStoreRef = 0; 00122 xmlNode* child = root_element->children; 00123 00124 // Skip the two first children (Entity and ContainerEntity). 00125 bulkStoreRef = (child == 0) ? 0 : ( (child->next) == 0 ? 0 : child->next->next ); 00126 00127 if ( bulkStoreRef == 0 || (bulkStoreRef->type != XML_ELEMENT_NODE) || (std::string("BulkStoreRef").compare((const char*) bulkStoreRef->name) != 0)) 00128 throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/BulkStoreRef'. Invalid XML header '"+ xmlHeader + "'.", T::name()); 00129 00130 // We found BulkStoreRef, now look for its attribute byteOrder. 00131 _xmlAttr* byteOrderAttr = 0; 00132 for (struct _xmlAttr* attr = bulkStoreRef->properties; attr; attr = attr->next) 00133 if (string("byteOrder").compare((const char*) attr->name) == 0) { 00134 byteOrderAttr = attr; 00135 break; 00136 } 00137 00138 if (byteOrderAttr == 0) 00139 throw asdm::ConversionException("Could not find the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader +"'.", T::name()); 00140 00141 string byteOrderValue = string((const char*) byteOrderAttr->children->content); 00142 if (!(byteOrder = asdm::ByteOrder::fromString(byteOrderValue))) 00143 throw asdm::ConversionException("No valid value retrieved for the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader + "'.", T::name()); 00144 00145 // 00146 // 2nd) Look for the Attributes element and grab the names of the elements it contains. 00147 // 00148 xmlNode* attributes = bulkStoreRef->next; 00149 if ( attributes == 0 || (attributes->type != XML_ELEMENT_NODE) || (string("Attributes").compare((const char*) attributes->name) != 0)) 00150 throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/Attributes'. Invalid XML header '"+ xmlHeader + "'.", T::name()); 00151 00152 xmlNode* childOfAttributes = attributes->children; 00153 00154 while ( childOfAttributes != 0 && (childOfAttributes->type == XML_ELEMENT_NODE) ) { 00155 attributesSeq.push_back(string((const char*) childOfAttributes->name)); 00156 childOfAttributes = childOfAttributes->next; 00157 } 00158 } 00159 // Create an EndianIFStream from the substring containing the binary part. 00160 eifs = asdm::EndianIFStream (&tableFile, byteOrder); 00161 00162 asdm::Entity entity = Entity::fromBin((EndianIStream &)eifs); 00163 00164 // We do nothing with that but we have to read it. 00165 asdm::Entity containerEntity = Entity::fromBin((EndianIStream &)eifs); 00166 00167 // Let's read numRows but ignore it and rely on the value specified in the ASDM.xml file. 00168 int numRows = ((EndianIStream &)eifs).readInt(); 00169 00170 // Memorize the starting point of rows. 00171 whereRowsStart = tableFile.tellg(); 00172 00173 // Update the state 00174 currentState = S_OPENED; 00175 } 00176 00181 void reset() { 00182 checkState(T_RESET, "TableStreamReader::reset"); 00183 clear(); 00184 tableFile.seekg(whereRowsStart); 00185 } 00186 00193 const std::vector<R*>& nextNRows(unsigned int nRows) { 00194 checkState(T_READ, "TableStreamReader::nextNRows"); 00195 clear(); 00196 unsigned int nread = 0; 00197 T& tableRef = (T&) asdm.getTable(T::name()); 00198 while ( hasRows() && nread < nRows ) { 00199 rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef, attributesSeq)); 00200 nread++; 00201 } 00202 return rows; 00203 } 00204 00213 const std::vector<R*>& untilNBytes(unsigned int nBytes) { 00214 checkState(T_READ, "TableStreamReader::untilNBytes"); 00215 clear(); 00216 off_t whereAmI = tableFile.tellg(); 00217 if (!hasRows()) return rows; 00218 00219 T& tableRef = (T&) asdm.getTable(T::name()); 00220 do { 00221 rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef , attributesSeq)); 00222 } 00223 while (((tableFile.tellg() - whereAmI) < nBytes) && hasRows()); 00224 return rows; 00225 } 00226 00230 bool hasRows() { 00231 checkState(T_CHECK, "TableStreamReader::hasRows"); 00232 return tableFile.tellg() < (fileSizeInBytes - 19); 00233 } 00234 00238 void close() { 00239 checkState(T_CLOSE, "TableStreamReader::close"); 00240 clear(); 00241 if (tableFile.is_open()) tableFile.close(); 00242 free(readBuffer); 00243 // Update the state. 00244 currentState = S_CLOSED; 00245 } 00246 00247 private: 00248 std::string tablePath; 00249 std::ifstream tableFile; 00250 off_t fileSizeInBytes; 00251 asdm::EndianIFStream eifs; 00252 std::vector<std::string> attributesSeq; 00253 asdm::ASDM asdm; 00254 std::vector<R*> rows; 00255 00256 char* readBuffer; 00257 00258 streampos whereRowsStart; 00259 00260 enum State {S_CLOSED, S_OPENED}; 00261 enum Transition {T_OPEN, T_CHECK, T_RESET, T_READ, T_CLOSE}; 00262 State currentState; 00263 00264 void checkState(Transition t, const string& methodName) const { 00265 switch (currentState) { 00266 case S_CLOSED: 00267 if (t == T_OPEN) return; 00268 00269 case S_OPENED: 00270 if (t == T_CHECK || t == T_RESET || t == T_READ || t == T_CLOSE) return; 00271 } 00272 throw ConversionException("Invalid call of method '" + methodName + "' in the current context.", T::name()); 00273 } 00277 void clear() { 00278 for (unsigned int i = 0; i < rows.size(); i++) 00279 if (rows[i]) delete rows[i]; 00280 rows.clear(); 00281 } 00282 }; 00283 } // end namespace asdm 00284 #endif