您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10582|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    ( `, V$ k- g/ @3 j6 \6 e$ ?
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    . I: Y. F# T; ~; P6 U3 R

  3. ' s+ @! G& \' d# J+ t+ I
  4. $worker = new Worker();8 @- i& K3 `1 {# C! B
  5. // 4个进程# X# j0 Q9 F( R( \6 i' D2 `& K' O
  6. $worker->count = 4;% z* j( r' Q" y# C5 N) C
  7. // 每个进程启动后在当前进程新增一个Worker监听$ l! ]" B( X3 w& ]: G4 J
  8. $worker->onWorkerStart = function($worker)
    5 D$ q- H  h! N! ]
  9. {1 Q+ V: @: ~; c. {. n) E
  10.     /**$ d1 r( m% K% L: R2 L5 f
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ; f5 d8 Y7 U) k+ J
  12.      * 当执行到worker->listen()时会报Address already in use错误
    : ^3 J; N5 D9 J6 D
  13.      * 如果worker->count=1则不会报错
    % o& }/ a8 E& m+ O, K! T* j
  14.      */
    - u# `( r% \" f# g/ }
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    $ ]* m  _: U8 E, ]9 v8 G& O
  16.     $inner_worker->onMessage = 'on_message';
    ' a5 B) p! g( T7 `% d& t! k
  17.     // 执行监听。这里会报Address already in use错误: g/ W; Z; ~8 L' r( Y+ v
  18.     $inner_worker->listen();& k3 i1 z$ L6 d6 s
  19. };, E. W; j6 n+ A% C
  20. 1 N8 b6 I1 O6 c9 t5 \0 o
  21. $worker->onMessage = 'on_message';% j# M" J) w1 s- W
  22. 2 q8 s* G# y$ G! E8 y$ c
  23. function on_message($connection, $data)
    ! K2 m$ C- J6 B
  24. {
    % Z5 v3 k8 }9 f8 s% E& {9 L5 j7 q
  25.     $connection->send("hello\n");9 h0 X& c+ O4 U2 a* W+ j& r& b) j
  26. }! a+ g: G6 G" E# P6 B0 [9 s" C
  27. ( s8 p5 H. w; v8 x
  28. // 运行worker
    4 s" h# h9 X& m) |" U# c! C1 o, C" _
  29. Worker::runAll();
    : Q6 v1 r! t' o0 z7 N; B
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:! |5 g3 E- X; S8 C! n9 K- F
  31. 9 E1 p6 I# A6 l# b
  32. use Workerman\Worker;
    - L2 I7 U6 g* ]" S4 J- Q
  33. require_once './Workerman/Autoloader.php';
    * @- b* {6 J/ K5 \2 N' z6 @9 T

  34. / D" d$ g; ]. ~6 ~' a; c3 [  L
  35. $worker = new Worker('text://0.0.0.0:2015');
    & v! O  O  n/ ?
  36. // 4个进程
    - D5 F5 O& |1 B+ t( E% ^, n
  37. $worker->count = 4;, c. m( d7 Z% K, s9 s: A& r
  38. // 每个进程启动后在当前进程新增一个Worker监听
    . Z& R  @. W/ \: b! x3 F% \( o
  39. $worker->onWorkerStart = function($worker)
    7 ?5 ~. Q" Q* [  T. [' {$ Z
  40. {, i" N3 Q$ p# R, I: ^3 N
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');+ y3 Y( ?+ p1 w6 a  s
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    3 Y( O( y! T2 u0 j& k. l, ^
  43.     $inner_worker->reusePort = true;- j- R/ @( M7 u) n$ ]2 J
  44.     $inner_worker->onMessage = 'on_message';% p0 W$ G0 C, J0 w6 a/ b& S8 T
  45.     // 执行监听。正常监听不会报错# R9 P; U" V, g' D
  46.     $inner_worker->listen();" o! s! p- Y1 X; O' Z3 \( j
  47. };+ j2 @  \3 |$ W! a9 Q

  48. 9 m) |+ U- |( r
  49. $worker->onMessage = 'on_message';; k) ]4 F8 j3 V
  50. % F6 R3 S. |! f6 d# z
  51. function on_message($connection, $data)
    3 S6 C9 s6 s# ?: n  Z, t9 F& w
  52. {
    2 C3 I5 ~* f4 }& F; y' D+ t
  53.     $connection->send("hello\n");& [: ?: Q8 s+ {  O2 K
  54. }  O) e7 l/ V% s- a. s8 t; [
  55. ( u3 t8 {, z5 N2 s3 w
  56. // 运行worker
    0 ]# W) X' K5 ]- S+ C5 {. }
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: B' a8 V$ l  t4 Q( J8 [
  2. use Workerman\Worker;7 I1 X, h( w: P  b; H$ p" _
  3. require_once './Workerman/Autoloader.php';
    6 P( B0 C3 f# c/ ?, p1 |8 J
  4. // 初始化一个worker容器,监听1234端口' E* K6 y( [/ B) G. W- h  @
  5. $worker = new Worker('websocket://0.0.0.0:1234');, `) \; C  R" j; R" t4 r

  6. & Y, n8 ~$ H. `
  7. /*
    0 ]1 [. ?# o* a* u
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误: y' l3 }, a6 `% g0 K
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)% T9 R) x  J$ K8 h  S& s" G
  10. */
    " v( U5 g: I- h/ f9 D, ]
  11. $worker->count = 1;* n5 ]$ b7 _% ~: U3 n
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口; M, J: O& e5 r  Y# w% o% x
  13. $worker->onWorkerStart = function($worker)( e/ o. O' Q6 x
  14. {
    9 Z/ e) ~# E( D# l# e8 E
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: F4 ?' }. |1 e! w: _3 A
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');9 z8 R1 K) Z; z$ A( E3 r  n% ~" g& w
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    / X& \% C3 B( e3 y! ?- W8 a3 e+ o" R
  18.     {; y5 q0 a3 [% G1 H
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据1 L/ v1 |' e# }0 S$ B  t7 t! b5 o
  20.         $data = json_decode($buffer, true);6 U, g4 Z# N$ v1 ~/ B6 r; {0 Y
  21.         $uid = $data['uid'];  b# r$ k: J4 A$ W0 q8 L0 ~
  22.         // 通过workerman,向uid的页面推送数据
    4 P: F9 @8 Q7 C7 H5 D; A% A
  23.         $ret = sendMessageByUid($uid, $buffer);
    7 Q: T# B6 r0 X# n  ?! W
  24.         // 返回推送结果
    & A6 k4 A6 g3 Z+ Q8 w, N! U3 Q
  25.         $connection->send($ret ? 'ok' : 'fail');7 y. y3 P  v- ]! w4 U) w* d
  26.     };0 z, d2 G+ n9 u% l3 |: Q" F8 ^
  27.     // ## 执行监听 ##! B( r6 X( [" O
  28.     $inner_text_worker->listen();
      a7 c  N; r  I
  29. };6 P6 z1 Y3 g2 y
  30. // 新增加一个属性,用来保存uid到connection的映射
    ( r: j/ W2 o3 _. m' |
  31. $worker->uidConnections = array();
      q0 o  F3 K$ w# z% q) E
  32. // 当有客户端发来消息时执行的回调函数
    0 A8 k7 o% d% |; t" Y
  33. $worker->onMessage = function($connection, $data); I3 m1 r1 F- N6 s% @6 g, n
  34. {, Y' o5 D) ~% f, E) t  Y7 m
  35.     global $worker;. i) w- s8 h+ F- L: ?" a5 b$ Y
  36.     // 判断当前客户端是否已经验证,既是否设置了uid/ j  f2 ^. ~) n" I4 D4 O: M. p
  37.     if(!isset($connection->uid))* s1 w4 `- p9 D$ r
  38.     {
    " e: s9 E- p' c3 V7 @6 S! u. ^
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    - f; X  d. C+ Z, R: ?) t  i
  40.        $connection->uid = $data;
    " ^& x4 A# Q, k7 P" r8 v
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    * `8 e7 O; L& @, T
  42.         * 实现针对特定uid推送数据
    : b7 @6 W" |7 \3 S* v2 {6 L
  43.         */
    , x. d3 x/ V- e( o( u
  44.        $worker->uidConnections[$connection->uid] = $connection;6 r, o6 U9 Q, _* u' M- y
  45.        return;
    2 n1 @8 Z; v3 h" p* `& O3 C
  46.     }
    ' Q. I7 Q5 w+ X2 O' \% k
  47. };6 F' m/ ^7 U9 i

  48. & H# A9 u9 a3 B3 p
  49. // 当有客户端连接断开时, t; [, C( j* C- F. i
  50. $worker->onClose = function($connection)! J. f1 U6 C- I7 e( ~, @: m
  51. {
    $ N: W6 c5 l9 W+ Z
  52.     global $worker;! }4 @4 t% V2 Z: |/ C, o. r
  53.     if(isset($connection->uid)): \* {' R8 A7 c- T4 ?- M
  54.     {0 k( |! U* l2 E4 u
  55.         // 连接断开时删除映射
    ( i! z# J# W1 \
  56.         unset($worker->uidConnections[$connection->uid]);
    5 @. X2 ?  t3 x' A$ U! h9 ?
  57.     }! i3 S" Q1 ~9 I+ A3 _0 t1 q
  58. };
    4 F, ?; V4 _9 |$ n: w
  59. % j5 z7 i' ~5 e3 M
  60. // 向所有验证的用户推送数据
      g1 S, N/ c. `5 X) E1 r
  61. function broadcast($message)* _  h0 a0 H1 {& D6 [, U3 `
  62. {$ t) |# h" U  M% n
  63.    global $worker;0 _; y4 G! a: k" _' i0 _' B8 G$ O( ^
  64.    foreach($worker->uidConnections as $connection)
    / A0 N3 p/ ?" F4 V3 P8 v& F
  65.    {' z/ D* I( B; Q
  66.         $connection->send($message);
    : w8 d' E" {% j% I; c
  67.    }! a# l  ^1 y* U/ ~
  68. }' Q. g: V7 L3 a' o9 f3 N5 h: t

  69. ' B$ Q) k5 M. H8 j3 t
  70. // 针对uid推送数据/ m. T% ~8 y! ~6 G+ G
  71. function sendMessageByUid($uid, $message)
    7 F/ `* z. }2 j1 U2 k2 f
  72. {
    2 Z$ T# @( [. p
  73.     global $worker;) U; u/ C3 F/ _9 ^0 n4 b
  74.     if(isset($worker->uidConnections[$uid]))
    ! w1 E+ [0 M# p; ?
  75.     {/ q* h  w- d& T* n/ G+ X2 {. ]9 t" j
  76.         $connection = $worker->uidConnections[$uid];
    7 x* C5 S" j, J  J
  77.         $connection->send($message);2 s- ^1 Q! Q+ o9 X% `
  78.         return true;
    9 V0 J9 r. N  |7 n! N3 L; D
  79.     }9 m; s0 S8 q( G
  80.     return false;3 z) `) B1 D  M- A- D
  81. }* N" h1 l. @! K" G* G$ t3 w

  82. 8 {: Y6 t/ a- [  D/ ~; p  `7 S, i
  83. // 运行所有的worker( \) B) {$ ~- _9 K- J, u
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    7 ^# p$ Z0 b9 V7 n
  2. ws.onopen = function(){
    ( @2 P5 f" X7 h2 Z! u- [2 }
  3.     var uid = 'uid1';
    9 l/ |+ O  f7 l8 W  r- @6 k
  4.     ws.send(uid);) ]5 W( P) q2 p& B
  5. };
    7 ^" r/ t+ ^. ~
  6. ws.onmessage = function(e){
    ; D) b0 H0 }  m* @
  7.     alert(e.data);  B+ x0 S6 J2 o
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    1 e8 m+ Z. l& {, |6 B6 _& w1 a
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 Y7 u8 G9 w2 ?: R( X
  3. // 推送的数据,包含uid字段,表示是给这个uid推送4 e% n9 D' ~0 m, `1 R' {
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');& B" X# z3 i/ h, P
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符+ U" E5 v8 b; \' J
  6. fwrite($client, json_encode($data)."\n");; L" ]( B# t: I% A* ]2 E) ]
  7. // 读取推送结果
    0 |0 {, \  }9 W9 [
  8. echo fread($client, 8192);
复制代码
" i* _) m6 o+ A
( ?# ^2 a: H7 z
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-20 14:07 , Processed in 0.105802 second(s), 22 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!