WiFi接続の外部関数を作ってみた

つぶやき

WiFi接続の外部関数

# wifi_connect.py

import network
import time

class WiFiConnector:
    """Small helper around a station-mode ``network.WLAN`` interface.

    Keeps the credentials together with the timeout/retry settings and
    offers connect, reconnect, disconnect and status queries.
    """

    def __init__(self, ssid, password, timeout=20, max_retries=3):
        """
        Store the credentials/settings and create the station interface.

        Parameters:
            ssid (str): Wi-Fi network SSID
            password (str): Wi-Fi password
            timeout (int): Number of one-second polls per connection attempt
            max_retries (int): Number of attempts made by reconnect()
        """
        self.ssid = ssid
        self.password = password
        self.timeout = timeout
        self.max_retries = max_retries
        self.wlan = network.WLAN(network.STA_IF)

    def _wait_for_link(self):
        """Poll the link once per second, at most ``timeout`` times."""
        for _tick in range(self.timeout):
            if self.wlan.isconnected():
                return
            time.sleep(1)

    def connect(self):
        """
        Make a single connection attempt.

        Returns:
            bool: True when the interface reports a connection, False on
            timeout or when any exception is raised along the way.
        """
        try:
            wlan = self.wlan
            wlan.active(True)

            # Only start a new attempt when the link is not already up.
            if not wlan.isconnected():
                print(f'Connecting to {self.ssid}...')
                wlan.connect(self.ssid, self.password)
                self._wait_for_link()

            if not wlan.isconnected():
                print('Connection failed: timeout.')
                return False

            print('Connected successfully!')
            print('IP address:', wlan.ifconfig()[0])
            return True

        except Exception as e:
            # Any failure is reported and turned into a False result.
            print('Error during Wi-Fi connection:', str(e))
            return False

    def reconnect(self):
        """
        Call connect() repeatedly, up to max_retries times.

        A two-second pause follows every failed attempt.

        Returns:
            bool: True as soon as one attempt succeeds, False otherwise.
        """
        print(f'Reconnecting (max {self.max_retries} retries)...')
        for attempt in range(1, self.max_retries + 1):
            print(f'Attempt {attempt}...')
            if not self.connect():
                time.sleep(2)  # back off before the next attempt
                continue
            return True
        print('Reconnection failed.')
        return False

    def disconnect(self):
        """Drop the Wi-Fi connection; does nothing when not connected."""
        if not self.wlan.isconnected():
            return
        self.wlan.disconnect()
        print('Disconnected from Wi-Fi.')

    def is_connected(self):
        """Report whether the interface is currently connected."""
        return self.wlan.isconnected()

    def ip_address(self):
        """Return the first ifconfig() entry, or None when not connected."""
        return self.wlan.ifconfig()[0] if self.wlan.isconnected() else None

プライベート情報

# secrets.py

# Private settings read by main.py; replace the placeholders with real values.
secrets = {
    # Wi-Fi credentials
    "ssid": "SSIDXXX",
    "password": "PASWORDXXX",
    # Sender account used for the SMTP login
    "sender_email": "XXXXXXX@gmail.com",
    "sender_name": "RaspberryPiPico",
    "sender_app_password": "XXXX XXXX XXXX XXXX",
    # Destination address and subject line of the e-mail
    "recipient_email": "YYYYYYY@gmail.com",
    "email_subject": "Email from RPi Pico",
}

main
このプログラムは、Raspberry Pi Pico Wを使って毎日決まった時刻(19:00)にWi-Fi経由で東京の翌日の天気予報を取得し、指定のメールアドレスへ自動で送信するシステムです。

# main.py

import network
import ntptime
import time
import umail
import ubinascii
import urequests as requests

from wifi_connect import WiFiConnector  # WiFi接続の外部関数
from secrets import secrets  # プライベート情報

# Settings
CITY_ID = "130010"  # Tokyo; inserted into the forecast API URL
SEND_HOUR = 19  # hour of the daily send, compared against the +9h shifted clock
SEND_MINUTE = 0  # minute of the daily send
MAX_RETRIES = 3  # maximum number of attempts for retried operations

# Wi-Fi connection: one direct attempt, then the connector's retry loop.
wifi = WiFiConnector(secrets['ssid'], secrets['password'])
if not wifi.connect():
    wifi.reconnect()
if not wifi.is_connected():
    raise RuntimeError('Wi-Fi 接続失敗')
print("Wi-Fi 接続完了")

# NTP sync (with retries). A single failed request should not abort start-up,
# so try up to MAX_RETRIES times and re-raise only after the last failure.
ntptime.host = "time.cloudflare.com"
for attempt in range(1, MAX_RETRIES + 1):
    try:
        ntptime.settime()
    except Exception as e:
        # `except Exception` (not a bare except) so Ctrl-C is never swallowed
        # by the retry loop.
        print(f"[{attempt}] NTP同期失敗: {e}")
        if attempt == MAX_RETRIES:
            raise
        time.sleep(3)
    else:
        print("NTP時刻同期成功")
        break

# Header encoding (Japanese-capable)
def encode_header(header_str):
    """Wrap ``header_str`` as a Base64 ``=?UTF-8?B?...?=`` encoded word.

    The text is UTF-8 encoded, Base64 encoded, and stripped of the
    surrounding whitespace (including the newline b2a_base64 appends).
    """
    utf8_bytes = header_str.encode('utf-8')
    b64_text = ubinascii.b2a_base64(utf8_bytes).strip().decode()
    return ''.join(('=?UTF-8?B?', b64_text, '?='))

