您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10132|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;9 N& I+ i4 m: t6 L/ C& y
  2. require_once __DIR__ . '/Workerman/Autoloader.php';9 M! a  _8 j5 l
  3. 8 e8 i2 N% I( a
  4. $worker = new Worker();+ B5 c* t' @2 e( j7 H0 v
  5. // 4个进程9 S! L" X0 S: \! v& M; @
  6. $worker->count = 4;
    . F# ~* a' c+ q/ k$ F
  7. // 每个进程启动后在当前进程新增一个Worker监听& V# }* N7 W. S6 M4 C9 I
  8. $worker->onWorkerStart = function($worker)
    / e# ?6 b% X2 A+ I' I
  9. {
    # }2 j0 |; R) A% c7 C+ f
  10.     /**8 s7 l) I6 R, ]7 t
  11.      * 4个进程启动的时候都创建2016端口的Worker4 T, P. v) e2 L  O8 p5 q
  12.      * 当执行到worker->listen()时会报Address already in use错误
    . q! b3 |4 w! R* ~
  13.      * 如果worker->count=1则不会报错
    6 Q$ Z# e' h0 e" C4 U2 f
  14.      */
    5 L; z, u2 j+ T5 }6 f  }
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');7 x( {( V8 m: i) W& a
  16.     $inner_worker->onMessage = 'on_message';
    5 T& k7 r$ }! d# T/ w  E
  17.     // 执行监听。这里会报Address already in use错误
    : M, G2 C4 G+ L: B, X
  18.     $inner_worker->listen();9 U# X' t1 R# V
  19. };% S2 K, L* G$ m
  20. # ?1 ^9 E9 z) @. B4 R
  21. $worker->onMessage = 'on_message';
    4 B7 G# A  A4 u3 T7 X- Z3 w

  22. 7 r  L7 j  o9 D# O
  23. function on_message($connection, $data)
    $ E& k% P/ M; q4 w
  24. {4 a* e' \( c3 |: j, w8 y6 j' Y* J
  25.     $connection->send("hello\n");
    9 U4 d; o& ]" s1 X8 g! p
  26. }$ L2 o! v# ^2 ~. M% q
  27. & l0 y6 \1 w4 |) X( @
  28. // 运行worker$ c; Z) J* r7 B7 B3 c
  29. Worker::runAll();" e2 x2 f. l$ S+ ~5 g
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:! W7 j3 Y7 O% a- ^
  31. 5 ^' m6 j! \) w* g5 B  {
  32. use Workerman\Worker;$ ]& ?1 y/ J' L
  33. require_once './Workerman/Autoloader.php';, s& H* {; Z7 q  e6 x- G$ S

  34. $ B7 {3 f/ y+ j; e. D# c3 b7 G2 A) A
  35. $worker = new Worker('text://0.0.0.0:2015');
    / k9 ?; ?$ G# F" I+ R
  36. // 4个进程( q. O0 o4 z% W- X7 F+ C6 ]
  37. $worker->count = 4;7 e9 _% A! v' @6 |- n: U& g+ m
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ) I  t& [; ]8 Y# U
  39. $worker->onWorkerStart = function($worker)" F! X7 b# F% H& `
  40. {
    0 h- w; L, f' Y& M) g" p4 Y3 h
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');5 |) ]0 G& x+ E# u1 p& H% v8 p
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)0 t( T+ g! o8 z( G- H6 V% [
  43.     $inner_worker->reusePort = true;! H$ Y1 L7 X3 t/ W/ `
  44.     $inner_worker->onMessage = 'on_message';$ K( A' B% L& O
  45.     // 执行监听。正常监听不会报错
    ; n; a* R! D3 ~. I" _. v
  46.     $inner_worker->listen();
    / _5 U6 g" C0 q( K( ]
  47. };. Q  i0 e( T; e4 Q

  48. , Z% [5 L7 y' F) j! Q+ e9 t( |
  49. $worker->onMessage = 'on_message';
    3 x8 X; T! g# n: a3 ?6 c0 Q+ a
  50. 9 r: v. Y! s+ D9 y
  51. function on_message($connection, $data)' R2 h. y; k* Z1 K( y2 h  k
  52. {; P6 f: }/ b7 X) ^- {' R
  53.     $connection->send("hello\n");5 q" c/ r; K7 d- S
  54. }1 ~3 c' W8 Z! ^( q! n3 Z
  55. - V; \0 ^" |. h9 M
  56. // 运行worker7 s- E5 D- `8 w
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    3 Q5 m0 y% [6 D& u, n
  2. use Workerman\Worker;2 D4 `3 X+ H$ U1 p0 q
  3. require_once './Workerman/Autoloader.php';2 u* x5 n+ ^# L: R; h* R' z2 z
  4. // 初始化一个worker容器,监听1234端口7 {! S% H# W4 a( w! W$ X
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    6 l  i. C0 Z# r# v# k& H& `
  6. + o, s$ \8 T1 X6 E4 m3 @7 ~
  7. /*
    / P2 E/ y6 l1 z8 G4 d1 j# I  f; i" g6 W3 z
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误. r, z8 c- ^2 v. }: H
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    4 i9 _% X2 u6 x" `8 t/ A
  10. */
    , y/ Q+ L! {+ y4 U" Q5 {8 Y
  11. $worker->count = 1;
    0 X  d& r" I, O, Y
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口- f7 d/ o5 Y2 [
  13. $worker->onWorkerStart = function($worker)
    ' V& E! W* P: x9 V; X
  14. {( M2 X3 v/ B& \9 {5 x1 U
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ! T# C- d$ j# y$ r$ j% h# p
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    - o! d) e% n( a- t+ e$ X# @1 j1 a
  17.     $inner_text_worker->onMessage = function($connection, $buffer)6 |* A. I  ], t( H2 Z5 s+ g1 x* m/ }
  18.     {4 R/ i$ O4 A. |2 F, ~
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据  y  y) y" v% }  R% [
  20.         $data = json_decode($buffer, true);
    " w$ ~+ F$ R9 N6 [; R5 Z
  21.         $uid = $data['uid'];
    ( }( U& v# [2 L
  22.         // 通过workerman,向uid的页面推送数据. ?' _' E1 f  ?7 G, [
  23.         $ret = sendMessageByUid($uid, $buffer);
    8 @5 J+ u. t; T, E! s6 C
  24.         // 返回推送结果7 Z) W3 Z) W- R' x$ R0 u  v3 l
  25.         $connection->send($ret ? 'ok' : 'fail');$ |' d8 h, {8 ], f
  26.     };) n) E, e" S9 m5 }( A
  27.     // ## 执行监听 ##9 `5 v# {, p$ n$ ^# e; p
  28.     $inner_text_worker->listen();8 U9 W- {4 Q! S1 `/ c
  29. };
    4 p8 z3 C' z9 t  R3 j- `, }
  30. // 新增加一个属性,用来保存uid到connection的映射: M2 M$ d9 j" L1 E' S- O! }
  31. $worker->uidConnections = array();: x$ ^. `* }5 @( C: z
  32. // 当有客户端发来消息时执行的回调函数3 |0 ]" I; R. v
  33. $worker->onMessage = function($connection, $data)2 w4 _: f$ y" `. H, \9 s0 Z- V6 s
  34. {
    / P+ [8 _0 W5 `
  35.     global $worker;
    , N9 j) c* R' t% H, a, f
  36.     // 判断当前客户端是否已经验证,既是否设置了uid" M: S% \  m! s" D- F
  37.     if(!isset($connection->uid))
    9 R/ D& `4 a4 I) \
  38.     {
    7 t9 M$ R" `: E; j  _2 b% `5 X
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)# ]0 d/ G4 h, }% _  z) G/ a6 |
  40.        $connection->uid = $data;6 {! B; w5 I7 i- h. m2 I
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,5 S  K% ]* o0 ?- w
  42.         * 实现针对特定uid推送数据& P4 s! }4 u5 r
  43.         */) {0 v! n4 ]( `/ |
  44.        $worker->uidConnections[$connection->uid] = $connection;4 O$ u; t, e3 \+ X. e9 g
  45.        return;  P. A  h' X) u  F  n$ J
  46.     }
    / `5 v: I. k9 x7 s2 {; [5 V
  47. };
    * ^6 u. y% V# Y- w$ U
  48. , J5 e: n6 x! p( A- R1 F5 k/ U
  49. // 当有客户端连接断开时5 g8 G6 B. m7 k, p- {6 C
  50. $worker->onClose = function($connection)
    0 z0 Y! |7 P! f- i2 F8 B7 ~1 ]
  51. {
    - _# \) M% u% p) [, ]) G' w- L, n
  52.     global $worker;$ I3 v$ g- s; ^( ~8 V! }
  53.     if(isset($connection->uid))
    % ~4 A5 n0 Y* d. s
  54.     {
    ; s) E# M, g! D% k
  55.         // 连接断开时删除映射4 ]/ _. K3 s6 u- U. D- N% N
  56.         unset($worker->uidConnections[$connection->uid]);
    5 W& |: M. D. Z( o* @6 l; ^
  57.     }8 H$ c: x+ P8 c; t" n0 v
  58. };
    7 N! J! @* L; ]3 h

  59. 3 G8 ?" P( U( V3 U
  60. // 向所有验证的用户推送数据  L0 v6 M0 m+ T
  61. function broadcast($message)" C3 e( b! ?% h
  62. {, c0 }+ ^' s- y7 s& X
  63.    global $worker;9 R( X: m: F; D0 W' p2 U! T+ N. u
  64.    foreach($worker->uidConnections as $connection)
    $ S8 \# K% d  L
  65.    {
    / P5 q  {& g% M; G9 d
  66.         $connection->send($message);5 I: e0 \- F7 G0 g0 ~6 |
  67.    }& g. `& B" x6 e5 ]  t/ Q5 P
  68. }) B0 J1 Y# r. |' R

  69. # w3 H9 |+ M8 z
  70. // 针对uid推送数据
    ( H4 |7 s3 J1 ]& E  I
  71. function sendMessageByUid($uid, $message)
      n: w; x$ V1 O8 H' y' \, y
  72. {
    ' _$ n- @7 V: d' J6 j; g, }
  73.     global $worker;! C6 n8 x) z; S( a& ]  ?
  74.     if(isset($worker->uidConnections[$uid]))& i/ C- Q, d6 a( |+ i
  75.     {
    6 M- b: i) N3 k1 ~
  76.         $connection = $worker->uidConnections[$uid];+ R* F) g* Q, [/ |
  77.         $connection->send($message);& H% Y% g' g6 k5 ?+ M1 \/ Y, O1 }/ m
  78.         return true;7 U' z. f- F. h1 E
  79.     }
    ( x4 c; U/ D  Z$ \5 [
  80.     return false;: Y+ M. l- x4 r% W% V! l6 `8 j
  81. }0 }9 ^( X' n2 p$ h: H2 b
  82. / y6 A& \# y% p: X/ e
  83. // 运行所有的worker
    5 ~. v, h4 O% H0 |+ J# Y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    # d) r- f* h& a6 T
  2. ws.onopen = function(){
    ; \. }6 u4 Z5 u- \& N
  3.     var uid = 'uid1';
    . T/ P, K. M& r7 q
  4.     ws.send(uid);8 u2 G6 U2 w# `% P, L
  5. };! Z( k+ k  |# Q6 ~% f7 V
  6. ws.onmessage = function(e){1 _  M9 ]& Q" U+ |4 B+ f9 Y
  7.     alert(e.data);) w9 u' P$ J' l+ y$ i( L1 {
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口( z2 q3 t2 J2 k( i% Z. q, A/ X
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);* x' K# d7 C* ~( Y6 P, h3 ^
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    3 c1 t3 G# W5 U& a7 w
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');$ D+ Z+ A4 D$ u
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ' o: T6 q. |: v5 ?! ^
  6. fwrite($client, json_encode($data)."\n");
    $ [( W+ z, [0 J6 t$ V- K# q" ]0 d
  7. // 读取推送结果% K- x$ ^+ X& I' Z
  8. echo fread($client, 8192);
复制代码

/ Z# X* k2 h9 k1 N0 B/ `% y3 q/ m3 R1 g
( c' p2 F+ V. h. E* i- j
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-4-24 09:43 , Processed in 0.115647 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!