Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public:
private:
/** This is responsible for dumping to file */
std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter;
int m_NumAggBTDBufferToFlush=5;
Comment thread
guj marked this conversation as resolved.
Outdated
};

#endif // WARPX_FLUSHFORMATOPENPMD_H_
6 changes: 6 additions & 0 deletions Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name)
ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg);
encoding = openPMD::IterationEncoding::groupBased;
}

pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush);
Comment thread
guj marked this conversation as resolved.
amrex::Print()<<" BTD: ForceFlushEvery: "<<m_NumAggBTDBufferToFlush<<" Buffers per snapshot "<<std::endl;
Comment thread
guj marked this conversation as resolved.
Outdated
}

//
Expand Down Expand Up @@ -176,6 +179,9 @@ FlushFormatOpenPMD::WriteToFile (
m_OpenPMDPlotWriter->WriteOpenPMDParticles(
particle_diags, static_cast<amrex::Real>(time), use_pinned_pc, isBTD, isLastBTDFlush);

if (bufferID % m_NumAggBTDBufferToFlush == 0)
m_OpenPMDPlotWriter->ForceFlush(isBTD);
Comment thread
ax3l marked this conversation as resolved.
Outdated

// signal that no further updates will be written to this iteration
m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush);
}
1 change: 1 addition & 0 deletions Source/Diagnostics/WarpXOpenPMD.H
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public:
/** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/
std::string OpenPMDFileType () { return m_OpenPMDFileType; }

void ForceFlush(bool isBTD);
Comment thread
guj marked this conversation as resolved.
Outdated
private:
void Init (openPMD::Access access, bool isBTD);

Expand Down
77 changes: 60 additions & 17 deletions Source/Diagnostics/WarpXOpenPMD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,27 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot ()
}
}

/*
* If I/O is through ADIOS:
* isBTD=true => PerformPut
* this way we do not flush out every buffer in a snapshot,
* (BTD uses few data ranks so this is costly for ADIOS collective functions)
* Instead we aggregate a few buffers before calling ForceFlush(isBTD) to write out.
* Note that SPAN is used to allocate CPU data in ADIOS.
* The advantage is when SPAN is successful, PerformPut takes no action.
*
* isBTD=false => PDW
*/
Comment thread
guj marked this conversation as resolved.
Outdated
void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const
{
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent");

openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);

currIteration.seriesFlush();
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
if (isBTD) {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent-PP()");
Comment thread
guj marked this conversation as resolved.
Outdated
currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" );
Comment thread
guj marked this conversation as resolved.
Outdated
} else {
WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()");
currIteration.seriesFlush();
}
}

std::string
Expand Down Expand Up @@ -463,6 +477,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const std::string& dirPrefix, int file_m

void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush)
{
WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()");
// default close is true
bool callClose = true;
// close BTD file only when isLastBTDFlush is true
Expand Down Expand Up @@ -666,19 +681,37 @@ for (const auto & particle_diag : particle_diags) {
pc->getCharge(), pc->getMass(),
isBTD, isLastBTDFlush);
}
}

/*
* Flush a few BTD buffers in a snapshot
* controlled by FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5)
* can be adjusted in the input file: <diag>.buffer_flush_limit_btd
*/
Comment thread
guj marked this conversation as resolved.
Outdated
void
WarpXOpenPMDPlot::ForceFlush(bool isBTD)
{
if (!isBTD)
Comment thread
ax3l marked this conversation as resolved.
Outdated
return;

auto hasOption = m_OpenPMDoptions.find("FlattenSteps");
const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);
const bool result = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos);
Comment thread
guj marked this conversation as resolved.
Outdated
Comment thread
guj marked this conversation as resolved.
Outdated

if (flattenSteps)
if (result)
{
// forcing new step so data from each btd batch in
// preferred_flush_target="buffer" can be flushed out
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
WARPX_PROFILE("WarpXOpenPMDPlot::FlattenSteps()");
Comment thread
guj marked this conversation as resolved.
Outdated
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")");
Comment thread
guj marked this conversation as resolved.
}
else
{
WARPX_PROFILE("WarpXOpenPMDPlot::PDW()");
Comment thread
guj marked this conversation as resolved.
Outdated
openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD);
currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")");
Comment thread
guj marked this conversation as resolved.
}
}


void
WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc,
const std::string& name,
Expand Down Expand Up @@ -1509,12 +1542,22 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename,
// GPU pointers to the I/O library
#ifdef AMREX_USE_GPU
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
// intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize();
mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size);
} else
{
WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()");
auto dynamicMemoryView = mesh_comp.storeChunk<amrex::Real>(
Copy link
Copy Markdown
Member

@ax3l ax3l Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Copy link
Copy Markdown
Member

@ax3l ax3l Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To double check: is Iteration::open() called before these blocks, so the ADIOS engine is definitely open? -- Yes
https://github.com/guj/WarpX/blob/d0bbf79e64beed82b64d29f85ec0a56ed0f5f087/Source/Diagnostics/WarpXOpenPMD.cpp#L1426-L1430

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no, when we have an un-equal number of blocks over all MPI ranks, this will not work because storeChunk is collective :-o openPMD/openPMD-api#1794

Only one rank is writing when using BTD. Other ranks have no data.

Copy link
Copy Markdown
Member

@ax3l ax3l Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one rank is writing when using BTD. Other ranks have no data.

Yes, small correction: Only a small subset of ranks is writing when using BTD. This is, because BTD stations collect data in (transverse) slices that move through the simulation.

chunk_offset, chunk_size,
[&local_box](size_t size) {
(void) size;
amrex::Print()<<" span failed \n";
Comment thread
guj marked this conversation as resolved.
Outdated
amrex::BaseFab<amrex::Real> foo(local_box, 1, amrex::The_Pinned_Arena());
std::shared_ptr<amrex::Real> data_pinned(foo.release());
return data_pinned;
});
Comment thread
guj marked this conversation as resolved.
Outdated

auto span = dynamicMemoryView.currentBuffer();
amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real));
}
} else
#endif
{
amrex::Real const *local_data = fab.dataPtr(icomp);
Comment thread
ax3l marked this conversation as resolved.
Outdated
Expand Down
Loading