您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14814|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;2 ~- j$ e& i  s/ J" e; P- [
  2. require_once __DIR__ . '/Workerman/Autoloader.php';4 f, |: b% r( v# K3 ^! X+ j
  3. 5 U- J, `4 h  |3 y
  4. $worker = new Worker();2 Y  O# @4 b  u% B+ f1 r
  5. // 4个进程
    ( J# P, H, V1 t
  6. $worker->count = 4;
    * e& `/ K  M- H2 S0 T: D
  7. // 每个进程启动后在当前进程新增一个Worker监听
      ]9 m( K; h/ @, H4 n2 D6 p) t
  8. $worker->onWorkerStart = function($worker)
    8 g, s4 X/ u1 w( g
  9. {
    8 o7 ^1 \4 z6 |: x1 e" L8 w, o
  10.     /**( N9 ?( M; m. t! {1 c3 N5 ]
  11.      * 4个进程启动的时候都创建2016端口的Worker
    9 b% l. X/ K: B6 h% `6 A
  12.      * 当执行到worker->listen()时会报Address already in use错误
    3 X* T" }; h% s: W, D0 I) u
  13.      * 如果worker->count=1则不会报错+ _4 i0 ]" F( |4 G
  14.      */% f2 {$ O( X$ H  V  O7 _
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    4 k# `# r" j$ M' p) A  a
  16.     $inner_worker->onMessage = 'on_message';: ?9 h# T6 E4 n# ?7 S& I
  17.     // 执行监听。这里会报Address already in use错误, m% d  r; C; l2 C1 K
  18.     $inner_worker->listen();. \0 ]  d4 q6 }/ W7 S" H# \
  19. };
      n" s4 I" {7 M' E7 O: w

  20. ! f4 d" e2 g" G* h1 A
  21. $worker->onMessage = 'on_message';
    & ?+ T4 j, s1 {: Q4 }! ?7 b8 \% n
  22. & o4 K. d1 X% D8 D/ G  r& m
  23. function on_message($connection, $data)6 ^9 s% p; ]6 {$ @5 _! E, t8 p
  24. {8 T( s: m7 S2 ~3 S5 I0 ~9 x% a
  25.     $connection->send("hello\n");
    $ X, @! |8 B4 F9 w) d: k4 R, I
  26. }0 E; K5 b) o' T4 H

  27. $ n6 J6 {/ @+ ^+ J/ {
  28. // 运行worker) @4 i/ x6 g( f9 r
  29. Worker::runAll();7 w& d1 G) K1 ]3 Q# C' E, l
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:  ]6 A& e( O% r& U) ]" T6 b6 C2 U
  31. ! h5 e4 V/ R3 M5 ?' T: o, Z# g
  32. use Workerman\Worker;
    6 i% }9 d7 |( Y& R6 E
  33. require_once './Workerman/Autoloader.php';
    6 u- l% k4 W* X  s0 ?

  34. * J. a3 ~8 u$ I$ P: w$ Y4 N
  35. $worker = new Worker('text://0.0.0.0:2015');
    2 g( z" u. [7 D. ^  Q3 a
  36. // 4个进程0 l# @8 x, z" F" g; z0 x' U
  37. $worker->count = 4;
    4 M1 B+ Z) J9 z7 A9 N/ N% V2 z
  38. // 每个进程启动后在当前进程新增一个Worker监听
    8 [; X% O/ @: n2 t
  39. $worker->onWorkerStart = function($worker), n. i  ^4 G1 [3 M" t7 K
  40. {
    + p" L3 L4 ^% f  N! Y& e
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');# k- G+ G0 E: ^3 @2 M+ ?! m$ v6 t
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    & h7 \0 X. D4 t0 |5 b
  43.     $inner_worker->reusePort = true;
    & _+ D( ]* c# V2 i: G6 R8 V, k
  44.     $inner_worker->onMessage = 'on_message';- Y3 a0 E! ]0 y. P
  45.     // 执行监听。正常监听不会报错- x$ Z$ \3 _. w
  46.     $inner_worker->listen();
    ( _0 Y* G9 X6 r4 ~$ `% p
  47. };
    . s! U" ^; U1 G8 h6 G- ]% V
  48. 5 O0 d: P0 r5 ^
  49. $worker->onMessage = 'on_message';6 Q9 D4 J. U* U! S/ B9 v: b  [

  50. 5 I  P' R, z$ y3 K4 u: l3 Z' X
  51. function on_message($connection, $data)
    + T6 n  v" @" @
  52. {3 O' M  z0 E9 C% }$ l* v" ^5 q6 W
  53.     $connection->send("hello\n");
    4 u( G* r$ ]/ w4 I7 q7 D
  54. }
    $ I1 G' p7 A- a$ M
  55. # L: b# M) f& p2 k0 y
  56. // 运行worker
    . d' h6 D  \9 |1 y
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php) V) {4 y: _. Q% D
  2. use Workerman\Worker;# j  h: ^' l8 [: t0 U9 |
  3. require_once './Workerman/Autoloader.php';
    5 ?* \6 G' o* _5 P2 b
  4. // 初始化一个worker容器,监听1234端口
    ) J7 i; a9 |8 Y
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    - j9 t7 X( ?6 c6 l* }+ S* N. t
  6. & ?8 t3 |1 r6 i( @; f2 x: Z4 E
  7. /*1 @) x! q. }% q1 R2 l
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误( Q! }' W" Z# G- }. S' q) v3 q' I% s
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    2 Z4 T) }# r% W/ v
  10. */
    7 G8 O) P1 Q. R2 P" c
  11. $worker->count = 1;; W0 p! G" f( l. B5 ]6 F- o
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    / q' t5 E  i. w  N( z" b# o
  13. $worker->onWorkerStart = function($worker)) V, m+ b6 [. h. h- g0 k1 v
  14. {/ h& A) b% E- s4 U
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    4 r! ]/ U4 y7 Y' Q' b+ S
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');6 h( I2 S8 S+ \; z& J: {7 I* n  T' p
  17.     $inner_text_worker->onMessage = function($connection, $buffer)2 W0 A2 _; ]7 X
  18.     {. ?* R  l) ^3 b& v% s% U' \
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据' k- e: o9 o; \) t/ A
  20.         $data = json_decode($buffer, true);
    8 n9 Y; B- a& ?( x+ [4 k& l
  21.         $uid = $data['uid'];
    ! F- z6 L$ R' ?3 \9 N9 Z
  22.         // 通过workerman,向uid的页面推送数据
    3 p  E5 B. w# t$ y8 {$ \0 ?" C
  23.         $ret = sendMessageByUid($uid, $buffer);) X2 z: S3 C, }6 x* [1 n0 B- ^
  24.         // 返回推送结果% A. C6 q9 E6 {
  25.         $connection->send($ret ? 'ok' : 'fail');( x9 h5 B$ f- M; v3 @
  26.     };2 D: l8 J0 x9 m  g5 H$ i
  27.     // ## 执行监听 ##
    , R# x. x7 V' \( @
  28.     $inner_text_worker->listen();
    ; E( I" |; t+ ^9 k3 L
  29. };
    ( P3 _! s% O2 Z- S8 l3 C+ m3 ~8 T. q9 G3 d
  30. // 新增加一个属性,用来保存uid到connection的映射
    % n, q9 O# e: L. a0 N. Y3 Y
  31. $worker->uidConnections = array();
    $ q+ H2 ?; I" s
  32. // 当有客户端发来消息时执行的回调函数! n& L0 g9 Q# H- N" c) [
  33. $worker->onMessage = function($connection, $data)2 [) K8 a4 O) a* ?
  34. {
    1 f. p5 r9 k0 y2 ^! V
  35.     global $worker;
    6 g* c4 g' T% \, ^: x8 p- S
  36.     // 判断当前客户端是否已经验证,既是否设置了uid# g5 S) C" C: h" Y
  37.     if(!isset($connection->uid))
    9 W3 v3 a$ ]3 R: x: |" V
  38.     {
    $ n# X! v- P5 w, @  A" M: M
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* C7 D, r3 r0 f  q+ M
  40.        $connection->uid = $data;% O3 U9 T" e  j* t5 e
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# P4 t  I! U& M# X* \
  42.         * 实现针对特定uid推送数据7 M: d: l, N, j; ?# U$ b
  43.         */
      I, `$ X( z, E7 E: o# L* C$ i# E, q
  44.        $worker->uidConnections[$connection->uid] = $connection;8 h7 U. F* S# f
  45.        return;/ V: G. {6 z# ^: J4 s! m- _( G
  46.     }
    ! f9 E, m' f$ e+ K2 x, f( ]
  47. };
    2 l& f) {3 M5 y0 o% Y+ {

  48. " H9 u' X3 O; W
  49. // 当有客户端连接断开时
    7 Q  R3 _" @7 Z. J) O+ m
  50. $worker->onClose = function($connection)1 k" t: X( u% J/ b- E, |
  51. {0 P# w" X, w1 n7 q$ A
  52.     global $worker;
    * j7 G9 a, C/ s$ u! R
  53.     if(isset($connection->uid))
    4 Z" O9 |% Y2 n3 q& g# `" ~
  54.     {& r1 W: |4 F9 g, Q: Q. q
  55.         // 连接断开时删除映射
    / \+ R* ^- c0 }) ^4 D$ f  L
  56.         unset($worker->uidConnections[$connection->uid]);
    & ~/ t) c- w$ g3 s
  57.     }  W5 k6 J8 y5 {# t
  58. };" N; T* I$ M+ X& R
  59. 8 r+ l: Q. L; s* y0 `
  60. // 向所有验证的用户推送数据
    4 j/ v% @0 R+ ]5 ~5 R
  61. function broadcast($message)& w! j$ k/ N" I: a2 v
  62. {) H" o9 X3 m  H+ w! B
  63.    global $worker;
    + X4 @  i5 u# [2 x; q/ d
  64.    foreach($worker->uidConnections as $connection)
    ' [% F, T9 `3 F- q/ ?
  65.    {
    ' E4 B  X( ]* C9 U+ R& q% s: Z
  66.         $connection->send($message);) f! a2 V; z2 g9 c
  67.    }
    5 S* A6 o' q0 W" o& D9 M3 {. ?6 r
  68. }6 a8 Z9 y* N1 X
  69. : Q$ v9 ^% ^2 s* o' ^, \! g" x% l
  70. // 针对uid推送数据
    % p0 s- l6 a. L. |& x
  71. function sendMessageByUid($uid, $message)( L( l/ \9 [3 P5 l
  72. {
    6 s/ ~1 ~. z* n. P, m6 B/ Q. x6 A
  73.     global $worker;* M! l5 M9 g2 ~  |- n! W
  74.     if(isset($worker->uidConnections[$uid]))' B1 y0 B2 Z( j
  75.     {( ]0 c' H6 U! a$ D6 u3 P
  76.         $connection = $worker->uidConnections[$uid];
    / D- o6 k" o5 U3 r2 Q( r; ]. ~8 V
  77.         $connection->send($message);8 D4 Z6 {  m$ D2 a2 Q. F- R2 Y
  78.         return true;8 X) h5 l+ \. c6 x$ s
  79.     }! J  _% o4 a" R% z
  80.     return false;
    , |, A0 A8 t/ z! T( W! c8 I, S
  81. }5 z( Q2 r, Y/ Z( ^2 z9 g# a
  82. $ H( k& E" D. ]& {) ^
  83. // 运行所有的worker0 F3 n" L: R' m! g5 f; K
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');, L' i3 N' q$ L+ j0 ~) h
  2. ws.onopen = function(){
    - g, |! P1 p$ ~- h" R
  3.     var uid = 'uid1';
    3 D$ Q/ h& [  B& C
  4.     ws.send(uid);% [9 q4 t( X% d3 C0 V; E
  5. };! |% \+ y3 F, \4 {% w7 U* y
  6. ws.onmessage = function(e){
    . {5 ]) b# @6 C- g0 }
  7.     alert(e.data);
    . y- {# `$ A" x4 T) a
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口3 o) l8 U  w7 x( e9 Z( h& k( |
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 D# b+ T- o; @- g: q
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    - e3 F$ ~6 |0 x( c
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');' |* G$ m0 f: \  r& d5 I
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符. E$ M/ _+ {  W# L4 C* ^
  6. fwrite($client, json_encode($data)."\n");) d) }% J; E- C: \8 J
  7. // 读取推送结果
    / q* R& s0 u" L% }5 {
  8. echo fread($client, 8192);
复制代码
3 Z5 X1 `/ M+ _9 G: r. @

% K+ g- C5 s$ z" [
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 20:18 , Processed in 0.062867 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!