import csv
import time
import isystem.connect as ic
# Instance id handed to CConnectionConfig().instanceId() in main().
# NOTE(review): an empty string presumably selects the default instance -
# confirm against the isystem.connect documentation.
winidea_id = ''

# pylab is optional: main() only plots the recorded data when this import
# succeeded, so a missing package must not abort the script.
try:
    import pylab as pl
except ImportError:
    isPyLabInstalled = False
else:
    isPyLabInstalled = True
def initTarget(cmgr):
    """
    Bring the target into a running state and hand back the debug facade.

    The target is reset, executed until function "main" is reached, and then
    set running again. Adapt the sequence to the needs of your target.

    Parameters:
    cmgr - connection manager used to construct the ic.CDebugFacade

    Returns:
    The ic.CDebugFacade instance that was created for 'cmgr'.
    """
    debugFacade = ic.CDebugFacade(cmgr)

    # Reset, then stop at main() before letting the target run freely.
    debugFacade.reset()
    debugFacade.runUntilFunction("main")
    debugFacade.waitUntilStopped()
    debugFacade.run()

    return debugFacade
def recordWatchExpressions(debugCtrl, watches, recordingTimeInSeconds,
                           samplingInterval, fileName, isRecordToMemory,
                           isPrintToStdOut):
    """
    Periodically evaluates watch expressions and collects the results in
    memory. The amount of data that can be recorded is limited by the amount
    of system memory.

    Parameters:
    debugCtrl - debug controller whose evaluate() method is called for each
                watch expression
    watches - list of strings with watch expressions, for example
              ['main_loop_counter,h', 'g_arrayInt[0]']
    recordingTimeInSeconds - how long to record the expressions, in seconds
    samplingInterval - time between the scheduled starts of two consecutive
                       samples, in seconds
    fileName - not used by this function
    isRecordToMemory - if True, each row is appended to the returned list;
                       if False, an empty list is returned
    isPrintToStdOut - if True, the header and each row are printed to stdout
                      during recording

    Returns:
    List of rows, where each row is a list containing the time stamp
    (seconds since the start of recording) followed by the recorded values
    in the same order as the watch expressions were specified. Example for
    three expressions:
      [[0, 23, 45, 4.35],
       [0.1, 24, -525, 1.78]
      ]
    """
    recordingStart = time.time()
    recordingEnd = recordingStart + recordingTimeInSeconds

    if isPrintToStdOut:
        print(['Time'] + watches)

    rows = []
    samplesTaken = 0

    while time.time() < recordingEnd:
        sample = [time.time() - recordingStart]
        sample.extend(
            debugCtrl.evaluate(ic.IConnectDebug.fRealTime, watch).getResult()
            for watch in watches)

        if isPrintToStdOut:
            print(sample)

        if isRecordToMemory:
            rows.append(sample)

        # Schedule against the recording start rather than the end of this
        # sample, so the time spent evaluating does not accumulate as drift.
        samplesTaken += 1
        pause = recordingStart + samplingInterval * samplesTaken - time.time()
        if pause > 0:
            time.sleep(pause)

    return rows
