# winIDEA SDK example script: data_recorder_with_daq.py
# This script is licensed under BSD License, see file LICENSE.txt, or search for `License` in the SDK online help.
# (c) TASKING Germany GmbH, 2023
#
# This script demonstrates recording of variables using fast data
# acquisition (DAQ) approach. The recorded data is written to CSV
# file. If the 'openpyxl' module is installed, the data is also
# written to XLSX file. If 'pylab' module is installed, the acquired
# data is also plotted.
#
# All the above functionality is grouped into functions, so you can
# easily take out only part of the script. This script can also be
# imported as a module to user's scripts, so it is easy to reuse
# functions found here.
import sys
import csv
import time
import isystem.connect as ic
winidea_id = ''
# Optional dependencies. Each flag records whether the corresponding
# module could be imported, so the rest of the script can skip XLSX
# export or plotting when the module is not installed.
try:
    import openpyxl
except ImportError:
    isOpenPyXLInstalled = False
else:
    isOpenPyXLInstalled = True

try:
    import pylab as plab
except ImportError:
    isPyLabInstalled = False
else:
    isPyLabInstalled = True
def printHelp():
    """
    Prints the command line usage of this script to stdout.
    """
    print("Usage: data_recorder_with_daq.py <variableName1 samplingRateInSeconds1> ...")
    print("")
    print(" If 'samplingRateInSeconds' == 0, the smallest possible sampling")
    print(" time is used. Supported values: 0.001, 0.01, 0.1, 1 second.")
    print(" See CDAQController::EDAQSamplingFlags for all possible")
    print(" values.")
    print("")
    print("")
    print("Example: data_recorder_with_daq.py main_loop_counter 0.1 g_baseStruct.i_base 0.01")
    print(" records main_loop_counter each 100 ms, and g_baseStruct.i_base with 10 ms period.")
def initTarget(cmgr):
    """
    Prepares the target for recording and returns the debug facade
    that was used to do so. Adapt the steps below to your own setup.

    The sequence is: download, run until function 'main', wait until
    the target is stopped, then resume execution.
    """
    targetDebug = ic.CDebugFacade(cmgr)

    targetDebug.download()

    # stop at 'main' first, then set the target running again
    targetDebug.runUntilFunction("main")
    targetDebug.waitUntilStopped()
    targetDebug.run()

    return targetDebug
def realTimeToDAQTimeFlags(samplTime):
    """
    Maps a sampling time given in seconds to one of the sampling
    constants of ic.CDAQController (EDAQSamplingFlags).

    Times below 0.001 map to daqSampleMax, times of 1 or more map to
    daqSample1s. Anything in between maps to the largest of the 1 ms,
    10 ms and 100 ms periods that does not exceed the given time.
    """
    # (exclusive upper limit in seconds, name of the constant to return)
    upperLimits = (
        (0.001, 'daqSampleMax'),
        (0.01, 'daqSample1ms'),
        (0.1, 'daqSample10ms'),
        (1, 'daqSample100ms'),
    )

    for limit, flagName in upperLimits:
        if samplTime < limit:
            return getattr(ic.CDAQController, flagName)

    return ic.CDAQController.daqSample1s
def _daqValueToPython(var):
    """
    Converts a value object returned by CDAQController.getDataValue()
    to a plain Python value which can be stored to a CSV cell.
    """
    if var.isTypeUnsigned() or var.isTypeSigned():
        return var.getLong()
    if var.isTypeFloat():
        return var.getDouble()
    if var.isTypeAddress():
        return var.getAddress().m_aAddress
    if var.isTypeCompound():
        return 'Struct'

    # Unknown type: leave the cell empty instead of failing on an
    # unassigned name or silently repeating the previous sample's value.
    return ''


