您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15230|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    + L0 `' U* @8 H) ^( P
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ! Z3 M! n+ X, h4 S' c

  3. : k; ?) f: |  v& }$ ~$ |1 c6 I
  4. $worker = new Worker();$ z( _9 `" j8 }0 i* R
  5. // 4个进程( A, ^5 v( J2 [
  6. $worker->count = 4;
    # w: F& A' X. Q  Z8 S( ?* c
  7. // 每个进程启动后在当前进程新增一个Worker监听- l& j6 X6 K/ Z1 C4 N, @
  8. $worker->onWorkerStart = function($worker)$ s/ h4 b- ^$ x) r
  9. {
    2 b* s/ c( a; f. z: M
  10.     /**
    + g6 h3 g4 s, {
  11.      * 4个进程启动的时候都创建2016端口的Worker, n% P+ g3 [3 b6 ]; K7 |
  12.      * 当执行到worker->listen()时会报Address already in use错误
    : |, c$ J8 q' p" @1 Q8 t/ z
  13.      * 如果worker->count=1则不会报错
    & V2 N4 H8 @1 Q- M
  14.      */
    7 ^1 k' ^; x4 K$ P0 }- j3 e
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 |' ^1 l- i) L9 |
  16.     $inner_worker->onMessage = 'on_message';
    1 T+ `2 |* h% R/ p2 P3 S7 D
  17.     // 执行监听。这里会报Address already in use错误) r; v- T; I2 U4 _$ z7 d$ }* V/ _! u
  18.     $inner_worker->listen();$ i( w+ x. p: L: s* @
  19. };
    4 ]  d0 R. X- \- H
  20. 4 G. F- k5 _' [+ i! K# P  m
  21. $worker->onMessage = 'on_message';; f# o( q8 L5 X: X* F: N
  22. : {  o# q2 U% ^+ M' @
  23. function on_message($connection, $data)2 Y3 o# o# W5 P" g, F% j5 m4 G' w  O
  24. {
    $ l$ ?% A4 ~7 _2 F* h
  25.     $connection->send("hello\n");; e# r0 y! ?. S. @% }  d* S" ^
  26. }. C, V9 c' Y: a- ?
  27. # S6 K' R: ~9 L! B
  28. // 运行worker
    6 J( A  T6 @" z1 W: p. |
  29. Worker::runAll();5 W- e) j3 w' U4 N) e  o
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    6 z7 _5 Z! {: u

  31. + E2 d. N/ l% T7 W. [" b
  32. use Workerman\Worker;7 o! x. K/ G/ n) M, R! d
  33. require_once './Workerman/Autoloader.php';
    ( z! @3 {+ V' _3 S! u- Z8 L8 n
  34. ; i( ~1 K# S7 \" }9 P% s
  35. $worker = new Worker('text://0.0.0.0:2015');
    + S8 h: t: L9 ~  K
  36. // 4个进程2 N7 h5 |6 X. f) w. E: S
  37. $worker->count = 4;
    : t0 f1 f2 @4 H$ u/ Y5 m
  38. // 每个进程启动后在当前进程新增一个Worker监听
    0 W1 x7 V; D0 o: A
  39. $worker->onWorkerStart = function($worker)
      y* b1 S; G) k5 A7 N
  40. {' v1 k- M" ~( T- ~/ u& u4 C8 ^
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    : [2 H  ?0 ~$ w; V4 [( w9 f+ S- [
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)) t: ]% {" E" J) M* P6 M
  43.     $inner_worker->reusePort = true;
    % J8 w; ?2 z# q( w
  44.     $inner_worker->onMessage = 'on_message';. z/ n, O5 ?5 ~& D* S5 \
  45.     // 执行监听。正常监听不会报错3 u( V0 G3 R- V
  46.     $inner_worker->listen();8 C1 J4 h: s; h1 l- q/ w
  47. };
    + N$ M' P+ E8 Y( q; t' J

  48. 4 r2 R6 A* a+ N: ], w1 u
  49. $worker->onMessage = 'on_message';
    : u' p7 g% O/ X: M

  50. 1 S5 U+ m9 c. \8 `% d9 Q2 U/ k
  51. function on_message($connection, $data)- r$ T' X8 h( |& f; H* t
  52. {- K0 e6 @, X! l% u( R0 T6 j/ k
  53.     $connection->send("hello\n");, p0 e3 O& D( t( A) M8 i$ h3 a) f
  54. }. f# G/ A4 H  {/ Z
  55. ( F( q0 S5 y& `- C7 V' P. W
  56. // 运行worker9 P6 a( X! r1 F/ f+ U& d
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php6 n4 w7 I, c4 A
  2. use Workerman\Worker;
    * U+ U. V) u( \$ b; i
  3. require_once './Workerman/Autoloader.php';
    - D& c. o$ C2 ~' J
  4. // 初始化一个worker容器,监听1234端口- e7 N9 c6 n; |  v, V# G! ]5 G
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    * J" u3 ], |, J3 d
  6. 6 _- T  L9 j% o, u! ?
  7. /*: E! e: N0 p4 i: J8 M5 `
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误: k9 g2 c( u  F* b5 }3 _+ \+ T
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    * y; t! {! u/ C( o1 Q5 B
  10. */0 N2 F5 F) l5 R$ P2 O
  11. $worker->count = 1;8 v+ `6 s4 \: {& N
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口2 @0 f4 O2 F7 j! }" y( `8 R! [
  13. $worker->onWorkerStart = function($worker); v# Z. B6 D2 U7 Z
  14. {
    . z) A; Y" l9 i0 ?  x3 F
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符, k: ~7 S/ W. a6 I. S5 e6 n. m
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    2 z  E- i' T% ]# S4 @
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    * y8 Q7 e0 |8 t& f$ F
  18.     {  e1 {+ K* L" T  s, Z
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据) F* q( \+ r* y. c% p7 W2 V
  20.         $data = json_decode($buffer, true);
    # u+ Z% w, S5 |+ e
  21.         $uid = $data['uid'];5 M* i* L8 h( m; v
  22.         // 通过workerman,向uid的页面推送数据
    1 l& `; h+ J8 g7 R; o
  23.         $ret = sendMessageByUid($uid, $buffer);
    9 I( z; L; B0 v( W& |. g( N9 ^5 e
  24.         // 返回推送结果
    ; g% q+ V& z( e4 [  c
  25.         $connection->send($ret ? 'ok' : 'fail');
    # u. C4 s. W5 j5 r+ y. g
  26.     };
    9 L' o2 v, _1 `% Q
  27.     // ## 执行监听 ##( L4 X/ n4 m9 c% @2 G
  28.     $inner_text_worker->listen();- _6 V7 V  v3 `. m) s
  29. };- u& l  o) }7 U5 J+ V5 A9 `
  30. // 新增加一个属性,用来保存uid到connection的映射
    8 ?' m# l& T9 Z
  31. $worker->uidConnections = array();
    / p/ m. i+ A$ ?4 H$ y0 r6 N# n- |1 v
  32. // 当有客户端发来消息时执行的回调函数
    . o: _* G* _% b+ J
  33. $worker->onMessage = function($connection, $data)2 w5 {  M' z( {- C0 u& a
  34. {
    " g. T; Q1 p2 ~0 }
  35.     global $worker;2 c  l$ ]& e" M+ {
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    & z! s, j( U- ~# I
  37.     if(!isset($connection->uid))9 p+ i; `, Y$ Q# m/ R" Y
  38.     {, i7 Y5 d& s0 g8 @
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)# n$ U, U- v9 g/ B
  40.        $connection->uid = $data;
    & E; q  c; q: u0 H) D* l
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    % w0 b4 S- G1 G# j3 J6 a$ @1 o
  42.         * 实现针对特定uid推送数据
    $ U' }: [. T9 g
  43.         */
    * g3 Q, o% S# s; ]5 z5 ]4 K
  44.        $worker->uidConnections[$connection->uid] = $connection;
    $ n- C7 W& K- X0 `( A7 M7 Q/ L- ?
  45.        return;
    9 i1 u' y" S/ X
  46.     }
    ' J( T' J- j# u
  47. };4 ]7 `( S' ], e( L; _; T$ L; z
  48. , i8 h; r* l/ Z3 D, s& G
  49. // 当有客户端连接断开时; p) e* U, ^! N- @4 T+ s
  50. $worker->onClose = function($connection)) l, @. |8 ]! Q& P- V/ ~! U
  51. {' s) v( j$ ~: G) r% I6 l% h
  52.     global $worker;
    0 D8 [+ G% l3 z3 {- C3 F3 H& B
  53.     if(isset($connection->uid))% o( P3 u, Q2 A, [
  54.     {
    & D: n# Y: P/ ?2 \- ^- V7 `. [
  55.         // 连接断开时删除映射8 T- @; b6 w* y, j" z
  56.         unset($worker->uidConnections[$connection->uid]);# w9 f. U* y5 f4 P4 b: z) l8 z3 K, d
  57.     }5 p& c6 m. I! i; d
  58. };6 B) n' m2 s% y5 e  [
  59. * J0 F2 t" d& i
  60. // 向所有验证的用户推送数据
      B8 y9 b6 b9 ~0 t; S# k' i
  61. function broadcast($message)
    : P! g: d1 U4 ]1 C
  62. {% i) |# y5 m# L( |9 T$ W% _
  63.    global $worker;
    1 B( [4 Q8 w9 q" |3 N/ a4 z
  64.    foreach($worker->uidConnections as $connection); q, j' ]7 s2 T) j" x  Y/ l
  65.    {
    " T+ T& }; J7 }8 P! `* {
  66.         $connection->send($message);& b( J: c* S9 ]3 P
  67.    }
    3 I& h, L, I8 q7 P7 C: x- O
  68. }5 f7 `2 d: \9 P, f6 @, q' C7 e/ G
  69. # t/ I2 i" c( a. p
  70. // 针对uid推送数据
    : \; U2 N7 D( C( M" f+ T* V
  71. function sendMessageByUid($uid, $message)
    1 P/ q0 P) R- ?& ^3 P, m2 n, C
  72. {& t8 ?$ N# t( p+ K% R9 H  m' s- o# w  T
  73.     global $worker;8 H5 D) P' t7 |( }" l; |
  74.     if(isset($worker->uidConnections[$uid]))
    & q9 L4 Q( O8 h; A: i
  75.     {+ x' U+ i* n7 p: z
  76.         $connection = $worker->uidConnections[$uid];
    1 m+ B' ?) b- x9 h
  77.         $connection->send($message);! S: Z2 V8 D& t2 Q+ P8 P4 u2 k
  78.         return true;
    & p. A% G0 a' w' o6 p
  79.     }" [" C+ L1 C+ U" d" h9 K* ]; X
  80.     return false;
    ( z5 L1 U2 @/ m, _( [) i) G. D* ~
  81. }4 o. r2 {% n2 @# U. Q7 g

  82. + f1 i, O+ d. Z" O4 _# B
  83. // 运行所有的worker
    & d. G2 O3 D) F6 ~  M
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');; ?) {# T5 W6 H0 I$ q6 t
  2. ws.onopen = function(){3 Y2 \3 U' n4 a" q) D
  3.     var uid = 'uid1';/ G3 X5 \% y% S4 _& w' i* z, }- `% }
  4.     ws.send(uid);
    : g+ W* t; [: p# a
  5. };
    5 i4 Z7 }1 M2 W, E' z  u  z1 k
  6. ws.onmessage = function(e){
    ' O, `+ W5 _5 I0 O3 y9 K
  7.     alert(e.data);
    5 D! E2 s. F6 A& Y( Q  h
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口; ]8 m, H# E* J1 ]
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    & M$ S+ u& P! m  K
  3. // 推送的数据,包含uid字段,表示是给这个uid推送; r: N  J/ ~/ s, {+ g
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');& A; j. g  Q+ O: W7 @; R
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    % Y6 `# a- n4 e8 X0 Y" I. \* m* J
  6. fwrite($client, json_encode($data)."\n");  G7 D3 r: l' P( t
  7. // 读取推送结果; V# X1 Z+ g+ {" f
  8. echo fread($client, 8192);
复制代码

  t: ?* ?% W( N' v6 p: W4 m6 J0 e) q) |: C+ U$ P
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-20 04:19 , Processed in 0.085073 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!