#!/usr/bin/env python3"""Module for reading InsetChart.json channels."""fromcsvimportwriterasCsvWriterfromdatetimeimportdatetimeimportjsonfrompathlibimportPathfromtypingimportDict,List,Unionimportwarningsimportpandasaspd_CHANNELS="Channels"_DTK_VERSION="DTK_Version"_DATETIME="DateTime"_REPORT_TYPE="Report_Type"_REPORT_VERSION="Report_Version"_SIMULATION_TIMESTEP="Simulation_Timestep"_START_TIME="Start_Time"_TIMESTEPS="Timesteps"_KNOWN_KEYS={_CHANNELS,_DTK_VERSION,_DATETIME,_REPORT_TYPE,_REPORT_VERSION,_SIMULATION_TIMESTEP,_START_TIME,_TIMESTEPS,}_TYPE_INSETCHART="InsetChart"_UNITS="Units"_DATA="Data"_HEADER="Header"
[docs]classHeader(object):# Allow callers to send an arbitrary dictionary, potentially, with extra key:value pairs.def__init__(self,**kwargs)->None:self._channelCount=kwargs[_CHANNELS]ifkwargsand_CHANNELSinkwargselse0self._dtkVersion=(kwargs[_DTK_VERSION]ifkwargsand_DTK_VERSIONinkwargselse"unknown-branch (unknown)")self._timeStamp=(kwargs[_DATETIME]ifkwargsand_DATETIMEinkwargselsef"{datetime.now():%a %B %d %Y %H:%M:%S}")self._reportType=(kwargs[_REPORT_TYPE]ifkwargsand_REPORT_TYPEinkwargselse_TYPE_INSETCHART)self._reportVersion=(kwargs[_REPORT_VERSION]ifkwargsand_REPORT_VERSIONinkwargselse"0.0")self._stepSize=(kwargs[_SIMULATION_TIMESTEP]ifkwargsand_SIMULATION_TIMESTEPinkwargselse1)self._startTime=kwargs[_START_TIME]ifkwargsand_START_TIMEinkwargselse0self._numTimeSteps=(kwargs[_TIMESTEPS]ifkwargsand_TIMESTEPSinkwargselse0)self._tags={key:kwargs[key]forkeyinkwargsifkeynotin_KNOWN_KEYS}return@propertydefnum_channels(self)->int:returnself._channelCount@num_channels.setterdefnum_channels(self,count:int)->None:"""> 0"""assertcount>0,"numChannels must be > 0"self._channelCount=countreturn@propertydefdtk_version(self)->str:returnself._dtkVersion@dtk_version.setterdefdtk_version(self,version:str)->None:"""major.minor"""self._dtkVersion=f"{version}"return@propertydeftime_stamp(self)->str:returnself._timeStamp@time_stamp.setterdeftime_stamp(self,timestamp:Union[datetime,str])->None:"""datetime or string"""self._timeStamp=(f"{timestamp:%a %B %d %Y %H:%M:%S}"ifisinstance(timestamp,datetime)elsef"{timestamp}")return@propertydefreport_type(self)->str:returnself._reportType@report_type.setterdefreport_type(self,report_type:str)->None:self._reportType=f"{report_type}"return@propertydefreport_version(self)->str:returnself._reportVersion@report_version.setterdefreport_version(self,version:str)->None:self._reportVersion=f"{version}"return@propertydefstep_size(self)->int:""">= 1"""returnself._stepSize@step_size.setterdefstep_size(self,size:int)->None:""">= 1"""self._stepSize=int(size)assertself._stepSize>=1,"stepSize must be >= 1"return@propertydefstart_time(self)->int:""">= 0"""returnself._startTime@start_time.setterdefstart_time(self,time:int)->None:""">= 0"""self._startTime=int(time)assertself._startTime>=0,"startTime must be >= 0"return@propertydefnum_time_steps(self)->int:""">= 1"""returnself._numTimeSteps@num_time_steps.setterdefnum_time_steps(self,count:int)->None:""">= 1"""self._numTimeSteps=int(count)assertself._numTimeSteps>0,"numTimeSteps must be > 0"return
[docs]classChannel(object):def__init__(self,title:str,units:str,data:List)->None:self._title=titleself._units=unitsself._data=datareturn@propertydeftitle(self)->str:returnself._title@title.setterdeftitle(self,title:str)->None:self._title=f"{title}"return@propertydefunits(self)->str:returnself._units@units.setterdefunits(self,units:str)->None:self._units=f"{units}"return@propertydefdata(self):returnself._datadef__getitem__(self,item):"""Index into channel data by time step"""returnself._data[item]def__setitem__(self,key,value)->None:"""Update channel data by time step"""self._data[key]=valuereturn
[docs]classChannelReport(object):def__init__(self,filename:str=None,**kwargs):iffilenameisnotNone:assertisinstance(filename,str),"filename must be a string"self._from_file(filename)else:self._header=Header(**kwargs)self._channels={}return@propertydefheader(self)->Header:returnself._header# pass-through to header@propertydefdtk_version(self)->str:returnself._header.dtk_version@dtk_version.setterdefdtk_version(self,version:str)->None:self._header.dtk_version=versionreturn@propertydeftime_stamp(self)->str:returnself._header.time_stamp@time_stamp.setterdeftime_stamp(self,time_stamp:Union[datetime,str])->None:self._header.time_stamp=time_stampreturn@propertydefreport_type(self)->str:returnself._header.report_type@report_type.setterdefreport_type(self,report_type:str)->None:self._header.report_type=report_typereturn@propertydefreport_version(self)->str:"""major.minor"""returnself._header.report_version@report_version.setterdefreport_version(self,version:str)->None:self._header.report_version=versionreturn@propertydefstep_size(self)->int:""">= 1"""returnself._header.step_size@step_size.setterdefstep_size(self,size:int)->None:""">= 1"""self._header.step_size=sizereturn@propertydefstart_time(self)->int:""">= 0"""returnself._header.start_time@start_time.setterdefstart_time(self,time:int)->None:""">= 0"""self._header.start_time=timereturn@propertydefnum_time_steps(self)->int:"""> 0"""returnself._header.num_time_steps@num_time_steps.setterdefnum_time_steps(self,count:int):"""> 0"""self._header.num_time_steps=countreturn# end pass-through@propertydefnum_channels(self)->int:returnlen(self._channels)@propertydefchannel_names(self)->List:returnsorted(self._channels)@propertydefchannels(self)->Dict:"""Channel objects keyed on channel name/title"""returnself._channelsdef__getitem__(self,item:str)->Channel:"""Return Channel object by channel name/title"""returnself._channels[item]
[docs]defas_dataframe(self)->pd.DataFrame:"""Return underlying data as a Pandas DataFrame"""dataframe=pd.DataFrame({key:self.channels[key].dataforkeyinself.channel_names})returndataframe
[docs]defwrite_file(self,filename:str,indent:int=0,separators=(",",":"))->None:"""Write inset chart to specified text file."""# in case this was generated locally, lets do some consistency checksassertlen(self._channels)>0,"Report has no channels."counts=set([len(channel.data)fortitle,channelinself.channels.items()])assert(len(counts)==1),f"Channels do not all have the same number of values ({counts})"self._header.num_channels=len(self._channels)self.num_time_steps=len(self._channels[self.channel_names[0]].data)withopen(filename,"w",encoding="utf-8")asfile:channels={}for_,channelinself.channels.items():# https://stackoverflow.com/questions/38987/how-do-i-merge-two-dictionaries-in-a-single-expressionchannels={**channels,**channel.as_dictionary()}chart={_HEADER:self.header.as_dictionary(),_CHANNELS:channels}json.dump(chart,file,indent=indent,separators=separators)return
def_from_file(self,filename:str)->None:defvalidate_file(_jason)->None:assert_HEADERin_jason,f"'{filename}' missing '{_HEADER}' object."assert(_CHANNELSin_jason[_HEADER]),f"'{filename}' missing '{_HEADER}/{_CHANNELS}' key."assert(_TIMESTEPSin_jason[_HEADER]),f"'{filename}' missing '{_HEADER}/{_TIMESTEPS}' key."assert_CHANNELSin_jason,f"'{filename}' missing '{_CHANNELS}' object."num_channels=_jason[_HEADER][_CHANNELS]channels_len=len(_jason[_CHANNELS])assertnum_channels==channels_len,(f"'{filename}': "+f"'{_HEADER}/{_CHANNELS}' ({num_channels}) does not match number of {_CHANNELS} ({channels_len}).")returndefvalidate_channel(_channel,_title,_header)->None:assert_UNITSin_channel,f"Channel '{_title}' missing '{_UNITS}' entry."assert_DATAin_channel,f"Channel '{_title}' missing '{_DATA}' entry."count=len(_channel[_DATA])assert(count==_header.num_time_steps),f"Channel '{title}' data values ({count}) does not match header Time_Steps ({_header.num_time_steps})."returnwithopen(filename,"rb")asfile:jason=json.load(file)validate_file(jason)header_dict=jason[_HEADER]self._header=Header(**header_dict)self._channels={}channels=jason[_CHANNELS]fortitle,channelinchannels.items():validate_channel(channel,title,self._header)units=channel[_UNITS]data=channel[_DATA]self._channels[title]=Channel(title,units,data)return
[docs]defto_csv(self,filename:Union[str,Path],channel_names:List[str]=None,transpose:bool=False)->None:""" Write each channel from the report to a row, CSV style, in the given file. Channel name goes in the first column, channel data goes into subsequent columns. Args: filename: string or path specifying destination file channel_names: optional list of channels (by name) to write to the file transpose: write channels as columns rather than rows """ifchannel_namesisNone:channel_names=self.channel_namesifnottranspose:# defaultdata_frame=pd.DataFrame([[channel_name]+list(self[channel_name])forchannel_nameinchannel_names])# data_frame = pd.DataFrame(([channel_name] + list(self[channel_name]) for channel_name in channel_names))data_frame.to_csv(filename,header=False,index=False)else:# transposedself.as_dataframe().to_csv(filename,header=True,index=True,index_label="timestep")return