def recordVariables(cmgr,
                    debugCtrl,
                    variables,
                    recordingTimeInSeconds,
                    fileName,
                    isRecordToMemory,
                    isPrintToStdOut):
    """
    This function reads variables using the DAQ system and writes them
    to a CSV file.

    If isRecordToMemory == False, the amount of data that can be recorded is
    limited by the free disk size.

    Parameters:

    cmgr - connection manager, used to create the DAQ controller

    debugCtrl - iSYSTEM's debug controller, which provides access to target.
                It is not used by this function, but is kept in the
                signature so that existing callers keep working.

    variables - array of two element arrays, where the first element contains
                variable name and the second element contains sampling interval,
                for example: [['main_loop_counter', 0.1], ['g_baseStruct.i_base', 0]].
                Sampling interval set to 0 means the smallest interval possible.
                A name starting with '0x' is parsed as a hexadecimal memory
                address and read as a 32-bit item.

    recordingTimeInSeconds - how long to record the data, in seconds

    fileName - name of the output CSV file.

    isRecordToMemory - if True, then data is also stored into memory
                       array and returned as function return value. Be
                       aware of memory usage in this case. If false,
                       an empty list is returned.

    isPrintToStdOut - if True, each row is printed to stdout during recording

    Returns:
    List of rows, where each row is a list containing time stamp and recorded
    values in the same order as variable names were specified. Cells of
    variables, which have no sample for the time stamp of the row, contain
    empty strings. Example for two samples of three variables::

      [[0, 23, 45, 4.35],
       [0.1, 24, -525, 1.78]
      ]

    Raises:
    Exception if the DAQ system reports that no items are available.
    """
    daqCtrl = ic.CDAQController(cmgr)

    # check if DAQ system is available
    daqInfo = daqCtrl.info()
    if daqInfo.getMaxItems() == 0:
        raise Exception("Data Acquisition (DAQ) system is not available.")

    print('MaxItems = ', daqInfo.getMaxItems())

    daqVariables = ic.DAQConfigVector()
    for varSamplData in variables:
        varName = varSamplData[0]
        samplTime = realTimeToDAQTimeFlags(varSamplData[1])

        if varName.startswith(('0x', '0X')):
            # Direct memory address reading (32bit variable). The name is
            # a hexadecimal literal, so it has to be parsed with base 16 -
            # int() with the default base 10 rejects the '0x' prefix.
            memAddr = int(varName, 16)
            daqVariables.append(ic.CDAQConfigItem(4, 0, memAddr, samplTime))
        else:
            # Reading a regular variable
            daqVariables.append(ic.CDAQConfigItem(varName, samplTime))

    # note the time of the DAQ system
    daqTimeStart = daqCtrl.status().getTime()

    daqCtrl.configure(daqVariables)

    # enable DAQ on the entire SoC
    daqCtrl.enableGlobal(True)

    startTime = time.time()
    endTime = startTime + recordingTimeInSeconds

    numVars = len(variables)
    recordedData = []

    # newline='' is required by the csv module, otherwise an additional
    # empty line is written after each row on Windows.
    with open(fileName, 'w', newline='') as csvFile:

        # add parameter dialect = 'excel' for Excel specific output
        # See also http://docs.python.org/library/csv.html
        # for configuring CSV output
        csvWriter = csv.writer(csvFile)

        def emitRow(completedRow):
            # Sends one completed row to all enabled outputs.
            csvWriter.writerow(completedRow)
            if isPrintToStdOut:
                print(completedRow)
            if isRecordToMemory:
                recordedData.append(completedRow)

        csvWriter.writerow(['Time'] + [varData[0] for varData in variables])

        # None means that no sample has been received yet. A dedicated
        # marker is used, because 0 is a valid relative time stamp.
        lastTime = None
        row = [''] * (1 + numVars)

        while time.time() < endTime:
            daqStatus = daqCtrl.status()

            # nothing to do until at least one sample is available
            if daqStatus.getNumSamplesAvailable() <= 0:
                continue

            if daqStatus.getOverflow():
                print('SAMPLING OVERFLOW!')

            # read available samples into daqSamples
            daqSamples = ic.DAQSampleVector()
            daqCtrl.read(daqSamples)

            for daqSample in daqSamples:
                sampleTime = daqSample.getTime() - daqTimeStart
                # column 0 contains time stamp
                columnIndex = daqSample.getIndex() + 1
                value = _daqValueToPython(daqCtrl.getDataValue(daqSample))

                # new time found - write the row collected so far
                # and start a new one
                if lastTime is None or sampleTime != lastTime:
                    if lastTime is not None:
                        emitRow(row)
                    row = [''] * (1 + numVars)
                    row[0] = sampleTime
                    # Remember what time was last used to fill the row of data
                    lastTime = sampleTime

                row[columnIndex] = value

        # And the last line of data
        if lastTime is not None:
            emitRow(row)

    return recordedData
