""" """ import os import numpy as np import time import datetime try: from nicegui import app, ui, events, Client, run except ModuleNotFoundError as e: print(f'Error: {e}') exit(1) try: import plotly.graph_objects as pgo import plotly.io as pio import plotly as ply import plotly.subplots import local_file_picker import plotly_resampler as prs except ModuleNotFoundError as e: print(f'Error: {e}') exit(1) pio.templates["plotly_light"] = pgo.layout.Template( layout = pio.templates["plotly_white"].layout) #pio.templates["plotly_light"].layout['scene']['xaxis']['gridcolor'] = 'gray' pio.templates["plotly_light"].layout['xaxis']['gridcolor'] = 'lightgray' pio.templates["plotly_light"].layout['xaxis']['zerolinecolor'] = 'lightgray' pio.templates["plotly_light"].layout['yaxis']['gridcolor'] = 'lightgray' pio.templates["plotly_light"].layout['yaxis']['zerolinecolor'] = 'lightgray' ELM = { 'name': 'tempplot', 'filedir': './data', 'filename': None, 'label_file_fmt': 'File: {filename}', 'label_data_len_fmt': 'Data points: {num:,} (rendered) / {numa:,} (total)', 'label_data_temp_fmt': 'Temperature = [{min:.1f}, {max:.1f}] K', 'label_data_heat_fmt': 'Heater = [{min:.1f}, {max:.1f}] %', 'plotly_light': 'plotly_light', 'plotly_dark': 'plotly_dark', 'update_last': time.time(), 'update_timeout': 1.0, 'label_update_last_fmt': 'Last refresh: {last}', } def main_cli(): import argparse parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawTextHelpFormatter, prefix_chars='-', add_help=False, ) parser.add_argument( '--host', type=str, default='', help='default 127.0.0.1') parser.add_argument( '--port', type=str, default='8085', help='default 8080') parser.add_argument( '--dir', type=str, default='./data', help='default ./data') parser.add_argument( '-v', '--verbose', action='store_true', help='Verbose output') parser.add_argument( '-D', '--debug', action='store_true', help=argparse.SUPPRESS) parser.add_argument( '-h', '--help', action='store_true', help='Show this help message and exit') args = parser.parse_args() if args.debug: DEBUG = True print(f'[DEBUG] args: {args}') print(f'[DEBUG] __file__: {__file__}') print(f'[DEBUG] cwd: {os.getcwd()}') print(f'[DEBUG] host: {args.host}') print(f'[DEBUG] port: {args.port}') if args.help: parser.print_help() exit(0) return args def np2pgo(x, y, label=None): res = {'x': x, 'y': y} if label is not None: res.update({'name': label}) return res def fntl(): if ELM['filename'] is not None: return os.path.relpath(ELM['filename'], ELM['filedir']) def fnttt(): res = None if ELM['filename'] is not None: fn = os.path.basename(ELM['filename']) n = len(fn) if n == 18: res = f'{fn[:4]}-{fn[4:6]}-{fn[6:8]} {fn[8:10]}:{fn[10:12]}:{fn[12:14]}' return res def islandinfo(y, trigger_val, stopind_inclusive=True): """ >>> y >>> array([1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1]) >>> islandinfo(y, trigger_val=1)[0] >>> [(0, 2), (8, 9), (16, 19)] >>> islandinfo(y, trigger_val=0)[0] >>> [(3, 7), (10, 15)] """ # Setup "sentients" on either sides to make sure we have setup # "ramps" to catch the start and stop for the edge islands # (left-most and right-most islands) respectively y_ext = np.r_[False,y==trigger_val, False] # Get indices of shifts, which represent the start and stop indices idx = np.flatnonzero(y_ext[:-1] != y_ext[1:]) # Lengths of islands if needed lens = idx[1::2] - idx[:-1:2] # Using a stepsize of 2 would get us start and stop indices for each island return list(zip(idx[:-1:2], idx[1::2]-int(stopind_inclusive))), lens async def pick_file() -> None: result = await local_file_picker.local_file_picker(ELM['filedir'], multiple=False) # ui.notify(f'You chose {result}') if result: filename = result[-1] ELM['filename'] = filename # see timer below, but still here active b/c the time might be inactive ui.timer(0, lambda: update_data(), once=True) def get_data(filename=ELM['filename']): data = None if filename is not None and os.path.isfile(filename): data = np.genfromtxt(fname=filename, skip_header=1, max_rows=2) if len(data) > 0 and len(data[0]) > 6: names = ['time', 'ts', 'ta', 'tb', 'tc', 'td', 'hp', 'hl'] else: names = ['time', 'ts', 'ta', 'tb', 'hp', 'hl'] data = np.genfromtxt( fname=filename, dtype=None, skip_header=1, converters={ 0: lambda x: datetime.datetime.strptime(x, '%Y%m%d%H%M%S')}, names=names, encoding='utf-8' ) return data async def update_data(): out = '' if not app.storage.user['timer'] \ or time.time() - ELM['update_last'] > ELM['update_timeout']: ELM['update_busy'] = True filename=ELM['filename'] t1 = time.time() data_all = await run.cpu_bound(get_data, filename) t2 = time.time() data = data_all if data is not None: # time_range = ELM['number_time_range'].value time_range = app.storage.user['time_range'] if time_range is not None and time_range > 0: # delta = datetime.datetime.now() - data['time'] delta = data['time'][-1] - data['time'] data = data[np.where(delta <= datetime.timedelta(hours=time_range))] out += f"{datetime.datetime.now()}, {filename}, {data_all.shape}, {data.shape}" ELM['label_data_len'].set_text(ELM['label_data_len_fmt'].format( num=len(data), numa=len(data_all))) if len(data) > 0: if len(data[0]) > 6: ELM['label_data_temp'].set_text(ELM['label_data_temp_fmt'].format( min=np.min([data['ts'], data['ta'], data['tb'], data['tc'], data['td']]), max=np.max([data['ts'], data['ta'], data['tb'], data['tc'], data['td']]))) else: ELM['label_data_temp'].set_text(ELM['label_data_temp_fmt'].format( min=np.min([data['ts'], data['ta'], data['tb']]), max=np.max([data['ts'], data['ta'], data['tb']]))) ELM['label_data_heat'].set_text(ELM['label_data_heat_fmt'].format( min=np.min(data['hp']), max=np.max(data['hp']))) ELM['fig'].data = [] ELM['fig'].layout.shapes = [] ELM['plot'].visible = False ELM['plot'].update() # mode='lines+markers' too heavy ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['ts'], 'Setp'), mode='lines', yaxis="y", legendgroup=None), secondary_y=False) ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['ta'], 'Temp A'), mode='lines', yaxis="y", legendgroup=None), secondary_y=False) ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['tb'], 'Temp B'), mode='lines', yaxis="y", legendgroup=None), secondary_y=False) if len(data) > 0 and len(data[0]) > 6: ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['tc'], 'Temp C'), mode='lines', yaxis="y", legendgroup=None), secondary_y=False) ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['td'], 'Temp D'), mode='lines', yaxis="y", legendgroup=None), secondary_y=False) ELM['fig'].add_trace(pgo.Scatter( **np2pgo(data['time'], data['hp'], 'Heater'), mode='lines', yaxis="y2", legendgroup=None, legend="legend2"), row=2, col=1, secondary_y=False) # display the heater range as background n = len(data['time']) # dummy plot to generate the secondary axis ELM['fig'].add_trace( pgo.Scatter( x=[data['time'][0]], y=[0], mode='lines', line_width=0, showlegend=False), # important to not interleave gaps additionally. the limit-to-view also # ensures that the autoscale is not disturbed. gap_handler=prs.aggregation.NoGapHandler(), limit_to_view=True, secondary_y=True, row=2, col=1, # yref="paper", ) shapes = [] hls = [0, 1, 2, 3, 4, 5] for hli in hls: for (i0, i1) in islandinfo(data['hl'], hli, True)[0]: shapes.append( dict( fillcolor="rgba(63, 81, 181, 0.2)", opacity=0.70, line={"width": 0}, type="rect", x0=data['time'][i0], x1=data['time'][i1], xref="x", y0=0, y1=hli/5, yref="y2 domain", #"paper" layer="below", # "between" # label=dict(text=f"{hli}", textposition="top center", font=dict(size=12)), )) ELM['fig'].update_layout(shapes=shapes) ELM['plot'].visible = True ELM['plot'].update() ELM['update_last'] = time.time() ELM['label_update_last'].set_text( ELM['label_update_last_fmt'].format( last=datetime.datetime.fromtimestamp( ELM['update_last']).strftime("%Y-%m-%dT%H:%M:%S.%f"))) out = f"{t2 - t1:.3f}, {time.time() - t1:.3f}, " + out print(out) def update_fig(): ELM['fig'].update_yaxes( row=1, col=1, range=[ELM['number_temp_min'].value, ELM['number_temp_max'].value], rangemode="nonnegative") ELM['fig'].update_yaxes( row=2, col=1, range=[ELM['number_heat_min'].value, ELM['number_heat_max'].value], rangemode="nonnegative") @ui.page('/') def index(): # initialize objects pio.templates.default = 'plotly_dark' ELM['fig'] = prs.FigureResampler(pgo.Figure()).set_subplots( rows=2, cols=1, row_heights=[0.7, 0.3], specs = [[{'t': 0.0}], [{"secondary_y": True}]], shared_xaxes=True, vertical_spacing=0.025) # render header ui.context.client.content.classes('h-[100vh]') ui.add_head_html(''' ''') dark_mode = ui.dark_mode().bind_value(app.storage.user, 'dark_mode') ui.colors(primary='#4888c4') with ui.row(align_items="center").classes('w-full'): ui.button('Choose file', on_click=pick_file, icon='folder') ELM['number_time_range'] = ui.number( label='Time range', value=6, format='%.2f', min=0, step=0.5, suffix='h', # see timer below, but still here active b/c the time might be inactive ).on('update:model-value', lambda e: update_data(), throttle=1.0) \ .props('clearable').bind_value(app.storage.user, 'time_range').classes('w-32') ELM['switch_timer'] = ui.switch('Auto-refresh').bind_value(app.storage.user, 'timer') ELM['label_update_last'] = ui.label( ELM['label_update_last_fmt'].format( last=datetime.datetime.fromtimestamp( ELM['update_last']).strftime("%Y-%m-%dT%H:%M:%S.%f"))) ui.space() ui.switch( 'Dark mode', on_change=lambda: ( ELM['fig'].update_layout( template=ELM['plotly_dark'] if dark_mode.value else ELM['plotly_light']), 'plot' in ELM and ELM['plot'].update()) ).bind_value(app.storage.user, 'dark_mode').props('icon="dark_mode"') with ui.row(): ELM['label_file'] = ui.label(ELM['label_file_fmt'].format(filename=fntl())) \ .bind_text_from(ELM, 'filename', lambda filename: ELM['label_file_fmt'].format(filename=fntl())) ELM['label_file_tt'] = ui.tooltip(None) \ .bind_text_from(ELM, 'filename', lambda filename: fnttt()) \ .bind_visibility_from(ELM, 'filename', bool) \ .move(ELM['label_file']) ELM['label_data_len'] = ui.label(ELM['label_data_len_fmt'].format(num=0, numa=0)) ELM['label_data_temp'] = ui.label(ELM['label_data_temp_fmt'].format(min=0, max=0)) ELM['label_data_heat'] = ui.label(ELM['label_data_heat_fmt'].format(min=0, max=0)) with ui.row(align_items="baseline").classes('w-full'): ui.label('Temperature') ELM['number_temp_min'] = \ ui.number( label='min', value=None, format='%.2f', min=0, step=1.0, suffix='K') \ .props('clearable').classes('w-32').on( 'update:model-value', lambda e: update_fig(), throttle=1.0) ELM['number_temp_max'] = \ ui.number( label='max', value=None, format='%.2f', min=0, step=1.0, suffix='K') \ .props('clearable').classes('w-32').on( 'update:model-value', lambda e: update_fig(), throttle=1.0) ui.label().classes('w-22') ui.label('Heater') ELM['number_heat_min'] = \ ui.number( label='min', value=0, format='%.2f', min=0, step=1.0, suffix='%') \ .props('clearable').classes('w-32').on( 'update:model-value', lambda e: update_fig(), throttle=1.0) ELM['number_heat_max'] = \ ui.number( label='max', value=100, format='%.2f', min=0, step=1.0, suffix='%') \ .props('clearable').classes('w-32').on( 'update:model-value', lambda e: update_fig(), throttle=1.0) ELM['fig'].update_layout( height=640, margin={'t': 25, 'r': 0}, #legend_tracegroupgap=140, template=ELM['plotly_dark'] if dark_mode.value else ELM['plotly_light'], yaxis1_title='Temperature, K', yaxis2_title='Power, %', yaxis2_range=[0, 100], xaxis2_title='Time', legend={ 'orientation': 'h', 'xanchor': 'left', 'yanchor': 'bottom', 'y': 1.0, }, legend2={ 'orientation': 'h', 'xanchor': 'left', 'yanchor': 'bottom', 'y': -0.13, }, ) ELM['fig'].update_yaxes( row=1, col=1, range=[ELM['number_temp_min'].value, ELM['number_temp_max'].value], rangemode="nonnegative") ELM['fig'].update_yaxes( row=2, col=1, range=[ELM['number_heat_min'].value, ELM['number_heat_max'].value], rangemode="nonnegative", showgrid=False) ELM['fig'].update_yaxes( title_text='Heater level', row=2, col=1, range=[0, 5], rangemode="nonnegative", secondary_y=True, fixedrange=True, showgrid=True, ticks="inside", tickwidth=1, # tickcolor='crimson', # nticks=6, # ticklen=5, tickvals=[0, 1, 2, 3, 4, 5], ) ELM['plot'] = ui.plotly(ELM['fig']).classes('w-full') # update_data() ui.timer(0, lambda: update_data(), once=True) ELM['timer'] = ui.timer(0.5, lambda: update_data(), immediate=False) ELM['switch_timer'].bind_value_to(ELM['timer'], 'active') def main_gui(args): ELM['filedir'] = args.dir ui.run( host=args.host, port=int(args.port), title=ELM['name'], dark=False, native=False, show=False, storage_secret='tempview') def main(args): return 0 if __name__ in {'__main__', '__mp_main__'}: args = main_cli() main_gui(args)