您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10425|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    0 S6 Y2 l* q2 H7 _
  2. require_once __DIR__ . '/Workerman/Autoloader.php';7 A9 N+ k; j7 I: U4 n# a: c

  3. 5 Z" D/ v; S7 |9 j: `
  4. $worker = new Worker();
    , z% C# ~: s+ Q, n1 Y
  5. // 4个进程
    ; L. S5 ]& q4 c1 d
  6. $worker->count = 4;
    6 U; @6 ~7 W# m2 V- Y0 @4 i
  7. // 每个进程启动后在当前进程新增一个Worker监听( B+ x6 T1 B( G2 [6 A% e' M( T
  8. $worker->onWorkerStart = function($worker)0 s! \# i- m: s3 Z
  9. {
    3 y" d' H  f2 F$ Q( y& V3 S  y  `
  10.     /**1 M+ n0 H) P8 x) Q" v  ?
  11.      * 4个进程启动的时候都创建2016端口的Worker
      \. h! W1 a6 Y) I
  12.      * 当执行到worker->listen()时会报Address already in use错误
    * |+ o- `2 S+ \, C# R' @
  13.      * 如果worker->count=1则不会报错
    4 H& x# [& w' |" E& M
  14.      */
    ) u/ t. t$ T! G( U8 K" J
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');" L  a, ^& P/ s" J
  16.     $inner_worker->onMessage = 'on_message';
    1 v$ m, b  a( l0 j5 |/ W8 d0 ^
  17.     // 执行监听。这里会报Address already in use错误4 m- Z; [8 P$ i) d8 \0 S* X. `
  18.     $inner_worker->listen();  ^; Q2 T/ I6 q/ {
  19. };
    / O& ]& B  E* L" [+ U! X" O4 m

  20. , M$ u  ~* A+ u1 ?, a
  21. $worker->onMessage = 'on_message';6 _' Q' A* }7 t3 y- I0 j' Q

  22. ! @/ ]# R$ X3 U6 k
  23. function on_message($connection, $data)2 ^9 b- k2 c. k% F6 A% O
  24. {
    3 N: B/ v' f9 X" Q( y1 G) w3 b
  25.     $connection->send("hello\n");
    0 n$ t& E* z  O. n$ T$ i/ `
  26. }
    8 L& {8 G+ H: s2 [9 r( m( ^9 r
  27. : n4 q8 Y2 P2 u+ \* R# k8 }
  28. // 运行worker- K0 ?. S: z- K! [
  29. Worker::runAll();
    ; t% z2 c1 S" o5 n
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:- S$ |( j# a8 ?  E* A2 x$ s. s/ C
  31. 4 j. T/ q8 v4 e" s1 w
  32. use Workerman\Worker;
    " \! |: ]7 X8 \  x4 w. Z4 C
  33. require_once './Workerman/Autoloader.php';
    7 n6 R( c- G) a$ j8 b" A3 c' U
  34. 0 K5 \( I6 K0 w- x$ x' _
  35. $worker = new Worker('text://0.0.0.0:2015');0 D4 A9 E8 s5 A( p  q2 F" y
  36. // 4个进程5 H% S( X( F  L, ?
  37. $worker->count = 4;7 x# c+ L' k& p2 g5 ]
  38. // 每个进程启动后在当前进程新增一个Worker监听% @/ U8 h0 D2 f7 A" |9 ~
  39. $worker->onWorkerStart = function($worker)
    ' ~/ v! ~4 a: V2 d
  40. {2 n% i8 [8 f+ t2 Q- Z3 R, G
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    , G7 z$ q. p% q8 m) Z  h0 ?
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    0 E; h' G5 i% n6 k+ ~2 I8 z8 D
  43.     $inner_worker->reusePort = true;9 n" z+ o4 d+ Z" S: w2 s
  44.     $inner_worker->onMessage = 'on_message';
    1 Q, M% E: k& r# @% b" y
  45.     // 执行监听。正常监听不会报错
    1 S/ }5 Y/ X  v+ ]+ r" L
  46.     $inner_worker->listen();
    , l  a0 W" H+ r/ c3 a
  47. };: g  w. U- L3 H% @* p3 c& h
  48. " d, G! w4 |( Y) S; C8 E" A; T
  49. $worker->onMessage = 'on_message';( L8 D2 \' e( e5 t  V

  50. # Y, |" I# b/ `1 S2 ^# h8 b
  51. function on_message($connection, $data)- O! f& g) Z) [
  52. {' d) q' P% \( a- B- h. Q
  53.     $connection->send("hello\n");
    9 [. n* W8 g. n; J9 [/ l# [
  54. }
    % z" u/ Q- W3 E! \. D; t

  55. ( E+ _4 y' B/ B3 K' \
  56. // 运行worker/ L7 E& Z- k! z% X4 O
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php- `) M/ L! t) \& ^, G9 `# V
  2. use Workerman\Worker;
    $ M$ k* B8 a9 q; S) o% B$ Y, G1 J
  3. require_once './Workerman/Autoloader.php';
    ; n1 c/ ~+ L' z/ |
  4. // 初始化一个worker容器,监听1234端口
    1 q9 q9 a3 o- H- M) ?4 W
  5. $worker = new Worker('websocket://0.0.0.0:1234');
      q2 H+ v" w) u- h; a9 f. v8 `

  6. & G8 w" b9 r" n) @7 X
  7. /*
    8 m" n8 C, v6 l, D
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误2 @6 l) T2 P5 n
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)7 T8 ~  t, @! O  X
  10. */
    - K7 k+ M/ n$ X( ]& l" v3 _# L. b# K3 A
  11. $worker->count = 1;
    ) c7 K8 K4 H( [- y+ }+ P! p
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口) F7 t; O* G5 e
  13. $worker->onWorkerStart = function($worker)
    $ s& U& s: A% p1 l, j
  14. {
    ; C0 E: ]5 M  `: {
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    # U6 r/ ^" b6 d1 h2 C
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');' g! D. X. y7 J( u1 e8 J  Q9 l
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ) Z: `9 ^4 H* _0 ^6 j( C
  18.     {$ _- k- g% @+ j# n/ v
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据, T7 ]: R' D8 D9 K. m
  20.         $data = json_decode($buffer, true);
    " y6 }  f+ _( Y. T$ U" E
  21.         $uid = $data['uid'];
    1 y' ?9 q  j, R' z3 v7 s& g4 Q
  22.         // 通过workerman,向uid的页面推送数据0 c2 F8 B, F1 V! C, X1 k
  23.         $ret = sendMessageByUid($uid, $buffer);1 H" G2 Z- S1 m9 I" c
  24.         // 返回推送结果
    - j& y- l! A& L, _
  25.         $connection->send($ret ? 'ok' : 'fail');
    4 W% x" a! p$ T1 D0 H; T: |
  26.     };
    # I3 {  B; M7 H/ M
  27.     // ## 执行监听 ##
    ! w  Y5 k* j7 o4 C  ?7 B; w- A% A3 y
  28.     $inner_text_worker->listen();
    7 l/ \0 c& K- S% Y3 l
  29. };: P8 k+ v. ~5 T8 M+ F2 e3 M: Y
  30. // 新增加一个属性,用来保存uid到connection的映射
    - q* t6 O3 K+ G9 z6 j8 o0 c5 x
  31. $worker->uidConnections = array();- {2 |! ~7 x* w- n4 x# x
  32. // 当有客户端发来消息时执行的回调函数9 r' Q3 b' h1 ^: \+ |/ \; J9 m
  33. $worker->onMessage = function($connection, $data)
    $ k) S3 J* h1 E# R1 s1 _0 l" u3 V
  34. {5 X$ }- v  u- S$ e6 s. N( m* A
  35.     global $worker;2 t8 l$ n) h$ Q* `; G6 @
  36.     // 判断当前客户端是否已经验证,既是否设置了uid7 a/ r+ P# y: e: k6 V2 I
  37.     if(!isset($connection->uid))
    : _* l) g" r$ U2 u% E
  38.     {, d& E* b' q, m& r; g& p
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    : `; Y; `8 e  k' C/ r
  40.        $connection->uid = $data;
    8 ]$ q2 K& C, O6 q3 G) c/ S* d4 c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ) h1 R& H8 Y& T" h6 Q
  42.         * 实现针对特定uid推送数据
      V3 K- b% Y( f7 K5 a" p4 o
  43.         */: z/ p( \$ a8 q! w$ e) S
  44.        $worker->uidConnections[$connection->uid] = $connection;
    1 B6 O- V" k8 H* k
  45.        return;
    4 T0 k' A# o0 T& Y
  46.     }) B$ t8 @+ |% U# a" S  U
  47. };9 R! R8 W) q& ?! `

  48. 6 K3 o9 _( o+ e- R& l" o: I& E1 @/ I
  49. // 当有客户端连接断开时5 k, v+ a7 \' P& s$ L$ ~
  50. $worker->onClose = function($connection)9 a* {0 m! B4 L+ V  `6 ^
  51. {0 @2 E9 {1 z& Q
  52.     global $worker;
    0 [8 e3 o% I9 V5 x1 B! G
  53.     if(isset($connection->uid))2 h3 U8 y$ n4 K$ E+ Q
  54.     {. Z4 [- {9 m/ @! B' Q9 V; a9 G
  55.         // 连接断开时删除映射
    & A/ @% u% s! R4 I3 w1 o% Q! ]  E, I
  56.         unset($worker->uidConnections[$connection->uid]);! H: v; \9 `% c9 B- D' V
  57.     }. Q- [9 [  X  h* R# o/ j
  58. };
    2 v5 B, R! t1 ~  C

  59. - t* Q$ K: O' }) |$ w
  60. // 向所有验证的用户推送数据
    6 O( ~& K/ d6 U6 J1 ^$ x8 r9 K
  61. function broadcast($message), h3 R" a" x" R; V
  62. {
    2 R& v6 s( J# e9 A8 p
  63.    global $worker;  V5 r+ D; K9 r
  64.    foreach($worker->uidConnections as $connection)! N5 l" b5 c6 c7 o( }8 w. o. e2 A9 {" W
  65.    {
    ( B7 ]: S) Z4 T9 n$ ^4 C
  66.         $connection->send($message);
      E8 V, G. d# l
  67.    }; u, f( U- y  _  @0 U
  68. }- s/ _6 @8 g6 v; t, Z

  69.   O* J) E0 l& \! ?  q. y% s
  70. // 针对uid推送数据; e0 @3 r& h2 _+ m( X
  71. function sendMessageByUid($uid, $message)
    * M' b% [+ q, Z) Z  U
  72. {
    3 t9 Q& ^+ o+ N! E1 _
  73.     global $worker;
    ( Q# K$ p+ ]/ w9 O
  74.     if(isset($worker->uidConnections[$uid]))
    1 {' x* x+ x3 k6 W7 y
  75.     {
    * N  }# `8 v7 E4 h9 b4 g
  76.         $connection = $worker->uidConnections[$uid];
    6 m+ S0 R2 \- z# K- S
  77.         $connection->send($message);
    5 R! _) m; ]) l% I0 ^- q  U$ ?
  78.         return true;
    4 }+ d% `2 M% F! }7 v- W
  79.     }
    6 K& x8 N. a0 u2 t0 v7 ~
  80.     return false;: U' V# v/ g1 y! N5 D; X
  81. }
    7 \7 X( ?3 M3 D, i0 w

  82.   g; Q2 a- e. u% k$ ]9 [# C* e; Z
  83. // 运行所有的worker
    ' k" H( \% p6 d* d
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');8 \' y7 R1 ~3 W& }8 j9 y
  2. ws.onopen = function(){
    # U2 ]- ?0 q( c( v9 i
  3.     var uid = 'uid1';
    2 E; K/ _/ h, z; [) o
  4.     ws.send(uid);8 N; Y' E3 Z. O* o; q4 w
  5. };- h3 A2 y) T0 [# M$ K
  6. ws.onmessage = function(e){/ D1 |  y' h& S0 L
  7.     alert(e.data);
    2 m9 T; l: G2 |
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
      t9 e9 {& y  P" V8 R
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    7 T2 Z0 s+ `1 ^
  3. // 推送的数据,包含uid字段,表示是给这个uid推送* z  `; f& ~8 q9 G
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ( B; ^. a1 w8 ]9 X3 M' W
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符0 n, c+ S& p- w+ k$ M
  6. fwrite($client, json_encode($data)."\n");
    / E3 r3 L2 o& M8 i, o3 [
  7. // 读取推送结果, d5 q2 C; J4 L& r
  8. echo fread($client, 8192);
复制代码
- z/ v* y* z- f( u8 q' t

& e/ H, }1 c. c/ P0 I3 v
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-9 15:22 , Processed in 0.120373 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!