您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14658|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;' n1 S% s4 B" p" H+ [
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    1 d/ O& h, j: n8 }* y  U& L

  3. 6 `, G7 U+ l$ N9 w* I  b4 |
  4. $worker = new Worker();* I' w' m4 r4 ?
  5. // 4个进程
    1 N" I  @2 J0 H9 E0 `
  6. $worker->count = 4;! F& |& I7 L( L6 L% ]. y
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ' L2 F( k' B# v/ D0 k! c' ~) c
  8. $worker->onWorkerStart = function($worker)
    2 B0 n$ e% s4 z6 F( Y/ ]
  9. {+ X  B  P: l# _4 S
  10.     /**
    4 y% o( \% n/ @% _' J2 \' X0 M+ p( n
  11.      * 4个进程启动的时候都创建2016端口的Worker9 e4 F6 p6 c# u* c
  12.      * 当执行到worker->listen()时会报Address already in use错误; T7 `- G. P! |7 \$ w, H
  13.      * 如果worker->count=1则不会报错# V5 w) S: c3 @
  14.      */! @7 g: h. R% `6 ?7 F( l5 ]" B
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');! N* G& [" D1 ~  [: R
  16.     $inner_worker->onMessage = 'on_message';" S* J2 |" S0 p7 \9 O
  17.     // 执行监听。这里会报Address already in use错误- V. s: ?- x6 z
  18.     $inner_worker->listen();
    ( ?. e, p' v$ X; G2 J( K
  19. };
    4 _1 e3 F* t% z

  20. * B) z0 L# j7 s$ e% ^6 ]) k
  21. $worker->onMessage = 'on_message';: B- W" i, B4 n, x' Q5 j8 \2 x
  22. , \$ x) w, m# L% O
  23. function on_message($connection, $data)
    : ~, L7 T# M& i, t$ z/ W
  24. {
    ! _* U$ Q/ ^* R1 j  ^
  25.     $connection->send("hello\n");
    ' X# j! @# p+ _, j
  26. }0 v* w% L3 |! s  ^$ R
  27. - Q, i4 s: E& |
  28. // 运行worker- I1 b8 V" j! G5 N' L6 q
  29. Worker::runAll();
    8 y% _: g' t# u% s9 _+ m  w
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    ' v  }. `0 x4 t7 t

  31. " j/ j. q$ x3 N. P
  32. use Workerman\Worker;5 J0 f8 |* d" I0 l
  33. require_once './Workerman/Autoloader.php';
    ) D* W) n) o; G

  34. & @* H( X) @9 E/ H
  35. $worker = new Worker('text://0.0.0.0:2015');
    ( N# Q  `5 c1 V# }6 s5 T
  36. // 4个进程- t! E3 ?/ ?5 K1 ~' T' i
  37. $worker->count = 4;) L7 a& t$ v3 ]1 ~5 Q
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ' h) N; u$ U: z
  39. $worker->onWorkerStart = function($worker)
    9 L* S5 a; N$ I7 R% |3 n
  40. {
    $ v7 p0 P6 f9 }; b
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');1 P, B9 p- P$ d0 E. S4 G. C. C! i6 O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)2 [2 m3 h8 y& ~+ F" C+ q! v
  43.     $inner_worker->reusePort = true;9 x2 @& C4 ^) I0 S5 d' y5 a/ Z: Y' U
  44.     $inner_worker->onMessage = 'on_message';
    6 ]8 ~" F2 [" Z
  45.     // 执行监听。正常监听不会报错
    5 U  `5 O% A- O5 Z( Y0 w1 @
  46.     $inner_worker->listen();
    " M- @( G/ v4 [4 I2 j
  47. };) S9 o; y: {+ I
  48. " k) l; K! V" e; c7 w
  49. $worker->onMessage = 'on_message';
    % }: \* R. X  q7 C) `; l

  50. $ T4 ?, c; ~% f# ?, j5 t
  51. function on_message($connection, $data), s, c$ B6 `# D
  52. {
    1 K6 \7 ^) U7 ?7 @
  53.     $connection->send("hello\n");
    ! `" I4 O8 p! s9 a
  54. }, D9 Y% ?8 r" r* L, k1 W

  55. 6 V9 ~  v. T% S- E: y
  56. // 运行worker
    , e1 W  U1 z0 R3 b
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    * q& N4 G! K$ k) S
  2. use Workerman\Worker;
    * z: o8 P$ t) C% K5 t4 w1 j
  3. require_once './Workerman/Autoloader.php';
    2 n* x* Z2 T/ K2 U( x+ k
  4. // 初始化一个worker容器,监听1234端口* `) C+ J, e) i9 t( a
  5. $worker = new Worker('websocket://0.0.0.0:1234');' q. E! t  V" E

  6. % i; ?1 F$ T; C$ i" M
  7. /*
    % r, K6 z" G( Y( C
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    3 z% ?  F  U" K
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    / n6 C: ^6 c+ M* {
  10. */3 [: |1 h, ]4 O( `  `4 G2 h
  11. $worker->count = 1;7 A0 C0 l" P5 n" o6 i8 f
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    ! m; p! Q5 [  e. J
  13. $worker->onWorkerStart = function($worker)
    . E) f2 h# N: i1 Q) e1 D& B
  14. {8 E: g& ~3 F5 ?4 U
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 z6 ^/ @% j  e8 H- f9 }" U' D* O
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    , g  m3 I6 e! U9 @. @9 ^
  17.     $inner_text_worker->onMessage = function($connection, $buffer): O1 W! U4 ^9 J3 `7 K$ n
  18.     {, \. l# ~" K* e+ b
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据8 O. O8 {1 j' q/ Z; G4 l' `  u
  20.         $data = json_decode($buffer, true);
    % r1 C  Z' J+ F8 r- N
  21.         $uid = $data['uid'];: `8 P, C  l* x
  22.         // 通过workerman,向uid的页面推送数据
      E3 u* r" ^; ~( e2 G
  23.         $ret = sendMessageByUid($uid, $buffer);
    # U: [. ^# \: n- ], B
  24.         // 返回推送结果, [! g6 i7 E+ g- N6 y
  25.         $connection->send($ret ? 'ok' : 'fail');0 H: q) U8 X; U  G: ]
  26.     };
    2 V4 G1 H9 ~( b, {1 I
  27.     // ## 执行监听 ##0 s5 U, I$ H) w& X* Z0 ^
  28.     $inner_text_worker->listen();
    7 ^8 Y) L' Y" ^/ @; D; ?6 M
  29. };" Z( f1 g# q$ W# [" ]1 ?
  30. // 新增加一个属性,用来保存uid到connection的映射0 D' ]" Q$ s: E# K
  31. $worker->uidConnections = array();. l/ `$ u  u$ J, l2 x4 A, t
  32. // 当有客户端发来消息时执行的回调函数
    : D5 O* R$ a: Q$ b& w1 O; `# K3 k$ Q
  33. $worker->onMessage = function($connection, $data)  k1 U" {  R# D" X- v/ t% w
  34. {
    3 T1 }, b- _. E) @8 |. D
  35.     global $worker;; S5 Q) x8 m7 |: }& y4 J1 y
  36.     // 判断当前客户端是否已经验证,既是否设置了uid  N: \% B7 \* A
  37.     if(!isset($connection->uid))
    * ?( y6 W+ G0 V5 ?' Z0 a+ p/ Q3 C
  38.     {
    , }7 ^$ J( S- Y4 C4 K% \
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)6 B, B$ k& z8 }) O7 m
  40.        $connection->uid = $data;
    ! x2 p- L! Q' R- {
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    7 ?$ ~& A5 r5 `' Y3 Z! k! s
  42.         * 实现针对特定uid推送数据
    5 e3 R5 H% m' _% E. @. b
  43.         */1 P7 Z" ?8 F4 |* D2 U
  44.        $worker->uidConnections[$connection->uid] = $connection;
    - Z+ w5 r5 m2 s& o+ U* I
  45.        return;; a- e% m9 S) \8 d' {+ w
  46.     }4 ^2 J. ?. a% }5 x& c
  47. };* T$ D- a* r6 [8 p  X

  48. , B8 T6 [5 _5 C& H5 ]( G
  49. // 当有客户端连接断开时
    % b1 g9 t$ [6 v  q. `) D7 ^4 M( R0 O7 j
  50. $worker->onClose = function($connection)5 B+ ?) n9 ^) J  T& P4 p* K6 x* p
  51. {8 H+ z- `1 n, ^) u6 F
  52.     global $worker;
      \2 r4 B; ~* e& Z
  53.     if(isset($connection->uid))) M  j9 k3 w& v6 r
  54.     {- [3 I2 ]9 Z$ I1 c
  55.         // 连接断开时删除映射( E. F; v9 X+ n
  56.         unset($worker->uidConnections[$connection->uid]);9 m- I6 W3 ?2 I* @- F9 p9 o
  57.     }# s( I& f4 P  |- i8 }9 w
  58. };$ l0 l' t; K* K
  59. + j, y. a: V" C) A/ T
  60. // 向所有验证的用户推送数据
    , a( Q# i. Q( ~2 q2 `& F" B2 [+ t
  61. function broadcast($message)
    5 y( n2 o5 a& r
  62. {# U% i  P/ i4 R& y% v0 }
  63.    global $worker;
    $ y9 y& @1 V/ R1 N
  64.    foreach($worker->uidConnections as $connection)
    9 P  r2 x* R  [
  65.    {) V- _3 Z: Z: W  v$ ^6 l2 ~# k: e- i8 E! h
  66.         $connection->send($message);, X, S  v8 d, z
  67.    }
    % y* W' T' x- _+ o
  68. }! ^& c9 q; o2 K/ d% ?0 J
  69. / F( k: H2 K: d  X2 w
  70. // 针对uid推送数据
    + ?: h! ^' u3 T3 ~9 D1 N2 t+ g
  71. function sendMessageByUid($uid, $message)
    ; |7 H0 j5 U, C
  72. {$ w& b" |9 p7 @% d3 P: ?! ?( p
  73.     global $worker;+ h* ^( r4 ?# L+ V! w) u) H  Q
  74.     if(isset($worker->uidConnections[$uid]))
    ; i6 @% J. K6 q$ a+ A1 D" p
  75.     {
    * \* R6 @$ T/ z& t
  76.         $connection = $worker->uidConnections[$uid];
    ) h$ v" W8 N( T; C
  77.         $connection->send($message);5 v' q: p( a' p
  78.         return true;6 D! X9 H! H* k
  79.     }; R3 F5 E( Z, P" w+ m
  80.     return false;% `& ^- m9 g- i& B% M" C. b
  81. }+ Q! k% \) r# [; t2 p$ W  N

  82. $ j  v! C7 h2 o% }7 m/ ]
  83. // 运行所有的worker
    " B$ X7 t# d$ @* v0 u! S
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');5 {9 r  d8 \3 c8 @
  2. ws.onopen = function(){% Y2 }' f7 |+ q9 v( b
  3.     var uid = 'uid1';
    1 F' T2 f& V+ T! O7 e8 [9 {
  4.     ws.send(uid);& S, C/ y  `- S3 p; A/ K# Z. i
  5. };
    & \; _* \- b7 j. _# m# i
  6. ws.onmessage = function(e){$ v# o' R# i7 G0 l9 A9 I3 C
  7.     alert(e.data);$ W6 T& ^4 [0 p2 D
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    # {: u1 x/ n8 f- L& e# Z
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);3 ?6 @5 K& r1 a# W0 N
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    ; o% h9 j2 ]  ^* v1 K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    4 o) g. d6 p+ d2 b& m' }
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ J0 D/ ~* j( v8 ~& T8 ^
  6. fwrite($client, json_encode($data)."\n");  _0 i/ L- K, I) q
  7. // 读取推送结果; s& I* H9 \$ y4 l
  8. echo fread($client, 8192);
复制代码
, i) U% j4 _1 E0 f% {

& Z* X, `, u3 v1 [- m
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 11:36 , Processed in 0.047636 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!