您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10256|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;  o( G. u# q5 Y) O+ s
  2. require_once __DIR__ . '/Workerman/Autoloader.php';. A  V" L/ W+ @2 F6 z7 \

  3. ( d. u0 X; v. Q$ I2 c
  4. $worker = new Worker();
    # K* G$ [5 E, Y: j5 d7 {4 t
  5. // 4个进程& s- y0 _& m5 P; _4 O
  6. $worker->count = 4;
    / v2 F% M, U7 R6 M4 z: n* ~' ]$ A
  7. // 每个进程启动后在当前进程新增一个Worker监听
    6 W+ }2 X( W0 A6 a9 c4 h
  8. $worker->onWorkerStart = function($worker)
    1 X. M$ ~0 f7 p: G$ S8 Q
  9. {) [3 Z: ?( P3 D6 l1 `
  10.     /**
    " h4 f# b* l' @- K9 C, m5 ?, C
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 a/ q  z" ~) e! w/ |0 C
  12.      * 当执行到worker->listen()时会报Address already in use错误' I% a' V4 F+ r' p7 m. i$ m
  13.      * 如果worker->count=1则不会报错) T6 `& ?. d- O7 b% w
  14.      */
    - a) g' C( L, s
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' }! t2 N( L6 X5 h3 R& l
  16.     $inner_worker->onMessage = 'on_message';
    8 f; d$ [7 u$ @" l" a* \0 t1 v
  17.     // 执行监听。这里会报Address already in use错误& ]. F$ c7 M! s$ s& \
  18.     $inner_worker->listen();
    , r$ P# K  j6 Y" n( z5 V+ T! _7 d0 f
  19. };6 o3 G9 k- N$ c. ]

  20. 9 a# c) S2 ^' V1 Y& {
  21. $worker->onMessage = 'on_message';; _8 E9 M/ O3 }$ ?9 S
  22.   e) s. W" M! J, r, v3 x9 T
  23. function on_message($connection, $data)2 q, n. T) H7 J4 p' A" v4 [
  24. {$ X5 \/ V2 L& H' c
  25.     $connection->send("hello\n");3 b6 u# o5 f# _6 R0 o1 m! `- s
  26. }/ D7 R" O; a/ S; Z/ U7 E. Q$ L

  27. # d8 \6 L; P2 n3 t' M$ [# {5 |7 n7 w: c7 t
  28. // 运行worker& C8 A; Q6 u" K" R5 B0 q, I7 Y
  29. Worker::runAll();9 x$ D" R8 q  N! A0 E5 P: s
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    3 c* |: ?6 Q( Q( ?; U! m
  31. ) P& V6 }+ g  @7 s# v
  32. use Workerman\Worker;
    ' B5 u* P% D& C; b# Z: K& @/ D
  33. require_once './Workerman/Autoloader.php';( H3 O. Q, q0 s/ L# R: K

  34. + V  e) B' a2 u
  35. $worker = new Worker('text://0.0.0.0:2015');
    ( ]" L) K  x' N2 |
  36. // 4个进程, a* s  t* y" Y: G
  37. $worker->count = 4;
    : u$ k2 d& y: _3 d. S$ X
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ) R- g2 F' S) i" I6 m
  39. $worker->onWorkerStart = function($worker)
    9 Y: ?5 b2 P, X3 h2 o! M+ u6 \( j
  40. {
    + j5 Z2 k5 Q% t5 _- X- |
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');" u" U3 G3 {  c. B! T
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    . l4 z! c* c" \' w0 w  N- e. S( e
  43.     $inner_worker->reusePort = true;8 @& Z8 _) S) I4 ?# j9 [) T
  44.     $inner_worker->onMessage = 'on_message';9 T, i5 ~: M& C2 N" `/ l. z
  45.     // 执行监听。正常监听不会报错
    - W" L6 P# T$ b' \) r) v# ~
  46.     $inner_worker->listen();
    $ q1 [8 o# y- ]+ o( ]' y
  47. };9 V) |4 n6 {$ k
  48. , z- B3 k; \0 w: Q; k* G
  49. $worker->onMessage = 'on_message';" c0 O6 Z. ?! T. w
  50. * U8 L1 e; d; ]
  51. function on_message($connection, $data), `1 U- i5 U, T1 o4 u+ H& K$ k
  52. {
    ' L: @# C6 E$ h% e) G3 X
  53.     $connection->send("hello\n");/ i6 p) [! n$ K* h8 `
  54. }
    * K; `  \/ `+ Y) M" J" X
  55. 9 |9 \, l& ^4 b7 ]6 O9 K
  56. // 运行worker: n$ N  m1 s+ V6 G' u) G3 G
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    7 z3 X8 y& e* O
  2. use Workerman\Worker;  I+ n4 Y; I3 |! U
  3. require_once './Workerman/Autoloader.php';1 Z8 c5 O* [" |" g
  4. // 初始化一个worker容器,监听1234端口
    - b+ J4 j4 v% \4 q
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    # m! m- g$ Y5 L0 F6 w; f
  6. 7 E1 c- j% c" F2 V) b; X5 i: O
  7. /*. |+ t, i6 D: l% {, a
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误5 `/ U* K" m1 j6 d) p
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ) E$ h7 m8 a9 {  K+ \- b- [2 _
  10. */8 `! W, B8 M; ]2 G" A' g
  11. $worker->count = 1;
    3 T5 |7 u+ C8 B
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    1 U# h: W, w4 F9 ?* }7 J% t
  13. $worker->onWorkerStart = function($worker)
    . Y* C- w. g# \2 k
  14. {2 H0 q0 D1 Y2 m. B% r! _
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    $ n- S6 Z% B, q) G9 |
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');4 L+ ]+ I' h6 D3 U. Z. L
  17.     $inner_text_worker->onMessage = function($connection, $buffer)6 B" t6 i2 p9 }- _; L, ?4 U
  18.     {
    : y- B; z% ~) R
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    + M4 F5 _* @* Y9 [) Z& C
  20.         $data = json_decode($buffer, true);( ]* @; h/ I% m0 A( [( F
  21.         $uid = $data['uid'];# z0 b. [8 c1 T2 D$ F0 ~
  22.         // 通过workerman,向uid的页面推送数据# c- o3 ^/ J; i
  23.         $ret = sendMessageByUid($uid, $buffer);; d' l/ s7 O: Q% O" s: j. z
  24.         // 返回推送结果( d! F+ H6 i9 P0 O# Q* X# d
  25.         $connection->send($ret ? 'ok' : 'fail');) o. S# ]4 K/ E6 L+ c
  26.     };" O$ Y5 P/ }2 v9 w+ c( q
  27.     // ## 执行监听 ##2 g" f. o5 {2 ?4 f7 _$ m
  28.     $inner_text_worker->listen();6 i; {3 B" W/ I/ d: Q# L, @$ I
  29. };5 W5 d, L" ^2 o2 ~$ G( c, q
  30. // 新增加一个属性,用来保存uid到connection的映射: e0 z, f4 Z' T) {8 p3 l
  31. $worker->uidConnections = array();6 ?3 d4 v9 u! |3 |/ \  ]
  32. // 当有客户端发来消息时执行的回调函数
    $ q1 K& t) k3 O( x+ f' D
  33. $worker->onMessage = function($connection, $data)
    $ r, l* A, f% c- N& E
  34. {. {$ D4 y  z0 s) K. b6 _
  35.     global $worker;" \* {6 P* F& [' n1 G/ x
  36.     // 判断当前客户端是否已经验证,既是否设置了uid/ z  f( h+ H, R/ k* p* U
  37.     if(!isset($connection->uid))
    / z7 z1 K  ?) y6 O$ ~( O) f
  38.     {
    4 e1 y/ `2 P  J: B2 O6 j
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* m* b% ]+ Q- Z7 H; e4 z
  40.        $connection->uid = $data;5 v6 d5 X2 ]* R
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,8 S3 k5 ]+ v* x' E
  42.         * 实现针对特定uid推送数据8 ?  `7 S& k, x# I- [( C
  43.         */
    8 r+ Z8 [4 \0 e( s$ X, r% N" c
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ) s( W1 o" y# l, w& }' Z9 Z& i
  45.        return;9 N0 H* P# M8 D; I8 }( J
  46.     }
      E8 ]! L3 h8 i! q: ^7 c; I+ M
  47. };
    ; b# u& K) b- |) A8 H4 y) P2 l  |( A- R

  48. , B. \& ]: A2 y3 O7 P' {9 \
  49. // 当有客户端连接断开时0 z; [2 [! V8 {  h8 V5 y! D
  50. $worker->onClose = function($connection)3 y+ R5 f. O0 O+ g- K
  51. {2 c: r& I; N% W+ x
  52.     global $worker;0 X3 c7 r1 x3 \6 t
  53.     if(isset($connection->uid))
    3 t+ F7 H3 C0 |  I9 k. N$ H7 y
  54.     {& P/ y  f0 f' K( I" c
  55.         // 连接断开时删除映射" Z& f* M$ e6 {7 n0 W- ~
  56.         unset($worker->uidConnections[$connection->uid]);
    + Z) N& P; w; X8 T  z
  57.     }5 E0 z% r5 U% Q& d( r# p1 E
  58. };
    , s, `* A5 z) J3 _

  59. 4 T. M% q9 J+ A$ \
  60. // 向所有验证的用户推送数据) e$ L( y, j/ g. W( Z, j1 Q$ C* N
  61. function broadcast($message)9 Q* ~1 w6 a% g( L. q# W: W4 u
  62. {
    - q5 w7 C. M0 U. o
  63.    global $worker;
    2 i& m7 x4 {2 R6 I3 h
  64.    foreach($worker->uidConnections as $connection)
    ) W5 z) B# h. p  G/ ]) P
  65.    {0 s3 M7 Q5 J0 Z2 e
  66.         $connection->send($message);
    7 Z6 K& n" h3 U& A) d) Y! I
  67.    }
    * i2 t2 @& @: o( B5 o( V  `
  68. }! t, {7 c$ o7 p7 Z- ^" q
  69. ! d* m  T8 h* l0 F; N) j- G8 B
  70. // 针对uid推送数据- w' \  \/ e' q5 m& T
  71. function sendMessageByUid($uid, $message)
    , L8 y8 b4 ]) A: e4 x/ {3 o8 P
  72. {
    : v3 v8 M( l, E4 F- \# M: K
  73.     global $worker;. L( ]2 k7 u  ?& L6 z2 V* ]
  74.     if(isset($worker->uidConnections[$uid]))2 ?- L0 e; y# C8 |" z  E: s- q, F
  75.     {1 f5 W: X7 `+ z8 k, p+ \
  76.         $connection = $worker->uidConnections[$uid];$ S8 e( |3 e! \4 C) C. q
  77.         $connection->send($message);
    & m. p. Z. L2 b% ]& L
  78.         return true;* n* K& e) \, j1 `1 i! g
  79.     }  P, F! E- @! _4 ^% K+ B, |$ i& p
  80.     return false;9 ?5 ?! O9 O' f5 G
  81. }1 E# }- L; X2 v+ N, ^" `. r

  82. ( z& |# D9 x" N: p
  83. // 运行所有的worker) c3 q: O- |' x7 \( G2 K3 s- Y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');7 G8 V. r" f3 J" G  j$ i( Y% y
  2. ws.onopen = function(){2 \  a- e) U" {$ K; U: {+ w
  3.     var uid = 'uid1';
    0 ]) w& {: Z" N% x8 b
  4.     ws.send(uid);
    - l! W5 D, D- o2 a) B- f
  5. };
    7 Y7 ]% p' `4 M0 x# ?% K
  6. ws.onmessage = function(e){
    : y; C7 e3 N3 c% J# r+ O+ x4 d
  7.     alert(e.data);: I; S5 P# r" Y5 M/ [% y. x6 D
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口9 e! C( ]" x  g1 E  h3 l: U- \6 B: ]
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    / j4 \0 o$ ]; n5 w
  3. // 推送的数据,包含uid字段,表示是给这个uid推送7 y  W5 M$ i# q+ Y  a
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ( A- @6 T% M' q
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符  q- _& S5 J/ x; B4 I
  6. fwrite($client, json_encode($data)."\n");! e# V& S% _) ^* d' S
  7. // 读取推送结果
      A( L5 i. d' c$ j8 D$ o) E
  8. echo fread($client, 8192);
复制代码

2 e7 A5 t: M& _5 p* K& f& Y# u- o6 g1 E
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-3 09:54 , Processed in 0.153478 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!