WiFi接続の外部関数
# wifi_connect.py
import network
import time
class WiFiConnector:
def __init__(self, ssid, password, timeout=20, max_retries=3):
"""
Initialize the WiFiConnector with credentials and settings.
Parameters:
ssid (str): Wi-Fi network SSID
password (str): Wi-Fi password
timeout (int): Timeout in seconds per connection attempt
max_retries (int): Number of reconnection retries
"""
self.ssid = ssid
self.password = password
self.timeout = timeout
self.max_retries = max_retries
self.wlan = network.WLAN(network.STA_IF)
def connect(self):
"""Attempts to connect to Wi-Fi once. Returns True if successful."""
try:
self.wlan.active(True)
if not self.wlan.isconnected():
print(f'Connecting to {self.ssid}...')
self.wlan.connect(self.ssid, self.password)
for _ in range(self.timeout):
if self.wlan.isconnected():
break
time.sleep(1)
if self.wlan.isconnected():
print('Connected successfully!')
print('IP address:', self.wlan.ifconfig()[0])
return True
else:
print('Connection failed: timeout.')
return False
except Exception as e:
print('Error during Wi-Fi connection:', str(e))
return False
def reconnect(self):
"""
Tries to reconnect to Wi-Fi, retrying up to max_retries times.
Returns:
bool: True if reconnection succeeded, False otherwise.
"""
print(f'Reconnecting (max {self.max_retries} retries)...')
for attempt in range(1, self.max_retries + 1):
print(f'Attempt {attempt}...')
if self.connect():
return True
time.sleep(2) # wait before retrying
print('Reconnection failed.')
return False
def disconnect(self):
"""Disconnects from the Wi-Fi network."""
if self.wlan.isconnected():
self.wlan.disconnect()
print('Disconnected from Wi-Fi.')
def is_connected(self):
"""Returns True if connected to Wi-Fi."""
return self.wlan.isconnected()
def ip_address(self):
"""Returns the current IP address, or None if not connected."""
if self.wlan.isconnected():
return self.wlan.ifconfig()[0]
return None
プライベート情報
# secrets.py
secrets = {
'ssid': 'SSIDXXX',
'password': 'PASWORDXXX',
'sender_email': 'XXXXXXX@gmail.com',
'sender_name': 'RaspberryPiPico',
'sender_app_password': 'XXXX XXXX XXXX XXXX',
'recipient_email': 'YYYYYYY@gmail.com',
'email_subject': 'Email from RPi Pico',
}
main
このプログラムは、Raspberry Pi Pico Wを使って毎日決まった時刻(19:00)にWi-Fi経由で東京の天気予報を取得し、指定のメールアドレスへ自動で送信するシステムです。
# main.py
import network
import ntptime
import time
import umail
import ubinascii
import urequests as requests
from wifi_connect import WiFiConnector # WiFi接続の外部関数
from secrets import secrets # プライベート情報
# 設定
CITY_ID = "130010" # 東京
SEND_HOUR = 19
SEND_MINUTE = 0
MAX_RETRIES = 3 # リトライ最大回数
# Wi-Fi接続
wifi = WiFiConnector(secrets['ssid'], secrets['password'])
if not wifi.connect():
wifi.reconnect()
if not wifi.is_connected():
raise RuntimeError('Wi-Fi 接続失敗')
print("Wi-Fi 接続完了")
# NTP同期
ntptime.host = "time.cloudflare.com"
try:
ntptime.settime()
print("NTP時刻同期成功")
except:
print("NTP同期失敗")
raise
# ヘッダーエンコード(日本語対応)
def encode_header(header_str):
encoded = ubinascii.b2a_base64(header_str.encode('utf-8')).strip()
return '=?UTF-8?B?' + encoded.decode() + '?='
# 天気情報取得(リトライ付き)
def fetch_weather():
url = f"https://weather.tsukumijima.net/api/forecast/city/{CITY_ID}"
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.get(url)
weather_json = response.json()
response.close()
return weather_json
except Exception as e:
print(f"[{attempt}] 天気取得失敗: {e}")
time.sleep(3)
raise RuntimeError("天気取得に複数回失敗しました。")
# メール送信(リトライ付き)
def send_email(message, date_str):
for attempt in range(1, MAX_RETRIES + 1):
try:
smtp = umail.SMTP('smtp.gmail.com', 465, ssl=True)
smtp.login(secrets['sender_email'], secrets['sender_app_password'])
smtp.to(secrets['recipient_email'])
smtp.write("From: {} <{}>\r\n".format(encode_header(secrets['sender_name']), secrets['sender_email']))
smtp.write("Subject: {}\r\n".format(encode_header(secrets['email_subject'])))
smtp.write("Content-Type: text/plain; charset=utf-8\r\n")
smtp.write("Content-Transfer-Encoding: 7bit\r\n")
smtp.write("\r\n")
smtp.write("こんにちは!明日の天気予報です。\r\n")
smtp.write(message + "\r\n")
smtp.write("現在日時: " + date_str + "\r\n")
smtp.write("--\r\nRPi Picoより自動送信\r\n")
smtp.send()
smtp.quit()
print("✅ メール送信完了")
return
except Exception as e:
print(f"[{attempt}] メール送信失敗: {e}")
time.sleep(3)
raise RuntimeError("メール送信に複数回失敗しました。")
# メイン処理(天気+メール)
def send_weather_email():
weather_json = fetch_weather()
forecast = weather_json['forecasts'][1]
date_label = forecast['dateLabel']
weather_telop = forecast['telop']
rain_probability = forecast['chanceOfRain']['T12_18']
weather_title = weather_json['title']
if rain_probability == "--%":
weather_msg = f"{weather_title}\n{date_label}の天気データは未確定です。"
else:
weather_msg = f"{weather_title}\n{date_label}の天気は「{weather_telop}」です。\n12時~18時の降水確率は {rain_probability} です。"
now = time.localtime(time.time() + 9 * 60 * 60)
date_str = "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(now[0], now[1], now[2], now[3], now[4], now[5])
send_email(weather_msg, date_str)
# 送信フラグ管理
sent_today = False
last_sent_day = -1
# 定時送信用ループ
while True:
now = time.localtime(time.time() + 9 * 60 * 60)
hour = now[3]
minute = now[4]
today = now[2]
if hour == SEND_HOUR and minute == SEND_MINUTE and not sent_today:
try:
send_weather_email()
except Exception as e:
print("❌ 処理失敗:", e)
sent_today = True
last_sent_day = today
if today != last_sent_day:
sent_today = False
time.sleep(30)
コメント