ANNOUNCE: a few pattern management tools

Pixelblaze is great! The problem is, when you have one you soon want another…and when you have more than one, now you have two problems.

I regularly back up my Pixelblazes to disk, and check the pattern code into source control when I’ve made significant changes (and when I remember). But now that I have the same patterns on multiple PBs, I can’t always remember which one is the latest.

Unfortunately, diff-ing the exported pattern files (EPE) doesn’t work because they contain metadata (specifically, a unique ID and a preview JPG) that changes every time the EPE is imported and saved to a new PB, even though the pattern code remains the same.
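
To make that concrete, here is a minimal sketch of comparing only the code inside two EPE files. It assumes, as the scripts further down do, that an EPE is JSON with the pattern source stored under sources.main. The helper names and the sample data are made up for illustration.

```python
import json

def patternSource(epeText):
    """Return only the pattern code from the JSON text of an EPE file."""
    return json.loads(epeText)['sources']['main']

def sameCode(epeTextA, epeTextB):
    """True when two EPE exports carry identical code, whatever their metadata says."""
    return patternSource(epeTextA) == patternSource(epeTextB)

# Two made-up exports of the same pattern, differing only in their Id.
code = 'export function render(index) {}'
first = json.dumps({'name': 'KITT', 'id': 'aaa111', 'sources': {'main': code}})
second = json.dumps({'name': 'KITT', 'id': 'bbb222', 'sources': {'main': code}})

print(first == second)          # False: the files differ byte-for-byte
print(sameCode(first, second))  # True: the pattern code is the same
```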

Loading two patterns into the PB editor, copy-and-paste-ing them each into a text editor, and diff-ing the resulting text files works, but it’s so tedious that I rarely bother. Still, laziness is a great motivator so over the weekend I wrote a utility to compare two patterns and highlight the differences.

For instance, I’ve got two copies of KITT.epe that are not byte-identical, but has anything changed?

$ python3 diffPatterns.py PB1/KITT.epe PB2/KITT.epe

So in this case the pattern code is unchanged between the two files; the only difference is the Id which gets regenerated when the pattern is imported to a new Pixelblaze.

And I’ve got the source code of this Pendulum Waves pattern checked into source control, but is the repository version the same as what’s on the PB?

$ python3 diffPatterns.py Patterns/PendulumWaves.js PB1/PendulumWaves.epe

They’re definitely not the same, so I’ll need to decide which version is most current and update either the PB or the repository.

To update the repository I could open the pattern in the PB editor, copy-and-paste the code into a text editor, and save it as a .js file, but this is the 21st century and I’m lazy so I’ll let $ python3 epe2js.py PB1/PendulumWaves.epe do the extraction and create PendulumWaves.js for me.

The Python source code for diffPatterns is here:

diffPatterns.py
#!/usr/bin/env python3

import os, argparse, pathlib, json, difflib
from prettytable import PrettyTable
from html.parser import HTMLParser
from colorama import init, Fore, Back, Style

# ------------------------------------------------

class TableParser(HTMLParser):
    def __init__(self):
        # The parent class's __init__() must be called so that HTMLParser
        # sets up its internal state.
        super().__init__()
        self.reset()
        self.table = PrettyTable()
        self.headings = []
        self.columns = []
        self.element = ""
        self.doneHeader = False
 
    #Defining what the methods should output when called by HTMLParser.
    def handle_starttag(self, tag, attrs):
        if tag == 'span':
            for a in attrs:
                if a[0] == 'class' and a[1] == 'diff_chg':
                    self.element += Back.YELLOW
                if a[0] == 'class' and a[1] == 'diff_sub':
                    self.element += Back.RED
                if a[0] == 'class' and a[1] == 'diff_add':
                    self.element += Back.GREEN

    def handle_data(self, data):
        self.element += data.replace(u'\u00A0',' ')
 
    def handle_endtag(self, tag):
        # Capture each heading.
        if tag == 'th':
            self.headings.append(self.element)
            self.element = ''
        # Save the completed set of headings.
        if tag == 'thead':
            self.table.field_names = ['<#', self.headings[1], '>#', self.headings[3]]
            self.table.align['<#'] = 'r'
            self.table.align[self.headings[1]] = 'l'
            self.table.align['>#'] = 'r'
            self.table.align[self.headings[3]] = 'l'
            self.doneHeader = True
        # Capture within-line changes.
        if tag == 'span':
            self.element += Style.RESET_ALL
        # Capture each datum.
        if tag == 'td':
            self.columns.append(self.element)
            self.element = ''
        # Save the completed row of data.
        if tag == 'tr':
            if self.doneHeader:
                self.table.add_row([ self.columns[1], self.columns[2], self.columns[4], self.columns[5] ])
                self.columns.clear()

    def getTable(self):
        return self.table.get_string()

# ------------------------------------------------

# Here's where the magic happens.
if __name__ == "__main__":

    # Parse command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("leftFile", help="The first pattern file to be compared")
    parser.add_argument("rightFile", help="The second pattern file to be compared")
    parser.add_argument("--full", action='store_true', help="Show full files with differences highlighted; otherwise show only N context lines")
    args = parser.parse_args()

    # Check the command-line arguments.
    leftSuffix = pathlib.Path(args.leftFile).suffix.lower()
    rightSuffix = pathlib.Path(args.rightFile).suffix.lower()
    if not (leftSuffix in ['.epe', '.js'] and rightSuffix in ['.epe', '.js']):
        print("ERROR: files to be compared must be either .EPE or .JS")
        exit(-1)

    # Extract the source code from the (.EPE) pattern files.
    leftName = args.leftFile #pathlib.Path(args.leftFile).name
    leftText = ''
    with open(args.leftFile, "r") as left:
        leftBlob = left.read()  # the with-block closes the file; no explicit close() needed
        if leftSuffix == '.epe':
            leftJson = json.loads(leftBlob)
            leftText = str.splitlines('Name: ' + leftJson['name'] + '\n' + 'Id: ' + leftJson['id'] + '\n' + leftJson['sources']['main'])
        else:
            leftText = str.splitlines(leftBlob)

    rightName = args.rightFile #pathlib.Path(args.rightFile).name
    rightText = ''
    with open(args.rightFile, "r") as right:
        rightBlob = right.read()  # the with-block closes the file; no explicit close() needed
        if rightSuffix == '.epe':
            rightJson = json.loads(rightBlob)
            rightText = str.splitlines('Name: ' + rightJson['name'] + '\n' + 'Id: ' + rightJson['id'] + '\n' + rightJson['sources']['main'])
        else:
            rightText = str.splitlines(rightBlob)

    # create a DiffLib object and use it to diff the files.
    width = (os.get_terminal_size()[0] // 2) - 12  # integer division keeps the wrap column a whole number
    diffResult = difflib.HtmlDiff(wrapcolumn=width,tabsize=4).make_table(leftText, rightText, leftName, rightName, not(args.full), numlines=3)
    p = TableParser()
    p.feed(diffResult)
    print(p.getTable())

    exit()

And the syntax is:

usage: diffPatterns.py [-h] [--full] leftFile rightFile

positional arguments:
  leftFile    The first pattern file to be compared
  rightFile   The second pattern file to be compared

optional arguments:
  -h, --help  show this help message and exit
  --full      Show full files with differences highlighted; otherwise show only N context lines
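
When the result is headed for a file or a pipe rather than a terminal, a plain unified diff can be handier than a coloured table. This is a rough sketch and not part of diffPatterns.py: the function name is hypothetical, and it expects the same kind of line lists that diffPatterns.py builds with str.splitlines.

```python
import difflib

def unifiedPatternDiff(leftLines, rightLines, leftName, rightName):
    """Return a unified diff of two lists of source lines as a single string."""
    diff = difflib.unified_diff(leftLines, rightLines,
                                fromfile=leftName, tofile=rightName,
                                n=3, lineterm='')
    return '\n'.join(diff)

# Made-up pattern code standing in for two extracted sources.
old = ['export function render(index) {', '  hsv(0, 1, 1)', '}']
new = ['export function render(index) {', '  hsv(0.5, 1, 1)', '}']
print(unifiedPatternDiff(old, new, 'PB1/KITT.epe', 'PB2/KITT.epe'))
```

Identical inputs give an empty string, which makes for an easy "has anything changed?" test in a script.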

The source code for epe2js is here:

epe2js.py
#!/usr/bin/env python3

import os, argparse, pathlib, json

# ------------------------------------------------

# Here's where the magic happens.
if __name__ == "__main__":

    # Parse command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("epeFile", help="The pattern file to be converted")
    parser.add_argument("--delete", action='store_true', help="Delete the EPE file after conversion")
    args = parser.parse_args()

    # Extract the source code from the (.EPE) pattern file.
    with open(args.epeFile, "rb") as fileEPE:
        with open(pathlib.Path(args.epeFile).with_suffix('.js'), "w") as fileJS:
            fileJS.write(json.loads(fileEPE.read())['sources']['main'])

    # Delete only after both files are closed; removing a file that is still open fails on Windows.
    if args.delete:
        os.remove(args.epeFile)
    
    exit()

And the syntax is:

usage: epe2js.py [-h] [--delete] epeFile

positional arguments:
  epeFile     The pattern file to be converted

optional arguments:
  -h, --help  show this help message and exit
  --delete    Delete the EPE file after conversion

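Anyone on a Mac or *n*x box should already have most of the dependencies to run these; the exceptions are prettytable and colorama, which diffPatterns.py imports and which may first need a pip3 install prettytable colorama. Anyone on a PC can use Python for Windows or run it in their favorite Linux distribution using WSL.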
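
A quick way to check whether the third-party modules are present before running anything is sketched below. The function name is made up; the module list is taken from the imports at the top of diffPatterns.py.

```python
import importlib.util

def missingModules(names):
    """Return the top-level module names from `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Third-party modules imported by diffPatterns.py; the rest is standard library.
needed = ['prettytable', 'colorama']
missing = missingModules(needed)
if missing:
    print('Missing: ' + ', '.join(missing) + ' (try: pip3 install ' + ' '.join(missing) + ')')
else:
    print('All set.')
```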

Massive applause. Thank you for your efforts on behalf of all of us who develop in this community.

Assuming you’re using git, you could override the diff driver for epe files, as described in the gitattributes documentation.

You could also simplify the process by using a clean filter for git. This would pre-process the file before it is committed into the repo, basically making the content much easier for the diff program to ingest. Something like using jq to process the file and output a format like so:

Name: Test
ID: abc123

/** code here **/

jq -r '"Name: \(.name)\nID: \(.id)\n\n\(.sources.main)"'
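
If jq isn't available, the same transformation can be sketched in Python. Treat this as an illustration rather than a tested git setup: the function name is made up, and it assumes the EPE JSON has the name, id and sources.main fields used by the jq expression above.

```python
import json

def epeToText(epeJson):
    """Flatten the JSON text of an EPE file into Name, ID, a blank line, then the code."""
    epe = json.loads(epeJson)
    return 'Name: ' + epe['name'] + '\nID: ' + epe['id'] + '\n\n' + epe['sources']['main']

# A made-up EPE matching the example output above.
sample = json.dumps({'name': 'Test', 'id': 'abc123', 'sources': {'main': '/** code here **/'}})
print(epeToText(sample))
```

A git filter reads the file on stdin and writes the result to stdout, so a filter script would replace the sample lines with something like sys.stdout.write(epeToText(sys.stdin.read())).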

And here’s another thing…

I’ve done a lot of command-line tools because they’re so useful for scripting things like a daily copy of Pixelblaze patterns to be included in a backup regime…but there are times when I wish for the point-and-click convenience of a GUI, like when I’ve just finished working on a new pattern and I’m too lazy to type backupPixelblazes --ipAddress=192.168.8.144 --patternName='swirling\ and\ whirling' --noPreviews.

Ideally I’d want a two-column filebrowser like Midnight Commander that would let me:

…but unfortunately, I couldn’t find any tools like that.

So I wrote one. This is a virtual filesystem driver for Midnight Commander built using @zranger1’s library for communicating with Pixelblazes. It makes Pixelblazes appear as read-only devices which can be browsed just like a regular disk. The installation is a little more involved than just copying a single file, but I’ll describe what worked for me.
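
For anyone curious how Midnight Commander talks to a driver like this: it runs the script with a command name followed by arguments, for example list archivename and copyout archivename storedfilename extractto (both show up in the script's comments further down). Here is a minimal sketch of that dispatch using argparse. The function name buildParser is mine, and the script's own argument handling may well differ.

```python
import argparse

def buildParser():
    """Build a parser for the two extfs commands a read-only driver needs."""
    parser = argparse.ArgumentParser(prog='pb+')
    commands = parser.add_subparsers(dest='command', required=True)

    # "list archivename": print a directory listing of everything in the archive.
    listCommand = commands.add_parser('list')
    listCommand.add_argument('archivename')

    # "copyout archivename storedfilename extractto": copy one stored file to a local path.
    copyoutCommand = commands.add_parser('copyout')
    copyoutCommand.add_argument('archivename')
    copyoutCommand.add_argument('storedfilename')
    copyoutCommand.add_argument('extractto')
    return parser

# Hypothetical invocation, as the file manager might issue it.
args = buildParser().parse_args(['copyout', 'pb', 'PB1/Patterns/KITT.js', '/tmp/KITT.js'])
print(args.command, args.storedfilename.split('/'))
```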

~=~=~=~=~

Midnight Commander comes preinstalled on some *n*x distributions, and is available from the package manager on all the common ones. On a fresh copy of Ubuntu under WSL, I had to install it with:

$ sudo apt-get install mc

The virtual filesystem driver needs Python3 (which is installed by default on recent distributions), but my WSL didn’t have the websocket-client package nor the Python package manager installed, so I first had to do:

$ sudo apt-get install python3-pip

…to get the package manager, and then:

$ pip3 install websocket-client

…to get the package itself.

Midnight Commander looks for plugins in a particular location; the easiest way to find out where is to ask it:

$ mc -F

…and look in the [User data] section of the output for the location of the ‘extfs.d’ folder. For example, if your username is {yourUsernameHere}:

  Home directory: /home/{yourUsernameHere}
  Profile root directory: /home/{yourUsernameHere}
  [System data]
       Config directory:   /etc/mc/
       Data directory:     /usr/share/mc/
       File extension handlers: /usr/lib/mc/ext.d/
       VFS plugins and scripts: /usr/lib/mc/
           extfs.d:        /usr/lib/mc/extfs.d/
           fish:           /usr/lib/mc/fish/
       [User data]
       Config directory:   /home/{yourUsernameHere}/.config/mc/
       Data directory:     /home/{yourUsernameHere}/.local/share/mc/
           skins:          /home/{yourUsernameHere}/.local/share/mc/skins/
           extfs.d:        /home/{yourUsernameHere}/.local/share/mc/extfs.d/   <== THIS IS THE ONE
           fish:           /home/{yourUsernameHere}/.local/share/mc/fish/
           mcedit macros:  /home/{yourUsernameHere}/.local/share/mc/mc.macros
           mcedit external macros: /home/{yourUsernameHere}/.local/share/mc/mcedit/macros.d/macro.*
       Cache directory:  /home/{yourUsernameHere}/.cache/mc/
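
If you'd rather script that lookup, here's a rough sketch that picks the user-level extfs.d path out of text in the format shown above. The function name is hypothetical, and it assumes the section headers and the extfs.d: label look exactly as they do in this output.

```python
def findUserExtfsDir(mcOutput):
    """Return the extfs.d path listed under [User data], or None if there isn't one."""
    inUserSection = False
    for line in mcOutput.splitlines():
        stripped = line.strip()
        if stripped.startswith('['):
            # A new section header; remember whether it is the user-level one.
            inUserSection = (stripped == '[User data]')
        elif inUserSection and stripped.startswith('extfs.d:'):
            return stripped.split(':', 1)[1].strip()
    return None

# A trimmed-down, made-up copy of the output above.
sample = """[System data]
    extfs.d:        /usr/lib/mc/extfs.d/
[User data]
    extfs.d:        /home/me/.local/share/mc/extfs.d/
"""
print(findUserExtfsDir(sample))
```

The text to feed it could come from running mc -F through subprocess.run with capture_output=True and text=True.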

In my case, the folder didn’t exist, so I needed to create it:

$ mkdir -p /home/{yourUsernameHere}/.local/share/mc/extfs.d/

Then, save the following file into the ‘extfs.d’ folder as ‘pb+’ (no suffix):

Save this file as 'pb+'
#!/usr/bin/env python3

import os, argparse, pathlib, urllib.parse, requests, json, struct, time, math
# Pixelblaze and PixelblazeEnumerator are defined further down in this file, so no external import is needed.
from datetime import datetime

# ------------------------------------------------
# USER-MODIFIABLE SETTINGS
scanTime = 3 # how long to listen when scanning for Pixelblazes 
includePatternSettings = False # adds an extra HTTP request per pattern, which greatly increases loading time; don't do it!
debug = False

# ------------------------------------------------
# CONSTANTS
tempFolder = pathlib.Path.home() / '.cache/mc'

# Canonical folder and file names
folderFolders = 'folders'
folderFiles = 'files'
folderConfiguration = 'Configuration'
folderPatterns = 'Patterns'
folderPlaylists = 'Playlists'
filenameDeviceSettings = 'deviceSettings.json'
filenameExpanderSettings = 'expanderSettings.json'
filenameMapperSettings = 'pixelmap.js'
filenamePlaylists = 'defaultPlaylist.json'

# ------------------------------------------------
# LZstring code borrowed from https://github.com/NickWaterton/Pixelblaze-Async/blob/main/pixelblaze_async/lzstring.py

def decompressFromUint8Array(compressed):
    if compressed is None:
        return ""
    if compressed == "":
        return None
        
    resetValue = 128
    dictionary = {}
    enlargeIn = 4
    dictSize = 4
    numBits = 3
    entry = ""
    result = []

    val=compressed[0]
    position=resetValue
    index=1

    for i in range(3):
        dictionary[i] = i

    bits = 0
    maxpower = math.pow(2, 2)
    power = 1

    while power != maxpower:
        resb = val & position
        position >>= 1
        if position == 0:
            position = resetValue
            val = compressed[index]
            index += 1

        bits |= power if resb > 0 else 0
        power <<= 1

    next = bits
    if next == 0:
        bits = 0
        maxpower = math.pow(2, 8)
        power = 1
        while power != maxpower:
            resb = val & position
            position >>= 1
            if position == 0:
                position = resetValue
                val = compressed[index]
                index += 1
            bits |= power if resb > 0 else 0
            power <<= 1
        c = chr(bits)
    elif next == 1:
        bits = 0
        maxpower = math.pow(2, 16)
        power = 1
        while power != maxpower:
            resb = val & position
            position >>= 1
            if position == 0:
                position = resetValue
                val = compressed[index]
                index += 1
            bits |= power if resb > 0 else 0
            power <<= 1
        c = chr(bits)
    elif next == 2:
        return ""

    #print(bits)
    dictionary[3] = c
    w = c
    result.append(c)
    counter = 0
    while True:
        counter += 1
        if index > len(compressed):
            return ""

        bits = 0
        maxpower = math.pow(2, numBits)
        power = 1
        while power != maxpower:
            resb = val & position
            position >>= 1
            if position == 0:
                position = resetValue
                val = compressed[index]
                index += 1
            bits |= power if resb > 0 else 0
            power <<= 1

        c = bits
        if c == 0:
            bits = 0
            maxpower = math.pow(2, 8)
            power = 1
            while power != maxpower:
                resb = val & position
                position >>= 1
                if position == 0:
                    position = resetValue
                    val = compressed[index]
                    index += 1
                bits |= power if resb > 0 else 0
                power <<= 1

            dictionary[dictSize] = chr(bits)
            dictSize += 1
            c = dictSize - 1
            enlargeIn -= 1
        elif c == 1:
            bits = 0
            maxpower = math.pow(2, 16)
            power = 1
            while power != maxpower:
                resb = val & position
                position >>= 1
                if position == 0:
                    position = resetValue
                    val = compressed[index]
                    index += 1
                bits |= power if resb > 0 else 0
                power <<= 1
            dictionary[dictSize] = chr(bits)
            dictSize += 1
            c = dictSize - 1
            enlargeIn -= 1
        elif c == 2:
            return "".join(result)


        if enlargeIn == 0:
            enlargeIn = math.pow(2, numBits)
            numBits += 1

        if c in dictionary:
            entry = dictionary[c]
        else:
            if c == dictSize:
                entry = w + w[0]
            else:
                return None
        result.append(entry)

        # Add w+entry[0] to the dictionary.
        dictionary[dictSize] = w + entry[0]
        dictSize += 1
        enlargeIn -= 1

        w = entry
        if enlargeIn == 0:
            enlargeIn = math.pow(2, numBits)
            numBits += 1

# ------------------------------------------------
# Pixelblaze client library borrowed (and heavily modified) from https://github.com/zranger1/pixelblaze-client/blob/main/pixelblaze/pixelblaze.py

"""
 Copyright 2020 JEM (ZRanger1)

 Permission is hereby granted, free of charge, to any person obtaining a copy of this
 software and associated documentation files (the "Software"), to deal in the Software
 without restriction, including without limitation the rights to use, copy, modify, merge,
 publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
 to whom the Software is furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in all copies or
 substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
 AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
"""
import websocket, socket, json, time, struct, threading

class Pixelblaze:
    ws = None
    connected = False
    flash_save_enabled = False
    default_recv_timeout = 1
    ipAddr = None

    def __init__(self, addr):
        self.open(addr)

    def open(self, addr):
        if self.connected is False:
            uri = "ws://" + addr + ":81"
            self.ws = websocket.create_connection(uri, sockopt=((socket.SOL_SOCKET, socket.SO_REUSEADDR, 1), 
                                                                (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),))
            self.ws.settimeout(self.default_recv_timeout)
            self.ipAddr = addr
            self.connected = True

    def close(self):
        """Close websocket connection"""
        if self.connected is True:
            self.ws.close()
            self.connected = False

    def ws_flush(self):

        self.ws.settimeout(0.1)  
            
        try:
            while True:
                self.ws.recv()                         
        except websocket._exceptions.WebSocketTimeoutException:
            self.ws.settimeout(self.default_recv_timeout)  
        return  

    def ws_recv(self, wantBinary=False):
        result = None
        try:
            while True: 
                result = self.ws.recv()
                if (wantBinary is False) and (type(result) is str): 
                    break
                elif (wantBinary is True) and (result[0] == 0x07): 
                    break
                else:
                    result = None 

        except websocket._exceptions.WebSocketTimeoutException:
            return None 

        except websocket._exceptions.WebSocketConnectionClosedException:
            self.connected = False
            raise

        return result

    def send_string(self, cmd):
        self.ws.send(cmd.encode("utf-8"))

    def getHardwareConfig(self):
        self.ws_flush() 
        self.send_string('{"getConfig": true}')
        hw = dict()

        p1 = self.ws_recv()
        while p1 is not None:
            p2 = json.loads(p1)
            hw = {**hw, **p2}
            p1 = self.ws_recv()

        return hw

    def getPatternList(self):
        patterns = dict()
        self.ws_flush() 
        self.send_string("{ \"listPrograms\" : true }")

        frame = self.ws_recv(True)
        while frame is not None:
            listFrame = frame[2:].decode("utf-8")
            listFrame = listFrame.split("\n")
            listFrame = [m.split("\t") for m in listFrame]

            for pat in listFrame:
                if len(pat) == 2:
                    patterns[pat[0]] = pat[1]

            if frame[1] & 0x04:
                break
            frame = self.ws_recv(True)

        return patterns


class PixelblazeEnumerator:
    PORT = 1889
    SYNC_ID = 890
    BEACON_PACKET = 42
    DEVICE_TIMEOUT = 30000
    LIST_CHECK_INTERVAL = 5000

    listTimeoutCheck = 0
    isRunning = False
    threadObj = None
    listener = None
    devices = dict()
    autoSync = False

    def __init__(self, hostIP="0.0.0.0"):
        self.start(hostIP)

    def __del__(self):
        self.stop()

    def _time_in_millis(self):
        return int(round(time.time() * 1000)) % 0xFFFFFFFF

    def _unpack_beacon(self, data):
        return struct.unpack("<LLL", data)

    def start(self, hostIP):
        try:
            self.listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listener.bind((hostIP, self.PORT))

            self.threadObj = threading.Thread(target=self._listen)
            self.isRunning = True
            self.listTimeoutCheck = 0
            self.threadObj.start()

            return True
        except socket.error as e:
            print(e)
            self.stop()
            return False

    def stop(self):
        """
        Stop listening for datagrams, terminate listener thread and close socket.
        """
        if self.listener is None:
            return
        else:
            self.isRunning = False
            self.threadObj.join()
            time.sleep(0.5)
            self.listener.close()
            self.threadObj = None
            self.listener = None

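    def _listen(self):
        # A receive timeout lets the loop notice isRunning going False even when
        # no beacons arrive; without it, stop() would block forever in join().
        self.listener.settimeout(1.0)

        while self.isRunning:
            try:
                data, addr = self.listener.recvfrom(1024)
            except socket.timeout:
                continue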
            now = self._time_in_millis()

            pkt = self._unpack_beacon(data)
            if pkt[0] == self.BEACON_PACKET:
                self.devices[pkt[1]] = {"address": addr, "timestamp": now, "sender_id": pkt[1], "sender_time": pkt[2]}
                
                if (now - self.listTimeoutCheck) >= self.LIST_CHECK_INTERVAL:
                    newlist = dict()

                    for dev, record in self.devices.items():
                        if (now - record["timestamp"]) <= self.DEVICE_TIMEOUT:
                            newlist[dev] = record

                    self.devices = newlist
                    self.listTimeoutCheck = now
                    
    def getPixelblazeList(self):
        dev = []
        for record in self.devices.values():
            dev.append(record["address"][0]) 
        return dev

# ------------------------------------------------

def urlFor(host, pathFragment):
    return urllib.parse.urlunsplit(('http', host, pathFragment, None, None))

# ------------------------------------------------

def loadSchema(useCache):

    cacheFilename = str(tempFolder / 'pbCache.json')
    cache = { 
    }

    if useCache:
        with open(cacheFilename, "r") as fp:
            cache = json.loads(fp.read())

    else:
        pbList = PixelblazeEnumerator()
        time.sleep(scanTime)
        pbList.stop()

        for pixelblazeIP in pbList.getPixelblazeList():

            try:
                pb = Pixelblaze(pixelblazeIP)

                hardwareConfig = pb.getHardwareConfig()
                pixelblazeName = safeFilename(hardwareConfig['name'])

                cache[pixelblazeName] = { 'ipAddress': pixelblazeIP, folderFolders: { } }
                
                cache[pixelblazeName][folderFolders][folderConfiguration] = { folderFiles: { } }
                cache[pixelblazeName][folderFolders][folderPatterns] = { folderFiles: { } }
                cache[pixelblazeName][folderFolders][folderPlaylists] = { folderFiles: { } }

                cache[pixelblazeName][folderFolders][folderConfiguration][folderFiles][filenameDeviceSettings] = urlFor(pixelblazeIP, '/config.json')

                pixelmap = urlFor(pixelblazeIP, '/pixelmap.txt')
                if getSizeFromPixelblaze(pixelmap):
                    cache[pixelblazeName][folderFolders][folderConfiguration][folderFiles][filenameMapperSettings] = pixelmap

                expander = urlFor(pixelblazeIP, '/obconf.dat')
                if hardwareConfig['ledType'] == 5:
                    cache[pixelblazeName][folderFolders][folderConfiguration][folderFiles][filenameExpanderSettings] = expander

                playlist = urlFor(pixelblazeIP, '/l/_defaultplaylist_')
                if getSizeFromPixelblaze(playlist):
                    cache[pixelblazeName][folderFolders][folderPlaylists][folderFiles][filenamePlaylists] = playlist

                for patternId, patternName in pb.getPatternList().items():
                    pattern = urlFor(pixelblazeIP, '/p/' + patternId)
                    cache[pixelblazeName][folderFolders][folderPatterns][folderFiles][safeFilename(patternName) + '.js'] = pattern
                    if includePatternSettings:
                        patternSettings = pattern + '.c'
                        if getSizeFromPixelblaze(patternSettings):
                            cache[pixelblazeName][folderFolders][folderPatterns][folderFiles][safeFilename(patternName) + '.json'] = patternSettings

                pb.close()

            except Exception as e:
                logToFile('loadSchema: Error fetching Pixelblaze details from ' + pixelblazeIP + ": " + repr(e))

        with open(cacheFilename, "w") as fp:
            json.dump(cache, fp)

    return cache

# ------------------------------------------------

def logToFile(message):
    if debug:
        logFilename = str(tempFolder / 'extfs.log')
        with open(logFilename, mode='a') as logFile:
            logFile.write("[" + datetime.now().strftime("%d-%m-%Y %H:%M") + "] " + message + '\n')

# ------------------------------------------------

def safeFilename(unsafeName):
    return unsafeName.replace("/", "∕")

# ------------------------------------------------

def getSizeFromPixelblaze(url):
    try:
        with requests.get(url, stream=True) as r:
            if r.status_code == 200:
                return r.headers['Content-length']
    except Exception as e:
        logToFile('Error fetching size of ' + url + ": " + repr(e))
        return None

# ------------------------------------------------

def downloadFromPixelblaze(url, filename):
    try:
        with requests.get(url) as r:
            if r.status_code == 200:
                # Close the file before returning so the converters read complete data.
                with open(filename, 'wb') as fp:
                    fp.write(r.content)
                return True
            logToFile('downloadFromPixelblaze: GET from url ' + url + ' returned ' + str(r.status_code))
    except Exception as e:
        logToFile('downloadFromPixelblaze: error fetching ' + url + ": " + repr(e))
    # Any non-200 response or exception counts as a failed download.
    return False

# ------------------------------------------------

def convertExpanderFile(inputFilePath, outputFilePath):

    with open(inputFilePath, "rb") as fp:

        binaryData = fp.read(1)
        magicNumber = struct.unpack('<B', binaryData)[0]
        if magicNumber != 5:
            logToFile('decodeExpander: obconf.dat has incorrect magic number: ' + str(magicNumber))
            return None
        
        binaryData = fp.read()
        binarySize = len(binaryData)
        if binarySize % 96 != 0:
            logToFile('decodeExpander: obconf.dat has incorrect length: ' + str(binarySize))
        
        rowSize = 12
        boards = { 'boards': [ ] }
        ledTypes = [ 'notUsed', 'WS2812B', 'drawAll', 'APA102 Data', 'APA102 Clock' ]
        colorOrders = { 0x24: 'RGB', 0x18: 'RBG', 0x09: 'BRG', 0x06: 'BGR', 0x21: 'GRB', 0x12: 'GBR', 0xE4: 'RGBW', 0xE1: 'GRBW' }
        for row in range(binarySize // rowSize):
            offsets = struct.unpack('<4B2H4x', binaryData[(row * rowSize):(row + 1) * rowSize])
            boardAddress = offsets[0] >> 3
            channel = offsets[0] % 8
            ledType = offsets[1]
            numElements = offsets[2]
            colorOrder = offsets[3]
            pixelCount = offsets[4]
            startIndex = offsets[5]
            notUsed = offsets[6:10]

            board = row // 8
            rowNumber = row % 8
            if rowNumber == 0:
                boards['boards'].append( { 'address': boardAddress, 'rows': { } } )
            boards['boards'][board]['rows'][rowNumber] = [ ]
            if ledType == 1 or ledType == 3:
                boards['boards'][board]['rows'][rowNumber].append( { 'channel': channel, 'type': ledTypes[ledType], 'startIndex': startIndex, 'count': pixelCount, 'options': colorOrders[colorOrder] } )
            else:
                boards['boards'][board]['rows'][rowNumber].append( { 'channel': channel, 'type': ledTypes[ledType] } )

        with open(outputFilePath, "w") as fp:
            fp.write(json.dumps(boards, indent=2))

# ------------------------------------------------

def convertPatternFile(inputFilePath, outputFilePath):

    with open(inputFilePath, "rb") as fp:
        binaryData = fp.read()
        header_size = 44
        offsets = struct.unpack('<11I', binaryData[:header_size])
        name_offset = offsets[1]
        name_length = offsets[2]
        jpeg_offset = offsets[3]
        jpeg_length = offsets[4]
        source_offset = offsets[7]
        source_length = offsets[8]
        bytecode_offset = offsets[9]
        bytecode_length = offsets[10]
        
        with open(outputFilePath, "w") as fp:
            epeSource = {}
            epeSource['source'] = json.loads(decompressFromUint8Array(binaryData[source_offset:source_offset+source_length]))
            fp.write(epeSource['source']['main'])

# ------------------------------------------------

def convertPlaylistFile(inputFilePath, outputFilePath):

    with open(inputFilePath, "r") as fp:
        playlist = json.loads(fp.read())

        for item in range(len(playlist['items'])):
            playlist['items'][item]['id'] = 'Test'

        with open(outputFilePath, "w") as fp:
            fp.write(json.dumps(playlist, indent=2))

# ------------------------------------------------

def addDirectoryEntry(itemName, itemSize, itemMeta):
    itemPermissions = ''
    if itemMeta == 'd':
        itemPermissions = itemMeta + 'rwx' + 'r-x' + 'r-x'
    else:
        itemPermissions = itemMeta + "rw-" + "r--" + "r--" 

    itemLinks = "1"
    itemOwner = "wizard"
    itemGroup = "wizard"
    itemTime = datetime.fromtimestamp(0)
    line = itemPermissions + " " + itemLinks + " " + itemOwner + " " + itemGroup + " " + str(itemSize) + " " + itemTime.strftime("%m-%d-%Y %H:%M") + " " + itemName
    print(line)


# EXTFS Command: list archivename
def pbList(args): # archivename
    logToFile("pbList called with archivename=" + args.archivename)

    devices = loadSchema(False)
    for device in devices.keys():
        folders = devices[device][folderFolders]
        addDirectoryEntry(device, len(folders), "d")
        for folder in folders.keys():
            files = folders[folder][folderFiles]
            addDirectoryEntry(str(pathlib.Path(device, folder)), len(files), "d")
            for file in files.keys():
                addDirectoryEntry(str(pathlib.Path(device, folder, file)), 999999, "-")   # ugly kludge; we may need to get the actual filesize to make MC behave itself.

    exit(0)

# ------------------------------------------------

# EXTFS command: "copyout archivename storedfilename extractto"
def pbCopyOut(args): # archivename, storedfilename, extractto
    logToFile("pbCopyOut called with archivename=" + args.archivename + ", storedfilename=" + args.storedfilename + ", extractto=" + args.extractto)

    devices = loadSchema(True)

    segments = args.storedfilename.split('/')
    if len(segments) != 3:
        logToFile("pbCopyOut: couldn't make sense of storedfilename segments: " + repr(segments))
        exit(1)

    device = segments[0]
    if device not in devices.keys():
        logToFile('pbCopyOut: pixelblazeName ' + device + ' not found in cache ' + repr(devices.items()))
        exit(1)

    folder = segments[1]
    if folder not in devices[device][folderFolders].keys():
        logToFile('pbCopyOut: folder ' + folder + ' not found in cache for pixelblazeName ' + device)
        exit(1)

    file = segments[2]
    if file not in devices[device][folderFolders][folder][folderFiles].keys():
        logToFile('pbCopyOut: file ' + file + ' not found in cache for pixelblazeName ' + device + ' and folder ' + folder)
        exit(1)
    url = devices[device][folderFolders][folder][folderFiles][file]

    if folder == folderConfiguration and file == filenameExpanderSettings:
        tempFilename = str(pathlib.Path(args.extractto).with_suffix('.dat'))
        logToFile('pbCopyOut: downloading from URL ' + url + ' to ' + tempFilename)
        if downloadFromPixelblaze(url, tempFilename):
            logToFile('pbCopyOut: converting ' + tempFilename + ' to ' + args.extractto)
            convertExpanderFile(tempFilename, args.extractto)
            os.remove(tempFilename)
            exit(0)
        else:
            logToFile('pbCopyOut: what happened?')

    elif folder == folderConfiguration and file == filenameDeviceSettings:
        tempFilename = str(pathlib.Path(args.extractto).with_suffix('.tmp'))
        logToFile('pbCopyOut: downloading from URL ' + url + ' to ' + tempFilename)
        if downloadFromPixelblaze(url, tempFilename):
            logToFile('pbCopyOut: converting ' + tempFilename + ' to ' + args.extractto)

            with open(tempFilename, "r") as fp:
                settings = json.loads(fp.read())
                with open(args.extractto, "w") as fp:
                    fp.write(json.dumps(settings, indent=2))

            os.remove(tempFilename)
            exit(0)
        else:
            logToFile('pbCopyOut: what happened?')


    elif folder == folderPlaylists:
        tempFilename = str(pathlib.Path(args.extractto).with_suffix('.tmp'))
        logToFile('pbCopyOut: downloading from URL: ' + url + ' to ' + tempFilename)
        if downloadFromPixelblaze(url, tempFilename):
            logToFile('pbCopyOut: converting ' + tempFilename + ' to: ' + args.extractto)

            with open(tempFilename, "r") as fp:
                playlist = json.loads(fp.read())

                for item in range(len(playlist['items'])):
                    logToFile("folder: " + repr(devices[device][folderFolders][folderPatterns][folderFiles]))
                    for patternName, patternId in devices[device][folderFolders][folderPatterns][folderFiles].items():
                        if patternId.endswith(playlist['items'][item]['id']):
                            playlist['items'][item]['id'] = patternName[:-3]

                with open(args.extractto, "w") as fp:
                    fp.write(json.dumps(playlist, indent=2))

            os.remove(tempFilename)
            exit(0)

    elif folder == folderPatterns and file.endswith('.js'):
        tempFilename = str(pathlib.Path(args.extractto).with_suffix('.bin'))
        logToFile('pbCopyOut: downloading from URL: ' + url + ' to ' + tempFilename)
        if downloadFromPixelblaze(url, tempFilename):
            logToFile('pbCopyOut: converting ' + tempFilename + ' to: ' + args.extractto)
            convertPatternFile(tempFilename, args.extractto)
            os.remove(tempFilename)
            exit(0)
    else:
        logToFile('pbCopyOut: downloading from URL: ' + url + ' to ' + args.extractto)
        if downloadFromPixelblaze(url, args.extractto):
            exit(0)

    exit(1)

# ------------------------------------------------

# EXTFS command: "copyin archivename storedfilename sourcefile"
def pbCopyIn(args):
    exit(1)

# ------------------------------------------------

# EXTFS command: "rm archivename storedfilename"
def pbRm(args):
    exit(1)

# ------------------------------------------------

# EXTFS command: "mkdir archivename dirname"
def pbMkdir(args):
    exit(1)

# ------------------------------------------------

# EXTFS command: "rmdir archivename dirname"
def pbRmdir(args):
    exit(1)

# ------------------------------------------------

# EXTFS command: "run"
def pbRun(args):
    exit(1)

# ------------------------------------------------

# Program start.
if __name__ == "__main__":

    # Parse command line.
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    # EXTFS command: "list archivename"
    parser_list = subparsers.add_parser('list')
    parser_list.add_argument('archivename', nargs='?', default='*')
    parser_list.set_defaults(func=pbList)

    # EXTFS command: "copyout archivename storedfilename extractto"
    parser_copyout = subparsers.add_parser('copyout')
    parser_copyout.add_argument('archivename', nargs='?', default='*')
    parser_copyout.add_argument('storedfilename', nargs='?', default='*')
    parser_copyout.add_argument('extractto', nargs='?', default='*')
    parser_copyout.set_defaults(func=pbCopyOut)

    # EXTFS command: "copyin archivename storedfilename sourcefile"
    parser_copyin = subparsers.add_parser('copyin')
    parser_copyin.add_argument('archivename', nargs='?', default='*')
    parser_copyin.add_argument('storedfilename', nargs='?', default='*')
    parser_copyin.add_argument('sourcefile', nargs='?', default='*')
    parser_copyin.set_defaults(func=pbCopyIn)

    # EXTFS command: "rm archivename storedfilename"
    parser_rm = subparsers.add_parser('rm')
    parser_rm.add_argument('archivename', nargs='?', default='*')
    parser_rm.add_argument('storedfilename', nargs='?', default='*')
    parser_rm.set_defaults(func=pbRm)

    # EXTFS command: "mkdir archivename dirname"
    parser_mkdir = subparsers.add_parser('mkdir')
    parser_mkdir.add_argument('archivename', nargs='?', default='*')
    parser_mkdir.add_argument('dirname', nargs='?', default='*')
    parser_mkdir.set_defaults(func=pbMkdir)

    # EXTFS command: "rmdir archivename dirname"
    parser_rmdir = subparsers.add_parser('rmdir')
    parser_rmdir.add_argument('archivename', nargs='?', default='*')
    parser_rmdir.add_argument('dirname', nargs='?', default='*')
    parser_rmdir.set_defaults(func=pbRmdir)

    # EXTFS command: "run"
    parser_run = subparsers.add_parser('run')
    parser_run.add_argument('archivename', nargs='?', default='*')
    parser_run.add_argument('dirname', nargs='?', default='*')
    parser_run.set_defaults(func=pbRun)

    # parse the args and call whatever function was selected
    args = parser.parse_args()
    args.func(args)

    # We shouldn't get here, so if we did it's an error.
    exit(1)

And finally, the file needs to be marked as ‘executable’:

$ chmod +x /home/{yourUsernameHere}/.local/share/mc/extfs.d/pb+

~=~=~=~=~

The Pixelblaze filesystem is named “pb://”, so you can specify it as one of the (up to two, both optional) starting folders for Midnight Commander like this:

$ mc pb://

Within Midnight Commander, you can also open the Pixelblaze filesystem in the current pane by entering (in the command prompt at the bottom of the window) the command:

$ cd pb://
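
For the curious, here is roughly what Midnight Commander gets back from the helper script. This is only a sketch of two conventions visible in the listing above: the ls -l style line that addDirectoryEntry prints for the list command, and the three-part device/folder/file path that pbCopyOut expects. The names format_entry and split_stored_name, the fixed 1970 timestamp, and the PB1/Patterns/KITT.js path are made up for illustration.

```python
from datetime import datetime

# Hypothetical helpers that mirror the conventions in the pb+ listing;
# they are not part of the script itself.

def format_entry(name, size, kind, when=datetime(1970, 1, 1)):
    """Build one 'ls -l' style line in the shape addDirectoryEntry prints."""
    perms = kind + ("rwxr-xr-x" if kind == "d" else "rw-r--r--")
    stamp = when.strftime("%m-%d-%Y %H:%M")
    return f"{perms} 1 wizard wizard {size} {stamp} {name}"

def split_stored_name(stored):
    """Split 'device/folder/file' as pbCopyOut does; None if it is malformed."""
    segments = stored.split("/")
    return tuple(segments) if len(segments) == 3 else None

# A directory entry for a device, then a file entry inside it.
print(format_entry("PB1", 2, "d"))
print(format_entry("PB1/Patterns/KITT.js", 999999, "-"))
print(split_stored_name("PB1/Patterns/KITT.js"))
```

The script uses datetime.fromtimestamp(0) for the timestamp, which depends on the local timezone; the sketch pins the date so the output is the same everywhere.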

If you haven’t used it before, there are some good tutorials here and here that show the many useful things you can do with Midnight Commander.

Enjoy!

That’s the spirit! I was thinking “Hmmm, I could make a FUSE module for that” but an MC plugin will do!

Wow. Wow Wow Wow Wow. Wow. Did I say Wow?

I just wanted to say that I finally got around to trying this and it’s super awesome.

Seems to work fine with the new firmware, too!

Nice! That brings back memories of Norton Commander from the 80s.

It’s me again.

With the v3.24 update of the Pixelblaze v3 firmware a few months ago, @wizard added a backup/restore feature to the Settings page that saves patterns and configuration files to a PixelBlazeBackup (.PBB) file. It’s fantastic to have a supported way of saving and restoring the state of a Pixelblaze…but if you have more than one Pixelblaze, it’s still a time-consuming manual process to back them all up regularly. It also doesn’t work on v2 Pixelblazes, of which I have several.

I like things that can be scripted and scheduled, so I wrote a little command-line tool to back up and restore both v2 and v3 Pixelblazes to and from PBB files.
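
In case it helps to know what ends up inside a PBB file: going by what from_pixelblaze builds in the listing below, it is JSON text with a single "files" object that maps each path on the Pixelblaze to the base64-encoded contents of that file. A minimal sketch of that layout using only the standard library (make_pbb and read_pbb are hypothetical helper names, and the sample file contents are invented):

```python
import base64
import json

def make_pbb(files):
    """Pack {path: bytes} into PBB-style JSON text (layout assumed from from_pixelblaze)."""
    archive = {"files": {}}
    for path, data in files.items():
        archive["files"][path] = base64.b64encode(data).decode("utf-8")
    return json.dumps(archive, indent=2)

def read_pbb(text):
    """Unpack PBB-style JSON text back into {path: bytes}."""
    return {path: base64.b64decode(blob)
            for path, blob in json.loads(text)["files"].items()}

# Invented sample contents: a config file and a binary pattern blob.
sample = make_pbb({
    "/config.json": b'{"name": "Pixelblaze_6C7263"}',
    "/p/2tqfTPmMNHYEERyeR": b"\x01\x02\x03",
})
restored = read_pbb(sample)
print(sorted(restored))
print(json.loads(restored["/config.json"])["name"])
```

Because everything is base64 text inside JSON, a PBB can be inspected or diffed with ordinary tools once the entries are decoded.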

The Python source code for pbbTool is here:

pbbTool.py
import argparse, fnmatch, time, requests, pathlib, math, json, socket, websocket, threading, struct, base64

# ------------------------------------------------

# @zranger1's Pixelblaze client library borrowed (and heavily modified) from https://github.com/zranger1/pixelblaze-client/blob/main/pixelblaze/pixelblaze.py

class Pixelblaze:
    """
    Presents a synchronous interface to a Pixelblaze's websocket API. The constructor takes
    the Pixelblaze's IPv4 address in the usual 12 digit numeric form (192.168.1.xxx)
    and if successful, returns a connected Pixelblaze object.
    
    To control multiple Pixelblazes, create multiple objects.    
    """
    ws = None
    connected = False
    default_recv_timeout = 1
    ipAddr = None
    
    cacheRefreshTime = 0
    cacheRefreshInterval = 1000  # milliseconds used internally
    patternCache = None
    
    def __init__(self, addr):
        """
        Create and open Pixelblaze object. Takes the Pixelblaze's IPv4 address in the
        usual 12 digit numeric form (for example, 192.168.1.xxx) 
        """
        self.setCacheRefreshTime(600)  # seconds used in public api
        self.open(addr)

    def open(self, addr):
        """
        Open websocket connection to given ip address.  Called automatically
        when a Pixelblaze object is created - it is not necessary to
        explicitly call open to connect unless the websocket has been closed by the
        user or by the Pixelblaze.
        """
        if self.connected is False:
            uri = "ws://" + addr + ":81"
            self.ws = websocket.create_connection(uri, sockopt=((socket.SOL_SOCKET, socket.SO_REUSEADDR, 1),
                                                                (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),))
            self.ws.settimeout(self.default_recv_timeout)
            self.ipAddr = addr
            self.connected = True

    def close(self):
        """Close websocket connection"""
        if self.connected is True:
            self.ws.close()
            self.connected = False

    def ws_flush(self):
        """
        Utility method: drain websocket receive buffers. Called to clear out unexpected
        packets before sending requests for data w/send_string(). We do not call it
        before simply sending commands because it has a small performance cost.
        
        This is one of the treacherously "clever" things done to make pixelblaze-client
        work as a synchronous API when the Pixelblaze may be sending out unexpected
        packets or talking to multiple devices.  We do some extra work to make sure
        we're only receiving the packets we want.
        """

        # set very short timeout.
        self.ws.settimeout(0.1)  
            
        # eat packets until we get a timeout exception on recv(), indicating 
        # that there are no more pending packets
        try:
            while True:
                self.ws.recv()                         
        except websocket._exceptions.WebSocketTimeoutException:
            # restore normal timeout when done
            self.ws.settimeout(self.default_recv_timeout)  
        return  

    def ws_recv(self, wantBinary=False, packetType=0x07):
        """
        Utility method: Blocking websocket receive that waits for a packet of a given type
        and gracefully handles errors and stray extra packets. 
        """
        result = None
        try:
            while True:  # loop until we hit timeout or have the packet we want
                result = self.ws.recv()
                if (wantBinary is False) and (type(result) is str):  # JSON string
                    break
                elif (wantBinary is True) and (result[0] == packetType):  # binary packet
                    break
                else:
                    result = None  # unhandled binary - ignore

        except websocket._exceptions.WebSocketTimeoutException:
            return None  # timeout -- we can just ignore this

        except websocket._exceptions.WebSocketConnectionClosedException:
            self.connected = False
            raise

        return result

    def send_string(self, cmd):
        """Utility method: Send string-ized JSON to the pixelblaze"""
        self.ws.send(cmd.encode("utf-8"))

    def waitForEmptyQueue(self, timeout_ms=1000):
        """
        Wait until the Pixelblaze's websocket message queue is empty, or until
        timeout_ms milliseconds have elapsed.  Returns True if an empty queue
        acknowledgement was received, False otherwise.  Throws an exception
        if the socket is disconnected.
        """
        self.ws_flush()
        self.ws.settimeout(timeout_ms / 1000)
        try:
            self.send_string('{"ping": true}')
            result = self.ws.recv()
            self.ws.settimeout(self.default_recv_timeout)
            #return True if ((result is not None) and (result.startswith('{"ack"'))) else False      
            # was failing with "startswith needs string, not int" 
            if result is not None:
                if result[0] == '{':
                    if result.startswith('{"ack"'):
                        return True
            return False
        except websocket._exceptions.WebSocketTimeoutException:
            self.ws.settimeout(self.default_recv_timeout)
          
        return False

    def getHardwareConfig(self):
        """Returns a JSON object containing all the available hardware configuration data"""
        self.ws_flush()  # make sure there are no pending packets    
        self.send_string('{"getConfig": true}')
        hw = dict()

        p1 = self.ws_recv()
        while p1 is not None:
            p2 = json.loads(p1)
            hw = {**hw, **p2}
            p1 = self.ws_recv()

        return hw

    def getPatternList(self,refresh = False):
        """
        Returns a dictionary containing the unique ID and the text name of all
        saved patterns on the Pixelblaze. Normally reads from the cached pattern
        list, which is refreshed every 10 minutes by default.
        
        To force a cache refresh, set the optional "refresh" parameter to True
        
        To change the cache refresh interval, call setCacheRefreshTime(seconds)
        """
        if refresh is True or ((self._time_in_millis() - self.cacheRefreshTime) > self.cacheRefreshInterval):
            self._refreshPatternCache()
            self.cacheRefreshTime = self._time_in_millis()
        
        return self.patternCache
    
    def _clamp(self,n, smallest, largest):
        """
        Utility Method: Why doesn't Python have clamp()?
        """ 
        return max(smallest, min(n, largest))
  
    def _time_in_millis(self):
        """
        Utility Method: Returns current time in milliseconds
        """
        return int(round(time.time() * 1000))
    
    def setCacheRefreshTime(self,seconds):
        """
        Set the interval, in seconds, at which the pattern cache is cleared and
        a new pattern list is loaded from the Pixelblaze.  Default is 600 (10 minutes)
        """        
        # a million seconds is about 277 hours
        # or about 11.5 days.  Probably long enough.
        self.cacheRefreshInterval = self._clamp(seconds, 0, 1000000) * 1000

    def _refreshPatternCache(self):
        """
        Reads a dictionary containing the unique ID and the text name of all
        saved patterns on the Pixelblaze into the pattern cache.
        """
        self.patternCache = dict()
        self.ws_flush()  # make sure there are no pending packets    
        self.send_string("{ \"listPrograms\" : true }")

        # Temporarily increase the timeout to cope with slowed-down Pixelblazes.
        self.ws.settimeout(3 * self.default_recv_timeout)
        frame = self.ws_recv(True)
        self.ws.settimeout(self.default_recv_timeout) 
        while frame is not None:
            listFrame = frame[2:].decode("utf-8")
            listFrame = listFrame.split("\n")
            listFrame = [m.split("\t") for m in listFrame]

            for pat in listFrame:
                if len(pat) == 2:
                    self.patternCache[pat[0]] = pat[1]

            if frame[1] & 0x04:
                break
            frame = self.ws_recv(True)

class PixelblazeEnumerator:
    """
    Listens on a network to detect available Pixelblazes, which the user can then list
    or open as Pixelblaze objects.  Also provides time synchronization services for
    running synchronized patterns on a network of Pixelblazes.
    """
    PORT = 1889
    BEACON_PACKET = 42
    DEVICE_TIMEOUT = 30000
    LIST_CHECK_INTERVAL = 5000

    listTimeoutCheck = 0
    isRunning = False
    threadObj = None
    listener = None
    devices = dict()

    def __init__(self, hostIP="0.0.0.0"):
        """    
        Create an object that listens continuously for Pixelblaze beacon
        packets, maintains a list of Pixelblazes and supports synchronizing time
        on multiple Pixelblazes to allow them to run patterns simultaneously.
        Takes the IPv4 address of the interface to use for listening on the calling computer.
        Listens on all available interfaces if hostIP is not specified.
        """
        self.start(hostIP)

    def __del__(self):
        self.stop()

    def setDeviceTimeout(self, ms):
        """
        Sets the interval in milliseconds which the enumerator will wait without
        hearing from a device before removing it from the active devices list.
        
        The default timeout is 30000 (30 seconds).
        """
        self.DEVICE_TIMEOUT = ms

    def start(self, hostIP):
        """
        Open socket for listening to Pixelblaze datagram traffic,
        set appropriate options and bind to specified interface and
        start listener thread.
        """
        try:
            self.listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listener.bind((hostIP, self.PORT))

            self.threadObj = threading.Thread(target=self._listen)
            self.isRunning = True
            self.listTimeoutCheck = 0
            self.threadObj.start()

            return True
        except socket.error as e:
            print(e)
            self.stop()
            return False

    def stop(self):
        """
        Stop listening for datagrams, terminate listener thread and close socket.
        """
        if self.listener is None:
            return
        else:
            self.isRunning = False
            self.threadObj.join()
            time.sleep(0.5)
            self.listener.close()
            self.threadObj = None
            self.listener = None

    def _listen(self):
        """
        Internal Method: Datagram listener thread handler -- loop and listen.
        """
        while self.isRunning:
            data, addr = self.listener.recvfrom(1024)
            now = int(round(time.time() * 1000)) % 0xFFFFFFFF
            
            # check the list periodically,and remove devices we haven't seen in a while
            if (now - self.listTimeoutCheck) >= self.LIST_CHECK_INTERVAL:
                newlist = dict()

                for dev, record in self.devices.items():
                    if (now - record["timestamp"]) <= self.DEVICE_TIMEOUT:
                        newlist[dev] = record

                self.devices = newlist
                self.listTimeoutCheck = now                        

            # when we receive a beacon packet from a Pixelblaze,
            # update device record and timestamp in our device list
            pkt = struct.unpack("<LLL", data)
            if pkt[0] == self.BEACON_PACKET:
                # add pixelblaze to list of devices
                self.devices[pkt[1]] = {"address": addr, "timestamp": now, "sender_id": pkt[1], "sender_time": pkt[2]}

    def getPixelblazeList(self):
        dev = []
        for record in self.devices.values():
            dev.append(record["address"][0])  # just the ip
        return dev

# ------------------------------------------------

class PBB:
    # Members:
    __textData = None
    __fromDevice = None

    # private constructor
    def __init__(self, name, bytes):
        self.__fromDevice = name
        self.__textData = bytes

    # Static methods:
    @staticmethod
    def from_file(path):
        """Creates and returns a new PixelBlazeBackup whose contents are loaded from a file on disk."""
        newOne = object.__new__(PBB)
        with open(path, "r") as file:
            newOne.__init__(pathlib.Path(path).stem, file.read())
        return newOne

    @staticmethod
    def from_pixelblaze(ip_address, verbose = False):
        newOne = object.__new__(PBB)
        fromDevice = ip_address
        archive = {"files": {}}
        #   Get the file list.
        with requests.get(f"http://{ip_address}/list") as rList:
            if rList.status_code == 200:
                filelist = rList.text.split('\n') # returns a number of lines, each containing [filename][tab][size][newline]
            if rList.status_code == 404:
                # If the Pixelblaze doesn't support the "/list" endpoint, get the filelist using WebSocket calls.
                filelist = []
                for filename in ["apple-touch-icon.png", "favicon.ico", "config.json", "obconf.dat", "pixelmap.txt", "pixelmap.dat", "l/_defaultplaylist_"]:
                    filelist.append(f"/{filename}\t0")
                pb = Pixelblaze(ip_address)
                for filename in pb.getPatternList():
                    filelist.append(f"/p/{filename}\t0")    # the pattern blob
                    filelist.append(f"/p/{filename}.c\t0")  # the current value of any (optional) UI controls
                pb.close()
                rList.status_code = 200 # fall through to the next section.
            if rList.status_code == 200:
                filelist.sort()
                for line in filelist:
                    filename = line.split('\t')[0]
                    if len(filename) > 0 and not filename in ['/index.html.gz', '/recovery.html.gz']: 
                        if verbose: print(f"  Downloading {filename}")
                        with requests.get(f"http://{ip_address}{filename}") as rFile:
                            if rFile.status_code == 200:
                                archive['files'][filename] = base64.b64encode(rFile.content).decode('UTF-8')
                                if filename == '/config.json':
                                    fromDevice = json.loads(rFile.content)['name']
                            elif rFile.status_code != 404: print(f"GET {rFile.url} returned {rFile.status_code}")
            else: print(f"GET {rList.url} returned {rList.status_code}")
        newOne.__init__(fromDevice, json.dumps(archive, indent=2))
        return newOne

    # Class properties:
    @property
    def device_name(self):
        """Returns the name of the device from which this PixelBlazeBackup was made."""
        return self.__fromDevice

    @property
    def contents(self):
        """Returns a list of the filenames contained in this PixelBlazeBackup."""
        files = []
        for key in json.loads(self.__textData.encode().decode('utf-8-sig'))['files']:
            files.append(key)
        return files

    def get_file(self, key):
        """Returns the contents of a particular file contained in this PixelBlazeBackup."""
        if pathlib.Path(key).suffix in ['.json', '.txt']:
            return base64.b64decode(json.loads(self.__textData.encode().decode('utf-8-sig'))['files'][key]).decode('UTF-8')
        return base64.b64decode(json.loads(self.__textData.encode().decode('utf-8-sig'))['files'][key])

    # Class methods:
    def to_file(self, path=None):
        """Writes this PixelBlazeBackup to a file on disk."""
        if path is None:
            path = pathlib.Path.cwd().joinpath(self.device_name).with_suffix(".pbb")
        with open(path, "w") as file:
            file.write(self.__textData)

    def to_pixelblaze(self, ip_address, verbose=False):
        """Uploads the contents of this PixelBlazeBackup to the destination Pixelblaze."""
        # Delete all the files that are currently loaded on the Pixelblaze (excepting the WebApp itself).
        with requests.get(f"http://{ip_address}/list") as rList:
            if rList.status_code == 200:
                filelist = rList.text.split('\n') # returns a number of lines, each containing [filename][tab][size][newline]
            if rList.status_code == 404:
                # If the Pixelblaze doesn't support the "/list" endpoint, get the filelist using WebSocket calls.
                filelist = []
                for filename in ["apple-touch-icon.png", "favicon.ico", "config.json", "obconf.dat", "pixelmap.txt", "pixelmap.dat", "l/_defaultplaylist_"]:
                    filelist.append(f"/{filename}\t0")
                pb = Pixelblaze(ip_address)
                for filename in pb.getPatternList():
                    filelist.append(f"/p/{filename}\t0")    # the pattern blob
                    filelist.append(f"/p/{filename}.c\t0")  # the current value of any (optional) UI controls
                pb.close()
                rList.status_code = 200 # fall through to the next section.
            if rList.status_code == 200:
                filelist.sort()
                for line in filelist:
                    filename = line.split('\t')[0]
                    if len(filename) > 0 and not filename in ['/index.html.gz', '/recovery.html.gz']:
                        if verbose: print(f"  Deleting {filename}")
                        with requests.get(f"http://{ip_address}/delete?path={filename}") as rFile:
                            if rFile.status_code not in [200, 404]:
                                rFile.raise_for_status()
            else: 
                print(f"GET {rList.url} returned {rList.status_code}")
                rList.raise_for_status()
        # Upload everything that's in this PixelBlazeBackup to the Pixelblaze.
        for filename in self.contents:
            if verbose: print(f"  Uploading {filename}")
            #print(f"Filename: {filename[1:]}, len: {len(self.get_file(filename))}, type: {type(self.get_file(filename))}, preview: {self.get_file(filename)[:20]}")
            #proxy = { 'http': 'http://192.168.8.116:8888' }
            fileData = {'data': (filename[1:], self.get_file(filename))}
            #with requests.post(f"http://{ip_address}/edit", files=fileData, proxies=proxy) as rPost:
            with requests.post(f"http://{ip_address}/edit", files=fileData) as rPost:
                #print(f"    Headers: {rPost.request.headers}\n    Data: {rPost.request.body[:40]}")
                if rPost.status_code != 200:
                    #print(f"POST {filename} returned {rPost.status_code}")
                    rPost.raise_for_status()
        # Send a reboot command so the Pixelblaze will recognize the new configuration.
        with requests.post(f"http://{ip_address}/reboot") as rReboot:
            if rReboot.status_code not in [200, 404]:
                #print(f"POST {filename} returned {rReboot.status_code}")
                rReboot.raise_for_status()

# ------------------------------------------------

# Here's where the magic happens.
if __name__ == "__main__":

    def backupPBB(ip_address, filename):
        print(f"Backing up {ip_address} to {filename}")
        PBB.from_pixelblaze(ip_address, verbose=args.verbose).to_file(filename)

    def restorePBB(ip_address, filename):
        print(f"Restoring {ip_address} from {filename}")
        PBB.from_file(filename).to_pixelblaze(ip_address, verbose=args.verbose)

    # Create the top-level parser.
    parser = argparse.ArgumentParser(prog='pbbTool')
    # Add common arguments.
    parser.add_argument("--verbose", action='store_true', help="Display debugging output")
    # Create separate subparsers for the commands.
    subparsers = parser.add_subparsers(required=True, dest='command')
    # Create the subparser for the "backup" command.
    parserBackup = subparsers.add_parser('backup', help='backup --ipAddress=* --pbbFile=*')
    parserBackup.add_argument("--ipAddress", default='*', help="The (wildcard-enabled) IP address of the Pixelblaze to backup; defaults to '*'")
    parserBackup.add_argument("--pbbFile", default='*', help="The (wildcard-enabled) filename of the PixelBlazeBackup (PBB) file; defaults to './*'")
    parserBackup.set_defaults(func=backupPBB)
    # Create the subparser for the "restore" command.
    parserRestore = subparsers.add_parser('restore', help='restore --ipAddress=* --pbbFile=*')
    parserRestore.add_argument("--pbbFile", default='*', help="The (wildcard-enabled) filename of the PixelBlazeBackup (PBB) file; defaults to './*'")
    parserRestore.add_argument("--ipAddress", default='*', help="The (wildcard-enabled) IP address of the Pixelblaze to restore; defaults to '*'")
    parserRestore.set_defaults(func=restorePBB)
    # Parse the command line.
    args = parser.parse_args()

    if args.command in ['backup', 'restore']:
        # create a PixelblazeEnumerator object, listen for a couple of seconds, then list the Pixelblazes we found.
        pbList = PixelblazeEnumerator()
        time.sleep(3) # Wait while we listen for Pixelblaze broadcast packets.
        for pixelblazeIP in pbList.getPixelblazeList():
            if (fnmatch.fnmatch(pixelblazeIP, args.ipAddress)):
                try:
                    # Get the device name so we can use it in the PBB filenames.
                    pb = Pixelblaze(pixelblazeIP)
                    pixelblazeName = pb.getHardwareConfig()['name']
                    pb.close()
                    # Substitute in the correct device name for any wildcards in the filename.
                    filename = pathlib.Path(args.pbbFile.replace('*', pixelblazeName)).with_suffix('.pbb')
                    # Call the appropriate routine to backup or restore.
                    args.func(pixelblazeIP, filename)
                except Exception as e:
                    print(f"Error connecting to {pixelblazeIP}: ", repr(e))
        pbList.stop()
    else:
        parser.print_usage()
    exit()

And the syntax is:

usage: pbbTool [-h] [--verbose] {backup,restore} ...

positional arguments:
  {backup,restore}
    backup          backup --ipAddress=* --pbbFile=*
    restore         restore --ipAddress=* --pbbFile=*

optional arguments:
  -h, --help        show this help message and exit
  --verbose         Display debugging output

A few words about the parameters:

  • The default value for ipAddress is the wildcard *, which matches every Pixelblaze visible on the local network. The default value for the input/output pbbFile is also the wildcard *, which is replaced with the device name (as configured on the Settings page) of each Pixelblaze that matches ipAddress. If neither default is overridden, the tool backs up (or restores) all Pixelblazes to (or from) PBBs named after their device names:

    $ python3 pbbTool.py backup
    Backing up 192.168.1.17 to Pixelblaze_6C7263.pbb
    Backing up 192.168.1.18 to Pixelblaze_71F266.pbb
    Backing up 192.168.1.19 to Pixelblaze_8E2513.pbb
    Backing up 192.168.1.20 to Pixelblaze_EC941A.pbb
    
  • If you only want to back up or restore a single Pixelblaze, specify its ipAddress explicitly:

    $ python3 pbbTool.py restore --ipAddress=192.168.1.17
    Restoring 192.168.1.17 from Pixelblaze_6C7263.pbb
    
  • If you want to use a different naming scheme, you can specify a different filename (for a single Pixelblaze) or a wildcarded pattern. For example, to include the date in the filename:

    $ python3 pbbTool.py backup --pbbFile=*_20220731
    Backing up 192.168.1.17 to Pixelblaze_6C7263_20220731.pbb
    Backing up 192.168.1.18 to Pixelblaze_71F266_20220731.pbb
    Backing up 192.168.1.19 to Pixelblaze_8E2513_20220731.pbb
    Backing up 192.168.1.20 to Pixelblaze_EC941A_20220731.pbb
    
  • The --verbose argument will output file-by-file progress:

    $ python3 pbbTool.py --verbose backup --ipAddress=192.168.1.17
    Backing up 192.168.1.17 to Pixelblaze_6C7263.pbb
      Downloading /config.json
      Downloading /config2.json
      Downloading /favicon.ico
      Downloading /obconf.dat
      Downloading /p/2tqfTPmMNHYEERyeR
      Downloading /p/6FSASbeCEamY2BXMK
      Downloading /p/z5QCZQmdsuvnYe8qw
      Downloading /p/zMXmv4ZXcWiPtR3bB
      Downloading /pixelmap.dat
      Downloading /pixelmap.txt
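
The wildcard handling described above boils down to an fnmatch test on the IP address and a plain string replacement on the filename. A small sketch of that logic, assuming the discovered devices are already available as a dictionary of IP address to device name (plan_backups and that dictionary are stand-ins for illustration; the real tool discovers devices over the network):

```python
import fnmatch
import pathlib

def plan_backups(devices, ip_pattern="*", pbb_pattern="*"):
    """Return (ip, filename) pairs for every device whose IP matches ip_pattern."""
    plan = []
    for ip, name in devices.items():
        if fnmatch.fnmatch(ip, ip_pattern):
            # Substitute the device name for any wildcard, then force the .pbb suffix.
            filename = pathlib.Path(pbb_pattern.replace("*", name)).with_suffix(".pbb")
            plan.append((ip, str(filename)))
    return plan

# Stand-in for the list of Pixelblazes found on the network.
found = {
    "192.168.1.17": "Pixelblaze_6C7263",
    "192.168.1.18": "Pixelblaze_71F266",
}

for ip, filename in plan_backups(found, pbb_pattern="*_20220731"):
    print(f"Backing up {ip} to {filename}")
```

One thing to keep in mind with this approach: with_suffix replaces anything after the last dot, so a device name or pattern containing a dot would lose its tail.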
    

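Anyone on a Mac or *n*x box should already have Python 3; the only third-party modules the script imports are requests and websocket (the latter provided by the websocket-client package), which can be installed with pip if they are missing. Anyone on a PC can use Python for Windows or run it in their favorite Linux distribution using WSL.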
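
Since the whole point is to run this unattended, a scheduled job could build the dated filename pattern shown earlier and hand it to the tool. A rough sketch, assuming pbbTool.py sits in the working directory (nightly_backup_command is a hypothetical helper; it only builds the argument list, and actually launching it is left as a comment):

```python
import datetime
import sys

def nightly_backup_command(today=None, tool="pbbTool.py"):
    """Build the argument list for a dated backup of every visible Pixelblaze."""
    today = today or datetime.date.today()
    # The * is filled in with each device name by pbbTool itself.
    return [sys.executable, tool, "backup", f"--pbbFile=*_{today:%Y%m%d}"]

command = nightly_backup_command(datetime.date(2022, 7, 31))
print(" ".join(command[1:]))

# To actually run it from a scheduled script:
#   import subprocess
#   subprocess.run(nightly_backup_command(), check=True)
```

Passing the arguments as a list avoids the shell expanding the * before pbbTool sees it.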
