Use new multithreaded task que on second stage ResMed importer

This commit is contained in:
Mark Watkins 2014-06-30 21:20:12 +10:00
parent fe184a1f4a
commit 3749ee3cf8
3 changed files with 129 additions and 115 deletions

View File

@ -695,10 +695,14 @@ void ResmedImport::run()
// Ignore all the rest of the sumary data, because there is enough available to calculate it with higher accuracy.
if (sess->length() > 0) {
loader->saveMutex.lock();
if (!mach->AddSession(sess, p_profile)) {
delete sess;
loader->saveMutex.unlock();
return;
}
loader->saveMutex.unlock();
} else {
delete sess;
return;
@ -771,6 +775,108 @@ Machine *ResmedLoader::CreateMachine(QString serial, Profile *profile)
}
void ResmedImportStage2::run()
{
Session * sess = new Session(mach, R.maskon);
sess->really_set_first(qint64(R.maskon) * 1000L);
sess->really_set_last(qint64(R.maskoff) * 1000L);
// Claim this record for future imports
sess->settings[RMS9_MaskOnTime] = R.maskon;
sess->settings[CPAP_SummaryOnly] = true;
sess->SetChanged(true);
// First take the settings
if (R.set_pressure >= 0)
sess->settings[CPAP_Pressure] = R.set_pressure;
if (R.min_pressure >= 0) sess->settings[CPAP_PressureMin] = R.min_pressure;
if (R.max_pressure >= 0) sess->settings[CPAP_PressureMax] = R.max_pressure;
if (R.ps >= 0) sess->settings[CPAP_PS] = R.ps;
if (R.min_ps >= 0) sess->settings[CPAP_PSMin] = R.min_ps;
if (R.max_ps >= 0) sess->settings[CPAP_PSMax] = R.max_ps;
if (R.epap >= 0) sess->settings[CPAP_EPAP] = R.epap;
if (R.max_epap >= 0) sess->settings[CPAP_EPAPHi] = R.max_epap;
if (R.min_epap >= 0) sess->settings[CPAP_EPAPLo] = R.min_epap;
if (R.ipap >= 0) sess->settings[CPAP_IPAP] = R.ipap;
if (R.max_ipap >= 0) sess->settings[CPAP_IPAPHi] = R.max_ipap;
if (R.min_ipap >= 0) sess->settings[CPAP_IPAPLo] = R.min_ipap;
if (R.mode >= 0) sess->settings[CPAP_Mode] = R.mode;
if (R.epr >= 0) {
sess->settings[RMS9_EPR] = R.epr;
sess->settings[CPAP_PresReliefType] = (int)PR_EPR;
}
if (R.epr_set >= 0) {
sess->settings[RMS9_EPRSet] = R.epr_set;
sess->settings[CPAP_PresReliefSet] = R.epr_set;
}
if (R.leakmax >= 0) sess->setMax(CPAP_Leak, R.leakmax);
if (R.leakmax >= 0) sess->setMin(CPAP_Leak, 0);
if ((R.leakmed >= 0) && (R.leak95 >= 0) && (R.leakmax >= 0)) {
sess->m_timesummary[CPAP_Leak][short(R.leakmax / R.leakgain)]=1;
sess->m_timesummary[CPAP_Leak][short(R.leak95 / R.leakgain)]=9;
sess->m_timesummary[CPAP_Leak][short(R.leakmed / R.leakgain)]=65;
sess->m_timesummary[CPAP_Leak][0]=25;
}
// Find the matching date group for this record
QMap<QDate, QList<STRRecord *> >::iterator dtit = loader->strdate.find(R.date);
// should not be possible, but my brain hurts...
Q_ASSERT(dtit != loader->strdate.end());
if (dtit != loader->strdate.end()) {
QList<STRRecord *> & dayrecs = dtit.value();
bool hasdatasess=false;
EventDataType time=0, totaltime=0;
for (int c=0; c < dayrecs.size(); ++c) {
STRRecord *r = dayrecs[c];
if (r->sessionid > 0) {
// get complicated.. calculate all the counts for valid sessions, and use the summary to make up the rest
hasdatasess = true;
}
totaltime += r->maskoff - r->maskon;
}
if (!hasdatasess) {
for (int c=0; c < dayrecs.size(); ++c) {
STRRecord *r = dayrecs[c];
time = r->maskoff - r->maskon;
float ratio = time / totaltime;
// Add the time weighted proportion of the events counts
if (r->ai >= 0) {
sess->setCount(CPAP_Obstructive, r->ai * ratio);
sess->setCph(CPAP_Obstructive, (r->ai * ratio) / (time / 3600.0));
}
if (r->uai >= 0) {
sess->setCount(CPAP_Apnea, r->uai * ratio);
sess->setCph(CPAP_Apnea, (r->uai * ratio) / (time / 3600.0));
}
if (r->hi >= 0) {
sess->setCount(CPAP_Hypopnea, r->hi * ratio);
sess->setCph(CPAP_Hypopnea, (r->hi * ratio) / (time / 3600.0));
}
if (r->cai >= 0) {
sess->setCount(CPAP_ClearAirway, r->cai * ratio);
sess->setCph(CPAP_ClearAirway, (r->ai * ratio) / (time / 3600.0));
}
}
}
}
loader->saveMutex.lock();
mach->AddSession(sess, p_profile);
sess->Store(p_profile->Get(mach->properties[STR_PROP_Path]));
loader->saveMutex.unlock();
}
long event_cnt = 0;
@ -1240,12 +1346,9 @@ int ResmedLoader::Open(QString path, Profile *profile)
// strsess end can change above.
end = strsess.end();
m->lockSaveMutex();
m->setTotalTasks(m->totalTasks() + size);
m->unlockSaveMutex();
m->StartSaveThreads();
// m->lockSaveMutex();
// m->setTotalTasks(m->totalTasks() + size);
// m->unlockSaveMutex();
/////////////////////////////////////////////////////////////////////////////////////////////
// Scan through unmatched strsess records, and attempt to get at summary data
@ -1259,120 +1362,16 @@ int ResmedLoader::Open(QString path, Profile *profile)
}
//Q_ASSERT(R.sessionid == 0);
// the following should not happen
if (R.sessionid > 0) {
m->skipSaveTask();
continue;
}
// if ((++cnt % 5) == 0) {
// // TODO: Change me to emit once MachineLoader is QObjectified...
// if (qprogress) { qprogress->setValue(10.0 + (float(cnt) / float(size) * 90.0)); }
// QApplication::processEvents();
// }
sess = new Session(m, R.maskon);
sess->really_set_first(qint64(R.maskon) * 1000L);
sess->really_set_last(qint64(R.maskoff) * 1000L);
// Claim this record for future imports
sess->settings[RMS9_MaskOnTime] = R.maskon;
sess->settings[CPAP_SummaryOnly] = true;
sess->SetChanged(true);
// First take the settings
if (R.set_pressure >= 0)
sess->settings[CPAP_Pressure] = R.set_pressure;
if (R.min_pressure >= 0) sess->settings[CPAP_PressureMin] = R.min_pressure;
if (R.max_pressure >= 0) sess->settings[CPAP_PressureMax] = R.max_pressure;
if (R.ps >= 0) sess->settings[CPAP_PS] = R.ps;
if (R.min_ps >= 0) sess->settings[CPAP_PSMin] = R.min_ps;
if (R.max_ps >= 0) sess->settings[CPAP_PSMax] = R.max_ps;
if (R.epap >= 0) sess->settings[CPAP_EPAP] = R.epap;
if (R.max_epap >= 0) sess->settings[CPAP_EPAPHi] = R.max_epap;
if (R.min_epap >= 0) sess->settings[CPAP_EPAPLo] = R.min_epap;
if (R.ipap >= 0) sess->settings[CPAP_IPAP] = R.ipap;
if (R.max_ipap >= 0) sess->settings[CPAP_IPAPHi] = R.max_ipap;
if (R.min_ipap >= 0) sess->settings[CPAP_IPAPLo] = R.min_ipap;
if (R.mode >= 0) sess->settings[CPAP_Mode] = R.mode;
if (R.epr >= 0) {
sess->settings[RMS9_EPR] = R.epr;
sess->settings[CPAP_PresReliefType] = (int)PR_EPR;
queTask(new ResmedImportStage2(this, R, m));
}
if (R.epr_set >= 0) {
sess->settings[RMS9_EPRSet] = R.epr_set;
sess->settings[CPAP_PresReliefSet] = R.epr_set;
}
if (R.leakmax >= 0) sess->setMax(CPAP_Leak, R.leakmax);
if (R.leakmax >= 0) sess->setMin(CPAP_Leak, 0);
if ((R.leakmed >= 0) && (R.leak95 >= 0) && (R.leakmax >= 0)) {
sess->m_timesummary[CPAP_Leak][short(R.leakmax / R.leakgain)]=1;
sess->m_timesummary[CPAP_Leak][short(R.leak95 / R.leakgain)]=9;
sess->m_timesummary[CPAP_Leak][short(R.leakmed / R.leakgain)]=65;
sess->m_timesummary[CPAP_Leak][0]=25;
}
// Find the matching date group for this record
QMap<QDate, QList<STRRecord *> >::iterator dtit = strdate.find(R.date);
// should not be possible, but my brain hurts...
Q_ASSERT(dtit != strdate.end());
if (dtit != strdate.end()) {
QList<STRRecord *> & dayrecs = dtit.value();
bool hasdatasess=false;
EventDataType time=0, totaltime=0;
for (int c=0; c < dayrecs.size(); ++c) {
STRRecord *r = dayrecs[c];
if (r->sessionid > 0) {
// get complicated.. calculate all the counts for valid sessions, and use the summary to make up the rest
hasdatasess = true;
}
totaltime += r->maskoff - r->maskon;
}
if (!hasdatasess) {
for (int c=0; c < dayrecs.size(); ++c) {
STRRecord *r = dayrecs[c];
time = r->maskoff - r->maskon;
float ratio = time / totaltime;
// Add the time weighted proportion of the events counts
if (r->ai >= 0) {
sess->setCount(CPAP_Obstructive, r->ai * ratio);
sess->setCph(CPAP_Obstructive, (r->ai * ratio) / (time / 3600.0));
}
if (r->uai >= 0) {
sess->setCount(CPAP_Apnea, r->uai * ratio);
sess->setCph(CPAP_Apnea, (r->uai * ratio) / (time / 3600.0));
}
if (r->hi >= 0) {
sess->setCount(CPAP_Hypopnea, r->hi * ratio);
sess->setCph(CPAP_Hypopnea, (r->hi * ratio) / (time / 3600.0));
}
if (r->cai >= 0) {
sess->setCount(CPAP_ClearAirway, r->cai * ratio);
sess->setCph(CPAP_ClearAirway, (r->ai * ratio) / (time / 3600.0));
}
}
}
}
m->AddSession(sess, profile);
m->queSaveList(sess);
}
m->FinishSaveThreads();
runTasks();
#ifdef DEBUG_EFFICIENCY
{

View File

@ -311,6 +311,19 @@ protected:
Machine * mach;
};
class ResmedImportStage2:public ImportTask
{
public:
ResmedImportStage2(ResmedLoader * l, STRRecord r, Machine * m): loader(l), R(r), mach(m) {}
virtual ~ResmedImportStage2() {}
virtual void run();
protected:
ResmedLoader * loader;
STRRecord R;
Machine * mach;
};
/*! \class ResmedLoader
\brief Importer for ResMed S9 Data
@ -318,6 +331,7 @@ protected:
class ResmedLoader : public MachineLoader
{
friend class ResmedImport;
friend class ResmedImportStage2;
public:
ResmedLoader();
virtual ~ResmedLoader();

View File

@ -428,8 +428,9 @@ void Machine::StartSaveThreads()
m_totaltasks=0;
for (int i = 0; i < threads; i++) {
thread.push_back(new SaveThread(this, path));
QObject::connect(thread[i], SIGNAL(UpdateProgress(int)), qprogress, SLOT(setValue(int)));
SaveThread * thr = new SaveThread(this, path);
QObject::connect(thr, SIGNAL(UpdateProgress(int)), qprogress, SLOT(setValue(int)));
thread.push_back(thr);
thread[i]->start();
}