diff --git a/.hgignore b/.hgignore new file mode 100644 index 0000000..fd5e404 --- /dev/null +++ b/.hgignore @@ -0,0 +1,4 @@ +syntax: glob + +*.pyc +*.svn diff --git a/CHANGES.txt b/CHANGES.txt new file mode 100644 index 0000000..9382bca --- /dev/null +++ b/CHANGES.txt @@ -0,0 +1,18 @@ +v1.5.0, 6/27/10 -- Initial Packaging. Fully restructured into a unified API with tests. +v1.7.0 6/29/10 -- Now supports both Series 1 and Series 2 modules + (the API turned out to be the same). Additionally: + * API frame logic was split into its own class, APIFrame + * XBee renamed to XBeeBase + * XBee1 renamed to XBee + * Tests updated to reflect changes; API frame tests + moved to test_frame.py, now test APIFrame instead of + XBee base class + * Test files renamed appropriately + * PyLint score improved + * Various docstring updates + * Updated example code to reflect changes +v1.7.1 7/7/2010 -- Bug fix: Now supports receiving I/O data with 64-bit addressing + * Previously, an exception was raised when a packet with ID 0x82 + * arrived, which contains I/O samples with a 64-bit source address + * This has been fixed. + diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..07c885e --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2010 Paul Malmsten, Amit Synderman, Marco Sangalli + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..b8e7065 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include *.txt +recursive-include docs *.txt +recursive-include examples *.txt *.py +recursive-include scripts *.py diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..ca565fe --- /dev/null +++ b/README.txt @@ -0,0 +1,80 @@ +========= +XBee +========= + +XBee provides an implementation of the XBee serial communication API. It +allows one to easily access advanced features of one or more XBee +devices from an application written in Python. An example use case might +look like this:: + + #! /usr/bin/python + + # Import and init an XBee device + from xbee import XBee + import serial + + ser = serial.Serial('/dev/ttyUSB0', 9600) + xbee = XBee(ser) + + # Set remote DIO pin 2 to low (mode 4) + xbee.remote_at( + dest_addr='\x56\x78', + command='D2', + parameter='\x04') + + xbee.remote_at( + dest_addr='\x56\x78', + command='WR') + + +Usage +============ + +Series 1, Series 2 +------------------ + +To use this library with an XBee device, import the class +XBee and call its constructor with a serial port object. + +In order to send commands via the API, call a method with the same +name as the command which you would like to send with words separated +by _'s. For example, to send a Remote AT command, one would call +remote_at(). + +The arguments to be given to each method depend upon the command to be +sent. For more information concerning the names of the arguments which +are expected and the proper data types for each argument, consult the +API manual for your XBee device, or consult the source code. + +Caveats +--------- + +Escaped API operation has not been implemented at this time. + +Dependencies +============ + +PySerial + +Additional Dependencies (for running tests): +-------------------------------------------- + +Nose + +XBee Firmware +------------- + +Please ensure that your XBee device is programmed with the latest firmware +provided by Digi. Using old firmware revisions is not supported and +may result in unspecified behavior. + +Contributors +================== + +Paul Malmsten + +Special Thanks +================== + +Amit Synderman, +Marco Sangalli diff --git a/deprecated/test.py.bak b/deprecated/test.py.bak new file mode 100644 index 0000000..7f5ec9a --- /dev/null +++ b/deprecated/test.py.bak @@ -0,0 +1,28 @@ +import unittest +import os + +test_modules = ['xbee.tests.test_xbee', + 'xbee.tests.test_xbee1'] + +tests_path = 'xbee/tests' +tests_module_path = 'xbee.tests.' + +def run_tests(): + runner = unittest.TextTestRunner() + superSuite = unittest.TestSuite() + + # Walk tests directory + path = os.path.abspath(tests_path) + for path, dirs, files in os.walk(path): + for f in files: + name, ext = os.path.splitext(f) + + if ext == "py" or ext == ".py": + # Combine the name of the module with the package path + # This works because xbee.tests is a package + superSuite.addTests(unittest.defaultTestLoader.loadTestsFromName(tests_module_path + name)) + + runner.run( superSuite ) + +if __name__ == "__main__": + run_tests() diff --git a/examples/alarm.py b/examples/alarm.py new file mode 100755 index 0000000..3806e15 --- /dev/null +++ b/examples/alarm.py @@ -0,0 +1,253 @@ +#! /usr/bin/python + +""" +alarm.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +This module will communicate with a remote XBee device in order to +implement a simple alarm clock with bed occupancy detection. +""" +import serial +from xbee import XBee + +class DataSource(object): + """ + Represents a source from which alarm times may be pulled (i.e. an + online calendar) + """ + def next_alarm_time(self, current_time): + """ + next_alarm_time: datetime -> datetime + + Returns the next time at which the alarm should activate + """ + raise NotImplemented() + +class AlarmDevice(object): + """ + Represents alarm harware, such as input and output to an from + the real world + """ + + def __init__(self, hw): + self.hw = hw + + def activate(self): + """ + activate: None -> None + + Activates noise-making features + """ + raise NotImplementedError() + + def deactivate(self): + """ + deactivate: None -> None + + Deactivates noise-making features + """ + raise NotImplementedError() + + def bed_occupied(self): + """ + bed_occupied: None -> Boolean + + Determines whether the bed is currently occupied + """ + raise NotImplementedError() + +class WakeupRoutine(object): + """ + Represents a process by which a user should be awoken with a + particular AlarmDevice + """ + + def __init__(self, device): + self.device = device + + def trigger(self): + """ + trigger: None -> None + + Begins the specified wakeup process with the given hardware + device. Does not relinquish control until the wakeup process is + complete. + """ + raise NotImplementedError() + +# ================= Custom Classes ============================= + +class TestSource(DataSource): + def __init__(self, time): + super(TestSource, self).__init__() + self.next_time = time + + def next_alarm_time(self, current_time): + return self.next_time + +class XBeeAlarm(AlarmDevice): + DETECT_THRESH = 350 + + def __init__(self, serial_port, remote_addr): + # Open serial port, construct XBee1, configure remote device, + # store as hardware + self.remote_addr = remote_addr + + ser = serial.Serial(serial_port) + xbee = XBee(ser) + + super(XBeeAlarm, self).__init__(xbee) + + # Reset remote device + self._reset() + + def _reset(self): + """ + reset: None -> None + + Resets the remote XBee device to a standard configuration + """ + # Analog pin 0 + self.hw.remote_at( + dest_addr=self.remote_addr, + command='D0', + parameter='\x02') + + # Disengage remote LED, buzzer + self.deactivate() + self._set_send_samples(False) + + def _set_LED(self, status): + """ + _set_LED: boolean -> None + + Sets the status of the remote LED + """ + # DIO pin 1 (LED), active low + self.hw.remote_at( + dest_addr=self.remote_addr, + command='D1', + parameter='\x04' if status else '\x05') + + def _set_buzzer(self, status): + """ + _set_buzzer: boolean -> None + + Sets the status of the remote buzzer + """ + # DIO pin 1 (LED), active low + self.hw.remote_at( + dest_addr=self.remote_addr, + command='D2', + parameter='\x05' if status else '\x04') + + def _set_send_samples(self, status): + """ + _set_send_samples: boolean -> None + + Sets whether the remote device will send data samples once every + second. + """ + # Send samples once per second + self.hw.remote_at( + dest_addr=self.remote_addr, + command='IR', + parameter='\xff' if status else '\x00') + + def activate(self): + """ + activate: None -> None + + Remote XBee starts making noise and turns on LED + """ + self._set_LED(True) + self._set_buzzer(True) + + def deactivate(self): + """ + activate: None -> None + + Remote XBee starts making noise and turns on LED + """ + self._set_LED(False) + self._set_buzzer(False) + + def bed_occupied(self): + """ + bed_occupied: None -> boolean + + Determines whether the bed is currently occupied by requesting + data from the remote XBee and comparing the analog value with + a threshold. + """ + + # Receive samples from the remote device + self._set_send_samples(True) + + while True: + packet = self.hw.wait_read_frame() + + if 'adc-0' in packet['samples'][0]: + # Stop receiving samples from the remote device + self._set_send_samples(False) + return packet['samples'][0]['adc-0'] > XBeeAlarm.DETECT_THRESH + + +class SimpleWakeupRoutine(WakeupRoutine): + """ + When triggered, activates the alarm if the bed is occupied. The + alarm continues until the bed is no longer occupied. + """ + + def trigger(self): + from time import sleep + + pulse_delay = 0.1 + + + if self.device.bed_occupied(): + # Initial alarm + for x in range(0, 5): + self.device.activate() + sleep(pulse_delay) + self.device.deactivate() + sleep(pulse_delay) + + # Allow time to escape + sleep(30) + + # Extended alarm + duration = 1 + pause = 10 + + while self.device.bed_occupied(): + self.device.activate() + sleep(duration) + self.device.deactivate() + sleep(pause) + duration *= 2 + +def main(): + """ + Run through simple demonstration of alarm concept + """ + alarm = XBeeAlarm('/dev/ttyUSB0', '\x56\x78') + routine = SimpleWakeupRoutine(alarm) + + from time import sleep + while True: + """ + Run the routine with 10 second delays + """ + try: + print "Waiting 5 seconds..." + sleep(5) + print "Firing" + routine.trigger() + except KeyboardInterrupt: + break + +if __name__ == '__main__': + main() diff --git a/examples/led_adc_example.py b/examples/led_adc_example.py new file mode 100755 index 0000000..0282f8f --- /dev/null +++ b/examples/led_adc_example.py @@ -0,0 +1,106 @@ +#! /usr/bin/python + +""" +led_adc_example.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +A simple example which sets up a remote device to read an analog value +on ADC0 and a digital output on DIO1. It will then read voltage +measurements and write an active-low result to the remote DIO1 pin. +""" + +from xbee import XBee +import serial + +ser = serial.Serial('/dev/ttyUSB0', 9600) +xbee = XBee(ser) + +## Set up remote device +#xbee.send('remote_at', + #frame_id='A', + #dest_addr_long='\x00\x00\x00\x00\x00\x00\x00\x00', + #dest_addr='\x56\x78', + #options='\x02', + #command='D0', + #parameter='\x02') + +#print xbee.wait_read_frame()['status'] + +#xbee.send('remote_at', + #frame_id='B', + #dest_addr_long='\x00\x00\x00\x00\x00\x00\x00\x00', + #dest_addr='\x56\x78', + #options='\x02', + #command='D1', + #parameter='\x05') + +#print xbee.wait_read_frame()['status'] + +#xbee.send('remote_at', + #frame_id='C', + #dest_addr_long='\x00\x00\x00\x00\x00\x00\x00\x00', + #dest_addr='\x56\x78', + #options='\x02', + #command='IR', + #parameter='\x32') + +#print xbee.wait_read_frame()['status'] + +#xbee.send('remote_at', + #frame_id='C', + #dest_addr_long='\x00\x00\x00\x00\x00\x00\x00\x00', + #dest_addr='\x56\x78', + #options='\x02', + #command='WR') + +# Deactivate alarm pin +xbee.remote_at( + dest_addr='\x56\x78', + command='D2', + parameter='\x04') + +xbee.remote_at( + dest_addr='\x56\x78', + command='WR') + +#print xbee.wait_read_frame()['status'] + +while True: + try: + packet = xbee.wait_read_frame() + print packet + + # If it's a sample, check it + if packet['id'] == 'rx_io_data': + # Set remote LED status + if packet['samples'][0]['adc-0'] > 160: + # Active low + xbee.remote_at( + dest_addr='\x56\x78', + command='D1', + parameter='\x04') + + # Active high alarm pin + xbee.remote_at( + dest_addr='\x56\x78', + command='D2', + parameter='\x05') + + else: + xbee.remote_at( + dest_addr='\x56\x78', + command='D1', + parameter='\x05') + + # Deactivate alarm pin + xbee.remote_at( + dest_addr='\x56\x78', + command='D2', + parameter='\x04') + + except KeyboardInterrupt: + break + +ser.close() diff --git a/examples/receive_samples.py b/examples/receive_samples.py new file mode 100755 index 0000000..5d5d9f0 --- /dev/null +++ b/examples/receive_samples.py @@ -0,0 +1,33 @@ +#! /usr/bin/python + +""" +receive_samples.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +This example continuously reads the serial port and processes IO data +received from a remote XBee. +""" + +from xbee import XBee +import serial + +PORT = '/dev/ttyUSB0' +BAUD_RATE = 9600 + +# Open serial port +ser = serial.Serial(PORT, BAUD_RATE) + +# Create API object +xbee = XBee(ser) + +# Continuously read and print packets +while True: + try: + response = xbee.wait_read_frame() + print response + except KeyboardInterrupt: + break + +ser.close() diff --git a/examples/serial_example_series_1.py b/examples/serial_example_series_1.py new file mode 100755 index 0000000..8707cc0 --- /dev/null +++ b/examples/serial_example_series_1.py @@ -0,0 +1,61 @@ +#! /usr/bin/python + +from xbee import XBee +import serial + +""" +serial_example.py +By Paul Malmsten, 2010 + +Demonstrates reading the low-order address bits from an XBee Series 1 +device over a serial port (USB) in API-mode. +""" + +def main(): + """ + Sends an API AT command to read the lower-order address bits from + an XBee Series 1 and looks for a response + """ + try: + + # Open serial port + ser = serial.Serial('/dev/ttyUSB0', 9600) + + # Create XBee Series 1 object + xbee = XBee(ser) + + + # Send AT packet + xbee.send('at', frame_id='A', command='DH') + + # Wait for response + response = xbee.wait_read_frame() + print response + + # Send AT packet + xbee.send('at', frame_id='B', command='DL') + + # Wait for response + response = xbee.wait_read_frame() + print response + + # Send AT packet + xbee.send('at', frame_id='C', command='MY') + + # Wait for response + response = xbee.wait_read_frame() + print response + + # Send AT packet + xbee.send('at', frame_id='D', command='CE') + + # Wait for response + response = xbee.wait_read_frame() + print response + except KeyboardInterrupt: + pass + finally: + ser.close() + +if __name__ == '__main__': + main() diff --git a/scripts/shell.py b/scripts/shell.py new file mode 100755 index 0000000..f1238ae --- /dev/null +++ b/scripts/shell.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +""" +shell.py + +Amit Snyderman, 2009 + + +Updated by Paul Malmsten, 2010 +pmalmsten@gmail.com + +Provides a simple shell for testing XBee devices. Currently, the shell +only allows one to parse and print received data; sending is not +supported. +""" +# $Id: xbee-serial-terminal.py 7 2009-12-30 16:25:08Z amitsnyderman $ + +import sys, time, cmd, serial, binascii +from xbee import XBee1 + +class XBeeShell(cmd.Cmd): + def __init__(self): + cmd.Cmd.__init__(self) + self.prompt = "xbee% " + self.serial = serial.Serial() + + def default(self, p): + if not self.serial.isOpen(): + print "You must set a serial port first." + else: + if p == '+++': + self.serial.write(p) + time.sleep(2) + else: + self.serial.write('%s\r' % p) + time.sleep(0.5) + + output = '' + while self.serial.inWaiting(): + output += self.serial.read() + print output.replace('\r', '\n').rstrip() + + def do_serial(self, p): + """Set the serial port, e.g.: /dev/tty.usbserial-A4001ib8""" + try: + self.serial.port = p + self.serial.open() + print 'Opening serial port: %s' % p + except Exception, e: + print 'Unable to open serial port: %s' % p + + def do_baudrate(self, p): + """Set the serial port's baud rate, e.g.: 19200""" + self.serial.baudrate = p + + def do_watch(self, p): + if not self.serial.isOpen(): + print "You must set a serial port first." + else: + while 1: + xbee = XBee1(self.serial) + packet = xbee.wait_read_frame() + print packet + + def do_exit(self, p): + """Exits from the XBee serial console""" + self.serial.close() + return 1 + +if __name__ == '__main__': + shell = XBeeShell() + shell.cmdloop() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5396571 --- /dev/null +++ b/setup.py @@ -0,0 +1,16 @@ +from distutils.core import setup + +setup( + name='XBee', + version='1.7.1', + author='Paul Malmsten', + author_email='pmalmsten@gmail.com', + packages=['xbee', 'xbee.tests'], + scripts=[], + url='http://code.google.com/p/python-xbee/', + license='LICENSE.txt', + description='Python tools for working with XBee radios', + long_description=open('README.txt').read(), + requires=['serial'], + provides=['xbee','xbee.tests'] +) diff --git a/xbee/__init__.py b/xbee/__init__.py new file mode 100644 index 0000000..99aa20b --- /dev/null +++ b/xbee/__init__.py @@ -0,0 +1,8 @@ +""" +XBee package initalization file + +By Paul Malmsten, 2010 +pmalmsten@gmail.com +""" + +from xbee.impl import XBee diff --git a/xbee/base.py b/xbee/base.py new file mode 100644 index 0000000..16ed889 --- /dev/null +++ b/xbee/base.py @@ -0,0 +1,373 @@ +""" +xbee.py + +By Paul Malmsten, 2010 +Inspired by code written by Amit Synderman and Marco Sangalli +pmalmsten@gmail.com + +XBee superclass module + +This class defines data and methods common to both the Xbee Series 1 and +Series 2 modules. This class should be subclassed in order to provide +series-specific functionality. +""" +import struct +from xbee.frame import APIFrame + +class XBeeBase(object): + """ + Abstract base class providing basic API frame generation, validation, + and data extraction methods for XBee modules + """ + + def __init__(self, ser, shorthand=True): + self.serial = ser + self.shorthand = shorthand + + def write(self, data): + """ + write: binary data -> None + + Packages the given binary data in an API frame and writes the + result to the serial port + """ + self.serial.write(APIFrame(data).output()) + + def wait_for_frame(self): + """ + wait_for_frame: None -> binary data + + wait_for_frame will read from the serial port until a valid + API frame arrives. It will then return the binary data + contained within the frame. + """ + WAITING = 0 + PARSING = 1 + + data = '' + state = WAITING + + while True: + if state == WAITING: + byte = self.serial.read() + + # If a start byte is found, swich states + if byte == APIFrame.START_BYTE: + data += byte + state = PARSING + else: + # Save all following bytes + data += self.serial.read() + + if len(data) == 3: + # We have the length bytes of the data + # Now, wait for the rest to appear + data_len = struct.unpack("> h", data[1:3])[0] + + # Wait for the expected number of bytes to appear + # Grab the checksum too + data += self.serial.read(data_len + 1) + + try: + # Try to parse and return result + return APIFrame.parse(data) + except ValueError: + # Bad frame, so restart + data = '' + state = WAITING + + def build_command(self, cmd, **kwargs): + """ + build_command: string (binary data) ... -> binary data + + build_command will construct a command packet according to the + specified command's specification in api_commands. It will expect + named arguments for all fields other than those with a default + value or a length of 'None'. + + Each field will be written out in the order they are defined + in the command definition. + """ + try: + cmd_spec = self.api_commands[cmd] + except AttributeError: + raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.") + + packet = '' + + for field in cmd_spec: + try: + # Read this field's name from the function arguments dict + data = kwargs[field['name']] + except KeyError: + # Data wasn't given + # Only a problem if the field has a specific length + if field['len'] is not None: + # Was a default value specified? + default_value = field['default'] + if default_value: + # If so, use it + data = default_value + else: + # Otherwise, fail + raise KeyError( + "The expected field %s of length %d was not provided" + % (field['name'], field['len'])) + else: + # No specific length, ignore it + data = None + + # Ensure that the proper number of elements will be written + if field['len'] and len(data) != field['len']: + raise ValueError( + "The data provided for '%s' was not %d bytes long"\ + % (field['name'], field['len'])) + + # Add the data to the packet, if it has been specified + # Otherwise, the parameter was of variable length, and not + # given + if data: + packet += data + + return packet + + def split_response(self, data): + """ + split_response: binary data -> {'id':str, + 'param':binary data, + ...} + + split_response takes a data packet received from an XBee device + and converts it into a dictionary. This dictionary provides + names for each segment of binary data as specified in the + api_responses spec. + """ + # Fetch the first byte, identify the packet + # If the spec doesn't exist, raise exception + packet_id = data[0] + try: + packet = self.api_responses[packet_id] + except AttributeError: + raise NotImplementedError("API response specifications could not be found; use a derived class which defines 'api_responses'.") + except KeyError: + raise KeyError( + "Unrecognized response packet with id byte %s" + % data[0]) + + # Current byte index in the data stream + index = 1 + + # Result info + info = {'id':packet['name']} + packet_spec = packet['structure'] + + # Parse the packet in the order specified + for field in packet_spec: + # Store the number of bytes specified + # If the data field has no length specified, store any + # leftover bytes and quit + if field['len'] is not None: + # Are we trying to read beyond the last data element? + if index + field['len'] > len(data): + raise ValueError( + "Response packet was shorter than expected") + + field_data = data[index:index + field['len']] + info[field['name']] = field_data + else: + field_data = data[index:] + + # Were there any remaining bytes? + if field_data: + # If so, store them + info[field['name']] = field_data + index += len(field_data) + break + + # Move the index + index += field['len'] + + # If there are more bytes than expected, raise an exception + if index < len(data): + raise ValueError( + "Response packet was longer than expected") + + # Check if this packet was an IO sample + # If so, process the sample data + if 'parse_as_io_samples' in packet: + field_to_process = packet['parse_as_io_samples'] + info[field_to_process] = XBeeBase.parse_samples( + info[field_to_process]) + + return info + + @staticmethod + def parse_samples_header(data): + """ + parse_samples_header: binary data in XBee IO data format -> + (int, [int ...], [int ...]) + + parse_samples_header will read the first three bytes of the + binary data given and will return the number of samples which + follow, a list of enabled digital inputs and a list of enabled + analog inputs + """ + + ## Parse the header, bytes 0-2 + dio_enabled = [] + adc_enabled = [] + + # First byte: number of samples + len_raw = data[0] + len_samples = ord(len_raw) + + # Second-third bytes: enabled pin flags + sources_raw = data[1:3] + + # In order to put the io line names in list positions which + # match their number (for ease of traversal), the second byte + # will be read first + byte_2_data = ord(sources_raw[1]) + + # Check each flag + # DIO lines 0-7 + i = 1 + for dio in range(0, 8): + if byte_2_data & i: + dio_enabled.append(dio) + i *= 2 + + # Byte 1 + byte_1_data = ord(sources_raw[0]) + + # Grab DIO8 first + if byte_1_data & 1: + dio_enabled.append(8) + + # Check each flag (after the first) + # ADC lines 0-5 + i = 2 + for adc in range(0, 6): + if byte_1_data & i: + adc_enabled.append(adc) + i *= 2 + + return (len_samples, dio_enabled, adc_enabled) + + @staticmethod + def parse_samples(data): + """ + parse_samples: binary data in XBee IO data format -> + [ {"dio-0":True, + "dio-1":False, + "adc-0":100"}, ...] + + parse_samples reads binary data from an XBee device in the IO + data format specified by the API. It will then return a + dictionary indicating the status of each enabled IO port. + """ + + ## Parse and store header information + header_data = XBeeBase.parse_samples_header(data) + len_samples, dio_enabled, adc_enabled = header_data + + samples = [] + + ## Parse the samples + # Start at byte 3 + byte_pos = 3 + + for i in range(0, len_samples): + sample = {} + + # If one or more DIO lines are set, the first two bytes + # contain their values + if dio_enabled: + # Get two bytes + values = data[byte_pos:byte_pos + 2] + + # Read and store values for all enabled DIO lines + for dio in dio_enabled: + # Second byte contains values for 0-7 + # If we want number 8, switch to the first byte, and + # move back to the beginning + if dio > 7: + sample["dio-%d" % dio] = True if ord(values[0]) & 2 ** (dio - 8) else False + else: + sample["dio-%d" % dio] = True if ord(values[1]) & 2 ** dio else False + + # Move the starting position for the next new byte + byte_pos += 2 + + # If one or more ADC lines are set, the remaining bytes, in + # pairs, represent their values, MSB first. + if adc_enabled: + for adc in adc_enabled: + # Analog reading stored in two bytes + value_raw = data[byte_pos:byte_pos + 2] + + # Unpack the bits + value = struct.unpack("> h", value_raw)[0] + + # Only 10 bits are meaningful + value &= 0x3FF + + # Save the result + sample["adc-%d" % adc] = value + + # Move the starting position for the next new byte + byte_pos += 2 + + samples.append(sample) + + return samples + + def send(self, cmd, **kwargs): + """ + send: string param=binary data ... -> None + + When send is called with the proper arguments, an API command + will be written to the serial port for this XBee device + containing the proper instructions and data. + + This method must be called with named arguments in accordance + with the api_command specification. Arguments matching all + field names other than those in reserved_names (like 'id' and + 'order') should be given, unless they are of variable length + (of 'None' in the specification. Those are optional). + """ + # Pass through the keyword arguments + self.write(self.build_command(cmd, **kwargs)) + + + def wait_read_frame(self): + """ + wait_read_frame: None -> frame info dictionary + + wait_read_frame calls XBee.wait_for_frame() and waits until a + valid frame appears on the serial port. Once it receives a frame, + wait_read_frame attempts to parse the data contained within it + and returns the resulting dictionary + """ + + frame = self.wait_for_frame() + return self.split_response(frame.data) + + def __getattr__(self, name): + """ + If a method by the name of a valid api command is called, + the arguments will be automatically sent to an appropriate + send() call + """ + # If api_commands is not defined, raise NotImplementedError\ + # If its not defined, _getattr__ will be called with its name + if name == 'api_commands': + raise NotImplementedError("API command specifications could not be found; use a derived class which defines 'api_commands'.") + + # Is shorthand enabled, and is the called name a command? + if self.shorthand and name in self.api_commands: + # If so, simply return a function which passes its arguments + # to an appropriate send() call + return lambda **kwargs: self.send(name, **kwargs) + else: + raise AttributeError("XBee has no attribute '%s'" % name) diff --git a/xbee/frame.py b/xbee/frame.py new file mode 100644 index 0000000..2bf1055 --- /dev/null +++ b/xbee/frame.py @@ -0,0 +1,114 @@ +""" +frame.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +Represents an API frame for communicating with an XBee +""" +import struct + +class APIFrame: + """ + Represents a frame of data to be sent to or which was received + from an XBee device + """ + + START_BYTE = '\x7E' + + def __init__(self, data): + self.data = data + + def checksum(self): + """ + checksum: None -> single checksum byte + + checksum adds all bytes of the binary, unescaped data in the + frame, saves the last byte of the result, and subtracts it from + 0xFF. The final result is the checksum + """ + total = 0 + + # Add together all bytes + for byte in self.data: + total += ord(byte) + + # Only keep the last byte + total = total & 0xFF + + # Subtract from 0xFF + return chr(0xFF - total) + + def verify(self, chksum): + """ + verify: 1 byte -> boolean + + verify checksums the frame, adds the expected checksum, and + determines whether the result is correct. The result should + be 0xFF. + """ + total = 0 + + # Add together all bytes + for byte in self.data: + total += ord(byte) + + # Add checksum too + total += ord(chksum) + + # Only keep low bits + total &= 0xFF + + # Check result + return total == 0xFF + + def len_bytes(self): + """ + len_data: None -> (MSB, LSB) 16-bit integer length, two bytes + + len_bytes counts the number of bytes to be sent and encodes the + data length in two bytes, big-endian (most significant first). + """ + count = len(self.data) + return struct.pack("> h", count) + + def output(self): + """ + output: None -> valid API frame (binary data) + + output will produce a valid API frame for transmission to an + XBee module. + """ + # start is one byte long, length is two bytes + # data is n bytes long (indicated by length) + # chksum is one byte long + return APIFrame.START_BYTE + \ + self.len_bytes() + \ + self.data + \ + self.checksum() + + @staticmethod + def parse(raw_data): + """ + parse: valid API frame (binary data) -> binary data + + Given a valid API frame, empty_frame extracts the data contained + inside it and verifies it against its checksum + """ + # First two bytes are the length of the data + raw_len = raw_data[1:3] + + # Unpack it + data_len = struct.unpack("> h", raw_len)[0] + + # Read the data + data = raw_data[3:3 + data_len] + chksum = raw_data[-1] + + # Checksum check + frame = APIFrame(data) + if not frame.verify(chksum): + raise ValueError("Invalid checksum on given frame") + + # If the result is valid, return it + return frame diff --git a/xbee/impl.py b/xbee/impl.py new file mode 100644 index 0000000..2c27df5 --- /dev/null +++ b/xbee/impl.py @@ -0,0 +1,139 @@ +""" +impl.py + +By Paul Malmsten, 2010 +Inspired by code written by Amit Synderman and Marco Sangalli +pmalmsten@gmail.com + +This module implements an XBee Series 1/Series 2 driver. +""" +import struct +from xbee.base import XBeeBase + +class XBee(XBeeBase): + """ + Provides an implementation of the XBee API for Series 1/2 modules + with recent firmware. + + Commands may be sent to a device by instansiating this class with + a serial port object (see PySerial) and then calling the send + method with the proper information specified by the API. Data may + be read from a device (syncronously only, at the moment) by calling + wait_read_frame. + """ + # Packets which can be sent to an XBee + + # Format: + # {name of command: + # [{name:field name, len:field length, default: default value sent} + # ... + # ] + # ... + # } + api_commands = {"at": + [{'name':'id', 'len':1, 'default':'\x08'}, + {'name':'frame_id', 'len':1, 'default':'\x00'}, + {'name':'command', 'len':2, 'default':None}, + {'name':'parameter', 'len':None, 'default':None}], + "queued_at": + [{'name':'id', 'len':1, 'default':'\x09'}, + {'name':'frame_id', 'len':1, 'default':'\x00'}, + {'name':'command', 'len':2, 'default':None}, + {'name':'parameter', 'len':None, 'default':None}], + "remote_at": + [{'name':'id', 'len':1, 'default':'\x17'}, + {'name':'frame_id', 'len':1, 'default':'\x00'}, + # dest_addr_long is 8 bytes (64 bits), so use an unsigned long long + {'name':'dest_addr_long', 'len':8, 'default':struct.pack('>Q', 0)}, + {'name':'dest_addr', 'len':2, 'default':'\xFF\xFE'}, + {'name':'options', 'len':1, 'default':'\x02'}, + {'name':'command', 'len':2, 'default':None}, + {'name':'parameter', 'len':None, 'default':None}], + "tx_long_addr": + [{'name':'id', 'len':1, 'default':'\x00'}, + {'name':'frame_id', 'len':1, 'default':'\x00'}, + {'name':'dest_addr', 'len':8, 'default':None}, + {'name':'options', 'len':1, 'default':'\x00'}, + {'name':'data', 'len':None, 'default':None}], + "tx": + [{'name':'id', 'len':1, 'default':'\x01'}, + {'name':'frame_id', 'len':1, 'default':'\x00'}, + {'name':'dest_addr', 'len':2, 'default':None}, + {'name':'options', 'len':1, 'default':'\x00'}, + {'name':'data', 'len':None, 'default':None}] + } + + # Packets which can be received from an XBee + + # Format: + # {id byte received from XBee: + # {name: name of response + # structure: + # [ {'name': name of field, 'len':length of field} + # ... + # ] + # parse_as_io_samples:name of field to parse as io + # } + # ... + # } + # + api_responses = {"\x80": + {'name':'rx_long_addr', + 'structure': + [{'name':'source_addr', 'len':8}, + {'name':'rssi', 'len':1}, + {'name':'options', 'len':1}, + {'name':'rf_data', 'len':None}]}, + "\x81": + {'name':'rx', + 'structure': + [{'name':'source_addr', 'len':2}, + {'name':'rssi', 'len':1}, + {'name':'options', 'len':1}, + {'name':'rf_data', 'len':None}]}, + "\x82": + {'name':'rx_io_data_long_addr', + 'structure': + [{'name':'source_addr_long','len':8}, + {'name':'rssi', 'len':1}, + {'name':'options', 'len':1}, + {'name':'samples', 'len':None}], + 'parse_as_io_samples':'samples'}, + "\x83": + {'name':'rx_io_data', + 'structure': + [{'name':'source_addr', 'len':2}, + {'name':'rssi', 'len':1}, + {'name':'options', 'len':1}, + {'name':'samples', 'len':None}], + 'parse_as_io_samples':'samples'}, + "\x89": + {'name':'tx_status', + 'structure': + [{'name':'frame_id', 'len':1}, + {'name':'status', 'len':1}]}, + "\x8a": + {'name':'status', + 'structure': + [{'name':'status', 'len':1}]}, + "\x88": + {'name':'at_response', + 'structure': + [{'name':'frame_id', 'len':1}, + {'name':'command', 'len':2}, + {'name':'status', 'len':1}, + {'name':'parameter', 'len':None}]}, + "\x97": + {'name':'remote_at_response', + 'structure': + [{'name':'frame_id', 'len':1}, + {'name':'source_addr_long','len':8}, + {'name':'source_addr', 'len':2}, + {'name':'command', 'len':2}, + {'name':'status', 'len':1}, + {'name':'parameter', 'len':None}]}, + } + + def __init__(self, *args, **kwargs): + # Call the super class constructor to save the serial port + super(XBee, self).__init__(*args, **kwargs) diff --git a/xbee/tests/Fake.py b/xbee/tests/Fake.py new file mode 100644 index 0000000..87da333 --- /dev/null +++ b/xbee/tests/Fake.py @@ -0,0 +1,45 @@ +#! /usr/bin/python +""" +Fake.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +Provides fake device objects for other unit tests. +""" + +class FakeDevice: + """ + Represents a fake serial port for testing purposes + """ + def __init__(self): + self.data = '' + + def write(self, data): + """ + Writes data to the fake port for later evaluation + """ + self.data = data + +class FakeReadDevice: + """ + Represents a fake serial port which can be read from in a similar + fashion to the real thing + """ + + def __init__(self, data): + self.data = data + self.read_index = 0 + + def read(self, length=1): + """ + Read the indicated number of bytes from the port + """ + # If too many bytes would be read, raise exception + if self.read_index + length > len(self.data): + raise ValueError("Not enough bytes exist!") + + read_data = self.data[self.read_index:self.read_index + length] + self.read_index += length + + return read_data diff --git a/xbee/tests/__init__.py b/xbee/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xbee/tests/test_base.py b/xbee/tests/test_base.py new file mode 100644 index 0000000..4302c19 --- /dev/null +++ b/xbee/tests/test_base.py @@ -0,0 +1,113 @@ +#! /usr/bin/python +""" +test_xbee.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +Tests the XBeeBase superclass module for XBee API conformance. +""" +import unittest +from xbee.base import XBeeBase +from xbee.tests.Fake import FakeDevice, FakeReadDevice + +class TestWriteToDevice(unittest.TestCase): + """ + XBeeBase class should properly write binary data in a valid API + frame to a given serial device. + """ + + def test_write(self): + """ + write method should write the expected data to the serial + device + """ + device = FakeDevice() + + xbee = XBeeBase(device) + xbee.write('\x00') + + # Check resuting state of fake device + expected_frame = '\x7E\x00\x01\x00\xFF' + self.assertEqual(device.data, expected_frame) + + def test_write_again(self): + """ + write method should write the expected data to the serial + device + """ + device = FakeDevice() + + xbee = XBeeBase(device) + xbee.write('\x00\x01\x02') + + # Check resuting state of fake device + expected_frame = '\x7E\x00\x03\x00\x01\x02\xFC' + self.assertEqual(device.data, expected_frame) + +class TestReadFromDevice(unittest.TestCase): + """ + XBeeBase class should properly read and extract data from a valid + API frame + """ + def test_read(self): + """ + wait_for_frame should properly read a frame of data + """ + device = FakeReadDevice('\x7E\x00\x01\x00\xFF') + xbee = XBeeBase(device) + + frame = xbee.wait_for_frame() + self.assertEqual(frame.data, '\x00') + + def test_read_invalid_followed_by_valid(self): + """ + wait_for_frame should skip invalid data + """ + device = FakeReadDevice( + '\x7E\x00\x01\x00\xFA' + '\x7E\x00\x01\x05\xFA') + xbee = XBeeBase(device) + + frame = xbee.wait_for_frame() + self.assertEqual(frame.data, '\x05') + +class TestNotImplementedFeatures(unittest.TestCase): + """ + In order to properly use the XBeeBase class for most situations, + it must be subclassed with the proper attributes definined. If + this is not the case, then a NotImplemented exception should be + raised as appropriate. + """ + + def setUp(self): + """ + Set up a base class XBeeBase object which does not have + api_commands or api_responses defined + """ + self.xbee = XBeeBase(None) + + def test_build_command(self): + """ + build_command should raise NotImplemented + """ + self.assertRaises(NotImplementedError, self.xbee.build_command, "at") + + def test_split_response(self): + """ + split_command should raise NotImplemented + """ + self.assertRaises(NotImplementedError, self.xbee.split_response, "\00") + + def test_shorthand(self): + """ + Shorthand calls should raise NotImplementedError + """ + try: + self.xbee.at + except NotImplementedError: + pass + else: + self.fail("Shorthand call on XBeeBase base class should raise NotImplementedError") + +if __name__ == '__main__': + unittest.main() diff --git a/xbee/tests/test_fake.py b/xbee/tests/test_fake.py new file mode 100644 index 0000000..01dc04a --- /dev/null +++ b/xbee/tests/test_fake.py @@ -0,0 +1,44 @@ +#! /usr/bin/python +""" +test_xbee.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +Tests fake device objects for proper functionality. +""" +import unittest +from xbee.tests.Fake import FakeReadDevice + +class TestFakeReadDevice(unittest.TestCase): + """ + FakeReadDevice class should work as intended to emluate a serial + port + """ + def setUp(self): + """ + Create a fake read device for each test + """ + self.device = FakeReadDevice("test") + + def test_read_single_byte(self): + """ + reading one byte at a time should work as expected + """ + self.assertEqual(self.device.read(), 't') + self.assertEqual(self.device.read(), 'e') + self.assertEqual(self.device.read(), 's') + self.assertEqual(self.device.read(), 't') + + def test_read_multiple_bytes(self): + """ + reading multiple bytes at a time should work as expected + """ + self.assertEqual(self.device.read(3), 'tes') + self.assertEqual(self.device.read(), 't') + + def test_read_too_many(self): + """ + attempting to read too many bytes should raise an exception + """ + self.assertRaises(ValueError, self.device.read, 5) diff --git a/xbee/tests/test_frame.py b/xbee/tests/test_frame.py new file mode 100644 index 0000000..6399618 --- /dev/null +++ b/xbee/tests/test_frame.py @@ -0,0 +1,50 @@ +#! /usr/bin/python +""" +test_frame.py + +Paul Malmsten, 2010 +pmalmsten@gmail.com + +Tests frame module for proper behavior +""" +import unittest +from xbee.frame import APIFrame + +class TestAPIFrameGeneration(unittest.TestCase): + """ + XBee class must be able to create a valid API frame given binary + data, in byte string form. + """ + def test_single_byte(self): + """ + create a frame containing a single byte + """ + data = '\x00' + # start byte, two length bytes, data byte, checksum + expected_frame = '\x7E\x00\x01\x00\xFF' + + frame = APIFrame(data).output() + self.assertEqual(frame, expected_frame) + +class TestAPIFrameParsing(unittest.TestCase): + """ + XBee class must be able to read and validate the data contained + by a valid API frame. + """ + + def test_single_byte(self): + """ + read a frame containing a single byte + """ + frame = '\x7E\x00\x01\x00\xFF' + expected_data = '\x00' + + data = APIFrame.parse(frame).data + self.assertEqual(data, expected_data) + + def test_invalid_checksum(self): + """ + when an invalid frame is read, an exception must be raised + """ + frame = '\x7E\x00\x01\x00\xF6' + self.assertRaises(ValueError, APIFrame.parse, frame) diff --git a/xbee/tests/test_impl.py b/xbee/tests/test_impl.py new file mode 100644 index 0000000..a911631 --- /dev/null +++ b/xbee/tests/test_impl.py @@ -0,0 +1,562 @@ +#! /usr/bin/python +""" +test_xbee1.py + +By Paul Malmsten, 2010 +pmalmsten@gmail.com + +Tests the XBee Series 1/2 implementation class for XBee API compliance +""" +import unittest +from xbee.tests.Fake import FakeDevice, FakeReadDevice +from xbee.impl import XBee + +class InitXBee(unittest.TestCase): + """ + Base initalization class + """ + def setUp(self): + """ + Initialize XBee object + """ + self.xbee = XBee(None) + +class TestBuildCommand(InitXBee): + """ + build_command should properly build a command packet + """ + + def test_build_at_data_mismatch(self): + """ + if not enough or incorrect data is provided, an exception should + be raised. + """ + try: + self.xbee.build_command("at") + except KeyError: + # Test passes + return + + # No exception? Fail. + self.fail( + "An exception was not raised with improper data supplied" + ) + + def test_build_at_data_len_mismatch(self): + """ + if data of incorrect length is provided, an exception should be + raised + """ + try: + self.xbee.build_command("at", frame_id="AB", command="MY") + except ValueError: + # Test passes + return + + # No exception? Fail. + self.fail( + "An exception was not raised with improper data length" + ) + + def test_build_at(self): + """ + build_command should build a valid at command packet which has + no parameter data to be saved + """ + + at_command = "MY" + frame = chr(43) + data = self.xbee.build_command( + "at", + frame_id=frame, + command=at_command + ) + + expected_data = '\x08+MY' + self.assertEqual(data, expected_data) + + def test_build_at_with_default(self): + """ + build_command should build a valid at command packet which has + no parameter data to be saved and no frame specified (the + default value of \x00 should be used) + """ + + at_command = "MY" + data = self.xbee.build_command("at", command=at_command) + + expected_data = '\x08\x00MY' + self.assertEqual(data, expected_data) + +class TestSplitResponse(InitXBee): + """ + split_response should properly split a response packet + """ + + def test_unrecognized_response(self): + """ + if a response begins with an unrecognized id byte, + split_response should raise an exception + """ + data = '\x23\x00\x00\x00' + + try: + self.xbee.split_response(data) + except KeyError: + # Passes + return + + # Test Fails + self.fail() + + def test_bad_data_long(self): + """ + if a response doesn't match the specification's layout, + split_response should raise an exception + """ + # Over length + data = '\x8a\x00\x00\x00' + self.assertRaises(ValueError, self.xbee.split_response, data) + + def test_bad_data_short(self): + """ + if a response doesn't match the specification's layout, + split_response should raise an exception + """ + # Under length + data = '\x8a' + self.assertRaises(ValueError, self.xbee.split_response, data) + + def test_split_status_response(self): + """ + split_response should properly split a status response packet + """ + data = '\x8a\x01' + + info = self.xbee.split_response(data) + expected_info = {'id':'status', + 'status':'\x01'} + + self.assertEqual(info, expected_info) + + def test_split_short_at_response(self): + """ + split_response should properly split an at_response packet which + has no parameter data + """ + + data = '\x88DMY\x01' + info = self.xbee.split_response(data) + expected_info = {'id':'at_response', + 'frame_id':'D', + 'command':'MY', + 'status':'\x01'} + self.assertEqual(info, expected_info) + + def test_split_at_resp_with_param(self): + """ + split_response should properly split an at_response packet which + has parameter data + """ + + data = '\x88DMY\x01ABCDEF' + info = self.xbee.split_response(data) + expected_info = {'id':'at_response', + 'frame_id':'D', + 'command':'MY', + 'status':'\x01', + 'parameter':'ABCDEF'} + self.assertEqual(info, expected_info) + + +class TestParseIOData(InitXBee): + """ + XBee class should properly parse IO data received from an XBee + device + """ + + def test_parse_single_dio(self): + """ + parse_samples should properly parse a packet containing a single + sample of only digital io data + """ + # One sample, ADC disabled and DIO8 enabled, DIO 0-7 enabled + header = '\x01\x01\xFF' + + # First 7 bits ignored, DIO8 high, DIO 0-7 high + sample = '\x01\xFF' + data = header + sample + + expected_results = [{'dio-0':True, + 'dio-1':True, + 'dio-2':True, + 'dio-3':True, + 'dio-4':True, + 'dio-5':True, + 'dio-6':True, + 'dio-7':True, + 'dio-8':True}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_single_dio_again(self): + """ + parse_samples should properly parse a packet containing a single + sample of only digital io data, which alternates between on and + off + """ + # One sample, ADC disabled and DIO8 enabled, DIO 0-7 enabled + header = '\x01\x01\xFF' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + sample = '\x00\xAA' + data = header + sample + + expected_results = [{'dio-0':False, + 'dio-1':True, + 'dio-2':False, + 'dio-3':True, + 'dio-4':False, + 'dio-5':True, + 'dio-6':False, + 'dio-7':True, + 'dio-8':False}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_single_dio_subset(self): + """ + parse_samples should properly parse a packet containing a single + sample of only digital io data for only a subset of the + available pins + """ + # One sample, ADC disabled + # DIO 1,3,5,7 enabled + header = '\x01\x00\xAA' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + sample = '\x00\xAA' + data = header + sample + + expected_results = [{'dio-1':True, + 'dio-3':True, + 'dio-5':True, + 'dio-7':True}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_single_dio_subset_again(self): + """ + parse_samples should properly parse a packet containing a single + sample of only digital io data for only a subset of the + available pins + """ + # One sample, ADC disabled + # DIO 0 enabled + header = '\x01\x00\x01' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + sample = '\x00\xAA' + data = header + sample + + expected_results = [{'dio-0':False}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_multiple_dio_subset(self): + """ + parse_samples should properly parse a packet containing two + samples of only digital io data for one dio line + """ + # Two samples, ADC disabled + # DIO 0 enabled + header = '\x02\x00\x01' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + sample = '\x00\xAA' + '\x00\x01' + data = header + sample + + expected_results = [{'dio-0':False}, + {'dio-0':True}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_multiple_dio(self): + """ + parse_samples should properly parse a packet containing three + samples of only digital io data + """ + # Three samples, ADC disabled and DIO8 enabled, DIO 0-7 enabled + header = '\x03\x01\xFF' + + # First 7 bits ignored + # First sample: all bits on + # Second sample: alternating bits on + # Third sample: all bits off + sample = '\x01\xFF' + '\x00\xAA' + '\x00\x00' + data = header + sample + + expected_results = [{'dio-0':True, + 'dio-1':True, + 'dio-2':True, + 'dio-3':True, + 'dio-4':True, + 'dio-5':True, + 'dio-6':True, + 'dio-7':True, + 'dio-8':True}, + {'dio-0':False, + 'dio-1':True, + 'dio-2':False, + 'dio-3':True, + 'dio-4':False, + 'dio-5':True, + 'dio-6':False, + 'dio-7':True, + 'dio-8':False}, + {'dio-0':False, + 'dio-1':False, + 'dio-2':False, + 'dio-3':False, + 'dio-4':False, + 'dio-5':False, + 'dio-6':False, + 'dio-7':False, + 'dio-8':False}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_multiple_adc_subset(self): + """ + parse_samples should parse a data packet containing multiple + samples of adc data from multiple pins in the proper order + """ + # One sample, ADC 0,1 enabled + # DIO disabled + header = '\x02\x06\x00' + + # No dio data + # ADC0 value of 0 + # ADC1 value of 255 + # ADC0 value of 5 + # ADC1 value of 7 + sample = '\x00\x00' + '\x00\xFF' + '\x00\x05' + '\x00\x07' + data = header + sample + + expected_results = [{'adc-0':0, + 'adc-1':255}, + {'adc-0':5, + 'adc-1':7}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + + def test_parse_single_dio_adc_subset(self): + """ + parse_samples should properly parse a packet containing a single + sample of digital and analog io data for only a subset of the + available pins + """ + # One sample, ADC 0 enabled + # DIO 1,3,5,7 enabled + header = '\x01\x02\xAA' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + # ADC0 value of 255 + sample = '\x00\xAA\x00\xFF' + data = header + sample + + expected_results = [{'dio-1':True, + 'dio-3':True, + 'dio-5':True, + 'dio-7':True, + 'adc-0':255}] + + results = self.xbee.parse_samples(data) + + self.assertEqual(results, expected_results) + +class TestWriteToDevice(unittest.TestCase): + """ + XBee class should properly write binary data in a valid API + frame to a given serial device, including a valid command packet. + """ + + def test_send_at_command(self): + """ + calling send should write a full API frame containing the + API AT command packet to the serial device. + """ + + serial_port = FakeDevice() + xbee = XBee(serial_port) + + # Send an AT command + xbee.send('at', frame_id='A', command='MY') + + # Expect a full packet to be written to the device + expected_data = '\x7E\x00\x04\x08AMY\x10' + self.assertEqual(serial_port.data, expected_data) + + + def test_send_at_command_with_param(self): + """ + calling send should write a full API frame containing the + API AT command packet to the serial device. + """ + + serial_port = FakeDevice() + xbee = XBee(serial_port) + + # Send an AT command + xbee.send( + 'at', + frame_id='A', + command='MY', + parameter='\x00\x00' + ) + + # Expect a full packet to be written to the device + expected_data = '\x7E\x00\x06\x08AMY\x00\x00\x10' + self.assertEqual(serial_port.data, expected_data) + +class TestSendShorthand(unittest.TestCase): + """ + Tests shorthand for sending commands to an XBee provided by + XBee.__getattr__ + """ + + def setUp(self): + """ + Prepare a fake device to read from + """ + self.ser = FakeDevice() + self.xbee = XBee(self.ser) + + def test_send_at_command(self): + """ + Send an AT command with a shorthand call + """ + # Send an AT command + self.xbee.at(frame_id='A', command='MY') + + # Expect a full packet to be written to the device + expected_data = '\x7E\x00\x04\x08AMY\x10' + self.assertEqual(self.ser.data, expected_data) + + def test_send_at_command_with_param(self): + """ + calling send should write a full API frame containing the + API AT command packet to the serial device. + """ + + # Send an AT command + self.xbee.at(frame_id='A', command='MY', parameter='\x00\x00') + + # Expect a full packet to be written to the device + expected_data = '\x7E\x00\x06\x08AMY\x00\x00\x10' + self.assertEqual(self.ser.data, expected_data) + + def test_shorthand_disabled(self): + """ + When shorthand is disabled, any attempt at calling a + non-existant attribute should raise AttributeError + """ + self.xbee = XBee(self.ser, shorthand=False) + + try: + self.xbee.at + except AttributeError: + pass + else: + self.fail("Specified shorthand command should not exist") + +class TestReadFromDevice(unittest.TestCase): + """ + XBee class should properly read and parse binary data from a serial + port device. + """ + def test_read_at(self): + """ + read and parse a parameterless AT command + """ + device = FakeReadDevice('\x7E\x00\x05\x88DMY\x01\x8c') + xbee = XBee(device) + + info = xbee.wait_read_frame() + expected_info = {'id':'at_response', + 'frame_id':'D', + 'command':'MY', + 'status':'\x01'} + self.assertEqual(info, expected_info) + + def test_read_at_params(self): + """ + read and parse an AT command with a parameter + """ + device = FakeReadDevice( + '\x7E\x00\x08\x88DMY\x01\x00\x00\x00\x8c' + ) + xbee = XBee(device) + + info = xbee.wait_read_frame() + expected_info = {'id':'at_response', + 'frame_id':'D', + 'command':'MY', + 'status':'\x01', + 'parameter':'\x00\x00\x00'} + self.assertEqual(info, expected_info) + + def test_read_io_data(self): + """ + XBee class should properly read and parse incoming IO data + """ + ## Build IO data + # One sample, ADC 0 enabled + # DIO 1,3,5,7 enabled + header = '\x01\x02\xAA' + + # First 7 bits ignored, DIO8 low, DIO 0-7 alternating + # ADC0 value of 255 + sample = '\x00\xAA\x00\xFF' + data = header + sample + + ## Wrap data in frame + # RX frame data + rx_io_resp = '\x83\x00\x01\x28\x00' + + device = FakeReadDevice( + '\x7E\x00\x0C'+ rx_io_resp + data + '\xfd' + ) + xbee = XBee(device) + + #pdb.set_trace() + info = xbee.wait_read_frame() + expected_info = {'id':'rx_io_data', + 'source_addr':'\x00\x01', + 'rssi':'\x28', + 'options':'\x00', + 'samples': [{'dio-1':True, + 'dio-3':True, + 'dio-5':True, + 'dio-7':True, + 'adc-0':255}] + } + self.assertEqual(info, expected_info) + + +if __name__ == '__main__': + unittest.main()