def writeDataToXLSX(fileName, data, expressions):
    """
    Writes recorded data to an XLSX file using the 'openpyxl' module.

    Parameters:

    fileName - name of the output XLSX file

    data - iterable of rows, each row is appended to the sheet as it is

    expressions - sequence of names of recorded variables. They are
                  written as column titles after the 'Time' column.
    """
    book = openpyxl.Workbook()

    # A new workbook already contains one empty sheet. It is reused here,
    # because create_sheet() would add a second sheet and leave the empty
    # one as the first sheet of the saved file.
    sheet = book.active
    sheet.title = 'Variables'

    # write header; list() accepts tuples and other sequences as well
    sheet.append(['Time'] + list(expressions))

    for row in data:
        sheet.append(row)

    book.save(fileName)
def main():
    """
    Entry point of the script.

    Parses (variable name, sampling time) pairs from the command line,
    connects to winIDEA, initializes the target and records the
    variables to 'daqData.csv'. If 'openpyxl' is installed and any data
    was recorded, the data is also written to 'daqData.xlsx'. If 'pylab'
    is installed and any data was recorded, the data is plotted.

    The script exits with code -1 if command line parameters are
    invalid or connection to winIDEA can not be established.
    """
    args = sys.argv[1:]
    if not args:
        printHelp()
        return

    # Parameters must come in pairs, otherwise the sampling time of
    # the last variable is missing.
    if len(args) % 2 != 0:
        print("Error: sampling time is missing for variable '" + args[-1] + "'.")
        print("")
        printHelp()
        sys.exit(-1)

    samplingInfo = []
    varNames = []

    # create (varName, samplingTime) pairs from params in command line
    # Example input parameters:
    # a_my_signal 0.01 a_my_int 0 a_my_char 1 DigitalIn.DIN0 0.1 AnalogIn.AIN1 0.1
    # For digital mask testing:
    # DigitalIn.DIN 1 DigitalIn.DIN2 1 DigitalIn.DIN123 1 DigitalIn.DIN765 1 DigitalIn.DIN23456 1
    for varName, samplTimeStr in zip(args[0::2], args[1::2]):
        try:
            samplTime = float(samplTimeStr)
        except ValueError:
            print("Error: invalid sampling time '" + samplTimeStr +
                  "' for variable '" + varName + "'.")
            print("")
            printHelp()
            sys.exit(-1)

        varNames.append(varName)
        samplingInfo.append([varName, samplTime])

    cmgr = ic.ConnectionMgr()
    cmgr.initLogger('daq', 'daqExample.log', ic.CLogger.PYTHON)
    cmgr.connect(ic.CConnectionConfig().instanceId(winidea_id))

    if not cmgr.isAttached():
        print("The connection to winIDEA has not been established - exiting script.")
        sys.exit(-1)
    else:
        print("Established connection to winIDEA.")

    debugCtrl = initTarget(cmgr)

    # Here you can modify recording time and file name.
    recordingTimeInSeconds = 3  # after this time the script will stop
    filePrefix = 'daqData'
    isRecordToMemory = True
    isPrintToStdOut = True

    print('Recording ...')

    daqResults = recordVariables(cmgr,
                                 debugCtrl,
                                 samplingInfo,
                                 recordingTimeInSeconds,
                                 filePrefix + '.csv',
                                 isRecordToMemory,
                                 isPrintToStdOut)

    if isOpenPyXLInstalled and daqResults:
        writeDataToXLSX(filePrefix + '.xlsx', daqResults, varNames)

    # There is nothing to plot if no samples were recorded.
    if isPyLabInstalled and daqResults:
        lineTypes = ['k', 'r', 'g', 'b', 'k:', 'r:', 'g:', 'b:']

        for plotIdx, varName in enumerate(varNames, start=1):
            plab.subplot(len(varNames), 1, plotIdx)

            # Keep only rows, where this variable has a numeric value.
            # If we have different sampling times for variables, then
            # the less frequently sampled variables have empty strings
            # in some rows. Values are taken directly from recorded rows,
            # because conversion to fixed width strings would truncate them.
            samples = [(row[0], row[plotIdx]) for row in daqResults
                       if not isinstance(row[plotIdx], str)]
            times = [sample[0] for sample in samples]  # the first column is time
            values = [sample[1] for sample in samples]

            plab.plot(times, values, lineTypes[(plotIdx - 1) % len(lineTypes)])
            plab.ylabel(varName)

        plab.xlabel('t[s]')
        plab.show()

    print('Done!')
# Start recording only when this file is executed as a script, so that
# importing it as a module has no side effects.
if __name__ == '__main__':
    main()