您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12159|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;- ?0 B/ s5 U; d" S4 G- M# W' @
  2. require_once __DIR__ . '/Workerman/Autoloader.php';/ G2 l2 M# E. @1 V2 y

  3. & `% _, w' r& f8 [( k- S  s7 j
  4. $worker = new Worker();) n2 j1 }( r- b/ q" r
  5. // 4个进程
    1 e% ^6 @: Q3 i9 F# K
  6. $worker->count = 4;: Y4 I' c' q+ U; z$ C) y/ o- C; @
  7. // 每个进程启动后在当前进程新增一个Worker监听" D/ {! |6 Y8 k1 Y# n
  8. $worker->onWorkerStart = function($worker)5 G& B# x$ ]9 f4 z6 I* \. K% M
  9. {2 t* k, w+ |4 j) ]& Q
  10.     /**
    3 w% i  q% q" t1 Q, g! J5 ]; f
  11.      * 4个进程启动的时候都创建2016端口的Worker% Q8 G/ a1 e' k4 q
  12.      * 当执行到worker->listen()时会报Address already in use错误+ ?$ O3 p0 v" o" C) l- h
  13.      * 如果worker->count=1则不会报错
    # k; ~" w3 S! ~& F; T. K' r
  14.      */2 R0 I1 c* m( N9 |! p* |! k
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 L' v1 E7 K  r9 w; e' d0 }! y
  16.     $inner_worker->onMessage = 'on_message';( V4 j. e  Q! y
  17.     // 执行监听。这里会报Address already in use错误
    : M( Y7 Z6 p0 l9 e8 [- L/ F
  18.     $inner_worker->listen();
    ) t" \* }0 A' K, n. ], j9 a4 R+ U
  19. };
    / I2 I6 ^; N- x# _7 ~: ~) z
  20. ; \( r$ g9 @0 q5 s$ L
  21. $worker->onMessage = 'on_message';! Z/ @3 x; c& c( X

  22. 5 ?2 T* O% T' M+ {4 Y% i% T
  23. function on_message($connection, $data)2 J4 Z7 U# c+ m& ^% j% c& W
  24. {% Y% s9 }: `4 U3 v2 d' G
  25.     $connection->send("hello\n");) C' h8 E+ y! W
  26. }
    ' X# B) ]' l6 F! d$ I

  27. 4 f  V( @4 H+ l+ D3 S" a
  28. // 运行worker& T. H5 n+ u2 y" K
  29. Worker::runAll();
    ) ?& B1 {8 K. t( I) I6 {+ e2 A
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 `+ V# s* E. ]3 C' b; A
  31. - ]$ g, R0 Q8 E1 q$ m1 [' g
  32. use Workerman\Worker;  C' Y& e0 s) K3 c
  33. require_once './Workerman/Autoloader.php';
    ! |) ]! l; i+ u1 u3 L

  34. 2 J* Z" @. U) }9 U' I" U/ H
  35. $worker = new Worker('text://0.0.0.0:2015');
    5 P9 m5 g- ~3 @- _* _$ y
  36. // 4个进程
    ! t# U3 f) C* \8 z3 B: K4 L4 J1 }
  37. $worker->count = 4;! p/ }( p5 h) t6 Z# P
  38. // 每个进程启动后在当前进程新增一个Worker监听
    , o: g" e. y/ E% O* N5 V! m% Y
  39. $worker->onWorkerStart = function($worker)
    - u: r# a7 ~+ p4 L/ K, {: C
  40. {+ T/ T7 \0 z8 Q- {% d
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    7 z7 b( x6 s* E( ^% k
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)$ Y: q+ I/ `! V5 R7 n/ [
  43.     $inner_worker->reusePort = true;8 t2 c' B) t6 k2 P1 P, b! {
  44.     $inner_worker->onMessage = 'on_message';
    4 J) B; u1 r, A3 ?& B# z" \9 j
  45.     // 执行监听。正常监听不会报错
    + ~/ A+ q' m  F0 P2 d$ `
  46.     $inner_worker->listen();
    * e  v7 s& _; h5 ^, Q& Y
  47. };6 p$ E3 c9 A' e: K" w  K1 C/ F
  48. 9 a5 w" i  R8 o) `
  49. $worker->onMessage = 'on_message';  @0 ^/ d! r& A9 Z: @! S
  50. : G  G( L$ n2 a3 |
  51. function on_message($connection, $data)5 _' _' C! Y# @7 t9 `7 z
  52. {
    7 m7 U! Z' ^- f" A5 d/ c
  53.     $connection->send("hello\n");
    $ H  q* H- q, @# a' o0 y
  54. }
    3 i2 y5 {8 P$ I$ W0 P9 j, P! ^1 X
  55. 6 J  \. s" e: ?! B( x
  56. // 运行worker2 Q  v& U6 g# F1 e
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    # U1 Z. M% S) G+ K/ j$ w
  2. use Workerman\Worker;- I$ B, e* |8 d4 N
  3. require_once './Workerman/Autoloader.php';1 m* y/ F. g) W) j
  4. // 初始化一个worker容器,监听1234端口
    9 e* [. a0 R# m* H
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ' q9 A8 q  d. ?; |

  6. ! N7 ]1 Q9 m' m- r: \) X7 r$ ]8 f
  7. /*
    # O, x2 O$ q$ q$ D/ v6 s1 |; I" L
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误9 @% B1 W# L  R) i( y' H! d
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    3 m4 i5 `9 c6 o$ M! {# T
  10. */
    % {$ r8 |, K" P4 f! S& b) c" b$ R
  11. $worker->count = 1;
    + Y' t- i. q: W
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    # Z) I2 ?6 b' H4 }( n/ ]* a
  13. $worker->onWorkerStart = function($worker); I. K' q  Y/ h
  14. {; A, Z9 }1 y, f8 Y8 H
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) @9 s  g2 d9 n/ C2 E5 e& ?: X
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');$ b# G$ I' g" ~: J9 f3 F5 u( B
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    # s7 H/ a9 z2 J3 n( [
  18.     {8 r5 m4 M# Z) @- f( o/ F4 {1 b  n) H
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    : R2 o& ]' M; R( x
  20.         $data = json_decode($buffer, true);
    " Q- E# ?0 E: q# r- G; K" J4 R
  21.         $uid = $data['uid'];
    " G9 E: L. |/ O! S. z
  22.         // 通过workerman,向uid的页面推送数据& S1 ~$ y' X7 {) T- L8 O
  23.         $ret = sendMessageByUid($uid, $buffer);
    ; ]9 n) y6 _6 A) n# x: p  K
  24.         // 返回推送结果
    1 [) E! U# L7 z6 ~' W) R
  25.         $connection->send($ret ? 'ok' : 'fail');
    3 x" Z: E* k' d$ h" K$ ]
  26.     };
    1 i* ?% x/ u$ ?- V7 K
  27.     // ## 执行监听 ##  N% S) B0 t- y8 c
  28.     $inner_text_worker->listen();
    . }# v  }& r+ l
  29. };
    9 B% z4 ^+ I# N
  30. // 新增加一个属性,用来保存uid到connection的映射5 j% m  G  n' m0 g( z& ?
  31. $worker->uidConnections = array();2 O! k/ i; K# b% E
  32. // 当有客户端发来消息时执行的回调函数
    & S# I7 @+ E" p; y' W% h0 n8 l) ?
  33. $worker->onMessage = function($connection, $data)
    9 P# G, [1 }+ h. _+ [
  34. {
    % F. n' P& H. Y; J2 m
  35.     global $worker;  z4 g" w1 \' t) ~( I0 c( J5 R* l
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    5 K! f6 V6 l  @: I3 A4 p0 v
  37.     if(!isset($connection->uid))
    6 A* V) c& w- F" w- X" B0 H
  38.     {5 N5 c" g! G0 l9 ^# X4 r: i
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证); ^9 p3 w  U# y# ]& M5 }
  40.        $connection->uid = $data;
    ) J" O6 k1 ^" q3 g. V+ o* b. a
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,) f4 }- \+ T. g6 v
  42.         * 实现针对特定uid推送数据
    , {/ I. W; u- N% I3 O+ L, p
  43.         */
    4 C0 n" N, r" [9 ^: ^
  44.        $worker->uidConnections[$connection->uid] = $connection;$ ?+ Z) I$ E, m0 Y4 w
  45.        return;6 o1 H7 [( c- U' p' ?
  46.     }
    9 c# [7 H$ n, \' q
  47. };: n. Q! T, Q7 Y* a5 W6 }% N- ?

  48. . M0 P; A* `7 b7 d- m, D
  49. // 当有客户端连接断开时
    ( a5 Z: D8 q* C4 K: l' l, d
  50. $worker->onClose = function($connection)
    - d! h, Z, A& v. b, @7 U2 S) I$ L
  51. {
    0 T1 w; |* n) V5 u
  52.     global $worker;
    * j* m1 X8 p( }
  53.     if(isset($connection->uid))
    ' Z3 N5 k+ P- j
  54.     {. p* L5 C$ }; z" p
  55.         // 连接断开时删除映射
    ; Z) m' }5 H/ `) m6 p7 x) M
  56.         unset($worker->uidConnections[$connection->uid]);
    2 b4 c5 \8 P' ~: O! G$ Q: W
  57.     }& Q- j! F8 m0 v  ^4 s/ P, n
  58. };, B2 K9 W8 u8 i0 o! Y6 w
  59. ( w6 ]5 s- N4 B' t
  60. // 向所有验证的用户推送数据5 Q7 k: G$ V% r
  61. function broadcast($message)
    9 F" {% x( v0 b) n  F8 Y. v
  62. {- }& t  w  k5 ?7 ]
  63.    global $worker;
    ) p) I5 `! j" q/ w0 [
  64.    foreach($worker->uidConnections as $connection)
    ! B2 G6 m6 W1 {- h& Q2 O& l* \
  65.    {
    0 U; m7 n9 a1 C( k3 ^; L
  66.         $connection->send($message);
    / m' w* ~' U1 i% f6 q# S
  67.    }
    7 ^% R7 x/ j  j# k( q+ s
  68. }
    + U5 L  {7 Y1 k/ X, Z
  69. ( l4 S$ T" Z7 N* S& e
  70. // 针对uid推送数据
    7 ]. |( s5 F- f" _' U: `4 L) l8 j
  71. function sendMessageByUid($uid, $message)
    6 i! o7 B9 ]  i
  72. {
    ( w3 J& K2 g# @! Y8 X
  73.     global $worker;
    4 k; _$ [% c6 c
  74.     if(isset($worker->uidConnections[$uid]))/ t- Q9 |' M5 s9 a: l
  75.     {
    ! j. t) C$ [- h5 w9 V" C
  76.         $connection = $worker->uidConnections[$uid];
    + n$ T7 W" i. }+ G
  77.         $connection->send($message);0 p- _2 f7 B# F" w: k5 R
  78.         return true;
    ! v8 g2 y, k) P7 g
  79.     }) p5 a6 l7 U3 H
  80.     return false;3 A6 m( E& Y' I' D
  81. }
    ' z% }4 I" t8 G

  82. , F9 A) _/ g8 h+ J( b
  83. // 运行所有的worker8 d7 }+ q9 B5 V4 \# W" l6 A& P
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    , s, Q  x/ m* \9 S/ Q. a
  2. ws.onopen = function(){
    7 o( K2 {" }8 s. q! v6 W3 {; M
  3.     var uid = 'uid1';1 r4 o) N, `/ Z( [, u  b& S
  4.     ws.send(uid);
    4 A9 ~, z5 ^/ B3 r$ k
  5. };* |+ G2 e( i: k1 X6 e5 s
  6. ws.onmessage = function(e){
    $ r3 F& m9 Z! l6 Z$ h$ |
  7.     alert(e.data);+ ], i' h0 D; F( p# u$ Z- Y
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    $ _  C; w5 h' X/ w
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);# G9 M. c% u, G
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    / c. B" W) j0 F9 D1 i: F9 f* c3 ^$ Z
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');2 D9 D. F/ W; T& p4 n
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ' L: U4 s; N2 l: f  h
  6. fwrite($client, json_encode($data)."\n");& O" C3 ~- g( f/ D7 c
  7. // 读取推送结果: [) r! n; Y* j4 u
  8. echo fread($client, 8192);
复制代码
( v% e4 k# l) F7 s8 f: K

, M5 g% b; N; r2 {
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 16:47 , Processed in 0.108330 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!