您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14820|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    5 ~5 E  j0 Q+ [- s# e
  2. require_once __DIR__ . '/Workerman/Autoloader.php';. m8 c6 b1 ?/ U  t
  3. ) E9 _6 ~1 L& [: \* x
  4. $worker = new Worker();
    # |) Y1 k9 I6 i% |! N
  5. // 4个进程
    ! M" u+ y% Q1 J0 j
  6. $worker->count = 4;
    7 b1 r' O7 K! n3 p
  7. // 每个进程启动后在当前进程新增一个Worker监听
    2 N5 F9 W/ `5 X4 U; ]) h
  8. $worker->onWorkerStart = function($worker)& a! I) b& n. Z& r7 Y
  9. {
    ( s. u1 e( x+ |) ]! B8 [
  10.     /**+ k+ s' _: N4 l* A6 l
  11.      * 4个进程启动的时候都创建2016端口的Worker# D5 w7 I" |+ J9 d4 ~. ]) Z; U7 |
  12.      * 当执行到worker->listen()时会报Address already in use错误
    & f. m' ]5 D* v6 n; ]9 b8 B* @2 r
  13.      * 如果worker->count=1则不会报错* D* z+ C' E9 p. t8 u, E
  14.      */7 N/ l2 a& N+ s
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    & U8 R6 T! ~0 ^8 x
  16.     $inner_worker->onMessage = 'on_message';
    / h0 @+ b. \- r7 s" D
  17.     // 执行监听。这里会报Address already in use错误! T; z: _" h7 s3 V
  18.     $inner_worker->listen();
    * |! G: D, [) c) [+ A+ |
  19. };! C% f! Z' w+ B
  20. 4 E0 V. q5 P0 S" N) o, x8 E
  21. $worker->onMessage = 'on_message';
    3 ~0 j& F: \! R) Y  n7 Y

  22. / g+ @) W2 ?. `0 m& I
  23. function on_message($connection, $data)5 z) y/ W9 l6 r, T5 k
  24. {  @7 C# h9 Y, ]) L% H
  25.     $connection->send("hello\n");
    / v; g2 h8 Q- z) j
  26. }4 a3 A9 l, l$ D5 P# I( s6 a* {  K9 E

  27. " f& ]6 \, ]! k+ \$ c
  28. // 运行worker" h* U  s6 E' ?+ w/ M
  29. Worker::runAll();
    7 k0 h" `8 H+ t6 I
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:+ p6 F! E9 R8 |  Y1 T8 Q& p

  31. 4 z. B1 K3 |1 @
  32. use Workerman\Worker;. M: _, `) }9 W( R
  33. require_once './Workerman/Autoloader.php';5 M& l' y7 k9 c

  34. 0 T0 _- T# s6 z5 R9 J! r4 k5 v1 z
  35. $worker = new Worker('text://0.0.0.0:2015');9 M( W# v- X  Y% A: @! n
  36. // 4个进程
    # R9 Y- c/ _: H- E+ ]
  37. $worker->count = 4;4 }' i* O- z. Z: `7 G
  38. // 每个进程启动后在当前进程新增一个Worker监听' `& l; W* F- B. @; y
  39. $worker->onWorkerStart = function($worker)
    ; [* W+ J$ y/ L5 y- @* B% Z
  40. {
    4 j( y  {; G" T; ]3 H
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');3 o9 Y5 U" ]  O, N+ |
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)$ O- W/ b; \# U" H- R: A
  43.     $inner_worker->reusePort = true;
      m1 X* {3 _& j5 U$ S. n. T3 z9 n" h
  44.     $inner_worker->onMessage = 'on_message';
    # {- g* d2 n  E* P8 `1 A* |
  45.     // 执行监听。正常监听不会报错
    8 X: |, O* {. @* a0 l( {3 g
  46.     $inner_worker->listen();( O; k. h6 Y; k. {- J& K$ s
  47. };/ }* Q5 V& T) x) v
  48. ) X7 A7 Y. H; l; A4 M9 V
  49. $worker->onMessage = 'on_message';$ r+ ~  x/ U- Z" a1 k: i

  50. ! ]3 T& t# s: B  j! b7 y' f
  51. function on_message($connection, $data)
    - k/ |" L  {4 p/ x% Z5 U+ u2 a
  52. {: B# t4 v) f5 h: B' E: A( y) k$ A
  53.     $connection->send("hello\n");% D" w9 h1 W: C1 Y# @
  54. }
    " C1 g3 O$ n' E: R& p9 q1 l

  55. 5 F! l. |, W: J! o5 H4 ^
  56. // 运行worker
    ; v2 [- ?3 t  u& t) V, F# f
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php8 _! G, o1 w- I' e# J& C- w) ~$ b
  2. use Workerman\Worker;
    + e( p8 _, Q8 f, o
  3. require_once './Workerman/Autoloader.php';% N( A* A& z8 j$ m5 w2 M2 D
  4. // 初始化一个worker容器,监听1234端口
    0 D0 {. V; h* R7 \' _
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    * d4 l2 e0 a# T6 Q! t

  6. % A' O' `, D$ _7 E* H. P# s
  7. /*8 S9 I, A$ u4 \& T- v& D
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
      R1 P8 y/ z) K1 P1 J- F4 q2 }
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    7 J# Z. b% s- P8 t' y8 L$ ~
  10. */
    5 G: @4 ~2 J4 \0 _2 n& R; L0 h/ I) }
  11. $worker->count = 1;0 Y3 X' L. ]/ p( Z( D6 `: |
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    # O/ ~9 B. @* s- m
  13. $worker->onWorkerStart = function($worker)
    ; m5 z  j6 M. w. L2 x" F
  14. {4 t/ \9 Y' h. l6 G+ N8 V% g
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符& V5 x& o2 N. p8 t# |- |; Z0 q2 `
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');+ `9 Q, ^; X( A/ I5 I
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    $ q9 w0 ~4 w9 c% _  \1 I
  18.     {6 J& z0 Y5 {$ @4 p1 O8 J' M& W
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据! X3 Q* h: k" Q, e4 j
  20.         $data = json_decode($buffer, true);
    1 p$ L( {- |! b  r
  21.         $uid = $data['uid'];: Q" r2 r+ f2 ?6 I
  22.         // 通过workerman,向uid的页面推送数据
    + g$ D5 v7 A" |( w% w5 e) v3 |
  23.         $ret = sendMessageByUid($uid, $buffer);
    / z6 I! _0 x6 v" c" e3 p- j; F
  24.         // 返回推送结果
    2 q! U7 o  p4 q- y* U
  25.         $connection->send($ret ? 'ok' : 'fail');! q/ t( N% j5 ~) n
  26.     };  k/ S0 P1 r2 b
  27.     // ## 执行监听 ##0 v2 e/ r$ t- n" B# Q
  28.     $inner_text_worker->listen();
    : E& C; w- ^. o! y9 p
  29. };
    9 d; x1 r- e" c5 g/ k  J5 p# L. c/ i
  30. // 新增加一个属性,用来保存uid到connection的映射
    7 Q! y" ^: E& M; F3 X# v
  31. $worker->uidConnections = array();
    , t# v4 v. }% X, M. _
  32. // 当有客户端发来消息时执行的回调函数  [* `. ~4 {# \" K: p" a, i
  33. $worker->onMessage = function($connection, $data)# Z$ s" A$ r9 @4 J
  34. {
    0 Y  F# z0 Y9 d. m2 Z
  35.     global $worker;
    4 g; G- `# U" _  R3 [+ i5 t
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    # S2 O! n  o! A/ u0 P
  37.     if(!isset($connection->uid))
    5 S5 C2 m# u5 V2 P5 I" w
  38.     {* V3 Y' i# y0 E, i% K
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)# M7 d+ x+ h2 V9 y% N9 ~7 L
  40.        $connection->uid = $data;
    1 B: P1 o* C$ N" R: ]# |9 s" W
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    # ]9 a" |) Q% v, H! _4 W
  42.         * 实现针对特定uid推送数据
    0 \7 n; r- a& t3 E! O+ e$ p, e
  43.         */, y9 }* g" V, b( o; Y  _
  44.        $worker->uidConnections[$connection->uid] = $connection;
    + Y% S6 q: o0 Z/ l: g
  45.        return;2 K. G! z$ L; q6 J8 t3 T  ]0 G* G' v
  46.     }
    : M- Q3 T- g2 r
  47. };+ R" T# A3 q% r* n: Y
  48. 1 y  h# b6 j( y2 U; w" l: W
  49. // 当有客户端连接断开时  p1 \8 o4 m( v+ r( r9 T7 I
  50. $worker->onClose = function($connection)7 m. M+ L$ Q+ r$ _
  51. {+ ^0 k) R+ {- R: }
  52.     global $worker;
    1 z3 @4 f5 _9 k3 @
  53.     if(isset($connection->uid))
    5 C6 M( W$ T/ o- Y5 G
  54.     {
    ' C3 X, Z9 W1 R+ k5 [  q% R( h, R0 [
  55.         // 连接断开时删除映射
    3 N7 Q9 X/ o( K) Z2 q0 [1 {
  56.         unset($worker->uidConnections[$connection->uid]);
    6 _' r% {2 C& P+ b6 v5 x$ `/ h
  57.     }8 I5 C- v2 p5 X3 x7 c" \/ x
  58. };+ s% j6 ]: n$ I9 L# I2 V  A* f4 N
  59. , ?" u" T+ W& M) b4 t
  60. // 向所有验证的用户推送数据# L7 x5 E' Z  `. O9 d# R* M
  61. function broadcast($message)
    # q! }' _: u. T$ t7 Q1 v
  62. {+ m, `2 f; P, d5 C& M/ N8 v
  63.    global $worker;, I3 P  U9 I1 ?& a5 j/ E
  64.    foreach($worker->uidConnections as $connection)
    * }$ B- Z7 T- {1 j
  65.    {
    " x! _: X+ a& b
  66.         $connection->send($message);
    / N7 B& W0 J9 T* A9 p4 n
  67.    }
    8 ^$ J; L/ l4 o: K
  68. }! C2 K% y# ]2 W& O/ L0 N
  69. ( A& v# w" `5 A
  70. // 针对uid推送数据3 D* ~: i* Y( _' p* m3 [
  71. function sendMessageByUid($uid, $message), n# b- X( a; R$ C9 n. z
  72. {
    & }: a4 x: j4 U* Q2 x8 u0 L  F
  73.     global $worker;) C; J: s3 `$ z2 U$ d3 D* T
  74.     if(isset($worker->uidConnections[$uid]))' N+ j. R3 A" d
  75.     {9 X8 z9 }( E/ P* E0 c; Y" ^0 D+ X
  76.         $connection = $worker->uidConnections[$uid];
      _) T8 v8 Q: a$ }2 q# g* t4 |
  77.         $connection->send($message);
    6 P9 L; {& z- p, P
  78.         return true;# a) L1 u7 z" }! C
  79.     }
    - g* Y( d# g0 y+ ^# R7 G# ~& B
  80.     return false;" Z- j/ V! k5 B' t7 q5 ?+ r. ^; k
  81. }+ H* @1 w6 {% c! W
  82. 4 x! X8 c& A* e  N
  83. // 运行所有的worker
    8 E$ r, z9 H% N
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    0 j) n2 d& k$ n7 R0 X, v9 Q
  2. ws.onopen = function(){
      \6 w- C  _# e5 A9 @
  3.     var uid = 'uid1';
    0 m* ^* H1 w' f
  4.     ws.send(uid);
    4 B/ b/ B3 E# W- w* V/ K2 b
  5. };( c7 f+ e* I' d3 ~4 q
  6. ws.onmessage = function(e){
    % h+ ?) O+ b* I1 F' W- t& ]2 j2 s
  7.     alert(e.data);
    5 b' E- i$ {. N$ v! \
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    9 L$ E7 h  n( i1 K" C
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);% n! L$ g# l$ g" F
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    , m' ^" L0 s7 W7 j+ f/ N/ Q: S
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    # v( x+ l, v' ~% S
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符( X0 c! O- z4 `. ~7 y  x! s6 f
  6. fwrite($client, json_encode($data)."\n");
    # O" \3 J1 U4 w# Y4 G& _- v% c7 c
  7. // 读取推送结果
    0 c+ A( A: n& \) l- u5 I. Y
  8. echo fread($client, 8192);
复制代码
  G0 y; f. k  L7 K  N: Q
* \: `" `( v$ G* {: @
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 23:34 , Processed in 0.088201 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!