# -*- coding: utf-8 -*-
# set modules dir
import numpy as np
import sys, os, re, datetime
class NqDataLoader(object):
    """
    Load, sanity-check and filter keystroke recordings stored as CSV.

    After a successful loadDataFile() call the instance holds parallel arrays:
      dataKeys      -- key labels
      dataHT        -- hold times
      dataTimeStart -- press timestamps
      dataTimeEnd   -- release timestamps
      dataFT        -- difference between successive start times, padded
                       with a trailing 0
    filtData() additionally stores the boolean mask it applied in `lbl`.
    """

    # Bit flags accepted by filtData(); combine them with `|`.
    FLT_NO_MOUSE = 1 << 0
    FLT_NO_LETTERS = 1 << 1
    FLT_NO_BACK = 1 << 2
    FLT_NO_SHORT_META = 1 << 3  # space, enter, arrows, etc.
    FLT_NO_LONG_META = 1 << 4   # shift, control, alt, etc.
    FLT_NO_PUNCT = 1 << 5

    # (flag, pattern) pairs, compiled once for all instances.  The patterns
    # are applied with re.match(), i.e. anchored at the start of the label
    # only; key labels are expected to include their double quotes.
    _FLT_PATTERNS = (
        (FLT_NO_MOUSE, re.compile(r'("mouse.+")')),
        (FLT_NO_LETTERS, re.compile(r'(".{1}")')),
        (FLT_NO_BACK, re.compile(r'("BackSpace")')),
        (FLT_NO_SHORT_META, re.compile(r'("space")|("Num_Lock")|("Return")|("P_Enter")|("Caps_Lock")|("Left")|("Right")|("Up")|("Down")')),
        (FLT_NO_LONG_META, re.compile(r'("Shift.+")|("Alt.+")|("Control.+")')),
        (FLT_NO_PUNCT, re.compile(r'("more")|("less")|("exclamdown")|("comma")|("\[65027\]")|("\[65105\]")|("ntilde")|("minus")|("equal")|("bracketleft")|("bracketright")|("semicolon")|("backslash")|("apostrophe")|("comma")|("period")|("slash")|("grave")')),
    )

    def __init__(self):
        self.dataKeys = None
        self.dataHT = None
        self.dataTimeStart = None
        self.dataTimeEnd = None
        # Declared up front so the attributes exist before any load/filter.
        self.dataFT = None
        self.lbl = None

    @staticmethod
    def _keyAsText(key):
        """Return `key` as text so the regex filters can be applied to it."""
        if isinstance(key, bytes) and not isinstance(key, str):
            # Python 3 bytes label (e.g. from an 'S' dtype array).
            return key.decode('utf-8', 'replace')
        if isinstance(key, (str, type(u''))):
            return key
        # Numeric placeholder keys (see the 'si' branch of loadDataFile).
        return str(key)

    def sanityCheck(self):
        """
        Drop implausible keystrokes from the member arrays, in place.

        A keystroke is removed when its start or end time is <= 0, its hold
        time is < 0 or >= 5, or its start time is lower than the start time
        of an earlier keystroke.

        :return: number of elements removed
        :raises AssertionError: if any of the four arrays is None or empty
        """
        assert self.dataKeys is not None and len(self.dataKeys) > 0
        assert self.dataHT is not None and len(self.dataHT) > 0
        assert self.dataTimeStart is not None and len(self.dataTimeStart) > 0
        assert self.dataTimeEnd is not None and len(self.dataTimeEnd) > 0

        bad = self.dataTimeStart <= 0
        bad = bad | (self.dataTimeEnd <= 0)
        bad = bad | (self.dataHT < 0)
        bad = bad | (self.dataHT >= 5)

        # ----- flag non consecutive start times
        # Repeatedly flag every element that is smaller than its predecessor
        # and overwrite it with the predecessor's value, until the working
        # copy no longer decreases anywhere.
        outOfOrder = np.zeros(len(self.dataTimeStart), dtype=bool)
        working = self.dataTimeStart.copy()
        while True:
            # the first element has no predecessor, so it is never flagged
            drops = np.append([False], np.diff(working) < 0)
            if not drops.any():
                break
            outOfOrder = outOfOrder | drops
            idx = np.flatnonzero(drops)
            working[idx] = working[idx - 1]
        bad = bad | outOfOrder
        # -----

        good = np.bitwise_not(bad)
        self.dataKeys = self.dataKeys[good]
        self.dataHT = self.dataHT[good]
        self.dataTimeStart = self.dataTimeStart[good]
        self.dataTimeEnd = self.dataTimeEnd[good]
        return int(np.count_nonzero(bad))

    def loadDataFile(self, fileIn, autoFilt=True, impType=None, debug=False):
        """
        Load a raw comma-separated data file into the member arrays.

        :param fileIn: path of the CSV file
        :param autoFilt: when true, apply
            filtData(FLT_NO_MOUSE | FLT_NO_LONG_META) after the sanity check
        :param impType: 'si' for the two-column integer format (start, end),
            anything else for the four-column format
            (key, hold time, end time, start time)
        :param debug: when true, print the number of elements removed by
            sanityCheck()
        :return: True on success, or an error string if the file cannot be
            opened
        """
        try:
            if impType == 'si':
                # atleast_2d keeps the column indexing below valid for a
                # file that contains a single row.
                data = np.atleast_2d(
                    np.genfromtxt(fileIn, dtype=np.int64, delimiter=',', skip_header=0))
            else:
                # dtype=None yields a structured array with fields f0..f3;
                # atleast_1d turns the 0-d result of a single-row file into
                # a one-element array.
                data = np.atleast_1d(
                    np.genfromtxt(fileIn, dtype=None, delimiter=',', skip_header=0))
        except IOError:
            return 'file {} not found'.format(fileIn)

        if impType == 'si':
            # shift so the smallest value in the file is 0, then divide by 1000
            data = data - data.min()
            data = data.astype(np.float64) / 1000
            self.dataTimeStart = data[:, 0]
            self.dataTimeEnd = data[:, 1]
            self.dataHT = self.dataTimeEnd - self.dataTimeStart
            # This format carries no key labels; a placeholder array keeps
            # sanityCheck() and filtData() working.
            self.dataKeys = np.zeros(len(self.dataHT))
        else:
            self.dataKeys = data['f0']
            self.dataHT = data['f1']
            # column 3 is the start time, column 2 the end time
            self.dataTimeStart = data['f3']
            self.dataTimeEnd = data['f2']

        remNum = self.sanityCheck()
        if debug:
            print('removed  ' + str(remNum) + '  elements')
        if autoFilt:
            self.filtData(self.FLT_NO_MOUSE | self.FLT_NO_LONG_META)

        # flight time: difference between successive start times, padded
        # with a trailing 0 so it has the same length as the other arrays
        self.dataFT = np.append(np.diff(self.dataTimeStart), 0)
        return True

    def loadDataArr(self, lstArr):
        """
        Load keystrokes from a sequence of comma-separated strings.

        Each row is split on ',' and read as key, hold time, start time,
        end time.  The member arrays are created with shape (len(lstArr), 1)
        and key labels are stored with dtype 'S30'.

        NOTE(review): loadDataFile() takes the start time from column 3 and
        the end time from column 2, the opposite of what is done here --
        confirm which order the rows passed to this method use.

        :param lstArr: sequence of 'key,ht,start,end' strings
        """
        count = len(lstArr)
        self.dataKeys = np.zeros((count, 1), dtype='S30')
        self.dataHT = np.zeros((count, 1))
        self.dataTimeStart = np.zeros((count, 1))
        self.dataTimeEnd = np.zeros((count, 1))
        for i, row in enumerate(lstArr):
            tok = row.split(',')
            self.dataKeys[i] = str(tok[0])
            self.dataHT[i] = float(tok[1])
            self.dataTimeStart[i] = float(tok[2])
            self.dataTimeEnd[i] = float(tok[3])

    def filtData(self, flags):
        """
        Remove, in place, the keystrokes selected by `flags`.

        dataKeys, dataHT, dataTimeStart and dataTimeEnd are replaced by
        their filtered versions and the boolean mask that was applied is
        stored in `self.lbl` (True = kept).  `dataFT` is not modified.
        Nothing is returned.

        :param flags: bitwise OR of the FLT_NO_* class constants
        """
        # Key labels as text: bytes are decoded and non-string placeholders
        # stringified so that pattern.match() never raises on them.  ravel()
        # lets the column-shaped arrays built by loadDataArr() be filtered
        # row by row as well.
        keys = [self._keyAsText(k) for k in np.ravel(self.dataKeys)]

        # -- create mask labels
        lbl = np.ones(len(keys), dtype=bool)
        for flag, pattern in self._FLT_PATTERNS:
            if flags & flag:
                lbl = lbl & np.array(
                    [pattern.match(k) is None for k in keys], dtype=bool)
        # --

        self.lbl = lbl
        self.dataKeys = self.dataKeys[lbl]
        self.dataHT = self.dataHT[lbl]
        self.dataTimeStart = self.dataTimeStart[lbl]
        self.dataTimeEnd = self.dataTimeEnd[lbl]
