// =============================================================================
// CDataAcquisitionThread implementation (class derived from CWinThread).
//
// NOTE(review): this copy of the file is collapsed onto a few very long
// physical lines, so every "//" comment below also swallows whatever code
// follows it on the same line. In addition, text that sat between '<' and '>'
// appears to have been lost: the system #include names are empty and several
// loop headers/conditions are truncated (for example
// "for(i=0;istatus==ASSUCCESS)" and "for(i=0;i0) wdt(...)"). Restore the file
// from version control before building. The notes below describe only what is
// still visible here.
//
// Static members defined in this file
//   m_csGDILock    - critical section entered around every m_dc.SetTextColor /
//                    m_dc.TextOut / GdiFlush sequence in InitInstance().
//                    No InitializeCriticalSection call is visible in this
//                    file - TODO confirm it is done elsewhere before the
//                    thread starts.
//   m_hAnotherDead - event created during static initialization with
//                    bManualReset=FALSE and bInitialState=FALSE; Delete()
//                    signals it.
//
// CDataAcquisitionThread(CWnd* pWnd, HDC hDC)
//   Sets m_bAutoDelete to FALSE, stores pWnd in m_pMainWnd, copies the window's
//   client rectangle into m_rectBorder, stores hDC in m_hDC and creates the two
//   events m_hEventKill and m_hEventDead (bManualReset=TRUE,
//   bInitialState=FALSE). pWnd is dereferenced unconditionally, so it must not
//   be NULL. The CreateEvent results are not checked.
//
// ~CDataAcquisitionThread()
//   Closes m_hEventKill and m_hEventDead.
//
// BOOL InitInstance()
//   Runs the complete acquisition session inside InitInstance and finally
//   returns FALSE. Phases that are still visible:
//    1. Port status report: a port whose Port[i]->status is ASSUCCESS is added
//       to PortOpenOutString; otherwise the log message (numbers 05-15) that
//       matches the status code is written with WriteToLog() and the port is
//       added to PortErrorOutString. Both strings are drawn through m_dc while
//       holding m_csGDILock.
//    2. Allocation of the per-port arrays PortBufferIndex, GotData,
//       GotOverflow, CurrentBufferSize, PortBuffer, StartTime and EndTime with
//       dynamic_array(). Any failure shows a message box and calls exit(-1).
//       NOTE(review): the message texts name the constructor rather than
//       InitInstance, the EndTime message says "StopTime", and three of them
//       concatenate to "...inCDataAcquisitionThread" (missing space).
//    3. Computation of `end`: the start of the current UTC hour (gmtime) is
//       advanced in steps of PeriodInSeconds
//       (Com.DiskWriteAndStatsFrequencyInMinutes*60) until it is later than
//       `now`.
//       NOTE(review): the while(end<=now) loop never terminates when the
//       configured frequency is 0.
//    4. Acquisition: message 16 is logged when Space exceeds MaxRxBufferSize.
//       When Space >= BlockSize, PortBuffer[i] is grown with realloc() in
//       BufferSize steps (messages 17/18) and ReadBuffer() stores one block at
//       PortBufferIndex[i], which then advances by Port[i]->count.
//       NOTE(review): the realloc() result is assigned directly to
//       PortBuffer[i]. On NULL the old pointer is lost, CurrentBufferSize[i]
//       is still increased, and ReadBuffer() is then called with an address
//       derived from the NULL pointer.
//    5. Screen refresh: once time() reaches ScreenUpdate, LastTimeOutString and
//       NextTimeOutString are rebuilt from gmtime() values and ScreenUpdate is
//       moved SCREEN_UPDATE_INTERVAL_IN_MIN minutes ahead.
//    6. Period end or kill request (m_hEventKill signaled): the ports are read
//       once more, EndTime[i] is taken with GetSystemTime(), and PortBuffer[i]
//       is appended to "%s\\%02d%03d%02d.P%02d" (output path, tm_year,
//       tm_yday+1, tm_hour, port number). The file is made writable with
//       _chmod(_S_IWRITE) if it exists, opened with _fsopen("at+",
//       _SH_DENYWR), and set to _S_IREAD after fclose(). A new file first gets
//       the instrument type (and OphirString for GET_OPHIR_CONSTANTS ports); a
//       StartTime line is written for a new file or on FirstTime; an EndTime
//       line is written when stopping or when tend.tm_min is 0. Ports that
//       take the else branch get log message 21 instead.
//       NOTE(review): tm_year counts years since 1900, so "%02d" yields three
//       digits from the year 2000 on, both in file names and in the screen
//       time stamps.
//    7. Statistics (only if NeedToDoStats): logs message 24 and appends a
//       table header to the ".sta" file of the period. The parsing and
//       statistics code that followed is largely missing from this copy.
//       NOTE(review): the sprintf() that builds the ".sta" name passes
//       Com.PortNumber[i] although the format has no conversion for it.
//       NOTE(review): strcpy(FieldTok, FieldTok+n) copies between overlapping
//       buffers, which the C library does not define; memmove() would be the
//       defined equivalent.
//    8. Shutdown: kill_wdt() (guard condition truncated), PortClose() for every
//       port (failures are appended to TextOutString), status strings reset to
//       "None "/"Disabled " and redrawn under m_csGDILock, m_dc.Detach(), log
//       message 04, free_array() for every per-port array, return FALSE.
// =============================================================================
// DataAcquisitionThread.cpp : implementation file // #include "stdafx.h" #include "DasWin32.h" #include "DasWin32Version.h" #include "DataAcquisitionThread.h" #include "dasinit.h" #include "commlib.h" extern CString TextOutString; extern PORT *Port[MAX_PORTS]; extern struct DasComPortInfo Com; #ifdef _DEBUG #define new DEBUG_NEW #undef THIS_FILE static char THIS_FILE[] = __FILE__; #endif CRITICAL_SECTION CDataAcquisitionThread::m_csGDILock; HANDLE CDataAcquisitionThread::m_hAnotherDead = CreateEvent(NULL, FALSE, FALSE, NULL); ///////////////////////////////////////////////////////////////////////////// // CDataAcquisitionThread IMPLEMENT_DYNAMIC(CDataAcquisitionThread, CWinThread) CDataAcquisitionThread::CDataAcquisitionThread(CWnd* pWnd, HDC hDC) { m_bAutoDelete = FALSE; m_pMainWnd = pWnd; m_pMainWnd->GetClientRect(&m_rectBorder); m_hDC = hDC; m_hEventKill = CreateEvent( NULL, TRUE, FALSE, NULL ); m_hEventDead = CreateEvent( NULL, TRUE, FALSE, NULL ); } CDataAcquisitionThread::~CDataAcquisitionThread() { CloseHandle(m_hEventKill); CloseHandle(m_hEventDead); } #include #include #include #include #include #include #include #include #include #include #include extern "C" int IsFieldANumber(char *); extern "C" void RemoveAlphaBetics(char *); extern "C" int IsFieldPrintable(char *); extern "C" int WriteToLog(char *,char *); extern "C" void * dynamic_array(size_t,size_t,size_t,size_t,size_t,size_t); extern "C" void free_array(int,int,int,int,void *); extern "C" void wdt(unsigned short,int); extern "C" void kill_wdt(unsigned short); extern "C" int GetOphirString(PORT *,char *,int,char ,int,int); extern "C" void GetOphirConstants(PORT *,char *,int,char *,SYSTEMTIME *); extern CDataAcquisitionThread* m_threadList; extern CString PortOpenOutString,PortErrorOutString,StatsOutString, StatusOutString,DataOutString,BufferOutString, LastTimeOutString,NextTimeOutString; BOOL CDataAcquisitionThread::InitInstance() { char 
// InitInstance() locals follow (working strings, per-port array pointers,
// counters, per-field statistics accumulators), then phase 1: port status report.
buf[MAX_WORKING_STRING_LEN],OutputFileName[MAX_WORKING_STRING_LEN], OphirString[MAX_WORKING_STRING_LEN],TmpOphirString[MAX_WORKING_STRING_LEN], WorkTok[MAX_WORKING_STRING_LEN],**PortBuffer,*SampleTok,*FieldTok, *SecondSample,*SecondFromLastSample,SampleDelimit[5], Delimit[40]={" !\"#$%&'()*,/:;<=>?@[\\]^_`{|}~\n\r\0"}; int i=0,j=0,k=0,NumberOfNulls=1,EndTimeIndex=0,exist=0,PortError=0,FirstTime=1,ParsedFields=0, ComSuccess=0,ComFailure=0,ComFlow=0,ComOverflow=0,StopThread=0,NeedToDoStats=0; unsigned short MaxRxBufferSize=65000,MaxTxBufferSize=25; unsigned int BlockSize=(unsigned int)RX_BLOCK_SIZE; long int *PortBufferIndex,*CurrentBufferSize,*GotData,*GotOverflow, BufferSize=(long)BUFFER_SIZE,FieldIndex,count=0,NumSamp=0, NumberOfSamplesPerField[MAX_TOKENS],MissingDataFields[MAX_TOKENS], NonPrintableFields[MAX_TOKENS],NonNumericFields[MAX_TOKENS], NonParsableFields[MAX_TOKENS],Space=0; time_t now=0,start=0,end=0,PeriodInSeconds=0,ScreenUpdate=0; double FieldValue[MAX_TOKENS],Sum[MAX_TOKENS],SumSquared[MAX_TOKENS],Avg=0.,AvgSumSquared=0.,Stdev=0.,Var=0.; struct tm tp,worktp,tend; SYSTEMTIME *StartTime,*EndTime,OphirStartTime; FILE *f1,*f2; // output initial acquisition and statistics status remarks to screen // for(i=0;istatus==ASSUCCESS) { if(ComSuccess) PortOpenOutString+=","; if(PortOpenOutString.Find("None")!=-1) PortOpenOutString=buf; else PortOpenOutString+=buf; ComSuccess++; } else { PortError++; if(Port[i]->status==ASNOMEMORY) { sprintf(buf,"05 PortOpenMSWin32 error on COM%d...Out of memory. 
",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASINVPAR) { sprintf(buf,"06 PortOpenMSWin32 error on COM%d...Invalid Arguments in the call.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASINVPORT) { sprintf(buf,"07 PortOpenMSWin32 error on COM%d...Specified port is invalid or not open.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASINUSE) { sprintf(buf,"08 PortOpenMSWin32 error on COM%d...Port already in use.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASNO8250) { sprintf(buf,"09 PortOpenMSWin32 error on COM%d...No 8250 UART at the requested location.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASNOTSETUP) { sprintf(buf,"10 PortOpenMSWin32 error on COM%d...Port has not been initialized or opened.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASILLEGALPARITY) { sprintf(buf,"11 PortOpenMSWin32 error on COM%d...Parity specification not leagal for this driver.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASILLEGALWORDLENGTH) { sprintf(buf,"12 PortOpenMSWin32 error on COM%d...Data bit length not leagal for this driver.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASILLEGALSTOPBITS) { sprintf(buf,"13 PortOpenMSWin32 error on COM%d...Number of stop bits specified not leagal for this driver.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASILLEGALBAUDRATE) { sprintf(buf,"14 PortOpenMSWin32 error on COM%d...Baud rate specified is not leagal for this driver.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(Port[i]->status==ASGENERALERROR) { sprintf(buf,"15 PortOpenMSWin32 error on COM%d...General 
error, non-specific.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); PortError++; } if(PortError) { if(ComFailure) PortErrorOutString+=","; sprintf(buf,"COM%d",Com.PortNumber[i]); if(PortErrorOutString.Find("None")!=-1) PortErrorOutString=buf; else PortErrorOutString+=buf; ComFailure++; } } EnterCriticalSection(&CDataAcquisitionThread::m_csGDILock); { if(!ComSuccess) m_dc.SetTextColor(RGB(255,0,0)); else m_dc.SetTextColor(RGB(0,100,0)); m_dc.TextOut(160,92,PortOpenOutString); if(PortError) m_dc.SetTextColor(RGB(255,0,0)); else m_dc.SetTextColor(RGB(0,100,0)); m_dc.TextOut(160,108,PortErrorOutString); GdiFlush(); } LeaveCriticalSection(&CDataAcquisitionThread::m_csGDILock); } _amblksiz=(unsigned int)HEAP_BLOCK_SIZE; // allocate memory for local pointers if((PortBufferIndex=(long *)dynamic_array(Com.NumPorts,0,0,0,sizeof(long),sizeof(long *)))==NULL) { AfxMessageBox("Memory allocation error for array PortBufferIndex in " "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } if((GotData=(long *)dynamic_array(Com.NumPorts,0,0,0,sizeof(long),sizeof(long *)))==NULL) { AfxMessageBox("Memory allocation error for array GotData in " "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } if((GotOverflow=(long *)dynamic_array(Com.NumPorts,0,0,0,sizeof(long),sizeof(long *)))==NULL) { AfxMessageBox("Memory allocation error for array GotOverflow in " "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } if((CurrentBufferSize=(long *)dynamic_array(Com.NumPorts,0,0,0,sizeof(long),sizeof(long *)))==NULL) { AfxMessageBox("Memory allocation error for array CurrentBufferSize in " "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } if((PortBuffer=(char **)dynamic_array(Com.NumPorts,BufferSize,0,0,sizeof(char),sizeof(char **)))==NULL) { AfxMessageBox("Memory allocation error for array PortBuffer in" "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } 
// Remaining allocations (StartTime, EndTime), computation of the first period
// end, then the acquisition reads and the periodic status-screen refresh.
// NOTE(review): the realloc() result below is not protected against NULL.
if((StartTime=(SYSTEMTIME *)dynamic_array(Com.NumPorts,0,0,0,sizeof(SYSTEMTIME),sizeof(SYSTEMTIME *)))==NULL) { AfxMessageBox("Memory allocation error for array StartTime in" "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } if((EndTime=(SYSTEMTIME *)dynamic_array(Com.NumPorts,0,0,0,sizeof(SYSTEMTIME ),sizeof(SYSTEMTIME *)))==NULL) { AfxMessageBox("Memory allocation error for array StopTime in" "CDataAcquisitionThread::CDataAcquisitionThread().\n",MB_OK, 0 ); exit(-1); } time(&now); tp=*(gmtime(&now)); end=now-((tp.tm_min*60)+tp.tm_sec); PeriodInSeconds=Com.DiskWriteAndStatsFrequencyInMinutes*60; while(end<=now) { end+=PeriodInSeconds; time(&now); } tend=*(gmtime(&end)); for(i=0;i0) wdt((unsigned short)Com.WatchDogTimerAddressInHex,(int)REBOOT_TIME); for(i=0;iMaxRxBufferSize) { sprintf(buf,"16 Buffer overflow on COM%d.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); GotOverflow[i]++; } if(Space>0) GotData[i]++; if(Space>=(long)BlockSize) { while((PortBufferIndex[i]+(long)BlockSize)>CurrentBufferSize[i]) { if((PortBuffer[i]=(char *)realloc(PortBuffer[i],CurrentBufferSize[i]+(BufferSize*sizeof(char))))==NULL) { sprintf(buf,"17 Realloc error: NULL returned while allocating memory for PortBuffer[%d].",i); WriteToLog(Com.OutputDriveAndPath,buf); } CurrentBufferSize[i]+=BufferSize*sizeof(char); sprintf(buf,"18 Dynamic memory reallocation: COM%d total size: %ld",Com.PortNumber[i],CurrentBufferSize[i]); WriteToLog(Com.OutputDriveAndPath,buf); } ReadBuffer(Port[i],&PortBuffer[i][PortBufferIndex[i]],BlockSize); PortBufferIndex[i]+=Port[i]->count; } } // update status screen with time,data received and buffer overflow information if(time(&now)>=ScreenUpdate) { worktp=*(gmtime(&ScreenUpdate)); DataOutString=BufferOutString=""; sprintf(buf,"%02d-%02d-%02d %02d:%02d UTC ",worktp.tm_mon+1, worktp.tm_mday,worktp.tm_year,worktp.tm_hour,worktp.tm_min); LastTimeOutString=buf; ScreenUpdate=time(&now)+(time_t)SCREEN_UPDATE_INTERVAL_IN_MIN*60; 
// Next-update time stamp, then the end-of-period / kill-request handling:
// final port read, EndTime capture and the append to the per-port data file.
worktp=*(gmtime(&ScreenUpdate)); sprintf(buf,"%02d-%02d-%02d %02d:%02d UTC ",worktp.tm_mon+1, worktp.tm_mday,worktp.tm_year,worktp.tm_hour,worktp.tm_min); NextTimeOutString=buf; for(i=0;i=end)||(WaitForSingleObject(m_hEventKill, 0)==WAIT_OBJECT_0)) { if(WaitForSingleObject(m_hEventKill, 0)==WAIT_OBJECT_0) { end=time(&now); StopThread++; } for(i=0;i0) { if(Space>MaxRxBufferSize) { sprintf(buf,"16 Buffer overflow on COM%d.",Com.PortNumber[i]); WriteToLog(Com.OutputDriveAndPath,buf); GotOverflow[i]++; } while((PortBufferIndex[i]+(long)BlockSize+(long)NumberOfNulls)>CurrentBufferSize[i]) { if((PortBuffer[i]=(char *)realloc(PortBuffer[i],CurrentBufferSize[i]+(BufferSize*sizeof(char))))==NULL) { sprintf(buf,"17 Realloc error: NULL returned while allocating memory for PortBuffer[%d].",i); WriteToLog(Com.OutputDriveAndPath,buf); } CurrentBufferSize[i]+=BufferSize*sizeof(char); sprintf(buf,"18 Dynamic memory reallocation: COM%d total size: %ld",Com.PortNumber[i],CurrentBufferSize[i]); WriteToLog(Com.OutputDriveAndPath,buf); } ReadBuffer(Port[i],&PortBuffer[i][PortBufferIndex[i]],BlockSize); PortBufferIndex[i]+=Port[i]->count; } GetSystemTime(&EndTime[i]); for(j=0;j0) { sprintf(OutputFileName,"%s\\%02d%03d%02d.P%02d",Com.OutputDriveAndPath, tp.tm_year,tp.tm_yday+1,tp.tm_hour,Com.PortNumber[i]); exist=_access(OutputFileName,0); if(!exist) _chmod(OutputFileName,_S_IWRITE); if((f1=_fsopen(OutputFileName,"at+",_SH_DENYWR))==NULL) { _chmod(OutputFileName,_S_IREAD); sprintf(buf,"19 Can't open %s for data output.",OutputFileName); WriteToLog(Com.OutputDriveAndPath,buf); } else { sprintf(buf,"20 Disk I/O: Write to %s.",OutputFileName); WriteToLog(Com.OutputDriveAndPath,buf); if(exist==-1) { fprintf(f1,"%s\n",Com.InstrumentType[i]); if(Com.AcquisitionInitializationFunction[i]==GET_OPHIR_CONSTANTS) fprintf(f1,"%s",OphirString); } if((exist==-1)||FirstTime) { fprintf(f1,"%02d/%02d/%02d %02d:%02d:%02d:%03d StartTime", StartTime[i].wMonth,StartTime[i].wDay, 
StartTime[i].wYear-(StartTime[i].wYear/100*100), StartTime[i].wHour,StartTime[i].wMinute, StartTime[i].wSecond,StartTime[i].wMilliseconds); if(PortBuffer[i][0]!='\n') fprintf(f1,"\n"); } fprintf(f1,"%s",PortBuffer[i]); if(StopThread||(tend.tm_min==0)) { if(PortBuffer[i][PortBufferIndex[i]-1]!='\n') fprintf(f1,"\n"); fprintf(f1,"%02d/%02d/%02d %02d:%02d:%02d:%03d EndTime\n", EndTime[i].wMonth,EndTime[i].wDay, EndTime[i].wYear-(EndTime[i].wYear/100*100), EndTime[i].wHour,EndTime[i].wMinute, EndTime[i].wSecond,EndTime[i].wMilliseconds); } fclose(f1); _chmod(OutputFileName,_S_IREAD); } } else { sprintf(buf,"21 No data received from COM%d " "during the period %02d:%02d:%02d to %02d:%02d:%02d", Com.PortNumber[i],StartTime[i].wHour,StartTime[i].wMinute, StartTime[i].wSecond,EndTime[i].wHour,EndTime[i].wMinute, EndTime[i].wSecond); WriteToLog(Com.OutputDriveAndPath,buf); } } // do string parsing and statistics if(NeedToDoStats) { WriteToLog(Com.OutputDriveAndPath,"24 Calculating diagnostic statistics."); sprintf(OutputFileName,"%s\\%02d%03d%02d.sta",Com.OutputDriveAndPath, tp.tm_year,tp.tm_yday+1,tp.tm_hour,Com.PortNumber[i]); if(!_access(OutputFileName,0)) _chmod(OutputFileName,_S_IWRITE); if((f2=_fsopen(OutputFileName,"at+",_SH_DENYWR))==NULL) { _chmod(OutputFileName,_S_IREAD); sprintf(buf,"19 Can't open %s for data output.",OutputFileName); WriteToLog(Com.OutputDriveAndPath,buf); } else { if(Com.NumPorts>0) EndTimeIndex=Com.NumPorts-1; else EndTimeIndex=0; tend=*(gmtime(&end)); tp=*(gmtime(&start)); fprintf(f2,"\n NOAA/ETL DasWin32 Version %s\n" " Data Acquisition Statistics\n" " Start Time: %02d/%02d/%02d %02d:%02d:%02d | End Time: %02d/%02d/%02d %02d:%02d:%02d\n\n" " Instrument Parameter Number of Samples Standard Missing Non-Printable Non-Parsable Non-Digit\n" " Name Name In Statistics Mean Deviation Samples Samples Samples Samples\n" "------------------------------------------------------------------------------------------------------------------------------\n", 
VERSION_NUMBER,tp.tm_mon+1,tp.tm_mday,tp.tm_year,tp.tm_hour,tp.tm_min,tp.tm_sec, tend.tm_mon+1,tend.tm_mday,tend.tm_year,tend.tm_hour,tend.tm_min,tend.tm_sec); for(j=0;jMAX_WORKING_STRING_LEN) { for(i=0;i=(unsigned int)Com.LeadingTokenCharactersToStrip[j]) strcpy(FieldTok,FieldTok+Com.LeadingTokenCharactersToStrip[j]); if(FieldIndex>=MAX_TOKENS) { for(i=0;i0) kill_wdt((unsigned short)Com.WatchDogTimerAddressInHex); // Close all comm ports for( i = 0; i < Com.NumPorts; i++ ) { if(PortClose(Port[i])!=ASSUCCESS) { sprintf(buf," Can't close COM%d\n",Com.PortNumber[i]); TextOutString+=buf; } } PortOpenOutString=PortErrorOutString=DataOutString= BufferOutString=LastTimeOutString=NextTimeOutString="None "; StatusOutString=StatsOutString="Disabled "; EnterCriticalSection(&CDataAcquisitionThread::m_csGDILock); { m_dc.SetTextColor(RGB(0,100,0)); m_dc.TextOut(160,60,StatusOutString); m_dc.TextOut(160,76,StatsOutString); m_dc.TextOut(160,92,PortOpenOutString); m_dc.TextOut(160,108,PortErrorOutString); m_dc.TextOut(160,124,DataOutString); m_dc.TextOut(160,140,BufferOutString); m_dc.TextOut(160,156,LastTimeOutString); m_dc.TextOut(160,172,NextTimeOutString); GdiFlush(); } LeaveCriticalSection(&CDataAcquisitionThread::m_csGDILock); m_dc.Detach(); WriteToLog(Com.OutputDriveAndPath,"04 Data acquisition thread stopped."); // free local memory if(PortBufferIndex!=NULL) free_array(Com.NumPorts,0,0,0,(long *)PortBufferIndex); if(GotData!=NULL) free_array(Com.NumPorts,0,0,0,(long *)GotData); if(GotOverflow!=NULL) free_array(Com.NumPorts,0,0,0,(long *)GotOverflow); if(CurrentBufferSize!=NULL)free_array(Com.NumPorts,0,0,0,(long *)CurrentBufferSize); if(PortBuffer!=NULL) free_array(Com.NumPorts,BufferSize,0,0,(char **)PortBuffer); if(StartTime!=NULL) free_array(Com.NumPorts,0,0,0,(SYSTEMTIME *)StartTime); if(EndTime!=NULL) free_array(Com.NumPorts,0,0,0,(SYSTEMTIME *)EndTime); return FALSE; } void CDataAcquisitionThread::Delete() { // TODO: perform any per-thread cleanup here 
// Delete(): forwards to CWinThread::Delete() and then signals m_hEventDead and
// the static m_hAnotherDead. The members are touched after the base call, so
// this relies on m_bAutoDelete being FALSE (set in the constructor) -
// presumably the base call then leaves the object alive; TODO confirm against
// the MFC documentation.
//
// KillThread(): signals m_hEventKill (polled by InitInstance with a zero
// timeout), raises the thread priority to THREAD_PRIORITY_ABOVE_NORMAL, waits
// without timeout for m_hEventDead and then for m_hThread, and finally
// executes "delete this" - the object must not be used after the call returns.
// NOTE(review): presumably meant to be called from a thread other than the
// acquisition thread; waiting on m_hThread from that thread itself would
// never return.
//
// The message map at the end is empty (ClassWizard placeholder only).
CWinThread::Delete(); VERIFY(SetEvent(m_hEventDead)); VERIFY(SetEvent(m_hAnotherDead)); } void CDataAcquisitionThread::KillThread() { VERIFY(SetEvent(m_hEventKill)); SetThreadPriority(THREAD_PRIORITY_ABOVE_NORMAL); WaitForSingleObject(m_hEventDead, INFINITE); WaitForSingleObject(m_hThread, INFINITE); delete this; } BEGIN_MESSAGE_MAP(CDataAcquisitionThread, CWinThread) //{{AFX_MSG_MAP(CDataAcquisitionThread) // NOTE - the ClassWizard will add and remove mapping macros here. //}}AFX_MSG_MAP END_MESSAGE_MAP()