您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14659|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;% ^3 h4 u, Q+ W4 v5 V
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    4 Y. C7 E: _) J0 \; T4 u
  3. 1 w- f1 w. ]! e
  4. $worker = new Worker();
    # T+ \) s# Z( t
  5. // 4个进程3 r7 K) x/ f) O' u  o6 F& F
  6. $worker->count = 4;# a2 U8 ?$ `# {) J1 f0 T
  7. // 每个进程启动后在当前进程新增一个Worker监听6 m  ^- p% K0 o6 }, [  _* r
  8. $worker->onWorkerStart = function($worker)' z) i4 ~5 L* w3 r* I% g6 H
  9. {! E: h- x( C. |/ B. g- Y
  10.     /**
    2 x/ N7 p/ h) t/ J
  11.      * 4个进程启动的时候都创建2016端口的Worker
    5 F, d& _5 h2 C8 v, D) V
  12.      * 当执行到worker->listen()时会报Address already in use错误5 T6 l4 S$ H$ |  k% w  d- Y) j
  13.      * 如果worker->count=1则不会报错+ I. `! O$ l9 Y
  14.      */
    0 O& q  x8 M1 |; o* K6 W
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');- ?7 [7 F" H  p2 R: B0 }
  16.     $inner_worker->onMessage = 'on_message';& D/ J# h8 q6 g% `& t: `1 g
  17.     // 执行监听。这里会报Address already in use错误
    : u' _3 Y& o, e- z# K
  18.     $inner_worker->listen();2 z; }9 {' q0 p% A/ k
  19. };$ _6 L7 @" ~4 y5 S& |+ i
  20. * M& H, H* O' ]& r7 O9 W+ Z- r
  21. $worker->onMessage = 'on_message';
    2 V; V9 E3 U& c% m! C$ x

  22. # G, z' J+ N8 M/ E" l: {% G
  23. function on_message($connection, $data). h+ [" ~" B' p) O
  24. {9 ?& R4 X6 a" |! n; @8 K' [
  25.     $connection->send("hello\n");
    6 Q% T1 Q6 b$ D
  26. }
    8 c2 \, z0 ?: J' X3 p; g$ s! y

  27. + U. [3 W- ^+ i; d6 i3 [' d& `
  28. // 运行worker9 u0 Q# k+ z: T3 W# R, d$ o9 r
  29. Worker::runAll();
    ; a! M2 Z- ]6 n7 i
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:  b* V" [' Q1 L+ [: _" R+ @* d
  31. 1 Z+ J( H6 z, N0 u1 n
  32. use Workerman\Worker;
    . T, R& J) ]4 x
  33. require_once './Workerman/Autoloader.php';' X  P3 h! q* J  w& v

  34. % D1 k3 m4 E: a* s* ^. c/ p& Q
  35. $worker = new Worker('text://0.0.0.0:2015');0 f6 f) s3 w/ [% s, y; B5 o& T
  36. // 4个进程' g7 {" Y, u) W8 Q2 g$ l
  37. $worker->count = 4;# ~$ p/ P9 R# w# W
  38. // 每个进程启动后在当前进程新增一个Worker监听
    3 k/ f1 N+ h& w( N1 \( B  J
  39. $worker->onWorkerStart = function($worker)$ G( U, D  f" \0 A
  40. {
    , l% O6 ?3 `3 I* l, ^. o7 ?7 B" b
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');; ]6 L; s+ h7 D9 ^+ v. i
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)9 V; ^+ a, d* o* Z! k# W0 Y$ z
  43.     $inner_worker->reusePort = true;
    8 d% V1 j6 V/ J' }2 \
  44.     $inner_worker->onMessage = 'on_message';/ e# s4 f5 U6 ?" P6 T8 x
  45.     // 执行监听。正常监听不会报错
    % y% K: p6 R: T1 f
  46.     $inner_worker->listen();& @2 J9 _! S3 B. ~' q: c, Y
  47. };
    2 y0 _5 k* B; C4 f

  48. ) u' }/ u0 W: ]1 a% {1 ~" P
  49. $worker->onMessage = 'on_message';. f* G. v) |" M! d* R6 A8 N7 b, E

  50. & [1 ]- f8 Q7 n  {- T6 y
  51. function on_message($connection, $data)
    0 N2 N" a$ S: X0 j5 ?- V9 ~0 h
  52. {5 m* H( n, [8 A/ `
  53.     $connection->send("hello\n");
    ) t. P( C/ o( C. N! ^
  54. }
      E; v- n  i5 L* a, ]

  55. : c% A3 p. b: b
  56. // 运行worker* I- F  G; N6 P
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    1 t4 h3 ]2 [0 H/ q
  2. use Workerman\Worker;" B  W4 D8 d1 G9 c
  3. require_once './Workerman/Autoloader.php';
    ! t8 k& i( J  k% D6 f$ }# @. m" a
  4. // 初始化一个worker容器,监听1234端口' J* u! h# o, _  \
  5. $worker = new Worker('websocket://0.0.0.0:1234');, F6 i: V8 f# i- X: p4 t4 F7 {
  6. 6 S% F5 b  F0 O! f
  7. /*
    " G: @( r4 I! h
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    8 S& f- c8 `2 k- P/ `6 x! P5 W% K: C
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 p5 |. _3 d* p' f9 ]4 Y
  10. */
    $ c" R* V( o; ~
  11. $worker->count = 1;
      Z0 H6 I) }9 I$ r9 u
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    3 ^3 Z% h* \! z3 I9 G
  13. $worker->onWorkerStart = function($worker)2 R# K" L2 }! ?7 X: Y7 }
  14. {/ I( R. P9 v7 `4 Q: ^
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ z* S( ]1 v3 Q1 ^: P
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    9 c: y0 q/ Y$ H7 a; ?
  17.     $inner_text_worker->onMessage = function($connection, $buffer), `0 j% y7 d2 \4 o
  18.     {# }* E; d5 r8 l" W3 I2 i
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    8 s/ ]# k3 Y, v  r/ y+ T
  20.         $data = json_decode($buffer, true);7 Z. R6 {8 a7 h' V; F
  21.         $uid = $data['uid'];
    2 {0 D/ W- i8 n8 L2 L& t( m' a$ I2 D
  22.         // 通过workerman,向uid的页面推送数据- @2 y9 V  f6 z$ r  v
  23.         $ret = sendMessageByUid($uid, $buffer);
    & e# s' g+ G8 _( h: Y) W4 o
  24.         // 返回推送结果
    , r' h$ C- I7 d: v! g" Q  h
  25.         $connection->send($ret ? 'ok' : 'fail');
      ?7 Z, }7 u; f! k2 j" r: N
  26.     };" W( f" N3 E1 W* N+ L3 i& V
  27.     // ## 执行监听 ##/ c  I, \- \8 a: W8 t) y3 B' i% G
  28.     $inner_text_worker->listen();
    : o7 n3 O7 ^4 t8 ~1 e
  29. };
    , W$ `' d3 [7 i9 _3 T8 K$ y
  30. // 新增加一个属性,用来保存uid到connection的映射* f! Z0 f7 `' x; N
  31. $worker->uidConnections = array();
    4 t6 H8 ^0 c6 ]- S* E4 u
  32. // 当有客户端发来消息时执行的回调函数
    & \. v  i; D% K' F4 C
  33. $worker->onMessage = function($connection, $data)
    - p9 f9 g2 g$ O/ \* N: y" o8 U
  34. {& f  Y  `6 e# B; a# M, e" j2 w0 z
  35.     global $worker;5 Q6 [/ T+ X1 O: D
  36.     // 判断当前客户端是否已经验证,既是否设置了uid$ |8 d$ ?" y+ r9 Y
  37.     if(!isset($connection->uid))
    - M! }( V" N" v; n9 p
  38.     {- Y2 p; w) W/ H3 |- ?; n4 y
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    , l# z# C2 B. i1 D' m3 Z4 k
  40.        $connection->uid = $data;
    7 ]7 e) s" ]. f1 {  P3 ~% K* {
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,3 b1 F4 [( D' Y! A/ o- d
  42.         * 实现针对特定uid推送数据
    $ ]9 C# z% j( `' p" h2 R
  43.         */
    - s; q, v( q& t& x. i9 o3 Y
  44.        $worker->uidConnections[$connection->uid] = $connection;
    5 M: m: L# S' s/ \2 i
  45.        return;
    ) k7 m7 \+ L9 _
  46.     }4 v9 u! G0 S# l. H9 U6 D2 t
  47. };: E0 S' c) U2 Z3 o

  48. 5 x4 s/ a9 ^5 Y  G- a8 g. ]
  49. // 当有客户端连接断开时
    * Q1 J2 V7 i9 S$ Q  F
  50. $worker->onClose = function($connection)
    5 [1 u5 o  ~- A8 c$ ]& p. f9 h: W
  51. {( L% i( e: c. x! d" m9 a  w
  52.     global $worker;) {2 S5 n. x1 Y# k2 J3 f, x  \2 e
  53.     if(isset($connection->uid))$ A4 E! O1 i1 f2 e& `* S( S6 n; O$ v
  54.     {
    ! w# s) O( B1 E6 o; ]% H+ x: E3 l- d6 Q
  55.         // 连接断开时删除映射
    , a& c% [7 p8 f% j+ y9 f7 O+ \8 }
  56.         unset($worker->uidConnections[$connection->uid]);
    8 e& O: y+ Z) \9 A9 x* D+ x8 j
  57.     }
    8 g7 p3 H) ]' |8 n* Q
  58. };- `( z& S  y% l$ q* R# S. N+ J

  59. 7 m" S0 p9 p8 b7 V
  60. // 向所有验证的用户推送数据, u& m* W* n& y/ R% X
  61. function broadcast($message)3 ]/ [, r% F0 T( J
  62. {8 z! b4 z3 g9 N3 i$ H; `: x8 }
  63.    global $worker;
    ' I0 R' ~" E0 W( T3 s
  64.    foreach($worker->uidConnections as $connection)8 R# q9 w# _: p; O
  65.    {2 V6 n& ]0 l! ?6 ?, l8 k5 M
  66.         $connection->send($message);, _5 T- z0 H& J/ i* N0 e5 [0 \5 b
  67.    }
    % R/ M4 T0 b+ w- z+ @4 z0 h$ L
  68. }7 ]* F8 |7 |$ Y- K, k: k4 b8 P" v
  69. : {% |3 A+ a3 x, a6 `: f' \! Z
  70. // 针对uid推送数据, D$ a8 Z7 d! m7 Z2 k
  71. function sendMessageByUid($uid, $message)8 |! Y7 J9 @2 @3 R9 d
  72. {
    ) c- J9 W# H' J7 ^, }7 K% ~
  73.     global $worker;
    2 m  V4 @+ N7 i+ r, R, V
  74.     if(isset($worker->uidConnections[$uid]))$ ~7 c- J2 t8 h
  75.     {
    3 O( l+ E7 T. d, Y! F
  76.         $connection = $worker->uidConnections[$uid];1 f3 @; w; h  Y# m) {( s
  77.         $connection->send($message);) x) L8 ?+ q2 ^+ V9 j! G& |
  78.         return true;3 J  L9 ^+ {* ~  c' [
  79.     }
    7 z8 r& [  ~9 L$ x$ B6 X' U1 b/ Z( p
  80.     return false;( i* a& b3 t- ]' Y
  81. }8 x7 P7 M" ~) C+ H* u0 I/ l4 L

  82. + c# Y* f% _. r
  83. // 运行所有的worker
    1 R" ?  P; z0 R4 c2 e
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');: E3 Q/ }+ }: {* k
  2. ws.onopen = function(){, [$ w8 f, o1 _" P! k
  3.     var uid = 'uid1';
    . |1 N( o' [; o5 i
  4.     ws.send(uid);* I1 j, U9 |* C9 H6 {
  5. };/ a! [) q+ D( n" o! w
  6. ws.onmessage = function(e){7 Q) \2 @5 S4 o( P1 q9 B
  7.     alert(e.data);
    5 l8 m" t9 U  `6 M7 U% Z$ L
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    3 m$ F5 u* `4 W, x7 [, h
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    : G/ b( w/ @# J% l
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    # {; ]' y- |3 r7 o8 b
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');2 ]" u2 c6 n; W0 w; ?5 J7 _
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
      }8 `( T" {0 R( m. [" x
  6. fwrite($client, json_encode($data)."\n");/ R6 R1 e& f; ]5 ~& v' Z5 ?% x0 s
  7. // 读取推送结果
    7 o! c% `# [5 c/ O3 I
  8. echo fread($client, 8192);
复制代码
- \% }, ?: w, T

; g0 z: i# C' a# I. X
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 11:39 , Processed in 0.066293 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!