您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12160|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    % G* r: I# y- H8 d6 u
  2. require_once __DIR__ . '/Workerman/Autoloader.php';% e' Q) i+ |7 K1 t- ~* t6 j

  3. ! {0 F: ~% o' n% o! m7 U
  4. $worker = new Worker();1 B2 w! G$ K: [* _* T7 M8 ~: F0 _
  5. // 4个进程
    ) A% }9 F$ W5 Z4 O3 p
  6. $worker->count = 4;
    ! D+ Z& J5 U: [4 @
  7. // 每个进程启动后在当前进程新增一个Worker监听, ^! ]/ |$ ~0 _: p; J
  8. $worker->onWorkerStart = function($worker)
      i$ B+ Q0 d0 [! C5 \. ~6 o; ]
  9. {
    5 l4 M9 r2 P5 e6 q2 s
  10.     /**
    8 j- ?/ @2 q0 w9 N. r  [. q
  11.      * 4个进程启动的时候都创建2016端口的Worker1 D6 h! N2 T1 y, r) |5 g! Y
  12.      * 当执行到worker->listen()时会报Address already in use错误5 I/ |0 Z( {, \: f& Z
  13.      * 如果worker->count=1则不会报错
    ; Y0 I+ y' O, Y2 a
  14.      */" i3 t) r& l& X  y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');6 [* A' y0 I! Q0 `" b
  16.     $inner_worker->onMessage = 'on_message';: u& D7 l" S, R( n0 O
  17.     // 执行监听。这里会报Address already in use错误# ?4 B# C# X/ E
  18.     $inner_worker->listen();8 f$ F( r5 F/ m% E- B# f4 x
  19. };
    9 E" P, s3 s7 n6 s) x# K* i: h
  20. 8 U8 Y6 j- \: a6 i& I
  21. $worker->onMessage = 'on_message';3 T# p" H# ]6 C# z4 [
  22. 6 d0 X/ U9 c: ?; r
  23. function on_message($connection, $data). I' C6 u$ f) K
  24. {& G, _5 E9 ~3 L" b  c
  25.     $connection->send("hello\n");, T3 O" J' x4 ?" s; F
  26. }
    . ~: o1 V5 y) L  V. ^
  27. 5 D1 I5 ~( D6 g2 {; ]0 I" P
  28. // 运行worker
    / Y8 [2 U1 B6 z$ u
  29. Worker::runAll();* Q. s+ `. H7 G
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    0 u* j4 V  K* g, z) ?- t1 B: G

  31. : y* c; j% n7 t1 c! j; m
  32. use Workerman\Worker;
    1 E) \( _& c; G% h# c5 g1 J7 O
  33. require_once './Workerman/Autoloader.php';7 K# {; u! ]0 V& `5 @, D; P

  34. $ O9 L- E* E0 R" j' z
  35. $worker = new Worker('text://0.0.0.0:2015');; g" _( M  i# D
  36. // 4个进程
    8 @. z, s! @1 l2 N
  37. $worker->count = 4;
    : y2 M  f1 c( {6 N0 Z' C
  38. // 每个进程启动后在当前进程新增一个Worker监听9 v; y  E) l8 Z& z, f
  39. $worker->onWorkerStart = function($worker)
    6 z6 \7 \, ]+ r  D3 Q) D' Y7 {
  40. {
    . b" R* I9 Z! ?9 F- P
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    # s- G+ \: m3 T+ j% r7 m$ b9 X
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)' A' o1 ]( H# e
  43.     $inner_worker->reusePort = true;
    # o0 Q- L* J, ?+ l# p* u' a+ G: s
  44.     $inner_worker->onMessage = 'on_message';
    8 t- Y$ t! Q- T: J/ s" z3 G
  45.     // 执行监听。正常监听不会报错
    6 H/ q  t# U: E& t8 ~
  46.     $inner_worker->listen();
    # ]' [" e3 `( c, [& L
  47. };8 _3 W# y* h+ K0 o, p" t- O

  48. 3 l' j# r; [) s3 i
  49. $worker->onMessage = 'on_message';
    ! t/ {0 n# B8 s1 z$ |
  50. . I+ j$ Y+ v, E# W, }
  51. function on_message($connection, $data)+ Z5 c; X- H& _7 B3 S
  52. {/ M' x" V8 N4 h/ W
  53.     $connection->send("hello\n");" O6 i. M: X# ]6 q3 }
  54. }1 Z5 U2 g/ e3 b6 F' y; ^
  55. 4 r; W; ^( ~; C, E
  56. // 运行worker
    : W" G' N# D' s
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: [: q! Q3 d: q5 Q
  2. use Workerman\Worker;
    ! R* ]2 t7 O* u2 B0 r
  3. require_once './Workerman/Autoloader.php';8 z) }" q0 ?+ @( D& a
  4. // 初始化一个worker容器,监听1234端口& |) m4 O" \, R* n! I
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    5 e+ M6 q/ z0 u+ \! Q. ^# d

  6. / j. r3 r' Q: R
  7. /*+ \2 ]& m' ~4 g+ ~) V) x3 i
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误7 @6 Z$ e! h- H; }/ Y
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 v& k) e$ h, g
  10. */$ ]7 E  i/ M# f* E4 Q
  11. $worker->count = 1;
    4 d4 D* D! g( Q& Y  q; G# h# t
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    1 K/ G9 Y+ ?/ c0 l
  13. $worker->onWorkerStart = function($worker)
    9 U3 d0 Q, N0 S1 |8 E% K
  14. {$ M3 ]) q# z$ j/ S: s2 |
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    . [" d- d+ e9 e5 J3 @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    ' {/ z% b' }4 S" y
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ' X! X2 y8 \, u4 O. z7 \
  18.     {
    * ?+ `" p: x3 Y
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据9 Q) x2 k! ]4 n: _" U# u: s
  20.         $data = json_decode($buffer, true);  N. @; `+ }7 R7 T7 d
  21.         $uid = $data['uid'];2 u% v- u8 `- @# f/ o- L
  22.         // 通过workerman,向uid的页面推送数据3 _1 C$ S/ x. r- |
  23.         $ret = sendMessageByUid($uid, $buffer);
    / z3 R7 |* k6 f) @
  24.         // 返回推送结果
    . i! `: Z8 A: t# t
  25.         $connection->send($ret ? 'ok' : 'fail');
    5 U6 M$ Z# r: q* N6 p
  26.     };
    9 B; q* K1 }- `& b* T
  27.     // ## 执行监听 ##
    7 Z4 v& l5 x4 u: F7 X% i
  28.     $inner_text_worker->listen();
    3 T3 h. T- i! l, Y! [
  29. };
    3 e1 o4 Q# b# Z+ p  \. ]+ [7 R
  30. // 新增加一个属性,用来保存uid到connection的映射
    $ _+ k' d: k; s
  31. $worker->uidConnections = array();
    5 U' C) M% ]1 g
  32. // 当有客户端发来消息时执行的回调函数
    # k  z( g3 c9 M1 i
  33. $worker->onMessage = function($connection, $data)' Z7 ~2 c) R" Z& }& }% ~
  34. {* R  y, a: r  l& D
  35.     global $worker;7 v& S- M. s- A& ^0 ^  `: F
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    3 k7 d0 z# d- o  ?( S% i
  37.     if(!isset($connection->uid))
    . o0 C2 k6 s5 u! _6 Y& f8 r: \
  38.     {' s) l; a( V: M7 |- ^7 {# M9 m
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    % _" g  c  q  S3 C* W" s9 \
  40.        $connection->uid = $data;
    - H6 k) l# t$ [* P; y1 c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ' R' C/ _9 v+ A; p/ X) E$ B
  42.         * 实现针对特定uid推送数据  S# N) S3 \, e. f% v
  43.         */
    & G+ M0 {' [& l; W
  44.        $worker->uidConnections[$connection->uid] = $connection;
    : O2 y' ~* n+ J( f% k& R7 U* t% @" @
  45.        return;6 w9 ~, a$ n3 t9 a$ D: i
  46.     }
    7 u, Z9 }& }8 r
  47. };
    1 `$ w: h+ C' O  L/ Y. g9 c9 K
  48. # ~% k9 e& |+ p
  49. // 当有客户端连接断开时
    - R6 Z" h( R+ x  L$ X/ t
  50. $worker->onClose = function($connection)
    " F) p( C: L# F& X
  51. {
    ; |; M( g+ g+ j$ R/ v/ p
  52.     global $worker;0 m6 u) ^" W" f5 l) w- d; p4 Z+ ~
  53.     if(isset($connection->uid))8 {& {$ K* q( h1 J& W
  54.     {
    0 s2 _( W2 e; \) X( V8 @% d' H
  55.         // 连接断开时删除映射9 y) O# V; {6 q
  56.         unset($worker->uidConnections[$connection->uid]);
    4 n& A& }; Q+ ^9 @
  57.     }
    . {% C# n: q) C( v9 Y
  58. };
    ) [% [. f( U% Z$ F$ l$ s

  59. & q2 Y" i- x+ T5 I( i7 U# \
  60. // 向所有验证的用户推送数据9 J3 d+ V, ~- N: d  q) }& S" b
  61. function broadcast($message)  P; |4 q& T; y" [6 x' y5 ?0 v
  62. {
    7 A) {+ M5 X: {3 W* X7 i- s
  63.    global $worker;
    # }7 z% M3 B/ Q, t5 I
  64.    foreach($worker->uidConnections as $connection)* R, D; m3 {* R; t  Z' g& b
  65.    {$ k$ p1 s+ l- t
  66.         $connection->send($message);
    4 r7 z, i3 V% C% |
  67.    }" }' K" I2 J4 Q
  68. }
    - [6 g, _+ D6 q( F% k

  69. * J  u6 Q4 q1 Y
  70. // 针对uid推送数据
      n+ P, d  c0 ^' j+ V' R) {% S9 I
  71. function sendMessageByUid($uid, $message)
    4 \% I3 @/ q6 _: ~
  72. {
    ; [& p/ e7 v) y* T
  73.     global $worker;
    $ {3 b8 X  A& Y$ X2 q1 b6 J
  74.     if(isset($worker->uidConnections[$uid]))
    ) o5 Y8 u0 c* ~6 k
  75.     {# ]# Y& {# J% F! N/ T  f
  76.         $connection = $worker->uidConnections[$uid];8 I; X) m0 F3 j8 z" u- ^/ K# ?# [  b5 ]
  77.         $connection->send($message);
    $ y( q+ B  u* K/ u( U% l8 Y
  78.         return true;" \+ i9 E. i8 F; H
  79.     }1 A' a+ ?& O. o0 ]- |9 j
  80.     return false;3 F- j6 n+ l5 w& |- C0 a
  81. }
    8 M: C6 |2 f: j/ S% N1 V. b! Y

  82. 0 ]2 U0 V0 [& B- \
  83. // 运行所有的worker/ m" p- k( B  P# ~/ @: ~
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    , o2 Q' w% r2 d4 U( S1 z
  2. ws.onopen = function(){0 Q, V+ S; H4 ?
  3.     var uid = 'uid1';; w3 A4 k+ f4 H4 d) r+ r
  4.     ws.send(uid);
    ( ]' ?. o9 f1 F: ~- c
  5. };
    , S# l: \; N" L# D* T& T
  6. ws.onmessage = function(e){& }* A3 |3 Y( }' B8 B0 o
  7.     alert(e.data);9 e/ I3 r* o" u0 d
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    " t6 Z; h( Z, ]! S2 [
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 a, y9 O# r$ g. n3 Y& O6 l* u
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    / U0 W! B% l/ u# w# M$ u# ]$ ~
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');7 e* S$ y( }& V2 a: \3 O. \
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    1 _+ k- d- Z8 m6 A& B
  6. fwrite($client, json_encode($data)."\n");# r$ L4 C8 C' V) P  R: y
  7. // 读取推送结果# L# k5 X& m7 E, l$ a! I1 F  E$ @
  8. echo fread($client, 8192);
复制代码

# B9 Q! R: H; j9 O( }9 P/ G7 J/ x* d  Q* _
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 16:59 , Processed in 0.106914 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!