GH-721 Redo internal NetJob implementation.

NetJob is now using its own task queue and does not start more than 6 actions at the same time
This commit is contained in:
Petr Mrázek
2015-01-11 22:04:31 +01:00
parent 1151037f96
commit 0886786bb5
7 changed files with 130 additions and 46 deletions

View File

@ -29,7 +29,7 @@ enum JobStatus
};
typedef std::shared_ptr<class NetAction> NetActionPtr;
class NetAction : public QObject
class NetAction : public QObject, public std::enable_shared_from_this<NetAction>
{
Q_OBJECT
protected:
@ -51,6 +51,11 @@ public:
{
return m_failures;
}
NetActionPtr getSharedPtr()
{
return shared_from_this();
}
public:
/// the network reply
std::shared_ptr<QNetworkReply> m_reply;

View File

@ -28,46 +28,27 @@ void NetJob::partSucceeded(int index)
auto &slot = parts_progress[index];
partProgress(index, slot.total_progress, slot.total_progress);
num_succeeded++;
QLOG_INFO() << m_job_name.toLocal8Bit() << "progress:" << num_succeeded << "/"
<< downloads.size();
if (num_failed + num_succeeded == downloads.size())
{
if (num_failed)
{
QLOG_ERROR() << m_job_name.toLocal8Bit() << "failed.";
emit failed();
}
else
{
QLOG_INFO() << m_job_name.toLocal8Bit() << "succeeded.";
emit succeeded();
}
}
m_doing.remove(index);
m_done.insert(index);
disconnect(downloads[index].get(), 0, this, 0);
startMoreParts();
}
void NetJob::partFailed(int index)
{
m_doing.remove(index);
auto &slot = parts_progress[index];
if (slot.failures == 3)
{
QLOG_ERROR() << "Part" << index << "failed 3 times (" << downloads[index]->m_url << ")";
num_failed++;
if (num_failed + num_succeeded == downloads.size())
{
QLOG_ERROR() << m_job_name.toLocal8Bit() << "failed.";
emit failed();
}
m_failed.insert(index);
}
else
{
QLOG_ERROR() << "Part" << index << "failed, restarting (" << downloads[index]->m_url
<< ")";
// restart the job
slot.failures++;
downloads[index]->start();
m_todo.enqueue(index);
}
disconnect(downloads[index].get(), 0, this, 0);
startMoreParts();
}
void NetJob::partProgress(int index, qint64 bytesReceived, qint64 bytesTotal)
@ -88,25 +69,58 @@ void NetJob::start()
{
QLOG_INFO() << m_job_name.toLocal8Bit() << " started.";
m_running = true;
for (auto iter : downloads)
for (int i = 0; i < downloads.size(); i++)
{
connect(iter.get(), SIGNAL(succeeded(int)), SLOT(partSucceeded(int)));
connect(iter.get(), SIGNAL(failed(int)), SLOT(partFailed(int)));
connect(iter.get(), SIGNAL(progress(int, qint64, qint64)),
m_todo.enqueue(i);
}
startMoreParts();
}
void NetJob::startMoreParts()
{
// check for final conditions if there's nothing in the queue
if(!m_todo.size())
{
if(!m_doing.size())
{
if(!m_failed.size())
{
QLOG_INFO() << m_job_name.toLocal8Bit() << "succeeded.";
emit succeeded();
}
else
{
QLOG_ERROR() << m_job_name.toLocal8Bit() << "failed.";
emit failed();
}
}
return;
}
// otherwise try to start more parts
while (m_doing.size() < 6)
{
if(!m_todo.size())
return;
int doThis = m_todo.dequeue();
m_doing.insert(doThis);
auto part = downloads[doThis];
// connect signals :D
connect(part.get(), SIGNAL(succeeded(int)), SLOT(partSucceeded(int)));
connect(part.get(), SIGNAL(failed(int)), SLOT(partFailed(int)));
connect(part.get(), SIGNAL(progress(int, qint64, qint64)),
SLOT(partProgress(int, qint64, qint64)));
iter->start();
part->start();
}
}
QStringList NetJob::getFailedFiles()
{
QStringList failed;
for (auto download : downloads)
for (auto index: m_failed)
{
if (download->m_status == Job_Failed)
{
failed.push_back(download->m_url.toString());
}
failed.push_back(downloads[index]->m_url.toString());
}
failed.sort();
return failed;
}

View File

@ -22,9 +22,10 @@
#include "CacheDownload.h"
#include "HttpMetaCache.h"
#include "logic/tasks/ProgressProvider.h"
#include "logic/QObjectPtr.h"
class NetJob;
typedef std::shared_ptr<NetJob> NetJobPtr;
typedef QObjectPtr<NetJob> NetJobPtr;
class NetJob : public ProgressProvider
{
@ -81,18 +82,22 @@ public:
return m_running;
}
QStringList getFailedFiles();
private:
void startMoreParts();
signals:
void started();
void progress(qint64 current, qint64 total);
void succeeded();
void failed();
public
slots:
public slots:
virtual void start();
// FIXME: implement
virtual void abort() {};
private
slots:
private slots:
void partProgress(int index, qint64 bytesReceived, qint64 bytesTotal);
void partSucceeded(int index);
void partFailed(int index);
@ -107,9 +112,11 @@ private:
QString m_job_name;
QList<NetActionPtr> downloads;
QList<part_info> parts_progress;
QQueue<int> m_todo;
QSet<int> m_doing;
QSet<int> m_done;
QSet<int> m_failed;
qint64 current_progress = 0;
qint64 total_progress = 0;
int num_succeeded = 0;
int num_failed = 0;
bool m_running = false;
};