いんくらyochさんの日記

興味あることの忘備録

Wi-SUNで使用電力を可視化

災害時の電力確保について調べていたところ、電力会社のメーターから使用電力などの情報が抜けることがわかったので、スマートメータのBルートについて解説されたサイトの情報を利用してラズパイで使用電力を蓄積するように構築した。

使用したのは、余ってるラズパイ1のモデルB+とUSB-WiFiとWi-SUNモジュールと呼ばれるスマートメータの920MHz帯で通信ができるUSBドングルである。その他、電力会社へBルート利用申請も必要である。
参照したのは以下のサイト
bey.jp
東京電力からBルート申請のIDとパスワードが届いたので、サイトの内容を利用して積算電力と瞬間電力をDBに書き込むスクリプトpython3で動くよう作成し、グラフ化してみた。
■ソース(Python3)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import sys
import serial
import time
import signal
import ctypes

from influxdb import InfluxDBClient
client = InfluxDBClient ( host = '127.0.0.1', port=8086, username ='****', password ='****', database ='broute')
measurement ='power'
tags ={'place': 'uni','host': 'RL7023'}

def influx_json (field, nowtime='') :
    json_body = [
         {
              'measurement': measurement,
              'tags': tags,
              'fields': field
         }
    ]
    client.write_points(json_body)

# Bルート認証ID(東京電力パワーグリッドから郵送で送られてくるヤツ)
rbid  = "********"
# Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくるヤツ)
rbpwd = "********"
# シリアルポートデバイス名
ser = serial.Serial('/dev/ttyUSB0', 115200)

def skterm():
    ser.write(str.encode("SKTERM\r\n"))
    print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
    print(ser.readline().decode('utf-8'), end="", flush=True) # EVENT21
    print(ser.readline().decode('utf-8'), end="", flush=True) # OK
    print(ser.readline().decode('utf-8'), end="", flush=True) # ERXUDP
    print(ser.readline().decode('utf-8'), end="", flush=True) # EVENT27
    ser.close
    print("END")

def sig_handler(signum, frame) -> None:
    sys.exit(1)

