#!/usr/bin/env python # -*- coding: utf-8 -*- # $Id: fritzboxdect.py 110 2017-12-18 23:48:49Z bird $ """ Fritz!box DECT sockets. Generates power consumption, current power usage, and temperature graphs. Configuration: [fritzboxdect] env.fritzboxdect_ip [ip addresses of the fritzboxes] env.fritzboxdect_password [passwords of the frizboxes] #%# family=auto contrib #%# capabilities=autoconf """ __copyright__ = \ """ Copyright (c) 2017 Knut St. Osmundsen Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ __version__ = "$Revision: 110 $" # Standard Python imports. import hashlib import httplib import os import sys from xml.etree import ElementTree from xml.dom import minidom if sys.version_info[0] < 3: from urllib2 import quote as urllib_quote; from urllib import urlencode as urllib_urlencode; else: from urllib.parse import quote as urllib_quote; # pylint: disable=F0401,E0611 from urllib.parse import urlencode as urllib_urlencode; # pylint: disable=F0401,E0611 ## Wheter to display debug messages. g_fDebug = len(os.environ.get('debug', '')) > 0; class FritzBoxConnection(object): """ A HTTP(S) connection to a fritz!box. """ ## The user agent string to use. g_UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" def __init__(self, sServer, sPassword, iPort = 80): # # Connect to the fritz!box. # oConn = httplib.HTTPConnection('%s:%s' % (sServer, iPort)); self.oConn = oConn; # # Login - gets a SID back. # dLoginHdrs = { "Accept": "application/xml", "Content-Type": "text/plain", "User-Agent": self.g_UserAgent, }; oConn.request("GET", '/login_sid.lua', '', dLoginHdrs) oResp = oConn.getresponse(); sData = oResp.read(); if oResp.status != 200: raise Exception('login_sid.lua response: %s %s' % (oResp.status, oResp.reason,)); oXmlData = minidom.parseString(sData); aoElmSids = oXmlData.getElementsByTagName('SID'); sSid = aoElmSids[0].firstChild.data; if sSid == '0000000000000000': # Hash the password and compose reply text. aoElmChallenges = oXmlData.getElementsByTagName('Challenge'); sChallange = aoElmChallenges[0].firstChild.data; sReplyHashedText = ('%s-%s' % (sChallange, sPassword)).decode('iso-8859-1').encode('utf-16le'); oReplyHash = hashlib.md5(); oReplyHash.update(sReplyHashedText); sReplyText = '%s-%s' % (sChallange, oReplyHash.hexdigest().lower()); # Sent it. dReplyHdrs = { "Accept": "text/html,application/xhtml+xml,application/xml", "Content-Type": "application/x-www-form-urlencoded", "User-Agent": self.g_UserAgent, }; oConn.request("GET", '/login_sid.lua?' + urllib_urlencode({'response': sReplyText,}), '', dReplyHdrs) oResp = oConn.getresponse(); sData = oResp.read(); if oResp.status != 200: raise Exception('login_sid.lua response: %s %s' % (oResp.status, oResp.reason,)); oXmlData = minidom.parseString(sData); aoElmSids = oXmlData.getElementsByTagName('SID'); sSid = aoElmSids[0].firstChild.data; if sSid == '0000000000000000': raise Exception('login failure'); # Remember the SID. self.sSid = sSid; def getPage(self, sUrl, asArgs): """ Retrieves the given page. We would like to use a directory for the arguments, but fritzy is picky about the ordering. """ dPageHdrs = { "Accept": "application/xml,text/xml", "Content-Type": "text/plain", "User-Agent": self.g_UserAgent, }; sUrl = sUrl + '?sid=' + self.sSid; for sArg in asArgs: sName, sValue = sArg.split('=') sUrl += '&' + urllib_quote(sName) + '=' + urllib_quote(sValue); if g_fDebug: print('debug: sUrl: %s' % (sUrl,)); self.oConn.request("GET", sUrl, '', dPageHdrs) oResp = self.oConn.getresponse(); sData = oResp.read(); if oResp.status != 200: raise Exception('%s response: %s %s' % (sUrl, oResp.status, oResp.reason,)); return sData; @staticmethod def getPages(sUrl, asArgs): """ Gets an array of pages from each of the frizboxes. """ asRet = []; asIps = os.environ.get('fritzboxdect_ip', '10.42.2.1 10.42.1.50').split(); if len(asIps) == 0: raise Exception('environment variable fritzboxdect_ip is empty') asPasswords = os.environ.get('fritzboxdect_password', '').split(); if len(asPasswords) == 0: raise Exception('environment variable fritzboxdect_password is empty') if g_fDebug: print('debug: asIps=%s asPasswords=%s' % (asIps, asPasswords,)); for i, sIp in enumerate(asIps): sPassword = asPasswords[i] if i < len(asPasswords) else asPasswords[-1]; oConn = FritzBoxConnection(sIp, sPassword); asRet.append(oConn.getPage(sUrl, asArgs)); del oConn; return asRet; # XML output example: # # # 1 # Socket #1 # # 1 # manuell # 1 # 0 # # # 2730 # 3602 # # # 255 # 0 # # # g_sPage = '/webservices/homeautoswitch.lua' g_fBitEnergy = 1 << 7; g_fBitTemp = 1 << 8; def getAllDeviceElements(): """ Connects to the fritz!boxes and gets the devicelistinfos. Returns array of device elements, sorted by name. """ asData = FritzBoxConnection.getPages(g_sPage, ['switchcmd=getdevicelistinfos']); aoRet = []; for sData in asData: oXmlRoot = ElementTree.fromstring(sData); for oElmDevice in oXmlRoot.findall('device'): aoRet.append(oElmDevice); def getKey(oElmDevice): oName = oElmDevice.find('name'); if oName is not None: return oName.text; return oElmDevice.get('identifier'); return sorted(aoRet, key = getKey); def getDeviceVarName(oElmDevice): """ Gets the graph variable name for the device. """ sAin = oElmDevice.get('identifier'); sAin = ''.join(sAin.split()); return 'ain_%s' % (sAin,); def getDeviceBitmask(oElmDevice): """ Gets the device bitmask. """ sBitmask = oElmDevice.get('functionbitmask'); try: fBitmask = int(sBitmask); except: sProduct = oElmDevice.get('productname'); if sProduct == 'FRITZ!DECT 210' or sProduct == 'FRITZ!DECT 200': fBitmask = 0xb80; elif sProduct == 'FRITZ!DECT 100': fBitmask = 0x500; else: fBitmask = 0; return fBitmask; def printValues(): """ Prints the values. """ aoElmDevices = getAllDeviceElements() print('multigraph power_consumption') uTotal = 0; for oElmDevice in aoElmDevices: sVarNm = getDeviceVarName(oElmDevice); if getDeviceBitmask(oElmDevice) & g_fBitEnergy: oElmPowerMeter = oElmDevice.find('powermeter'); if oElmPowerMeter is not None: sValue = oElmPowerMeter.find('energy').text; if sValue is not None: sValue = oElmPowerMeter.find('energy').text.strip(); if sValue: print('%s_wh.value %s' % (sVarNm, sValue,)); try: uTotal += int(sValue) except: pass; print('total_wh.value %s' % (uTotal,)); print('multigraph power_usage') for oElmDevice in aoElmDevices: sVarNm = getDeviceVarName(oElmDevice); if getDeviceBitmask(oElmDevice) & g_fBitEnergy: oElmPowerMeter = oElmDevice.find('powermeter'); if oElmPowerMeter is not None: sValue = oElmPowerMeter.find('power').text; if sValue is not None: sValue = sValue.strip(); if sValue: try: uTotal += int(sValue) except: pass; while len(sValue) < 4: sValue = '0' + sValue; print('%s_w.value %s.%s' % (sVarNm, sValue[:-3] , sValue[-3:])); print('total_wh.value %.3f' % (uTotal / 1000.0,)); print('multigraph temp') dAvg = {}; for oElmDevice in aoElmDevices: sVarNm = getDeviceVarName(oElmDevice); if getDeviceBitmask(oElmDevice) & g_fBitTemp: oElmTemp = oElmDevice.find('temperature'); if oElmTemp is not None: sValue = oElmTemp.find('celsius').text; if sValue is not None: sValue = sValue.strip(); if sValue: print('%s_c.value %s.%s' % (sVarNm, sValue[:-1] if len(sValue) > 0 else '0', sValue[-1:])); if oElmDevice.find('name') is not None: try: uValue = int(sValue); except: pass; else: sFloor = oElmDevice.find('name').text[:2]; if sFloor not in dAvg: dAvg[sFloor] = (1, uValue); else: dAvg[sFloor] = (dAvg[sFloor][0] + 1, dAvg[sFloor][1] + uValue); for sVarNm in sorted(dAvg.keys()): print('avg_%s_c.value %.1f' % (sVarNm, dAvg[sVarNm][1] / 10.0 / dAvg[sVarNm][0])); # done def printConfig(): """ Prints the configuration. """ aoElmDevices = getAllDeviceElements() print('multigraph power_consumption') print('graph_title Power consumption'); print('graph_vlabel Wh'); print('graph_args --base 1000'); print("graph_category house"); for oElmDevice in aoElmDevices: if getDeviceBitmask(oElmDevice) & g_fBitEnergy: sVarNm = getDeviceVarName(oElmDevice); print('%s_wh.label %s' % (sVarNm, oElmDevice.find('name').text,)); print('%s_wh.type COUNTER' % (sVarNm,)); print('%s_wh.draw LINE1' % (sVarNm,)); print('total_wh.label total'); print('total_wh.type COUNTER'); print('total_wh.draw LINE1'); print('multigraph power_usage') print('graph_title Power usage'); print('graph_vlabel W'); print('graph_args --base 1000'); print("graph_category house"); print('graph_info Current power usage around the house'); for oElmDevice in aoElmDevices: if getDeviceBitmask(oElmDevice) & g_fBitEnergy: sVarNm = getDeviceVarName(oElmDevice); print('%s_w.label %s' % (sVarNm, oElmDevice.find('name').text,)); print('%s_w.type GAUGE' % (sVarNm,)); print('%s_w.draw LINE1' % (sVarNm,)); print('total_w.label total'); print('total_w.type COUNTER'); print('total_w.draw LINE1'); print('multigraph temp') print('graph_title Temperature'); print('graph_args --base 1000'); print('graph_vlabel Degrees (C)'); print('graph_scale no'); print('graph_category house'); print('graph_info Temperatures around the house'); print('temperature.type GAUGE'); dAvg = {}; for oElmDevice in aoElmDevices: if getDeviceBitmask(oElmDevice) & g_fBitTemp: sVarNm = getDeviceVarName(oElmDevice); sName = oElmDevice.find('name').text; print('%s_c.label %s' % (sVarNm, sName,)); print('%s_c.type GAUGE' % (sVarNm,)); print('%s_c.draw LINE1' % (sVarNm,)); dAvg[sName[:2]] = 1; for sVarNm in sorted(dAvg.keys()): print('avg_%s_c.label Average %s' % (sVarNm, sVarNm,)); print('avg_%s_c.type GAUGE' % (sVarNm,)); print('avg_%s_c.draw LINE1' % (sVarNm,)); def main(asArgs): """ C-like main. """ if len(asArgs) == 2 and asArgs[1] == 'config': try: printConfig(); except Exception as oXcpt: sys.exit('Failed to retreive configuration (%s)' % (oXcpt,)); return 0; if len(asArgs) == 2 and asArgs[1] == 'autoconfig': print("yes"); return 0; if len(asArgs) == 1 \ or (len(asArgs) == 2 and asArgs[1] == 'fetch'): try: printValues(); except Exception as oXcpt: sys.exit('Failed to retreive data (%s)' % (oXcpt,)); return 0; sys.exit('Unknown request (%s)' % (asArgs,)); if __name__ == '__main__': sys.exit(main(sys.argv))