您的当前位置:首页Django+Vue实现WebSocket连接的示例代码

Django+Vue实现WebSocket连接的示例代码

2020-11-27 来源:小侦探旅游网

近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 WebSocket 最适合做这件事。

效果

测试 ping www.baidu.com 效果

点击连接建立ws连接

后端实现

所需软件包

后端主要借助Django Channels 实现socket连接,官网文档链接

这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis

pip

channels==2.2.0
channels-redis==2.4.0

引入

settings.py

INSTALLED_APPS = [
 'django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'rest_framework.authtoken',
 'rest_framework',
 ...
 'channels',
]

# Redis配置
REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')
REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)
CHANNEL_LAYERS = {
 "default": {
 "BACKEND": "channels_redis.core.RedisChannelLayer",
 "CONFIG": {
 "hosts": [(REDIS_HOST, REDIS_PORT)],
 },
 },
}

代码

apps/consumers.py

新建一个消费处理

实现: 默认连接加入组,发送信息时的处理。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
 def connect(self):
 """
 每个任务作为一个频道
 默认进入对应任务执行频道
 """
 self.job_name = self.scope['url_route']['kwargs']['job_name']
 self.job_group_name = 'job_%s' % self.job_name
 async_to_sync(self.channel_layer.group_add)(
 self.job_group_name,
 self.channel_name
 )
 self.accept()

 def disconnect(self, close_code):
 async_to_sync(self.channel_layer.group_discard)(
 self.job_group_name,
 self.channel_name
 )

 # job.message类型处理
 def job_message(self, event):

 # 默认发送收到信息
 self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由 MyConsumer 去处理。

from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
 'websocket': SessionMiddlewareStack(
 URLRouter(
 [
 path('ws/job/<str:job_name>', consumers.MyConsumer)
 ]
 )
 ),
})

apps/views.py

在执行命令中获取 webSocket 消费通道,进行异步推送

  • 使用异步推送async_to_sync是因为在连接的时候采用的异步连接,所以推送必须采用异步推送。
  • 因为执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。
  • from subprocess import Popen,PIPE
    import threading
    
    def runPopen(job):
     """
     执行命令,返回popen
     """
     path = os.path
     Path = path.abspath(path.join(BASE_DIR, path.pardir))
     script_path = path.abspath(path.join(Path,'run.sh'))
     cmd = "sh %s %s" % (script_path, job)
     return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
    
    def runScript(job):
     channel_layer = get_channel_layer()
     group_name = "job_%s" % job
    
     popen = runPopen(job)
     while True:
     output = popen.stdout.readline()
     if output == '' and popen.poll() is not None:
     break
    
     if output:
     output_text = str(output.strip())
     async_to_sync(
     channel_layer.group_send
     )(
     group_name, 
     {"type": "job.message", "text": output_text}
     )
     else:
     err = popen.stderr.readline()
     err_text = str(err.strip())
     async_to_sync(
     channel_layer.group_send
     )(
     group_name,
     {"type": "job.message", "text": err_text}
     )
     break
    
    class StartJob(APIView): 
     def get(self, request, job=None):
     run = threading.Thread(target=runScript, args=(job,))
     run.start()
     return HttpResponse('ok')

    apps/urls.py

    get请求就能启动任务

    urlpatterns = [
     ...
     path('start_job/<str:job>', StartJob.as_view())
    ]

    前端实现

    所需软件包

    vue-native-websocket 

    代码实现

    plugins/vueNativeWebsocket.js

    import Vue from 'vue'
    import VueNativeSock from '../utils/socket/Main.js'
    
    export default function ({ store }) {
     Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});
    }

    nuxt.config.js

    配置文件引入, 这里我使用的是 nuxt 框架

     plugins: [ 
     { 
     src: '@/plugins/vueNativeWebsocket.js', 
     ***: false 
     },
     ],

    封装 socket

    export default (connection_url, option) => {
     // 事件
     let event = ['message', 'close', 'error', 'open'];
    
     // 拷贝选项字典
     let opts = Object.assign({}, option);
    
     // 定义实例字典
     let instance = {
    
     // socket实例
     socket: '',
    
     // 是否连接状态
     is_conncet: false,
    
     // 具体连接方法
     connect: function() {
     if(connection_url) {
     let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
     connection_url = scheme + '://' + connection_url.split('://')[1];
     this.socket = new WebSocket(connection_url);
     this.initEvent();
     }else{
     console.log('wsurl為空');
     }
     },
    
     // 初始化事件
     initEvent: function() {
     for(let i = 0; i < event.length; i++){
     this.addListener(event[i]);
     }
     },
    
     // 判断事件
     addListener: function(event) {
     this.socket.addEventListener(event, (e) => {
     switch(event){
     case 'open':
     this.is_conncet = true;
     break;
     case 'close':
     this.is_conncet = false;
     break;
     }
     typeof opts[event] == 'function' && opts[event](e);
     });
     },
    
     // 发送方法,失败则回调
     send: function(data, closeCallback) {
     console.log('socket ---> ' + data)
     if(this.socket.readyState >= 2) {
     console.log('ws已经关闭');
     closeCallback && closeCallback();
     }else{
     this.socket.send(data);
     }
     }
    
     };
    
     // 调用连接方法
     instance.connect();
     return instance;
     }

    index.vue

    具体代码

    x2Str 方法,因为后端返回的是bytes,格式 b'xxx' ,编写了方法对其进行转换。

    <template>
     <div>
    
     <el-button type="primary" @click="runFunction" >执行</el-button>
     <el-button type="primary" @click="connectWebSock" >显示</el-button>
    
     <div class="socketView">
     <span v-for="i in socketMessage" :key="i">{{i}}</span>
     </div>
     </div>
    </template>
    <script>
     import R from '@/plugins/axios';
     import ws from '@/plugins/socket'
     export default {
     data() {
     return {
     webSocket: '',
     socketMessage: [],
     }
     },
    
     methods: {
     // 打开连接的处理
     openSocket(e) {
     if (e.isTrusted) {
     const h = this.$createElement;
     this.$notify({
     title: '提示',
     message: h('i', { style: 'color: teal'}, '已建立Socket连接')
     });
     }
     },
    
     // 连接时的处理
     listenSocket(e) {
     if (e.data){
     this.socketMessage.push(this.x2Str(e.data))
     }
     },
    
     // 连接webSocket
     connectWebSock() {
     let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions
     this.webSocket = ws(wsuri, {
     open: e => this.openSocket(e),
     message: e => this.listenSocket(e),
     close: e => this.closeSocket(e)
     })
     },
    
     // 转码
     x2Str(str) {
     if (str) {
     let reg = new RegExp("(?<=^b').*(?='$)")
     let result = str.replace(/(?:\\x[\da-fA-F]{2})+/g, m =>
     decodeURIComponent(m.replace(/\\x/g, '%'))
     )
     return reg.exec(result)[0]
     }
     },
    
     // 执行方法
     runFunction() {
     R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {
     if (response.hasOwnProperty('response')){
     this.$message({
     type: 'error',
     message: '服务端返回错误,返回码:' + response.response.status 
     });
     }; 
     if (response.data == 'ok') {
     this.$message({
     type: 'success',
     message: '开始执行[' + this.selectFunctions + ']'
     });
     }
     });
     } 
     }
    }
    </script>

    至此,实现前后端 websocket 通讯。

    显示全文