Line data Source code
1 : //# VLADiskInput.cc: This class reads VLA archive files from a Disk
2 : //# Copyright (C) 1999,2000,2001,2002,2003
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU Library General Public License as published by
7 : //# the Free Software Foundation; either version 2 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13 : //# License for more details.
14 : //#
15 : //# You should have received a copy of the GNU Library General Public License
16 : //# along with this library; if not, write to the Free Software Foundation,
17 : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
18 : //#
19 : //# Correspondence concerning AIPS++ should be addressed as follows:
20 : //# Internet email: casa-feedback@nrao.edu.
21 : //# Postal address: AIPS++ Project Office
22 : //# National Radio Astronomy Observatory
23 : //# 520 Edgemont Road
24 : //# Charlottesville, VA 22903-2475 USA
25 : //#
26 : //# $Id: VLADiskInput.cc,v 19.2.20.3 2006/10/09 20:08:34 wyoung Exp $
27 :
28 : #include <nrao/VLA/VLADiskInput.h>
29 : #include <nrao/VLA/nreal.h>
30 : #include <casacore/casa/Utilities/Assert.h>
31 : #include <casacore/casa/Exceptions/Error.h>
32 : #include <casacore/casa/IO/MemoryIO.h>
33 : #include <casacore/casa/IO/ByteIO.h>
34 : #include <casacore/casa/OS/RegularFile.h>
35 : #include <iostream>
36 : #include <arpa/inet.h>
37 : #include <cstdlib>
38 : #include <unistd.h>
39 :
40 : #ifndef MAX_LOGICAL_RECORD_SIZE
41 : #define MAX_LOGICAL_RECORD_SIZE 850000u
42 : #endif
43 :
44 : // this is an expediant, we should get it from the IERS table
45 : #define LEAP_SECONDS 33
46 :
47 0 : String VLADiskInput::getTodaysFile(int relDay){
48 0 : int LeapSeconds = LEAP_SECONDS;
49 : time_t curTime, holdTime; /* time vars */
50 : struct tm *tmTime; /* time structure */
51 0 : ostringstream aDate;
52 0 : curTime = time(&holdTime) + LeapSeconds + relDay*86400;
53 0 : tmTime = gmtime(&curTime);
54 0 : oldDay = tmTime->tm_yday;
55 :
56 : // Format up the online vla file
57 :
58 0 : aDate << visDir << "/vla";
59 0 : aDate.width(4);
60 0 : aDate.fill('0');
61 0 : aDate << tmTime->tm_year+1900 << "-";
62 0 : aDate.width(2);
63 0 : aDate << tmTime->tm_mon+1 << "-" << tmTime->tm_mday;
64 0 : aDate << ".dat";
65 0 : return(String(aDate.str()));
66 0 : }
67 :
68 : // simple function see what today is
69 :
70 0 : Int VLADiskInput::whatsToday(){
71 0 : time_t LeapSeconds = LEAP_SECONDS;
72 : time_t curTime, holdTime; /* time vars */
73 : struct tm *tmTime; /* time structure */
74 0 : curTime = time(&holdTime) + LeapSeconds;
75 0 : tmTime = gmtime(&curTime);
76 0 : return tmTime->tm_yday;
77 : }
78 :
79 : // As we go over IAT midnight the VLA data repository file
80 : // changes its name so we have to attach to the new day's file.
81 :
82 0 : void VLADiskInput::reattachCurrent(){
83 : // OK file may not have been created yet so lets wait a bit
84 0 : RegularFile nrtFile(getTodaysFile(0));
85 0 : while(!nrtFile.exists()){
86 0 : sleep(5);
87 : }
88 : //std::cerr << "Attached to new file" << std::endl;
89 0 : itsFile = new RegularFileIO(nrtFile);
90 0 : }
91 :
92 : // Here's the online version to read data from the VLA online data repository
93 :
94 0 : VLADiskInput::VLADiskInput(const String& onlineFlag)
95 0 : :VLAArchiveInput()
96 : {
97 : // Need to check for :-x to set the relative day.
98 0 : if(onlineFlag == "online"){
99 0 : previousDay = 0;
100 0 : char *dataDir = getenv("VISDATADIR");
101 0 : if(!dataDir){
102 0 : dataDir = strdup("/home/vis-serv-mirror/vladata");
103 : }
104 0 : visDir = String(dataDir);
105 0 : Path nrtFile = Path(getTodaysFile());
106 0 : itsFile = new RegularFileIO(RegularFile(nrtFile));
107 0 : onlineFill = true;
108 0 : itsFile->seek(0, ByteIO::End);
109 0 : } else {
110 0 : throw(AipsError("Invalid online specifier " + onlineFlag));
111 : }
112 0 : }
113 :
114 0 : VLADiskInput::VLADiskInput(const Path& fileName)
115 : :VLAArchiveInput(),
116 0 : itsFile(new RegularFileIO(fileName)), onlineFill(false)
117 : {
118 0 : attachFile(fileName.absoluteName().c_str());
119 0 : }
120 :
121 0 : VLADiskInput::~VLADiskInput() {
122 : // The RegularFileIO destructor closes the disk file.
123 0 : delete itsFile;
124 0 : detachFile();
125 0 : }
126 :
127 : /*
128 : Bool VLADiskInput::read() {
129 : // Clear the internal buffers and reset the flags as we will try to read some
130 : // more data.
131 : itsMemIO.clear();
132 : // Find an initial record.
133 : Short n = 1, m;
134 : uInt curRecSize(0);
135 : Int bytesToRead(0);
136 : Int thisReadSize(0);
137 : uChar *recordPtr(0);
138 : if (findFirstRecord(m) == false) if(!onlineFill)return false;
139 : // We have the first physical record in Memory. Now decode how long this
140 : // logical record is.
141 : itsRecord.seek(0);
142 : Int logicalRecordSize;
143 : if(this->bytesRead() < this->totalBytes()){
144 : try {
145 : itsRecord >> logicalRecordSize;
146 : logicalRecordSize *= 2;
147 : curRecSize = itsMemIO.length();
148 : bytesToRead = logicalRecordSize - curRecSize;
149 : // make the buffer a bit bigger than necessary as we may need to read up to
150 : // VLAArchiveInput::BlockSize - 1 extra bytes (to pad out the block).
151 : // cerr << "Uno Setting buffer size: " << logicalRecordSize << " " << VLAArchiveInput::BlockSize << endl;
152 : if(logicalRecordSize > MAX_LOGICAL_RECORD_SIZE)
153 : return false;
154 : recordPtr = itsMemIO.setBuffer(logicalRecordSize +
155 : VLAArchiveInput::BlockSize-1);
156 : } catch (AipsError x) {
157 : std::cerr << "end of file???" << std::endl;
158 : std::cerr << x.getMesg() << std::endl;
159 : }
160 : // Sanity check here.
161 : }
162 : cerr << "A File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
163 : while(this->bytesRead() >= this->totalBytes()){
164 : //std::cerr << "Waiting..." << std::endl;
165 : if(!onlineFill){
166 : cerr << "VLADiskInput::read end of file encountered." << endl;
167 : itsMemIO.clear();
168 : return false;
169 : }
170 : // I'm online so check to see if the day changed and we need
171 : // to move to the next day's file otherwise sleep and see if
172 : // we've got some more data.
173 : sleep(10);
174 : itsMemIO.clear();
175 : itsRecord.seek(0);
176 : Int curDay = whatsToday();
177 : if(oldDay != curDay){
178 : reattachCurrent();
179 : oldDay += 1;
180 : }
181 : //std::cerr << this->bytesRead() << " " << this->totalBytes() << std::endl;
182 : if(this->bytesRead() < this->totalBytes()){
183 : itsFile->seek(Int64(-4), ByteIO::Current);
184 : if(findFirstRecord(m)){
185 : itsRecord >> logicalRecordSize;
186 : // A new record has arrived, march on!
187 : logicalRecordSize *= 2;
188 : curRecSize = itsMemIO.length();
189 : bytesToRead = logicalRecordSize - curRecSize;
190 : // cerr << "Dos Setting buffer size: " << logicalRecordSize << " " << VLAArchiveInput::BlockSize << endl;
191 : if(logicalRecordSize > MAX_LOGICAL_RECORD_SIZE)
192 : return false;
193 : recordPtr = itsMemIO.setBuffer(logicalRecordSize +
194 : VLAArchiveInput::BlockSize-1);
195 : }
196 : }
197 : }
198 :
199 : cerr << "B File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
200 : while (bytesToRead > 0) {
201 : thisReadSize = VLAArchiveInput::HeaderSize;
202 : DebugAssert(static_cast<Int64>(curRecSize + VLAArchiveInput::HeaderSize) <=
203 : itsMemIO.length(), AipsError);
204 : Int bytesRead =
205 : itsFile->read(VLAArchiveInput::HeaderSize, recordPtr+curRecSize, false);
206 : if (bytesRead < static_cast<Int>(VLAArchiveInput::HeaderSize)) {
207 : itsMemIO.clear();
208 : return false;
209 : }
210 : // Check the sequence numbers
211 : {
212 : itsRecord.seek(Int(curRecSize));
213 : Short newn, newm;
214 : itsRecord >> newn;
215 : itsRecord >> newm;
216 : if (newm != m || ++n != newn) {
217 : itsMemIO.clear();
218 : return false;
219 : }
220 : }
221 : // The sequence numbers are OK so read the rest of the data
222 : if (bytesToRead <
223 : static_cast<Int>(VLAArchiveInput::BlockSize*
224 : VLAArchiveInput::MaxBlocksPerPhysicalRecord)) {
225 : thisReadSize = (bytesToRead-1)/VLAArchiveInput::BlockSize + 1;
226 : thisReadSize *= VLAArchiveInput::BlockSize;
227 : } else {
228 : thisReadSize = VLAArchiveInput::BlockSize *
229 : VLAArchiveInput::MaxBlocksPerPhysicalRecord;
230 : }
231 : thisReadSize -= VLAArchiveInput::HeaderSize;
232 : DebugAssert(static_cast<Int64>(curRecSize+thisReadSize)<=itsMemIO.length(),
233 : AipsError);
234 : bytesRead = itsFile->read(thisReadSize, recordPtr+curRecSize, false);
235 : if (bytesRead < thisReadSize) {
236 : itsMemIO.clear();
237 : return false;
238 : }
239 : curRecSize += thisReadSize;
240 : bytesToRead -= thisReadSize;
241 : cerr << "File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
242 : }
243 : cerr << "C File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
244 : itsMemIO.setUsed(logicalRecordSize);
245 : itsRecord.seek(0);
246 : return true;
247 : }
248 : */
249 :
250 :
251 0 : Bool VLADiskInput::read() {
252 0 : Bool rstatus(true);
253 0 : itsMemIO->clear();
254 0 : Long logicalRecordSize(MAX_LOGICAL_RECORD_SIZE);
255 0 : Char* recordPtr = (Char *)itsMemIO->setBuffer(logicalRecordSize);
256 0 : logicalRecordSize = readVLALogRec(recordPtr);
257 0 : itsMemIO->setUsed(logicalRecordSize);
258 0 : itsRecord.seek(0);
259 0 : if(!logicalRecordSize)
260 0 : rstatus = false;
261 0 : return rstatus;
262 : }
263 :
264 :
265 : // Find an initial record. An initial record MUST have the first 2-byte integer
266 : // as 1 and the next 2-byte integer as a number greater than zero. If we have
267 : // not found an initial record after searching 5MBytes worth of data then just
268 : // give up.
269 : //
270 : //This is nuts for VLA Disk files and needs to be redone as we can calculate
271 : //the number of physical records and avoid all this piecemeal reading of
272 : //the file to find where the records start.
273 : //wky 2-23-08
274 0 : Bool VLADiskInput::findFirstRecord(Short& m) {
275 0 : const uInt maxBytesToSearch = 5*1024*1024;
276 0 : Short n = 0 ;
277 0 : m = 0;
278 0 : cerr << "AA File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
279 : {
280 0 : uInt bytesSearched = 0;
281 : // Search for the correct sequence number or give up after a while.
282 0 : Bool maybe(false);
283 : uInt logicalRecordSize;
284 0 : uChar* recordPtr = itsMemIO->setBuffer(VLAArchiveInput::HeaderSize);
285 0 : while (!maybe && (bytesSearched <= maxBytesToSearch)) {
286 : //std::cerr << n << " " << m << std::endl;
287 0 : if (bytesSearched > 0) {
288 0 : if(itsFile->seek(Int64(VLAArchiveInput::BlockSize -
289 0 : VLAArchiveInput::HeaderSize),
290 0 : ByteIO::Current) <= 0)
291 0 : return false;
292 : }
293 0 : bytesSearched += VLAArchiveInput::BlockSize;
294 : // std::cerr << bytesSearched << std::endl;
295 :
296 : // std::cerr << this->bytesRead() << " " << this->totalBytes() << std::endl;
297 : Int bytesRead =
298 0 : itsFile->read(VLAArchiveInput::HeaderSize, recordPtr, false);
299 0 : cerr << "BB File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
300 0 : if (bytesRead < static_cast<Int>(VLAArchiveInput::HeaderSize)) {
301 0 : itsMemIO->clear();
302 0 : return false;
303 : }
304 : // Find out what the sequence numbers are.
305 0 : itsRecord.seek(0);
306 0 : itsRecord >> n;
307 0 : itsRecord >> m;
308 : // OK we need to make sure that Logical record size is OK too before accepting n and m.
309 0 : if(n == 1 && m > 0 && m < 40){
310 0 : itsFile->read(4, recordPtr, false);
311 0 : itsRecord.seek(0);
312 0 : itsRecord >> logicalRecordSize;
313 0 : itsFile->seek(Int64(-4), ByteIO::Current);
314 : // itsRecord.seek(-4);
315 : //`cerr << "Logical Record Size is: " << logicalRecordSize << endl;
316 0 : if(logicalRecordSize > MAX_LOGICAL_RECORD_SIZE)
317 0 : maybe = false;
318 : else
319 0 : maybe = true;
320 : }
321 : }
322 : //cerr << n << " " << m << endl;
323 0 : if (bytesSearched > maxBytesToSearch) {
324 0 : itsMemIO->clear();
325 0 : return false;
326 : }
327 : }
328 : // OK so we have found the beginning of the first physical record. Now read
329 : // the data into the logical record.
330 0 : uInt offset = 0;
331 0 : Int bytesToCopy =
332 0 : VLAArchiveInput::MaxBlocksPerPhysicalRecord * VLAArchiveInput::BlockSize -
333 0 : VLAArchiveInput::HeaderSize;
334 0 : if (m == 1) { // If m=1 we may need to copy less than the maximum number of
335 : // blocks per physical record. To work this out we need to read and parse
336 : // the first block.
337 0 : itsRecord.seek(0);
338 0 : const Int bytesToRead =
339 0 : VLAArchiveInput::BlockSize - VLAArchiveInput::HeaderSize;
340 0 : uChar* recordPtr = itsMemIO->setBuffer(bytesToRead);
341 : // cerr << "Bytes to read " << bytesToRead << endl;
342 0 : const Int bytesRead = itsFile->read(bytesToRead, recordPtr, false);
343 0 : cerr << "CC File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
344 0 : if (bytesRead < bytesToRead) {
345 0 : itsMemIO->clear();
346 0 : return false;
347 : }
348 0 : itsRecord.seek(0);
349 : Int logicalRecordSize;
350 0 : itsRecord >> logicalRecordSize;
351 0 : logicalRecordSize *= 2;
352 0 : bytesToCopy =
353 0 : ((logicalRecordSize - bytesToRead)/VLAArchiveInput::BlockSize + 1) *
354 0 : VLAArchiveInput::BlockSize;
355 0 : offset = bytesToRead;
356 : }
357 0 : if (bytesToCopy > 0) {
358 0 : uChar* recordPtr = itsMemIO->setBuffer(bytesToCopy+offset);
359 : // cerr << "Bytes to copy " << bytesToCopy << endl;
360 0 : Int bytesRead = itsFile->read(bytesToCopy, recordPtr+offset, false);
361 0 : cerr << "DD File position is " << itsFile->seek(0, ByteIO::Current) << " " << endl;
362 0 : if (bytesRead < bytesToCopy) {
363 0 : itsMemIO->clear();
364 0 : return false;
365 : }
366 : }
367 0 : DebugAssert(n == 1, AipsError);
368 0 : DebugAssert(m > 0, AipsError);
369 0 : return true;
370 : }
371 :
372 0 : uInt VLADiskInput::bytesRead() {
373 0 : return itsFile->seek(0, ByteIO::Current);
374 : }
375 :
376 0 : uInt VLADiskInput::totalBytes() {
377 0 : return itsFile->length();
378 : }
379 :
380 : // Local Variables:
381 : // compile-command: "gmake VLADiskInput; cd test; gmake OPTLIB=1 tVLADiskInput"
382 : // End:
|