2017-06-30 22 views
6

Próbuję zasymulować odczyt danych z kolejki, takiej jak kafka. Muszę zachować wskaźnik do bieżącego rekordu, gdy przesyłam dane z pliku. Obecnie robię to ze zmienną globalną, ale wydaje się, że jest ona wspólna dla wszystkich sesji użytkownika.Jak zapisać i pobrać stan w wywołaniu zwrotnym()?

Jak zapisać ten stan użytkownika w bokeh?

def modify_doc(doc): 

    df_all = pd.read_csv('data.csv') 
    df_all['Date'] = pd.to_datetime(df_all['Date']) 

    start_data = df_all[0:10].to_dict(orient='list') 
    source = ColumnDataSource(data=start_data) 
    ... 

    def callback(): 

     # FIXME: how can we save the current_record in the user's session? 

     global current_record 
     try: 
      current_record 
     except NameError: 
      current_record = 10 

     df = df_all[current_record:current_record+1] 

     if df.shape[0] > 0: 
      # we have another record so display it 
      new_data = df.to_dict(orient='list') 
      source.stream(new_data) 
      current_record = current_record + 1 

    doc.add_root(plot) 
    doc.add_periodic_callback(callback, 250) 

Widziałem dokumentację ClientSession, ale to wydaje się działać na poziomie całego dokumentu?


podaję Minimal, kompletne i weryfikowalne poniższy przykład:

pliku: bokeh_server.py

uruchamiane lokalnie z: python3 bokeh_server.py

import pandas as pd 
from tornado.ioloop import IOLoop 
import yaml 
from jinja2 import Template 

from bokeh.application.handlers import FunctionHandler 
from bokeh.application import Application 
from bokeh.layouts import column 
from bokeh.models import ColumnDataSource, Slider, Div 
from bokeh.plotting import figure 
from bokeh.server.server import Server 
from bokeh.themes import Theme 
from bokeh.client import push_session 

import os 

# if running locally, listen on port 5000 
PORT = int(os.getenv('PORT', '5000')) 
HOST = "0.0.0.0" 

try: 
    # This is set in the cloud foundry manifest. If we are running on 
    # cloud foundry, this will be set for us. 
    ALLOW_WEBSOCKET_ORIGIN = os.getenv("ALLOW_WEBSOCKET_ORIGIN").split(',') 
except: 
    # We are not running on cloud foundry so we must be running locally 
    ALLOW_WEBSOCKET_ORIGIN = [ 'localhost:{0}'.format(PORT) ] 


io_loop = IOLoop.current() 

# This example simulates reading from a stream such as kafka 

def modify_doc(doc): 

    df_all = pd.read_csv('data.csv') 
    df_all['Date'] = pd.to_datetime(df_all['Date']) 

    start_data = df_all[0:10].to_dict(orient='list') 

    source = ColumnDataSource(data=start_data) 

    plot = figure(x_axis_type='datetime', 
        y_range=(0, 10000000), 
        y_axis_label='Y Label', 
        title="Title") 

    plot.line('Date', 'ALL_EXCL_FUEL', color='blue',  alpha=1, source=source) 
    plot.line('Date', 'MOSTLY_FOOD',  color='lightblue', alpha=1, source=source) 
    plot.line('Date', 'NON_SPECIALISED', color='grey',  alpha=1, source=source) 

    def callback(): 
     # FIXME: how can we save this in the user's session? 
     global counter 
     try: 
      counter 
     except NameError: 
      counter = 10 

     df = df_all[counter:counter+1] 

     if df.shape[0] > 0: 
      # hardcode update values for now 
      new_data = df.to_dict(orient='list') 
      source.stream(new_data) 
      counter = counter + 1 

    doc.add_root(plot) 
    doc.add_periodic_callback(callback, 250) 


bokeh_app = Application(FunctionHandler(modify_doc)) 

server = Server(
     {'/': bokeh_app}, 
     io_loop=io_loop, 
     allow_websocket_origin=ALLOW_WEBSOCKET_ORIGIN, 
     **{'port': PORT, 'address': HOST} 
     ) 
server.start() 

if __name__ == '__main__': 
    io_loop.add_callback(server.show, "/") 
    io_loop.start() 

pliku: data.csv

Date,ALL_EXCL_FUEL,MOSTLY_FOOD,NON_SPECIALISED,TEXTILE,HOUSEHOLD,OTHER,NON_STORE 
1986 Jan,1883154,747432,163708,267774,261453,281699,161088 
1986 Feb,1819796,773161,152656,223836,246502,275121,148519 
1986 Mar,1912582,797104,169440,251438,249614,292348,152638 
1986 Apr,1974419,809334,170540,275975,260086,299271,159213 
1986 May,1948915,800193,170173,274979,251175,297655,154740 
1986 Jun,2019114,821785,178366,295463,251507,311447,160546 
1986 Jul,2051539,816033,184812,297969,269786,323187,159752 
1986 Aug,2011746,804386,180911,297138,263427,310220,155665 
1986 Sep,2046678,792943,181055,305350,280640,318368,168322 
1986 Oct,2110669,810147,187728,308919,298637,325617,179621 
1986 Nov,2315710,847794,231599,352009,332079,358077,194152 
1986 Dec,2830206,970987,319570,490001,373714,469399,206536 
1987 Jan,2032021,798562,172215,288186,288534,307900,176624 
1987 Feb,1980748,805713,165682,247219,282836,313577,165721 
1987 Mar,2009717,816051,174034,256756,280207,315562,167106 
1987 Apr,2156967,862749,189729,308543,284440,336755,174751 
1987 May,2075808,834375,175464,287515,280404,330093,167957 
1987 Jun,2137092,844051,183014,304706,286522,345149,173651 
1987 Jul,2208377,847098,198848,330804,301537,356037,174054 

Odpowiedz

3

Przeprowadziłem kilka testów i odkryłem, że za każdym razem, gdy otwierano nową sesję przeglądarki za pomocą adresu URL wykresu bokeh, utworzono nową instancję bokeh Document. Odpowiedzią było dla mnie zapisanie stanu w dokumencie:

def modify_doc(doc): 

    # The first 100 records of data.csv will be loaded immediately 
    # The remaining records will be read one-by-one in the update 
    # callback which is used to simulate new, realtime data arriving 

    doc.realtime_rec_ptr = 100 

    df_all = pd.read_csv('data.csv') 
    df_all['Date'] = pd.to_datetime(df_all['Date']) 

    start_data_df = df_all[0:doc.realtime_rec_ptr] 
    start_data_df.loc[ :, 'color' ] = 'green' 

    src = ColumnDataSource(data=start_data_df.to_dict(orient='list')) 

    p = figure(x_axis_type='datetime', title="Title" 
        y_range=(0, 10000000), y_axis_label='Y Label') 

    p.line('Date','ALL_EXCL_FUEL',color='blue',alpha=1,source=src) 

    # realtime markers will be colored green, others will be blue 
    p.circle('Date','ALL_EXCL_FUEL',color='color',fill_alpha=0.2,size=4,source=src) 

    def callback(): 
     df = df_all[doc.realtime_rec_ptr:realtime_rec_ptr+1] 

     if df.shape[0] > 0: 
      df.loc[ :, 'color' ] = 'blue' 
      new_data = df.to_dict(orient='list') 
      #print(new_data) 
      source.stream(new_data) 
      doc.realtime_rec_ptr = doc.realtime_rec_ptr + 1 

    doc.add_root(p) 
    doc.add_periodic_callback(callback, 250)