diff --git a/GenericIO.cxx b/GenericIO.cxx index 5a577d277b3eeaa8ce95ac6ad038d8d6f3542ab7..9c9048b40504c6007fa515ba03a3f33954da6b88 100644 --- a/GenericIO.cxx +++ b/GenericIO.cxx @@ -738,11 +738,13 @@ nocomp: #endif // GENERICIO_NO_MPI template <bool IsBigEndian> -void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, - string &LocalFileName, uint64_t &HeaderSize, vector<char> &Header) { +void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks, + int Rank, int SplitNRanks, + string &LocalFileName, uint64_t &HeaderSize, + vector<char> &Header) { GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr; - if (MustMatch) { + if (MB == MismatchDisallowed) { if (SplitNRanks != (int) GH.NRanks) { stringstream ss; ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " << @@ -776,6 +778,36 @@ void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, } } #endif + } else if (MB == MismatchRedistribute && !Redistributing) { + Redistributing = true; + + int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size(); + int NFileRanksPerRank = NFileRanks/NRanks; + int NRemFileRank = NFileRanks % NRanks; + + if (!NFileRanksPerRank) { + // We have only the remainder, so the last NRemFileRank ranks get one + // file rank, and the others don't. + if (NRemFileRank && NRanks - Rank <= NRemFileRank) + SourceRanks.push_back(NRanks - (Rank + 1)); + } else { + // Since NRemFileRank < NRanks, and we don't want to put any extra memory + // load on rank 0 (because rank 0's memory load is normally higher than + // the other ranks anyway), the last NRemFileRank will each take + // (NFileRanksPerRank+1) file ranks. 
+ + int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1; + for (int i = 1; i <= Rank; ++i) { + FirstFileRank = LastFileRank + 1; + LastFileRank = FirstFileRank + NFileRanksPerRank - 1; + + if (NRemFileRank && NRanks - i <= NRemFileRank) + ++LastFileRank; + } + + for (int i = FirstFileRank; i <= LastFileRank; ++i) + SourceRanks.push_back(i); + } } HeaderSize = GH.HeaderSize; @@ -790,7 +822,7 @@ void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, // Note: Errors from this function should be recoverable. This means that if // one rank throws an exception, then all ranks should. -void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap) { +void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) { int NRanks, Rank; #ifndef GENERICIO_NO_MPI MPI_Comm_rank(Comm, &Rank); @@ -801,7 +833,7 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap #endif if (EffRank == -1) - EffRank = Rank; + EffRank = MB == MismatchRedistribute ? 0 : Rank; if (RankMap.empty() && CheckPartMap) { // First, check to see if the file is a rank map. @@ -813,7 +845,7 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap #else GenericIO GIO(FileName, FileIOType); #endif - GIO.openAndReadHeader(true, 0, false); + GIO.openAndReadHeader(MismatchDisallowed, 0, false); RanksInMap = GIO.readNumElems(); RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int)); @@ -845,17 +877,21 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap if (RankMap.empty()) { LocalFileName = FileName; #ifndef GENERICIO_NO_MPI - MPI_Comm_dup(Comm, &SplitComm); + MPI_Comm_dup(MB == MismatchRedistribute ? 
MPI_COMM_SELF : Comm, &SplitComm); #endif } else { stringstream ss; ss << FileName << "#" << RankMap[EffRank]; LocalFileName = ss.str(); #ifndef GENERICIO_NO_MPI + if (MB == MismatchRedistribute) { + MPI_Comm_dup(MPI_COMM_SELF, &SplitComm); + } else { #ifdef __bgq__ - MPI_Barrier(Comm); + MPI_Barrier(Comm); #endif - MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); + MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm); + } #endif } @@ -896,10 +932,10 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header"); if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) { - readHeaderLeader<false>(&GH, MustMatch, SplitNRanks, LocalFileName, + readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, HeaderSize, Header); } else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) { - readHeaderLeader<true>(&GH, MustMatch, SplitNRanks, LocalFileName, + readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName, HeaderSize, Header); } else { string Error = "invalid file-type identifier"; @@ -935,7 +971,6 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm); #endif - FH.getHeaderCache().clear(); GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0]; @@ -945,7 +980,8 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap OpenFileName = LocalFileName; #ifndef GENERICIO_NO_MPI - MPI_Barrier(Comm); + if (!DisableCollErrChecking) + MPI_Barrier(Comm); if (FileIOType == FileIOMPI) FH.get() = new GenericFileIO_MPI(SplitComm); @@ -957,10 +993,12 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap int OpenErr = 0, TotOpenErr; try { FH.get()->open(LocalFileName, true); - MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); + MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, 
MPI_SUM, + DisableCollErrChecking ? MPI_COMM_SELF : Comm); } catch (...) { OpenErr = 1; - MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm); + MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, + DisableCollErrChecking ? MPI_COMM_SELF : Comm); throw; } @@ -1096,7 +1134,7 @@ int GenericIO::readGlobalRankNumber(int EffRank) { #endif } - openAndReadHeader(false, EffRank, false); + openAndReadHeader(MismatchAllowed, EffRank, false); assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); @@ -1115,6 +1153,17 @@ int GenericIO::readGlobalRankNumber(int EffRank) { } size_t GenericIO::readNumElems(int EffRank) { + if (EffRank == -1 && Redistributing) { + DisableCollErrChecking = true; + + size_t TotalSize = 0; + for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) + TotalSize += readNumElems(SourceRanks[i]); + + DisableCollErrChecking = false; + return TotalSize; + } + if (FH.isBigEndian()) return readNumElems<true>(EffRank); return readNumElems<false>(EffRank); @@ -1130,7 +1179,8 @@ size_t GenericIO::readNumElems(int EffRank) { #endif } - openAndReadHeader(false, EffRank, false); + openAndReadHeader(Redistributing ? 
MismatchRedistribute : MismatchAllowed, + EffRank, false); assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); @@ -1145,6 +1195,11 @@ size_t GenericIO::readNumElems(int EffRank) { } void GenericIO::readCoords(int Coords[3], int EffRank) { + if (EffRank == -1 && Redistributing) { + std::fill(Coords, Coords + 3, 0); + return; + } + if (FH.isBigEndian()) readCoords<true>(Coords, EffRank); else @@ -1161,7 +1216,7 @@ void GenericIO::readCoords(int Coords[3], int EffRank) { #endif } - openAndReadHeader(false, EffRank, false); + openAndReadHeader(MismatchAllowed, EffRank, false); assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); @@ -1176,16 +1231,6 @@ void GenericIO::readCoords(int Coords[3], int EffRank) { std::copy(RH->Coords, RH->Coords + 3, Coords); } -void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { - if (FH.isBigEndian()) - readData<true>(EffRank, PrintStats, CollStats); - else - readData<false>(EffRank, PrintStats, CollStats); -} - -// Note: Errors from this function should be recoverable. This means that if -// one rank throws an exception, then all ranks should. 
-template <bool IsBigEndian> void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { int Rank; #ifndef GENERICIO_NO_MPI @@ -1194,7 +1239,94 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { Rank = 0; #endif - openAndReadHeader(false, EffRank, false); + uint64_t TotalReadSize = 0; +#ifndef GENERICIO_NO_MPI + double StartTime = MPI_Wtime(); +#else + double StartTime = double(clock())/CLOCKS_PER_SEC; +#endif + + int NErrs[3] = { 0, 0, 0 }; + + if (EffRank == -1 && Redistributing) { + DisableCollErrChecking = true; + + size_t RowOffset = 0; + for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) { + readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs); + RowOffset += readNumElems(SourceRanks[i]); + } + + DisableCollErrChecking = false; + } else { + readData(EffRank, 0, Rank, TotalReadSize, NErrs); + } + + int AllNErrs[3]; +#ifndef GENERICIO_NO_MPI + MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm); +#else + AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2]; +#endif + + if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) { + stringstream ss; + ss << "Experienced " << AllNErrs[0] << " I/O error(s), " << + AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] << + " decompression CRC error(s) reading: " << OpenFileName; + throw runtime_error(ss.str()); + } + +#ifndef GENERICIO_NO_MPI + MPI_Barrier(Comm); +#endif + +#ifndef GENERICIO_NO_MPI + double EndTime = MPI_Wtime(); +#else + double EndTime = double(clock())/CLOCKS_PER_SEC; +#endif + + double TotalTime = EndTime - StartTime; + double MaxTotalTime; +#ifndef GENERICIO_NO_MPI + if (CollStats) + MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm); + else +#endif + MaxTotalTime = TotalTime; + + uint64_t AllTotalReadSize; +#ifndef GENERICIO_NO_MPI + if (CollStats) + MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm); + else +#endif + AllTotalReadSize = TotalReadSize; + + if (Rank == 0 && 
PrintStats) { + double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.); + cout << "Read " << Vars.size() << " variables from " << FileName << + " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " << + Rate << " MB/s [excluding header read]" << endl; + } +} + +void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, + uint64_t &TotalReadSize, int NErrs[3]) { + if (FH.isBigEndian()) + readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); + else + readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs); +} + +// Note: Errors from this function should be recoverable. This means that if +// one rank throws an exception, then all ranks should. +template <bool IsBigEndian> +void GenericIO::readData(int EffRank, size_t RowOffset, int Rank, + uint64_t &TotalReadSize, int NErrs[3]) { + openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed, + EffRank, false); assert(FH.getHeaderCache().size() && "HeaderCache must not be empty"); @@ -1209,14 +1341,6 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart + RankIndex*GH->RanksSize]; - uint64_t TotalReadSize = 0; -#ifndef GENERICIO_NO_MPI - double StartTime = MPI_Wtime(); -#else - double StartTime = double(clock())/CLOCKS_PER_SEC; -#endif - - int NErrs[3] = { 0, 0, 0 }; for (size_t i = 0; i < Vars.size(); ++i) { uint64_t Offset = RH->Start; bool VarFound = false; @@ -1262,8 +1386,11 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { throw runtime_error(ss.str()); } + size_t VarOffset = RowOffset*Vars[i].Size; + void *VarData = ((char *) Vars[i].Data) + VarOffset; + vector<unsigned char> LData; - void *Data = Vars[i].Data; + void *Data = VarData; bool HasExtraSpace = Vars[i].HasExtraSpace; if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize && GH->BlocksSize > 0) { @@ -1402,9 +1529,9 @@ void 
GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { #endif blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>), - Vars[i].Data, Vars[i].Size*RH->NElems); + VarData, Vars[i].Size*RH->NElems); - if (CH->OrigCRC != crc64_omp(Vars[i].Data, Vars[i].Size*RH->NElems)) { + if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) { ++NErrs[2]; break; } @@ -1413,7 +1540,7 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { // Byte swap the data if necessary. if (IsBigEndian != isBigEndian()) for (size_t j = 0; j < RH->NElems; ++j) { - char *Offset = ((char *) Vars[i].Data) + j*Vars[i].Size; + char *Offset = ((char *) VarData) + j*Vars[i].Size; bswap(Offset, Vars[i].Size); } @@ -1450,56 +1577,6 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) { if (NErrs[0] || NErrs[1] || NErrs[2]) break; } - - int AllNErrs[3]; -#ifndef GENERICIO_NO_MPI - MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm); -#else - AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2]; -#endif - - if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) { - stringstream ss; - ss << "Experienced " << AllNErrs[0] << " I/O error(s), " << - AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] << - " decompression CRC error(s) reading: " << OpenFileName; - throw runtime_error(ss.str()); - } - -#ifndef GENERICIO_NO_MPI - MPI_Barrier(Comm); -#endif - -#ifndef GENERICIO_NO_MPI - double EndTime = MPI_Wtime(); -#else - double EndTime = double(clock())/CLOCKS_PER_SEC; -#endif - - double TotalTime = EndTime - StartTime; - double MaxTotalTime; -#ifndef GENERICIO_NO_MPI - if (CollStats) - MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm); - else -#endif - MaxTotalTime = TotalTime; - - uint64_t AllTotalReadSize; -#ifndef GENERICIO_NO_MPI - if (CollStats) - MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm); - else -#endif - AllTotalReadSize = TotalReadSize; - - if 
(Rank == 0 && PrintStats) { - double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.); - cout << "Read " << Vars.size() << " variables from " << FileName << - " (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " << - Rate << " MB/s [excluding header read]" << endl; - } - } void GenericIO::getVariableInfo(vector<VariableInfo> &VI) { diff --git a/GenericIO.h b/GenericIO.h index 8b9539eb36e0a42f0f4dc7e2921f66532656d075..906d5a430042f46e6dc98117d9cc7cc7857b5ca9 100644 --- a/GenericIO.h +++ b/GenericIO.h @@ -187,14 +187,16 @@ public: #ifndef GENERICIO_NO_MPI GenericIO(const MPI_Comm &C, const std::string &FN, unsigned FIOT = -1) : NElems(0), FileIOType(FIOT == (unsigned) -1 ? DefaultFileIOType : FIOT), - Partition(DefaultPartition), Comm(C), FileName(FN), SplitComm(MPI_COMM_NULL) { + Partition(DefaultPartition), Comm(C), FileName(FN), Redistributing(false), + DisableCollErrChecking(false), SplitComm(MPI_COMM_NULL) { std::fill(PhysOrigin, PhysOrigin + 3, 0.0); std::fill(PhysScale, PhysScale + 3, 0.0); } #else GenericIO(const std::string &FN, unsigned FIOT = -1) : NElems(0), FileIOType(FIOT == (unsigned) -1 ? 
DefaultFileIOType : FIOT), - Partition(DefaultPartition), FileName(FN) { + Partition(DefaultPartition), FileName(FN), Redistributing(false), + DisableCollErrChecking(false) { std::fill(PhysOrigin, PhysOrigin + 3, 0.0); std::fill(PhysScale, PhysScale + 3, 0.0); } @@ -263,9 +265,15 @@ public: void write(); #endif + enum MismatchBehavior { + MismatchAllowed, + MismatchDisallowed, + MismatchRedistribute + }; + // Reading - void openAndReadHeader(bool MustMatch = true, int EffRank = -1, - bool CheckPartMap = true); + void openAndReadHeader(MismatchBehavior MB = MismatchDisallowed, + int EffRank = -1, bool CheckPartMap = true); int readNRanks(); void readDims(int Dims[3]); @@ -280,7 +288,6 @@ public: int getNumberOfVariables() { return this->Vars.size(); }; - void getVariableInfo(std::vector<VariableInfo> &VI); std::size_t readNumElems(int EffRank = -1); @@ -329,9 +336,9 @@ private: #endif template <bool IsBigEndian> - void readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks, - std::string &LocalFileName, uint64_t &HeaderSize, - std::vector<char> &Header); + void readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks, int Rank, + int SplitNRanks, std::string &LocalFileName, + uint64_t &HeaderSize, std::vector<char> &Header); template <bool IsBigEndian> int readNRanks(); @@ -357,8 +364,12 @@ private: template <bool IsBigEndian> void readCoords(int Coords[3], int EffRank); + void readData(int EffRank, size_t RowOffset, int Rank, + uint64_t &TotalReadSize, int NErrs[3]); + template <bool IsBigEndian> - void readData(int EffRank, bool PrintStats, bool CollStats); + void readData(int EffRank, size_t RowOffset, + int Rank, uint64_t &TotalReadSize, int NErrs[3]); template <bool IsBigEndian> void getVariableInfo(std::vector<VariableInfo> &VI); @@ -384,6 +395,10 @@ protected: static std::size_t CollectiveMPIIOThreshold; #endif + // When redistributing, the rank blocks which this process should read. 
+ bool Redistributing, DisableCollErrChecking; + std::vector<int> SourceRanks; + std::vector<int> RankMap; #ifndef GENERICIO_NO_MPI MPI_Comm SplitComm; diff --git a/GenericIO2Cosmo.cxx b/GenericIO2Cosmo.cxx index e92e44b161175a795bbd38f897abd5699ed8f215..9caf97e7952b5ff1a76fc0fd3b6ae92a9aeaa24b 100644 --- a/GenericIO2Cosmo.cxx +++ b/GenericIO2Cosmo.cxx @@ -101,7 +101,7 @@ int main(int argc, char *argv[]) { MPI_COMM_WORLD, #endif mpiioName, Method); - GIO.openAndReadHeader(false); + GIO.openAndReadHeader(GenericIO::MismatchAllowed); int NR = GIO.readNRanks(); if (rank >= NR) { diff --git a/GenericIOBenchmarkRead.cxx b/GenericIOBenchmarkRead.cxx index a6abc9d39bfb7c81b6fbb9dd51e4764ff6d73463..2d1f5d1bf1bb32c3789fb673a892330891c07e76 100644 --- a/GenericIOBenchmarkRead.cxx +++ b/GenericIOBenchmarkRead.cxx @@ -86,7 +86,7 @@ int main(int argc, char *argv[]) { GenericIO GIO( MPI_COMM_WORLD, mpiioName, Method); - GIO.openAndReadHeader(); + GIO.openAndReadHeader(GenericIO::MismatchRedistribute); MPI_Barrier(MPI_COMM_WORLD); diff --git a/GenericIOPrint.cxx b/GenericIOPrint.cxx index 95de109b3df5988b7bfc14b7cd49bb6ae4a90162..2fd931349b1be9bee981b64cf8bbd740ef74f860 100644 --- a/GenericIOPrint.cxx +++ b/GenericIOPrint.cxx @@ -143,7 +143,7 @@ int main(int argc, char *argv[]) { #else GenericIO GIO(FileName, Method); #endif - GIO.openAndReadHeader(false, -1, !ShowMap); + GIO.openAndReadHeader(GenericIO::MismatchAllowed, -1, !ShowMap); int NR = GIO.readNRanks(); diff --git a/GenericIOVerify.cxx b/GenericIOVerify.cxx index b6e9f818ddc12c6c09b6f301b8074920cf808247..61b650496d7cbc7d7f71c5471e2261593ef80b47 100644 --- a/GenericIOVerify.cxx +++ b/GenericIOVerify.cxx @@ -105,7 +105,8 @@ int main(int argc, char *argv[]) { #else bool MustMatch = false; #endif - GIO.openAndReadHeader(MustMatch); + GIO.openAndReadHeader(MustMatch ? GenericIO::MismatchRedistribute : + GenericIO::MismatchAllowed); if (Verbose) cout << "\theader: okay" << endl;