Move database writing from PRS1Loader into ImportContext.

This commit is contained in:
sawinglogz 2021-09-03 15:42:10 -04:00
parent 18e97bb025
commit 947a27b78a
5 changed files with 77 additions and 33 deletions

View File

@ -24,9 +24,9 @@ ImportContext::~ImportContext()
void ImportContext::LogUnexpectedMessage(const QString & message)
{
m_mutex.lock();
m_logMutex.lock();
m_unexpectedMessages += message;
m_mutex.unlock();
m_logMutex.unlock();
}
void ImportContext::FlushUnexpectedMessages()
@ -61,6 +61,51 @@ Session* ImportContext::CreateSession(SessionID sid)
return new Session(m_machine, sid);
}
bool ImportContext::AddSession(Session* session)
{
// Make sure the session will be saved.
session->SetChanged(true);
// Update indexes, process waveform and perform flagging.
session->UpdateSummaries();
// Write the session file to disk.
bool ok = session->Store(session->machine()->getDataPath());
if (!ok) {
qWarning() << "Failed to store session" << session->session();
}
// Unload the memory-intensive data now that it's written to disk.
session->TrashEvents();
// TODO: Remove MachineLoader::addSession once all loaders use this.
// Add the session to the database
m_sessionMutex.lock();
m_sessions[session->session()] = session;
m_sessionMutex.unlock();
return ok;
}
bool ImportContext::Commit()
{
bool ok = true;
// TODO: Remove MachineLoader::finishAddingSessions once all loaders use this.
// Using a map specifically so they are inserted in order.
for (auto session : m_sessions) {
bool added = session->machine()->AddSession(session);
if (!added) {
qWarning() << "Session" << session->session() << "was not addded";
ok = false;
}
}
m_sessions.clear();
// TODO: Move what we can from finishCPAPImport into here,
// e.g. Profile::StoreMachines and Machine::SaveSummaryCache.
return ok;
}
ProfileImportContext::ProfileImportContext(Profile* profile)
: m_profile(profile)
@ -80,6 +125,11 @@ QDateTime ProfileImportContext::IgnoreSessionsOlderThan()
Machine* ProfileImportContext::CreateMachineFromInfo(const MachineInfo & info)
{
if (m_machine) {
// TODO: Ultimately a context will probably take MachineInfo as a constructor,
// once all loaders fully populate MachineInfo prior to import.
qWarning() << "ProfileImportContext::CreateMachineFromInfo called more than once for this context!";
}
m_machineInfo = info;
m_machine = m_profile->CreateMachine(m_machineInfo);
return m_machine;

View File

@ -27,6 +27,10 @@ signals:
void importEncounteredUnexpectedData(const MachineInfo & info, QSet<QString> & newMessages);
public:
// Emit the importEncounteredUnexpectedData signal if there are any new messages and clear the list.
// TODO: This will no longer need to be public once a context doesn't get reused between machines.
void FlushUnexpectedMessages();
virtual bool ShouldIgnoreOldSessions() { return false; }
virtual QDateTime IgnoreSessionsOlderThan() { return QDateTime(); }
@ -37,15 +41,25 @@ public:
virtual QString GetBackupPath();
virtual bool SessionExists(SessionID sid);
// Create an in-memory Session object for the importer to fill out.
virtual Session* CreateSession(SessionID sid);
void FlushUnexpectedMessages();
// Write the session to disk and release its memory, adding it to the queue to be committed.
virtual bool AddSession(Session* session);
// Update the database to include all the newly added sessions.
virtual bool Commit();
protected:
QMutex m_mutex;
QMutex m_logMutex;
QSet<QString> m_unexpectedMessages;
MachineInfo m_machineInfo;
Machine* m_machine;
QMutex m_sessionMutex;
QMap<SessionID, Session *> m_sessions;
};

View File

@ -794,11 +794,6 @@ int PRS1Loader::OpenMachine(const QString & path)
runTasks(AppSetting->multithreading());
emit updateMessage(QObject::tr("Finishing up..."));
QCoreApplication::processEvents();
finishAddingSessions();
return tasks;
}
@ -2496,7 +2491,7 @@ void PRS1Import::ImportWaveforms()
void PRS1Import::run()
{
if (ParseSession()) {
SaveSessionToDatabase();
loader->context()->AddSession(session);
}
}
@ -2626,27 +2621,6 @@ QList<PRS1DataChunk *> PRS1Import::ReadWaveformData(QList<QString> & files, cons
}
void PRS1Import::SaveSessionToDatabase(void)
{
// Make sure it's saved
session->SetChanged(true);
// Add the session to the database
loader->addSession(session);
// Update indexes, process waveform and perform flagging
session->UpdateSummaries();
// Save is not threadsafe
loader->saveMutex.lock();
session->Store(session->machine()->getDataPath());
loader->saveMutex.unlock();
// Unload them from memory
session->TrashEvents();
}
QList<PRS1DataChunk *> PRS1Loader::ParseFile(const QString & path)
{
QList<PRS1DataChunk *> CHUNKS;

View File

@ -126,8 +126,6 @@ protected:
CPAPMode importMode(int mode);
//! \brief Parse all the chunks in a single machine session
bool ParseSession(void);
//! \brief Save parsed session data to the database
void SaveSessionToDatabase(void);
//! \brief Cache a single slice from a summary or compliance chunk.
void AddSlice(qint64 chunk_start, PRS1ParsedEvent* e);

View File

@ -732,6 +732,10 @@ int MainWindow::importCPAP(ImportPath import, const QString &message)
int c = import.loader->Open(import.path);
progdlg->setMessage(QObject::tr("Finishing up..."));
QCoreApplication::processEvents();
ctx->Commit();
import.loader->SetContext(nullptr);
delete ctx;
@ -2443,6 +2447,10 @@ void MainWindow::importNonCPAP(MachineLoader &loader)
int res = loader.Open(files);
progress.setMessage(QObject::tr("Finishing up..."));
QCoreApplication::processEvents();
ctx->Commit();
loader.SetContext(nullptr);
delete ctx;