您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12175|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;. I6 d! |4 d' t. s& _+ X4 h  h
  2. require_once __DIR__ . '/Workerman/Autoloader.php';( @  i1 o: y6 X+ b( L9 N) B9 x

  3. 0 k( q' f8 x( r9 D1 W  Z5 @7 q4 d
  4. $worker = new Worker();
    5 v8 ~$ @( t$ q+ q( O
  5. // 4个进程
    ! T4 N, r7 l1 R# q7 z
  6. $worker->count = 4;9 A- t; [- E' j- C" e3 G: X9 T0 a7 e
  7. // 每个进程启动后在当前进程新增一个Worker监听( O( }. c- r* T
  8. $worker->onWorkerStart = function($worker)5 @, J6 T' j! |. x
  9. {
      o3 x5 j; f5 Z
  10.     /**/ Y! x: o/ s, N9 B( d4 o
  11.      * 4个进程启动的时候都创建2016端口的Worker
    7 E( d- }" ]! Y+ k3 Q5 ]5 ~
  12.      * 当执行到worker->listen()时会报Address already in use错误
    , c8 B' R) O; A' c
  13.      * 如果worker->count=1则不会报错
    8 T; h0 `8 v: _( h. @2 `: s
  14.      */3 l& x8 G# x  Q# T5 F
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    $ q% z  q7 w" ]$ @) U
  16.     $inner_worker->onMessage = 'on_message';* O& J3 q9 R" \7 a
  17.     // 执行监听。这里会报Address already in use错误3 i- Y  V- ]- g; D! R$ i
  18.     $inner_worker->listen();
    . Q: Z+ q$ W, c, m
  19. };
    ( X% E/ L& j5 }6 U5 Z; J4 f
  20.   R* _; y3 e3 D8 K+ o, x' p
  21. $worker->onMessage = 'on_message';
    1 |  R8 K2 r4 L: I8 p) z: G) l
  22. 6 |. u, @( `2 R, I8 }2 }
  23. function on_message($connection, $data)
    3 K' C' p4 T5 a; n( f. X
  24. {
    ' p4 q- v0 q/ c. b) a" h$ z
  25.     $connection->send("hello\n");
    $ J  z9 t4 u% u0 s4 l3 P# M
  26. }
    + |1 G/ x- Z, u, J
  27. ; J: s, Z* ^$ p* R6 e
  28. // 运行worker
    ( ?' |. y4 R, }( m* }, ^6 y" u
  29. Worker::runAll();
    2 V8 s3 ?# |- a* H  q
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:  S( x* _0 c9 T- g$ b  M, Y

  31. , |! _* m7 k: q
  32. use Workerman\Worker;) s* U: a' h# B, c3 N8 C
  33. require_once './Workerman/Autoloader.php';* W/ \4 @! g4 D( t# M6 \& Q0 z6 P$ v
  34. . K+ x) q" P2 O: V% U1 w4 C
  35. $worker = new Worker('text://0.0.0.0:2015');
    / r  [, l( U& n" t/ I6 b* x2 ~4 P! C
  36. // 4个进程/ m) k2 O( Y4 `) D
  37. $worker->count = 4;
    - \( K' [, }7 p6 H
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 b( Y: |0 M5 J1 H
  39. $worker->onWorkerStart = function($worker)
    % E5 u* B6 X* l% F' s2 Z$ v* R9 f  g
  40. {
    5 C2 j$ e: z0 C1 B1 O$ F$ l
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    6 @- e( Q! e( o) d
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ! y. t1 d& t/ Y( D  P5 `
  43.     $inner_worker->reusePort = true;
    - J( L) {4 Y% u/ M1 `
  44.     $inner_worker->onMessage = 'on_message';
    " `( ]4 b7 k$ H
  45.     // 执行监听。正常监听不会报错
    7 k$ H1 _5 E4 n+ q2 N& `8 J- A
  46.     $inner_worker->listen();& M' a' i5 H3 `: \
  47. };
    * `- ^) q+ s9 K% ]

  48. . I5 a; e( J$ z3 T0 ~2 O7 P
  49. $worker->onMessage = 'on_message';
    1 q+ g+ u7 E9 H6 ^  M: g4 E
  50. : }/ K4 R4 p& [, N  n: d/ T
  51. function on_message($connection, $data)
    " l. y: G2 P, ?7 M* f1 ?# k/ e: E7 U
  52. {
    + W7 {1 \) A8 ?
  53.     $connection->send("hello\n");
    9 s' q) J8 ^: _* r1 s) B6 e7 q1 z
  54. }
    $ T0 d" _; }% s6 l; J* r; R
  55. / P6 B) e- u6 J
  56. // 运行worker
    6 S: x3 |- L5 q0 }
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php$ R& q+ Y& n0 y; i( f. u( c
  2. use Workerman\Worker;* T8 u9 }. N; z' v
  3. require_once './Workerman/Autoloader.php';* A2 i  ]% m, s
  4. // 初始化一个worker容器,监听1234端口
    $ r8 A6 `$ f5 b/ L
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    " I2 A: T$ r, K, X+ Y: o! M& k, ?

  6. ) z5 v% L8 J9 j5 @' @
  7. /*
    9 s& B9 l8 o6 d  E# R
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误) J1 E: Q6 g7 M% I0 b) H7 r2 s7 V
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)- a; S- C& ^) p, a8 M
  10. */
    ! p( I% M+ z1 _  \
  11. $worker->count = 1;  s0 i) w6 C; Z/ O" w  }0 W) d
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口9 x2 S1 u3 D! L9 B: l) I- e
  13. $worker->onWorkerStart = function($worker)
    " S; y3 t5 [  `
  14. {1 D& M7 o( A0 \  |) B* s% ^7 g
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符& b  ^' ^  y% |3 {% H
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');) k7 O- I: V# P
  17.     $inner_text_worker->onMessage = function($connection, $buffer)% i! J4 Y- K+ _* r
  18.     {, @8 O6 N% i8 w
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据  M* r+ _, P1 k
  20.         $data = json_decode($buffer, true);+ u- X, J' Q# p8 Z
  21.         $uid = $data['uid'];
    & a8 _* w* ^  |
  22.         // 通过workerman,向uid的页面推送数据
    8 f2 I& C4 G3 {, ], J' C
  23.         $ret = sendMessageByUid($uid, $buffer);# M5 W* B. C* g: M
  24.         // 返回推送结果
    , s" N0 ^7 ]+ o; M3 P
  25.         $connection->send($ret ? 'ok' : 'fail');
    , L# r* S: y7 ~8 s" G' x( M
  26.     };' q; N  [: f; {" S4 [
  27.     // ## 执行监听 ##
    2 m- ]4 o- ^0 c8 b5 E1 g
  28.     $inner_text_worker->listen();$ u0 ]+ F: ?+ R, c" w7 N
  29. };
    1 e; b( P* o5 I! l( c
  30. // 新增加一个属性,用来保存uid到connection的映射
    + x) L3 i5 T$ r. S7 \8 e, i
  31. $worker->uidConnections = array();9 C$ x5 @. I" x/ x6 \' B
  32. // 当有客户端发来消息时执行的回调函数
    ( \' \; w3 k8 u' V
  33. $worker->onMessage = function($connection, $data)* o7 D8 v/ q7 l6 M: ~
  34. {8 S1 g2 Q- i( _! f
  35.     global $worker;6 ^& i6 l+ x: C6 _3 ~0 r- }
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    $ \1 {% i! o9 R* Q
  37.     if(!isset($connection->uid))
    8 y/ Q9 h0 I# @; A; i
  38.     {1 b7 T' b1 F4 L. t" n' g$ R# j
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    % ~4 P# V" P* I4 t) y
  40.        $connection->uid = $data;* c/ T* R/ S) j6 w0 I, n1 _
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    . K. C8 {6 b$ e( }, o
  42.         * 实现针对特定uid推送数据
    ; G+ b  w4 U( q, i$ h$ e) E8 p; D
  43.         */
    ; T  i: p8 U6 I/ W, Z
  44.        $worker->uidConnections[$connection->uid] = $connection;
    : G& q2 \, E# I. c
  45.        return;" E& u. D9 R6 D
  46.     }1 d7 t1 y* P% `
  47. };  @' J6 z7 w' x7 y  r( ?

  48. 2 T  T9 O- }: S2 z" M# Y  H
  49. // 当有客户端连接断开时* |: h1 @# n2 c
  50. $worker->onClose = function($connection)
    ( g! ~/ k& B! ~* O  z
  51. {- o# O$ U  @* P% g+ T4 R
  52.     global $worker;
    & M$ W' R4 t$ |0 O3 E; z( @
  53.     if(isset($connection->uid))7 c' A6 h4 |4 J- ]
  54.     {" ?3 w! ~7 z$ `5 K5 k- b% f
  55.         // 连接断开时删除映射
    : D% x; }' S8 E4 Z) |, B" F, L
  56.         unset($worker->uidConnections[$connection->uid]);- _6 |. e( U( k& s
  57.     }
    # x1 v9 ~4 |8 E% R1 n6 k
  58. };
    4 U# V0 s8 e0 a- o, R! T+ r

  59. / g4 f# P; d& V
  60. // 向所有验证的用户推送数据. D* O! \/ J- U3 P7 ^+ S' E1 e
  61. function broadcast($message); j: l" v% U! s: h4 i; A# ~* A
  62. {! q! Q/ A# H& \: r$ p
  63.    global $worker;% }& s# s& ]. m" H. Q# D: ^/ Y
  64.    foreach($worker->uidConnections as $connection)
    $ [/ j) N& c, F: h1 T
  65.    {
    . i5 ^1 v5 F: P1 c5 ?' P
  66.         $connection->send($message);  Y1 [' u1 M8 L6 H
  67.    }
    ( M" ]0 S# ~( t. H
  68. }# P2 S. |: E3 m5 D
  69. ! n8 {* p6 @$ k; g% U- e
  70. // 针对uid推送数据+ m. V2 E. ]5 N" R) z- n6 d! k, o7 k
  71. function sendMessageByUid($uid, $message)
    3 m, x; G3 _: ^! w: h+ ?( x
  72. {
    ) K7 g3 Y" P9 _  V3 `0 Q
  73.     global $worker;8 L" y% J; k3 Z' I6 T
  74.     if(isset($worker->uidConnections[$uid]))
    $ s% n1 ~* \! J0 O' b$ S
  75.     {+ }5 y" Z9 ~0 `  T8 m9 c' e
  76.         $connection = $worker->uidConnections[$uid];& t1 ?8 W7 D0 ], |
  77.         $connection->send($message);
    ) S' L: P. \0 K1 S; ^
  78.         return true;" y! Z% u" [2 Y+ j8 b1 H
  79.     }; b7 m) {7 e" P" R* m/ R" U
  80.     return false;
      e: j0 d  Z$ k
  81. }
    # i6 o) ~8 r+ `2 c: Q& P' p: R
  82. ; c) Z) P5 `0 Q2 h0 ?  t2 u
  83. // 运行所有的worker+ s( ~+ p0 Y1 f* g( X4 a
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    & B4 U4 l- H% C' [7 q
  2. ws.onopen = function(){
    $ y& o5 x/ Y( I+ z
  3.     var uid = 'uid1';: {2 ]# c1 V& c* Z$ k- [$ v; z
  4.     ws.send(uid);. G4 Z8 ~3 }/ l0 H8 i
  5. };; {8 M" i3 q' r/ Q+ T& \8 ~5 I* F& ^
  6. ws.onmessage = function(e){
    ! u1 W+ Q4 `- C- l  w% {- Q! m* x% Z. F" C
  7.     alert(e.data);
    $ a- i: Y6 ]7 L" w- p/ A2 ~3 A2 g
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    0 L0 M/ M; G& U8 y' Q9 Y& G
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ; Y+ h+ j; w% c* B7 F
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    1 U; g0 y9 ]# L) ^5 ^; M( {8 Z+ o- [% K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');' j/ c+ W3 n. i; {: C" l
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    & N2 Y/ _( r6 R8 {) |
  6. fwrite($client, json_encode($data)."\n");
    3 s3 w  F9 _+ N
  7. // 读取推送结果
      H: @( a  ^- ^5 _4 S  D% N
  8. echo fread($client, 8192);
复制代码

. u! F. z* ^0 U
* T2 C/ s- v3 K% b- c2 t
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 23:28 , Processed in 0.118895 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!