def recordWatchExpressionsToCSV(debugCtrl,
                                watches,
                                recordingTimeInSeconds,
                                samplingInterval,
                                fileName,
                                isRecordToMemory,
                                isPrintToStdOut):
    """
    This function evaluates watch expressions and writes them to CSV file.
    If isRecordToMemory == False, the amount of data that can be recorded is
    limited by the free disk size.

    Parameters:
    debugCtrl - debug controller whose evaluate() method is called for each
                watch expression
    watches - sequence of strings with watch expressions, for example
              ['main_loop_counter,h', 'g_arrayInt[0]']
    recordingTimeInSeconds - how long to record the expressions, in seconds
    samplingInterval - time between the scheduled starts of two consecutive
                       samples, in seconds
    fileName - name of the output CSV file. An existing file is overwritten.
    isRecordToMemory - if True, then data is also stored into memory
                       array and returned as function return value. Be
                       aware of memory usage in this case. If False,
                       an empty list is returned.
    isPrintToStdOut - if True, the header and each row are printed to stdout
                      during recording

    Returns:
    List of rows, where each row is a list containing time stamp and recorded
    values in the same order as watch expressions were specified. Example for
    three expressions:
      [[0, 23, 45, 4.35],
       [0.1, 24, -525, 1.78]
      ]
    If isRecordToMemory == False, an empty list is returned.
    """
    startTime = time.time()
    endTime = startTime + recordingTimeInSeconds

    recordedData = []
    header = ['Time', *watches]

    # The csv module requires newline='' on the file object; without it
    # every row is followed by an empty line on Windows.
    with open(fileName, 'w', newline='') as csvFile:
        csvWriter = csv.writer(csvFile)

        # Console output is controlled by isPrintToStdOut only, the same way
        # as in recordWatchExpressions().
        if isPrintToStdOut:
            print(header)
        csvWriter.writerow(header)

        sampleCounter = 0
        while time.time() < endTime:
            row = [time.time() - startTime]
            for expression in watches:
                value = debugCtrl.evaluate(ic.IConnectDebug.fRealTime,
                                           expression)
                row.append(value.getResult())

            csvWriter.writerow(row)

            if isPrintToStdOut:
                print(row)

            if isRecordToMemory:
                recordedData.append(row)

            # Schedule against the recording start so that evaluation time
            # does not accumulate as drift between samples.
            sampleCounter += 1
            nextSamplingTime = startTime + samplingInterval * sampleCounter
            sleepTime = nextSamplingTime - time.time()
            if sleepTime > 0:
                time.sleep(sleepTime)

    return recordedData
def recordBatch(cmgr, dbgCtrl, variables, recordingTimeInSeconds,
                samplingIntervalInSeconds):
    """
    Configures and runs a batch access for the given variables and returns
    the raw results together with the size of each variable.

    The amount of data that can be recorded is limited by the amount of
    system memory, because one result slot is allocated per variable per run.

    Parameters:
    cmgr - connection manager used to construct the ic.CDataController
    dbgCtrl - debug controller; its getSymbolInfo() is used to look up size,
              memory area and address of each variable
    variables - list of strings with variables, for example
                ['main_loop_counter', 'g_baseStruct.i_base']
    recordingTimeInSeconds - how long to record, in seconds
    samplingIntervalInSeconds - time between two runs, in seconds

    Returns:
    Tuple (ba_results, itemSizes):
      ba_results - ic.VectorBatchAccessResult with
                   len(variables) * number-of-runs elements
      itemSizes - list with the value of getSizeMAUs() for each variable,
                  in the same order as 'variables'
    """
    itemCount = len(variables)
    runCount = int(recordingTimeInSeconds / samplingIntervalInSeconds)

    batchHeader = ic.SBatchAccessHeader()
    batchHeader.m_dwFlags = (ic.SBatchAccessHeader.flRealTime |
                             ic.SBatchAccessHeader.flWantTimeStamp)
    batchHeader.m_dwNumItems = itemCount
    batchHeader.m_dwNumRuns = runCount
    batchHeader.m_qwStartAtTime = 0
    # NOTE(review): seconds are scaled by 1e6 here, so the interval is
    # presumably expected in microseconds - confirm against the SDK docs.
    batchHeader.m_qwRunInterval = int(samplingIntervalInSeconds * 1000000)

    batchItems = ic.VectorBatchAccessItem()
    sizes = []

    for variableName in variables:
        batchItem = ic.SBatchAccessItem()
        symbolInfo = dbgCtrl.getSymbolInfo(ic.IConnectDebug.fRealTime,
                                           variableName)

        # Describe a read access covering exactly this variable.
        batchItem.m_byFlags = ic.SBatchAccessItem.flRead
        batchItem.m_bySize = symbolInfo.getSizeMAUs()
        batchItem.m_byMemArea = symbolInfo.getMemArea()
        batchItem.m_aAddress = symbolInfo.getAddress()
        batchItems.push_back(batchItem)

        sizes.append(symbolInfo.getSizeMAUs())

    # One result slot per variable per run.
    batchResults = ic.VectorBatchAccessResult(itemCount * runCount)

    dataController = ic.CDataController(cmgr)
    dataController.batchAccess(0, batchHeader, batchItems, batchResults)

    return batchResults, sizes
