您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10540|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;& o- v, T8 ?& `
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    6 e# G( h9 G. C6 p4 C) v( T: c' p: n

  3. / q) G& ?4 l9 O: k2 |
  4. $worker = new Worker();) E7 U& y5 x) x9 ^/ l
  5. // 4个进程
    2 h' d* y7 Y. v. _
  6. $worker->count = 4;
    4 w- k, f- e0 X1 `
  7. // 每个进程启动后在当前进程新增一个Worker监听* y) X/ m9 v2 y% T3 J8 Y6 Q. W3 |
  8. $worker->onWorkerStart = function($worker)
    7 H- j, a' b: O: O% X
  9. {
    - o3 @( c) R4 S0 O
  10.     /**
    0 V- o! e2 B( m5 U
  11.      * 4个进程启动的时候都创建2016端口的Worker
    7 V! G, h- g! d# D* {7 I; B. _
  12.      * 当执行到worker->listen()时会报Address already in use错误& k5 O. w3 ~! i$ P, r( ?8 t- L, }
  13.      * 如果worker->count=1则不会报错: }- G2 E+ \6 x7 s, k4 x2 b, N9 ]
  14.      */: H1 J& t2 n5 Q+ C5 E
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
      \. R6 I, ]0 ]: d
  16.     $inner_worker->onMessage = 'on_message';2 P+ d; d. h" U& h& \9 a3 x  x
  17.     // 执行监听。这里会报Address already in use错误
    $ }7 j% e2 T1 m* b0 X
  18.     $inner_worker->listen();
    , z& q$ t7 ^6 e, Q8 Z. A
  19. };' \& b+ O+ V( m: B, I
  20. 7 }: l8 K2 Q- r1 Y" S/ K+ a
  21. $worker->onMessage = 'on_message';
    ! Q3 x5 B& O+ i: F

  22. 5 p& Z" A7 w8 i! ]0 h
  23. function on_message($connection, $data)
    & L: z( h- d7 a7 R' L3 h5 z- u  ?
  24. {* k  W$ Y; g! P" @$ r" Y
  25.     $connection->send("hello\n");
    * i: b+ G0 X; C$ s& w
  26. }8 A7 q- z, o( N7 f
  27. , ^: y9 b7 I, ~! {* h4 @* R) D
  28. // 运行worker. b' ?' g' a6 i8 t& r7 }
  29. Worker::runAll();
    " m- _3 n. N! p" Y2 z, L1 _9 a
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 |, {7 `7 X# B8 L% Q& I  z

  31. 9 k0 p9 B9 v9 v# b
  32. use Workerman\Worker;8 u6 Q+ j2 r2 }3 Y
  33. require_once './Workerman/Autoloader.php';
    " g% B8 T% d) b8 S( H  p. G

  34. . [) o- r3 _' |
  35. $worker = new Worker('text://0.0.0.0:2015');
    : O' W+ P; T: c$ R3 E5 e
  36. // 4个进程) t. G2 l' E$ e1 t) b8 i
  37. $worker->count = 4;
    ! t* m1 w3 S/ |3 f
  38. // 每个进程启动后在当前进程新增一个Worker监听
    : W9 s: _+ z1 V% l1 b! c. _# L
  39. $worker->onWorkerStart = function($worker)
    + P% G2 F5 I" L; u7 R+ \
  40. {; s% T5 k* B/ [8 _9 C" e9 O9 j
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    % S/ p- h3 q7 r
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    . {. C, Q! n7 ?4 g$ Z: ]4 l
  43.     $inner_worker->reusePort = true;; ?. q6 D, j2 c) ]6 G
  44.     $inner_worker->onMessage = 'on_message';
    3 z8 Y1 P* w: ~/ b$ c, \  k+ h- }
  45.     // 执行监听。正常监听不会报错4 N# k- p+ E  D5 ]6 ]
  46.     $inner_worker->listen();! F8 |& C; u8 V/ e5 U$ H0 P4 f
  47. };
    & k9 f4 d$ `- r6 A( z# Z
  48. , q  A5 Y  n* }. k4 l
  49. $worker->onMessage = 'on_message';  |' Q! y. M  ]! l- `
  50. ) h% g8 j( C5 V
  51. function on_message($connection, $data)& v' F3 ?/ O  g, ]3 ]8 W
  52. {
    & u' A+ y0 _1 H1 t+ a
  53.     $connection->send("hello\n");8 T& V% O1 Y% R. u% M
  54. }
    , S; v3 K( f: M
  55. $ t9 w2 @" N. W& E" u! c" r( J
  56. // 运行worker' C& F' x+ O9 \( }2 H: j
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php. m6 a7 u" k5 i+ D* {1 y
  2. use Workerman\Worker;
    2 g4 l! X8 {* h, X& z
  3. require_once './Workerman/Autoloader.php';6 p; i. f9 _! O$ [" D' r7 S
  4. // 初始化一个worker容器,监听1234端口: N/ I7 D$ N; G, [; d$ v7 C$ k
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ( x+ C7 n3 ~2 Q7 q
  6. & [3 N4 ?( ?+ K2 q
  7. /*% d$ \; Y) b7 X5 d1 `  J/ H" S
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    9 f- @0 G$ M! h
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ; S# e" W2 |* W9 {1 r4 w
  10. */
      F/ _  R7 w8 R: i8 n
  11. $worker->count = 1;
    ) a1 W- v* m' V, j
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    " N" e' o3 b, w5 q
  13. $worker->onWorkerStart = function($worker)* \: A( m" L' X. [8 o! d! `; W
  14. {
    % I  d% e5 `% E4 Z6 _
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符8 A  p% v- l6 w  Z  g! ]
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');5 U5 f- \9 I& g+ M! C
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    & V1 B) q& O/ S" c* _1 H  P. R
  18.     {0 r+ C$ H# ~8 g+ B2 w  z
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据3 C) }  e/ [. k& K
  20.         $data = json_decode($buffer, true);; K. x! u9 ?" f8 j3 u6 m# w1 ~, h
  21.         $uid = $data['uid'];+ O5 f; g4 m8 Q7 m+ P
  22.         // 通过workerman,向uid的页面推送数据1 d! S+ c9 R6 k! f& L/ e
  23.         $ret = sendMessageByUid($uid, $buffer);( }1 O; z) V1 _0 @& C. Y: {
  24.         // 返回推送结果2 n0 |8 D% t/ X9 i$ Y
  25.         $connection->send($ret ? 'ok' : 'fail');
    6 ^, I. l) A' T2 ^) a3 \
  26.     };
    ' k4 ~- }# s( I4 i  m
  27.     // ## 执行监听 ##
    . f1 T6 R0 g0 N+ R
  28.     $inner_text_worker->listen();
    * t. ^' ~' D# R+ y' y6 ^
  29. };, g3 p2 O0 g4 J) Y6 v( B: h
  30. // 新增加一个属性,用来保存uid到connection的映射- A& \3 c  r: w" _$ |
  31. $worker->uidConnections = array();8 ^5 o8 W7 B; z# u5 m
  32. // 当有客户端发来消息时执行的回调函数2 q& S9 J* b5 R' z  n( E
  33. $worker->onMessage = function($connection, $data)
    - W7 g: [! T  A1 Y; g* L0 l' Q$ D$ U
  34. {
    % K3 u8 z9 r$ V$ d0 C
  35.     global $worker;. B( i7 x2 A: ]$ {0 [1 n- V8 i
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    / p( V6 j8 K4 I7 z% c' t
  37.     if(!isset($connection->uid))+ [$ ?" x+ z& w
  38.     {
    0 q4 `: E! b' k" H6 t4 o; Q
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& M; G6 J- p1 O: w3 U5 o
  40.        $connection->uid = $data;* C5 f! \9 N# c5 Q7 U- P3 u
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    , R% B4 o! g) P4 i/ e0 W9 N# x
  42.         * 实现针对特定uid推送数据1 p0 J6 v4 @4 c( j8 j9 j1 U
  43.         */  ~+ m. v$ w* h% B  o3 d  j5 X2 [
  44.        $worker->uidConnections[$connection->uid] = $connection;
    . @7 Q5 G( R2 w6 D, B+ x3 o5 R
  45.        return;# T# U8 Y1 [5 {/ Y
  46.     }4 |7 B- H  p' D4 L4 O! N
  47. };
    5 i- u$ L) V. s

  48. + y9 b8 V# p- F+ `9 Y* X! i0 K
  49. // 当有客户端连接断开时( u5 o2 r/ g/ q0 m
  50. $worker->onClose = function($connection)
    6 k  m/ ^8 \1 L1 j7 T4 B
  51. {
    2 [" Q* R- W# q$ m
  52.     global $worker;  s! }* y+ P! I& R6 j0 q- y
  53.     if(isset($connection->uid))! {" o* Z8 O, ]3 }- _) y1 b- N
  54.     {5 n( {0 q3 e. T& t, `. ^; G
  55.         // 连接断开时删除映射+ I* }# N1 y! q- m2 h5 f
  56.         unset($worker->uidConnections[$connection->uid]);
    # K0 `1 Q3 R, B8 R8 R, f. w
  57.     }
    ! c% \! U$ ^  O
  58. };
    ( k2 V6 s. Z* \5 j
  59. 9 x/ [2 s  t( r& G0 n
  60. // 向所有验证的用户推送数据
    % ?  m0 `2 m' F1 v
  61. function broadcast($message)
    1 v8 `% `# V4 i& G- V8 d
  62. {! i/ z3 A0 r# {6 d% S$ [+ u& u$ k
  63.    global $worker;' U, S3 d+ F" H% Z/ A. Y
  64.    foreach($worker->uidConnections as $connection)& h; A# l: D% T- ?1 U1 q
  65.    {
    / R# _0 ^5 f  {0 T/ u
  66.         $connection->send($message);$ }+ S7 }+ e6 v, b! o. r
  67.    }3 C; x" z6 ~7 X3 B
  68. }; R' e  p# _9 J: c2 f" ~
  69. ; N/ z5 ?" p; l+ G+ I1 ~# l' i
  70. // 针对uid推送数据
    ' E' H7 ^* D* P; [, E
  71. function sendMessageByUid($uid, $message)
    : y, p9 ^, K# Y+ L& m
  72. {6 o* {1 S5 m4 ]/ U* ]
  73.     global $worker;
    $ t2 ^* @: U+ H( {8 B, K# t' _
  74.     if(isset($worker->uidConnections[$uid]))8 W0 }+ t& E! G" f. g$ l( f$ A5 l
  75.     {
    / Z" v  S1 C7 ^% R
  76.         $connection = $worker->uidConnections[$uid];+ |% b" p5 p0 f
  77.         $connection->send($message);
    # w# z3 J& T1 N7 I
  78.         return true;1 K" b; E4 M6 Z: k+ M
  79.     }
    4 ]/ `( D# D7 O0 B  |
  80.     return false;
    ' y/ B; y; _! n# e6 r4 B
  81. }
    % a1 X) _) [$ V9 k& x

  82. ) B7 I3 g9 \0 e9 T( Q
  83. // 运行所有的worker
    ; f  d2 t0 ]6 c/ P! s# ~: Z
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');: C' e- x1 j/ }% u# W4 H* K" I! e0 k
  2. ws.onopen = function(){
    * M# R" {: k% z; s) ?- [) G
  3.     var uid = 'uid1';
    * m, i% q. N( F) e+ N" q
  4.     ws.send(uid);
    ; u6 ]5 i8 H9 @( s5 B& ^( R. W
  5. };$ x) [/ \* q8 h1 B: q( Z! ^
  6. ws.onmessage = function(e){2 Z1 i$ O1 s2 d) X
  7.     alert(e.data);
    2 x1 V. t" ]- l7 b( |
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口$ t. }9 P1 X2 m" L3 ~, o: i
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 {% p) V( y' h
  3. // 推送的数据,包含uid字段,表示是给这个uid推送( G. h6 S' R3 F5 y
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    , {. ]  V; A$ N- ~# A3 i  V0 e* w/ l/ z
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    * H9 V" \9 u! M
  6. fwrite($client, json_encode($data)."\n");6 w+ Z  f5 @; i1 u) _
  7. // 读取推送结果. L3 Y8 K  _9 Q& C+ b9 |
  8. echo fread($client, 8192);
复制代码
; ?6 i/ Z, w- B+ l, |4 ^
; `5 V0 g- e, k9 r
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-19 08:23 , Processed in 0.165646 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!