[NeoML] DistributedTraining uses IsDnnInferenced #1110

Draft · wants to merge 1 commit into master
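This draft makes CDistributedTraining remember which of its per-thread dnns actually performed inference on the last RunOnce() / RunAndBackwardOnce() call (the new isDnnInferenced array), mirroring the flag that CDistributedInference already keeps. Both overloads of GetLastBlob() now return nullptr for models that received no data, instead of a stale sink blob, and error messages are tagged with the index of the thread that raised them.

In caller terms, here is a minimal sketch of the updated contract; the dataset class, the "output" sink layer name, and the constructor arguments are assumptions for illustration, not part of this diff:

	// Sketch only: assumes a user-defined CMyDataset implementing IDistributedDataset
	// and networks with a sink layer named "output".
	CDistributedTraining training( archive, /*count=*/4 );
	CMyDataset dataset; // its SetInputBatch() may return 0 for some threads

	training.RunOnce( dataset );

	CObjectArray<const CDnnBlob> blobs;
	training.GetLastBlob( "output", blobs );
	for( int i = 0; i < blobs.Size(); ++i ) {
		if( blobs[i] == nullptr ) {
			continue; // this dnn performed no inference on the last run
		}
		// ... read the results from blobs[i]
	}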
4 changes: 3 additions & 1 deletion NeoML/include/NeoML/Dnn/DnnDistributed.h
@@ -113,10 +113,12 @@ class NEOML_API CDistributedTraining {
CPointerArray<CRandom> rands;
// Separate dnn for each thread
CPointerArray<CDnn> cnns;
// Indicates for which dnns the inference was performed
CArray<bool> isDnnInferenced;
// Separate `batchSize` for each dnn in a thread (may be empty)
CArray<int> batchSize;
// `Train()` cannot be called while `isFirstRun` is true
// `batchSize` must not be 0 on `isFirstRun` for `RunOnce`, `RunAndBackwardOnce`, or `RunAndLearnOnce`.
// `batchSize` must not be 0 on `isFirstRun` for `RunAndBackwardOnce` or `RunAndLearnOnce`.
bool isFirstRun = true;
// Containers for error messages, if any happened
CArray<CString> errorMessages;
63 changes: 43 additions & 20 deletions NeoML/src/Dnn/DnnDistributed.cpp
@@ -165,6 +165,7 @@ struct CDistributedTraining::CThreadParams final {
bool* const IsFirstRun;
IDistributedDataset* const Data;
CPointerArray<CDnn>& Dnns;
CArray<bool>* IsDnnInferenced;
CArray<int>& BatchSize;
const bool IsCpu;
CArray<CString>& ErrorMessages;
@@ -173,19 +174,25 @@

// RunOnce and RunAndBackwardOnce
CThreadParams( bool* isFirstRun, IDistributedDataset* data, CPointerArray<CDnn>& dnns,
CArray<int>& batchSize, bool isCpu, CArray<CString>& errorMessages ) :
CArray<bool>* isDnnInferenced, CArray<int>& batchSize, bool isCpu, CArray<CString>& errorMessages ) :
IsFirstRun( isFirstRun ),
Data( data ),
Dnns( dnns ),
IsDnnInferenced( isDnnInferenced ),
BatchSize( batchSize ),
IsCpu( isCpu ),
ErrorMessages( errorMessages )
{}
{
if( IsDnnInferenced != nullptr ) {
IsDnnInferenced->DeleteAll();
IsDnnInferenced->Add( false, Dnns.Size() );
}
}

// solver.Train
CThreadParams( CPointerArray<CDnn>& dnns,
CArray<int>& batchSize, int totalBatch, bool isCpu, CArray<CString>& errorMessages ) :
CThreadParams( nullptr, nullptr, dnns, batchSize, isCpu, errorMessages )
CThreadParams( nullptr, nullptr, dnns, nullptr, batchSize, isCpu, errorMessages )
{ TotalBatch = totalBatch; }

void SetErrorMessage( int threadIndex, CString message );
@@ -195,6 +202,7 @@ void CDistributedTraining::CThreadParams::SetErrorMessage( int threadIndex, CString message )
{
IsErrorHappened = true;
ErrorMessages[threadIndex] = std::move( message );
ErrorMessages[threadIndex] += "(thread = " + Str( threadIndex ) + ")";
// This abort is monitored only inside:
// - CDnnSolver::allReduce (MathEngine.AllReduce)
// - CDnnDistributedInitializer::InitializeLayerParams (MathEngine.Broadcast)
@@ -217,6 +225,7 @@ void CDistributedTraining::initialize( CArchive& archive, int count, TDistribute
archive.Serialize( *cnns[i] );
archive.Seek( 0, static_cast<CBaseFile::TSeekPosition>( 0 ) );
}
isDnnInferenced.Add( false, count );
batchSize.Add( 0, count );
errorMessages.Add( {}, count );
}
@@ -342,22 +351,20 @@ float CDistributedTraining::GetLearningRate() const

void CDistributedTraining::RunOnce( IDistributedDataset& data )
{
CThreadParams function_params( &isFirstRun, &data, cnns, batchSize, isCpu, errorMessages );
CThreadParams function_params( nullptr, &data, cnns, &isDnnInferenced, batchSize, isCpu, errorMessages );

IThreadPool::TFunction f = [](int threadIndex, void* ptr)
{
CThreadParams& function_params = *static_cast<CThreadParams*>( ptr );
CPointerArray<CDnn>& cnns = function_params.Dnns;
CArray<int>& batchSize = function_params.BatchSize;
try {
CThreadGroupSwitcher groupSwitcher( function_params.IsCpu, threadIndex, cnns.Size() );
// Returns the current batch size (or 0, if there is no data for this thread on this run)
const int currBatchSize = function_params.Data->SetInputBatch( *cnns[threadIndex], threadIndex );
NeoAssert( currBatchSize > 0 || ( currBatchSize == 0 && !( *function_params.IsFirstRun ) ) );
if( currBatchSize > 0 ) {
batchSize[threadIndex] += currBatchSize;
cnns[threadIndex]->RunOnce();
function_params.IsDnnInferenced->ReplaceAt( true, threadIndex );
}
*function_params.IsFirstRun = false;
} catch( std::exception& e ) {
function_params.SetErrorMessage( threadIndex, e.what() );
}
@@ -376,7 +383,7 @@

void CDistributedTraining::RunAndBackwardOnce( IDistributedDataset& data )
{
CThreadParams function_params( &isFirstRun, &data, cnns, batchSize, isCpu, errorMessages );
CThreadParams function_params( &isFirstRun, &data, cnns, &isDnnInferenced, batchSize, isCpu, errorMessages );

IThreadPool::TFunction f = [](int threadIndex, void* ptr)
{
@@ -385,11 +392,15 @@
CArray<int>& batchSize = function_params.BatchSize;
try {
CThreadGroupSwitcher groupSwitcher( function_params.IsCpu, threadIndex, cnns.Size() );
// Returns the current batch size (or 0, if there is no data for this thread on this run)
const int currBatchSize = function_params.Data->SetInputBatch( *cnns[threadIndex], threadIndex );
// Cannot avoid this assert: all of the dnns must participate in solver->Train()
NeoAssert( currBatchSize > 0 || ( currBatchSize == 0 && !( *function_params.IsFirstRun ) ) );
if( currBatchSize > 0 ) {
batchSize[threadIndex] += currBatchSize;
cnns[threadIndex]->RunAndBackwardOnce();
// The sink results may be retrieved after this point, because RunOnce() has been performed
function_params.IsDnnInferenced->ReplaceAt( true, threadIndex );
}
*function_params.IsFirstRun = false;
} catch( std::exception& e ) {
@@ -474,17 +485,23 @@ void CDistributedTraining::GetLastLoss( const CString& layerName, CArray<float>&
void CDistributedTraining::GetLastBlob( const CString& layerName, CObjectArray<const CDnnBlob>& blobs ) const
{
blobs.SetSize( cnns.Size() );
// Return blobs for all models
for( int i = 0; i < cnns.Size(); ++i ) {
blobs[i] = CheckCast<const CSinkLayer>( cnns[i]->GetLayer( layerName ) )->GetBlob();
blobs[i] = ( isDnnInferenced[i] )
? CheckCast<const CSinkLayer>( cnns[i]->GetLayer( layerName ) )->GetBlob()
: nullptr;
}
}

// deprecated
void CDistributedTraining::GetLastBlob( const CString& layerName, CObjectArray<CDnnBlob>& blobs ) const
{
blobs.SetSize( cnns.Size() );
// Return blobs for all models
for( int i = 0; i < cnns.Size(); ++i ) {
blobs[i] = CheckCast<const CSinkLayer>( cnns[i]->GetLayer( layerName ) )->GetBlob();
blobs[i] = ( isDnnInferenced[i] )
? CheckCast<const CSinkLayer>( cnns[i]->GetLayer( layerName ) )->GetBlob()
: nullptr;
}
}
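Returning nullptr here, rather than whatever blob the sink layer still holds from an earlier run, makes it explicit which models actually ran on the last call; callers of both overloads should nullptr-check each entry, as in the sketch at the top of this page.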

@@ -518,6 +535,7 @@ struct CDistributedInference::CThreadParams final {

CThreadParams( int threadsCount, CReferenceDnnFactory& referenceDnnFactory );
void Initialize( IDistributedDataset& data );
void SetErrorMessage( int threadIndex, CString message );
};

CDistributedInference::CThreadParams::CThreadParams( int threadsCount, CReferenceDnnFactory& referenceDnnFactory )
@@ -546,6 +564,13 @@ void CDistributedInference::CThreadParams::Initialize( IDistributedDataset& data
IsErrorHappened = false;
}

void CDistributedInference::CThreadParams::SetErrorMessage( int threadIndex, CString message )
{
IsErrorHappened = true;
ErrorMessages[threadIndex] = std::move( message );
ErrorMessages[threadIndex] += "(thread = " + Str( threadIndex ) + ")";
}

//---------------------------------------------------------------------------------------------------------------------

CDistributedInference::CDistributedInference( const CDnn& dnn, int threadsCount,
@@ -588,13 +613,11 @@ void CDistributedInference::RunOnce( IDistributedDataset& data )
threadParams.IsDnnInferenced[threadIndex] = true;
}
} catch( std::exception& e ) {
threadParams.IsErrorHappened = true;
threadParams.ErrorMessages[threadIndex] = e.what();
threadParams.SetErrorMessage( threadIndex, e.what() );
}
#ifdef NEOML_USE_FINEOBJ
catch( CException* e ) {
threadParams.IsErrorHappened = true;
threadParams.ErrorMessages[threadIndex] = e->MessageText().CreateString();
threadParams.SetErrorMessage( threadIndex, e->MessageText().CreateString() );
delete e;
}
#endif // NEOML_USE_FINEOBJ
@@ -608,12 +631,12 @@

void CDistributedInference::GetLastBlob( const CString& layerName, CObjectArray<const CDnnBlob>& blobs ) const
{
blobs.DeleteAll();
blobs.Add( nullptr, threadParams->Refs.Size() );
blobs.SetSize( threadParams->Refs.Size() );
// Return blobs for all models
for( int i = 0; i < threadParams->Refs.Size(); ++i ) {
if( threadParams->IsDnnInferenced[i] ) {
blobs[i] = CheckCast<const CSinkLayer>( threadParams->Refs[i]->Dnn.GetLayer( layerName ) )->GetBlob();
}
blobs[i] = ( threadParams->IsDnnInferenced[i] )
? CheckCast<const CSinkLayer>( threadParams->Refs[i]->Dnn.GetLayer( layerName ) )->GetBlob()
: nullptr;
}
}
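For reference, both flag arrays are driven by the return value of IDistributedDataset::SetInputBatch(): a thread whose dataset returns 0 performs no inference and keeps its flag false. Below is a minimal dataset sketch under that assumption; the "input" source layer name and the per-thread slicing are hypothetical:

	// Sketch only: feeds each thread its own precomputed batch and returns 0
	// when a thread has none, so that thread's dnn is skipped on this run.
	class CMyDataset : public IDistributedDataset {
	public:
		explicit CMyDataset( const CObjectArray<CDnnBlob>& batches ) : batches( batches ) {}

		int SetInputBatch( CDnn& dnn, int thread ) override
		{
			if( thread >= batches.Size() || batches[thread] == nullptr ) {
				return 0; // no data for this thread on this run
			}
			CheckCast<CSourceLayer>( dnn.GetLayer( "input" ) )->SetBlob( batches[thread] );
			return batches[thread]->GetObjectCount();
		}

	private:
		const CObjectArray<CDnnBlob>& batches;
	};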
