236 lines
7.0 KiB
Python
236 lines
7.0 KiB
Python
from machine import Pin
|
|
from machine import UART
|
|
import network
|
|
import time
|
|
import socket
|
|
# config option

network.hostname("opendocks.local")
ui = "index.html"
config = "config.html"
port = 80  # keep at 80; other ports were reported to break the web UI

# ssid and pass
# NOTE(review): Wi-Fi credentials are hard-coded in source; consider moving
# them to a file that is not checked in.
ssid = 'kainet'
password = 'peepeepoopoo'

# Read the UI page into memory once at start-up.  The context manager closes
# the file handle as soon as the read finishes (it was previously left open);
# the name `page` stays bound to the now-closed file object.
with open(ui, "r") as page:
    html = page.read()
|
|
|
|
# aap commands start here
|
|
class Modes:
    """Mode byte values used in request frames.

    The selected value is written as the byte that follows the length byte
    of a frame.  ``Switch`` (0x00) is the mode in which mode-switch commands
    themselves are sent.
    """

    Switch = 0x00
    Voice = 0x01
    Simple = 0x02
    Request = 0x03
    AiR = 0x04
|
|
|
|
|
|
_test_sw_mode_simple = [0xFF, 0x55, 0x03, 0x00, 0x01, 0x02, 0xFA]
|
|
_test_button_up = [0xFF, 0x55, 0x03, 0x02, 0x00, 0x00, 0xFB]
|
|
_test_play = _test_sw_mode_simple + [0xFF, 0x55, 0x03, 0x02, 0x00, 0x01, 0xFA] + _test_button_up
|
|
|
|
# generate a checksum for a request
|
|
def req_checksum(length, mode, command, parameter):
    """Return the one-byte checksum for a request frame.

    The checksum is the value that makes
    ``length + mode + sum(command) + sum(parameter) + checksum`` equal to
    zero modulo 256.

    Args:
        length: value of the frame's length byte.
        mode: value of the frame's mode byte.
        command: iterable of command byte values.
        parameter: iterable of parameter byte values (may be empty).

    Returns:
        An int in the range 0..255.
    """
    total = length + mode
    total += sum(int(b) for b in command)
    total += sum(int(b) for b in parameter)
    # Negate modulo 256.  The previous form, 0x100 - (total & 0xff), yielded
    # 0x100 whenever the byte sum was a multiple of 256, which does not fit
    # in a byte and makes bytes(frame) raise.
    return (-total) & 0xFF
|
|
|
|
def make_request(mode: int, command: list[int], parameter=None):
    """Build the byte sequence for one request as a list of ints.

    The result consists of, in order:

    * a mode-switch frame selecting *mode* (omitted when *mode* is
      ``Modes.Switch``),
    * the request frame itself:
      ``0xFF 0x55 <length> <mode> <command...> <parameter...> <checksum>``
      where ``length`` counts the mode byte, the command bytes and the
      parameter bytes,
    * for ``Modes.Simple`` only, a trailing "button up" frame.

    Args:
        mode: mode byte for the request (see ``Modes``).
        command: command byte values.
        parameter: optional parameter byte values; defaults to none.

    Returns:
        A new list of ints, suitable for ``bytes(...)``.
    """
    # Copy into fresh lists: avoids the shared mutable default the signature
    # used to have, and lets callers pass tuples or bytes as well as lists.
    command = list(command)
    parameter = [] if parameter is None else list(parameter)
    length = 1 + len(command) + len(parameter)

    frame = []

    if mode != Modes.Switch:  # add mode switch
        frame += make_request(Modes.Switch, [0x01, mode])

    frame += [0xFF, 0x55, length, mode]
    frame += command
    frame += parameter
    frame.append(req_checksum(length, mode, command, parameter))

    if mode == Modes.Simple:  # add button up
        frame += [0xFF, 0x55, 0x03, 0x02, 0x00, 0x00, 0xFB]

    return frame