def getStdVariablesFilt( fileIn, impType=None ):
    """
    Load a raw typing file and return its filtered variables
    (no mouse clicks, no long meta buttons, no backspaces).

    The signature has no `self`, so a fresh loader is used for every call;
    the work is delegated to getDataFiltHelper(), which performs exactly
    this load-and-filter sequence.

    :param fileIn: location of the raw typing file
    :param impType: file format selector, passed through to the loader
    :return: (array of keys, array of hold times, array of press event
        timestamps, array of release event timestamps)
    """
    return getDataFiltHelper( fileIn, impType )
def getDataFiltHelper( fileIn, impType=None ):
    """
    Helper method to load filtered keypress data from given file

    Mouse events, long meta keys (shift, control, alt) and backspaces are
    removed from the result.

    :param fileIn: path to csv keypress file
    :param impType: format of the csv file ('si': for sleep inertia data, None for PD data)
    :return: tuple of arrays (dataKeys, dataHT, dataTimeStart, dataTimeEnd)
    :raises AssertionError: if the file could not be loaded
    """
    nqObj = NqDataLoader()
    res = nqObj.loadDataFile( fileIn, False, impType )
    # loadDataFile() returns True on success and an error string otherwise.
    # Check it before filtering: after a failed load the loader's arrays are
    # still None and filtData() would fail with an unrelated error.  Raised
    # explicitly (not via `assert`) so the check survives `python -O`.
    if res is not True:
        raise AssertionError( res )
    # remove mouse events, long meta keys and the delete (backspace) button
    nqObj.filtData( nqObj.FLT_NO_MOUSE | nqObj.FLT_NO_LONG_META | nqObj.FLT_NO_BACK )
    return nqObj.dataKeys, nqObj.dataHT, nqObj.dataTimeStart, nqObj.dataTimeEnd
