import network
import utime as time
# open WiFi interface in STA mode
wifi = network.WLAN( network.STA_IF )
# activate the WiFi interface (if_up)
wifi.active( True )
print ('Press Ctrl+C to stop WiFi scanning...')
try:
while True:
# scan WiFi
scan_results = wifi.scan()
# print scan results (a list of tuples)
print(60*'=')
for ap in scan_results:
print( ap )
print(60*'-')
except KeyboardInterrupt:
print('Terminated...')
# deactivate the WiFi interface
wifi.active(False)
import network
import ujson as json
param_names = ['essid','mac','channel','rssi','authmode','hidden']
authmodes = ['OPEN','WEP','WPA-PSK',
'WPA2-PSK','WPA/WPA2-PSK','MAX']
def mac_bytes_to_hex_str( mac_bytes ):
return ':'.join(['%02X' % x for x in mac_bytes])
def convert_to_json( found_list ):
results = {'count': len(found_list), 'found_list': []}
for ap in found_list:
pairs = dict(zip(param_names,ap))
for name in pairs:
value = pairs.get(name)
if type(value)==bytes:
if name=='mac' and len(value)==6:
value = mac_bytes_to_hex_str( value )
else:
value = value.decode('ascii')
else:
try:
value = int(value)
if name=='authmode' and 0 <= value <= 5:
value = authmodes[value]
except (ValueError, TypeError):
pass
pairs[name] = value
results['found_list'].append( pairs )
return results# open WiFi interface in STA mode
# activate the WiFi interface
wifi = network.WLAN( network.STA_IF )
wifi.active(True)
# scan WiFi and get the results as a list of tuples
scan_results = wifi.scan()
# deactivate the WiFi interface
wifi.active(False)
# convert a list of tuples to json data
results = convert_to_json( scan_results )
if results.get('count') > 0:
for item in results.get('found_list'):
print( json.dumps(item) ) # show JSON string
else:
print( 'No WiFi found')
3. Wi-Fi Connection in STA Mode
การเชื่อมต่อกับเครือข่าย Wi-Fi ในโหมด STA จะต้องระบุชื่อ SSID และรหัสผ่าน (Password) สำหรับเชื่อมต่ออุปกรณ์ไร้สาย เช่น Wireless Access Point / Router ตามตัวอย่างโค้ดต่อไปนี้
import network
import utime as time
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXXXX", 'pwd': "XXXXXXXXX" }
def connect_wifi( wifi_cfg, max_retries=10 ):
# use WiFi in station mode (not AP)
wifi_sta = network.WLAN( network.STA_IF )
# activate the WiFi interface (up)
wifi_sta.active(True)
# connect to the specified WiFi AP
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
retries = 0
while not wifi_sta.isconnected():
retries = retries + 1
if retries >= max_retries:
return None
time.sleep_ms(500)
return wifi_sta
# try to connect the network
wifi = connect_wifi( WIFI_CFG )
if wifi is None:
print( 'WiFi connection failed' )
else:
ipaddr, netmask, gateway, dns = wifi.ifconfig()
print("============================")
print("IP address :", ipaddr)
print("Net mask :", netmask)
print("Gateway :", gateway)
print("DNS server :", dns)
print("----------------------------")
4. Wi-Fi in AP Mode + Simple HTTP Server
ตัวอย่างถัดไป สาธิตการเปิดใช้งาน Wi-Fi ในโหมด AP แบบ Open (ไม่มีการป้องกันด้วยรหัสผ่าน) โดยการสร้างและเรียกใช้ฟังก์ชัน start_wifi_ap() จากนั้นเมื่อ ESP32 ทำงานเป็น Wi-Fi AP แล้ว (จะมีหมายเลขไอพีตรงกับ 192.168.4.1 และเลือกใช้ช่องสัญญาณหมายเลข 11) ยังได้มีการสร้างและเรียกใช้ฟังก์ชัน start_web_server() เพื่อให้สามารถทำงานเป็น Web Server (HTTP protocol) อย่างง่ายได้ด้วย แสดงข้อความเป็นหน้าเว็บได้
ในตัวอย่างนี้ ถ้าต้องการจบการทำงานของ Web Server เราจะใช้วิธีตรวจสอบการกดปุ่ม ดังนั้นจะต้องมีการต่อวงจรกดปุ่มภายนอกแบบ Active-Low และเลือกใช้ขา GPIO23 เป็นอินพุตสำหรับปุ่มกด
import network
import utime as time
import usocket as socket
from machine import Pin
AP_CFG = { 'essid': 'MyAP', # SSID for your AP
'authmode': network.AUTH_OPEN, 'channel': 11, }
def start_wifi_ap( ap_cfg ):
# open WiFi in AP mode
ap = network.WLAN( network.AP_IF )
# activate AP interface (up)
ap.active(True)
# configure the AP interface
try:
ap.config( **ap_cfg )
except Exception as ex:
print ( 'Cannot set AP configuration...' )
if ap.active():
return ap
else:
return False
def start_web_server( btn=None ):
global sock
# create a socket (STREAM TCP socket) for network connection
sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
# set socket option: reuse address
sock.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
# use socket in non-blocking mode
sock.setblocking( False )
# get the server address (localhost)
addr = socket.getaddrinfo('0.0.0.0',80)[0][-1]
# bind the socket to port 80 (HTTP)
sock.bind( addr )
# listen for an incoming connection
sock.listen(1)
while True:
try:
# waiting for connection
conn,addr = sock.accept()
except OSError:
if btn and btn.value()==0: # check button
sock.close() # close socket
break # break the loop
else:
continue
# read incoming request data
request = conn.recv(1024)
print( 'Content = %s' % str(request) )
# send HTML as response
html = '<html><head></head><body>'
html += '<h1>Welcome to ESP32...</h1><br>'
html += '<b>Client from {}<b><br>'.format(str(addr))
html += '</body></html>'
conn.send( html ) # send HTML response
conn.close() # close HTTP connection
time.sleep_ms(10)
sock = None
ap = start_wifi_ap( AP_CFG )
ipaddr = ap.ifconfig()[0]
print ( 'IP address:', ipaddr ) # 192.168.4.1
BTN_GPIO = const(23) # use GPIO23 for push button
btn = Pin( BTN_GPIO, Pin.IN, Pin.PULL_UP )
try:
start_web_server( btn ) # start web server
except KeyboardInterrupt:
pass
finally:
if sock:
sock.close()
ap.active(False) # turn off WiFi AP
ถ้าจะเปลี่ยนจาก Open เป็นการตั้งรหัสป้องกันด้วยวิธี WPA2-PSK ก็กำหนดค่าสำหรับ Configuration ตามตัวอย่างดังนี้
ตัวอย่างถัดไปสาธิตการเปิดใช้งาน ESP32 ในโหมด STA และเชื่อมต่อกับอุปกรณ์ Wi-Fi AP ที่สามารถเชื่อมต่อไปยังอินเทอร์เน็ตได้ และใช้คำสั่ง ntptime.settime() เพื่อเชื่อมต่อกับคอมพิวเตอร์ในอินเทอร์เน็ตที่ทำหน้าที่เป็น NTP Server และอัปเดทเวลาปัจจุบันสำหรับการทำงานของวงจรของ RTC (Real-Time Clock) ของ ESP32
import network
import utime as time
import ntptime
import machine
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
def connect_wifi( wifi_cfg ):
wifi_sta = network.WLAN( network.STA_IF )
wifi_sta.active(True)
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
while not wifi_sta.isconnected():
time.sleep(1.0)
print(wifi_sta.ifconfig())
def sync_ntp():
while True:
try:
# synchronize RTC with NTP server
ntptime.settime()
break
except OSError:
print('NTP server: connection timeout...')
connect_wifi( WIFI_CFG )
sync_ntp()
rtc = machine.RTC()
tm = rtc.datetime() # get current RTC datetime (now)
tz_offset = +7 # timezone offset (GMT+7)
hh, mm, ss = (tm[4]+tz_offset) % 24, tm[5], tm[6]
print( 'Time: {}:{}:{}'.format( hh, mm, ss ) )
y, m, d = tm[0], tm[1], tm[2]
print( 'Date: {}-{:02d}-{:02d}'.format( y, m, d ) )
6. HTTP GET: Getting Datetime Using WorldTime API
ตัวอย่างถัดไปสาธิตการเปิดใช้งาน ESP32 ในโหมด STA และเชื่อมต่อกับอุปกรณ์ Wi-Fi AP ที่สามารถเชื่อมต่อไปยังอินเทอร์เน็ตได้ และร้องขอข้อมูลแบบ HTTP Request / GET Method ไปยัง World Time API ซึ่งเป็นผู้ให้บริการข้อมูลเกี่ยวกับวันและเวลาในปัจจุบันตามโซนเวลา (Timezone) ที่ระบุไว้ ข้อมูลที่ได้รับกลับมานั้น จะอยู่ในรูปแบบของ JSON String
import network
import utime as time
import ujson as json
import urequests as requests
URL = "http://worldtimeapi.org/api/timezone/Asia/Bangkok"
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
def connect_wifi( wifi_cfg, max_retries=10 ):
# use WiFi in station mode (not AP)
wifi_sta = network.WLAN( network.STA_IF )
# activate the WiFi interface (up)
wifi_sta.active(True)
# connect to the specified WiFi AP
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
retries = 0
while not wifi_sta.isconnected():
retries = retries + 1
if retries >= max_retries:
return None
time.sleep_ms(500)
return wifi_sta
def get_worldtime_data( url ):
resp = requests.get( url )
if resp.status_code==200: # request ok
try:
data = json.loads(resp.text)
except Exception as ex:
print ('JSON data error...' )
return
print( 'Timezone:', data['timezone'] )
print( 'Epoch time (Jan 1, 1970):', int(data['unixtime']) )
print( 'UTC datetime:', data['utc_datetime'] )
print( 'Local datetime:', data['datetime'] )
print( 'UTC offset (hours):', data['utc_offset'] )
print( 'UTC offset (seconds):', int(data['raw_offset']) )
else:
print ('HTTP request error...')
# try to connect the network
wifi = connect_wifi( WIFI_CFG )
if wifi is not None:
get_worldtime_data( URL )
else:
print('No WiFi connection')
ตัวอย่างเอาต์พุตที่ได้
Timezone: Asia/Bangkok
Epoch time (Jan 1, 1970): 1609939805
UTC datetime: 2021-01-06T13:30:05.138524+00:00
Local datetime: 2021-01-06T20:30:05.138524+07:00
UTC offset (hours): +07:00
UTC offset (seconds): 25200
7. HTTPS GET: Thailand's COVID-19 Status Update
ตัวอย่างถัดไป สาธิตการดึงข้อมูลเกี่ยวกับผู้ป่วย COVID-19 สำหรับประเทศไทย ด้วยวิธี HTTPS GET จาก API ของเว็บ covid19.th-stat.comข้อมูลที่ได้จะอยู่ในรูปของ JSON String
import network
import utime as time
import urequests as requests
URL = "https://covid19.th-stat.com/api/open/today"
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
WIFI_CFG = { 'ssid': "esl_ap", 'pwd': "cnch2687" }
COVID19_NAMES = [
('Confirmed', u'ยอดผู้ป่วยสะสม'),
('Recovered', u'ผู้ป่วยรักษาหายแล้ว'),
('Hospitalized', u'ผู้ป่วยรักษาตัวในโรงพยาบาล'),
('Deaths', u'ผู้เสียชีวิตสะสม'),
('NewConfirmed', u'ผู้ป่วยตรวจพบเพิ่มวันนี้'),
('NewRecovered', u'ผู้ป่วยรักษาหาย'),
('NewDeaths', u'ผู้เสียชีวิตวันนี้'),
('UpdateDate', u'อัปเดตล่าสุด') ]
def get_covid19_data( url ):
data = None
try:
resp = requests.get( url )
data = resp.json()
for name in COVID19_NAMES[:-1]:
if name[0] in data:
value = str(data[name[0]])
print('{}:\t\t\t\t{}'.format(name[1],value) )
name = COVID19_NAMES[-1]
update = data[ name[0] ]
print( '{}: {}'.format(name[1], update) )
except Exception as ex:
print('error:', ex)
return data
def connect_wifi( wifi_cfg ):
wifi_sta = network.WLAN( network.STA_IF )
wifi_sta.active(True)
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
while not wifi_sta.isconnected():
time.sleep(1.0)
print(wifi_sta.ifconfig())
connect_wifi( WIFI_CFG )
import gc
gc.collect()
get_covid19_data( URL )
เราสามารถลองใช้ไลบรารี urequests ซึ่งเป็นโมดูลแบบ built-in สำหรับไมโครไพธอน และใช้คำสั่ง urequests.post() สำหรับส่งข้อมูลแบบ POST ได้ แต่จากการที่ได้ลองใช้กับ LINE Notify API พบว่า มีความผิดพลาดเกิดขึ้นในการรับข้อมูลตอบกลับ (Response) แต่ก็ได้รับข้อความใน LINE App
import network
import sys
import ujson as json
import urllib.parse
import urequests as requests
JSON_CONFIG_FILE = 'config.json'
config = {}
try:
with open( JSON_CONFIG_FILE ) as json_file:
config = json.load(json_file)
except OSError as ex:
print('Cannot open JSON file')
sys.exit(-1)
sta_if = network.WLAN( network.STA_IF )
sta_if.active( True )
sta_if.connect( config['ssid'], config['pwd'] )
while not sta_if.isconnected():
print('waiting for Wi-Fi connection')
time.sleep(1.0)
#########################################################
token = config['line_notify_token']
url = 'https://notify-api.line.me/api/notify'
msg = {'message': u'ในบ้าน อุณหภูมิ 29.8 °C ความชื้น 69.0%' }
headers = {'Authorization' : 'Bearer %s' % token,
'Content-Type' : 'application/x-www-form-urlencoded' }
data = '{}={}'.format( 'message', urllib.parse.quote(msg['message']) )
try:
resp = requests.post( url, headers=headers, data=data )
print(resp.status_code, resp.text)
except Exception as ex:
print(ex)
12. ตรวจสอบการเชื่อมต่อในเครือข่ายด้วย Ping
ตัวอย่างถัดไปสาธิตการตรวจสอบการเชื่อมต่อกับเครื่องคอมพิวเตอร์หรืออุปกรณ์ในเครือข่าย โดยใช้คำสั่ง ping ซึ่งเป็นการส่งแพคเกจตามรูปแบบ ICMP (Internet Control Message Protocol)
โค้ดตัวอย่างต่อไปนี้ ดัดแปลงมาจากโค้ด uping.py (Micro-Ping for MicroPython) ที่ได้มีการแชร์ไว้ใน gist.github.com
import network
import utime as time
import ujson as json
########################################################
def checksum(data): # CRC16
if len(data) % 2 == 1:
data += b'\x00'
chksum = 0
for pos in range(0, len(data), 2):
hi_byte = data[pos]
lo_byte = data[pos + 1]
chksum += (hi_byte << 8) + lo_byte
while chksum >= 0x10000:
chksum = (chksum & 0xffff) + (chksum >> 16)
chksum = ~chksum & 0xffff
return chksum
def ping(host, count=4, timeout=5000,
interval=10, quiet=False, size=64):
import utime
import uselect
import uctypes
import usocket
import ustruct
import urandom
# prepare packet
assert( (size >= 16), "packet size too small")
assert( (size <= 1000), "packet size too big")
pkt = b'Q'*(size)
pkt_desc = {
"type" : uctypes.UINT8 | 0,
"code" : uctypes.UINT8 | 1,
"checksum" : uctypes.UINT16 | 2,
"id" : uctypes.UINT16 | 4,
"seq" : uctypes.INT16 | 6,
"timestamp": uctypes.UINT64 | 8,
} # packet header descriptor
h = uctypes.struct( uctypes.addressof(pkt),
pkt_desc, uctypes.BIG_ENDIAN )
h.type = 8 # ICMP_ECHO_REQUEST
h.code = 0
h.checksum = 0
h.id = urandom.getrandbits(16)
h.seq = 1
# init socket
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
sock.setblocking(0)
sock.settimeout(timeout/1000.0)
addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
sock.connect((addr, 1))
if not quiet:
print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))
seqs = list(range(1, count+1))
c = 1
t = 0
n_trans = 0
n_recv = 0
finish = False
while t < timeout:
if t==interval and c<=count:
# send packet
h.checksum = 0
h.seq = c
h.timestamp = utime.ticks_us()
h.checksum = checksum(pkt)
if sock.send(pkt) == size:
n_trans += 1
t = 0 # reset timeout
else:
seqs.remove(c)
c += 1
# recv packet
while True:
socks,_,_ = uselect.select([sock],[],[],0)
if socks:
# receive incoming packet as response
resp = socks[0].recv(1024)
# create a memoryview object
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]),
pkt_desc, uctypes.BIG_ENDIAN)
# validate checksum
calc_chksum = checksum(bytes(resp_mv[24:]))
chksum = ustruct.unpack("!H", resp_mv[22:24])[0]
chksum_ok = chksum == calc_chksum
assert chksum_ok, 'Checksum failed...'
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
n_recv += 1
if not quiet:
args = (len(resp), addr, seq, ttl, t_elasped)
print("{} bytes from {}: icmp_seq={}, ttl={}, time={:.1f} ms".format(*args))
seqs.remove(seq)
if len(seqs) == 0:
finish = True
break
else:
break
if finish:
break
utime.sleep_ms(1)
t += 1
# close
sock.close()
ret = (n_trans, n_recv)
if not quiet:
args = (n_trans, n_recv)
print("{} packets transmitted, {} packets received".format(*args))
return (n_trans, n_recv)
########################################################
JSON_CONFIG_FILE = 'config.json'
config = {}
try:
with open( JSON_CONFIG_FILE ) as json_file:
config = json.load(json_file)
except OSError as ex:
print('Cannot open JSON file')
sys.exit(-1)
sta_if = network.WLAN( network.STA_IF )
sta_if.active( True )
sta_if.connect( config['ssid'], config['pwd'] )
while not sta_if.isconnected():
print('waiting for Wi-Fi connection')
time.sleep(1.0)
ping('8.8.8.8',count=5, timeout=5000, interval=200,
quiet=False, size=44)
########################################################
ตัวอย่างข้อความเอาต์พุต
PING 8.8.8.8 (8.8.8.8): 44 data bytes
64 bytes from 8.8.8.8: icmp_seq=1, ttl=112, time=139.7 ms
64 bytes from 8.8.8.8: icmp_seq=2, ttl=112, time=64.4 ms
64 bytes from 8.8.8.8: icmp_seq=3, ttl=112, time=150.3 ms
64 bytes from 8.8.8.8: icmp_seq=4, ttl=112, time=61.7 ms
64 bytes from 8.8.8.8: icmp_seq=5, ttl=112, time=158.2 ms
5 packets transmitted, 5 packets received