2022-06-12 21:37:11 +05:30
# include "ConcurrentTask.h"
# include <QDebug>
2022-07-25 00:49:25 +05:30
# include <QCoreApplication>
2023-03-31 12:20:29 +05:30
# include "tasks/Task.h"
2022-06-12 21:37:11 +05:30
// Constructs the task.
//   parent          - forwarded to the Task base class constructor.
//   task_name       - stored in m_name and also set as the QObject object name.
//   max_concurrent  - stored in m_total_max_size, the limit startNext() checks
//                     the number of running sub-tasks against.
ConcurrentTask::ConcurrentTask(QObject* parent, QString task_name, int max_concurrent)
    : Task(parent)
    , m_name(task_name)
    , m_total_max_size(max_concurrent)
{
    setObjectName(task_name);
}
2022-06-12 21:37:11 +05:30
// Schedules deletion (deleteLater) of every non-null sub-task that is still
// sitting in the queue when this object is destroyed.
ConcurrentTask::~ConcurrentTask()
{
    for (auto pending : m_queue) {
        if (!pending)
            continue;
        pending->deleteLater();
    }
}
2023-03-31 12:20:29 +05:30
auto ConcurrentTask : : getStepProgress ( ) const - > TaskStepProgressList
2022-06-12 21:37:11 +05:30
{
2023-03-31 12:20:29 +05:30
return m_task_progress . values ( ) ;
2022-06-12 21:37:11 +05:30
}
void ConcurrentTask : : addTask ( Task : : Ptr task )
{
2022-12-15 22:16:51 +05:30
m_queue . append ( task ) ;
2022-06-12 21:37:11 +05:30
}
void ConcurrentTask : : executeTask ( )
{
2022-09-07 20:41:42 +05:30
// Start the least amount of tasks needed, but at least one
2023-03-30 23:52:55 +05:30
// int num_starts = qMax(1, qMin(m_total_max_size, m_queue.size()));
// for (int i = 0; i < num_starts; i++) {
// QMetaObject::invokeMethod(this, &ConcurrentTask::startNext, Qt::QueuedConnection);
// }
// Start One task, startNext hadles starting the up to the m_total_max_size
// while tracking the number currently being done
QMetaObject : : invokeMethod ( this , & ConcurrentTask : : startNext , Qt : : QueuedConnection ) ;
2022-06-12 21:37:11 +05:30
}
bool ConcurrentTask : : abort ( )
{
2022-07-25 01:46:14 +05:30
m_queue . clear ( ) ;
m_aborted = true ;
2022-06-12 21:37:11 +05:30
if ( m_doing . isEmpty ( ) ) {
// Don't call emitAborted() here, we want to bypass the 'is the task running' check
emit aborted ( ) ;
emit finished ( ) ;
return true ;
}
2022-07-25 01:46:14 +05:30
bool suceedeed = true ;
2022-06-12 21:37:11 +05:30
2022-07-25 01:46:14 +05:30
QMutableHashIterator < Task * , Task : : Ptr > doing_iter ( m_doing ) ;
while ( doing_iter . hasNext ( ) ) {
auto task = doing_iter . next ( ) ;
suceedeed & = ( task . value ( ) ) - > abort ( ) ;
}
2022-06-12 21:37:11 +05:30
2022-07-25 01:46:14 +05:30
if ( suceedeed )
2022-06-12 21:37:11 +05:30
emitAborted ( ) ;
2022-07-25 01:46:14 +05:30
else
emitFailed ( tr ( " Failed to abort all running tasks. " ) ) ;
2022-06-12 21:37:11 +05:30
2022-07-25 01:46:14 +05:30
return suceedeed ;
2022-06-12 21:37:11 +05:30
}
2022-12-15 22:15:50 +05:30
void ConcurrentTask : : clear ( )
{
Q_ASSERT ( ! isRunning ( ) ) ;
m_done . clear ( ) ;
m_failed . clear ( ) ;
m_queue . clear ( ) ;
m_aborted = false ;
m_progress = 0 ;
m_stepProgress = 0 ;
}
2022-06-12 21:37:11 +05:30
void ConcurrentTask : : startNext ( )
{
if ( m_aborted | | m_doing . count ( ) > m_total_max_size )
return ;
2022-07-22 09:49:56 +05:30
if ( m_queue . isEmpty ( ) & & m_doing . isEmpty ( ) & & ! wasSuccessful ( ) ) {
2022-06-12 21:37:11 +05:30
emitSucceeded ( ) ;
return ;
}
if ( m_queue . isEmpty ( ) )
return ;
Task : : Ptr next = m_queue . dequeue ( ) ;
2023-03-30 23:52:55 +05:30
connect ( next . get ( ) , & Task : : succeeded , this , [ this , next ] ( ) { subTaskSucceeded ( next ) ; } ) ;
2022-06-12 21:37:11 +05:30
connect ( next . get ( ) , & Task : : failed , this , [ this , next ] ( QString msg ) { subTaskFailed ( next , msg ) ; } ) ;
2023-03-30 23:52:55 +05:30
connect ( next . get ( ) , & Task : : status , this , [ this , next ] ( QString msg ) { subTaskStatus ( next , msg ) ; } ) ;
2023-03-31 12:20:29 +05:30
connect ( next . get ( ) , & Task : : stepProgress , this , [ this , next ] ( TaskStepProgressList tp ) { subTaskStepProgress ( next , tp ) ; } ) ;
2022-06-12 21:37:11 +05:30
2023-03-30 23:52:55 +05:30
connect ( next . get ( ) , & Task : : progress , this , [ this , next ] ( qint64 current , qint64 total ) { subTaskProgress ( next , current , total ) ; } ) ;
2022-06-12 21:37:11 +05:30
m_doing . insert ( next . get ( ) , next ) ;
2023-03-30 23:52:55 +05:30
m_task_progress . insert ( next - > getUid ( ) , std : : make_shared < TaskStepProgress > ( TaskStepProgress ( { next - > getUid ( ) } ) ) ) ;
2022-06-12 21:37:11 +05:30
updateState ( ) ;
2023-03-31 12:20:29 +05:30
updateStepProgress ( ) ;
2022-06-12 21:37:11 +05:30
2023-01-18 00:38:50 +05:30
QCoreApplication : : processEvents ( ) ;
2022-12-15 22:16:51 +05:30
QMetaObject : : invokeMethod ( next . get ( ) , & Task : : start , Qt : : QueuedConnection ) ;
2022-07-25 00:49:25 +05:30
2022-12-15 22:16:51 +05:30
// Allow going up the number of concurrent tasks in case of tasks being added in the middle of a running task.
2023-01-25 00:14:12 +05:30
int num_starts = qMin ( m_queue . size ( ) , m_total_max_size - m_doing . size ( ) ) ;
2022-12-15 22:16:51 +05:30
for ( int i = 0 ; i < num_starts ; i + + )
QMetaObject : : invokeMethod ( this , & ConcurrentTask : : startNext , Qt : : QueuedConnection ) ;
2022-06-12 21:37:11 +05:30
}
void ConcurrentTask : : subTaskSucceeded ( Task : : Ptr task )
{
m_done . insert ( task . get ( ) , task ) ;
2023-03-30 23:52:55 +05:30
m_succeeded . insert ( task . get ( ) , task ) ;
2022-06-12 21:37:11 +05:30
m_doing . remove ( task . get ( ) ) ;
2023-03-31 12:20:29 +05:30
m_task_progress . value ( task - > getUid ( ) ) - > state = TaskStepState : : Succeeded ;
2022-06-12 21:37:11 +05:30
disconnect ( task . get ( ) , 0 , this , 0 ) ;
updateState ( ) ;
2023-03-31 12:20:29 +05:30
updateStepProgress ( ) ;
2022-06-12 21:37:11 +05:30
startNext ( ) ;
}
void ConcurrentTask : : subTaskFailed ( Task : : Ptr task , const QString & msg )
{
m_done . insert ( task . get ( ) , task ) ;
m_failed . insert ( task . get ( ) , task ) ;
m_doing . remove ( task . get ( ) ) ;
2023-03-31 12:20:29 +05:30
m_task_progress . value ( task - > getUid ( ) ) - > state = TaskStepState : : Failed ;
2022-06-12 21:37:11 +05:30
disconnect ( task . get ( ) , 0 , this , 0 ) ;
updateState ( ) ;
2023-03-31 12:20:29 +05:30
updateStepProgress ( ) ;
2022-06-12 21:37:11 +05:30
startNext ( ) ;
}
2023-03-30 23:52:55 +05:30
void ConcurrentTask : : subTaskStatus ( Task : : Ptr task , const QString & msg )
{
auto taskProgress = m_task_progress . value ( task - > getUid ( ) ) ;
2023-03-31 12:20:29 +05:30
taskProgress - > status = msg ;
taskProgress - > state = TaskStepState : : Running ;
2023-03-30 23:52:55 +05:30
updateState ( ) ;
2023-03-31 12:20:29 +05:30
updateStepProgress ( ) ;
2023-03-30 23:52:55 +05:30
}
void ConcurrentTask : : subTaskProgress ( Task : : Ptr task , qint64 current , qint64 total )
2022-06-12 21:37:11 +05:30
{
2023-03-30 23:52:55 +05:30
auto taskProgress = m_task_progress . value ( task - > getUid ( ) ) ;
taskProgress - > current = current ;
taskProgress - > total = total ;
2023-03-31 12:20:29 +05:30
taskProgress - > state = TaskStepState : : Running ;
2023-03-30 23:52:55 +05:30
taskProgress - > details = task - > getDetails ( ) ;
updateStepProgress ( ) ;
updateState ( ) ;
2022-06-12 21:37:11 +05:30
}
2023-03-31 12:20:29 +05:30
void ConcurrentTask : : subTaskStepProgress ( Task : : Ptr task , TaskStepProgressList task_step_progress )
2022-06-12 21:37:11 +05:30
{
2023-03-30 23:52:55 +05:30
for ( auto progress : task_step_progress ) {
2023-03-31 12:20:29 +05:30
if ( ! m_task_progress . contains ( progress - > uid ) ) {
m_task_progress . insert ( progress - > uid , progress ) ;
} else {
auto tp = m_task_progress . value ( progress - > uid ) ;
tp - > current = progress - > current ;
tp - > total = progress - > total ;
tp - > status = progress - > status ;
tp - > details = progress - > details ;
}
2023-03-30 23:52:55 +05:30
}
2023-03-31 12:20:29 +05:30
updateStepProgress ( ) ;
2023-03-30 23:52:55 +05:30
}
void ConcurrentTask : : updateStepProgress ( )
{
qint64 current = 0 , total = 0 ;
for ( auto taskProgress : m_task_progress ) {
current + = taskProgress - > current ;
total + = taskProgress - > total ;
}
m_stepProgress = current ;
m_stepTotalProgress = total ;
2023-03-31 12:20:29 +05:30
emit stepProgress ( m_task_progress . values ( ) ) ;
2022-06-12 21:37:11 +05:30
}
void ConcurrentTask : : updateState ( )
{
2023-03-30 23:52:55 +05:30
if ( totalSize ( ) > 1 ) {
setProgress ( m_done . count ( ) , totalSize ( ) ) ;
setStatus ( tr ( " Executing %1 task(s) (%2 out of %3 are done) " ) . arg ( QString : : number ( m_doing . count ( ) ) , QString : : number ( m_done . count ( ) ) , QString : : number ( totalSize ( ) ) ) ) ;
} else {
setProgress ( m_stepProgress , m_stepTotalProgress ) ;
QString status = tr ( " Please wait ... " ) ;
if ( m_queue . size ( ) > 0 ) {
status = tr ( " Waiting for 1 task to start ... " ) ;
} else if ( m_doing . size ( ) > 0 ) {
status = tr ( " Executing 1 task: " ) ;
} else if ( m_done . size ( ) > 0 ) {
status = tr ( " Task finished. " ) ;
}
setStatus ( status ) ;
}
2022-06-12 21:37:11 +05:30
}