# 数据发送/接收模板说明
# 一、简介
数据模板是基于物大师成熟稳定的DTU固件架构和平台,做固件微开发,实现数据转换,业务动态修改,自定义业务逻辑的一种方便方法。通过DTU的配置平台,配置一段动态可执行的代码到DTU内部,设备可以自动执行这部分代码,实现网络数据和串口数据的收/发/解析/计算/转换,自由度高,灵活性强。常见的使用场景:
- 设备与服务器之间进行数据格式转换,比如modbus与json转;
- 自定义设备控制逻辑,任务截取收到的数据,然后转换成自定义逻辑;
- 数据加密解密;
# 二、使用注意事项
- 在编辑的时候,注意先在文档里面编辑,然后一次性拷贝到浏览器里面,不要在浏览器里面编辑
- 需要懂 Python 语言语法
- 数据的全局变量名称为
data,类型为bytes,转换后的数据变量也是如此 - 整个脚本不要太复杂,建议不超过20KB
- 程序逻辑要健壮,如果异常会导致整个设备崩溃重启
# 三、示例
将串口数据转换成json数据
import json
# 串口数据/网络数据 全局变量名为 data,类型为 bytes
data_str = data.decode()
result = {"data": data_str}
# 转换后的数据变量名为 data,类型为 bytes
data = bytes(json.dumps(result))
# 四、模板调试工具
我们提供了模板的调试工具,用于测试数据模板,点击链接下载:python-lib.zip,下载后解压,调试步骤如下:
- 编辑data.py文件,在里面声明
data变量,类型为 bytes,作为模拟的待转换的串口数据或者网络数据。 - 编辑main.py文件,编写你的数据处理逻辑
- 在解压后的文件夹内,打开控制台或者终端,执行 pika.exe main.py (Windows) 或者 pika main.py (MacOs/Linux)
- 查看控制台输出,检查有无报错
# 物、支持的API
底层使用PikaScript支持,提供了一些基础的API
# 5.1、builtins.pyi
# int构造函数
def int(arg: any, *base) -> any: ...
# bool构造函数
def bool(arg: any) -> bool: ...
# float构造函数
def float(arg: any) -> float: ...
# str构造函数
def str(arg: any) -> str: ...
# 迭代器
def iter(arg: any) -> any: ...
# 创建一个迭代器
def range(*ax) -> any: ...
# 调试串口打印
def print(*val, **ops): ...
# 设置对象属性
def __setitem__(obj: any, key: any, val: any) -> any: ...
# 获取对象属性
def __getitem__(obj: any, key: any) -> any: ...
# 获取对象类型
def type(arg: any) -> any: ...
# 判断对象是否是某个类型
def isinstance(object: any, classinfo: any) -> bool: ...
# 获取对象长度
def len(arg: any) -> int: ...
# 创建一个列表
def list(*val) -> any: ...
# 创建一个字典
def dict(*val) -> any: ...
# 创建一个元组
def tuple(*val) -> any: ...
# int转十六进制字符串
def hex(val: int) -> str: ...
# 返回一个字符的ASCII码
def ord(val: str) -> int: ...
# ASCII码转字符
def chr(val: int) -> str: ...
# bytes构造函数
def bytes(val: any) -> bytes: ...
# 十六进制字符串转bytes
def hex2Bytes(val: str) -> bytes: ...
# 字符串格式化
def cformat(fmt: str, *var) -> str: ...
# 获取对象的指针
def id(obj: any) -> int: ...
# 打开文件,暂未实现
def open(path: str, mode: str) -> object: ...
# 获取对象的所有属性和方法
def dir(obj: any) -> list: ...
# 执行代码
def exec(code: str): ...
# 执行代码并返回结果
def eval(code: str) -> any: ...
# 获取对象属性值
def getattr(obj: object, name: str) -> any: ...
# 设置对象属性值
def setattr(obj: object, name: str, val: any): ...
# 判断对象是否包含某个属性
def hasattr(obj: object, name: str) -> int: ...
# 退出程序
def exit(): ...
# 获取输入,暂未支持
def input(*info) -> str: ...
# 获取绝对值
def abs(val: any) -> any: ...
# 返回最大值
def max(*val) -> any: ...
# 返回最小值
def min(*val) -> any: ...
# 查看函数或模块用途的详细说明
def help(name: str): ...
# 重启设备,暂未实现
def reboot(): ...
# 暂未实现
def clear(): ...
# 暂未实现
def gcdump(): ...
class RangeObj:
def __next__(self) -> any: ...
class StringObj:
def __next__(self) -> any: ...
class object:
pass
class bytearray:
def __init__(self, bytes: any):
""" convert a bytes to ByteArray """
def __iter__(self) -> any:
""" support for loop """
def __next__(self) -> any:
""" support for loop """
def __getitem__(self, __key: int) -> int:
""" support [] index """
def __setitem__(self, __key: int, __val: int): ...
def __str__(self) -> str: ...
# 字节数组转换为字符串
def decode(self) -> str: ...
# 字节数组转换为16进制字符串
def decode_hex(self) -> str: ...
def __len__(self) -> int: ...
def __contains__(self, others: any) -> int: ...
def split(self, *vars) -> list: ...
class BaseException:
pass
class Exception(BaseException):
pass
class SystemExit(BaseException):
pass
class KeyboardInterrupt(BaseException):
pass
class GeneratorExit(BaseException):
pass
class Exception(BaseException):
pass
class StopIteration(Exception):
pass
class StopAsyncIteration(Exception):
pass
class ArithmeticError(Exception):
pass
class FloatingPointError(ArithmeticError):
pass
class OverflowError(ArithmeticError):
pass
class ZeroDivisionError(ArithmeticError):
pass
class AssertionError(Exception):
pass
class AttributeError(Exception):
pass
class BufferError(Exception):
pass
class EOFError(Exception):
pass
class ImportError(Exception):
pass
class ModuleNotFoundError(ImportError):
pass
class LookupError(Exception):
pass
class IndexError(LookupError):
pass
class KeyError(LookupError):
pass
class MemoryError(Exception):
pass
class NameError(Exception):
pass
class UnboundLocalError(NameError):
pass
class OSError(Exception):
pass
class BlockingIOError(OSError):
pass
class ChildProcessError(OSError):
pass
class ConnectionError(OSError):
pass
class BrokenPipeError(ConnectionError):
pass
class ConnectionAbortedError(ConnectionError):
pass
class ConnectionRefusedError(ConnectionError):
pass
class ConnectionResetError(ConnectionError):
pass
class FileExistsError(OSError):
pass
class FileNotFoundError(OSError):
pass
class InterruptedError(OSError):
pass
class IsADirectoryError(OSError):
pass
class NotADirectoryError(OSError):
pass
class PermissionError(OSError):
pass
class ProcessLookupError(OSError):
pass
class TimeoutError(OSError):
pass
class ReferenceError(Exception):
pass
class RuntimeError(Exception):
pass
class NotImplementedError(RuntimeError):
pass
class RecursionError(RuntimeError):
pass
class SyntaxError(Exception):
pass
class IndentationError(SyntaxError):
pass
class TabError(IndentationError):
pass
class SystemError(Exception):
pass
class TypeError(Exception):
pass
class ValueError(Exception):
pass
class UnicodeError(ValueError):
pass
class UnicodeDecodeError(UnicodeError):
pass
class UnicodeEncodeError(UnicodeError):
pass
class UnicodeTranslateError(UnicodeError):
pass
class Warning(Exception):
pass
class DeprecationWarning(Warning):
pass
class PendingDeprecationWarning(Warning):
pass
class RuntimeWarning(Warning):
pass
class SyntaxWarning(Warning):
pass
class UserWarning(Warning):
pass
class FutureWarning(Warning):
pass
class ImportWarning(Warning):
pass
class UnicodeWarning(Warning):
pass
class BytesWarning(Warning):
pass
class ResourceWarning(Warning):
pass
# 5.2、ctypes.pyi
class c_uint(TinyObj):
def __init__(self, value:int):...
class c_float(TinyObj):
def __init__(self, value:float):...
class c_wchar_p(TinyObj):
def __init__(self, value:str):...
class c_bool(c_uint):...
class c_byte(c_uint):...
class c_ubyte(c_uint):...
class c_short(c_uint):...
class c_int(c_uint):...
class c_long(c_uint):...
class c_ulong(c_uint):...
class c_longlong(c_uint):...
class c_ulonglong(c_uint):...
class c_size_t(c_uint):...
class c_ssize_t(c_uint):...
class c_void_p(c_uint):...
class c_char(c_wchar_p):...
class c_wchar(c_wchar_p):...
class c_char_p(c_wchar_p):...
class c_double(c_float):...
class c_longdouble(c_float):...
class create_string_buffer(TinyObj):
def __init__(self, size:int):...
# support val = string[]
def __getitem__(self, __key: int) -> int:
pass
class c_buffer(TinyObj):
def __init__(self, value:any, size:int):
pass
# support val = string[]
def __getitem__(self, __key: int) -> int:
pass
# 5.3 PikaStdData.pyi
from PikaObj import *
import builtins
class Tuple:
def __init__(self): ...
def get(self, i: int) -> any:
"""get an arg by the index"""
def len(self) -> int:
"""get the length of list"""
def __iter__(self) -> any:
"""support for loop"""
def __next__(self) -> any:
"""support for loop"""
def __getitem__(self, __key: any) -> any:
"""support val = list[]"""
def __del__(self): ...
def __str__(self) -> str: ...
def __len__(self) -> int: ...
def __eq__(self, other: any) -> int:
"""support tuple == tuple"""
def __add__(self, others: Tuple) -> Tuple:
"""support tuple + tuple"""
def __mul__(self, n: int) -> Tuple:
"""support tuple * int"""
class List(Tuple):
def __init__(self): ...
def append(self, arg: any):
"""add an arg after the end of list"""
def set(self, i: int, arg: any):
"""set an arg by the index"""
def reverse(self):
"""reverse the list"""
def pop(self, *index) -> any:
"""pop the last element"""
def remove(self, val: any):
"""remove the first element"""
def insert(self, i: int, arg: any):
"""insert an arg before the index"""
def __setitem__(self, __key: any, __val: any):
"""support list[] = val"""
def __str__(self) -> str: ...
def __add__(self, others: List) -> List:
""" support list + list"""
def __mul__(self, n: int) -> Tuple:
""" support list * int"""
class Dict:
def __init__(self):
""" get an arg by the key """
def get(self, key: str) -> any: ...
def set(self, key: str, arg: any):
""" set an arg by the key """
def remove(self, key: str):
""" remove an arg by the key """
def __iter__(self) -> any: ...
def __next__(self) -> any: ...
def __setitem__(self, __key: any, __val: any):
""" support dict[] = val """
def __getitem__(self, __key: any) -> any:
""" support val = dict[] """
def __del__(self): ...
def __str__(self) -> str: ...
def keys(self) -> dict_keys: ...
def items(self) -> dict_items: ...
def __len__(self) -> int: ...
def __contains__(self, val: any) -> int:
""" support val in dict """
def update(self, other: Dict):
""" update dict """
def __eq__(self, other: any) -> int:
""" support dict == dict """
class dict_keys:
def __iter__(self) -> any: ...
def __next__(self) -> any: ...
def __str__(self) -> str: ...
def __len__(self) -> int: ...
class dict_items:
def __iter__(self) -> any: ...
def __next__(self) -> any: ...
def __str__(self) -> str: ...
def __len__(self) -> int: ...
class String:
def __init__(self, s: str): ...
def set(self, s: str): ...
def get(self) -> str: ...
def __iter__(self) -> any: ...
def __next__(self) -> any: ...
def __setitem__(self, __key: any, __val: any):
""" support string[] = val """
def __getitem__(self, __key: any) -> any:
""" support val = string[] """
def __str__(self) -> str:
""" support str() """
def __len__(self) -> int: ...
def encode(self, *encoding) -> bytes: ...
def startswith(self, prefix: str) -> int: ...
def endswith(self, suffix: str) -> int: ...
def isdigit(self) -> int: ...
def islower(self) -> int: ...
def isalnum(self) -> int: ...
def isalpha(self) -> int: ...
def isspace(self) -> int: ...
def split(self, *s) -> List: ...
def replace(self, old: str, new: str) -> str: ...
def strip(self, *chrs) -> str: ...
def format(self, *vars) -> str: ...
def join(self, val: any) -> str: ...
def find(self, sub: str) -> int: ...
class ByteArray(builtins.bytearray):
pass
class Utils:
def int_to_bytes(self, val: int) -> bytes:
""" convert a int to bytes """
# 5.4、PikaStdLib.pyi
from PikaObj import *
class MemChecker:
def max(self): ...
def now(self): ...
@PIKA_C_MACRO_IF("!PIKA_NANO_ENABLE")
def getMax(self) -> float: ...
@PIKA_C_MACRO_IF("!PIKA_NANO_ENABLE")
def getNow(self) -> float: ...
@PIKA_C_MACRO_IF("!PIKA_NANO_ENABLE")
def resetMax(self): ...
class REPL:
def setEcho(self, echo: bool): ...
class SysObj:
pass
# 5.5、json.pyi
# json字符串解析
def loads(json_str: str) -> any: ...
# 对象转为json字符串
def dumps(d: any) -> str: ...