|
|
|
|
class Requests:
    """Pre-built byte sequences for ``Modes.Simple`` requests.

    Every attribute is the list of ints returned by ``make_request`` and is
    computed once, when the class body executes.
    """
    # NOTE(review): the command bytes look like a button bitmask spread over
    # up to four bytes after a leading 0x00 -- confirm against the protocol
    # reference before adding new entries.

    #ButtonReleased = make_request(Modes.Simple, [0x0, 0x0])
    PlayToggle = make_request(Modes.Simple, [0x0, 0x01])
    VolUp = make_request(Modes.Simple, [0x0, 0x02])
    VolDown = make_request(Modes.Simple, [0x0, 0x04])
    Next = make_request(Modes.Simple, [0x0, 0x08])
    Prev = make_request(Modes.Simple, [0x0, 0x10])
    NextAlbum = make_request(Modes.Simple, [0x0, 0x20])
    PrevAlbum = make_request(Modes.Simple, [0x0, 0x40])
    Stop = make_request(Modes.Simple, [0x0, 0x80])

    Play = make_request(Modes.Simple, [0x0, 0x0, 0x01])
    Pause = make_request(Modes.Simple, [0x0, 0x0, 0x02])
    MuteToggle = make_request(Modes.Simple, [0x0, 0x0, 0x04])
    NextPlaylist = make_request(Modes.Simple, [0x0, 0x0, 0x20])
    PrevPlaylist = make_request(Modes.Simple, [0x0, 0x0, 0x40])
    ShuffleToggle = make_request(Modes.Simple, [0x0, 0x0, 0x80])

    RepeatToggle = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x01])
    Off = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x04])
    On = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x08])
    Menu = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x40])
    Ok = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x80])

    ScrollUp = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x0, 0x01])
    ScrollDown = make_request(Modes.Simple, [0x0, 0x0, 0x0, 0x0, 0x02])
|
|
|
|
# Start-up self-test: the generated play-toggle sequence must match the
# hand-written reference bytes.  Raised explicitly (rather than via `assert`)
# so the check still runs when assertions are compiled out.
if Requests.PlayToggle != _test_play:
    raise AssertionError("invalid make_request")
|
|
|
|
# aap commands end here
|
|
|
|
"""
|
|
class AiRRequests:
|
|
GetIpodName = make_request(Modes.AiR, [0x0, 0x14])
|
|
|
|
def read_response(uart):
|
|
head = uart.read(2 + 1)
|
|
length = int(head[2])
|
|
packet = uart.read(length)
|
|
mode = packet[0]
|
|
command = [int(a) for a in packet[1:3]]
|
|
param = packet[3:-1]
|
|
return command, param
|
|
|
|
class AiR:
|
|
@staticmethod
|
|
def GetIpodName(uart):
|
|
uart.write(bytes(AiRRequests.GetIpodName))
|
|
command, param = read_response(uart)
|
|
assert command == [0x0, 0x15]
|
|
assert len(param) == 1
|
|
name = uart.read(int.from_bytes(param, 'little'))
|
|
return name.decode("utf8")
|
|
"""
|
|
|
|
class IPOD:
    """Sends pre-built request byte sequences to a UART-like object.

    Each public method writes the matching ``Requests`` entry to the UART
    given at construction time.
    """

    def __init__(self, uart) -> None:
        """Store *uart*; it only needs a ``write(bytes)`` method."""
        self.uart = uart

    def _cmd(self, cmd: list[int]):
        """Convert *cmd* (ints 0..255) to ``bytes`` and write it to the UART."""
        self.uart.write(bytes(cmd))

    def play(self):
        """Send ``Requests.PlayToggle``."""
        self._cmd(Requests.PlayToggle)

    def next(self):
        """Send ``Requests.Next``."""
        self._cmd(Requests.Next)

    def prev(self):
        """Send ``Requests.Prev``."""
        self._cmd(Requests.Prev)

    def menu(self):
        """Send ``Requests.Menu``."""
        self._cmd(Requests.Menu)

    def ok(self):
        """Send ``Requests.Ok``."""
        self._cmd(Requests.Ok)

    def volup(self):
        """Send ``Requests.VolUp``."""
        self._cmd(Requests.VolUp)

    def voldown(self):
        """Send ``Requests.VolDown``."""
        self._cmd(Requests.VolDown)

    def scrollup(self):
        """Send ``Requests.ScrollUp``."""
        self._cmd(Requests.ScrollUp)

    def scrolldown(self):
        """Send ``Requests.ScrollDown``."""
        self._cmd(Requests.ScrollDown)
