Saving lab state
parent
f8ef12064b
commit
e965242715
@ -0,0 +1,23 @@
|
||||
import time
|
||||
|
||||
from dht import DHT11
|
||||
from machine import Pin
|
||||
|
||||
|
||||
sensor1 = DHT11(Pin(2))
|
||||
# sensor2 = DHT11(Pin(2))
|
||||
|
||||
|
||||
while True:
|
||||
data = {
|
||||
"sensor_1": {
|
||||
"temp": sensor1.temperature(),
|
||||
"humidity": sensor1.humidity(),
|
||||
},
|
||||
# "sensor_2": {
|
||||
# "temp": sensor2.temperature(),
|
||||
# "humidity": sensor2.humidity(),
|
||||
# },
|
||||
}
|
||||
print(data)
|
||||
time.sleep(3)
|
@ -0,0 +1,17 @@
|
||||
# Open Water Change Specification
|
||||
|
||||
## Startup
|
||||
|
||||
On startup the Open Water Change(OWC) should check for a config.json file under the root directory. If found it should parse the contents and attempts to connect to wireless access points in the order that they are listed.
|
||||
|
||||
```
|
||||
{
|
||||
"xor_key": 42,
|
||||
access_points: [
|
||||
{
|
||||
"ssid": "whatever",
|
||||
"secret": ""
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
@ -0,0 +1,67 @@
|
||||
# import json
|
||||
# import network
|
||||
# import time
|
||||
|
||||
|
||||
# # def xor_encrypt_decrypt(input_string, key):
|
||||
# # output = "".join(chr(ord(char) ^ key) for char in input_string)
|
||||
# # return output
|
||||
|
||||
|
||||
# # def wifi_connect(ssid, password, timeout=10):
|
||||
# # """
|
||||
# # Connects to the specified Wi-Fi access point.
|
||||
|
||||
# # :param ssid: The SSID of the Wi-Fi network.
|
||||
# # :param password: The password for the Wi-Fi network.
|
||||
# # :param timeout: Time in seconds to wait for connection.
|
||||
# # :return: True if connected, False otherwise.
|
||||
# # """
|
||||
# # wlan = network.WLAN(network.STA_IF)
|
||||
# # wlan.active(True)
|
||||
|
||||
# # if not wlan.isconnected():
|
||||
# # print(f"Connecting to network: {ssid}...")
|
||||
# # wlan.connect(ssid, password)
|
||||
|
||||
# # start_time = time.time()
|
||||
# # while not wlan.isconnected():
|
||||
# # if time.time() - start_time > timeout:
|
||||
# # print("Connection timeout!")
|
||||
# # wlan.active(False)
|
||||
# # del wlan
|
||||
# # return False
|
||||
# # time.sleep(1)
|
||||
|
||||
# # print("Network connected!")
|
||||
# # print("IP Address:", wlan.ifconfig()[0])
|
||||
# # return True
|
||||
|
||||
|
||||
# def wifi_setup():
|
||||
# # try:
|
||||
# # with open("config.json", "r") as f:
|
||||
# # config = json.load(f)
|
||||
|
||||
# # for ap in config.get("access_points", []):
|
||||
# # result = wifi_connect(
|
||||
# # ap["ssid"], xor_encrypt_decrypt(ap["secret"], config["xor_key"])
|
||||
# # )
|
||||
# # if result:
|
||||
# # return
|
||||
|
||||
# # except (OSError, ValueError) as e:
|
||||
# # print("OSError config.json unreadable or no")
|
||||
|
||||
# print("Falling back on selfhosted AP")
|
||||
# ap = network.WLAN(network.AP_IF)
|
||||
# ap.config(essid="RP2040-AP-2", password="testing123")
|
||||
# ap.ifconfig(("192.168.4.1", "255.255.255.0", "192.168.4.1", "192.168.4.1"))
|
||||
# ap.active(True)
|
||||
# print("SSID RP2040-AP")
|
||||
# return
|
||||
|
||||
|
||||
# print("Attempting to connect to wifi")
|
||||
# time.sleep(10)
|
||||
# wifi_setup()
|
@ -0,0 +1,71 @@
|
||||
import network
|
||||
import socket
|
||||
import machine
|
||||
import time
|
||||
|
||||
# Setup RP2040 as an access point
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
ap.config(essid="RP2040-AP-3", password="testing123")
|
||||
ap.ifconfig(("192.168.4.1", "255.255.255.0", "192.168.4.1", "192.168.4.1"))
|
||||
ap.active(True)
|
||||
|
||||
# Initialize onboard LED
|
||||
led = machine.Pin("LED", machine.Pin.OUT, value=1)
|
||||
motor_ctl_a = machine.Pin(16, machine.Pin.OUT, value=0)
|
||||
motor_ctl_b = machine.Pin(17, machine.Pin.OUT, value=0)
|
||||
|
||||
# Create a socket and listen on port 80
|
||||
addr = socket.getaddrinfo("0.0.0.0", 80)[0][-1]
|
||||
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s = socket.socket()
|
||||
s.bind(addr)
|
||||
# only allowed to listen to 5 connections in AP mode
|
||||
s.listen(5)
|
||||
|
||||
print("listening on", ap.ifconfig()[0])
|
||||
|
||||
# HTML to send to browsers
|
||||
html = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Toggle Motor</h1>
|
||||
<button onclick="toggleLED()">Toggle LED</button>
|
||||
<script>
|
||||
function toggleLED() {
|
||||
var xhr = new XMLHttpRequest();
|
||||
xhr.open("POST", "/toggle", true);
|
||||
xhr.send();
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
def serve():
|
||||
conn, addr = s.accept()
|
||||
print("client connected from", addr)
|
||||
request = conn.recv(1024)
|
||||
request_str = str(request)
|
||||
print("request:", request_str)
|
||||
|
||||
if "POST /toggle" in request_str:
|
||||
led.toggle()
|
||||
motor_ctl_a.toggle() # Toggle LED state
|
||||
conn.send("HTTP/1.1 200 OK\n\nLED Toggled")
|
||||
else:
|
||||
conn.send("HTTP/1.1 200 OK\n")
|
||||
conn.send("Content-Type: text/html\n")
|
||||
conn.send("Connection: close\n\n")
|
||||
conn.sendall(html) # Send HTML page
|
||||
conn.close()
|
||||
|
||||
|
||||
# Main loop: accept connections and respond
|
||||
for i in range(10):
|
||||
print(f"Preparing to serve...{i}")
|
||||
time.sleep(1)
|
||||
|
||||
while True:
|
||||
serve()
|
@ -0,0 +1,17 @@
|
||||
# Open Water Change Specification
|
||||
|
||||
## Startup
|
||||
|
||||
On startup the Open Water Change(OWC) should check for a config.json file under the root directory. If found it should parse the contents and attempts to connect to wireless access points in the order that they are listed.
|
||||
|
||||
```
|
||||
{
|
||||
"xor_key": 42,
|
||||
access_points: [
|
||||
{
|
||||
"ssid": "whatever",
|
||||
"secret": ""
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
@ -0,0 +1,67 @@
|
||||
import json
|
||||
import network
|
||||
import time
|
||||
|
||||
|
||||
def xor_encrypt_decrypt(input_string, key):
|
||||
output = "".join(chr(ord(char) ^ key) for char in input_string)
|
||||
return output
|
||||
|
||||
|
||||
def wifi_connect(ssid, password, timeout=10):
|
||||
"""
|
||||
Connects to the specified Wi-Fi access point.
|
||||
|
||||
:param ssid: The SSID of the Wi-Fi network.
|
||||
:param password: The password for the Wi-Fi network.
|
||||
:param timeout: Time in seconds to wait for connection.
|
||||
:return: True if connected, False otherwise.
|
||||
"""
|
||||
wlan = network.WLAN(network.STA_IF)
|
||||
wlan.active(True)
|
||||
|
||||
if not wlan.isconnected():
|
||||
print(f"Connecting to network: {ssid}...")
|
||||
wlan.connect(ssid, password)
|
||||
|
||||
start_time = time.time()
|
||||
while not wlan.isconnected():
|
||||
if time.time() - start_time > timeout:
|
||||
print("Connection timeout!")
|
||||
wlan.active(False)
|
||||
del wlan
|
||||
return False
|
||||
time.sleep(1)
|
||||
|
||||
print("Network connected!")
|
||||
print("IP Address:", wlan.ifconfig()[0])
|
||||
return True
|
||||
|
||||
|
||||
def wifi_setup():
|
||||
# try:
|
||||
# with open("config.json", "r") as f:
|
||||
# config = json.load(f)
|
||||
|
||||
# for ap in config.get("access_points", []):
|
||||
# result = wifi_connect(
|
||||
# ap["ssid"], xor_encrypt_decrypt(ap["secret"], config["xor_key"])
|
||||
# )
|
||||
# if result:
|
||||
# return
|
||||
|
||||
# except (OSError, ValueError) as e:
|
||||
# print("OSError config.json unreadable or no")
|
||||
|
||||
print("Falling back on selfhosted AP")
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
ap.config(essid="RP2040-AP", password="openwater")
|
||||
ap.ifconfig(("192.168.4.1", "255.255.255.0", "192.168.4.1", "192.168.4.1"))
|
||||
ap.active(True)
|
||||
print("SSID RP2040-AP")
|
||||
return
|
||||
|
||||
|
||||
print("Attempting to connect to wifi")
|
||||
# time.sleep(10)
|
||||
wifi_setup()
|
@ -0,0 +1,67 @@
|
||||
import network
|
||||
import socket
|
||||
import machine
|
||||
|
||||
|
||||
# Setup RP2040 as an access point
|
||||
# ap = network.WLAN(network.AP_IF)
|
||||
# ap.config(essid="RP2040-AP", password="testing123")
|
||||
# ap.ifconfig(("192.168.4.1", "255.255.255.0", "192.168.4.1", "192.168.4.1"))
|
||||
# ap.active(True)
|
||||
|
||||
# Initialize onboard LED
|
||||
led = machine.Pin("LED", machine.Pin.OUT, value=1)
|
||||
motor_ctl_a = machine.Pin(16, machine.Pin.OUT, value=0)
|
||||
motor_ctl_b = machine.Pin(17, machine.Pin.OUT, value=0)
|
||||
|
||||
# Create a socket and listen on port 80
|
||||
addr = socket.getaddrinfo("0.0.0.0", 80)[0][-1]
|
||||
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s = socket.socket()
|
||||
s.bind(addr)
|
||||
# only allowed to listen to 5 connections in AP mode
|
||||
s.listen(5)
|
||||
|
||||
# print("listening on", ap.ifconfig()[0])
|
||||
|
||||
# HTML to send to browsers
|
||||
html = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Toggle Motor</h1>
|
||||
<button onclick="toggleLED()">Toggle LED</button>
|
||||
<script>
|
||||
function toggleLED() {
|
||||
var xhr = new XMLHttpRequest();
|
||||
xhr.open("POST", "/toggle", true);
|
||||
xhr.send();
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
def serve():
|
||||
conn, addr = s.accept()
|
||||
print("client connected from", addr)
|
||||
request = conn.recv(1024)
|
||||
request_str = str(request)
|
||||
print("request:", request_str)
|
||||
|
||||
if "POST /toggle" in request_str:
|
||||
led.toggle()
|
||||
motor_ctl_a.toggle() # Toggle LED state
|
||||
conn.send("HTTP/1.1 200 OK\n\nLED Toggled")
|
||||
else:
|
||||
conn.send("HTTP/1.1 200 OK\n")
|
||||
conn.send("Content-Type: text/html\n")
|
||||
conn.send("Connection: close\n\n")
|
||||
conn.sendall(html) # Send HTML page
|
||||
conn.close()
|
||||
|
||||
|
||||
# Main loop: accept connections and respond
|
||||
while True:
|
||||
serve()
|
Loading…
Reference in New Issue