def bigEndian2Int(numBytes, cArray):
    """
    Converts a big-endian sequence of bytes to an integer.

    Parameters:
    numBytes - number of leading bytes of 'cArray' to combine
    cArray - byte array readable with ic.CDataController.getByte()

    Returns:
    The unsigned integer built from the bytes, with the byte at index 0 as
    the most significant one. 0 is returned if numBytes is not positive.
    """
    result = 0
    for offset in range(numBytes):
        # Make room for the next, less significant byte and merge it in.
        result = (result << 8) | ic.CDataController.getByte(cArray, offset)
    return result
def batchAccessResultToCSV(fileName, ba_results, itemSizes, variables):
    """
    Writes results of batchAccess() to CSV file. Uses big endian conversion.

    The first column contains the time stamp of the first item of each run,
    relative to the very first time stamp and divided by 1000000. Items
    which were not read successfully are written as an error text instead
    of a value.

    If there are no complete runs in 'ba_results' (for example because the
    recording time was shorter than the sampling interval), only the header
    line is written.

    Parameters:
    fileName - the name of the output file. An existing file is overwritten.
    ba_results - results returned by function recordBatch()
    itemSizes - item sizes returned by function recordBatch()
    variables - sequence of variable names, used for the header line
    """
    numItems = len(itemSizes)
    # Integer division, guarded so that an empty item list means 'no runs'
    # instead of a ZeroDivisionError.
    numRuns = ba_results.size() // numItems if numItems else 0

    # The csv module requires newline='' on the file object; without it
    # every row is followed by an empty line on Windows.
    with open(fileName, 'w', newline='') as csvFile:
        csvWriter = csv.writer(csvFile)
        csvWriter.writerow(['Time', *variables])

        if numRuns == 0:
            return

        firstAccess = ba_results[0].m_qwTimeStamp

        for runIdx in range(numRuns):
            runStart = runIdx * numItems
            # NOTE(review): time stamps are presumably in microseconds, which
            # would make this column seconds - confirm against the SDK docs.
            row = [(ba_results[runStart].m_qwTimeStamp - firstAccess)/1000000.]

            for varIdx in range(numItems):
                result = ba_results[runStart + varIdx]

                if result.m_byResult == ic.SBatchAccessItemResult.resOK:
                    value = bigEndian2Int(itemSizes[varIdx], result.m_abyData)
                    row.append(value)
                elif result.m_byResult == ic.SBatchAccessItemResult.resAccess:
                    row.append('Can not access. Check access flags in header!')
                elif result.m_byResult == ic.SBatchAccessItemResult.resTimeout:
                    row.append('Timeout! Probably the sampling rate is invalid!')
                else:
                    row.append('Invalid error code: ' + str(result.m_byResult))

            csvWriter.writerow(row)
def batchAccessResultToList(ba_results, itemSizes):
    """
    Converts results of batchAccess() to Python list. Uses big endian
    conversion.

    Parameters:
    ba_results - results returned by function recordBatch()
    itemSizes - item sizes returned by function recordBatch()

    Returns:
    List of rows, one row per run. Each row starts with the time stamp of
    the first item of the run, relative to the very first time stamp and
    divided by 1000000, followed by one entry per item. Items which were not
    read successfully are represented by an error text instead of a value.
    An empty list is returned if 'ba_results' contains no complete run or
    'itemSizes' is empty.
    """
    numItems = len(itemSizes)
    # Integer division, guarded so that an empty item list means 'no runs'
    # instead of a ZeroDivisionError.
    numRuns = ba_results.size() // numItems if numItems else 0

    recordedData = []
    if numRuns == 0:
        # Nothing recorded, so there is no first time stamp to read.
        return recordedData

    firstAccess = ba_results[0].m_qwTimeStamp

    for runIdx in range(numRuns):
        runStart = runIdx * numItems
        # NOTE(review): time stamps are presumably in microseconds, which
        # would make this column seconds - confirm against the SDK docs.
        row = [(ba_results[runStart].m_qwTimeStamp - firstAccess)/1000000.]

        for varIdx in range(numItems):
            result = ba_results[runStart + varIdx]

            if result.m_byResult == ic.SBatchAccessItemResult.resOK:
                value = bigEndian2Int(itemSizes[varIdx], result.m_abyData)
                row.append(value)
            elif result.m_byResult == ic.SBatchAccessItemResult.resAccess:
                row.append('Can not access. Check access flags in header!')
            elif result.m_byResult == ic.SBatchAccessItemResult.resTimeout:
                row.append('Timeout! Probably the sampling rate is invalid!')
            else:
                row.append('Invalid error code: ' + str(result.m_byResult))

        recordedData.append(row)

    return recordedData