def main():

    signal.signal(signal.SIGTERM, sig_handler)

    try:
        # とりあえずバージョンを取得してみる(やらなくてもおk)
        ser.write(str.encode("SKVER\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # バージョン

        # Bルート認証パスワード設定
        ser.write(str.encode("SKSETPWD C " + rbpwd + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し)

        # Bルート認証ID設定
        ser.write(str.encode("SKSETRBID " + rbid + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し)

        scanDuration = 4;   # スキャン時間。サンプルでは6なんだけど、4でも行けるので。(ダメなら増やして再試行)
        scanRes = {} # スキャン結果の入れ物

        # スキャンのリトライループ(何か見つかるまで)
        while not 'Channel' in scanRes:

            # アクティブスキャン(IE あり)を行う
            # SKSCAN 2 FFFFFFFF 8
            ser.write(str.encode("SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"))

            # スキャン1回について、スキャン終了までのループ
            scanEnd = False
            while not scanEnd :
                line = ser.readline()
                print(line.decode('utf-8'), end="")
                line = line.decode('utf-8')

                if line.startswith("EVENT 22") :
                    # スキャン終わったよ(見つかったかどうかは関係なく)
                    scanEnd = True

                elif line.startswith("  ") :
                    # スキャンして見つかったらスペース2個あけてデータがやってくる
                    # 例
                    # scanRes["Channel"]="33"
                    # scanRes["Channel Page"]="09"
                    # scanRes["Pan ID"]="****"
                    # scanRes["Addr"]="****"
                    # scanRes["LQI"]="27"
                    # scanRes["PairID"]="01055146"
                    cols = line.strip().split(':')
                    scanRes[cols[0]] = cols[1]
            scanDuration += 1

            if 8 < scanDuration :
                # 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
                print("スキャンリトライオーバー")
                ser.close
                sys.exit(1)  #### 糸冬了 ####

        # スキャン結果からChannelを設定。
        ser.write(str.encode("SKSREG S2 " + scanRes["Channel"] + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し)

        # スキャン結果からPan IDを設定
        ser.write(str.encode("SKSREG S3 " + scanRes["Pan ID"] + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し)

        # MACアドレス(64bit)をIPV6リンクローカルアドレスに変換。
        # (BP35A1の機能を使って変換しているけど、単に文字列変換すればいいのではという話も??)
        ser.write(str.encode("SKLL64 " + scanRes["Addr"] + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        ipv6Addr = ser.readline().decode('utf-8').strip()

        # PANA 接続シーケンスを開始します。
        ser.write(str.encode("SKJOIN " + ipv6Addr + "\r\n"))
        print(ser.readline().decode('utf-8'), end="", flush=True) # エコーバック
        print(ser.readline().decode('utf-8'), end="", flush=True) # OKが来るはず(チェック無し)

        # PANA 接続完了待ち(10行ぐらいなんか返してくる)
        bConnected = False
        while not bConnected :
            line = ser.readline()
            print(line.decode('utf-8'), end="", flush=True)
 
            if line.startswith(b"EVENT 24") :
                print("connect PANA FAIL", flush=True)
                skterm()  #### 糸冬了 ####

            elif line.startswith(b"EVENT 25") :
                # 接続完了!
                print("connect PANA OK", flush=True)
                bConnected = True

        # これ以降、シリアル通信のタイムアウトを設定
        ser.timeout = 3 

        # スマートメーターがインスタンスリスト通知を投げてくる
        # (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
        print(ser.readline().decode('utf-8'), end="", flush=True) #無視

        # 80 : 動作状態
        # D3 : 係数
        # D7 : 積算電力有効桁数
        # E0 : 積算電力量計測値(正方向計測値)
        # E1 : 積算電力量単位
        # E2 : 積算電力量計測値履歴1
        # E3 : 積算電力量計測値(逆方向計測値)
        # E4 : 積算電力量計測値履歴1(逆方向計測値)
        # E5 : 積算履歴収集日1
        # E7 : 瞬時電力計測値
        # E8 : 瞬時電流計測値
        # EA : 定時積算電力量計測値(正方向計測値)
        # EB : 定時積算電力量計測値(逆方向計測値)
        # EC : 積算電力量計測値履歴1(正方向,逆方向)
        # ED : 積算履歴収集日2

        e0Data = True
        countNum = 0

        while True :

            # ECHONET Lite フレーム作成
            #  参考資料
            #  ・ECHONET-Lite_Ver.1.12_02.pdf (以下 EL)
            #  ・Appendix_H.pdf (以下 AppH)
            echonetLiteFrame =  b""
            echonetLiteFrame += b"\x10\x81"      # EHD (参考:EL p.3-2)
            echonetLiteFrame += b"\x00\x01"      # TID (参考:EL p.3-3)
            # ここから EDATA
            echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ (参考:EL p.3-3 AppH p.3-408~)
            echonetLiteFrame += b"\x02\x88\x01"  # DEOJ (参考:EL p.3-3 AppH p.3-274~)
            echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求) (参考:EL p.3-5)
            echonetLiteFrame += b"\x01"          # OPC(1個)(参考:EL p.3-7)

            if e0Data :
                echonetLiteFrame += b"\xE0"      # EPC 積算電力量計測値(正方向計測値)
            else :
                echonetLiteFrame += b"\xE7"      # EPC 瞬時電力計測値

            echonetLiteFrame += b"\x00"          # PDC(参考:EL p.3-9)

            # コマンド送信
            com1 = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
            command = com1.encode() + echonetLiteFrame
            ser.write(command)

            bConnected = False
            while not bConnected :
                line = ser.readline()
                if line.startswith(b"OK") :
                    bConnected = True 

            time.sleep(0.01)

            line = ser.readline()

            # 受信データはたまに違うデータが来たり、
            # 取りこぼしたりして変なデータを拾うことがあるので
            # チェックを厳しめにしてます。

            if line.startswith(b"ERXUDP") :
                line = line.decode()
                cols = line.strip().split(' ')
                res = cols[8]   # UDP受信データ部分
                #tid = res[4:4+4];
                seoj = res[8:8+6]
                #deoj = res[14,14+6]
                ESV = res[20:20+2]
                #OPC = res[22,22+2]

                if seoj == "028801" and ESV == "72" :
                    # スマートメーター(028801)から来た応答(72)なら
                    EPC = res[24:24+2]

                    if EPC == "E0" :
                        # 内容が積算電力量計測値(E0)だったら
                        hexPower = line[-8:]	# 最後の4バイト(16進数で8文字)が積算電力計測値
                        intPower = int(hexPower, 16)
                        print(u"積算電力量計測値:{0}[Wh]".format(intPower))
                        field = {'E0': float(intPower)}
                        influx_json(field)
                        e0Data = False

                    elif EPC == "E7" :
                        # 内容が瞬時電力計測値(E7)だったら
                        hexPower = line[-8:]    # 最後の4バイト(16進数で8文字)が瞬時電力計測値
                        intPower = int(hexPower, 16)
                        print(u"瞬時電力計測値:{0}[W]".format(intPower))
                        field = {'E7': float(intPower)}
                        influx_json(field)

            if countNum == 4 :
                e0Data = True
                countNum = 0
            else :
                countNum += 1

            time.sleep(30)

    finally:
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        skterm()
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

if __name__ == "__main__":
    sys.exit(main())

スクリプト実行結果(整形済み)

SKVER
EVER 1.2.8
OK
SKSETPWD C XXXX
OK
SKSETRBID XXXX
OK
SKSCAN 2 FFFFFFFF 4
OK
EVENT 22 FE80:0000:0000:0000:1207:23FF:FEA0:1207
SKSCAN 2 FFFFFFFF 5
OK
EVENT 20 FE80:0000:0000:0000:1207:23FF:FEA0:1207
EPANDESC
  Channel:33
  Channel Page:09
  Pan ID:XXXX
  Addr:XXXX
  LQI:2D
  PairID:01055146

EVENT 22 FE80:0000:0000:0000:1207:23FF:FEA0:1207
SKSREG S2 33
OK
SKSREG S3 XXXX
OK
SKJOIN FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX
OK

connect PANA OK

積算電力量計測値:170317[kWh]
[{'measurement': 'power', 'tags': {'place': 'uni', 'host': 'RL7023'}, 'fields': {'E0': 170317.0}}]

瞬時電力計測値:381[W]
[{'measurement': 'power', 'tags': {'place': 'uni', 'host': 'RL7023'}, 'fields': {'E7': 381.0}}]

■グラフ化
f:id:yoch:20210423010053p:plain
瞬間電力の高い部分はポットで湯を沸かしたときである