# Fetch weather data (with retries)
def fetch_weather():
    """Download the forecast JSON for CITY_ID and return it parsed.

    Makes up to MAX_RETRIES attempts, waiting three seconds after each
    failure.

    Returns:
        The decoded JSON document returned by the forecast API.

    Raises:
        RuntimeError: if every attempt fails.
    """
    url = f"https://weather.tsukumijima.net/api/forecast/city/{CITY_ID}"
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.get(url)
            try:
                return response.json()
            finally:
                # Always release the response, including when JSON parsing
                # fails; otherwise each failed attempt leaks a socket.
                response.close()
        except Exception as e:
            print(f"[{attempt}] 天気取得失敗: {e}")
            time.sleep(3)
    raise RuntimeError("天気取得に複数回失敗しました。")

# Send e-mail (with retries)
def send_email(message, date_str):
    """Send the forecast as a UTF-8 plain-text e-mail through Gmail SMTP (SSL).

    Makes up to MAX_RETRIES attempts, waiting three seconds after each
    failure. The SMTP connection is closed after every attempt, whether it
    succeeded or not.

    Parameters:
        message (str): Forecast text placed in the body.
        date_str (str): Timestamp text appended to the body.

    Raises:
        RuntimeError: if every attempt fails.
    """
    # SMTP message lines must end in CRLF; the caller's text may contain
    # bare "\n", so normalise it once before sending.
    body = message.replace("\r\n", "\n").replace("\n", "\r\n")

    for attempt in range(1, MAX_RETRIES + 1):
        smtp = None
        try:
            smtp = umail.SMTP('smtp.gmail.com', 465, ssl=True)
            smtp.login(secrets['sender_email'], secrets['sender_app_password'])
            smtp.to(secrets['recipient_email'])

            smtp.write("From: {} <{}>\r\n".format(encode_header(secrets['sender_name']), secrets['sender_email']))
            smtp.write("Subject: {}\r\n".format(encode_header(secrets['email_subject'])))
            # MIME-Version is required for the Content-* headers to apply.
            smtp.write("MIME-Version: 1.0\r\n")
            smtp.write("Content-Type: text/plain; charset=utf-8\r\n")
            # The body carries raw UTF-8 (non-ASCII) bytes, so it is 8bit data.
            smtp.write("Content-Transfer-Encoding: 8bit\r\n")
            smtp.write("\r\n")
            smtp.write("こんにちは!明日の天気予報です。\r\n")
            smtp.write(body + "\r\n")
            smtp.write("現在日時: " + date_str + "\r\n")
            smtp.write("--\r\nRPi Picoより自動送信\r\n")

            smtp.send()
            print("✅ メール送信完了")
            return
        except Exception as e:
            print(f"[{attempt}] メール送信失敗: {e}")
        finally:
            # Close the connection on success and on failure. A failing QUIT
            # is deliberately ignored: after a successful send() it must not
            # trigger a retry that would deliver the mail twice.
            if smtp is not None:
                try:
                    smtp.quit()
                except Exception:
                    pass
        time.sleep(3)
    raise RuntimeError("メール送信に複数回失敗しました。")

# Main job (weather + e-mail)
def send_weather_email():
    """Fetch the forecast, build the message text and hand it to send_email().

    Uses the entry at index 1 of ``forecasts`` (presumably tomorrow, matching
    the e-mail greeting; confirm against the API) and its 12-18h chance of
    rain. When that value is "--%", a "not yet determined" message is sent
    instead. The timestamp is the current clock shifted by +9 hours.
    """
    payload = fetch_weather()
    entry = payload['forecasts'][1]
    date_label = entry['dateLabel']
    weather_telop = entry['telop']
    rain_probability = entry['chanceOfRain']['T12_18']
    weather_title = payload['title']

    if rain_probability != "--%":
        weather_msg = f"{weather_title}\n{date_label}の天気は「{weather_telop}」です。\n12時~18時の降水確率は {rain_probability} です。"
    else:
        weather_msg = f"{weather_title}\n{date_label}の天気データは未確定です。"

    # Shift the clock by nine hours before formatting the timestamp.
    shifted = time.localtime(time.time() + 9 * 60 * 60)
    date_str = "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(*shifted[:6])
    send_email(weather_msg, date_str)

# Send-state tracking
sent_today = False  # True once today's send attempt has been made
last_sent_day = -1  # day-of-month of the last attempt; -1 means none yet

# Scheduler loop: wake every 30 seconds and send once per day at
# SEND_HOUR:SEND_MINUTE on the +9h shifted clock.
while True:
    now = time.localtime(time.time() + 9 * 60 * 60)
    hour = now[3]
    minute = now[4]
    today = now[2]

    # Reset the flag BEFORE the send check, so the first check of a new day
    # never sees the previous day's stale flag.
    if today != last_sent_day:
        sent_today = False

    if hour == SEND_HOUR and minute == SEND_MINUTE and not sent_today:
        try:
            # The link may have dropped since boot; restore it before the
            # network calls instead of letting every retry fail.
            if not wifi.is_connected():
                wifi.reconnect()
            send_weather_email()
        except Exception as e:
            print("❌ 処理失敗:", e)
        # Marked as done even on failure: the helpers already retried, and
        # the next attempt is made on the following day.
        sent_today = True
        last_sent_day = today

    time.sleep(30)

コメント