#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# **************************************************************************** #
# This file is part of the pdpy project: https://github.com/pdpy-org
# Copyright (C) 2022 Fede Camara Halac
# **************************************************************************** #
""" PdPy file to Json-format file """
from ..patching.pdpy import PdPy
from ..connections.edge import Edge
from ..objects.obj import Obj
from ..objects.msg import Msg
from ..objects.comment import Comment
from ..primitives.point import Point
from ..memory.array import Array
from ..utilities.utils import log, tokenize
from ..utilities.regex import *
__all__ = [ 'PdPyParser' ]
LOG = 1
[docs]class PdPyParser(PdPy):
""" Reads the lines from a .pdpy file pointer `fp` and populates a `PdPy` obj
Returns
-------
A PdPy patch objects
Input
-----
1. `.pdpy` file pointer `fp`
2. `pddb` is a json file holding a pure data object database
"""
def __init__(self, pdpy_lines, pddb, **kwargs):
super().__init__(**kwargs)
self.__pdpy__ = 'PdPy'
self.__db__ = pddb
self.__msg__ = [] # to store a message
self.__cmd__ = [] # to store a command ('sinesum', etc)
self.__store_msg__ = False
self.__store_cmd__ = False
self.__arg_number__ = 0 # to store arguments
self.__store_args__ = False
self.__arguments__ = []
self.__last___obj = False
self.__canvases__ = []
self.__last__ = None # the las object
self.__auto_patch__ = True
self.__pdpy_lines__ = pdpy_lines
self.parse()
[docs] def parse(self):
""" Parse the store pdpy file lines """
for i, line in enumerate(self.__pdpy_lines__):
self.__line_num__ = i
log(LOG,"-"*30)
log(LOG,repr(line))
log(LOG,"-"*30)
# Check for comments -----------------------------------------------------
if is_ignored(line):
continue
# Check for root canvas -------------------------------------------------
root, name = is_root(line)
if root:
self.root = self.addRoot()
self.root.name = ' '.join(name) if bool(name) else self.patchname
log(LOG,"Canvas Name:",self.root.name)
if self.root.__cursor__.y > self.root.__margin__.height:
self.root.__margin__.height = self.root.__cursor__.y
self.__canvases__.append(self.root)
continue
# Check for canvas end --------------------------------------------------
if is_root_end(line):
self.__canvases__.pop()
continue
# Check for subpatch ----------------------------------------------------
subpatch, name = is_subpatch(line)
if subpatch:
__canvas__ = self.__get_canvas__()
cnv = self.addCanvas()
cnv.id = self.__obj_idx__
self.__canvas_idx__.append(__canvas__.add(cnv))
if bool(name):
cnv.name = " ".join(name).strip().replace(' ','\ ')
self.__canvases__.append(cnv)
continue
# Check for subpatch end ------------------------------------------------
if is_subpatch_end(line):
piped = is_piped(line)
# check first if pipe is present and pass an outlet
if bool(piped):
if 'outlet' not in self.__canvases__[-1].nodes:
self.__prev__ = self.__last_canvas__().__obj_idx__
self.__last__ = self.objectCreator(Obj, ('outlet'))
self.objectConnector(self.__prev__,self.__last__.id)
# restore the canvas
__canvas__ = self.__last_canvas__()
x, y = __canvas__.get_position()
setattr(__canvas__, "position", Point(x, y))
self.__last_canvas__().title = "pd " + __canvas__.name
self.__depth__ -= 1
if len(self.__canvas_idx__):
self.__canvas_idx__.pop()
__canvas__.__margin__.width += len(__canvas__.name)
__canvas__.__cursor__.y += __canvas__.__box__.height
log(LOG,"restored", __canvas__.name)
# check again and pass arguments to pipe through
if bool(piped):
string = " ".join(piped).strip()
if bool(string):
self.pdpyCreate(' '*len(self.__canvases__)*2 + string)
# parsePdPyLine(' ' * len(canvases)*2 + " ".join(piped).strip())
# patch.objectConnector()
# clear the canvas out of the stack
self.__canvases__.pop()
continue
# Check for pd-comments -------------------------------------------------
comment = is_pdtext(line)
if bool(comment) and isinstance(comment, list):
self.objectCreator(Comment, (" ".join(comment).strip()), root=True)
continue
# Check for pd objects --------------------------------------------------
objects = is_pdobj(line)
if bool(objects) and isinstance(objects, list):
self.pdpyCreate(" ".join(objects).strip())
continue
log(LOG,"parsePdPyLine: Unparsed Lines:", repr(line))
[docs] def objectConnector(self, source=None,sink=None, canvas=None):
__canvas__ = self.__last_canvas__() if canvas is None else canvas
# canvas obj_index advances before this
if source is None: source = __canvas__.__obj_idx__
if sink is None: sink = __canvas__.__obj_idx__ + 1
source_port = sink_port = 0
log(LOG,"objectConnector", source, sink)
pd_edge = (source, source_port, sink, sink_port)
edge = Edge(pd_lines=pd_edge)
__canvas__.edge(edge)
edge.__dumps__()
[docs] def objectCreator(self, objClass, argv, root=False, canvas=None):
__canvas__ = self.__last_canvas__() if canvas is None else canvas
obj = None
log(LOG,"objCreator",argv)
word_length = len(argv) if isinstance(argv,str) else len(" ".join(argv))
x, y = __canvas__.__cursor__.x, __canvas__.__cursor__.y
if not root:
__canvas__.__margin__.width += word_length
else:
__canvas__.__cursor__.y += __canvas__.__box__.height
if objClass is Comment:
obj = Comment()
obj.move(x, y)
obj.addtext(argv)
__canvas__.comment(obj)
else:
self.__obj_idx__ = __canvas__.grow()
if isinstance(argv, str):
pd_lines = [ self.__obj_idx__, x, y, argv ]
else:
pd_lines = [ self.__obj_idx__, x, y ] + argv
obj = objClass(pd_lines=pd_lines)
__canvas__.add(obj)
return obj
[docs] def pdpyCreate(self, string):
""" create pd stuff from pdpy lang
"""
log(LOG,"pdpyCreate",repr(string))
# replace -> with loadbang ------------------------------------------------
string = str(string).replace("->", "loadbang >")
# string = string.replace("<-", "< loadbang")
# tokenize ---------------------------------------------------------------
self.__tokens__ = tokenize(string.strip())
log(LOG,"Tokens are",*self.__tokens__)
# ignore comments ---------------------------------------------------------
c = ('//', '*', '/*', '*/') # comments
sc = lambda c: str(self.__tokens__[0]).startswith(c) # check on first token
if sum(filter(bool,map(sc, c))):
# exit if the line is a comment
return
# check for connect all ---------------------------------------------------
# if no connection token is presnt, then connect the entire lot
k = ("<",">","->","<-") # konnections
no = lambda token: token not in self.__tokens__
self.__k__ = sum(filter(bool,map(no,k)))==len(k) and self.__auto_patch__
log(LOG,"Connect All set to", self.__k__)
# get the previous object index -------------------------------------------
self.__prev__ = self.__last_canvas__().__obj_idx__
# get the object map
iso = lambda x : int(self.__db__.is_obj(x))
self.__isomap__ = list(map(iso, self.__tokens__))
log(LOG, "object map:", self.__isomap__)
# get the iolets map
hio = lambda x, y:self.__db__.has_iolets(x) if y else 0
self.__hiomap__ = list(map(hio, self.__tokens__, self.__isomap__))
log(LOG, "iolet map:", self.__hiomap__)
obj_count = sum(self.__isomap__)
multiobj = obj_count > 1
noobj = obj_count == 0
# arg_map = list(map(lambda x,y:self.arg_count(x) if y else None, self.__tokens__, self.__isomap__))
if not multiobj:
# single object
log(LOG,"is single object", self.__tokens__)
self.__last__ = self.parse_any(0, self.__tokens__[0])
if hasattr(self,"__last__") and self.__last__ is not None:
self.__last__.args = self.__tokens__[1:]
if not self.__last___obj and hasattr(self.__hiomap__[0], 'outlets'):
self.objectConnector()
# if self.__k__:
elif noobj:
# no object
log(LOG,"no objects", self.__tokens__)
else:
log(LOG,"is multi obj", multiobj, self.__tokens__)
for i,t in enumerate(self.__tokens__):
t = str(t).strip()
""" Get argument count from pd databasw
"""
argc = self.__db__.arg_count(t)
self.__store_args__ = argc is not None and self.__isomap__[i]
if self.__store_args__:
self.__arg_number__ = argc
log(LOG,t,"has",self.__arg_number__, "creation arguments.")
""" Pd Argument Parser
"""
log(LOG,"parse_arguments", i, t)
log(LOG,"prev_obj_arg_num", self.__arg_number__)
log(LOG, "is this an obj", self.__isomap__[i])
if self.__arg_number__ and not self.__isomap__[i]:
if hasattr(self,'__last__') and hasattr(self.__last__,'addargs'):
self.__last__.addargs([t])
self.__arg_number__ -= 1
""" Single-word messages
"""
if '->' in t or '<-' in t or not self.__isomap__[i]:
pass
elif ((t.startswith("'") or t.startswith("\"")) and t.endswith("'") or t.endswith("\"")) or t.isnumeric():
self.__last__ = self.objectCreator(Msg, t.replace("'",""))
""" Messages
"""
elif t.startswith("'") or t.startswith("\""):
self.__msg__.append(t.replace("'",""))
self.__store_msg__ = True
""" Commands
"""
elif 'sinesum' in t:
self.__cmd__.append(t)
self.__store_cmd__ = True
elif self.__store_msg__:
self.__msg__.append(t.replace("'",""))
if t.endswith("'") or t.endswith("\""):
self.__store_msg__ = False
self.__last__ = self.objectCreator(Msg, (" ".join(self.__msg__)))
self.__msg__ = []
""" Symbols prepended with '\' become arrays, like sclang
"""
elif t.startswith('\\'):
t = t.replace('\\','')
log(LOG,'symbol', t)
self.__last__ = self.objectCreator(Array, ('array', 'define', '-k', t))
""" Objects
"""
elif self.__isomap__[i]:
log(LOG,"CREATING AN OBJECT", t)
self.__last__ = self.objectCreator(Obj, (t))
else:
log(LOG,"UNKNOWN", t)
""" Connections
"""
obj = repr(t)
if i >= len(self.__tokens__):
log(LOG,"Is last object",t)
elif not self.__isomap__[i]:
log(LOG,"Is not an object",t)
# self.objectConnector(self.__prev__, self.__last__.id)
elif not hasattr(self.__hiomap__[i], 'inlets'):
log(LOG,"IOLETS",self.__hiomap__, t)
if self.__k__: # and i < len(self.__tokens__)-1:
log(LOG,"Connect All!", obj)
if self.__prev__ != -1:
self.objectConnector(self.__prev__, self.__last__.id)
elif self.__last___obj:
log(LOG,"__last___obj", obj, self.__last___obj)
else:
self.objectConnector()
else:
if "->" == t:
log(LOG,"loadbang forward",obj)
# self.__last__ = self.objectCreator(Obj, ("loadbang"))
# self.objectConnector()
# return
elif ">" == t:
log(LOG,"forward",obj)
if self.__store_cmd__:
self.__store_cmd__ = False
self.__last__ = self.objectCreator(Msg,(" ".join(self.__cmd__)))
self.objectConnector(self.__prev__, self.__last__.id)
self.__cmd__ = []
else:
self.objectConnector()
elif "<-" == t:
log(LOG,"loadbang backwards",obj)
# self.__last__ = self.objectCreator(Obj, ("loadbang"))
# self.objectConnector(self.__last__.id,self.__prev__)
# return
elif "<" == t:
log(LOG,"backwards",obj)
self.objectConnector(self.__obj_idx__-1,self.__obj_idx__)