/*
* Copyright (C) 2015, UChicago Argonne, LLC
* All Rights Reserved
* Generic IO (ANL-15-066)
* Hal Finkel, Argonne National Laboratory
* Under the terms of Contract No. DE-AC02-06CH11357 with UChicago Argonne,
* LLC, the U.S. Government retains certain rights in this software.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of UChicago Argonne, LLC or the Department of Energy
* nor the names of its contributors may be used to endorse or promote
* products derived from this software without specific prior written
* permission.
* *****************************************************************************
* DISCLAIMER
* THE SOFTWARE IS SUPPLIED "AS IS" WITHOUT WARRANTY OF ANY KIND. NEITHER THE
* UNITED STATES GOVERNMENT, NOR THE UNITED STATES DEPARTMENT OF ENERGY, NOR
* UCHICAGO ARGONNE, LLC, NOR ANY OF THEIR EMPLOYEES, MAKES ANY WARRANTY,
* EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE
* ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, DATA, APPARATUS,
* PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT INFRINGE
* PRIVATELY OWNED RIGHTS.
* *****************************************************************************
*/
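// _XOPEN_SOURCE 600 exposes the POSIX/XSI declarations used below
// (pread, pwrite, ftruncate).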
#define _XOPEN_SOURCE 600
#include "CRC64.h"
#include "GenericIO.h"
extern "C" {
#include "blosc.h"
}
#include <sstream>
#include <fstream>
#include <stdexcept>
#include <cassert>
#include <cstddef>
#include <cstring>
#ifndef GENERICIO_NO_MPI
#include <ctime>
#endif
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#ifdef __bgq__
#include <mpix.h>
#endif
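// MPI_UINT64_T was added in MPI-2.2; provide a fallback for older MPI
// implementations using whichever native integer type is 64 bits wide.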
#ifndef MPI_UINT64_T
#define MPI_UINT64_T (sizeof(long) == 8 ? MPI_LONG : MPI_LONG_LONG)
#endif
using namespace std;
namespace gio {
#ifndef GENERICIO_NO_MPI
GenericFileIO_MPI::~GenericFileIO_MPI() {
(void) MPI_File_close(&FH);
}
void GenericFileIO_MPI::open(const std::string &FN, bool ForReading, bool MustExist) {
FileName = FN;
int amode = ForReading ? MPI_MODE_RDONLY : (MPI_MODE_WRONLY |
(!MustExist ? MPI_MODE_CREATE : 0));
if (MPI_File_open(Comm, const_cast<char *>(FileName.c_str()), amode,
MPI_INFO_NULL, &FH) != MPI_SUCCESS)
throw runtime_error(((!ForReading && !MustExist) ? "Unable to create the file: " :
"Unable to open the file: ") +
FileName);
}
void GenericFileIO_MPI::setSize(size_t sz) {
if (MPI_File_set_size(FH, sz) != MPI_SUCCESS)
throw runtime_error("Unable to set size for file: " + FileName);
}
void GenericFileIO_MPI::read(void *buf, size_t count, off_t offset,
const std::string &D) {
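// MPI-IO may legally complete with fewer bytes transferred than requested;
// loop, advancing the buffer pointer and file offset, until done.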
while (count > 0) {
MPI_Status status;
if (MPI_File_read_at(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
throw runtime_error("Unable to read " + D + " from file: " + FileName);
int scount;
(void) MPI_Get_count(&status, MPI_BYTE, &scount);
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
}
}
void GenericFileIO_MPI::write(const void *buf, size_t count, off_t offset,
const std::string &D) {
while (count > 0) {
MPI_Status status;
if (MPI_File_write_at(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
throw runtime_error("Unable to write " + D + " to file: " + FileName);
int scount = 0;
// On some systems, MPI_Get_count does not report zero even when the
// requested count was zero, so only query it for a nonzero request.
if (count > 0)
(void) MPI_Get_count(&status, MPI_BYTE, &scount);
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
}
}
void GenericFileIO_MPICollective::read(void *buf, size_t count, off_t offset,
const std::string &D) {
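// Collective variant: every rank in the communicator must call
// MPI_File_read_at_all together, so after each pass an allreduce decides
// whether any rank still has data left and needs another (possibly empty)
// round. The collective write below follows the same protocol.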
int Continue = 0;
do {
MPI_Status status;
if (MPI_File_read_at_all(FH, offset, buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
throw runtime_error("Unable to read " + D + " from file: " + FileName);
int scount = 0;
// On some systems, MPI_Get_count does not report zero even when the
// requested count was zero, so only query it for a nonzero request.
if (count > 0)
(void) MPI_Get_count(&status, MPI_BYTE, &scount);
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
int NeedContinue = (count > 0);
MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
} while (Continue);
}
void GenericFileIO_MPICollective::write(const void *buf, size_t count, off_t offset,
const std::string &D) {
int Continue = 0;
do {
MPI_Status status;
if (MPI_File_write_at_all(FH, offset, (void *) buf, count, MPI_BYTE, &status) != MPI_SUCCESS)
throw runtime_error("Unable to write " + D + " to file: " + FileName);
int scount = 0;
// On some systems, MPI_Get_count does not report zero even when the
// requested count was zero, so only query it for a nonzero request.
if (count > 0)
(void) MPI_Get_count(&status, MPI_BYTE, &scount);
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
int NeedContinue = (count > 0);
MPI_Allreduce(&NeedContinue, &Continue, 1, MPI_INT, MPI_SUM, Comm);
} while (Continue);
}
#endif
GenericFileIO_POSIX::~GenericFileIO_POSIX() {
if (FH != -1) close(FH);
}
void GenericFileIO_POSIX::open(const std::string &FN, bool ForReading, bool MustExist) {
FileName = FN;
int flags = ForReading ? O_RDONLY : (O_WRONLY |
(!MustExist ? O_CREAT : 0));
int mode = S_IRUSR | S_IWUSR | S_IRGRP;
errno = 0;
if ((FH = ::open(FileName.c_str(), flags, mode)) == -1)
throw runtime_error(((!ForReading && !MustExist) ? "Unable to create the file: " :
"Unable to open the file: ") +
FileName + ": " + strerror(errno));
}
void GenericFileIO_POSIX::setSize(size_t sz) {
if (ftruncate(FH, sz) == -1)
throw runtime_error("Unable to set size for file: " + FileName);
}
void GenericFileIO_POSIX::read(void *buf, size_t count, off_t offset,
const std::string &D) {
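// pread() can return a short count, or fail with EINTR when interrupted by
// a signal; retry until the full range has been read.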
while (count > 0) {
ssize_t scount;
errno = 0;
if ((scount = pread(FH, buf, count, offset)) == -1) {
if (errno == EINTR)
continue;
throw runtime_error("Unable to read " + D + " from file: " + FileName +
": " + strerror(errno));
}
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
}
}
void GenericFileIO_POSIX::write(const void *buf, size_t count, off_t offset,
const std::string &D) {
while (count > 0) {
ssize_t scount;
errno = 0;
if ((scount = pwrite(FH, buf, count, offset)) == -1) {
if (errno == EINTR)
continue;
throw runtime_error("Unable to write " + D + " to file: " + FileName +
": " + strerror(errno));
}
count -= scount;
buf = ((char *) buf) + scount;
offset += scount;
}
}
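// Detect host byte order: store 1 in a 32-bit integer and inspect the
// lowest-addressed byte; it is zero only on a big-endian host.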
static bool isBigEndian() {
const uint32_t one = 1;
return !(*((char *)(&one)));
}
static void bswap(void *v, size_t s) {
char *p = (char *) v;
for (size_t i = 0; i < s/2; ++i)
std::swap(p[i], p[s - (i+1)]);
}
// Using #pragma pack here, instead of __attribute__((packed)) because xlc, at
// least as of v12.1, won't take __attribute__((packed)) on non-POD and/or
// templated types.
#pragma pack(1)
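// Wrapper holding a value in a fixed on-disk endianness (the IsBigEndian
// template parameter); conversion to and from host order happens on each
// access, so the header structures below can be declared once and
// instantiated for both byte orders.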
template <typename T, bool IsBigEndian>
struct endian_specific_value {
operator T() const {
T rvalue = value;
if (IsBigEndian != isBigEndian())
bswap(&rvalue, sizeof(T));
return rvalue;
};
endian_specific_value &operator = (T nvalue) {
if (IsBigEndian != isBigEndian())
bswap(&nvalue, sizeof(T));
value = nvalue;
return *this;
}
endian_specific_value &operator += (T nvalue) {
*this = *this + nvalue;
return *this;
}
endian_specific_value &operator -= (T nvalue) {
*this = *this - nvalue;
return *this;
}
private:
T value;
};
static const size_t CRCSize = 8;
static const size_t MagicSize = 8;
static const char *MagicBE = "HACC01B";
static const char *MagicLE = "HACC01L";
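// On-disk layout, as described by the structures below: a GlobalHeader
// (beginning with the magic string), then NVars VariableHeaders, then
// NRanks RankHeaders, then (optionally) per-rank, per-variable
// BlockHeaders, followed by the data blocks themselves, each trailed by an
// inverted CRC64.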
template <bool IsBigEndian>
struct GlobalHeader {
char Magic[MagicSize];
endian_specific_value<uint64_t, IsBigEndian> HeaderSize;
endian_specific_value<uint64_t, IsBigEndian> NElems; // The global total
endian_specific_value<uint64_t, IsBigEndian> Dims[3];
endian_specific_value<uint64_t, IsBigEndian> NVars;
endian_specific_value<uint64_t, IsBigEndian> VarsSize;
endian_specific_value<uint64_t, IsBigEndian> VarsStart;
endian_specific_value<uint64_t, IsBigEndian> NRanks;
endian_specific_value<uint64_t, IsBigEndian> RanksSize;
endian_specific_value<uint64_t, IsBigEndian> RanksStart;
endian_specific_value<uint64_t, IsBigEndian> GlobalHeaderSize;
endian_specific_value<double, IsBigEndian> PhysOrigin[3];
endian_specific_value<double, IsBigEndian> PhysScale[3];
endian_specific_value<uint64_t, IsBigEndian> BlocksSize;
endian_specific_value<uint64_t, IsBigEndian> BlocksStart;
};
enum {
FloatValue = (1 << 0),
SignedValue = (1 << 1),
ValueIsPhysCoordX = (1 << 2),
ValueIsPhysCoordY = (1 << 3),
ValueIsPhysCoordZ = (1 << 4),
ValueMaybePhysGhost = (1 << 5)
};
static const size_t NameSize = 256;
template <bool IsBigEndian>
struct VariableHeader {
char Name[NameSize];
endian_specific_value<uint64_t, IsBigEndian> Flags;
endian_specific_value<uint64_t, IsBigEndian> Size;
endian_specific_value<uint64_t, IsBigEndian> ElementSize;
};
template <bool IsBigEndian>
struct RankHeader {
endian_specific_value<uint64_t, IsBigEndian> Coords[3];
endian_specific_value<uint64_t, IsBigEndian> NElems;
endian_specific_value<uint64_t, IsBigEndian> Start;
endian_specific_value<uint64_t, IsBigEndian> GlobalRank;
};
static const size_t FilterNameSize = 8;
static const size_t MaxFilters = 4;
template <bool IsBigEndian>
struct BlockHeader {
char Filters[MaxFilters][FilterNameSize];
endian_specific_value<uint64_t, IsBigEndian> Start;
endian_specific_value<uint64_t, IsBigEndian> Size;
};
template <bool IsBigEndian>
struct CompressHeader {
endian_specific_value<uint64_t, IsBigEndian> OrigCRC;
};
#pragma pack()
unsigned GenericIO::DefaultFileIOType = FileIOPOSIX;
int GenericIO::DefaultPartition = 0;
bool GenericIO::DefaultShouldCompress = false;
#ifndef GENERICIO_NO_MPI
std::size_t GenericIO::CollectiveMPIIOThreshold = 0;
#endif
static bool blosc_initialized = false;
static bool sz_initialized = false;
static const char *CompressName = "BLOSC";
static const char *LossyCompressName = "SZ";
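// Map a variable's element type to the corresponding SZ datatype constant;
// -1 means SZ cannot compress this type and lossy compression is skipped.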
static int GetSZDT(GenericIO::Variable &Var) {
if (Var.hasElementType<float>())
return SZ_FLOAT;
else if (Var.hasElementType<double>())
return SZ_DOUBLE;
else if (Var.hasElementType<uint8_t>())
return SZ_UINT8;
else if (Var.hasElementType<int8_t>())
return SZ_INT8;
else if (Var.hasElementType<uint16_t>())
return SZ_UINT16;
else if (Var.hasElementType<int16_t>())
return SZ_INT16;
else if (Var.hasElementType<uint32_t>())
return SZ_UINT32;
else if (Var.hasElementType<int32_t>())
return SZ_INT32;
else if (Var.hasElementType<uint64_t>())
return SZ_UINT64;
else if (Var.hasElementType<int64_t>())
return SZ_INT64;
else
return -1;
}
void GenericIO::setFH(
#ifndef GENERICIO_NO_MPI
MPI_Comm R
#endif
) {
#ifndef GENERICIO_NO_MPI
if (FileIOType == FileIOMPI)
FH.get() = new GenericFileIO_MPI(R);
else if (FileIOType == FileIOMPICollective)
FH.get() = new GenericFileIO_MPICollective(R);
else
#endif
#ifdef GENERICIO_WITH_VELOC
if (FileIOType == FileIOVELOC)
FH.get() = new GenericFileIO_VELOC();
else
#endif
FH.get() = new GenericFileIO_POSIX();
}
void GenericIO::write() {
if (isBigEndian())
write<true>();
else
write<false>();
}
#ifndef GENERICIO_NO_MPI
// Note: writing errors are not currently recoverable (one rank may fail
// while the others don't).
template <bool IsBigEndian>
void GenericIO::write() {
const char *Magic = IsBigEndian ? MagicBE : MagicLE;
uint64_t FileSize = 0;
int NRanks, Rank;
MPI_Comm_rank(Comm, &Rank);
MPI_Comm_size(Comm, &NRanks);
#ifdef __bgq__
MPI_Barrier(Comm);
#endif
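// Split the communicator by partition number: each partition writes an
// independent file named FileName#partition (see below), while rank 0 of
// the original communicator writes the map from ranks to partitions.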
MPI_Comm_split(Comm, Partition, Rank, &SplitComm);
int SplitNRanks, SplitRank;
MPI_Comm_rank(SplitComm, &SplitRank);
MPI_Comm_size(SplitComm, &SplitNRanks);
bool Rank0CreateAll = false;
const char *EnvStr = getenv("GENERICIO_RANK0_CREATE_ALL");
if (EnvStr) {
int Mod = atoi(EnvStr);
Rank0CreateAll = (Mod > 0);
}
string LocalFileName;
if (SplitNRanks != NRanks) {
if (Rank == 0) {
// In split mode, the specified file becomes the rank map, and the real
// data is partitioned.
vector<int> MapRank, MapPartition;
MapRank.resize(NRanks);
for (int i = 0; i < NRanks; ++i) MapRank[i] = i;
MapPartition.resize(NRanks);
MPI_Gather(&Partition, 1, MPI_INT, &MapPartition[0], 1, MPI_INT, 0, Comm);
GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
GIO.setNumElems(NRanks);
GIO.addVariable("$rank", MapRank); /* this is for use by humans; the reading
code assumes that the partitions are in
rank order */
GIO.addVariable("$partition", MapPartition);
vector<int> CX, CY, CZ;
int TopoStatus;
MPI_Topo_test(Comm, &TopoStatus);
if (TopoStatus == MPI_CART) {
CX.resize(NRanks);
CY.resize(NRanks);
CZ.resize(NRanks);
for (int i = 0; i < NRanks; ++i) {
int C[3];
MPI_Cart_coords(Comm, i, 3, C);
CX[i] = C[0];
CY[i] = C[1];
CZ[i] = C[2];
}
GIO.addVariable("$x", CX);
GIO.addVariable("$y", CY);
GIO.addVariable("$z", CZ);
}
GIO.write();
// On some file systems, it can be very expensive for multiple ranks to
// create files in the same directory. Creating a new file requires a
// kind of exclusive lock that is expensive to obtain.
if (Rank0CreateAll) {
set<int> AllPartitions;
for (int i = 0; i < NRanks; ++i) AllPartitions.insert(MapPartition[i]);
for (set<int>::iterator i = AllPartitions.begin(),
e = AllPartitions.end(); i != e; ++i) {
stringstream ss;
ss << FileName << "#" << *i;
setFH(MPI_COMM_SELF);
FH.get()->open(ss.str());
close();
}
}
} else {
MPI_Gather(&Partition, 1, MPI_INT, 0, 0, MPI_INT, 0, Comm);
}
stringstream ss;
ss << FileName << "#" << Partition;
LocalFileName = ss.str();
} else {
LocalFileName = FileName;
}
#ifndef GENERICIO_NO_MPI
if (Rank0CreateAll && NRanks > 1)
MPI_Barrier(Comm);
#endif
RankHeader<IsBigEndian> RHLocal;
int Dims[3], Periods[3], Coords[3];
int TopoStatus;
MPI_Topo_test(Comm, &TopoStatus);
if (TopoStatus == MPI_CART) {
MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
} else {
Dims[0] = NRanks;
std::fill(Dims + 1, Dims + 3, 1);
std::fill(Periods, Periods + 3, 0);
Coords[0] = Rank;
std::fill(Coords + 1, Coords + 3, 0);
}
std::copy(Coords, Coords + 3, RHLocal.Coords);
RHLocal.NElems = NElems;
RHLocal.Start = 0;
RHLocal.GlobalRank = Rank;
bool ShouldCompress = DefaultShouldCompress;
EnvStr = getenv("GENERICIO_COMPRESS");
if (EnvStr) {
int Mod = atoi(EnvStr);
ShouldCompress = (Mod > 0);
}
bool NeedsBlockHeaders = ShouldCompress;
EnvStr = getenv("GENERICIO_FORCE_BLOCKS");
if (!NeedsBlockHeaders && EnvStr) {
int Mod = atoi(EnvStr);
NeedsBlockHeaders = (Mod > 0);
}
vector<BlockHeader<IsBigEndian> > LocalBlockHeaders;
vector<void *> LocalData;
vector<bool> LocalHasExtraSpace;
vector<vector<unsigned char> > LocalCData;
if (NeedsBlockHeaders) {
LocalBlockHeaders.resize(Vars.size());
LocalData.resize(Vars.size());
LocalHasExtraSpace.resize(Vars.size());
if (ShouldCompress)
LocalCData.resize(Vars.size());
for (size_t i = 0; i < Vars.size(); ++i) {
// Filters null by default, leave null starting address (needs to be
// calculated by the header-writing rank).
memset(&LocalBlockHeaders[i], 0, sizeof(BlockHeader<IsBigEndian>));
if (ShouldCompress) {
void *OrigData = Vars[i].Data;
bool FreeOrigData = false;
size_t OrigUnitSize = Vars[i].Size;
size_t OrigDataSize = NElems*Vars[i].Size;
int FilterIdx = 0;
if (Vars[i].LCI.Mode != LossyCompressionInfo::LCModeNone) {
#ifdef _OPENMP
#pragma omp master
{
#endif
if (!sz_initialized) {
SZ_Init(NULL);
confparams_cpr->szMode = 0; // Best-speed mode.
sz_initialized = true;
}
#ifdef _OPENMP
}
#endif
int SZDT = GetSZDT(Vars[i]);
if (SZDT == -1)
goto nosz;
int EBM;
switch (Vars[i].LCI.Mode) {
case LossyCompressionInfo::LCModeAbs:
EBM = ABS;
break;
case LossyCompressionInfo::LCModeRel:
EBM = REL;
break;
case LossyCompressionInfo::LCModeAbsAndRel:
EBM = ABS_AND_REL;
break;
case LossyCompressionInfo::LCModeAbsOrRel:
EBM = ABS_OR_REL;
break;
case LossyCompressionInfo::LCModePSNR:
EBM = PSNR;
break;
}
size_t LOutSize;
unsigned char *LCompressedData = SZ_compress_args(SZDT, Vars[i].Data, &LOutSize, EBM,
Vars[i].LCI.AbsErrThreshold, Vars[i].LCI.RelErrThreshold,
Vars[i].LCI.PSNRThreshold, 0, 0, 0, 0, NElems);
if (!LCompressedData)
goto nosz;
if (LOutSize >= NElems*Vars[i].Size) {
free(LCompressedData);
goto nosz;
}
OrigData = LCompressedData;
FreeOrigData = true;
OrigUnitSize = 1;
OrigDataSize = LOutSize;
strncpy(LocalBlockHeaders[i].Filters[FilterIdx++], LossyCompressName, FilterNameSize);
}
nosz:
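// Lossless stage: the block starts with a CompressHeader recording the
// CRC64 of the pre-compression bytes, followed by the blosc-compressed
// stream.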
LocalCData[i].resize(sizeof(CompressHeader<IsBigEndian>));
CompressHeader<IsBigEndian> *CH = (CompressHeader<IsBigEndian>*) &LocalCData[i][0];
CH->OrigCRC = crc64_omp(OrigData, OrigDataSize);
#ifdef _OPENMP
#pragma omp master
{
#endif
if (!blosc_initialized) {
blosc_init();
blosc_initialized = true;
}
#ifdef _OPENMP
blosc_set_nthreads(omp_get_max_threads());
}
#endif
size_t RealOrigDataSize = NElems*Vars[i].Size;
LocalCData[i].resize(LocalCData[i].size() + RealOrigDataSize);
if (blosc_compress(9, 1, OrigUnitSize, OrigDataSize, OrigData,
&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
RealOrigDataSize) <= 0) {
if (FreeOrigData)
free(OrigData);
goto nocomp;
}
if (FreeOrigData)
free(OrigData);
strncpy(LocalBlockHeaders[i].Filters[FilterIdx++], CompressName, FilterNameSize);
size_t CNBytes, CCBytes, CBlockSize;
blosc_cbuffer_sizes(&LocalCData[i][0] + sizeof(CompressHeader<IsBigEndian>),
&CNBytes, &CCBytes, &CBlockSize);
LocalCData[i].resize(CCBytes + sizeof(CompressHeader<IsBigEndian>));
LocalBlockHeaders[i].Size = LocalCData[i].size();
LocalCData[i].resize(LocalCData[i].size() + CRCSize);
LocalData[i] = &LocalCData[i][0];
LocalHasExtraSpace[i] = true;
} else {
nocomp:
LocalBlockHeaders[i].Size = NElems*Vars[i].Size;
LocalData[i] = Vars[i].Data;
LocalHasExtraSpace[i] = Vars[i].HasExtraSpace;
}
}
}
double StartTime = MPI_Wtime();
if (SplitRank == 0) {
uint64_t HeaderSize = sizeof(GlobalHeader<IsBigEndian>) + Vars.size()*sizeof(VariableHeader<IsBigEndian>) +
SplitNRanks*sizeof(RankHeader<IsBigEndian>) + CRCSize;
if (NeedsBlockHeaders)
HeaderSize += SplitNRanks*Vars.size()*sizeof(BlockHeader<IsBigEndian>);
vector<char> Header(HeaderSize, 0);
GlobalHeader<IsBigEndian> *GH = (GlobalHeader<IsBigEndian> *) &Header[0];
std::copy(Magic, Magic + MagicSize, GH->Magic);
GH->HeaderSize = HeaderSize - CRCSize;
GH->NElems = NElems; // This will be updated later
std::copy(Dims, Dims + 3, GH->Dims);
GH->NVars = Vars.size();
GH->VarsSize = sizeof(VariableHeader<IsBigEndian>);
GH->VarsStart = sizeof(GlobalHeader<IsBigEndian>);
GH->RanksSize = sizeof(RankHeader<IsBigEndian>);
GH->RanksStart = GH->VarsStart + Vars.size()*sizeof(VariableHeader<IsBigEndian>);
GH->GlobalHeaderSize = sizeof(GlobalHeader<IsBigEndian>);
std::copy(PhysOrigin, PhysOrigin + 3, GH->PhysOrigin);
std::copy(PhysScale, PhysScale + 3, GH->PhysScale);
if (!NeedsBlockHeaders) {
GH->BlocksSize = GH->BlocksStart = 0;
} else {
GH->BlocksSize = sizeof(BlockHeader<IsBigEndian>);
GH->BlocksStart = GH->RanksStart + SplitNRanks*sizeof(RankHeader<IsBigEndian>);
}
uint64_t RecordSize = 0;
VariableHeader<IsBigEndian> *VH = (VariableHeader<IsBigEndian> *) &Header[GH->VarsStart];
for (size_t i = 0; i < Vars.size(); ++i, ++VH) {
string VName(Vars[i].Name);
VName.resize(NameSize);
std::copy(VName.begin(), VName.end(), VH->Name);
uint64_t VFlags = 0;
if (Vars[i].IsFloat) VFlags |= FloatValue;
if (Vars[i].IsSigned) VFlags |= SignedValue;
if (Vars[i].IsPhysCoordX) VFlags |= ValueIsPhysCoordX;
if (Vars[i].IsPhysCoordY) VFlags |= ValueIsPhysCoordY;
if (Vars[i].IsPhysCoordZ) VFlags |= ValueIsPhysCoordZ;
if (Vars[i].MaybePhysGhost) VFlags |= ValueMaybePhysGhost;
VH->Flags = VFlags;
RecordSize += VH->Size = Vars[i].Size;
VH->ElementSize = Vars[i].ElementSize;
}
MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE,
&Header[GH->RanksStart], sizeof(RHLocal),
MPI_BYTE, 0, SplitComm);
if (NeedsBlockHeaders) {
MPI_Gather(&LocalBlockHeaders[0],
Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
&Header[GH->BlocksStart],
Vars.size()*sizeof(BlockHeader<IsBigEndian>), MPI_BYTE,
0, SplitComm);
BlockHeader<IsBigEndian> *BH = (BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart];
for (int i = 0; i < SplitNRanks; ++i)
for (size_t j = 0; j < Vars.size(); ++j, ++BH) {
if (i == 0 && j == 0)
BH->Start = HeaderSize;
else
BH->Start = BH[-1].Start + BH[-1].Size + CRCSize;
}
RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
RH->Start = HeaderSize; ++RH;
for (int i = 1; i < SplitNRanks; ++i, ++RH) {
RH->Start =
((BlockHeader<IsBigEndian> *) &Header[GH->BlocksStart])[i*Vars.size()].Start;
GH->NElems += RH->NElems;
}
// Compute the total file size.
uint64_t LastData = BH[-1].Size + CRCSize;
FileSize = BH[-1].Start + LastData;
} else {
RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &Header[GH->RanksStart];
RH->Start = HeaderSize; ++RH;
for (int i = 1; i < SplitNRanks; ++i, ++RH) {
uint64_t PrevNElems = RH[-1].NElems;
uint64_t PrevData = PrevNElems*RecordSize + CRCSize*Vars.size();
RH->Start = RH[-1].Start + PrevData;
GH->NElems += RH->NElems;
}
// Compute the total file size.
uint64_t LastNElems = RH[-1].NElems;
uint64_t LastData = LastNElems*RecordSize + CRCSize*Vars.size();
FileSize = RH[-1].Start + LastData;
}
// Now that the starting offset has been computed, send it back to each rank.
MPI_Scatter(&Header[GH->RanksStart], sizeof(RHLocal),
MPI_BYTE, &RHLocal, sizeof(RHLocal),
MPI_BYTE, 0, SplitComm);
if (NeedsBlockHeaders)
MPI_Scatter(&Header[GH->BlocksStart],
sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
&LocalBlockHeaders[0],
sizeof(BlockHeader<IsBigEndian>)*Vars.size(), MPI_BYTE,
0, SplitComm);
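// Append the CRC64 in inverted form so that a CRC computed over the data
// plus its checksum comes out as (uint64_t) -1; this is the invariant that
// readHeaderLeader verifies below.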
uint64_t HeaderCRC = crc64_omp(&Header[0], HeaderSize - CRCSize);
crc64_invert(HeaderCRC, &Header[HeaderSize - CRCSize]);
setFH(MPI_COMM_SELF);
FH.get()->open(LocalFileName, false, Rank0CreateAll && NRanks > 1);
FH.get()->setSize(FileSize);
FH.get()->write(&Header[0], HeaderSize, 0, "header");
close();
} else {
MPI_Gather(&RHLocal, sizeof(RHLocal), MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
if (NeedsBlockHeaders)
MPI_Gather(&LocalBlockHeaders[0], Vars.size()*sizeof(BlockHeader<IsBigEndian>),
MPI_BYTE, 0, 0, MPI_BYTE, 0, SplitComm);
MPI_Scatter(0, 0, MPI_BYTE, &RHLocal, sizeof(RHLocal), MPI_BYTE, 0, SplitComm);
if (NeedsBlockHeaders)
MPI_Scatter(0, 0, MPI_BYTE, &LocalBlockHeaders[0], sizeof(BlockHeader<IsBigEndian>)*Vars.size(),
MPI_BYTE, 0, SplitComm);
}
MPI_Barrier(SplitComm);
setFH(SplitComm);
FH.get()->open(LocalFileName, false, true);
uint64_t Offset = RHLocal.Start;
for (size_t i = 0; i < Vars.size(); ++i) {
uint64_t WriteSize = NeedsBlockHeaders ?
LocalBlockHeaders[i].Size : NElems*Vars[i].Size;
void *Data = NeedsBlockHeaders ? LocalData[i] : Vars[i].Data;
uint64_t CRC = crc64_omp(Data, WriteSize);
bool HasExtraSpace = NeedsBlockHeaders ?
LocalHasExtraSpace[i] : Vars[i].HasExtraSpace;
char *CRCLoc = HasExtraSpace ? ((char *) Data) + WriteSize : (char *) &CRC;
if (NeedsBlockHeaders)
Offset = LocalBlockHeaders[i].Start;
// When using extra space for the CRC write, preserve the original contents.
char CRCSave[CRCSize];
if (HasExtraSpace)
std::copy(CRCLoc, CRCLoc + CRCSize, CRCSave);
crc64_invert(CRC, CRCLoc);
if (HasExtraSpace) {
FH.get()->write(Data, WriteSize + CRCSize, Offset, Vars[i].Name + " with CRC");
} else {
FH.get()->write(Data, WriteSize, Offset, Vars[i].Name);
FH.get()->write(CRCLoc, CRCSize, Offset + WriteSize, Vars[i].Name + " CRC");
}
if (HasExtraSpace)
std::copy(CRCSave, CRCSave + CRCSize, CRCLoc);
Offset += WriteSize + CRCSize;
}
close();
MPI_Barrier(Comm);
double EndTime = MPI_Wtime();
double TotalTime = EndTime - StartTime;
double MaxTotalTime;
MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
if (SplitNRanks != NRanks) {
uint64_t ContribFileSize = (SplitRank == 0) ? FileSize : 0;
MPI_Reduce(&ContribFileSize, &FileSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
}
if (Rank == 0) {
double Rate = ((double) FileSize) / MaxTotalTime / (1024.*1024.);
std::cout << "Wrote " << Vars.size() << " variables to " << FileName <<
" (" << FileSize << " bytes) in " << MaxTotalTime << "s: " <<
Rate << " MB/s" << std::endl;
}
MPI_Comm_free(&SplitComm);
SplitComm = MPI_COMM_NULL;
}
#endif // GENERICIO_NO_MPI
template <bool IsBigEndian>
void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
int Rank, int SplitNRanks,
string &LocalFileName, uint64_t &HeaderSize,
vector<char> &Header) {
GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
if (MB == MismatchDisallowed) {
if (SplitNRanks != (int) GH.NRanks) {
stringstream ss;
ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
"current: " << SplitNRanks << ", file: " << GH.NRanks;
throw runtime_error(ss.str());
}
#ifndef GENERICIO_NO_MPI
int TopoStatus;
MPI_Topo_test(Comm, &TopoStatus);
if (TopoStatus == MPI_CART) {
int Dims[3], Periods[3], Coords[3];
MPI_Cart_get(Comm, 3, Dims, Periods, Coords);
bool DimsMatch = true;
for (int i = 0; i < 3; ++i) {
if ((uint64_t) Dims[i] != GH.Dims[i]) {
DimsMatch = false;
break;
}
}
if (!DimsMatch) {
stringstream ss;
ss << "Won't read " << LocalFileName <<
": communicator-decomposition mismatch: " <<
"current: " << Dims[0] << "x" << Dims[1] << "x" << Dims[2] <<
", file: " << GH.Dims[0] << "x" << GH.Dims[1] << "x" <<
GH.Dims[2];
throw runtime_error(ss.str());
}
}
#endif
} else if (MB == MismatchRedistribute && !Redistributing) {
Redistributing = true;
int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
int NFileRanksPerRank = NFileRanks/NRanks;
int NRemFileRank = NFileRanks % NRanks;
if (!NFileRanksPerRank) {
// We have only the remainder, so the last NRemFileRank ranks get one
// file rank, and the others don't.
if (NRemFileRank && NRanks - Rank <= NRemFileRank)
SourceRanks.push_back(NRanks - (Rank + 1));
} else {
// Since NRemFileRank < NRanks, and we don't want to put any extra memory
// load on rank 0 (because rank 0's memory load is normally higher than
// the other ranks anyway), the last NRemFileRank will each take
// (NFileRanksPerRank+1) file ranks.
int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
for (int i = 1; i <= Rank; ++i) {
FirstFileRank = LastFileRank + 1;
LastFileRank = FirstFileRank + NFileRanksPerRank - 1;
if (NRemFileRank && NRanks - i <= NRemFileRank)
++LastFileRank;
}
for (int i = FirstFileRank; i <= LastFileRank; ++i)
SourceRanks.push_back(i);
}
}
HeaderSize = GH.HeaderSize;
Header.resize(HeaderSize + CRCSize, 0xFE /* poison */);
FH.get()->read(&Header[0], HeaderSize + CRCSize, 0, "header");
uint64_t CRC = crc64_omp(&Header[0], HeaderSize + CRCSize);
if (CRC != (uint64_t) -1) {
throw runtime_error("Header CRC check failed: " + LocalFileName);
}
}
// Note: Errors from this function should be recoverable. This means that if
// one rank throws an exception, then all ranks should.
void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
int NRanks, Rank;
#ifndef GENERICIO_NO_MPI
MPI_Comm_rank(Comm, &Rank);
MPI_Comm_size(Comm, &NRanks);
#else
Rank = 0;
NRanks = 1;
#endif
if (EffRank == -1)
EffRank = MB == MismatchRedistribute ? 0 : Rank;
if (RankMap.empty() && CheckPartMap) {
// First, check to see if the file is a rank map.
unsigned long RanksInMap = 0;
if (Rank == 0) {
try {
#ifndef GENERICIO_NO_MPI
GenericIO GIO(MPI_COMM_SELF, FileName, FileIOType);
#else
GenericIO GIO(FileName, FileIOType);
#endif
GIO.openAndReadHeader(MismatchDisallowed, 0, false);