cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;$ b5 m, i* U: D% Z% v
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ; A: K1 d% f0 A/ l

  3. % h2 P  E. m( k! p1 X% {4 U
  4. $worker = new Worker();7 S0 W& N# T9 {" Q; i
  5. // 4个进程
    0 T' l0 U3 s6 {8 ~; |9 h2 V7 W9 i  U
  6. $worker->count = 4;- K% V6 J! m4 N
  7. // 每个进程启动后在当前进程新增一个Worker监听
    # l& F3 f. r* V+ F$ x* V7 F& U) ?. N
  8. $worker->onWorkerStart = function($worker)  t9 H0 E: W+ a; c& I) _
  9. {9 I1 }! w, M! H. g
  10.     /**
    4 Y. X, ~8 ~2 I5 D/ Q2 D  Z
  11.      * 4个进程启动的时候都创建2016端口的Worker0 {* z: t9 _$ A7 b# p* v3 `
  12.      * 当执行到worker->listen()时会报Address already in use错误. i& ~7 b. F) N9 h4 g2 X1 b
  13.      * 如果worker->count=1则不会报错
    # H' `( j8 b9 P
  14.      */% L" |3 q5 l8 G5 c- c
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');- _- Y* K" u# Z# |8 ?. H
  16.     $inner_worker->onMessage = 'on_message';6 U& J2 ]: B' b
  17.     // 执行监听。这里会报Address already in use错误5 o5 g4 z* B/ E7 E) b
  18.     $inner_worker->listen();
    $ m4 p2 C4 g% I; `+ k
  19. };; }3 b  Z! l% H2 G6 S: m3 g7 L
  20. . e9 x3 [; T% ~, M; k1 K5 T2 j
  21. $worker->onMessage = 'on_message';5 y* X6 f& K6 u  L5 _

  22. 9 Y- p7 ?) z# z
  23. function on_message($connection, $data)2 |' K$ f# ?, S9 N7 b# P
  24. {
    7 ^0 p) _5 D( b+ ]2 m! m
  25.     $connection->send("hello\n");
    . ?" W6 I6 \9 j9 b: E9 |* Q
  26. }, {2 K3 w% J  v) f

  27. 6 O; ~, W% G% j; p
  28. // 运行worker- l( X7 y3 h8 {4 p6 v3 V
  29. Worker::runAll();4 a: I  K6 J; N4 o
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    3 F4 F, p8 _. z8 N' Q0 a

  31. 2 m+ R. G, q, n: x& I. r
  32. use Workerman\Worker;
    6 B9 V, R9 ]3 b  r( L
  33. require_once './Workerman/Autoloader.php';
    0 q+ B8 e; ~$ z9 k4 v
  34. $ e- {! b. M) x: V0 e3 S; b
  35. $worker = new Worker('text://0.0.0.0:2015');& r" I- d5 c3 T3 d
  36. // 4个进程
    + M6 w; ~$ q) x- F' [6 E$ g' {. c
  37. $worker->count = 4;
    # d1 Q, n) x* A; i& [6 u9 M
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ( Z9 m9 q$ c5 ]0 d' r( a$ \. X
  39. $worker->onWorkerStart = function($worker)3 c- r% x7 A) i1 T- n7 u' Y) R
  40. {
    - |: b: `6 ?+ `7 O
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    " h) W; a  h9 {
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)* g9 h3 G) k9 N  ]
  43.     $inner_worker->reusePort = true;
    : G# I4 |$ R: C% q2 a
  44.     $inner_worker->onMessage = 'on_message';
    # ~/ d# W5 l4 d, ~; u4 X6 p( _
  45.     // 执行监听。正常监听不会报错
    $ n$ x! }& [( k5 b
  46.     $inner_worker->listen();& h0 r& H2 Y  y: u: N  q" \' N
  47. };
    ) }9 w- ?9 h( F# d

  48. 2 `% |- s/ \. ]3 R4 x7 w( X
  49. $worker->onMessage = 'on_message';
    ) O$ z4 c; P0 n) f& o' W3 k. ^: k
  50. ' r8 m% t9 b* G' o: |- W# j
  51. function on_message($connection, $data)
    7 c# }  T$ n4 j
  52. {
    1 s6 q: k! u) a& y  [. `
  53.     $connection->send("hello\n");! h& [0 `( L- W$ e
  54. }
    5 X# p& S+ T+ t- y" Z

  55. : t8 o5 ~3 M: Q
  56. // 运行worker# U  @$ P1 P# z5 a
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php# m1 @  }, C! x& p/ F/ q" k# ]2 F
  2. use Workerman\Worker;
    " Q9 `0 j* d; w8 ]. q
  3. require_once './Workerman/Autoloader.php';
    7 M' s4 L8 K0 L" G2 Z9 L
  4. // 初始化一个worker容器,监听1234端口
    % t5 n0 e3 @) L  b
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    7 E6 F% K2 Z# }
  6. 1 ]/ F* |9 m! m1 V
  7. /*" a. a' N5 w- E9 |+ _1 f  J
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误% ]0 Y( U5 S; Y  t/ J$ p9 W( @0 e
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    2 a" c8 J. F& a0 [# a' g1 r; w
  10. */' U1 A. Y8 ~( L/ s+ f9 b0 w) o4 Y$ S/ B. ?
  11. $worker->count = 1;
    ' k- }4 l3 m. @& k* r# B
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口  i) w* }, V5 v3 j7 X& E
  13. $worker->onWorkerStart = function($worker)
    ( E* u& ^. e8 [
  14. {
    ( b, Q: N: F; D' I/ T
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符- \5 u; K& z1 N: n3 U" ~
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');0 q( A5 V2 V" h# Q  z/ P
  17.     $inner_text_worker->onMessage = function($connection, $buffer)8 p7 g/ U: @+ B0 B% o( h
  18.     {
    % @' F' D0 @* p2 t) y3 |
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    3 A# ?8 i& m& {9 T
  20.         $data = json_decode($buffer, true);
    : ^. [3 ]! Y. a1 Z; d; q* h4 g
  21.         $uid = $data['uid'];
    * R! h* G) m! ~, q$ n
  22.         // 通过workerman,向uid的页面推送数据
    7 `1 L- `7 F% E/ q2 \& A  g6 r8 W
  23.         $ret = sendMessageByUid($uid, $buffer);+ c  n1 m3 ]' y+ N9 k! L
  24.         // 返回推送结果
    ( Y1 v3 z7 @7 Q* h: ]6 u
  25.         $connection->send($ret ? 'ok' : 'fail');
    : ?$ g; X+ G3 U1 F0 Z, V5 s
  26.     };6 [3 I0 n: q& n; k) r5 V
  27.     // ## 执行监听 ##4 y8 t- F3 H( b. U! C; {9 [4 V
  28.     $inner_text_worker->listen();
    % u1 p0 y  j- ^1 e  s. G, O* H0 a
  29. };$ L7 ?- K$ a5 d5 S" }
  30. // 新增加一个属性,用来保存uid到connection的映射
    * \) W: A9 z6 }% L$ p* w1 K
  31. $worker->uidConnections = array();
    . C) p5 c: q# o" {
  32. // 当有客户端发来消息时执行的回调函数
    4 S/ O# v. S6 L! c* d' C
  33. $worker->onMessage = function($connection, $data)
    / H  `& e1 Q; ~- f. b$ n; E" h' g4 @
  34. {1 ]7 C/ B7 x9 h' ~8 @  G
  35.     global $worker;0 _9 ?: [0 X4 ?1 k$ y' M& m0 a5 K
  36.     // 判断当前客户端是否已经验证,既是否设置了uid# [5 Q& ~6 }6 Y8 r" j
  37.     if(!isset($connection->uid)): Q' ~7 T* i1 n$ }0 U0 k) B0 z8 g# a
  38.     {
    0 M* u1 R3 D3 c, Y9 v3 Q
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    + `  W$ x, K' ]1 `1 P( Q
  40.        $connection->uid = $data;
    . b# }+ R; ]3 O/ m6 v8 s* R! X
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    / f. D2 _7 a  q
  42.         * 实现针对特定uid推送数据$ z$ |* `! P: e; W  Z8 z
  43.         */* p8 `- f- b  e) `
  44.        $worker->uidConnections[$connection->uid] = $connection;) |' s' H; b4 a5 x
  45.        return;/ a8 h2 T0 g3 i- j- W- b4 n7 y7 S
  46.     }
    9 I: t' ?, n6 v2 s+ C2 O
  47. };
    6 A+ _- l& y( o

  48. 3 v; N0 S! W6 n; Q7 ?9 v3 \) n
  49. // 当有客户端连接断开时
    & t6 [2 u% r  H. c% H
  50. $worker->onClose = function($connection)
    1 T7 z0 V+ `" N* E: o0 }0 ~
  51. {
    ) K% @- K' D. S& }
  52.     global $worker;8 v7 |1 s6 o, \- p, m+ U
  53.     if(isset($connection->uid))
    , v, W+ r6 y8 U
  54.     {
    - }& g2 p/ z7 \- `% u* |" N; {, P
  55.         // 连接断开时删除映射
    2 x, V  M+ w% O. c( k
  56.         unset($worker->uidConnections[$connection->uid]);) B) l+ `! j  X* E/ ?% S
  57.     }
    . \# v" e4 Z# _* f3 @1 E3 R
  58. };
    # c# r1 g0 p0 k8 D: ]& O# F4 z. e
  59. 1 _+ O* T3 O9 j" K3 g
  60. // 向所有验证的用户推送数据0 r( w: e$ y, |  T* ~
  61. function broadcast($message)
    6 v/ E# ~: N  H1 Y' o
  62. {* t/ H2 d. _2 {( ?2 P
  63.    global $worker;
    " I2 B- J: v5 V, a" e
  64.    foreach($worker->uidConnections as $connection)
    ; U5 \, C6 W  d8 {: |/ f9 Q
  65.    {! C- j% A- k' l/ N/ Y
  66.         $connection->send($message);" ~9 {4 l; i) f0 C) S2 |' C
  67.    }1 U' A- z" ]% d. G. d) `
  68. }
    $ s/ j5 s: J- U0 z: Z
  69. 6 t* [- E6 S" R  \7 K3 `4 U! a
  70. // 针对uid推送数据! Q4 `! @+ W7 M( Z& B- T" _: A
  71. function sendMessageByUid($uid, $message)" j( k3 j; B1 E6 ]9 ~
  72. {
    4 Z' U" C& o1 e/ ?, ]& W9 c- x! c
  73.     global $worker;
    ! K/ }8 h+ y& }! Q! K4 N
  74.     if(isset($worker->uidConnections[$uid]))4 V4 H; t6 O" c' r
  75.     {: A& l/ g7 Y3 r# n8 a: d  N
  76.         $connection = $worker->uidConnections[$uid];  L# R) W. W2 Q. B+ ~
  77.         $connection->send($message);
    9 Q0 G, _% x5 D: b& M" ^, o
  78.         return true;
    2 }4 u. ^" s$ L' c
  79.     }
    * `# D( K! s# i# W5 O! ^! K: t
  80.     return false;* T, R( z+ A; b/ c7 y
  81. }7 Q5 J) `6 i' x

  82. 9 f2 C( }2 l! e5 R
  83. // 运行所有的worker: M2 N- U+ x4 \5 S
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');5 B/ W$ L3 k: o" `6 u! r6 m' V
  2. ws.onopen = function(){* s" F% L- ~0 s; S/ c, V7 n/ t) {
  3.     var uid = 'uid1';3 y2 s+ ]7 e9 o+ K( w/ b
  4.     ws.send(uid);8 e1 M5 b' ?8 p+ p
  5. };
    2 x( D0 O2 M5 U# C; q( B$ C
  6. ws.onmessage = function(e){
    7 u' x, R+ [* A! Q, _" a( c' {& ]
  7.     alert(e.data);
    * H+ u5 H% T% ~5 q, t
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    9 c. m* l6 l+ M% Y; z1 d( i
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    / r2 o8 {  S/ ~
  3. // 推送的数据,包含uid字段,表示是给这个uid推送; g5 J8 z5 h6 S1 v
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');' g% r! v9 T8 |9 b) L* M) l5 w
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ Q9 Z4 V9 \" q4 r* v" s
  6. fwrite($client, json_encode($data)."\n");, F7 Y6 f# ?9 P* x# v" ~' y
  7. // 读取推送结果  w: ?% R! o6 F+ r, z
  8. echo fread($client, 8192);
复制代码
0 I3 r2 `6 g6 S' `8 g: G

. |$ w0 W  }7 p5 D




欢迎光临 cncml手绘网 (http://bbs.cncml.com/) Powered by Discuz! X3.2