您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14664|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;0 [6 h% w  m1 V+ |3 T4 q3 W
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    + H& Z1 {8 j! o' O) q
  3. " u9 }7 d. |% L6 N- ?8 B" ^' P8 r
  4. $worker = new Worker();4 O( k, `3 S  o
  5. // 4个进程, v9 B! ]# n1 r* |, S
  6. $worker->count = 4;
    : v2 j2 ~6 }- \7 d8 o5 ]/ o& Q9 F
  7. // 每个进程启动后在当前进程新增一个Worker监听
    . v' O% ]$ G% W  b
  8. $worker->onWorkerStart = function($worker)" s1 O% l$ j  I! i! o* Z$ d
  9. {5 H$ g! x. N# @* A4 @8 e$ t1 X
  10.     /**/ {) _+ r- c; T0 r
  11.      * 4个进程启动的时候都创建2016端口的Worker* S% D  q4 {8 E" b$ y7 O  x
  12.      * 当执行到worker->listen()时会报Address already in use错误6 y; x; `, T2 E8 u' ~# Q
  13.      * 如果worker->count=1则不会报错
    9 ]& R+ X" `6 `! m! b: h1 h" g& M
  14.      */1 J, L2 Q) R( c" ^0 }' y7 b0 K
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');' ?4 F2 m  i- z: G& s) l+ u; v/ H
  16.     $inner_worker->onMessage = 'on_message';2 x: H5 w1 v* e3 [( r
  17.     // 执行监听。这里会报Address already in use错误
    & F/ Z; X+ a' d7 k+ z' t
  18.     $inner_worker->listen();8 W8 b2 d3 H) q$ e5 D+ I- X2 x
  19. };
    : G1 I9 F5 s& M, U) c& }" z$ q

  20. ! ?" \* S" D+ w) R, J9 J
  21. $worker->onMessage = 'on_message';$ d7 l9 q0 T: l0 s4 l, l% `

  22. ! O( P7 I" X. f) @- c: C/ E2 S
  23. function on_message($connection, $data)2 d$ Y; l) ?) s8 t
  24. {
    # w3 J6 c  K& t  B  X8 H
  25.     $connection->send("hello\n");
    4 }& x! a$ ?: \. x' y2 ]
  26. }
    6 C2 @4 E8 \% |5 w$ o
  27. 9 t( C6 E2 C5 K( k1 l
  28. // 运行worker
    & w5 p( l. }0 _# w9 }% l+ h/ Y
  29. Worker::runAll();4 [8 t8 r& I2 I
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
      j% X2 D1 A3 L  g
  31. & m: X( Q* D- Z6 x. o
  32. use Workerman\Worker;/ G& X; s! @$ V0 A3 j8 C; h0 [- {; J
  33. require_once './Workerman/Autoloader.php';7 C# @# b( M3 F

  34. 2 f$ \( u8 r& z2 H  j
  35. $worker = new Worker('text://0.0.0.0:2015');
    3 m4 W0 y) Z+ F: R
  36. // 4个进程
    , |. L" [& ], C( }- m. W
  37. $worker->count = 4;. F0 S* K+ P! F$ b% H" H3 i
  38. // 每个进程启动后在当前进程新增一个Worker监听" S  G2 _8 a0 [$ S! L* l4 s
  39. $worker->onWorkerStart = function($worker)
    , m- y# U! X2 p$ ]
  40. {, t9 ?0 [) Z+ D
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    9 E3 P. }9 L5 c0 B& |9 a6 O3 S" D
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)6 J) B$ a8 `0 ~3 d: b( @& L
  43.     $inner_worker->reusePort = true;$ @4 `! h+ C$ j; Y4 _% n( a  \# m
  44.     $inner_worker->onMessage = 'on_message';
    ' t  g( Q) ~9 j
  45.     // 执行监听。正常监听不会报错
    3 Z- I! H: a5 v& @7 W' ^, y
  46.     $inner_worker->listen();
    0 r* ?9 a" C& @  ^) X: @( H
  47. };# l7 t1 |& a8 @" `! x9 w
  48. 2 o; b' b) R  ]5 i) x
  49. $worker->onMessage = 'on_message';
    0 {$ Q4 \& D* v+ y
  50. # _2 m8 H/ f# E; Y- c) r  v3 l
  51. function on_message($connection, $data)8 P8 q  h8 `4 S3 Y- d  b4 v
  52. {3 v. H. s3 c* o, B9 j. @
  53.     $connection->send("hello\n");
    . S2 k' a' r( R, ~
  54. }
    % I- T0 o  b: ]. K* M
  55. 8 m( V  B2 Q3 Y, e% ~; G# x
  56. // 运行worker; B8 V* n; a) m& J
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    1 Y( _0 V6 k4 _: I8 ^4 W1 ]
  2. use Workerman\Worker;
    3 _; W' B) I! h7 ?9 k# l
  3. require_once './Workerman/Autoloader.php';
    ( Z# S! D" l5 d- R0 A1 N1 ~0 {
  4. // 初始化一个worker容器,监听1234端口$ ?0 I' ~0 h0 h( J
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    % Q  T4 k+ J& z

  6. 9 a) H% \& i7 D! z0 C" o8 s7 e
  7. /*" j) x* D) ?. H" @6 B$ S( m
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    , y& P4 w( C, o/ m) g
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ( d5 W* ^6 f0 d; \) q
  10. */
    1 C' m' J' T7 q, @8 p
  11. $worker->count = 1;+ [; V5 `) o+ L  h
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    ! w8 O4 w0 [9 |4 T
  13. $worker->onWorkerStart = function($worker)% ~, B" F6 |% y! p0 j# p
  14. {/ h' a1 q: _2 n) u
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ; o( J6 e3 V+ R( C& k- m
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');" c  y  y5 w5 q% ?# E4 ~
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    & g) g; k& }# X( c9 D- R' |4 x
  18.     {/ H" k# @- f& W2 x8 X% j: y
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据& X$ ^9 r2 N% W7 P
  20.         $data = json_decode($buffer, true);/ |4 H6 s# n( j
  21.         $uid = $data['uid'];" i1 ^  d+ U& {' T
  22.         // 通过workerman,向uid的页面推送数据
    ) y2 d3 @* V' h2 S8 W& R$ d
  23.         $ret = sendMessageByUid($uid, $buffer);
    2 U+ h% H. f5 q
  24.         // 返回推送结果  F) ^1 Z; @- y4 f
  25.         $connection->send($ret ? 'ok' : 'fail');% t, c* L# D" u8 {+ T- B
  26.     };5 I. Z( D/ D3 l, D* `  R$ `
  27.     // ## 执行监听 ##' D1 {  p" K, ?
  28.     $inner_text_worker->listen();, d- E1 ?, Y7 o4 J+ O# b/ |
  29. };/ i4 {2 p5 x) X5 P9 S
  30. // 新增加一个属性,用来保存uid到connection的映射0 g3 h- h  f/ t4 h7 a" m4 U! _
  31. $worker->uidConnections = array();! i0 H  a4 u# S
  32. // 当有客户端发来消息时执行的回调函数8 |7 {" v( _; @7 Z" t. O3 y
  33. $worker->onMessage = function($connection, $data)
    ! h& q1 ~, }9 S+ k) Q; ]9 Z( t
  34. {
    ' O! G2 Q9 ]) N0 y0 B
  35.     global $worker;9 M" \  A4 N/ l0 m/ R
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    3 S; Q3 |3 F/ Q7 `# Z& E
  37.     if(!isset($connection->uid))- b! h; x+ \8 Q3 J
  38.     {7 o/ ^" H1 L) R# A$ F# g2 R! W
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* i$ y. @! H2 D9 [
  40.        $connection->uid = $data;9 z( R) E  _# \1 p! b
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    % [4 t  T0 ~. H
  42.         * 实现针对特定uid推送数据
    . @, _; M0 }1 W" i8 {) ?# }
  43.         */: y9 @# h2 `$ m! `# }! B$ ]
  44.        $worker->uidConnections[$connection->uid] = $connection;* v* X# c& x  F5 ]
  45.        return;& Z7 g0 e: L: M/ t3 l
  46.     }3 ^# `$ M, W: n5 T: K6 u
  47. };" Q) y$ u* ^$ v2 N0 g$ x

  48. " x, `' N% P. _' {) N1 d' F6 l# O6 j
  49. // 当有客户端连接断开时% T" J  y3 A* ^: w9 G; `
  50. $worker->onClose = function($connection)7 P. h( {; o8 l/ w
  51. {& W- v4 N+ J( X# K$ Y
  52.     global $worker;7 s1 L2 K/ a6 Z9 t1 b; _& x
  53.     if(isset($connection->uid))
    / t2 F; z# ]) g( G' y' h# m
  54.     {
    0 |; V4 G! P" f0 _
  55.         // 连接断开时删除映射
    ! Z6 j: I5 ~. n7 I$ V4 h
  56.         unset($worker->uidConnections[$connection->uid]);
    # u) q6 J0 E8 t* b
  57.     }1 s! s0 ~  i8 t5 |& x' S
  58. };
    * D. j4 Q* U$ N5 h2 s2 k) Y

  59. % c% F0 {! ?( {& K  o* a2 t
  60. // 向所有验证的用户推送数据
    0 r& M( q! c) B" {0 B8 g$ {
  61. function broadcast($message)
    7 g/ `0 a, W6 Z) F  p) g
  62. {4 C8 Y1 q% P: _# ~; a# s
  63.    global $worker;
    8 K2 b9 {5 V. q
  64.    foreach($worker->uidConnections as $connection)
    # D9 k: b, r; w- @* A, A4 o
  65.    {+ t9 H3 }$ O, Z5 F" U6 Z
  66.         $connection->send($message);$ q2 i' W4 y1 s  {
  67.    }
    1 r: I; g1 p( k
  68. }* [% G/ N' [: {% j( f
  69. 3 o  [) ^" C8 _3 R: M% m# Q, B
  70. // 针对uid推送数据
    6 m! P; U; ^" D
  71. function sendMessageByUid($uid, $message)+ z# f% a/ R3 Y, H
  72. {; @4 d5 s3 o% D3 I& i& E7 y
  73.     global $worker;
    ' n* c2 Y& q0 f' R
  74.     if(isset($worker->uidConnections[$uid]))
    / q9 s0 m# K2 c) {0 V' ?8 U$ @
  75.     {
    * {" r  N2 Q( |: _3 \1 m
  76.         $connection = $worker->uidConnections[$uid];
    4 b: z+ D/ B! U4 Y
  77.         $connection->send($message);! n% v  a& U4 c! i1 U
  78.         return true;
    0 t! i1 N4 X. W- f
  79.     }
    2 {; S+ Y9 X* |4 |
  80.     return false;0 ]4 ~8 q% y- t# `( G9 ?
  81. }
    - \( w5 U3 J* g/ }2 x
  82. - l. w; E# o0 G* e7 c  f: Y! ~: ^
  83. // 运行所有的worker
    + {. `, T- g! D! N: U! C8 o
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ! l% \  c$ F9 t* v
  2. ws.onopen = function(){- l" A& C' H7 N1 q
  3.     var uid = 'uid1';6 U3 b6 n0 r% p' f1 F7 e
  4.     ws.send(uid);
    0 J0 _" u" j/ u# W4 U  J
  5. };
    8 n2 {  O" g6 ?6 W; ^0 W! x
  6. ws.onmessage = function(e){
    6 ?, B1 E  R7 ]' @5 m
  7.     alert(e.data);
    + i- O1 \; G- y% p8 D, j
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口' x" \" j, ?6 S5 q! v4 U9 c& l
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);- b- ^5 H" I9 f3 J% [
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    + Q2 V$ v5 `! h' T8 i! [2 T; e
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');  U  D, Q5 `1 ?* a9 a: w
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符" g8 _5 K( E2 E. g+ M. K
  6. fwrite($client, json_encode($data)."\n");
    ' l* g8 N  I4 N
  7. // 读取推送结果
    5 u+ {8 r# |2 }0 |& S( S5 G
  8. echo fread($client, 8192);
复制代码
% e( T( r. ~9 m% R/ H( I2 J

' l* x5 b9 V. ?& i8 f
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 15:05 , Processed in 0.048701 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!