您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14668|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;, `5 D( [, }( p& @9 }  Q5 O
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    # q5 O, G; a: |0 |* Z- M: Z$ F$ t

  3. + x9 u5 k0 A- u) v. l
  4. $worker = new Worker();
    8 c7 o: v: {6 `9 K  g+ g3 m
  5. // 4个进程7 D, d  r! l, [. Y9 j: E
  6. $worker->count = 4;
    . u. ~, G- n4 E; ]
  7. // 每个进程启动后在当前进程新增一个Worker监听
    + o$ K  Z% {% _' n  m
  8. $worker->onWorkerStart = function($worker)9 B( `* R/ B, X
  9. {" ~+ ]. o2 b% P) P% v0 E
  10.     /**# i* Q8 m# X. L
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ; Q2 i7 m) O' ]0 r6 |  ~: p# s/ g
  12.      * 当执行到worker->listen()时会报Address already in use错误8 @) _* A7 E6 K- z8 i0 c+ S6 u
  13.      * 如果worker->count=1则不会报错
    4 p, A5 B2 z$ i4 d$ C) x# W
  14.      */
    3 {4 _( L& o* \' q/ F1 ]- V
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    / K- [) J6 S9 {! _0 W; E0 s
  16.     $inner_worker->onMessage = 'on_message';) ]5 o6 J6 T7 K7 T1 ]
  17.     // 执行监听。这里会报Address already in use错误  H- Y! R" ?. h$ B6 V; g" K6 T* H& V5 n
  18.     $inner_worker->listen();
    : F: W2 [1 r3 U  R/ H  u" o' t
  19. };' U) q, B2 w3 i, N" G# p

  20.   J3 `: w  T; E" f
  21. $worker->onMessage = 'on_message';
    4 e+ e% T# t" h+ q* e: k0 Y

  22. . {& d! z$ W9 n) s. q8 q
  23. function on_message($connection, $data)
    9 c) L4 T1 \# L' ?
  24. {5 ~9 G& j$ M2 O1 v$ H
  25.     $connection->send("hello\n");
    ' l) i) `* {8 r1 s  [' g; a
  26. }4 D2 R0 H0 E" q! e$ H8 M

  27. ) n- H& b4 t4 \/ {# k$ z
  28. // 运行worker1 R) o* F+ Z* R2 x: Y! C3 L5 ^
  29. Worker::runAll();9 U( f+ P8 G3 o  V4 A8 c5 t' M
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( h! F) f9 ?& R! v
  31. , }1 |/ D7 l- {( p/ _
  32. use Workerman\Worker;5 n# \/ f" ]# {: K
  33. require_once './Workerman/Autoloader.php';
    2 O, F9 W7 _7 z4 f. E& P
  34. 9 q: P: F" T, a7 x$ Z+ r7 S
  35. $worker = new Worker('text://0.0.0.0:2015');8 R" O6 s/ ~# W
  36. // 4个进程
    3 f( _* f/ B9 i: B
  37. $worker->count = 4;
    + \4 K4 `+ ~/ q' u) m" D5 F
  38. // 每个进程启动后在当前进程新增一个Worker监听
    0 H! v$ y' K& b
  39. $worker->onWorkerStart = function($worker)
    ) P* s% T+ p$ w3 g5 ]
  40. {- E* }( l# _  {+ ~
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    - _) F  v/ p# T$ N! B. i7 c$ f4 I4 O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)% P0 A# w. G9 X$ a& Y& O+ \* p
  43.     $inner_worker->reusePort = true;7 b1 ^+ K) S7 J3 U( G% @+ r
  44.     $inner_worker->onMessage = 'on_message';5 o3 n7 q3 l3 t0 ?0 u/ x
  45.     // 执行监听。正常监听不会报错0 I  q/ e7 j- ^# n% |  y, _( J1 u
  46.     $inner_worker->listen();1 A) U, ~+ ]0 s( I' Y  D8 M  Q9 R
  47. };
    : Q: |! }- X. ?6 A( J' \

  48. 9 C$ P1 ~6 [: e! c1 j' S' m" a2 M- B
  49. $worker->onMessage = 'on_message';4 c' x0 Q% l* P

  50. ) [) j3 K) x) \
  51. function on_message($connection, $data)# o+ \6 U, Z! V' K2 F
  52. {$ {4 V7 X0 K  w3 t* `! m' h6 k
  53.     $connection->send("hello\n");
      ~! T/ `2 f, d( s
  54. }! U& f+ T, I7 {+ G

  55. ) v8 u! _, T/ e/ `1 H% m
  56. // 运行worker( a/ Y! Y2 }1 ?/ t$ x
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php4 ]4 p( }3 o7 T/ d$ ]
  2. use Workerman\Worker;
    ' k* X& W6 y" e! h
  3. require_once './Workerman/Autoloader.php';) S! J! b( W# n- p7 s+ b$ \' N
  4. // 初始化一个worker容器,监听1234端口6 {6 u: }/ M2 D
  5. $worker = new Worker('websocket://0.0.0.0:1234');  b0 n! H2 z0 |+ x1 {4 U
  6. : N7 R2 y* _# S  ]4 @0 E
  7. /*7 m4 V0 x9 t6 N5 k
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误- q, R0 J& x  j* t
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    - ~; {' `/ K8 q% u8 {  a7 D
  10. */) i: F. _3 R0 W) N" q( r$ i
  11. $worker->count = 1;
      z: Z1 Q& u, T, |" P$ m; ?
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口+ k3 }& t1 H4 D" R. m
  13. $worker->onWorkerStart = function($worker)
    " ]3 ~( Y- ?; L9 L
  14. {
    0 I) P& [, X! o9 G* Z, h: v' y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符+ L, |! h  C% a4 |- ~5 T
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');2 F" H' j, }1 A; o& ?! ]" q
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    1 Y) q4 h- m$ Q, Q0 I  l+ x+ T0 h
  18.     {
    - A& l1 _$ l9 h' v4 G9 n
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    * u% B. Y4 m8 k6 ^9 |5 E
  20.         $data = json_decode($buffer, true);
    / }* F; q8 W4 A! h# }+ c
  21.         $uid = $data['uid'];
    3 t. E- f. }& p
  22.         // 通过workerman,向uid的页面推送数据
    - u! h" b, F# q
  23.         $ret = sendMessageByUid($uid, $buffer);
    . f/ s, T1 M0 y; x
  24.         // 返回推送结果1 J9 Q4 _+ ~  G" a8 h6 r
  25.         $connection->send($ret ? 'ok' : 'fail');& t% y- p* t& A+ D( g
  26.     };4 w7 U7 L  }9 t, x* ]  d! ~$ ]
  27.     // ## 执行监听 ##
    8 H* q; _8 I; n" `
  28.     $inner_text_worker->listen();. o2 K! G" K: P5 b% B
  29. };1 L( d5 z: P' k6 _/ }
  30. // 新增加一个属性,用来保存uid到connection的映射
    + N3 `" J4 }, {' M. J# s, e
  31. $worker->uidConnections = array();
    , w9 E2 `/ M  U% G
  32. // 当有客户端发来消息时执行的回调函数
    2 B( s1 A& f0 Y  Z* A: c3 f
  33. $worker->onMessage = function($connection, $data)
    ) n4 D3 b. J5 y+ X( C
  34. {
    4 d" K/ D/ R: _! j  A, O" P  {
  35.     global $worker;0 _+ k; }$ I' ?7 v. P9 F
  36.     // 判断当前客户端是否已经验证,既是否设置了uid( S$ C* B. ?7 E
  37.     if(!isset($connection->uid))4 D% e& e3 w$ H. O
  38.     {+ v5 E# l3 w, V+ Q
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)5 {0 D8 f1 E0 l0 Y
  40.        $connection->uid = $data;
    * D- ^* G7 }: n* ?
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
      e" |9 B/ M. g" `
  42.         * 实现针对特定uid推送数据
    ( ]# Z+ S$ f9 N9 y* o
  43.         */
    $ y" Z( u, m7 g) u
  44.        $worker->uidConnections[$connection->uid] = $connection;
    0 p! S! n4 U% q% K
  45.        return;6 L" \- h9 c" A% h8 Q0 B
  46.     }
    ! u6 F% ^( X4 K( g- N* U* a
  47. };
    8 q. r" Q  k( \/ s
  48.   t, O( [0 P. Q9 n5 c6 A# M8 W0 e1 h
  49. // 当有客户端连接断开时
      Z- ~& g# Q8 C
  50. $worker->onClose = function($connection)- I2 X2 O' [. A5 [' c' Y" V" [
  51. {
    / u' K; ?6 G, l# h. z' M/ E5 _9 I
  52.     global $worker;
    1 ^8 {+ c0 }" c, l* l8 ?
  53.     if(isset($connection->uid)), B- c' A6 m3 V
  54.     {8 P( X0 _6 i* ]7 P, t. `
  55.         // 连接断开时删除映射) M( G7 `% T! R; a# F- q/ K
  56.         unset($worker->uidConnections[$connection->uid]);
    $ m- n2 L3 f+ Z9 f( M# ?6 ^. l' o
  57.     }
    ) U; n  D# r$ J1 L+ T. t" x7 B
  58. };
    ) U( \1 v  F% @* c( Z) W6 A

  59. 5 I: N* n( x, Q) G" d
  60. // 向所有验证的用户推送数据
    ( S/ f7 d" i: ]1 n- M! R
  61. function broadcast($message)9 S3 G, O) r$ t( e
  62. {
    3 V; b' Q8 I. K# i# t! b" s
  63.    global $worker;
    5 o# ]% K! [8 p7 R+ z+ d: k4 o
  64.    foreach($worker->uidConnections as $connection)
    4 y5 ~  R! H. t0 {" b9 D! X4 o, T
  65.    {
    , W: k$ Z; r5 @# k) L8 i  V
  66.         $connection->send($message);5 c+ T* j3 `* T/ H
  67.    }
    & O1 C! d' x7 y) o1 N- g# W  e/ Q
  68. }
    0 a* b" X7 S5 E5 g& U& b( c+ t

  69. " L) S! ?' r0 L! c
  70. // 针对uid推送数据6 |2 e. l2 o* h/ V. ~) G  R$ v) Q
  71. function sendMessageByUid($uid, $message)
    ; o5 p( [& P. P- n+ s
  72. {# R" J. O+ P8 [0 E4 J# x
  73.     global $worker;# i  ^6 M' z. }/ H. z8 y+ q" f! O
  74.     if(isset($worker->uidConnections[$uid]))
    6 n7 c; }' \' E) w+ A
  75.     {9 @2 _- P0 K3 [$ J7 i
  76.         $connection = $worker->uidConnections[$uid];
    - u5 x. K9 u3 ?+ b
  77.         $connection->send($message);' ~. o  n/ }" m% d1 U: E, Y
  78.         return true;3 r, f/ Q  N6 @( Z- s$ v4 i
  79.     }
    ' l% o* H7 X, ^/ Z
  80.     return false;4 M0 Y6 t% G- d2 p& e4 J
  81. }; Z5 L: B  R( |+ O9 M. F

  82. " F# G. t2 F0 s* E1 X; S+ t0 M
  83. // 运行所有的worker- S/ m; J+ J; Y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    " F$ Q: K8 \9 |3 j: `4 C& s
  2. ws.onopen = function(){/ T' R' T7 }2 z/ r6 T
  3.     var uid = 'uid1';- S( x% j  G- c: E
  4.     ws.send(uid);
    3 |) D2 K2 _6 `, _* \2 ?3 H/ X
  5. };
    6 g- m6 U4 U" \. \6 }4 o4 \
  6. ws.onmessage = function(e){& R/ |6 T7 q6 x7 G
  7.     alert(e.data);
    + B( p! Q+ h7 L( L
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口! I' k6 d# e% R. f; [! u" L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);/ q3 `1 ?) g4 S
  3. // 推送的数据,包含uid字段,表示是给这个uid推送) B" c- e* \9 v* ?
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');9 Z# F/ I( d- ]& k
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    * V% }$ G6 X! g3 R8 g2 p- ]7 W( h
  6. fwrite($client, json_encode($data)."\n");
    0 d( b$ g- H! x" u' Y
  7. // 读取推送结果
    1 _% b2 q( n4 ]) G. c
  8. echo fread($client, 8192);
复制代码
7 B$ A; @% Q1 p. w

8 _) G) s: L; k
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 16:39 , Processed in 0.060806 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!