def writeDatatoCSV(fileName, data, expressions):
    """
    Writes already recorded rows to a CSV file.

    Parameters:
    fileName - name of the output CSV file. An existing file is overwritten.
    data - iterable of rows; each row is a sequence of values which is
           written as one CSV line
    expressions - sequence of column names for the recorded values; the
                  header line is 'Time' followed by these names
    """
    # The csv module requires newline='' on the file object; without it
    # every row is followed by an empty line on Windows.
    with open(fileName, 'w', newline='') as csvFile:
        csvWriter = csv.writer(csvFile)
        csvWriter.writerow(['Time', *expressions])
        csvWriter.writerows(data)
def main():
    """
    Demonstrates both recording approaches of this script.

    Connects using the instance id in 'winidea_id', initializes the target,
    records three watch expressions to 'watches.csv', then records three
    variables with batch access to 'batch.csv'. If pylab could be imported,
    the batch data is additionally shown in three stacked plots.
    """
    cmgr = ic.ConnectionMgr()
    connectionConfig = ic.CConnectionConfig().instanceId(winidea_id)
    cmgr.connect(connectionConfig)

    debugCtrl = initTarget(cmgr)

    ideCtrl = ic.CIDEController(cmgr)
    ideCtrl.setOption('/IDE/Debug.Symbols.Format.ANSI', True)

    recordingTimeInSeconds = 5

    # --- Part 1: polling of watch expressions ---
    watches = ['main_loop_counter,h',
               'g_intArray1[0],d',
               'g_complexStruct.m_struct.i_base,d']
    watchSamplingInterval = 0.2
    watchFilePrefix = 'watches'
    isRecordToMemory = True
    isPrintToStdOut = True

    print('Recording watch expressions:')
    # The returned rows are not needed here; the data ends up in the file.
    recordWatchExpressionsToCSV(debugCtrl,
                                watches,
                                recordingTimeInSeconds,
                                watchSamplingInterval,
                                watchFilePrefix + '.csv',
                                isRecordToMemory,
                                isPrintToStdOut)

    # --- Part 2: batch access ---
    batchFilePrefix = 'batch'
    variables = ['main_loop_counter',
                 'g_intArray1[0]',
                 'g_complexStruct.m_struct.i_base']

    # Restart the counter so the batch recording begins at zero.
    debugCtrl.modify(ic.IConnectDebug.fRealTime, 'main_loop_counter', '0')

    batchSamplingInterval = 0.5

    print('Recording with batch access...')
    batchResults, itemSizes = recordBatch(cmgr,
                                          debugCtrl,
                                          variables,
                                          recordingTimeInSeconds,
                                          batchSamplingInterval)

    batchAccessResultToCSV(batchFilePrefix + '.csv', batchResults,
                           itemSizes, variables)

    batchData = batchAccessResultToList(batchResults, itemSizes)

    if isPyLabInstalled:
        data = pl.array(batchData)
        times = data[:, 0]

        # One subplot per variable; column 0 holds the time stamps, so the
        # values of variable k are in column k. The first plot uses the
        # default line format, the others an explicit colour.
        lineFormats = ((), ('g',), ('r',))
        for column, (label, lineFormat) in enumerate(
                zip(variables, lineFormats), start=1):
            pl.subplot(3, 1, column)
            pl.plot(times, data[:, column], *lineFormat)
            pl.ylabel(label)

        pl.xlabel('t[s]')
        pl.show()

    print('Done!')
# Run the demo only when this file is executed as a script, so that the
# recording functions can be imported without side effects.
if __name__ == '__main__':
    main()