0
点赞
收藏
分享

微信扫一扫

量化交易之python篇 - remi_web - 内网前端(remi_web.py)(去权限认证版、程序刷新时优化网页自动弹出)


from remi.server import App, Server
import remi.gui as gui
import time
from position_operator.position_operator import PositionJsonOperator as TQZJsonOperator
from position_operator.position_operator import ContractModel


"""
新增 输入文本框, 可通过搜索账户名字, 加载持仓数据
"""
class MainWeb(App):
    """Intranet web page that shows the positions of a searchable account.

    The page consists of a text input (account name search), a hint label
    listing the matching account names, a label with the time of the last
    refresh and a table with the positions of the selected account.
    """

    def __init__(self, *args):
        super().__init__(*args)

    def idle(self):
        """Rebuild the time label and the table whenever the clock second is a multiple of 15."""
        if self.__is_refresh_time(now_time_second=time.localtime().tm_sec, interval_second=15):
            self.__web_refresh(current_account_name=self.current_account_name)
            # NOTE(review): the indentation of the original file was lost; this
            # print is assumed to belong to the refresh branch - confirm.
            print(self.__time_now())

    def main(self):
        """Load the account index and build the root widget returned to remi."""
        self.__app_load_data()

        # No account is selected until the user types one.
        self.current_account_name = ""

        return self.__add_child_widgets(current_account_name=self.current_account_name)

    # private part
    # - app load data / add child widgets -
    def __app_load_data(self):
        """Read "all_accounts.json" and cache the account names it contains."""
        self.all_account_data_dictionary = TQZJsonOperator.tqz_load_jsonfile(jsonfile="all_accounts.json")

        self.now_time_second = None

        # The keys of the index are the account names offered by the search box.
        self.account_name_list = list(self.all_account_data_dictionary)

    def __add_child_widgets(self, current_account_name):
        """Create every widget, attach them to the root container and return it.

        :param current_account_name: account whose positions fill the table
        :return: the root ``gui.VBox`` holding all child widgets
        """
        self.layout_width = '90%'
        self.window = gui.VBox(width='100%')  # full page width

        # text input: account name typed by the user
        self.text_input = self.__get_textInput()
        # hint label: search result / "account not found" message
        self.account_name_list_hint_label = gui.Label(
            "",
            width=self.layout_width,
            height='20%'
        )

        # time label: time of the last refresh
        self.time_label = self.__get_time_label()

        # table: positions of the selected account
        self.table = self.__get_table_list(account_name=current_account_name)

        return self.__window_add_subviews(
            self.text_input,
            self.account_name_list_hint_label,
            self.time_label,
            self.table,
            window=self.window
        )

    # -- text input widget part --
    def __get_textInput(self):
        """Build the single-line account input and hook up its key-up handler."""
        text_input = gui.TextInput(
            single_line=True,
            hint="请输入账号 (按 Enter 确认)",
            width=self.layout_width,
            height="20%"
        )

        text_input.onkeyup.do(
            callback=self.__text_input_onkeyup
        )
        return text_input

    def __text_input_onkeyup(self, widget, current_hint, last_presskey):
        """Update the hint label and the table after every key release.

        :param widget: the widget that emitted the event (unused)
        :param current_hint: text currently typed into the input
        :param last_presskey: code of the released key; compared against "13"
            to detect Enter
        """
        self.current_account_name = current_hint
        self.__web_refresh(current_account_name=self.current_account_name)

        # Case-insensitive prefix search; an empty input matches nothing.
        matched_names = []
        if current_hint != "":
            typed_prefix = current_hint.lower()
            matched_names = [
                account_name
                for account_name in self.account_name_list
                if account_name.lower().startswith(typed_prefix)
            ]

        if matched_names:
            self.account_name_list_hint_label.set_text("账户查询结果: " + ', '.join(matched_names))
        elif current_hint == "":
            self.account_name_list_hint_label.set_text("")
        else:
            self.account_name_list_hint_label.set_text("不存在 %s 账户" % current_hint)

        # Everything below only applies when Enter was released.
        if last_presskey != "13":
            return

        # On Enter the first match (if any) is taken as the chosen account.
        chosen_name = matched_names[0] if matched_names else current_hint

        if chosen_name in self.account_name_list:
            self.current_account_name = chosen_name
            self.text_input.set_text(chosen_name)
            self.account_name_list_hint_label.set_text("账户查询结果: " + chosen_name)
        else:
            self.account_name_list_hint_label.set_text("不存在 %s 账户" % chosen_name)
        self.__web_refresh(current_account_name=chosen_name)

    # -- time label widget part --
    def __get_time_label(self):
        """Return a new label showing the current time as the update time."""
        label_text = '更新时间: ' + self.__time_now()
        return gui.Label(label_text, width=self.layout_width, height='20%')

    @staticmethod
    def __time_now():
        """Return the local time formatted as ``YYYY/MM/DD HH:MM:SS``."""
        return time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())

    # -- table widget part --
    def __get_table_list(self, account_name):
        """Return a new table widget filled with the positions of ``account_name``."""
        content_data = self.__load_table_data(account_name=account_name)
        # fill_title=True: per the original author's note, the first row is
        # rendered as a highlighted title row.
        return gui.Table.new_from_list(
            content=content_data,
            width=self.layout_width,
            fill_title=True
        )

    def __load_table_data(self, account_name):
        """Build the table rows (title row first) for ``account_name``.

        :param account_name: key into the account index loaded at start-up
        :return: list of 4-tuples of strings; only the title row when the
            account is not present in the index
        """
        title_cell = ("合约", "当前持仓", "入场价", "目标持仓")
        table_data = [title_cell]

        if account_name not in self.all_account_data_dictionary:
            return table_data

        # The index maps an account name to the json file holding its positions.
        jsonfile = self.all_account_data_dictionary[account_name]
        account_data_dic = TQZJsonOperator.tqz_load_jsonfile(jsonfile=jsonfile)

        contract_models = ContractModel.dictionary_to_models(data_dictionary=account_data_dic)
        for contract_model in contract_models:
            position_model = contract_model.position_model

            # A missing or zero target is displayed as "0"; any other target is
            # shown as-is so that every row has the same four columns as the
            # title row.
            target_position = position_model.target_position
            if (target_position is None) or (target_position == 0):
                target_text = "0"
            else:
                target_text = str(target_position)

            table_data.append((
                str(contract_model.name),
                str(position_model.lots),
                str(position_model.entry_price),
                target_text,
            ))

        return table_data

    # -- web refresh part --
    @staticmethod
    def __is_refresh_time(now_time_second, interval_second):
        """Tell whether the page should be refreshed at the given second.

        :param now_time_second: seconds component of the current time
        :param interval_second: refresh period in seconds
        :return: True when ``now_time_second`` is a multiple of ``interval_second``
        """
        return now_time_second % interval_second == 0

    def __web_refresh(self, current_account_name):
        """Replace the time label and the table with freshly built widgets.

        :param current_account_name: account whose positions fill the new table
        """
        self.text_input.set_text(self.current_account_name)

        # Drop the stale widgets, then append new ones in the same order.
        self.window.remove_child(self.table)
        self.window.remove_child(self.time_label)
        self.time_label = self.__get_time_label()
        self.table = self.__get_table_list(account_name=current_account_name)
        self.__window_add_subviews(
            self.time_label,
            self.table,
            window=self.window
        )

    @staticmethod
    def __window_add_subviews(*subviews, window):
        """Append ``subviews`` to ``window`` in order and return ``window``."""
        for subview in subviews:
            window.append(subview)
        return window


if __name__ == '__main__':
    # Page address noted by the original author: http://127.0.0.1:8877/
    server_options = dict(
        update_interval=1,    # per the original note: idle() is called once every second
        port=8877,
        start_browser=False,  # do not pop up a browser window when the program starts
    )
    Server(gui_class=MainWeb, **server_options)

 

举报

相关推荐

0 条评论