概要
普段天気予報を確認することが手間で,傘をもっていかず雨が降って濡れてしまうことが多々ありました.
単に天気予報を確認する習慣をつければいいのですが,Webのチェックに手間がかかる上に習慣づけは面倒です.
なのでこれらを解決してくれるものを作りました..
ボタンを押すと外出時に傘が必要かどうかしゃべるシステムを作りました.
上の画像にある黄色いボタンがそれです.
押すとこんな感じになります.
Components
・Raspberry pi 3 B+
・Raspberry pi pico W
主な役割はRaspberry pi pico WがRaspberry pi 3 B+に音声読み上げの指示だし(入力)
Raspberry pi 3 B+が天気データの収集,音声の読み上げ(出力)です.
システム構成概略
最終的にこのシステムは次の図のような構成になります.
青色の矢印がRaspberry pi pico W(以降pico W)で入力(ボタンが押された)時の処理.
赤色の矢印がRaspberry pi 3 B+(以降ラズパイ)側のみでの処理になります.
次のページからはpico W,ラズパイの処理内容を記載します.
実行内容
ラズパイでは次の二つの処理を行っています.
① 2時間おきに天気予報サイトをスクレイピングして音声データを作成する.
② pico wからの信号を受け取り,音声データを読み上げる.
①の処理について
ボタンを押した際に最新の天気予報から傘の必要性を伝えるために,2時間おきに天気予報サイトからデータを取得しています.
天気予報の取得に使用させていただいてるサイトは次のサイトになります.
tenki.jp
このサイトから私の居住地域の天気予報をスクレイピングしてます.
コードは次になります.
# distutils: language=c++
# distutils: extra_compile_args = ["-O3"]
# cython: language_level=3, boundscheck=False, wraparound=False
# cython: cdivision=True
import requests
from bs4 import BeautifulSoup
import os
############################################################################
cpdef scrape():
url='URL here'
page=requests.get(url)
if page.status_code==200:
print('http request successful')
page.encoding=page.apparent_encoding #Requestで日本語を扱えるようにする.
soup=BeautifulSoup(page.text,'lxml') #html構文解析
today_table=soup.find(id='forecast-point-1h-today') #tableのweather取得
weather=today_table.find('tr',class_='weather')
data=weather.find_all('p') #dataに今日のデータ格納
# forecasts={"ko":None,"yowa":None,"ame":None,"tuyo":None,"gou":None}
forecasts={}
check=[0,0,0,0,0]
for i in range(len(data)):
if check[0]==0 and data[i].contents[0]=="小雨":
forecasts["ko"]=i+1
check[0]=1
elif check[1]==0 and data[i].contents[0]=="弱雨":
forecasts["yowa"]=i+1
check[1]=1
elif check[2]==0 and data[i].contents[0]=="雨":
forecasts["ame"]=i+1
check[2]=1
elif check[3]==0 and data[i].contents[0]=="強雨":
forecasts["tuyo"]=i+1
check[3]=1
elif check[4]==0 and data[i].contents[0]=="豪雨":
forecasts["gou"]=i+1
check[4]=1
if check[0]==0:
forecasts["ko"]=None
check[0]=1
if check[1]==0:
forecasts["yowa"]=None
check[1]=1
if check[2]==0:
forecasts["ame"]=None
check[2]=1
if check[3]==0:
forecasts["tuyo"]=None
check[3]=1
if check[4]==0:
forecasts["gou"]=None
check[4]=1
return forecasts
開発初期段階ではボタンを押したタイミングでスクレイピングを行う予定でした.
そこでスクレイピングの高速化を図るためにCythonを使いました.
しかしながら,Cythonを用いてもPython Libraryをどうにかしない限りは大した高速化はできませんでした.
(CythonとしてビルドしたとしてもPython Libraryを用いている限り,コードの処理時間はPython Libraryに依存していました.
つまりCythonで高速化を図るならCythonコード内で処理が完結している必要がありそう.)
なのでここでCython用に最初の4行を記載していますが,実際はあまり実行時間は変わっていません(悲しい).
今回はCythonはとりあえず使ってみるという感じで取り入れています....
次が上のコードをCython用にビルドして音声データに直しているコードです.
import socket
import gc
import time
from datetime import datetime
################# 音声ファイル作成 #######################
import os
from gtts import gTTS
def make_audio(text):
japanese=text
tts = gTTS(japanese, lang='ja')
tts.save("readaloud.mp3")
print("making audio finish")
#########################################################
# Scraping cython使ってますがあまり早くなってません.
#########################################################
import scraping
def make_text():
result=scraping.scrape()
print("scraping finish")
text_time=[]
text_umb=None
if (result['yowa']!=None or result['ame']!=None or result['tuyo']!=None or result['gou']!=None):
text_umb="今日は傘を持っていきましょう."
elif result['ko']!=None and (result['yowa']==None and result['ame']==None and result['tuyo']==None and result['gou']==None):
text_umb="今日は折り畳み傘を持っていきましょう."
elif not any(result):
text_umb="今日は傘を持って行かなくて大丈夫です."
keys=list(result.keys())
now_hour=int(datetime.now().hour)
for key in keys:
if result[key]!=None:
#現在以降の天気を反映させる
if key=='ko':
if result['ko']>=now_hour:
time=str(result['ko'])+"時から小雨."
elif key=='yowa':
if result['yowa']>=now_hour:
time=str(result['yowa'])+"時から弱雨."
elif key=='ame':
if result['ame']>=now_hour:
time=str(result['ame'])+"時から雨."
elif key=='tuyo':
if result['tuyo']>=now_hour:
time=str(result['tuyo'])+"時から強い雨."
elif key=='gou':
if result['gou']>=now_hour:
time=str(result['gou'])+"時から豪雨."
text_time.append(time)
if text_umb==None:
text_full=("今日は傘を持っていく必要はありません.")
else:
text_full=text_umb+''.join(text_time)+"です."
print("text making finish")
return text_full
def task1():#audioの作成まで行う.
text_full=make_text()
make_audio(text_full)
gc.collect()#ガベージコレクションの強制実行
import scraping で先ほどのビルドしたコードを取り込んでいます.scraping.scrape()でscrapingライブラリ内の関数を実行しスクレイピングを行っています.
scraping.scrape()を実行すると,tenki.jpから取得したデータ(その日の各時間における天気)が返ってきます.
そこから,当日に雨が降るかを判断し読み上げ用テキストを作成しています.
音声データファイル(.mp3)の作成にはgTTSを使っています.
この音声データの作成には次のサイトを参考にさせていただきました.
Text to Speech テキスト読み上げ
2時間おきに実行する方法
スクレイピングに使用したtenki.jpの際とは2時間おきに更新されるので,新しい音声ファイルも2時間おきに更新することにしました.② の処理について
このページではラズパイ側で行う処理の内,pico wから信号を受信して音声ファイルを実行するまでを記載します.
ラズパイとpico wの間の通信,処理を示したのが次の図になります.
(これ↑作るの大変だった....)
次がラズパイ側で受信,音声ファイル実行しているスクリプトです.
#!/usr/bin/env python3
import socket
import gc
import os
import logging
# ログ設定
# logging.basicConfig(
# filename='/home/kayu/Desktop/weather/main.log', # ログを記録するファイル名
# level=logging.INFO, # 記録するログのレベル(DEBUG, INFO, WARNING, ERROR, CRITICAL)
# format='%(asctime)s - %(levelname)s - %(message)s', # ログメッセージのフォーマット
# )
# logging.info("this is run by bash")
#########################################################
# TCP connection
#########################################################
def pico_connect():
HOST = 'IP adress here' # Raspberry PiのIPアドレス
PORT = port num here # クライアントと同じポート番号
# ソケットの設定
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((HOST, PORT))
# try:
# sock.bind((HOST, PORT))
# except Exception as e:
# logging.info(f"Error binding socket: {e}")
sock.listen(1)
# logging.info("waitng for aconnection")
print('Waiting for a connection...')
# 接続の確立
conn, addr = sock.accept()
print(f'Connected by {addr}')
# logging.info(f'Connected by {addr}')
sock.close()
#########################################################
# Reading aloud cythonで読み上げは遅い
#########################################################
################# Japanese ###################
def read_aloud():
# logging.info("trying to read aloud")
os.system("/usr/bin/mplayer -speed 1.1 -af scaletempo /home/kayu/Desktop/weather/audio/readaloud.mp3")
# logging.info("read aloud end")
# ################# English ###################
# english='Japan\'s Health Ministry updated its Q&A page. You can find answers to such questions as how you can avoid catching/spreading the virus, what is the "cough etiquette". '
# tts = gTTS(english, lang='en')
# tts.save("./audio/english.mp3")
# os.system("mplayer ./audio/english.mp3")
#################################
# 実行
#################################
import schedule
import time
from datetime import datetime
def task2():#接続を待って読み上げまで
pico_connect()
# logging.info("connect successful")
read_aloud()
gc.collect()#ガベージコレクションの強制実行
while True:
# logging.info("task2 start")
task2()
# logging.info("task2 end\n")
# if datetime.now().hour==3:
# break#毎朝3時でmain.py終了
gc.collect()#ガベージコレクションの強制実行
logging.infoで実行時ログファイルが作成されるようにしていますが無視してください.
ラズパイ起動時にスクリプトを実行させる方法
実はここが私が一番つまずいたところです.
製作期間の1/4くらいはこれに時間を取られました.
ここで,行いたいことは "ラズパイを起動しただけで上のスクリプトを自動実行" です.
調べると複数出てくるのですが私の環境(raspberry-pi 3 b+)で動いたものを載せときます.
方法①Cron
はじめ私は一つページ前と同様にCronでの自動実行を試みました.
・超いまさらcron設定を行ってみました
・Raspberry Piでプログラムを自動起動する5種類の方法を比較・解説
ここらへんを参照しました.
Cronのlogファイルを確認するとラズパイ起動時にスクリプト自体は実行されているのですが,音声ファイル読み上げの際にエラーが起きていました.
そのエラーは実行権限に関するものであったので,権限付与を行った後に確認しても同様のエラーでした.
手動でルートから直接このスクリプトを実行した場合,音声ファイルの読み上げはうまくいくのですがCronからの自動実行だと音声出力されないという結果でした.
これ以上進展がなかったので次の手段に移ります.
方法②Systemd
次に試みたのはsystemdによる自動実行です.
・systemdを使ってスクリプト自動起動
・systemdを用いたプログラムの自動起動
同様に参照したものをいくつか挙げときます.
結果諦めました.
この方法ではスクリプトの実行すらかないませんでした.
原因としてはラズパイ内のsystemdを実行するためのフォルダ構成がおかしくなっていたことです.
(具体的にどうおかしくなっていたかは記憶から消えてしまいました.こんな時のためにエラー内容はどっかに保存したほうがよさそうですね.)
つまり,諦めました.
方法③~/.profileに書き込む(成功)
最終的に成功したのはrc.localを試していた時に見つけた~/.profileを利用するものです.
・/etc/rc.local が機能しない時の対処法
このページにある通り,~/.profileは "ユーザーがログインする際に一度だけ実行されるスクリプト" です.
しかしながら直接この~/.profileにpythonファイルの実行を書き込むだけではうまく実行されませんでした.
そこで,run.shファイルを作成しこの.shファイルにpythonファイルの実行を書き込みました.
なので, ~/.profile内に書くのはrun.shを実行するスクリプトです.
これらの書き込みには相対パスではなく,絶対パスを利用することをお勧めします.
回路図
pico w 側の回路図を載せます.(すごい単純)
はんだ付けした結果は次.
回路図と写真で使っているGNDの位置が少し違います.
プログラム
このpico wに必要な機能は次の二つ.
・スイッチが押されたことを検知する
・ラズパイとの通信を行う
プログラムを書くにあたって参照したサイトは次です.
・【Raspberry Pi Pico入門 – 4】スイッチのON/OFFを検出
・Pi Pico W からラズパイにデータを送信して表示
pico w側では躓くことなく開発を進めることができたので特筆することはありません.
次にpico w側で動いているコードを載せます.
import time
import network
import machine
import utime
import socket
HOST='ラズパイip adress'
PORT=port num
# タクトスイッチ(入力)の設定
sw= machine.Pin(19, machine.Pin.IN, machine.Pin.PULL_UP)
led= machine.Pin('LED', machine.Pin.OUT)
led.on()#電源がついたことを知らせる.
time.sleep(1)
led.off()
ssid='wi-fi ssid'
password='wi-fi password'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
max_wait = 10
while max_wait > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
if wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('WIFI connected')
status = wlan.ifconfig()
print('ip = ' + status[0])
#ユニキャスト
def com_send():
while True:
led.off()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
while True:
if sw.value() == 0:
led.on()
print("sending message")
message="Read aloud"
# メッセージ送信
sock.send(message.encode('utf-8'))
print("sending message successful")
time.sleep(1)
break
else:
print("waiting for pushing button")
time.sleep(0.2)
# 通信の終了
sock.close()
time.sleep(15)#ラズパイ側が次のソケット開くのを待つ
led.off()
if __name__ == "__main__":
com_send()
書いてある内容も難しくはないので理解いただけると思います.
スイッチが押されると"Read aloud"というテキストがラズパイに送信されるといった感じです.
pico w側はもう書くことがない.....
改善点① 高速化
今回処理を高速化するためにCythonを使用してスクレイピングを行うようにしたが,ライブラリの関係で高速化がうまくいかなかった.
結果として2時間おきにスクレイピングを行う構成にしたため,Cythonを使うメリットがなくなってしまった.
Cythonを使う際に高速化を図るにはライブラリ自体もc/c++で使えるようにビルドする必要がある.(可能かは不明)
改善点② 美しくない(気がする)
今回のシステムを作る上で,ラズパイとpico wでほとんどの時間ソケット接続をしている.
初期段階ではpico wのボタンを押すとpico wの電源が入り,wifi接続からラズパイとの通信までを行わせる予定であった.
しかしながら上記の方法では,ボタンを押してから音声ファイルの読み上げまでに約10秒かかってしまう.
これは私が求める外出直前に傘の必要性を知りたいという要求に十分に応えられていない.(玄関で10秒も待ちたくない)
仕方なく,ソケット接続をしたままボタン押下待機という形をとっているがもっとキレイなやり方がある気がする....
改善点③ 消費電力
今回pico w側はちょうどリモコンのような感覚で扱えるようにしたかった.
そのためにpico wの電力供給方法として電池ボックスを取り付けた.
実際に電池ボックスに新しい電池を入れ,システムを稼働させているとpico w側の電池が半日で尽きてしまった.
もちろん電池自体の質がよくないのもあるだろうが,半日で3.0Vを維持できなくなってしまっていた.
他の方法としてモバイルバッテリーを用いた.
私の持ち合わせのモバイルバッテリーでは途中で電力供給が止まってしまうという問題が発生したため断念した.
(原因はpico wの消費電力がモバイルバッテリーを使用するにあたる消費電力を下回っていたから?なのか..)
最終的に近くのコンセントから電力を引っ張ってくる形となっている.
改善点④ たまに起こるバグ
このシステム自体はうまく動作しているのだが,時々バグ?が起こる.
バグの内容は
”ボタンを押した際に音声ファイルの読み上げが一回だけ行われるはずが,2,3度行われてしまう”
というものだ.
無限にループ再生してしまうといったバグだとスクリプトのwhileなどに問題があるとわかるのだが,繰り返される回数が毎回ランダムな上にボタンを押した回数も関係ないようなのである.
原因としてはpico wのはんだ付けがうまくいってないのではないかと考えている.
まとめ
今回は外出時,手軽に傘の必要性を教えてくれるシステムを作った.
未だ改善点が尽きないが現状求めていた機能はしているのでしばらく様子を見る.
作っていて嫌になることが多々あったが楽しむことができた.
質問があればここより受け付けている.
(やはり記事書くの大変です...)