def genFileStruct( dataDir, maxRepNum=4 ):
    '''
    Generate a dictionary with the NQ file list and test date (legacy method)

    File names are expected to look like
    <timestamp>.<pID>_<repID>_<expID>.csv (all four parts numeric); the name
    of every other directory entry is printed followed by ' no'.

    :param dataDir: base directory containing the CSV files
    :param maxRepNum: integer with the maximum repetition number; every
        patient gets an (initially empty) entry for each repetition in
        1..maxRepNum
    :return: two dictionaries: fMap, dateMap = NQ file/date list[pID][repID][expID]
    '''
    fMap = {}  # data container
    dateMap = {}
    namePattern = re.compile( r'([0-9]+)\.{1}([0-9]+)_([0-9]+)_([0-9]+)\.csv' )
    for fName in os.listdir( dataDir ):
        found = namePattern.match( fName )
        if not found:
            print(fName + '  no')
            continue
        timeStamp = found.group(1)
        pID = int(found.group(2))
        repID = int(found.group(3))
        expID = int(found.group(4))
        # store new patient
        if pID not in fMap:
            fMap[pID] = dict((rid, {}) for rid in range(1, maxRepNum + 1))
            dateMap[pID] = dict((rid, {}) for rid in range(1, maxRepNum + 1))
        # store data; setdefault() copes with a repetition id outside
        # 1..maxRepNum instead of failing with KeyError
        # os.path.join() also works when dataDir has no trailing separator
        fMap[pID].setdefault(repID, {})[expID] = os.path.join(dataDir, fName)
        dateMap[pID].setdefault(repID, {})[expID] = datetime.datetime.fromtimestamp(int(timeStamp))
    return fMap, dateMap