import network
import utime as time
# open WiFi interface in STA mode
wifi = network.WLAN( network.STA_IF )
# activate the WiFi interface (if_up)
wifi.active( True )
print ('Press Ctrl+C to stop WiFi scanning...')
try:
while True:
# scan WiFi
scan_results = wifi.scan()
# print scan results (a list of tuples)
print(60*'=')
for ap in scan_results:
print( ap )
print(60*'-')
except KeyboardInterrupt:
print('Terminated...')
# deactivate the WiFi interface
wifi.active(False)
import network
import ujson as json
param_names = ['essid','mac','channel','rssi','authmode','hidden']
authmodes = ['OPEN','WEP','WPA-PSK',
'WPA2-PSK','WPA/WPA2-PSK','MAX']
def mac_bytes_to_hex_str( mac_bytes ):
return ':'.join(['%02X' % x for x in mac_bytes])
def convert_to_json( found_list ):
results = {'count': len(found_list), 'found_list': []}
for ap in found_list:
pairs = dict(zip(param_names,ap))
for name in pairs:
value = pairs.get(name)
if type(value)==bytes:
if name=='mac' and len(value)==6:
value = mac_bytes_to_hex_str( value )
else:
value = value.decode('ascii')
else:
try:
value = int(value)
if name=='authmode' and 0 <= value <= 5:
value = authmodes[value]
except (ValueError, TypeError):
pass
pairs[name] = value
results['found_list'].append( pairs )
return results# open WiFi interface in STA mode
# activate the WiFi interface
wifi = network.WLAN( network.STA_IF )
wifi.active(True)
# scan WiFi and get the results as a list of tuples
scan_results = wifi.scan()
# deactivate the WiFi interface
wifi.active(False)
# convert a list of tuples to json data
results = convert_to_json( scan_results )
if results.get('count') > 0:
for item in results.get('found_list'):
print( json.dumps(item) ) # show JSON string
else:
print( 'No WiFi found')
3. Wi-Fi Connection in STA Mode
การเชื่อมต่อกับเครือข่าย Wi-Fi ในโหมด STA จะต้องระบุชื่อ SSID และรหัสผ่าน (Password) สำหรับเชื่อมต่ออุปกรณ์ไร้สาย เช่น Wireless Access Point / Router ตามตัวอย่างโค้ดต่อไปนี้
import network
import utime as time
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXXXX", 'pwd': "XXXXXXXXX" }
def connect_wifi( wifi_cfg, max_retries=10 ):
# use WiFi in station mode (not AP)
wifi_sta = network.WLAN( network.STA_IF )
# activate the WiFi interface (up)
wifi_sta.active(True)
# connect to the specified WiFi AP
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
retries = 0
while not wifi_sta.isconnected():
retries = retries + 1
if retries >= max_retries:
return None
time.sleep_ms(500)
return wifi_sta
# try to connect the network
wifi = connect_wifi( WIFI_CFG )
if wifi is None:
print( 'WiFi connection failed' )
else:
ipaddr, netmask, gateway, dns = wifi.ifconfig()
print("============================")
print("IP address :", ipaddr)
print("Net mask :", netmask)
print("Gateway :", gateway)
print("DNS server :", dns)
print("----------------------------")
4. Wi-Fi in AP Mode + Simple HTTP Server
ตัวอย่างถัดไป สาธิตการเปิดใช้งาน Wi-Fi ในโหมด AP แบบ Open (ไม่มีการป้องกันด้วยรหัสผ่าน) โดยการสร้างและเรียกใช้ฟังก์ชัน start_wifi_ap() จากนั้นเมื่อ ESP32 ทำงานเป็น Wi-Fi AP แล้ว (จะมีหมายเลขไอพีตรงกับ 192.168.4.1 และเลือกใช้ช่องสัญญาณหมายเลข 11) ยังได้มีการสร้างและเรียกใช้ฟังก์ชัน start_web_server() เพื่อให้สามารถทำงานเป็น Web Server (HTTP protocol) อย่างง่ายได้ด้วย แสดงข้อความเป็นหน้าเว็บได้
ในตัวอย่างนี้ ถ้าต้องการจบการทำงานของ Web Server เราจะใช้วิธีตรวจสอบการกดปุ่ม ดังนั้นจะต้องมีการต่อวงจรกดปุ่มภายนอกแบบ Active-Low และเลือกใช้ขา GPIO23 เป็นอินพุตสำหรับปุ่มกด
import network
import utime as time
import usocket as socket
from machine import Pin
AP_CFG = { 'essid': 'MyAP', # SSID for your AP
'authmode': network.AUTH_OPEN, 'channel': 11, }
def start_wifi_ap( ap_cfg ):
# open WiFi in AP mode
ap = network.WLAN( network.AP_IF )
# activate AP interface (up)
ap.active(True)
# configure the AP interface
try:
ap.config( **ap_cfg )
except Exception as ex:
print ( 'Cannot set AP configuration...' )
if ap.active():
return ap
else:
return False
def start_web_server( btn=None ):
global sock
# create a socket (STREAM TCP socket) for network connection
sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
# set socket option: reuse address
sock.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
# use socket in non-blocking mode
sock.setblocking( False )
# get the server address (localhost)
addr = socket.getaddrinfo('0.0.0.0',80)[0][-1]
# bind the socket to port 80 (HTTP)
sock.bind( addr )
# listen for an incoming connection
sock.listen(1)
while True:
try:
# waiting for connection
conn,addr = sock.accept()
except OSError:
if btn and btn.value()==0: # check button
sock.close() # close socket
break # break the loop
else:
continue
# read incoming request data
request = conn.recv(1024)
print( 'Content = %s' % str(request) )
# send HTML as response
html = '<html><head></head><body>'
html += '<h1>Welcome to ESP32...</h1><br>'
html += '<b>Client from {}<b><br>'.format(str(addr))
html += '</body></html>'
conn.send( html ) # send HTML response
conn.close() # close HTTP connection
time.sleep_ms(10)
sock = None
ap = start_wifi_ap( AP_CFG )
ipaddr = ap.ifconfig()[0]
print ( 'IP address:', ipaddr ) # 192.168.4.1
BTN_GPIO = const(23) # use GPIO23 for push button
btn = Pin( BTN_GPIO, Pin.IN, Pin.PULL_UP )
try:
start_web_server( btn ) # start web server
except KeyboardInterrupt:
pass
finally:
if sock:
sock.close()
ap.active(False) # turn off WiFi AP
ถ้าจะเปลี่ยนจาก Open เป็นการตั้งรหัสป้องกันด้วยวิธี WPA2-PSK ก็กำหนดค่าสำหรับ Configuration ตามตัวอย่างดังนี้
ตัวอย่างถัดไปสาธิตการเปิดใช้งาน ESP32 ในโหมด STA และเชื่อมต่อกับอุปกรณ์ Wi-Fi AP ที่สามารถเชื่อมต่อไปยังอินเทอร์เน็ตได้ และใช้คำสั่ง ntptime.settime() เพื่อเชื่อมต่อกับคอมพิวเตอร์ในอินเทอร์เน็ตที่ทำหน้าที่เป็น NTP Server และอัปเดทเวลาปัจจุบันสำหรับการทำงานของวงจรของ RTC (Real-Time Clock) ของ ESP32
import network
import utime as time
import ntptime
import machine
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
def connect_wifi( wifi_cfg ):
wifi_sta = network.WLAN( network.STA_IF )
wifi_sta.active(True)
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
while not wifi_sta.isconnected():
time.sleep(1.0)
print(wifi_sta.ifconfig())
def sync_ntp():
while True:
try:
# synchronize RTC with NTP server
ntptime.settime()
break
except OSError:
print('NTP server: connection timeout...')
connect_wifi( WIFI_CFG )
sync_ntp()
rtc = machine.RTC()
tm = rtc.datetime() # get current RTC datetime (now)
tz_offset = +7 # timezone offset (GMT+7)
hh, mm, ss = (tm[4]+tz_offset) % 24, tm[5], tm[6]
print( 'Time: {}:{}:{}'.format( hh, mm, ss ) )
y, m, d = tm[0], tm[1], tm[2]
print( 'Date: {}-{:02d}-{:02d}'.format( y, m, d ) )
6. HTTP GET: Getting Datetime Using WorldTime API
import network
import utime as time
import ujson as json
import urequests as requests
URL = "http://worldtimeapi.org/api/timezone/Asia/Bangkok"
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
def connect_wifi( wifi_cfg, max_retries=10 ):
# use WiFi in station mode (not AP)
wifi_sta = network.WLAN( network.STA_IF )
# activate the WiFi interface (up)
wifi_sta.active(True)
# connect to the specified WiFi AP
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
retries = 0
while not wifi_sta.isconnected():
retries = retries + 1
if retries >= max_retries:
return None
time.sleep_ms(500)
return wifi_sta
def get_worldtime_data( url ):
resp = requests.get( url )
if resp.status_code==200: # request ok
try:
data = json.loads(resp.text)
except Exception as ex:
print ('JSON data error...' )
return
print( 'Timezone:', data['timezone'] )
print( 'Epoch time (Jan 1, 1970):', int(data['unixtime']) )
print( 'UTC datetime:', data['utc_datetime'] )
print( 'Local datetime:', data['datetime'] )
print( 'UTC offset (hours):', data['utc_offset'] )
print( 'UTC offset (seconds):', int(data['raw_offset']) )
else:
print ('HTTP request error...')
# try to connect the network
wifi = connect_wifi( WIFI_CFG )
if wifi is not None:
get_worldtime_data( URL )
else:
print('No WiFi connection')
ตัวอย่างเอาต์พุตที่ได้
Timezone: Asia/Bangkok
Epoch time (Jan 1, 1970): 1609939805
UTC datetime: 2021-01-06T13:30:05.138524+00:00
Local datetime: 2021-01-06T20:30:05.138524+07:00
UTC offset (hours): +07:00
UTC offset (seconds): 25200
7. HTTPS GET: Thailand's COVID-19 Status Update
import network
import utime as time
import urequests as requests
URL = "https://covid19.th-stat.com/api/open/today"
# Specify the SSID and password of your WiFi network
WIFI_CFG = { 'ssid': "XXXX", 'pwd': "XXXXXXX" }
WIFI_CFG = { 'ssid': "esl_ap", 'pwd': "cnch2687" }
COVID19_NAMES = [
('Confirmed', u'ยอดผู้ป่วยสะสม'),
('Recovered', u'ผู้ป่วยรักษาหายแล้ว'),
('Hospitalized', u'ผู้ป่วยรักษาตัวในโรงพยาบาล'),
('Deaths', u'ผู้เสียชีวิตสะสม'),
('NewConfirmed', u'ผู้ป่วยตรวจพบเพิ่มวันนี้'),
('NewRecovered', u'ผู้ป่วยรักษาหาย'),
('NewDeaths', u'ผู้เสียชีวิตวันนี้'),
('UpdateDate', u'อัปเดตล่าสุด') ]
def get_covid19_data( url ):
data = None
try:
resp = requests.get( url )
data = resp.json()
for name in COVID19_NAMES[:-1]:
if name[0] in data:
value = str(data[name[0]])
print('{}:\t\t\t\t{}'.format(name[1],value) )
name = COVID19_NAMES[-1]
update = data[ name[0] ]
print( '{}: {}'.format(name[1], update) )
except Exception as ex:
print('error:', ex)
return data
def connect_wifi( wifi_cfg ):
wifi_sta = network.WLAN( network.STA_IF )
wifi_sta.active(True)
wifi_sta.connect( wifi_cfg['ssid'], wifi_cfg['pwd'] )
while not wifi_sta.isconnected():
time.sleep(1.0)
print(wifi_sta.ifconfig())
connect_wifi( WIFI_CFG )
import gc
gc.collect()
get_covid19_data( URL )
import network
import sys
import ujson as json
import urllib.parse
import urequests as requests
JSON_CONFIG_FILE = 'config.json'
config = {}
try:
with open( JSON_CONFIG_FILE ) as json_file:
config = json.load(json_file)
except OSError as ex:
print('Cannot open JSON file')
sys.exit(-1)
sta_if = network.WLAN( network.STA_IF )
sta_if.active( True )
sta_if.connect( config['ssid'], config['pwd'] )
while not sta_if.isconnected():
print('waiting for Wi-Fi connection')
time.sleep(1.0)
#########################################################
token = config['line_notify_token']
url = 'https://notify-api.line.me/api/notify'
msg = {'message': u'ในบ้าน อุณหภูมิ 29.8 °C ความชื้น 69.0%' }
headers = {'Authorization' : 'Bearer %s' % token,
'Content-Type' : 'application/x-www-form-urlencoded' }
data = '{}={}'.format( 'message', urllib.parse.quote(msg['message']) )
try:
resp = requests.post( url, headers=headers, data=data )
print(resp.status_code, resp.text)
except Exception as ex:
print(ex)
12. ตรวจสอบการเชื่อมต่อในเครือข่ายด้วย Ping
ตัวอย่างถัดไปสาธิตการตรวจสอบการเชื่อมต่อกับเครื่องคอมพิวเตอร์หรืออุปกรณ์ในเครือข่าย โดยใช้คำสั่ง ping ซึ่งเป็นการส่งแพคเกจตามรูปแบบ ICMP (Internet Control Message Protocol)
import network
import utime as time
import ujson as json
########################################################
def checksum(data): # CRC16
if len(data) % 2 == 1:
data += b'\x00'
chksum = 0
for pos in range(0, len(data), 2):
hi_byte = data[pos]
lo_byte = data[pos + 1]
chksum += (hi_byte << 8) + lo_byte
while chksum >= 0x10000:
chksum = (chksum & 0xffff) + (chksum >> 16)
chksum = ~chksum & 0xffff
return chksum
def ping(host, count=4, timeout=5000,
interval=10, quiet=False, size=64):
import utime
import uselect
import uctypes
import usocket
import ustruct
import urandom
# prepare packet
assert( (size >= 16), "packet size too small")
assert( (size <= 1000), "packet size too big")
pkt = b'Q'*(size)
pkt_desc = {
"type" : uctypes.UINT8 | 0,
"code" : uctypes.UINT8 | 1,
"checksum" : uctypes.UINT16 | 2,
"id" : uctypes.UINT16 | 4,
"seq" : uctypes.INT16 | 6,
"timestamp": uctypes.UINT64 | 8,
} # packet header descriptor
h = uctypes.struct( uctypes.addressof(pkt),
pkt_desc, uctypes.BIG_ENDIAN )
h.type = 8 # ICMP_ECHO_REQUEST
h.code = 0
h.checksum = 0
h.id = urandom.getrandbits(16)
h.seq = 1
# init socket
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
sock.setblocking(0)
sock.settimeout(timeout/1000.0)
addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
sock.connect((addr, 1))
if not quiet:
print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))
seqs = list(range(1, count+1))
c = 1
t = 0
n_trans = 0
n_recv = 0
finish = False
while t < timeout:
if t==interval and c<=count:
# send packet
h.checksum = 0
h.seq = c
h.timestamp = utime.ticks_us()
h.checksum = checksum(pkt)
if sock.send(pkt) == size:
n_trans += 1
t = 0 # reset timeout
else:
seqs.remove(c)
c += 1
# recv packet
while True:
socks,_,_ = uselect.select([sock],[],[],0)
if socks:
# receive incoming packet as response
resp = socks[0].recv(1024)
# create a memoryview object
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]),
pkt_desc, uctypes.BIG_ENDIAN)
# validate checksum
calc_chksum = checksum(bytes(resp_mv[24:]))
chksum = ustruct.unpack("!H", resp_mv[22:24])[0]
chksum_ok = chksum == calc_chksum
assert chksum_ok, 'Checksum failed...'
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
n_recv += 1
if not quiet:
args = (len(resp), addr, seq, ttl, t_elasped)
print("{} bytes from {}: icmp_seq={}, ttl={}, time={:.1f} ms".format(*args))
seqs.remove(seq)
if len(seqs) == 0:
finish = True
break
else:
break
if finish:
break
utime.sleep_ms(1)
t += 1
# close
sock.close()
ret = (n_trans, n_recv)
if not quiet:
args = (n_trans, n_recv)
print("{} packets transmitted, {} packets received".format(*args))
return (n_trans, n_recv)
########################################################
JSON_CONFIG_FILE = 'config.json'
config = {}
try:
with open( JSON_CONFIG_FILE ) as json_file:
config = json.load(json_file)
except OSError as ex:
print('Cannot open JSON file')
sys.exit(-1)
sta_if = network.WLAN( network.STA_IF )
sta_if.active( True )
sta_if.connect( config['ssid'], config['pwd'] )
while not sta_if.isconnected():
print('waiting for Wi-Fi connection')
time.sleep(1.0)
ping('8.8.8.8',count=5, timeout=5000, interval=200,
quiet=False, size=44)
########################################################
ตัวอย่างข้อความเอาต์พุต
PING 8.8.8.8 (8.8.8.8): 44 data bytes
64 bytes from 8.8.8.8: icmp_seq=1, ttl=112, time=139.7 ms
64 bytes from 8.8.8.8: icmp_seq=2, ttl=112, time=64.4 ms
64 bytes from 8.8.8.8: icmp_seq=3, ttl=112, time=150.3 ms
64 bytes from 8.8.8.8: icmp_seq=4, ttl=112, time=61.7 ms
64 bytes from 8.8.8.8: icmp_seq=5, ttl=112, time=158.2 ms
5 packets transmitted, 5 packets received
ตัวอย่างถัดไปสาธิตการเปิดใช้งาน ESP32 ในโหมด STA และเชื่อมต่อกับอุปกรณ์ Wi-Fi AP ที่สามารถเชื่อมต่อไปยังอินเทอร์เน็ตได้ และร้องขอข้อมูลแบบ HTTP Request / GET Method ไปยัง ซึ่งเป็นผู้ให้บริการข้อมูลเกี่ยวกับวันและเวลาในปัจจุบันตามโซนเวลา (Timezone) ที่ระบุไว้ ข้อมูลที่ได้รับกลับมานั้น จะอยู่ในรูปแบบของ JSON String
ตัวอย่างถัดไป สาธิตการดึงข้อมูลเกี่ยวกับผู้ป่วย COVID-19 สำหรับประเทศไทย ด้วยวิธี HTTPS GET จาก API ของเว็บ ข้อมูลที่ได้จะอยู่ในรูปของ JSON String
ตัวอย่างถัดไปแสดงการขอข้อมูลเกี่ยวกับสภาพอากาศและดัชนีชี้วัดคุณภาพอากาศจาก ซึ่งใช้วิธีสื่อสารแบบ HTTPS ตามรูปแบบ ที่ทางบริษัทกำหนดไว้ และจะได้ข้อมูลกลับมาเป็น JSON Data
() มีบริการ API ให้ดึงข้อมูลด้วยโพรโทคอล HTTP/HTTPS (ดูตัวอย่างการใช้ API และตัวอย่างข้อมูลในรูปแบบ JSON ได้จาก )
ตัวอย่างถัดไป สาธิตการดึงข้อมูลเกี่ยวกับคุณภาพอากาศในเขตกรุงเทพจากเว็บ i ใช้ HTTP GET แล้วนำข้อมูลบางส่วนซึ่งเป็น JSON Data มาแสดงผลเป็นข้อความเอาต์พุตทาง REPL
เราสามารถลองใช้ไลบรารี ซึ่งเป็นโมดูลแบบ built-in สำหรับไมโครไพธอน และใช้คำสั่ง urequests.post() สำหรับส่งข้อมูลแบบ POST ได้ แต่จากการที่ได้ลองใช้กับ LINE Notify API พบว่า มีความผิดพลาดเกิดขึ้นในการรับข้อมูลตอบกลับ (Response) แต่ก็ได้รับข้อความใน LINE App
โค้ดตัวอย่างต่อไปนี้ ดัดแปลงมาจากโค้ด (Micro-Ping for MicroPython) ที่ได้มีการแชร์ไว้ใน
เผยแพร่ภายใต้ลิขสิทธิ์
Attribution-ShareAlike 4.0 International ()