|
|
|
|
def init(max_wait=10):
    """Turn on the on-board LED and join the configured Wi-Fi network.

    Connects with the module-level ``ssid`` / ``password``, polls the
    connection status once per second, then signals the outcome on the LED
    and prints the resulting address.

    Args:
        max_wait: maximum number of one-second polls before giving up.

    Raises:
        RuntimeError: if the interface does not report status 3 once
            polling stops.
    """
    led = Pin("LED", Pin.OUT, value=1)

    # network.hostname("opendock.local")
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.config(pm=0xa11140)  # Disable powersave mode
    wlan.connect(ssid, password)
    name = wlan.config("hostname")
    # wlan.ifconfig(('192.168.1.69', '255.255.255.0', '192.168.1.5', '192.168.1.5'))

    print("hostname is: " + name)

    def blink_led(interval=0.5, num_blinks=3):
        """Blink the LED *num_blinks* times, *interval* seconds on and off."""
        for _ in range(num_blinks):
            led.on()
            time.sleep(interval)
            led.off()
            time.sleep(interval)

    # Wait for connect or fail
    while max_wait > 0:
        state = wlan.status()
        # Negative values and values >= 3 end the wait; anything else means
        # the attempt is still in progress.
        if state < 0 or state >= 3:
            break
        max_wait -= 1
        print('waiting for connection...')
        time.sleep(1)

    # Handle connection error
    if wlan.status() != 3:
        blink_led(0.2, 5)
        raise RuntimeError('wifi connection failed')

    blink_led(1, 2)
    print('connected')
    net_info = wlan.ifconfig()
    print('ip = ' + net_info[0], ':', port, sep='')
|
|
|
|
|
|
def _run_pod_actions(search, actions):
    """Run the action named by every ``pod=<name>`` pair in *search*.

    *search* is the query-string part of the request path and *actions* maps
    action names to zero-argument callables.  Pairs without an ``=`` are
    skipped and unknown names are ignored, so a malformed query can no longer
    abort the whole response.
    """
    for param in search.split("&"):
        pair = param.split("=", 1)
        if len(pair) != 2:
            continue
        k, v = pair
        if k == "pod":
            action = actions.get(v)
            if action is not None:
                action()
            print(f"executed: {v}")


def _send_page(cl, body):
    """Send *body* (bytes) to client socket *cl* as an HTTP 200 HTML response."""
    head = 'HTTP/1.0 200 OK\r\n'
    head += "Content-type: text/html\r\n"
    # Content-Length must count bytes, hence len() of the encoded body.
    head += f"Content-Length: {len(body)}\r\n"
    head += "\r\n"
    # The status line and headers were previously built but never sent, so
    # clients received a bare, header-less body.
    cl.sendall(head.encode("utf8"))

    # Send in small segments to keep individual writes short; sendall()
    # ensures each segment is written completely.
    SEG_SIZE = 1024
    for off in range(0, len(body), SEG_SIZE):
        cl.sendall(body[off:off + SEG_SIZE])


def main():
    """Serve the UI page over HTTP and forward ``pod=<action>`` requests.

    Binds a listening socket on ``port``, opens UART 0 at 19200 baud, then
    loops forever: accept one client, read its request, run any ``pod``
    actions named in the query string, and reply with the cached page.
    Errors while handling a client are printed and the loop continues.
    """
    # Open socket
    addr = socket.getaddrinfo('0.0.0.0', port)[0][-1]

    s = socket.socket()
    s.bind(addr)
    s.listen(1)

    print('listening on', addr)

    uart = UART(0, 19200)
    uart.init(19200, bits=8, parity=None, stop=1)

    ipod = IPOD(uart)

    # Sends the hand-written reference sequence once at start-up.
    ipod._cmd(_test_play)

    actions = {
        "play": ipod.play,
        "next": ipod.next,
        "prev": ipod.prev,
        "volup": ipod.volup,
        "voldown": ipod.voldown,
        "menu": ipod.menu,
        "ok": ipod.ok,
        "scrollup": ipod.scrollup,
        "scrolldown": ipod.scrolldown,
    }

    # Encode the page once; every response reuses the same bytes object.
    body = html.encode("utf8")

    # Listen for connections
    while True:
        cl = None  # stays None if accept() itself fails
        try:
            cl, client_addr = s.accept()
            print('Client connected from', client_addr)
            r = cl.recv(16000)
            lines = r.splitlines()
            if not lines:
                # Client connected but sent nothing; just drop it.
                continue

            req = lines[0].decode("utf8").split(" ")
            if len(req) < 2:
                raise ValueError("malformed request line")
            method = req[0]
            path = req[1]
            print(f"method: {method} path: {path}")

            _search = path.split("?")
            if len(_search) > 1:
                search = _search[1]
                print(f"search: {search}")
                _run_pod_actions(search, actions)

            _send_page(cl, body)
        except Exception as e:
            # Top-level boundary of the server loop: report and keep serving.
            print("Exception", e)
        finally:
            # Close only a socket that was actually accepted this iteration.
            if cl is not None:
                cl.close()
            print('Connection closed')
|
|
|
|
# Script entry: bring up the LED and Wi-Fi, then serve requests forever.
init()
main()