您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12143|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;3 Y1 T5 M7 K1 N: r  r
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    " _/ [' J; Z* P( t

  3. 6 w' D8 p3 g% y, M5 r
  4. $worker = new Worker();
    ' ?+ w$ O, R9 e( K) \7 C
  5. // 4个进程8 J% P* t  i# s2 p  p5 X
  6. $worker->count = 4;
    8 u5 a+ G- i9 |1 u: Y
  7. // 每个进程启动后在当前进程新增一个Worker监听/ f7 e3 _$ ], ^
  8. $worker->onWorkerStart = function($worker)
    / t# h7 r6 h  x+ `
  9. {
    * @6 B/ D* n  A- ?6 {* S4 a
  10.     /**
    * }  v2 l+ c+ Z% X; Z: ]7 y! o
  11.      * 4个进程启动的时候都创建2016端口的Worker; s( o. b& Y( [. |% ]7 S
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ! D0 n, o5 _- S
  13.      * 如果worker->count=1则不会报错
    / P' D8 i( Z' O0 v1 h) c  C
  14.      */
    " {- b8 C% K# l% }. t: [+ e4 U  i
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');; j& D' s( y2 a7 j! n$ Y
  16.     $inner_worker->onMessage = 'on_message';2 V! p1 }4 O$ y3 ^$ T
  17.     // 执行监听。这里会报Address already in use错误
    " o0 E8 w+ k" I9 e: O! q
  18.     $inner_worker->listen();/ }5 h5 `: R% Y- t9 ]3 v9 z
  19. };
    * B/ c+ |/ t6 E/ W

  20. # Q$ O0 B, u- \
  21. $worker->onMessage = 'on_message';
    2 M5 |) L3 A6 H. k7 F1 I
  22. $ ?5 e) ?+ l! Q. z6 A! S& j
  23. function on_message($connection, $data)7 g0 Y2 _( L, S5 P; ]0 ~8 z% b3 d
  24. {
    , j8 i9 `8 [8 f8 `5 L1 L
  25.     $connection->send("hello\n");3 R' t: T2 f& D, E& _# r  {
  26. }5 ^9 n% f! J) `
  27. & K4 p8 S6 L. d
  28. // 运行worker8 x3 B$ A* `( v' H
  29. Worker::runAll();* y+ m9 n$ ~; ~7 D
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 _6 e6 V: R& Y; P- `6 I
  31. 5 p0 {# Q. P7 Z2 y2 [
  32. use Workerman\Worker;
    6 n, `1 R- \7 }/ B, w
  33. require_once './Workerman/Autoloader.php';
    $ K. v! t. d3 _; V% C, {
  34. ' M$ t) }* o- S8 X' x# q' h
  35. $worker = new Worker('text://0.0.0.0:2015');9 L3 k: u, \5 }" W' q, B
  36. // 4个进程
    * Z! f8 u, H8 o4 W# _( S
  37. $worker->count = 4;
    ) c: t5 R3 d+ I* N8 N" l! ?9 y
  38. // 每个进程启动后在当前进程新增一个Worker监听0 t0 L0 ]3 Q9 g5 n+ S
  39. $worker->onWorkerStart = function($worker)4 G, j+ _  G+ J6 |/ R4 P- E( s7 v; l
  40. {
    & N! h- g" B7 d# M2 _& D6 [- f& Y
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');- s" _9 W9 u  o1 N0 G8 {$ `7 u
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    / O& g7 p! I7 x
  43.     $inner_worker->reusePort = true;
    ) l8 @/ j1 W, s' a
  44.     $inner_worker->onMessage = 'on_message';
    * r! F& d% U1 t4 t" D# t
  45.     // 执行监听。正常监听不会报错
    4 E0 ?3 |+ }* z% N, ]( n
  46.     $inner_worker->listen();: E# o6 m9 v! c
  47. };
    & a, e& G1 b2 Y! }8 r/ r2 O( U5 [, K

  48. 2 t. w( j' C% ~8 A
  49. $worker->onMessage = 'on_message';
    3 p* t: w, T6 x1 P! u/ Z
  50. 7 f6 {7 W& o+ g& F
  51. function on_message($connection, $data)
    . H- c6 }7 d8 p: d
  52. {
    4 H$ V6 U8 V( p. I, j0 l' z, N( V
  53.     $connection->send("hello\n");
    " Q6 y( k6 [. E  ^
  54. }8 d7 b& Z" }) |  p- ~% ~
  55. ) p2 l- |/ M" G
  56. // 运行worker
    / F+ ^+ F" ]+ m7 v: ?3 w* K: X
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    4 I  v' L% e& P1 F
  2. use Workerman\Worker;  C& b5 D( Q9 k: q0 G( s
  3. require_once './Workerman/Autoloader.php';( M( E  n, I! X: g* p
  4. // 初始化一个worker容器,监听1234端口
    4 ?1 ^3 e# Q6 X4 K
  5. $worker = new Worker('websocket://0.0.0.0:1234');3 J+ u& y7 n" t7 d
  6. & M* ]2 D4 I- _7 K
  7. /*
    % _9 h* c3 k$ q2 |; V: ?
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    " ?' G" H: m6 P2 a' a  R
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)/ k1 t0 l$ Q; M+ z- R2 Q% E& U
  10. */- M4 J0 j, f& h7 G$ H' z0 w
  11. $worker->count = 1;. C9 k3 }# S  K8 V' `% t0 Q& w
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    4 Z0 Q2 q5 C! P* Y; r2 G
  13. $worker->onWorkerStart = function($worker)( B1 S0 B) s. ^/ w( S
  14. {
    ; z2 u& N" ~" n  j! J
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符3 d2 D( _0 S* H; S( Q
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    . d& ^% b0 W& v# E/ m) y
  17.     $inner_text_worker->onMessage = function($connection, $buffer). E" r/ M8 M$ W+ R& w: J
  18.     {) {0 T) \' G5 l4 K* |* _, i
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据& t3 S) v- }8 _
  20.         $data = json_decode($buffer, true);
    - _. \# v4 G' W( e; ~( ]9 p
  21.         $uid = $data['uid'];; p, e; \  K0 t
  22.         // 通过workerman,向uid的页面推送数据
    : a2 g: x5 v' t( ?/ [5 L
  23.         $ret = sendMessageByUid($uid, $buffer);
    ) x4 a% O4 `  A
  24.         // 返回推送结果
    , l( S+ @8 q: ^" W
  25.         $connection->send($ret ? 'ok' : 'fail');
    0 c1 Q  L- c, h' w% w: z
  26.     };
    4 G, y# }, s9 r) T4 ?' Y
  27.     // ## 执行监听 ##/ o9 o/ a$ w" K1 ~/ ~5 X- c
  28.     $inner_text_worker->listen();" Q" P7 d: b' X" F" B
  29. };
    : y' I. ]4 H( m5 O# X" E- E2 P
  30. // 新增加一个属性,用来保存uid到connection的映射
    ' K! T& n0 W( f- x7 g; k; X4 C2 p1 d: l
  31. $worker->uidConnections = array();
    ; A6 h5 g  C7 v4 @: P6 @/ c' `) c
  32. // 当有客户端发来消息时执行的回调函数0 j; }2 M: D3 C( v
  33. $worker->onMessage = function($connection, $data)5 M6 a6 G# }) I
  34. {! z1 G1 T( A0 k. }: H
  35.     global $worker;7 @+ [" j, J7 k. w0 g- a$ \
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ; |9 D) [- E/ p9 d1 x: }
  37.     if(!isset($connection->uid))
    ( o  [$ m" M: m+ p. y% z
  38.     {% E" s+ o0 z( V$ Q5 \( Z" p0 A
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ( q6 P; D2 @2 s" b$ _$ Y) J$ h
  40.        $connection->uid = $data;* k3 @' K" J$ c3 l3 D9 c, W
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    & D% C" ^6 t7 W! \
  42.         * 实现针对特定uid推送数据
    2 m) x9 r: c% i$ A3 Q
  43.         */# Q2 O% V; C- T; n' ^' u2 R
  44.        $worker->uidConnections[$connection->uid] = $connection;
    5 T& M, ~  Q5 z, K7 \5 D: w; O
  45.        return;
    $ X& X6 Q9 K" C8 C- n# E
  46.     }
    ; q9 K5 m; ~: x, T4 d6 ]# \" U  n
  47. };
    : w( K; ?& o, R  R0 |' h' U
  48. 8 c! Q8 J0 @7 d$ d/ u: [
  49. // 当有客户端连接断开时" w2 I% d& O6 [) \, m& [
  50. $worker->onClose = function($connection)
    9 C- |0 _2 x' B  O& I! W/ P
  51. {
    4 ^% F- V) p" S# Y; q
  52.     global $worker;$ W) b9 t; y5 R5 S/ O
  53.     if(isset($connection->uid))$ O& [; G4 Z- v8 ?
  54.     {
    0 `* b+ D) [4 o: S/ w
  55.         // 连接断开时删除映射
    ' @( n- T: Y& H& |
  56.         unset($worker->uidConnections[$connection->uid]);
    1 Y1 N* d& u$ `* q
  57.     }
    4 x* }% n; }* }! h
  58. };0 E, w, ^3 v1 R* a# v: Y

  59. " v* X  ^/ z) x; D  M3 T# }
  60. // 向所有验证的用户推送数据6 ]* E- U/ J2 u5 ^
  61. function broadcast($message)
    3 m7 z) ~: `4 ?' T, u: ^9 U
  62. {) V2 M" i  N& V3 B% b
  63.    global $worker;8 F5 G$ b" I* T
  64.    foreach($worker->uidConnections as $connection)
    9 p" N3 E7 j4 r0 s- c  B; x7 S
  65.    {
    ) m+ U6 W; w$ r% M, W* H$ V1 b
  66.         $connection->send($message);
    / y# G, x6 q" d6 [9 C& I" Q+ z% f, K
  67.    }
    & j+ s: b! j  X$ R
  68. }" [! E1 ]7 T$ m5 N: n& O7 }( }

  69. 7 n& S; q1 L8 x& Z: h0 N
  70. // 针对uid推送数据
    % @" D% I7 g+ @& N+ a
  71. function sendMessageByUid($uid, $message)
    " ]! y$ b, O/ |- Z% ^2 e( E/ l
  72. {0 Y. f( X  w( A. F8 ~3 W5 n( I
  73.     global $worker;* T) m* m, L: b9 Y! L9 E$ m5 ?$ p& d" ~
  74.     if(isset($worker->uidConnections[$uid]))9 g) C% X" M* s2 v; p# ~
  75.     {
    4 ]( c* W& X+ d1 i4 }! q9 s
  76.         $connection = $worker->uidConnections[$uid];! b* _. X# O; D6 l- M% `& r
  77.         $connection->send($message);3 K5 q' c' d; N
  78.         return true;
    5 b. g$ T) g6 \* z, u# ~9 A- h
  79.     }
    * e* U1 m5 g7 v5 n& v
  80.     return false;
    2 N( p# U/ K/ u; `
  81. }
      X( F" v6 n7 i# n& E* |
  82. + N( }5 L. ?  i+ n" c
  83. // 运行所有的worker
    . J. U+ w5 d6 m
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');0 Z  L( J% c9 A. ^1 w9 X
  2. ws.onopen = function(){. P) }% g& w. `7 v. `
  3.     var uid = 'uid1';5 k) E0 F8 r, d- k( I( K
  4.     ws.send(uid);
    % V, C& f# ]( ^& E+ w
  5. };: n. q+ R2 g0 n% O1 K( Y6 s' P) d
  6. ws.onmessage = function(e){- p9 h2 P" j( k: W
  7.     alert(e.data);# k; s! Q" H3 v; O9 Z1 J
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口$ q) z( C0 T% Y: b. ~) K
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    1 ~4 O8 k: ?6 Z
  3. // 推送的数据,包含uid字段,表示是给这个uid推送) O* X% h7 W7 D& M# E
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    % H: G* l: U3 I+ k) F
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符+ Z3 G1 }) |% {) ?: d
  6. fwrite($client, json_encode($data)."\n");$ l1 K! ~5 k+ G( S
  7. // 读取推送结果
    8 P- M1 j/ C) q0 `7 I, [2 z
  8. echo fread($client, 8192);
复制代码
+ L/ r, u  {4 e2 |2 x% B( T! p, @
/ Y0 T7 D0 Z' K
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 11:44 , Processed in 0.116904 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!