您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15225|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;# n/ C7 s( V) r, D
  2. require_once __DIR__ . '/Workerman/Autoloader.php';/ \1 T4 A# f# d, {

  3. / C7 e" e6 v- @4 K# ]( L, M6 o) N
  4. $worker = new Worker();
    ; ~5 P. v  {* m5 R9 P
  5. // 4个进程
    / s" I. i* q$ L  R
  6. $worker->count = 4;
    0 w6 Q9 n, z4 q2 {9 g7 ^. U' C
  7. // 每个进程启动后在当前进程新增一个Worker监听  t6 J/ d7 l: j
  8. $worker->onWorkerStart = function($worker)
    * H) w' L* Q% ^& t& G  D% X' s3 |
  9. {
    & v0 A: W4 |- Q5 r* ^
  10.     /**3 n$ ~. j& \4 V2 {, Z3 ~9 R/ t
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 P5 |: y0 w* T" j5 X! e
  12.      * 当执行到worker->listen()时会报Address already in use错误8 t4 t. w9 {' L5 y. m1 k# m5 P
  13.      * 如果worker->count=1则不会报错* D+ p3 E4 Q5 H: [8 u
  14.      */( d# v1 k3 d& F
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');8 v5 F- U2 J7 q
  16.     $inner_worker->onMessage = 'on_message';
    6 R! ?6 Z6 e  S% X& V
  17.     // 执行监听。这里会报Address already in use错误
    * |9 t1 a# y" D" {: g: r3 y
  18.     $inner_worker->listen();, @* _* P8 _; W) a
  19. };" l9 s' E8 \4 j7 J
  20. ) F0 D3 W3 o1 L% K
  21. $worker->onMessage = 'on_message';
    # U9 O$ t1 F( P* @
  22. 8 L* m6 |. B5 w3 p7 U) U$ L
  23. function on_message($connection, $data)0 ~/ @& k3 G; {$ W
  24. {
    - O! n2 F; m, S1 f5 q
  25.     $connection->send("hello\n");' j- z  l) K5 V
  26. }
    5 u. O$ Z: O$ h) a

  27. $ h: I& E8 y/ z
  28. // 运行worker
    + i' r: K& Z2 F, [
  29. Worker::runAll();% `$ K0 K( @' W6 v/ y
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:' f! x8 n8 `# \

  31.   Q9 Y. k$ r% `. |; t- O8 o1 b
  32. use Workerman\Worker;5 a* i& Y: R! m5 ]& W+ a. p* |
  33. require_once './Workerman/Autoloader.php';
    + i) a0 c! T3 y# B

  34. ) c! n" V2 k' F! ]: w" Q  X
  35. $worker = new Worker('text://0.0.0.0:2015');
    9 c4 q2 N* u$ M8 r$ l8 N7 \0 f
  36. // 4个进程/ O" Z$ s; z5 G  }/ y2 v
  37. $worker->count = 4;# ~1 x4 D3 ~0 s' E
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 f: M# `5 Q% U
  39. $worker->onWorkerStart = function($worker)
    & k, C9 P8 m9 h# E( O! L( v7 H4 Z
  40. {& e& h9 o0 I! V1 |9 ^
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');! u* J. {6 e  Q, u3 @+ ?0 X# Y
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)# j: t- {" p: W1 T; s# @" L" m
  43.     $inner_worker->reusePort = true;  l- F: K9 t8 W/ i2 J; M
  44.     $inner_worker->onMessage = 'on_message';
    1 S5 W2 l% W1 E
  45.     // 执行监听。正常监听不会报错
    / a5 g9 {0 T& N  y! Q8 Y/ E! |
  46.     $inner_worker->listen();
    4 t; P2 s  B" t6 U* J2 s
  47. };
    ) i+ g+ C& n$ j- {! K
  48. 4 b' D( {, _: ~6 o0 X' o6 {
  49. $worker->onMessage = 'on_message';
    $ ?5 d) b4 @. U' V; c  E' C

  50. $ D+ m5 S, h1 Q2 r. @0 C. _" F$ k
  51. function on_message($connection, $data)7 m+ {! k  ~- L
  52. {& J% @: B! ^7 G4 [' {: N1 j' U& |, k
  53.     $connection->send("hello\n");3 W: l' l6 |5 D- ?
  54. }
    ( x- Q' d% M4 A$ n
  55. 2 R7 q* ]( {4 g: r: I5 \
  56. // 运行worker2 z. n& n" y  g% `- f: A
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    : P! r. `- u2 O* I- Z8 l9 A$ b8 h
  2. use Workerman\Worker;
    : \& T' r9 v/ E; q5 U
  3. require_once './Workerman/Autoloader.php';
    + p+ v% j6 [  n4 E
  4. // 初始化一个worker容器,监听1234端口7 ]: O3 D" E! V4 `9 w
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ! D- ~2 i; S+ ]" H

  6. 6 K5 j, |# ^% g& o( `
  7. /*
    6 a+ \! h/ k$ [/ P  u: S5 a
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    " S. h6 J; L6 O6 s1 M0 q
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! X! j, a/ b) Y  m9 @0 {
  10. */9 C  ~# _+ M- F7 [5 O7 `8 r
  11. $worker->count = 1;
    5 e9 _+ o1 ~# z$ ?
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# |+ C) {% v# p2 Z
  13. $worker->onWorkerStart = function($worker)1 E1 x. m" b; e" }; i
  14. {7 }4 r9 M/ p% w9 m# V  u6 y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    4 {8 z* }( ~+ `+ d3 D+ P. R& B
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    , `: T7 T! _. L9 P6 C
  17.     $inner_text_worker->onMessage = function($connection, $buffer)6 R+ l( D+ t% Z! |8 C7 R
  18.     {, u% v7 Q. `8 X! C0 T
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    : U; f, ^2 [9 L3 \2 c, M
  20.         $data = json_decode($buffer, true);" h/ w# `! O  Z% }+ W
  21.         $uid = $data['uid'];
    ( C4 A) p+ h$ i/ S* Q/ d
  22.         // 通过workerman,向uid的页面推送数据
    ; C3 X7 j( q; _& n0 o8 c
  23.         $ret = sendMessageByUid($uid, $buffer);
    + j' ^* r, |) k  Q( [0 t7 x
  24.         // 返回推送结果
    * F0 V8 r( L( c; g6 z& p
  25.         $connection->send($ret ? 'ok' : 'fail');: w. P* L  p# \3 C$ W  O
  26.     };/ X8 u/ s$ Z; L! E
  27.     // ## 执行监听 ##5 t! Y& \& T- ], \
  28.     $inner_text_worker->listen();
    * z/ I2 h$ P/ z( b- C* `
  29. };
      y/ U; }1 I" Y$ N& R7 G
  30. // 新增加一个属性,用来保存uid到connection的映射
    7 C  ?4 g' x! S+ g5 m1 s
  31. $worker->uidConnections = array();
    . P& F. j8 [* F- C7 b& H# L
  32. // 当有客户端发来消息时执行的回调函数
    # @. e1 y: X+ C7 X" z& \% o' C
  33. $worker->onMessage = function($connection, $data)
    $ h! P* v5 Z4 y, y( [6 ~8 ~5 }' \0 H
  34. {
    & l/ i8 V9 d5 k
  35.     global $worker;" c: J% Q% ?( D3 n+ S0 X- o* q
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    2 i$ y- G* B" Y" Z. e* a, [
  37.     if(!isset($connection->uid))6 C, |9 r+ X0 Z- z; N
  38.     {
    / N* X5 T2 e0 A# }6 c
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)9 U% K$ x+ b  V+ r+ K
  40.        $connection->uid = $data;
    ! p; U( o% L$ m$ ~% L
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; [) E7 O) H: B7 w+ _6 q" n
  42.         * 实现针对特定uid推送数据
    1 P$ z/ [% {6 c4 ], ]0 p3 S4 H
  43.         */
    ( B) t+ K( t; \
  44.        $worker->uidConnections[$connection->uid] = $connection;
    , q3 C2 K* R- ~7 `; j; {! x. k
  45.        return;
    ' K+ c0 P. F2 o$ i2 x3 e
  46.     }& N% p; s( y/ ^/ }6 C% V3 b. X
  47. };# a. k& K" C5 ~6 t" t8 Z' _
  48. 7 ~9 b1 n; M' g. Z" t, G" G3 y0 X
  49. // 当有客户端连接断开时
    1 B2 z: f3 G! c( i$ {# n
  50. $worker->onClose = function($connection)& q7 ~( P2 l; }/ _) \" p6 q
  51. {
    & V* H. Y9 W! A% O; j
  52.     global $worker;: c& I8 m% |  L! F, R0 u+ i) H
  53.     if(isset($connection->uid))3 b& v& g" }8 Y
  54.     {
    & E( w- N7 v; k9 h
  55.         // 连接断开时删除映射4 y9 F* A; y4 l7 w% @# N
  56.         unset($worker->uidConnections[$connection->uid]);
    5 P- }2 j& R- X+ L' g
  57.     }' K6 J; k) Z0 x5 b0 z- k
  58. };
    # r9 Z" F% r4 G% ~7 P" V

  59. 8 m3 Y& y4 E- n! q( r
  60. // 向所有验证的用户推送数据/ P& ]3 P, Q6 E# }
  61. function broadcast($message): e' f' A: g2 K/ V; H# b
  62. {! \! J! X5 C8 ]% ~
  63.    global $worker;# A& O1 B: F0 v- Q1 u
  64.    foreach($worker->uidConnections as $connection): T5 f2 G+ n9 E; E
  65.    {
    9 P( h( x: }/ E& _/ `
  66.         $connection->send($message);  _$ m, I6 S, W9 ?4 a1 \  {  b
  67.    }
    & y; F7 [) J% `  s2 n
  68. }
      F- [7 y& J6 |2 j& h
  69. 6 m4 ]6 U( p/ ~7 ~
  70. // 针对uid推送数据1 [+ Q* z# ^& |3 a* p, R
  71. function sendMessageByUid($uid, $message): j) f; W- t8 L( {9 P$ i
  72. {" ~2 \. m& k( {% G) b! z- f
  73.     global $worker;
    9 Z. p( I, C8 O! y' `
  74.     if(isset($worker->uidConnections[$uid]))
    8 u. c1 A+ O2 R8 ?4 n# i
  75.     {
    ) d4 s& h5 u& m# m
  76.         $connection = $worker->uidConnections[$uid];
    9 c* l6 A- [* P$ L+ N
  77.         $connection->send($message);
    ( s0 P- n  p9 Q
  78.         return true;
    3 O, e7 i# n- v4 J* `; ]
  79.     }
    0 e* a6 j' T" Q
  80.     return false;- m7 d& G$ |/ \1 i0 d- B) o9 B4 \
  81. }
    . Y5 B# q0 p' q  ?# H; v
  82. 5 E1 ]9 a# P$ X0 M
  83. // 运行所有的worker4 d: g' d' H7 _3 R
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ! s# G  ^0 n; `
  2. ws.onopen = function(){& A# w$ @! y6 x! s% x, Z
  3.     var uid = 'uid1';
    - o( E- J/ |. s# P& W
  4.     ws.send(uid);7 T2 k+ q( t+ Q! w4 b2 k0 ~' R
  5. };
    6 O, F+ w3 ~; s! K% w9 D
  6. ws.onmessage = function(e){# x5 e! H, L! l5 T
  7.     alert(e.data);0 ^: m7 @+ X- _& N
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    0 W+ b0 V$ b. v  q
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    2 [* x, [1 m9 ]: E9 q/ {( K' Y2 N! A
  3. // 推送的数据,包含uid字段,表示是给这个uid推送, U$ `8 m# K/ N" r% \5 Q
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');0 J, q5 Y- P* _4 q/ F0 t
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符( G$ M1 O& X& u% Z8 h
  6. fwrite($client, json_encode($data)."\n");% X" @6 g+ Q( x- A3 h& p
  7. // 读取推送结果
    8 l; z+ W! n4 t2 \8 x/ z
  8. echo fread($client, 8192);
复制代码
9 g/ A$ @9 u. s1 e/ X" L9 i

) j/ Z4 T3 ?* I7 E5 L+ _
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-19 23:58 , Processed in 0.095511 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!