From 3749ee3cf82e59352ef107b85d7d3bfa4bc79961 Mon Sep 17 00:00:00 2001 From: Mark Watkins Date: Mon, 30 Jun 2014 21:20:12 +1000 Subject: [PATCH] Use new multithreaded task que on second stage ResMed importer --- .../SleepLib/loader_plugins/resmed_loader.cpp | 225 +++++++++--------- .../SleepLib/loader_plugins/resmed_loader.h | 14 ++ sleepyhead/SleepLib/machine.cpp | 5 +- 3 files changed, 129 insertions(+), 115 deletions(-) diff --git a/sleepyhead/SleepLib/loader_plugins/resmed_loader.cpp b/sleepyhead/SleepLib/loader_plugins/resmed_loader.cpp index be65b9a7..f5e86d2d 100644 --- a/sleepyhead/SleepLib/loader_plugins/resmed_loader.cpp +++ b/sleepyhead/SleepLib/loader_plugins/resmed_loader.cpp @@ -695,10 +695,14 @@ void ResmedImport::run() // Ignore all the rest of the sumary data, because there is enough available to calculate it with higher accuracy. if (sess->length() > 0) { + loader->saveMutex.lock(); + if (!mach->AddSession(sess, p_profile)) { delete sess; + loader->saveMutex.unlock(); return; } + loader->saveMutex.unlock(); } else { delete sess; return; @@ -771,6 +775,108 @@ Machine *ResmedLoader::CreateMachine(QString serial, Profile *profile) } +void ResmedImportStage2::run() +{ + Session * sess = new Session(mach, R.maskon); + + sess->really_set_first(qint64(R.maskon) * 1000L); + sess->really_set_last(qint64(R.maskoff) * 1000L); + + // Claim this record for future imports + sess->settings[RMS9_MaskOnTime] = R.maskon; + sess->settings[CPAP_SummaryOnly] = true; + + sess->SetChanged(true); + + // First take the settings + if (R.set_pressure >= 0) + sess->settings[CPAP_Pressure] = R.set_pressure; + if (R.min_pressure >= 0) sess->settings[CPAP_PressureMin] = R.min_pressure; + if (R.max_pressure >= 0) sess->settings[CPAP_PressureMax] = R.max_pressure; + if (R.ps >= 0) sess->settings[CPAP_PS] = R.ps; + if (R.min_ps >= 0) sess->settings[CPAP_PSMin] = R.min_ps; + if (R.max_ps >= 0) sess->settings[CPAP_PSMax] = R.max_ps; + if (R.epap >= 0) sess->settings[CPAP_EPAP] = R.epap; + if (R.max_epap >= 0) sess->settings[CPAP_EPAPHi] = R.max_epap; + if (R.min_epap >= 0) sess->settings[CPAP_EPAPLo] = R.min_epap; + if (R.ipap >= 0) sess->settings[CPAP_IPAP] = R.ipap; + if (R.max_ipap >= 0) sess->settings[CPAP_IPAPHi] = R.max_ipap; + if (R.min_ipap >= 0) sess->settings[CPAP_IPAPLo] = R.min_ipap; + if (R.mode >= 0) sess->settings[CPAP_Mode] = R.mode; + if (R.epr >= 0) { + sess->settings[RMS9_EPR] = R.epr; + sess->settings[CPAP_PresReliefType] = (int)PR_EPR; + } + if (R.epr_set >= 0) { + sess->settings[RMS9_EPRSet] = R.epr_set; + sess->settings[CPAP_PresReliefSet] = R.epr_set; + } + if (R.leakmax >= 0) sess->setMax(CPAP_Leak, R.leakmax); + if (R.leakmax >= 0) sess->setMin(CPAP_Leak, 0); + if ((R.leakmed >= 0) && (R.leak95 >= 0) && (R.leakmax >= 0)) { + sess->m_timesummary[CPAP_Leak][short(R.leakmax / R.leakgain)]=1; + sess->m_timesummary[CPAP_Leak][short(R.leak95 / R.leakgain)]=9; + sess->m_timesummary[CPAP_Leak][short(R.leakmed / R.leakgain)]=65; + sess->m_timesummary[CPAP_Leak][0]=25; + } + + + // Find the matching date group for this record + QMap >::iterator dtit = loader->strdate.find(R.date); + + // should not be possible, but my brain hurts... + Q_ASSERT(dtit != loader->strdate.end()); + + if (dtit != loader->strdate.end()) { + QList & dayrecs = dtit.value(); + bool hasdatasess=false; + EventDataType time=0, totaltime=0; + + for (int c=0; c < dayrecs.size(); ++c) { + STRRecord *r = dayrecs[c]; + if (r->sessionid > 0) { + // get complicated.. calculate all the counts for valid sessions, and use the summary to make up the rest + hasdatasess = true; + } + totaltime += r->maskoff - r->maskon; + } + + + if (!hasdatasess) { + for (int c=0; c < dayrecs.size(); ++c) { + STRRecord *r = dayrecs[c]; + time = r->maskoff - r->maskon; + float ratio = time / totaltime; + + // Add the time weighted proportion of the events counts + if (r->ai >= 0) { + sess->setCount(CPAP_Obstructive, r->ai * ratio); + sess->setCph(CPAP_Obstructive, (r->ai * ratio) / (time / 3600.0)); + } + if (r->uai >= 0) { + sess->setCount(CPAP_Apnea, r->uai * ratio); + sess->setCph(CPAP_Apnea, (r->uai * ratio) / (time / 3600.0)); + } + if (r->hi >= 0) { + sess->setCount(CPAP_Hypopnea, r->hi * ratio); + sess->setCph(CPAP_Hypopnea, (r->hi * ratio) / (time / 3600.0)); + } + if (r->cai >= 0) { + sess->setCount(CPAP_ClearAirway, r->cai * ratio); + sess->setCph(CPAP_ClearAirway, (r->ai * ratio) / (time / 3600.0)); + } + + } + + } + } + + loader->saveMutex.lock(); + mach->AddSession(sess, p_profile); + sess->Store(p_profile->Get(mach->properties[STR_PROP_Path])); + loader->saveMutex.unlock(); +} + long event_cnt = 0; @@ -1240,12 +1346,9 @@ int ResmedLoader::Open(QString path, Profile *profile) // strsess end can change above. end = strsess.end(); - m->lockSaveMutex(); - m->setTotalTasks(m->totalTasks() + size); - m->unlockSaveMutex(); - - m->StartSaveThreads(); - +// m->lockSaveMutex(); +// m->setTotalTasks(m->totalTasks() + size); +// m->unlockSaveMutex(); ///////////////////////////////////////////////////////////////////////////////////////////// // Scan through unmatched strsess records, and attempt to get at summary data @@ -1259,120 +1362,16 @@ int ResmedLoader::Open(QString path, Profile *profile) } //Q_ASSERT(R.sessionid == 0); + // the following should not happen if (R.sessionid > 0) { m->skipSaveTask(); continue; } - -// if ((++cnt % 5) == 0) { -// // TODO: Change me to emit once MachineLoader is QObjectified... -// if (qprogress) { qprogress->setValue(10.0 + (float(cnt) / float(size) * 90.0)); } - -// QApplication::processEvents(); -// } - - sess = new Session(m, R.maskon); - - sess->really_set_first(qint64(R.maskon) * 1000L); - sess->really_set_last(qint64(R.maskoff) * 1000L); - - // Claim this record for future imports - sess->settings[RMS9_MaskOnTime] = R.maskon; - sess->settings[CPAP_SummaryOnly] = true; - - sess->SetChanged(true); - - // First take the settings - if (R.set_pressure >= 0) - sess->settings[CPAP_Pressure] = R.set_pressure; - if (R.min_pressure >= 0) sess->settings[CPAP_PressureMin] = R.min_pressure; - if (R.max_pressure >= 0) sess->settings[CPAP_PressureMax] = R.max_pressure; - if (R.ps >= 0) sess->settings[CPAP_PS] = R.ps; - if (R.min_ps >= 0) sess->settings[CPAP_PSMin] = R.min_ps; - if (R.max_ps >= 0) sess->settings[CPAP_PSMax] = R.max_ps; - if (R.epap >= 0) sess->settings[CPAP_EPAP] = R.epap; - if (R.max_epap >= 0) sess->settings[CPAP_EPAPHi] = R.max_epap; - if (R.min_epap >= 0) sess->settings[CPAP_EPAPLo] = R.min_epap; - if (R.ipap >= 0) sess->settings[CPAP_IPAP] = R.ipap; - if (R.max_ipap >= 0) sess->settings[CPAP_IPAPHi] = R.max_ipap; - if (R.min_ipap >= 0) sess->settings[CPAP_IPAPLo] = R.min_ipap; - if (R.mode >= 0) sess->settings[CPAP_Mode] = R.mode; - if (R.epr >= 0) { - sess->settings[RMS9_EPR] = R.epr; - sess->settings[CPAP_PresReliefType] = (int)PR_EPR; - } - if (R.epr_set >= 0) { - sess->settings[RMS9_EPRSet] = R.epr_set; - sess->settings[CPAP_PresReliefSet] = R.epr_set; - } - if (R.leakmax >= 0) sess->setMax(CPAP_Leak, R.leakmax); - if (R.leakmax >= 0) sess->setMin(CPAP_Leak, 0); - if ((R.leakmed >= 0) && (R.leak95 >= 0) && (R.leakmax >= 0)) { - sess->m_timesummary[CPAP_Leak][short(R.leakmax / R.leakgain)]=1; - sess->m_timesummary[CPAP_Leak][short(R.leak95 / R.leakgain)]=9; - sess->m_timesummary[CPAP_Leak][short(R.leakmed / R.leakgain)]=65; - sess->m_timesummary[CPAP_Leak][0]=25; - } - - - // Find the matching date group for this record - QMap >::iterator dtit = strdate.find(R.date); - - // should not be possible, but my brain hurts... - Q_ASSERT(dtit != strdate.end()); - - if (dtit != strdate.end()) { - QList & dayrecs = dtit.value(); - bool hasdatasess=false; - EventDataType time=0, totaltime=0; - - for (int c=0; c < dayrecs.size(); ++c) { - STRRecord *r = dayrecs[c]; - if (r->sessionid > 0) { - // get complicated.. calculate all the counts for valid sessions, and use the summary to make up the rest - hasdatasess = true; - } - totaltime += r->maskoff - r->maskon; - } - - - if (!hasdatasess) { - for (int c=0; c < dayrecs.size(); ++c) { - STRRecord *r = dayrecs[c]; - time = r->maskoff - r->maskon; - float ratio = time / totaltime; - - // Add the time weighted proportion of the events counts - if (r->ai >= 0) { - sess->setCount(CPAP_Obstructive, r->ai * ratio); - sess->setCph(CPAP_Obstructive, (r->ai * ratio) / (time / 3600.0)); - } - if (r->uai >= 0) { - sess->setCount(CPAP_Apnea, r->uai * ratio); - sess->setCph(CPAP_Apnea, (r->uai * ratio) / (time / 3600.0)); - } - if (r->hi >= 0) { - sess->setCount(CPAP_Hypopnea, r->hi * ratio); - sess->setCph(CPAP_Hypopnea, (r->hi * ratio) / (time / 3600.0)); - } - if (r->cai >= 0) { - sess->setCount(CPAP_ClearAirway, r->cai * ratio); - sess->setCph(CPAP_ClearAirway, (r->ai * ratio) / (time / 3600.0)); - } - - } - - } - } - - - - m->AddSession(sess, profile); - m->queSaveList(sess); + queTask(new ResmedImportStage2(this, R, m)); } - m->FinishSaveThreads(); + runTasks(); #ifdef DEBUG_EFFICIENCY { diff --git a/sleepyhead/SleepLib/loader_plugins/resmed_loader.h b/sleepyhead/SleepLib/loader_plugins/resmed_loader.h index 51e63561..0264fa41 100644 --- a/sleepyhead/SleepLib/loader_plugins/resmed_loader.h +++ b/sleepyhead/SleepLib/loader_plugins/resmed_loader.h @@ -311,6 +311,19 @@ protected: Machine * mach; }; +class ResmedImportStage2:public ImportTask +{ +public: + ResmedImportStage2(ResmedLoader * l, STRRecord r, Machine * m): loader(l), R(r), mach(m) {} + virtual ~ResmedImportStage2() {} + virtual void run(); + +protected: + ResmedLoader * loader; + STRRecord R; + Machine * mach; +}; + /*! \class ResmedLoader \brief Importer for ResMed S9 Data @@ -318,6 +331,7 @@ protected: class ResmedLoader : public MachineLoader { friend class ResmedImport; + friend class ResmedImportStage2; public: ResmedLoader(); virtual ~ResmedLoader(); diff --git a/sleepyhead/SleepLib/machine.cpp b/sleepyhead/SleepLib/machine.cpp index dbe10bb7..734b53bd 100644 --- a/sleepyhead/SleepLib/machine.cpp +++ b/sleepyhead/SleepLib/machine.cpp @@ -428,8 +428,9 @@ void Machine::StartSaveThreads() m_totaltasks=0; for (int i = 0; i < threads; i++) { - thread.push_back(new SaveThread(this, path)); - QObject::connect(thread[i], SIGNAL(UpdateProgress(int)), qprogress, SLOT(setValue(int))); + SaveThread * thr = new SaveThread(this, path); + QObject::connect(thr, SIGNAL(UpdateProgress(int)), qprogress, SLOT(setValue(int))); + thread.push_back(thr); thread[i]->start(); }