# -*- coding: utf-8 -*-
"""Defines the App class."""
from typing import ( # pylint: disable=unused-import
Any, Callable, List, Optional, Set, Tuple, Union, Dict
)
import os
import json
from itertools import product
import inspect
import shutil
import stat
from collections import namedtuple, defaultdict, OrderedDict
from subprocess import Popen, PIPE, STDOUT
import warnings
from jinja2 import Environment, FileSystemLoader
from bowtie._component import Event, Component, COMPONENT_REGISTRY
from bowtie.exceptions import (
GridIndexError, MissingRowOrColumn, NoSidebarError,
NotStatefulEvent, UsedCellsError, NoUnusedCellsError,
SizeError, WebpackError, YarnError
)
from bowtie.pager import Pager
_Import = namedtuple('_Import', ['module', 'component'])
_Control = namedtuple('_Control', ['instantiate', 'caption'])
_Schedule = namedtuple('_Schedule', ['seconds', 'function'])
_DIRECTORY = 'build'
_WEBPACK = './node_modules/.bin/webpack'
def raise_not_number(x: int) -> None:
"""Raise ``SizeError`` if ``x`` is not a number``."""
try:
float(x)
except ValueError:
raise SizeError('Must pass a number, received {}'.format(x))
class Span:
"""Define the location of a widget."""
# pylint: disable=too-few-public-methods
def __init__(self, row_start: int, column_start: int, row_end: Optional[int] = None,
column_end: Optional[int] = None) -> None:
"""Create a span for a widget.
Indexing starts at 0. Both start and end are inclusive.
Parameters
----------
row_start : int
column_start : int
row_end : int, optional
column_end : int, optional
"""
self.row_start = row_start + 1
self.column_start = column_start + 1
# add 1 to then ends because they start counting from 1
if row_end is None:
self.row_end = self.row_start + 1
else:
self.row_end = row_end + 1
if column_end is None:
self.column_end = self.column_start + 1
else:
self.column_end = column_end + 1
@property
def _key(self) -> Tuple[int, int, int, int]:
return self.row_start, self.column_start, self.row_end, self.column_end
def __hash__(self) -> int:
"""Hash for dict."""
return hash(self._key)
def __eq__(self, other) -> bool:
"""Compare eq for dict."""
# pylint: disable=protected-access
return isinstance(other, type(self)) and self._key == other._key
def __repr__(self) -> str:
"""Show the starting and ending points."""
return '({}, {}) to ({}, {})'.format(
self.row_start,
self.column_start,
self.row_end,
self.column_end
)
[docs]class Size:
"""Size of rows and columns in grid.
This is accessed through ``.rows`` and ``.columns`` from App and View instances.
This uses CSS's minmax function.
The minmax() CSS function defines a size range greater than or equal
to min and less than or equal to max. If max < min, then max is ignored
and minmax(min,max) is treated as min. As a maximum, a <flex> value
sets the flex factor of a grid track; it is invalid as a minimum.
Examples
--------
Laying out an app with the first row using 1/3 of the space
and the second row using 2/3 of the space.
>>> app = App(rows=2, columns=3)
>>> app.rows[0].fraction(1)
1fr
>>> app.rows[1].fraction(2)
2fr
"""
def __init__(self) -> None:
"""Create a default row or column size with fraction = 1."""
self.minimum = None # type: Optional[str]
self.maximum = None # type: Optional[str]
self.fraction(1)
[docs] def auto(self) -> 'Size':
"""Set the size to auto or content based."""
self.maximum = 'auto'
return self
[docs] def min_auto(self) -> 'Size':
"""Set the minimum size to auto or content based."""
self.minimum = 'auto'
return self
[docs] def pixels(self, value) -> 'Size':
"""Set the size in pixels."""
raise_not_number(value)
self.maximum = '{}px'.format(value)
return self
[docs] def min_pixels(self, value) -> 'Size':
"""Set the minimum size in pixels."""
raise_not_number(value)
self.minimum = '{}px'.format(value)
return self
[docs] def ems(self, value) -> 'Size':
"""Set the size in ems."""
raise_not_number(value)
self.maximum = '{}em'.format(value)
return self
[docs] def min_ems(self, value) -> 'Size':
"""Set the minimum size in ems."""
raise_not_number(value)
self.minimum = '{}em'.format(value)
return self
[docs] def fraction(self, value: int) -> 'Size':
"""Set the fraction of free space to use as an integer."""
raise_not_number(value)
self.maximum = '{}fr'.format(int(value))
return self
[docs] def percent(self, value) -> 'Size':
"""Set the percentage of free space to use."""
raise_not_number(value)
self.maximum = '{}%'.format(value)
return self
[docs] def min_percent(self, value) -> 'Size':
"""Set the minimum percentage of free space to use."""
raise_not_number(value)
self.minimum = '{}%'.format(value)
return self
def __repr__(self) -> str:
"""Represent the size to be inserted into a JSX template."""
if self.minimum:
return 'minmax({}, {})'.format(self.minimum, self.maximum)
return self.maximum
[docs]class Gap:
"""Margin between rows or columns of the grid.
This is accessed through ``.row_gap`` and ``.column_gap`` from App and View instances.
Examples
--------
Create a gap of 5 pixels between all rows.
>>> app = App()
>>> app.row_gap.pixels(5)
5px
"""
def __init__(self) -> None:
"""Create a default margin of zero."""
self.gap = None # type: Optional[str]
self.pixels(0)
[docs] def pixels(self, value: int) -> 'Gap':
"""Set the margin in pixels."""
raise_not_number(value)
self.gap = '{}px'.format(value)
return self
[docs] def ems(self, value: int) -> 'Gap':
"""Set the margin in ems."""
raise_not_number(value)
self.gap = '{}em'.format(value)
return self
[docs] def percent(self, value) -> 'Gap':
"""Set the margin as a percentage."""
raise_not_number(value)
self.gap = '{}%'.format(value)
return self
def __repr__(self) -> str:
"""Represent the margin to be inserted into a JSX template."""
return self.gap
def _check_index(value: Optional[int], length: int, bound: bool) -> Optional[int]:
if value is not None:
if not isinstance(value, int):
raise GridIndexError('Indices must be integers, found {}.'.format(value))
if value < 0:
value = value + length
if value < 0 + bound or value >= length + bound:
raise GridIndexError('Index out of range.')
return value
def _slice_to_start_end(slc: slice, length: int) -> Tuple[int, int]:
if slc.step is not None and slc.step != 1:
raise GridIndexError(
'slice step is not supported must be None or 1, was {}'.format(slc.step)
)
start = 0
if slc.start is not None:
start = slc.start
end = length
if slc.stop is not None:
end = slc.stop
return start, end
class Widgets(list):
"""List for storing widgets to override iadd."""
def __iadd__(self, other):
"""Append items to list when adding."""
self.append(other)
return self
def __add__(self, other):
"""Append items to list when adding."""
return self + [other]
[docs]class View:
"""Grid of widgets."""
_NEXT_UUID = 0
@classmethod
def _next_uuid(cls) -> int:
cls._NEXT_UUID += 1
return cls._NEXT_UUID
def __init__(self, rows: int = 1, columns: int = 1, sidebar: bool = True,
background_color: str = 'White') -> None:
"""Create a new grid."""
self._uuid = View._next_uuid()
self.used = OrderedDict(((key, False) for key in product(range(rows), range(columns))))
self.column_gap = Gap()
self.row_gap = Gap()
self.rows = [Size() for _ in range(rows)]
self.columns = [Size() for _ in range(columns)]
self.sidebar = sidebar
self.background_color = background_color
self.packages = set() # type: Set[str]
self.templates = set() # type: Set[str]
self.imports = set() # type: Set[_Import]
self.controllers = [] # type: List[_Control]
self.spans = defaultdict(Widgets) # type: Dict[Span, List[Component]]
@property
def _name(self) -> str:
return 'view{}.jsx'.format(self._uuid)
def _key_to_rows_columns(self, key: Any) -> Tuple[int, int, int, int]:
# FIXME spaghetti code cleanup needed!
if isinstance(key, tuple):
if len(key) == 1:
rows_cols = self._key_to_rows_columns(key[0])
else:
try:
row_key, column_key = key
except ValueError:
raise GridIndexError('Index must be 1 or 2 values, found {}'.format(key))
if isinstance(row_key, int):
row_start = _check_index(row_key, len(self.rows), False)
row_end = row_start + 1
elif isinstance(row_key, slice):
row_start, row_end = _slice_to_start_end(row_key, len(self.rows))
row_start = _check_index(row_start, len(self.rows), False)
row_end = _check_index(row_end, len(self.rows), True)
else:
raise GridIndexError(
'Cannot index with {}, pass in a int or a slice.'.format(row_key)
)
if isinstance(column_key, int):
column_start = _check_index(column_key, len(self.columns), False)
column_end = column_start + 1
elif isinstance(column_key, slice):
column_start, column_end = _slice_to_start_end(column_key, len(self.columns))
column_start = _check_index(column_start, len(self.columns), False)
column_end = _check_index(column_end, len(self.columns), True)
else:
raise GridIndexError(
'Cannot index with {}, pass in a int or a slice.'.format(column_key)
)
rows_cols = row_start, column_start, row_end, column_end
elif isinstance(key, slice):
start, end = _slice_to_start_end(key, len(self.rows))
start = _check_index(start, len(self.rows), False)
end = _check_index(end, len(self.rows), True)
rows_cols = start, 0, end, len(self.columns)
elif isinstance(key, int):
row_start = _check_index(key, len(self.rows), False)
rows_cols = row_start, 0, row_start + 1, len(self.columns)
else:
raise GridIndexError('Invalid index {}'.format(key))
return rows_cols
def __getitem__(self, key):
"""Get item from the view."""
return self.spans[Span(*self._key_to_rows_columns(key))]
def __setitem__(self, key: Any, widget: Component) -> None:
"""Add widget to the view."""
if not isinstance(widget, Widgets):
self._add(widget, *self._key_to_rows_columns(key))
[docs] def add(self, widget: Component) -> None:
"""Add a widget to the grid in the next available cell.
Searches over columns then rows for available cells.
Parameters
----------
widget : bowtie._Component
A Bowtie widget instance.
"""
self._add(widget)
def _add(self, widget: Component, row_start: Optional[int] = None,
column_start: Optional[int] = None, row_end: Optional[int] = None,
column_end: Optional[int] = None) -> None:
"""Add a widget to the grid.
Zero-based index and exclusive.
Parameters
----------
widget : bowtie._Component
A Bowtie widget instance.
row_start : int, optional
Starting row for the widget.
column_start : int, optional
Starting column for the widget.
row_end : int, optional
Ending row for the widget.
column_end : int, optional
Ending column for the widget.
"""
if not isinstance(widget, Component):
raise ValueError('Widget must be a type of Component, found {}'.format(type(widget)))
if row_start is not None and row_end is not None and row_start >= row_end:
raise GridIndexError('row_start: {} must be less than row_end: {}'
.format(row_start, row_end))
if column_start is not None and column_end is not None and column_start >= column_end:
raise GridIndexError('column_start: {} must be less than column_end: {}'
.format(column_start, column_end))
# pylint: disable=protected-access
self.packages.add(widget._PACKAGE)
self.templates.add(widget._TEMPLATE)
self.imports.add(_Import(component=widget._COMPONENT,
module=widget._TEMPLATE[:widget._TEMPLATE.find('.')]))
used_msg = 'Cell at [{}, {}] is already used.'
if row_start is None or column_start is None:
if row_start is not None:
raise MissingRowOrColumn(
'Only row_start was defined. '
'Please specify both column_start and row_start or neither.'
)
if column_start is not None:
raise MissingRowOrColumn(
'Only column_start was defined. '
'Please specify both column_start and row_start or neither.'
)
if row_end is not None or column_end is not None:
raise MissingRowOrColumn(
'If you specify an end index you must '
'specify both row_start and column_start.'
)
row, col = None, None
for (row, col), use in self.used.items():
if not use:
break
else:
raise NoUnusedCellsError()
span = Span(row, col)
self.used[row, col] = True
elif row_end is None and column_end is None:
if self.used[row_start, column_start]:
raise UsedCellsError(used_msg.format(row_start, column_start))
span = Span(row_start, column_start)
self.used[row_start, column_start] = True
else:
if row_end is None:
row_end = row_start + 1
if column_end is None:
column_end = column_start + 1
for row, col in product(range(row_start, row_end),
range(column_start, column_end)):
if self.used[row, col]:
raise UsedCellsError(used_msg.format(row, col))
for row, col in product(range(row_start, row_end),
range(column_start, column_end)):
self.used[row, col] = True
span = Span(row_start, column_start, row_end, column_end)
self.spans[span].append(widget)
caption=getattr(widget, 'caption', None)))
def _render(self, path: str, env: Environment) -> None:
"""TODO: Docstring for _render.
Parameters
----------
path : TODO
Returns
-------
TODO
"""
jsx = env.get_template('view.jsx.j2')
columns = []
if self.sidebar:
columns.append(Size().ems(18))
columns += self.columns
with open(os.path.join(path, self._name), 'w') as f:
f.write(
jsx.render(
uuid=self._uuid,
sidebar=self.sidebar,
columns=columns,
rows=self.rows,
column_gap=self.column_gap,
row_gap=self.row_gap,
background_color=self.background_color,
components=self.imports,
controls=self.controllers,
spans=self.spans
)
)
Route = namedtuple('Route', ['view', 'path', 'exact'])
[docs]class App:
"""Core class to layout, connect, build a Bowtie app."""
def __init__(self, rows: int = 1, columns: int = 1, sidebar: bool = True,
title: str = 'Bowtie App', basic_auth: bool = False,
username: str = 'username', password: str = 'password',
theme: Optional[str] = None, background_color: str = 'White',
host: str = '0.0.0.0', port: int = 9991, socketio: str = '',
debug: bool = False) -> None:
"""Create a Bowtie App.
Parameters
----------
row : int, optional
Number of rows in the grid.
columns : int, optional
Number of columns in the grid.
sidebar : bool, optional
Enable a sidebar for control widgets.
title : str, optional
Title of the HTML.
basic_auth : bool, optional
Enable basic authentication.
username : str, optional
Username for basic authentication.
password : str, optional
Password for basic authentication.
theme : str, optional
Color for Ant Design components.
background_color : str, optional
Background color of the control pane.
host : str, optional
Host IP address.
port : int, optional
Host port number.
socketio : string, optional
Socket.io path prefix, only change this for advanced deployments.
debug : bool, optional
Enable debugging in Flask. Disable in production!
"""
self.background_color = background_color
self.basic_auth = basic_auth
self.debug = debug
self.host = host
self.init = None
self.password = password
self.port = port
self.socketio = socketio
self.schedules = [] # type: List[_Schedule]
self.subscriptions = defaultdict(list) # type: Dict[Event, List[Tuple[List[Event], str]]]
self.pages = {} # type: Dict[Pager, str]
self.title = title
self.username = username
self.uploads = {} # type: Dict[int, str]
self.theme = theme
self.root = View(rows=rows, columns=columns, sidebar=sidebar,
background_color=background_color)
self.routes = [Route(view=self.root, path='/', exact=True)]
self._package_dir = os.path.dirname(__file__)
self._jinjaenv = Environment(
loader=FileSystemLoader(os.path.join(self._package_dir, 'templates')),
trim_blocks=True,
lstrip_blocks=True
)
def __getattr__(self, name: str) -> Union[Gap, List[Size]]:
"""Export attributes from root view."""
if name == 'columns':
return self.root.columns
elif name == 'rows':
return self.root.rows
elif name == 'column_gap':
return self.root.column_gap
elif name == 'row_gap':
return self.root.row_gap
else:
raise AttributeError(name)
def __getitem__(self, key):
"""Get item from root view."""
return self.root.__getitem__(key)
def __setitem__(self, key: Any, value: Component) -> None:
"""Add widget to the root view."""
self.root.__setitem__(key, value)
[docs] def add(self, widget: Component) -> None:
"""Add a widget to the grid in the next available cell.
Searches over columns then rows for available cells.
Parameters
----------
widget : bowtie._Component
A Bowtie widget instance.
"""
self.root.add(widget)
self.root.add_sidebar(widget)
[docs] def add_route(self, view, path, exact=True):
"""Add a view to the app.
Parameters
----------
view : View
path : str
exact : bool, optional
"""
assert path[0] == '/'
for route in self.routes:
assert path != route.path, 'Cannot use the same path twice'
self.routes.append(Route(view=view, path=path, exact=exact))
[docs] def respond(self, pager: Pager, func: Callable) -> None:
"""Call a function in response to a page.
When the pager calls notify, the function will be called.
Parameters
----------
pager : Pager
Pager that to signal when func is called.
func : callable
Function to be called.
Examples
--------
Using the pager to run a callback function.
>>> from bowtie.pager import Pager
>>> app = App()
>>> pager = Pager()
>>> def callback():
... pass
>>> def scheduledtask():
... pager.notify()
>>> app.respond(pager, callback)
"""
self.pages[pager] = func.__name__
[docs] def subscribe(self, func: Callable, event: Event, *events: Event) -> None:
"""Call a function in response to an event.
If more than one event is given, `func` will be given
as many arguments as there are events.
Parameters
----------
func : callable
Function to be called.
event : event
A Bowtie event.
*events : Each is an event, optional
Additional events.
Examples
--------
Subscribing a function to multiple events.
>>> from bowtie.control import Dropdown, Slider
>>> app = App()
>>> dd = Dropdown()
>>> slide = Slider()
>>> def callback(dd_item, slide_value):
... pass
>>> app.subscribe(callback, dd.on_change, slide.on_change)
"""
all_events = [event, *events]
if len(all_events) != len(set(all_events)):
raise ValueError('Subscribed to the same event multiple times. '
'All events must be unique.')
if len(all_events) > 1:
# check if we are using any non stateful events
for evt in all_events:
if evt.getter is None:
msg = '{}.on_{} is not a stateful event. It must be used alone.'
raise NotStatefulEvent(msg.format(evt.uuid, evt.name))
if event.name == 'upload':
if event.uuid in self.uploads:
warnings.warn(
('Overwriting function "{func1}" with function '
'"{func2}" for upload object "{obj}".').format(
func1=self.uploads[event.uuid],
func2=func.__name__,
obj=COMPONENT_REGISTRY[event.uuid]
), Warning)
self.uploads[event.uuid] = func.__name__
for evt in all_events:
self.subscriptions[evt].append((all_events, func.__name__))
[docs] def listen(self, event: Event, *events: Event) -> Callable:
"""Call a function in response to an event.
If more than one event is given, `func` will be given
as many arguments as there are events.
Parameters
----------
event : event
A Bowtie event.
*events : Each is an event, optional
Additional events.
Examples
--------
Subscribing a function to multiple events.
>>> from bowtie.control import Dropdown, Slider
>>> app = App()
>>> dd = Dropdown()
>>> slide = Slider()
>>> @app.listen(dd.on_change, slide.on_change)
... def callback(dd_item, slide_value):
... pass
>>> @app.listen(dd.on_change)
... @app.listen(slide.on_change)
... def callback2(value):
... pass
"""
def decorator(func):
"""Subscribe function to events."""
self.subscribe(func, event, *events)
return func
return decorator
[docs] def load(self, func):
"""Call a function on page load.
Parameters
----------
func : callable
Function to be called.
"""
self.init = func.__name__
[docs] def schedule(self, seconds, func):
"""Call a function periodically.
Parameters
----------
seconds : float
Minimum interval of function calls.
func : callable
Function to be called.
"""
self.schedules.append(_Schedule(seconds, func.__name__))
def _sourcefile(self): # pylint: disable=no-self-use
# [-1] grabs the top of the stack
return os.path.basename(inspect.stack()[-1].filename)[:-3]
def _write_templates(self, notebook: Optional[str] = None) -> Set[str]:
server = self._jinjaenv.get_template('server.py.j2')
indexhtml = self._jinjaenv.get_template('index.html.j2')
indexjsx = self._jinjaenv.get_template('index.jsx.j2')
webpack = self._jinjaenv.get_template('webpack.common.js.j2')
src, app, templates = create_directories()
webpack_path = os.path.join(_DIRECTORY, webpack.name[:-3]) # type: ignore
with open(webpack_path, 'w') as f:
f.write(
webpack.render(color=self.theme)
)
server_path = os.path.join(src, server.name[:-3]) # type: ignore
with open(server_path, 'w') as f:
f.write(
server.render(
socketio=self.socketio,
basic_auth=self.basic_auth,
username=self.username,
password=self.password,
notebook=notebook,
source_module=self._sourcefile() if not notebook else None,
subscriptions=self.subscriptions,
uploads=self.uploads,
schedules=self.schedules,
initial=self.init,
routes=self.routes,
pages=self.pages,
host="'{}'".format(self.host),
port=self.port,
debug=self.debug
)
)
perms = os.stat(server_path)
os.chmod(server_path, perms.st_mode | stat.S_IEXEC)
template_src = os.path.join(self._package_dir, 'src', 'progress.jsx')
shutil.copy(template_src, app)
template_src = os.path.join(self._package_dir, 'src', 'utils.js')
shutil.copy(template_src, app)
for route in self.routes:
for template in route.view.templates:
template_src = os.path.join(self._package_dir, 'src', template)
shutil.copy(template_src, app)
packages = set() # type: Set[str]
for route in self.routes:
route.view._render(app, self._jinjaenv) # pylint: disable=protected-access
packages |= route.view.packages
with open(os.path.join(templates, indexhtml.name[:-3]), 'w') as f: # type: ignore
f.write(
indexhtml.render(
title=self.title,
)
)
with open(os.path.join(app, indexjsx.name[:-3]), 'w') as f: # type: ignore
f.write(
indexjsx.render(
maxviewid=View._NEXT_UUID, # pylint: disable=protected-access
socketio=self.socketio,
pages=self.pages,
routes=self.routes
)
)
return packages
def _build(self, notebook: None = None) -> None:
"""Compile the Bowtie application."""
packages = self._write_templates(notebook=notebook)
if not os.path.isfile(os.path.join(_DIRECTORY, 'package.json')):
packagejson = os.path.join(self._package_dir, 'src/package.json')
shutil.copy(packagejson, _DIRECTORY)
if not os.path.isfile(os.path.join(_DIRECTORY, 'webpack.prod.json')):
webpackprod = os.path.join(self._package_dir, 'src/webpack.prod.js')
shutil.copy(webpackprod, _DIRECTORY)
if not os.path.isfile(os.path.join(_DIRECTORY, 'webpack.dev.json')):
webpackdev = os.path.join(self._package_dir, 'src/webpack.dev.js')
shutil.copy(webpackdev, _DIRECTORY)
if run(['yarn', 'install'], notebook=notebook) > 1:
raise YarnError('Error installing node packages')
packages.discard(None)
if packages:
installed = installed_packages()
new_packages = [x for x in packages if x.split('@')[0] not in installed]
if new_packages:
retval = run(['yarn', 'add'] + new_packages, notebook=notebook)
if retval > 1:
raise YarnError('Error installing node packages')
elif retval == 1:
print('Yarn error but trying to continue build')
retval = run([_WEBPACK, '--config', 'webpack.dev.js'], notebook=notebook)
if retval != 0:
raise WebpackError('Error building with webpack')
def run(command: List[str], notebook: None = None) -> int:
"""Run command from terminal and notebook and view output from subprocess."""
if notebook is None:
return Popen(command, cwd=_DIRECTORY).wait()
else:
cmd = Popen(command, cwd=_DIRECTORY, stdout=PIPE, stderr=STDOUT)
while True:
line = cmd.stdout.readline()
if line == b'' and cmd.poll() is not None:
return cmd.poll()
print(line.decode('utf-8'), end='')
raise Exception()
def installed_packages():
"""Extract installed packages as list from `package.json`."""
with open(os.path.join(_DIRECTORY, 'package.json'), 'r') as f:
packagejson = json.load(f)
return packagejson['dependencies'].keys()
def create_directories() -> Tuple[str, str, str]:
"""Create all the necessary subdirectories for the build."""
src = os.path.join(_DIRECTORY, 'src')
templates = os.path.join(src, 'templates')
app = os.path.join(src, 'app')
os.makedirs(app, exist_ok=True)
os.makedirs(templates, exist_ok=True)
return src, app, templates