cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    2 v0 m8 I) g( E& h
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    1 }4 }, c* P4 s5 j

  3. . p7 }7 I$ p* {5 `9 \6 |
  4. $worker = new Worker();( }6 K  S/ ~$ W9 U' y( J5 Y+ J0 _
  5. // 4个进程) k# s+ ~* f& ]+ |+ N- J# s
  6. $worker->count = 4;
    8 ?* k' a, W8 e. S- ~- s( E
  7. // 每个进程启动后在当前进程新增一个Worker监听0 g. C/ \. c* `6 o0 k6 p$ T' A- V  v0 K
  8. $worker->onWorkerStart = function($worker)
    6 F6 c* V5 p0 T. p0 O$ m5 A7 j
  9. {
    , j( E2 Y: Q. C1 C; d
  10.     /**
      l0 f% k# P" J, J( {# L. ?1 r
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ( B: ]; _9 ]) m; I
  12.      * 当执行到worker->listen()时会报Address already in use错误( L- f2 T1 o. ~& P3 M1 W0 O
  13.      * 如果worker->count=1则不会报错
    8 Z! j) Y) C; S3 L) p! I5 @8 q
  14.      */. [' l* u# u# U  C  A
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');. {& ~# F5 R& w! [5 E4 M7 [! ~- D. l
  16.     $inner_worker->onMessage = 'on_message';( r( I$ w4 c& S& p* G7 e- k  C+ q
  17.     // 执行监听。这里会报Address already in use错误
      L3 o: p( Q) d1 b- T2 x
  18.     $inner_worker->listen();: e% o. |5 F! ]2 a* U( {
  19. };( n! ^, T# @9 B' N+ U: \# I: O
  20. 8 a6 P! X# {1 F( f
  21. $worker->onMessage = 'on_message';! y6 V2 S' r& Y# [4 }* l/ c
  22. 8 Y7 R6 \, a/ `4 W  x
  23. function on_message($connection, $data)
    7 \% a( h/ ?3 j$ }- t# m/ @1 c
  24. {, j8 L' Y, ?; J# G" ?: A2 O
  25.     $connection->send("hello\n");( v% d- g, V  s7 v2 _* E
  26. }* F, W2 N. Q" s% U6 W& @

  27. : U! G6 p0 D1 c
  28. // 运行worker
    ' U6 c3 V, c# |6 R
  29. Worker::runAll();
    1 @/ ?0 {( ?" n5 {" _
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    3 V* t2 B+ P2 i' B! M8 Q
  31. / T+ [; ^3 w, [6 v) q" _# D( @1 }
  32. use Workerman\Worker;8 o0 D# Z: y  _6 `) g6 I
  33. require_once './Workerman/Autoloader.php';
      p6 O  X  i" W

  34. % E  ?* K: r+ J7 }% I& k( Q6 k6 F
  35. $worker = new Worker('text://0.0.0.0:2015');% x0 r+ _# O' ~1 ?. B
  36. // 4个进程
    ; ^" O5 x8 X& Q% K
  37. $worker->count = 4;1 {# ^! g' S" A6 \* X
  38. // 每个进程启动后在当前进程新增一个Worker监听# {$ v2 v0 _  r( Y$ B+ F
  39. $worker->onWorkerStart = function($worker)
    ' L' C% R2 N" Z' w8 V- _+ K9 r
  40. {
    . T) E) F( ?; |  x2 a6 ?! F( }
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');/ z1 y9 [( p9 u( L( \9 j
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ; P5 d) [; b+ {
  43.     $inner_worker->reusePort = true;, I; x$ g) X# M% u5 V
  44.     $inner_worker->onMessage = 'on_message';
    / M6 W; s& c$ H
  45.     // 执行监听。正常监听不会报错
    7 i  F; h# x' F5 ?, q  `$ K
  46.     $inner_worker->listen();% G3 M! D$ |6 k
  47. };5 k7 x3 R& b# u& H

  48. 7 K. r. e/ I+ g$ v% h; V" O( Y
  49. $worker->onMessage = 'on_message';; Y1 d# ^/ [' U' R. {- Y
  50. 5 Y1 X6 C) m* j
  51. function on_message($connection, $data)
    ) L" X9 @+ g0 ~5 N) M. k; b. @
  52. {, z: B' O) G9 J
  53.     $connection->send("hello\n");* U/ o5 `  C8 V
  54. }5 O# W# H! H" U! A( \5 i
  55. $ |  X: {6 G5 e; \+ k5 z
  56. // 运行worker. Z' C# p6 n# |0 l  T5 P9 {
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php- {( p5 \, Q, X$ C5 s
  2. use Workerman\Worker;
    0 k+ s/ w7 A  L+ Z
  3. require_once './Workerman/Autoloader.php';6 ~& h9 I: V1 I" E8 W
  4. // 初始化一个worker容器,监听1234端口
    * O3 E9 s) I% B
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ' ?0 p1 o# C) @8 |* I
  6. 3 Q9 N1 w2 h: V9 J0 E& B
  7. /*
    : M6 e$ k+ Z0 k' \  L% x4 h( [' m9 \
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    7 P5 K( a, [4 c2 l0 R
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)3 ~5 u, [. n0 ]1 X
  10. */
    ( {1 g$ p  |+ q, o
  11. $worker->count = 1;. q+ y. r: y# {2 L$ W
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 ~5 X6 ]7 f& D
  13. $worker->onWorkerStart = function($worker)( q+ O0 E. }, j- u2 M. ^$ A9 ^
  14. {. K% G( D) Z' ?# |# S. k
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 W. u/ v8 ~& p7 S( W# t; a% H
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    ! P1 C8 V) L  w' b2 \6 Y
  17.     $inner_text_worker->onMessage = function($connection, $buffer)7 C; V+ ?! i; J! Z, J% m: k
  18.     {
      n: h: o9 K( T6 m, p" v' e
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据: \. r5 o: ^# q+ Z0 n" l% t
  20.         $data = json_decode($buffer, true);2 o  }4 {  U7 Z" p  `
  21.         $uid = $data['uid'];& I9 W: |& C1 u; w0 c" G4 R4 }+ y
  22.         // 通过workerman,向uid的页面推送数据
    ) m0 P+ ~/ |9 `. m1 V) M0 U9 o, G2 M
  23.         $ret = sendMessageByUid($uid, $buffer);. k0 }1 {/ C% A8 u' R( D
  24.         // 返回推送结果8 C5 V, d. n3 Y9 x, w
  25.         $connection->send($ret ? 'ok' : 'fail');. I8 j3 H. ?! Q# s0 a! M% n( w
  26.     };2 a0 q% u: E/ K# [& p
  27.     // ## 执行监听 ##$ S: }3 J) `9 f% K' \
  28.     $inner_text_worker->listen();2 S: C* C% k  E$ W# r% d
  29. };
    / E) G* u7 o5 C& O( w  A
  30. // 新增加一个属性,用来保存uid到connection的映射
    8 T# V# a3 z8 ]% N
  31. $worker->uidConnections = array();8 E! T( x( g4 a" t9 I+ A" [
  32. // 当有客户端发来消息时执行的回调函数
    2 ~4 H3 i, u3 [7 Z; d+ s' U
  33. $worker->onMessage = function($connection, $data)
    ! t/ t, e' a8 Z+ _- s
  34. {! f+ u. B3 ^' r
  35.     global $worker;6 J8 m# x+ g1 d
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ; n- b0 i, c4 u' j1 ~. C# w) ?* @
  37.     if(!isset($connection->uid))
    1 I; G, A. X* |
  38.     {
      l, x3 c+ i  r" W4 }; u" o
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)# _4 X& {, J7 M+ L+ q+ V
  40.        $connection->uid = $data;2 `8 J7 d2 V( P8 O2 K7 i) c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    / t3 l" H- N! W
  42.         * 实现针对特定uid推送数据/ ~9 R* ~3 K6 I4 n
  43.         */6 E! g/ G" ~  d0 p' j
  44.        $worker->uidConnections[$connection->uid] = $connection;: z) O/ G. J% `+ M+ h: v) N
  45.        return;# `4 k' s: O1 _
  46.     }3 d) M8 A; V: k, T# P
  47. };- E) K- H& X% @: S: {

  48. 1 t8 b$ v) X7 k' o, s% U+ i7 R+ K
  49. // 当有客户端连接断开时
    4 t  y9 [. l9 f( q/ \
  50. $worker->onClose = function($connection)0 I3 K5 ^+ }1 u
  51. {$ {2 d* z& m0 V: m+ }# ?- ~$ g. d
  52.     global $worker;4 l2 y/ h+ B9 T* x
  53.     if(isset($connection->uid))
    . B- u0 o6 y2 R6 \  f
  54.     {4 |/ V9 G3 x2 C
  55.         // 连接断开时删除映射3 d" e8 M* p3 O! \
  56.         unset($worker->uidConnections[$connection->uid]);+ t- G; }) L8 f
  57.     }
    % [: |. {& J, }
  58. };5 u, q) \- b$ D! H

  59. 7 ^( L3 D3 c8 u" i$ R
  60. // 向所有验证的用户推送数据
    ' T5 }; X" j0 \3 `! a" n( t' N3 X
  61. function broadcast($message)- i1 Y" k9 m. o' o$ ], ?8 j
  62. {
    9 }( U; E) P5 Q5 K
  63.    global $worker;
    8 u( C. G3 p# I( w
  64.    foreach($worker->uidConnections as $connection)
    / c' Z- w! |/ F4 u3 |8 W
  65.    {
    ) p  [" A! D8 p9 S$ K
  66.         $connection->send($message);
    & K. m' N: a6 a7 B
  67.    }* A; o- ^3 K$ U% V0 b
  68. }
    / Y( Y3 T$ N' a
  69. & U* {# Y* C7 Y! E7 K3 c% ^: z; s
  70. // 针对uid推送数据
    " g- S& h: O" Z- v
  71. function sendMessageByUid($uid, $message)
    & E4 i1 H9 X4 J0 E6 C- H6 x2 f
  72. {
    1 ^8 I9 X6 }$ {% e9 L4 x
  73.     global $worker;- W. e2 H; v2 ~" \6 _/ [- ]
  74.     if(isset($worker->uidConnections[$uid]))2 S! N) I0 B5 d' j
  75.     {6 i* @  r2 W4 P2 ~
  76.         $connection = $worker->uidConnections[$uid];* A8 [4 W" f' A" E( V( k7 t9 X3 J
  77.         $connection->send($message);
    1 L0 a8 [- G' A' T
  78.         return true;
    ' d2 s+ r; }8 e3 S6 l+ U
  79.     }
    & M" N/ b# n* C" @9 b! b, L5 M
  80.     return false;
    + k; w! U0 H- @: F0 T
  81. }; s/ X7 Z8 N: @/ T; k

  82. " d. B* j% g8 Z  C& e- H
  83. // 运行所有的worker
    2 z! k, z0 _) O* L
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');" s6 {) d* d" F1 I( ^5 P
  2. ws.onopen = function(){
    8 [0 N5 R9 {5 A" U. z0 g
  3.     var uid = 'uid1';6 V" V$ x" s2 D. @
  4.     ws.send(uid);: F; O* R. s) l
  5. };, ^) e4 |" k" Z  B7 V
  6. ws.onmessage = function(e){
    2 {7 m) W( A. R) K  Z
  7.     alert(e.data);
    0 v3 g. r, R" |% X0 N
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口0 {) G6 [* S0 I+ ^: s
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    - _' z9 s- q& h% k' |3 S
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    9 _; i  W; t7 G2 S) `9 R6 z
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    * J0 m& d9 F0 H
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    3 N! O% z0 t# D4 Z) S& k+ }
  6. fwrite($client, json_encode($data)."\n");
    & @( ^* k2 _3 T3 V
  7. // 读取推送结果
    7 w% ]" L0 X- r* j; N3 M0 K
  8. echo fread($client, 8192);
复制代码

* V3 c( [* q/ J1 f5 n
: J! n2 P; T' k$ z6 r" L5 A




欢迎光临 cncml手绘网 (http://bbs.cncml.com/) Powered by Discuz! X3.2