WebSocket

示例代码

requirements.txt

lessweb
redis
hiredis

index.py

import aiohttp.web
from aiohttp_wsgi import WSGIHandler

from lessweb import Application, Service
from lessweb.plugin import redis
from lessweb.plugin.redisplugin import RedisPlugin, RedisServ

async def websocket_handler(request):
    print('Websocket connection starting')
    ws = aiohttp.web.WebSocketResponse()
    await ws.prepare(request)
    print('Websocket connection ready')

    async for msg in ws:
        print(msg)
        if msg.type == aiohttp.WSMsgType.TEXT:
            print(msg.data)
            if msg.data == 'close':
                await ws.close()
            else:
                who = redis.session().get('who') or b'-'
                await ws.send_str(msg.data + '@' + who.decode())

    print('Websocket connection closed')
    return ws

def setter(serv: Service[RedisServ], who:str):
    serv().redis.set('who', who.encode(), ex=30)
    return 'ok'

app = Application()
app.add_plugin(RedisPlugin('localhost'))
app.add_get_mapping('/set', setter)

if __name__ == '__main__':
    aioapp = aiohttp.web.Application()
    aioapp.router.add_route('GET', '/ws', websocket_handler)
    aioapp.router.add_route("*", "/{path_info:.*}", WSGIHandler(app.wsgifunc()))
    aiohttp.web.run_app(aioapp, port=8080)

测试步骤

  1. 启动本地的redis服务。
  2. 安装一款叫Dark WebSocket Terminal的Chrome应用,用来发送WebSocket请求。
  3. 在终端执行pip install -r requirements.txt安装依赖的库。
  4. 运行上面的index.py,启动监听8080端口的服务。
  5. 在Dark WebSocket Terminal输入/connect ws://localhost:8080/ws,会显示连接成功。
  6. 在Dark WebSocket Terminal发送a,会收到a@-
  7. 在终端执行curl "http://localhost:8080/set?who=John",会返回ok
  8. 在Dark WebSocket Terminal发送b,会收到b@John
  9. 在Dark WebSocket Terminal发送/disconnect,会显示连接断开。

参考