災害時の電力確保について調べていたところ、電力会社のメーターから使用電力などの情報が抜けることがわかったので、スマートメータのBルートについて解説されたサイトの情報を利用してラズパイで使用電力を蓄積するように構築した。
使用したのは、余ってるラズパイ1のモデルB+とUSB-WiFiとWi-SUNモジュールと呼ばれるスマートメータの920MHz帯で通信ができるUSBドングルである。その他、電力会社へBルート利用申請も必要である。
参照したのは以下のサイト
bey.jp
東京電力からBルート申請のIDとパスワードが届いたので、サイトの内容を利用して積算電力と瞬間電力をDBに書き込むスクリプトをpython3で動くよう作成し、グラフ化してみた。
■ソース(Python3)
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import sys import serial import time import signal import ctypes from influxdb import InfluxDBClient client = InfluxDBClient ( host = '127.0.0.1', port=8086, username ='****', password ='****', database ='broute') measurement ='power' tags ={'place': 'uni','host': 'RL7023'} def influx_json (field, nowtime='') : json_body = [ { 'measurement': measurement, 'tags': tags, 'fields': field } ] client.write_points(json_body) # Bルート認証ID(東京電力パワーグリッドから郵送で送られてくるヤツ) rbid = "********" # Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくるヤツ) rbpwd = "********" # シリアルポートデバイス名 ser = serial.Serial('/dev/ttyUSB0', 115200) def skterm(): ser.write(str.encode("SKTERM\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # EVENT21 print(ser.readline().decode('utf-8'), end="", flush=True) # OK print(ser.readline().decode('utf-8'), end="", flush=True) # ERXUDP print(ser.readline().decode('utf-8'), end="", flush=True) # EVENT27 ser.close print("END") def sig_handler(signum, frame) -> None: sys.exit(1) def main(): signal.signal(signal.SIGTERM, sig_handler) try: # とりあえずバージョンを取得してみる(やらなくてもおk) ser.write(str.encode("SKVER\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # バージョン # Bルート認証パスワード設定 ser.write(str.encode("SKSETPWD C " + rbpwd + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し) # Bルート認証ID設定 ser.write(str.encode("SKSETRBID " + rbid + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し) scanDuration = 4; # スキャン時間。サンプルでは6なんだけど、4でも行けるので。(ダメなら増やして再試行) scanRes = {} # スキャン結果の入れ物 # スキャンのリトライループ(何か見つかるまで) while not 'Channel' in scanRes: # アクティブスキャン(IE あり)を行う # SKSCAN 2 FFFFFFFF 8 ser.write(str.encode("SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n")) # スキャン1回について、スキャン終了までのループ scanEnd = False while not scanEnd : line = ser.readline() print(line.decode('utf-8'), end="") line = line.decode('utf-8') if line.startswith("EVENT 22") : # スキャン終わったよ(見つかったかどうかは関係なく) scanEnd = True elif line.startswith(" ") : # スキャンして見つかったらスペース2個あけてデータがやってくる # 例 # scanRes["Channel"]="33" # scanRes["Channel Page"]="09" # scanRes["Pan ID"]="****" # scanRes["Addr"]="****" # scanRes["LQI"]="27" # scanRes["PairID"]="01055146" cols = line.strip().split(':') scanRes[cols[0]] = cols[1] scanDuration += 1 if 8 < scanDuration : # 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい print("スキャンリトライオーバー") ser.close sys.exit(1) #### 糸冬了 #### # スキャン結果からChannelを設定。 ser.write(str.encode("SKSREG S2 " + scanRes["Channel"] + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し) # スキャン結果からPan IDを設定 ser.write(str.encode("SKSREG S3 " + scanRes["Pan ID"] + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し) # MACアドレス(64bit)をIPV6リンクローカルアドレスに変換。 # (BP35A1の機能を使って変換しているけど、単に文字列変換すればいいのではという話も??) ser.write(str.encode("SKLL64 " + scanRes["Addr"] + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック ipv6Addr = ser.readline().decode('utf-8').strip() # PANA 接続シーケンスを開始します。 ser.write(str.encode("SKJOIN " + ipv6Addr + "\r\n")) print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し) # PANA 接続完了待ち(10行ぐらいなんか返してくる) bConnected = False while not bConnected : line = ser.readline() print(line.decode('utf-8'), end="", flush=True) if line.startswith(b"EVENT 24") : print("connect PANA FAIL", flush=True) skterm() #### 糸冬了 #### elif line.startswith(b"EVENT 25") : # 接続完了! print("connect PANA OK", flush=True) bConnected = True # これ以降、シリアル通信のタイムアウトを設定 ser.timeout = 3 # スマートメーターがインスタンスリスト通知を投げてくる # (ECHONET-Lite_Ver.1.12_02.pdf p.4-16) print(ser.readline().decode('utf-8'), end="", flush=True) #無視 # 80 : 動作状態 # D3 : 係数 # D7 : 積算電力有効桁数 # E0 : 積算電力量計測値(正方向計測値) # E1 : 積算電力量単位 # E2 : 積算電力量計測値履歴1 # E3 : 積算電力量計測値(逆方向計測値) # E4 : 積算電力量計測値履歴1(逆方向計測値) # E5 : 積算履歴収集日1 # E7 : 瞬時電力計測値 # E8 : 瞬時電流計測値 # EA : 定時積算電力量計測値(正方向計測値) # EB : 定時積算電力量計測値(逆方向計測値) # EC : 積算電力量計測値履歴1(正方向,逆方向) # ED : 積算履歴収集日2 e0Data = True countNum = 0 while True : # ECHONET Lite フレーム作成 # 参考資料 # ・ECHONET-Lite_Ver.1.12_02.pdf (以下 EL) # ・Appendix_H.pdf (以下 AppH) echonetLiteFrame = b"" echonetLiteFrame += b"\x10\x81" # EHD (参考:EL p.3-2) echonetLiteFrame += b"\x00\x01" # TID (参考:EL p.3-3) # ここから EDATA echonetLiteFrame += b"\x05\xFF\x01" # SEOJ (参考:EL p.3-3 AppH p.3-408~) echonetLiteFrame += b"\x02\x88\x01" # DEOJ (参考:EL p.3-3 AppH p.3-274~) echonetLiteFrame += b"\x62" # ESV(62:プロパティ値読み出し要求) (参考:EL p.3-5) echonetLiteFrame += b"\x01" # OPC(1個)(参考:EL p.3-7) if e0Data : echonetLiteFrame += b"\xE0" # EPC 積算電力量計測値(正方向計測値) else : echonetLiteFrame += b"\xE7" # EPC 瞬時電力計測値 echonetLiteFrame += b"\x00" # PDC(参考:EL p.3-9) # コマンド送信 com1 = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame)) command = com1.encode() + echonetLiteFrame ser.write(command) bConnected = False while not bConnected : line = ser.readline() if line.startswith(b"OK") : bConnected = True time.sleep(0.01) line = ser.readline() # 受信データはたまに違うデータが来たり、 # 取りこぼしたりして変なデータを拾うことがあるので # チェックを厳しめにしてます。 if line.startswith(b"ERXUDP") : line = line.decode() cols = line.strip().split(' ') res = cols[8] # UDP受信データ部分 #tid = res[4:4+4]; seoj = res[8:8+6] #deoj = res[14,14+6] ESV = res[20:20+2] #OPC = res[22,22+2] if seoj == "028801" and ESV == "72" : # スマートメーター(028801)から来た応答(72)なら EPC = res[24:24+2] if EPC == "E0" : # 内容が積算電力量計測値(E0)だったら hexPower = line[-8:] # 最後の4バイト(16進数で8文字)が積算電力計測値 intPower = int(hexPower, 16) print(u"積算電力量計測値:{0}[Wh]".format(intPower)) field = {'E0': float(intPower)} influx_json(field) e0Data = False elif EPC == "E7" : # 内容が瞬時電力計測値(E7)だったら hexPower = line[-8:] # 最後の4バイト(16進数で8文字)が瞬時電力計測値 intPower = int(hexPower, 16) print(u"瞬時電力計測値:{0}[W]".format(intPower)) field = {'E7': float(intPower)} influx_json(field) if countNum == 4 : e0Data = True countNum = 0 else : countNum += 1 time.sleep(30) finally: signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) skterm() signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) if __name__ == "__main__": sys.exit(main())
■スクリプト実行結果(整形済み)
SKVER EVER 1.2.8 OK SKSETPWD C XXXX OK SKSETRBID XXXX OK SKSCAN 2 FFFFFFFF 4 OK EVENT 22 FE80:0000:0000:0000:1207:23FF:FEA0:1207 SKSCAN 2 FFFFFFFF 5 OK EVENT 20 FE80:0000:0000:0000:1207:23FF:FEA0:1207 EPANDESC Channel:33 Channel Page:09 Pan ID:XXXX Addr:XXXX LQI:2D PairID:01055146 EVENT 22 FE80:0000:0000:0000:1207:23FF:FEA0:1207 SKSREG S2 33 OK SKSREG S3 XXXX OK SKJOIN FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX OK connect PANA OK 積算電力量計測値:170317[kWh] [{'measurement': 'power', 'tags': {'place': 'uni', 'host': 'RL7023'}, 'fields': {'E0': 170317.0}}] 瞬時電力計測値:381[W] [{'measurement': 'power', 'tags': {'place': 'uni', 'host': 'RL7023'}, 'fields': {'E7': 381.0}}]
■グラフ化
瞬間電力の高い部分はポットで湯を沸かしたときである